Draft
Changes from 1 commit
Commits
69 commits
e6940e9
make diff of time series
May 4, 2025
8337e9a
`AliasDataFrame` is a small utility that extends `pandas.DataFrame` f…
May 29, 2025
8ddfbf7
adding perfmonitor
May 31, 2025
350f786
adding PerfromanceLogger extracted from calibration code
May 31, 2025
1ba0686
suppressing linter warning
May 31, 2025
4a7d520
Add dtype support and alias dependency graph to AliasDataFrame
Jun 1, 2025
54de3fd
Add support for dtype persistence and alias filtering in save/load
Jun 1, 2025
b8e241e
Save aliases directly to pyarrow metadata
Jun 1, 2025
fcb9bb9
add FormulaLinearModel.py used for the dEdx and distortion calibration
Jun 2, 2025
cfe72d4
add FormulaLinearModel.py used for the dEdx and distortion calibration
Jun 2, 2025
9087f54
special treatment for constants - should never be materialized but used
Jun 2, 2025
60e26cb
special treatment for constants
Jun 2, 2025
b188456
special treatment for constants
Jun 2, 2025
f77f57c
`Add ROOT SetAlias export and Python-to-ROOT AST translation for alia…
Jun 3, 2025
664db50
Add dependency-aware alias materialization with optional cleanup and …
Jun 4, 2025
679141b
Extended unit test for the sub_frames
Jun 9, 2025
6561696
Add extended unit tests for AliasDataFrame including lazy join and er…
Jun 9, 2025
3aae8ee
fixed - Circular dependency detection
Jun 9, 2025
6759c26
fixing all unit tests - except for the automatic materialization
Jun 9, 2025
071a860
fixing automatic materialization test + working in the distortion cali…
Jun 9, 2025
ea7c0d6
fixing circular dependency test - all tests are OK now
Jun 9, 2025
7389cda
adding unit test for the export_import tree
Jun 10, 2025
da90789
add failing test for export/import of the subframes
Jun 10, 2025
64b27cb
make test_export_tree_read_tree_with_subframe pass
Jun 10, 2025
2a6bd71
adding metadata to all trees
Jun 11, 2025
9b7a038
Updated documentation
Jun 11, 2025
c2e7ca6
AliasDataFrame: add index-based subframe join and robust error handling
Jun 11, 2025
3753500
AliasDataFrame: Add __getattr__ support for subframes + docstring/typ…
Jun 12, 2025
718259a
AliasDataFrame: Add support for __getattr__ access to subframes and c…
Jun 12, 2025
d55b796
Refactor GroupByRegressor with robust fit logic, dtype casting, and u…
Jun 12, 2025
c45e5d0
Fix: ensure regression outputs are preserved for underpopulated groups
Jun 12, 2025
4f4f425
Fix NaN handling in robust regression and enable predictor-specific m…
Jun 12, 2025
22ce23c
Add NaN filtering and robust fit fallback logic to GroupByRegressor
Jun 12, 2025
2785bc4
Add flexible regression model selection via `fitter` parameter
Jun 12, 2025
c3d3617
* removing pylint warning
Jun 13, 2025
67e3699
* adding __init__.py
Jun 13, 2025
27c9fbe
* adding protection for infinite recursion
Jun 13, 2025
e9da107
pylint fix
Jun 13, 2025
d4d20e6
adding test for the logger
Jun 23, 2025
4d44bb2
adding conversions to the function list
Jun 25, 2025
cb4b5d1
adding chunksize and compression as argument
Jun 27, 2025
87fa521
adding chunksize and compression as argument
Jun 27, 2025
4ef6973
adding df drawing interface similar to the tree::Draw
Aug 14, 2025
512323d
docs(quantile_fit_nd): add v3.1 Δq-centered ND quantile fitting spec
Oct 11, 2025
257d2ea
Commit latest working version of AliasDataFrame
Oct 11, 2025
fc54430
Commit latest working version of perfoemance_logger.py
Oct 11, 2025
161f0f0
Commit latest working version of groupby_regression.py
Oct 11, 2025
53db0b8
feat(DataFrameUtils): Enhance docstrings and error handling for scatt…
Oct 11, 2025
0ae7eac
feat(dfextensions): add ND quantile fitting (Δq-centered) + tests & b…
Oct 11, 2025
273d6f8
test(dfextensions): fix quantile ND tests vs synthetic truth; add rob…
Oct 11, 2025
6d65a12
fix(quantile_fit_nd): exclude q_center from nuisance axes; silence si…
Oct 11, 2025
b4b5b41
fix(dfextensions/quantile_fit_nd): evaluator axis bug + window-local …
Oct 11, 2025
a578c17
tests(quantile_fit_nd): snapshot pre-fix state with rich diagnostics …
Oct 11, 2025
5d9cacd
fix(quantile_fit_nd): do not floor degenerate Δq windows; keep NaN an…
Oct 11, 2025
30b7ee7
tests(quantile_fit_nd): handle Poisson via randomized PIT pre-processing
Oct 11, 2025
12d5fe4
docs(quantile_fit_nd): add Discrete Inputs policy and utilities
Oct 11, 2025
1b2ed00
bench(quantile_fit_nd): correct scaling assertions — α_b≈−0.5, α_rt≈0.0
Oct 11, 2025
8625857
docs(quantile_fit_nd): add contextLLM.md (cold-start guide + policies)
Oct 11, 2025
2b27e47
docs(quantile_fit_nd): add contextLLM.md (cold-start guide + policies)
Oct 11, 2025
ec9f424
Forgotten commit of reference test and bench log
Oct 11, 2025
cd63f42
feat(bench): add single-file GroupBy regression benchmark + reports
Oct 22, 2025
57b3293
docs(groupby_regression): add Performance & Benchmarking section + fi…
Oct 22, 2025
7d215d3
docs(bench): set default to 5k groups; document 30% outlier scenario
Oct 22, 2025
bb51bc0
docs(restartContext): update with 5k/5 default, 30% outliers, and lev…
Oct 22, 2025
5c9d14b
feat(groupby_regression): add optional per-group diagnostics (diag, d…
Oct 22, 2025
aa024b0
feat(bench): integrate class-level diagnostics summary into benchmark…
Oct 23, 2025
a71cc4d
docs(restartContext): record diagnostics integration and real-data va…
Oct 23, 2025
cc1ecb4
docs(restartContext): record diagnostics integration and real-data va…
Oct 23, 2025
5cf7431
use faster compression by default
Oct 23, 2025
feat(groupby_regression): add optional per-group diagnostics (diag, diag_prefix)

- process_group_robust: record n_refits, frac_rejected, hat_max, cond_xtx, time_ms, n_rows (only when diag=True)
- make_parallel_fit: new args diag / diag_prefix (default off; no behavior change)
- add summarize_diagnostics(dfGB) helper for quick triage
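A minimal usage sketch of the new switches. This is illustrative only: the synthetic column names, the grouping/predictor keyword names (taken from process_group_robust's signature), and the import path are assumptions, not part of this commit.

```python
import numpy as np
import pandas as pd
# Assumed import path; summarize_diagnostics may live at module level or on the class.
from UTILS.dfextensions.groupby_regression import GroupByRegressor, summarize_diagnostics

# Tiny synthetic input (column names are illustrative)
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "sector": rng.integers(0, 4, 2000),
    "x1": rng.normal(size=2000),
    "x2": rng.normal(size=2000),
    "w": np.ones(2000),
})
df["y"] = 2.0 * df["x1"] - 0.5 * df["x2"] + rng.normal(scale=0.1, size=2000)

# diag=True adds diag_* columns to dfGB; the default (diag=False) leaves behavior unchanged.
df_out, dfGB = GroupByRegressor.make_parallel_fit(
    df,
    gb_columns=["sector"],            # keyword names assumed from process_group_robust
    fit_columns=["y"],
    linear_columns0=["x1", "x2"],
    median_columns=[],
    weights="w",
    min_stat=[10, 10],
    sigmaCut=4.0,
    diag=True,
    diag_prefix="diag_",
)

# Quick triage of the per-group diagnostics
summ = summarize_diagnostics(dfGB, diag_prefix="diag_", top=10)
summ["slowest"].head()
```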
miranov25 committed Oct 22, 2025
commit 5c9d14b2071a6b2cb4fe756826e7c3a517929cb2
295 changes: 292 additions & 3 deletions UTILS/dfextensions/groupby_regression.py
@@ -102,7 +102,7 @@ def make_linear_fit(
return df, dfGB

@staticmethod
def process_group_robust(
def process_group_robustBackup(
key: tuple,
df_group: pd.DataFrame,
gb_columns: List[str],
@@ -114,7 +114,7 @@ def process_group_robust(
sigmaCut: float = 4,
fitter: Union[str, Callable] = "auto"
) -> dict:
# TODO 0handle the case os singl gb column
# TODO handle the case of a single gb column
group_dict = dict(zip(gb_columns, key))
predictors = []
if isinstance(weights, str) and weights not in df_group.columns:
@@ -213,6 +213,248 @@ def process_group_robust(

return group_dict

@staticmethod
def process_group_robust(
key: tuple,
df_group: pd.DataFrame,
gb_columns: List[str],
fit_columns: List[str],
linear_columns0: List[str],
median_columns: List[str],
weights: str,
minStat: List[int],
sigmaCut: float = 4,
fitter: Union[str, Callable] = "auto",
# --- NEW (optional) diagnostics ---
diag: bool = False,
diag_prefix: str = "diag_",
) -> dict:
"""
Per-group robust/OLS fit with optional diagnostics.

Diagnostics (only when diag=True; added once per group into the result dict):
- {diag_prefix}n_refits : int, number of extra fits after the initial one (0 or 1 in this implementation)
- {diag_prefix}frac_rejected : float, fraction rejected by sigmaCut at final mask
- {diag_prefix}hat_max : float, max leverage proxy via QR (max rowwise ||Q||^2)
- {diag_prefix}cond_xtx : float, condition number of X^T X
- {diag_prefix}time_ms : float, wall-time per group (ms) excluding leverage/cond computation
- {diag_prefix}n_rows : int, number of rows in the group (after dropna for predictors/target/weights)

Notes:
- n_refits counts *additional* iterations beyond the first fit. With this one-pass sigmaCut scheme,
it will be 0 (no re-fit) or 1 (re-fit once on inliers).
"""
import time
import numpy as np
import logging
from sklearn.linear_model import HuberRegressor, LinearRegression

# TODO handle the case of single gb column
group_dict = dict(zip(gb_columns, key))

if isinstance(weights, str) and weights not in df_group.columns:
raise ValueError(f"Weight column '{weights}' not found in input DataFrame.")

# Select predictors that meet per-predictor minStat (based on non-null rows with target+weights)
predictors: List[str] = []
for i, col in enumerate(linear_columns0):
required_columns = [col] + fit_columns + [weights]
df_valid = df_group[required_columns].dropna()
if len(df_valid) >= minStat[i]:
predictors.append(col)

# Prepare diagnostics state (group-level)
n_refits_group = 0 # extra fits after initial fit
frac_rejected_group = np.nan
hat_max_group = np.nan
cond_xtx_group = np.nan
time_ms_group = np.nan
n_rows_group = int(len(df_group)) # raw group size (will refine to cleaned size later)

# Start timing the *fitting* work (we will stop before leverage/cond to avoid polluting time)
t0_group = time.perf_counter()

# Loop over target columns
for target_col in fit_columns:
try:
if not predictors:
# No valid predictors met minStat; emit NaNs for this target
for col in linear_columns0:
group_dict[f"{target_col}_slope_{col}"] = np.nan
group_dict[f"{target_col}_err_{col}"] = np.nan
group_dict[f"{target_col}_intercept"] = np.nan
group_dict[f"{target_col}_rms"] = np.nan
group_dict[f"{target_col}_mad"] = np.nan
continue

subset_columns = predictors + [target_col, weights]
df_clean = df_group.dropna(subset=subset_columns)
if len(df_clean) < min(minStat):
# Not enough rows to fit
for col in linear_columns0:
group_dict[f"{target_col}_slope_{col}"] = np.nan
group_dict[f"{target_col}_err_{col}"] = np.nan
group_dict[f"{target_col}_intercept"] = np.nan
group_dict[f"{target_col}_rms"] = np.nan
group_dict[f"{target_col}_mad"] = np.nan
continue

# Update cleaned group size for diagnostics
n_rows_group = int(len(df_clean))

X = df_clean[predictors].to_numpy(copy=False)
y = df_clean[target_col].to_numpy(copy=False)
w = df_clean[weights].to_numpy(copy=False)

# Choose model
if callable(fitter):
model = fitter()
elif fitter == "robust":
model = HuberRegressor(tol=1e-4)
elif fitter == "ols":
model = LinearRegression()
else:
model = HuberRegressor(tol=1e-4)

# Initial fit
try:
model.fit(X, y, sample_weight=w)
except Exception as e:
logging.warning(
f"{model.__class__.__name__} failed for {target_col} in group {key}: {e}. "
f"Falling back to LinearRegression."
)
model = LinearRegression()
model.fit(X, y, sample_weight=w)

# Residuals and robust stats
predicted = model.predict(X)
residuals = y - predicted
rms = float(np.sqrt(np.mean(residuals ** 2)))
mad = float(np.median(np.abs(residuals)))

# One-pass sigmaCut masking (current implementation supports at most a single re-fit)
final_mask = None
if np.isfinite(mad) and mad > 0 and sigmaCut is not None and sigmaCut < np.inf:
mask = (np.abs(residuals) <= sigmaCut * mad)
if mask.sum() >= min(minStat):
# Re-fit on inliers
n_refits_group += 1 # <-- counts *extra* fits beyond the first
try:
model.fit(X[mask], y[mask], sample_weight=w[mask])
except Exception as e:
logging.warning(
f"{model.__class__.__name__} re-fit with outlier mask failed for {target_col} "
f"in group {key}: {e}. Falling back to LinearRegression."
)
model = LinearRegression()
model.fit(X[mask], y[mask], sample_weight=w[mask])

# Recompute residuals on full X (to report global rms/mad)
predicted = model.predict(X)
residuals = y - predicted
rms = float(np.sqrt(np.mean(residuals ** 2)))
mad = float(np.median(np.abs(residuals)))
final_mask = mask
else:
final_mask = np.ones_like(residuals, dtype=bool)
else:
final_mask = np.ones_like(residuals, dtype=bool)

# Parameter errors from final fit (on the design actually used to fit)
try:
if final_mask is not None and final_mask.any():
X_used = X[final_mask]
y_used = y[final_mask]
else:
X_used = X
y_used = y

n, p = X_used.shape
denom = n - p if n > p else 1e-9
s2 = float(np.sum((y_used - model.predict(X_used)) ** 2) / denom)
cov_matrix = np.linalg.inv(X_used.T @ X_used) * s2
std_errors = np.sqrt(np.diag(cov_matrix))
except np.linalg.LinAlgError:
std_errors = np.full(len(predictors), np.nan, dtype=float)

# Store results for this target
for col in linear_columns0:
if col in predictors:
idx = predictors.index(col)
group_dict[f"{target_col}_slope_{col}"] = float(model.coef_[idx])
group_dict[f"{target_col}_err_{col}"] = float(std_errors[idx]) if idx < len(std_errors) else np.nan
else:
group_dict[f"{target_col}_slope_{col}"] = np.nan
group_dict[f"{target_col}_err_{col}"] = np.nan

group_dict[f"{target_col}_intercept"] = float(model.intercept_) if hasattr(model, "intercept_") else np.nan
group_dict[f"{target_col}_rms"] = rms
group_dict[f"{target_col}_mad"] = mad

# Update group-level diagnostics that depend on the final mask
if diag:
# Capture timing up to here (pure fitting + residuals + errors); exclude leverage/cond below
time_ms_group = (time.perf_counter() - t0_group) * 1e3
if final_mask is not None and len(final_mask) > 0:
frac_rejected_group = 1.0 - (float(np.count_nonzero(final_mask)) / float(len(final_mask)))
else:
frac_rejected_group = np.nan

except Exception as e:
logging.warning(f"Robust regression failed for {target_col} in group {key}: {e}")
for col in linear_columns0:
group_dict[f"{target_col}_slope_{col}"] = np.nan
group_dict[f"{target_col}_err_{col}"] = np.nan
group_dict[f"{target_col}_intercept"] = np.nan
group_dict[f"{target_col}_rms"] = np.nan
group_dict[f"{target_col}_mad"] = np.nan

# Medians
for col in median_columns:
try:
group_dict[col] = df_group[col].median()
except Exception:
group_dict[col] = np.nan

# Compute leverage & conditioning proxies (kept OUTSIDE the timed span)
if diag:
try:
X_cols = [c for c in linear_columns0 if c in df_group.columns and c in predictors]
if X_cols:
X_diag = df_group[X_cols].dropna().to_numpy(dtype=np.float64, copy=False)
else:
X_diag = None

hat_max_group = np.nan
cond_xtx_group = np.nan
if X_diag is not None and X_diag.size and X_diag.shape[1] > 0:
# cond(X^T X)
try:
s = np.linalg.svd(X_diag.T @ X_diag, compute_uv=False)
cond_xtx_group = float(s[0] / s[-1]) if (s.size > 0 and s[-1] > 0) else float("inf")
except Exception:
cond_xtx_group = float("inf")
# leverage via QR
try:
Q, _ = np.linalg.qr(X_diag, mode="reduced")
hat_max_group = float(np.max(np.sum(Q * Q, axis=1)))
except Exception:
pass
except Exception:
pass

# Attach diagnostics (once per group)
group_dict[f"{diag_prefix}n_refits"] = int(n_refits_group)
group_dict[f"{diag_prefix}frac_rejected"] = float(frac_rejected_group) if np.isfinite(frac_rejected_group) else np.nan
group_dict[f"{diag_prefix}hat_max"] = float(hat_max_group) if np.isfinite(hat_max_group) else np.nan
group_dict[f"{diag_prefix}cond_xtx"] = float(cond_xtx_group) if np.isfinite(cond_xtx_group) else np.nan
group_dict[f"{diag_prefix}time_ms"] = float(time_ms_group) if np.isfinite(time_ms_group) else np.nan
group_dict[f"{diag_prefix}n_rows"] = int(n_rows_group)

return group_dict


@staticmethod
def make_parallel_fit(
df: pd.DataFrame,
@@ -229,7 +471,10 @@ def make_parallel_fit(
min_stat: List[int] = [10, 10],
sigmaCut: float = 4.0,
fitter: Union[str, Callable] = "auto",
batch_size: Union[int, None] = None # ← new argument
batch_size: Union[int, None] = None, # ← new argument
# --- NEW: diagnostics switch ---
diag: bool = False,
diag_prefix: str = "diag_"
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Perform grouped robust linear regression using HuberRegressor in parallel.
@@ -292,3 +537,47 @@ def make_parallel_fit(
df[f"{target_col}{suffix}"] += df[slope_col] * df[col]

return df, dfGB

def summarize_diagnostics(dfGB, diag_prefix: str = "diag_", top: int = 50):
"""
Quick look at diagnostic columns emitted by make_parallel_fit(..., diag=True).
Returns a dict of small DataFrames for top offenders, and prints a short summary.

Example:
summ = summarize_diagnostics(dfGB, top=20)
summ["slowest"].head()
"""
import pandas as pd
cols = {
"time": f"{diag_prefix}time_ms",
"refits": f"{diag_prefix}n_refits",
"rej": f"{diag_prefix}frac_rejected",
"lev": f"{diag_prefix}hat_max",
"cond": f"{diag_prefix}cond_xtx",
"nrows": f"{diag_prefix}n_rows",
}
missing = [c for c in cols.values() if c not in dfGB.columns]
if missing:
print("[diagnostics] Missing columns (did you run diag=True?):", missing)
return {}

summary = {}
# Defensive: numeric coerce
d = dfGB.copy()
for k, c in cols.items():
d[c] = pd.to_numeric(d[c], errors="coerce")

summary["slowest"] = d.sort_values(cols["time"], ascending=False).head(top)[list({*dfGB.columns[:len(dfGB.columns)//4], *cols.values()})]
summary["most_refits"] = d.sort_values(cols["refits"], ascending=False).head(top)[list({*dfGB.columns[:len(dfGB.columns)//4], *cols.values()})]
summary["most_rejected"] = d.sort_values(cols["rej"], ascending=False).head(top)[list({*dfGB.columns[:len(dfGB.columns)//4], *cols.values()})]
summary["highest_leverage"] = d.sort_values(cols["lev"], ascending=False).head(top)[list({*dfGB.columns[:len(dfGB.columns)//4], *cols.values()})]
summary["worst_conditioned"] = d.sort_values(cols["cond"], ascending=False).head(top)[list({*dfGB.columns[:len(dfGB.columns)//4], *cols.values()})]

# Console summary
print("[diagnostics] Groups:", len(dfGB))
print("[diagnostics] mean time (ms):", float(d[cols["time"]].mean()))
print("[diagnostics] pct with refits>0:", float((d[cols["refits"]] > 0).mean()) * 100.0)
print("[diagnostics] mean frac_rejected:", float(d[cols["rej"]].mean()))
print("[diagnostics] 99p cond_xtx:", float(d[cols["cond"]].quantile(0.99)))
print("[diagnostics] 99p hat_max:", float(d[cols["lev"]].quantile(0.99)))
return summary
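
For reference on the `hat_max` diagnostic above: with a reduced QR decomposition X = QR of a full-column-rank design, the hat matrix is H = X(XᵀX)⁻¹Xᵀ = QQᵀ, so each leverage h_ii equals the squared norm of the i-th row of Q — which is what the QR-based proxy computes. A minimal numpy check of that identity (illustrative, not part of the commit):

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))              # synthetic full-rank design

# Leverage from the explicit hat matrix H = X (X^T X)^{-1} X^T
H = X @ np.linalg.inv(X.T @ X) @ X.T
lev_explicit = np.diag(H)

# Leverage via reduced QR, as in the diagnostic: h_ii = ||Q_i||^2
Q, _ = np.linalg.qr(X, mode="reduced")
lev_qr = np.sum(Q * Q, axis=1)

assert np.allclose(lev_explicit, lev_qr)
print("max leverage (hat_max):", lev_qr.max())
```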