""" features_of_interest.py — Reliford Principle Extracts and catalogs features of interest ("wow"/"weird"/"anomalous") from processed Charlie index datasets. - Scans merged/resampled Charlie index JSONs for outliers, jumps, and unusual patterns in C and C′. - Flags features by type (outlier, jump, discontinuity, etc.), dataset(s), time, values, and provenance. - Outputs a reproducible features_of_interest.json for review and future cross-checking. - Designed for modular extension as new criteria or datasets are added. Usage: python features_of_interest.py --merged --out [--bin ] [--thresholds ...] """ import argparse import json from pathlib import Path import numpy as np from typing import Any, Dict, List, cast # --- Visualization imports --- import matplotlib.pyplot as plt import matplotlib.patches as mpatches def load_merged_charlie(path): with open(path, 'r') as f: data = json.load(f) # If data is a list of dicts (row-oriented), convert to expected columnar format if isinstance(data, list): # Find all dataset keys (e.g., C_cals10k, C_prime_cals10k) all_keys = set() for row in data: all_keys.update(row.keys()) time_key = 'time_ka' dataset_keys = [k for k in all_keys if k != time_key] # Parse dataset names datasets = set() for k in dataset_keys: if k.startswith('C_'): datasets.add(k[2:]) elif k.startswith('C_prime_'): datasets.add(k[8:]) datasets = list(datasets) # Build columnar structure merged: Dict[str, Any] = {'time_ka': [row[time_key] for row in data], 'datasets': datasets} for ds in datasets: merged[ds] = { 'C': [row.get(f'C_{ds}', float('nan')) for row in data], 'C_prime': [row.get(f'C_prime_{ds}', float('nan')) for row in data], 'provenance': None } return merged return data def detect_outliers(times, values, thresh=3.0): arr = np.array(values) median = np.nanmedian(arr) mad = np.nanmedian(np.abs(arr - median)) if mad == 0: return [] z = 0.6745 * (arr - median) / mad return [i for i, zi in enumerate(z) if abs(zi) > thresh] def detect_jumps(times, values, thresh=3.0): arr = np.array(values) diffs = np.diff(arr) std = np.nanstd(diffs) if std == 0: return [] jumps = [i+1 for i, d in enumerate(diffs) if abs(d) > thresh * std] return jumps def extract_features(merged, thresholds): features = [] times = merged['time_ka'] dataset = thresholds.get('dataset') if dataset is None: raise ValueError('No dataset specified for feature extraction.') if dataset not in merged['datasets']: raise ValueError(f"Dataset '{dataset}' not found in merged file. Available: {merged['datasets']}") for key in ['C', 'C_prime']: values = merged[dataset][key] # Outliers outlier_idx = detect_outliers(times, values, thresholds['outlier_z']) for i in outlier_idx: features.append({ 'type': 'outlier', 'dataset': dataset, 'quantity': key, 'time_ka': times[i], 'value': values[i], 'provenance': merged[dataset].get('provenance'), }) # Jumps jump_idx = detect_jumps(times, values, thresholds['jump_z']) for i in jump_idx: features.append({ 'type': 'jump', 'dataset': dataset, 'quantity': key, 'time_ka': times[i], 'value': values[i], 'provenance': merged[dataset].get('provenance'), }) return features def main(): parser = argparse.ArgumentParser(description='Extract features of interest from merged Charlie index JSON.') parser.add_argument('--merged', required=True, help='Path to merged_charlie_*.json') parser.add_argument('--out', required=True, help='Output features_of_interest.json') parser.add_argument('--bin', type=float, default=None, help='Bin size (ka)') parser.add_argument('--outlier_z', type=float, default=3.5, help='Z-score threshold for outliers (MAD-based)') parser.add_argument('--jump_z', type=float, default=3.0, help='Stddev threshold for jumps') parser.add_argument('--dataset', required=True, help='Dataset to analyze (e.g., ggf100k, padm2m, cals10k)') parser.add_argument('--plot', action='store_true', help='Show a plot of C, C_prime, and detected features') args = parser.parse_args() merged = load_merged_charlie(args.merged) ds = args.dataset # Type assertions for Pylance time_ka = cast(List[float], merged['time_ka']) ds_dict = cast(Dict[str, List[float]], merged[ds]) # Debug: Ensure merged[ds] is a dict with 'C' and 'C_prime' keys if not (isinstance(ds_dict, dict) and 'C' in ds_dict and 'C_prime' in ds_dict): print(f"[DEBUG] merged[ds] type: {type(ds_dict)}, value: {repr(ds_dict)}") raise RuntimeError(f"merged[ds] is not a dict with 'C' and 'C_prime' keys. Got: {type(ds_dict)} {ds_dict}") thresholds = {'outlier_z': args.outlier_z, 'jump_z': args.jump_z, 'dataset': args.dataset} features = extract_features(merged, thresholds) out = { 'bin_ka': args.bin, 'merged_path': str(Path(args.merged).resolve()), 'dataset': args.dataset, 'features': features, 'thresholds': thresholds } with open(args.out, 'w') as f: json.dump(out, f, indent=2) print(f"Wrote {len(features)} features to {args.out}") # --- Output cleaned (non-outlier) dataset and outlier log --- outlier_indices = { 'C': set(), 'C_prime': set() } for feat in features: if feat['type'] == 'outlier': outlier_indices[feat['quantity']].add(feat['time_ka']) n = len(time_ka) outlier_times_C = set(outlier_indices['C']) outlier_times_Cp = set(outlier_indices['C_prime']) C_list = cast(List[float], ds_dict['C']) Cp_list = cast(List[float], ds_dict['C_prime']) def nan_to_none(x): return None if isinstance(x, float) and np.isnan(x) else x cleaned = { 'time_ka': list(time_ka), 'C': [ nan_to_none(C_list[i]) if time_ka[i] not in outlier_times_C else None for i in range(n) ], 'C_prime': [ nan_to_none(Cp_list[i]) if time_ka[i] not in outlier_times_Cp else None for i in range(n) ] } cleaned_path = str(Path(args.out).with_name(f"cleaned_{ds}.json")) with open(cleaned_path, 'w') as f: json.dump(cleaned, f, indent=2) print(f"Wrote cleaned dataset to {cleaned_path}") # --- Visualization --- if args.plot: plot_features(time_ka, C_list, Cp_list, features, ds, args) def plot_features(time_ka, C, C_prime, features, dataset, args): # Super wide figure for zooming fig, ax = plt.subplots(figsize=(32, 6)) ax.plot(time_ka, C, label='C', color='tab:blue') ax.plot(time_ka, C_prime, label="C'", color='tab:orange') # Highlight outliers and jumps outlier_times_C = [f['time_ka'] for f in features if f['type']=='outlier' and f['quantity']=='C'] outlier_vals_C = [f['value'] for f in features if f['type']=='outlier' and f['quantity']=='C'] outlier_times_Cp = [f['time_ka'] for f in features if f['type']=='outlier' and f['quantity']=='C_prime'] outlier_vals_Cp = [f['value'] for f in features if f['type']=='outlier' and f['quantity']=='C_prime'] jump_times_C = [f['time_ka'] for f in features if f['type']=='jump' and f['quantity']=='C'] jump_vals_C = [f['value'] for f in features if f['type']=='jump' and f['quantity']=='C'] jump_times_Cp = [f['time_ka'] for f in features if f['type']=='jump' and f['quantity']=='C_prime'] jump_vals_Cp = [f['value'] for f in features if f['type']=='jump' and f['quantity']=='C_prime'] # Outliers: red X, Jumps: green triangle ax.scatter(outlier_times_C, outlier_vals_C, color='red', marker='x', s=60, label='C outlier') ax.scatter(outlier_times_Cp, outlier_vals_Cp, color='red', marker='o', s=60, label="C' outlier") ax.scatter(jump_times_C, jump_vals_C, color='green', marker='^', s=60, label='C jump') ax.scatter(jump_times_Cp, jump_vals_Cp, color='green', marker='v', s=60, label="C' jump") ax.set_xlabel('Time (ka)') ax.set_ylabel('Value') ax.set_title(f"Features of Interest: {dataset}") ax.legend(loc='best', fontsize=9) ax.grid(True, alpha=0.3) fig.tight_layout() # Autosave to PNG in same directory as output JSON if hasattr(args, 'out'): out_path = Path(args.out) plot_path = out_path.with_name(f"features_of_interest_{dataset}_plot.png") fig.savefig(plot_path, dpi=150) print(f"Plot autosaved to {plot_path}") plt.show() if __name__ == "__main__": main()