# filename: scripts/run_ingests.py
"""Run every dataset ingest script, then sanity-check and schema-validate outputs.

Stages, in order:

1. Run each ingest script under ``<this dir>/ingest`` as a child process.
2. Print a quick summary of each dataset's ``processed/latest`` JSON output.
3. Write a dated run manifest, when ``data_pipeline`` helpers are importable
   and ``--no-manifest`` is not given.
4. Validate every latest Charlie index against
   ``schema/charlie_record.schema.json`` (unless ``--skip-validate``); exits
   with status 3 on validation errors unless ``--soft-validate`` is given.
"""
import argparse
import subprocess
import json
from pathlib import Path
from datetime import datetime
import sys

# jsonschema is only needed for the inline validation stage. Guarding the
# import (like the optional data_pipeline helpers below) lets
# ``--skip-validate`` runs work in environments without it installed.
try:
    import jsonschema
except ImportError:
    jsonschema = None  # type: ignore

# New utilities for manifest creation & central path logic (non-breaking addition)
try:
    from data_pipeline.common.manifest import generate_manifest
    from data_pipeline.common.config import today_str
except Exception:
    generate_manifest = None  # type: ignore

    def today_str() -> str:
        """Fallback: return today's local date formatted as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")


# (dataset name, ingest script filename), in run order. The dataset name is
# also the manifest key and the prefix of its ``<name>_charlie_index.json``.
_DATASETS = (
    ("padm2m", "ingest_padm2m.py"),
    ("ggf100k", "ingest_ggf100k.py"),
    ("cals10k", "ingest_cals10k_fortran.py"),  # prefer Fortran-backed ingest
    ("2206", "ingest_2206.py"),
)

# How many individual schema errors are printed; all errors are still counted.
_MAX_PRINTED_SCHEMA_ERRORS = 10


def run_script(script_path, args=None):
    """Run one script in a child Python process and echo its output.

    Args:
        script_path: ``Path`` of the script to execute.
        args: Optional iterable of extra command-line arguments for the script.

    Returns:
        True if the script exited with status 0, otherwise False (stderr, or
        stdout when stderr is empty, is printed).
    """
    print(f"\n▶️ Running {script_path.name}...")
    # sys.executable (rather than a bare "python" looked up on PATH) keeps the
    # child on the same interpreter / virtualenv as this runner.
    cmd = [sys.executable, str(script_path)] + list(args or [])
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"❌ {script_path.name} failed:\n{result.stderr or result.stdout}")
        return False
    print(result.stdout)
    return True


def _c_value(record):
    """Return ``C_prime`` when present and not None, else ``C`` (may be None)."""
    value = record.get("C_prime")
    return record.get("C") if value is None else value


def validate_output(output_file):
    """Print a quick summary of an ingest's JSON output file.

    This is a sanity summary (record count, time range, first/last record,
    C' range), not schema validation.

    Args:
        output_file: Path or str of a JSON file expected to hold a list of
            record dicts.

    Returns:
        True if the file loaded and holds a non-empty JSON list, otherwise
        False (a warning is printed instead of the summary).
    """
    path = Path(output_file)
    if not path.exists():
        print(f"⚠️ {path.name} not found.")
        return False
    try:
        with open(path, "r", encoding="utf-8") as f:
            records = json.load(f)
    except Exception as e:
        print(f"⚠️ Failed to load {path.name}: {e}")
        return False
    if not records:
        print(f"⚠️ {path.name} is empty.")
        return False
    if not isinstance(records, list):
        print(f"⚠️ {path.name} is not a JSON list of records.")
        return False

    dict_records = [r for r in records if isinstance(r, dict)]
    # Explicit ``is not None`` checks: a legitimate 0 / 0.0 must neither be
    # dropped nor replaced by the fallback key.
    times = [r["time_ka"] for r in dict_records if r.get("time_ka") is not None]
    c_vals = [c for c in map(_c_value, dict_records) if c is not None]

    print(f"✅ {path.name}: {len(records)} records")
    if times:
        print(f" Time range: {min(times)} → {max(times)} ka")
    else:
        print(" Time range: n/a (no time_ka values)")
    print(f" First record: {records[0]}")
    print(f" Last record: {records[-1]}")
    if c_vals:
        print(f" C': min={min(c_vals):.3f}, max={max(c_vals):.3f}")
    return True


def _run_ingests(ingest_dir, latest_dir, force):
    """Run every ingest, then summarise its latest output; return failed dataset names."""
    failed = []
    for name, script_name in _DATASETS:
        latest_out = latest_dir / f"{name}_charlie_index.json"
        # If --force passed to runner, always forward it; otherwise only when latest exists
        fwd_args = ["--force"] if (force or latest_out.exists()) else []
        if not run_script(ingest_dir / script_name, args=fwd_args):
            failed.append(name)
        # Summarise latest output after run (even on failure, to show what is on disk)
        validate_output(latest_out)
    return failed


def _write_manifest():
    """Write a dated run manifest for provenance; failures only print a warning."""
    date = today_str()
    try:
        manifest_path = generate_manifest([name for name, _ in _DATASETS], date=date)
        print(f"\n🗂 Run manifest written: {manifest_path}")
    except Exception as e:
        print(f"⚠️ Manifest generation failed: {e}")


def _validate_schema(schema_path, soft):
    """Validate every latest Charlie index's records against a JSON schema.

    Prints up to ``_MAX_PRINTED_SCHEMA_ERRORS`` errors but counts all of them;
    unreadable index files count as one error each. Calls ``sys.exit(3)`` when
    any error was found unless *soft* is true. Problems that prevent validation
    from running at all (missing jsonschema / data_pipeline, unreadable schema)
    are reported as a warning only.
    """
    try:
        if jsonschema is None:
            raise ImportError("jsonschema is not installed")
        from data_pipeline.common.config import list_latest_charlie_indices

        with open(schema_path, "r", encoding="utf-8") as sf:
            schema = json.load(sf)
        validator = jsonschema.Draft7Validator(schema)
        v_errors = 0
        for f in list_latest_charlie_indices():
            fp = Path(f)  # entries may be str or Path; normalise for .name
            try:
                data = json.loads(fp.read_text(encoding="utf-8"))
            except Exception as e:
                print(f"❌ {fp.name}: load error {e}")
                v_errors += 1
                continue
            if not isinstance(data, list):
                # Previously skipped silently; surface it without changing the exit status.
                print(f"⚠️ {fp.name}: not a JSON list; records not validated")
                continue
            for idx, rec in enumerate(data):
                for err in validator.iter_errors(rec):
                    if v_errors < _MAX_PRINTED_SCHEMA_ERRORS:
                        print(f"❌ {fp.name} rec {idx}: {err.message}")
                    v_errors += 1
    except Exception as e:
        print(f"⚠️ Inline validation failed to execute: {e}")
        return

    if v_errors:
        print(f"Schema validation errors: {v_errors}")
        if not soft:
            print("Exiting due to validation failures.")
            sys.exit(3)
    else:
        print("✅ Schema validation passed (inline)")


def main():
    """Parse CLI flags, then run the ingest, manifest and validation stages."""
    parser = argparse.ArgumentParser(description="Run all ingests and validate outputs.")
    parser.add_argument("--force", action="store_true", help="Overwrite processed/latest outputs for all datasets.")
    parser.add_argument("--no-manifest", action="store_true", help="Skip run manifest generation.")
    parser.add_argument("--skip-validate", action="store_true", help="Skip schema validation stage.")
    parser.add_argument("--soft-validate", action="store_true", help="Do not exit non-zero on validation failure.")
    args = parser.parse_args()

    base_dir = Path(__file__).resolve().parent
    ingest_dir = base_dir / "ingest"
    latest_dir = base_dir.parent / "processed" / "latest"

    failed = _run_ingests(ingest_dir, latest_dir, force=args.force)
    if failed:
        print(f"\n❌ Ingest failures: {', '.join(failed)}")

    # Generate a run manifest (dated dir) for provenance
    if not args.no_manifest and generate_manifest:
        _write_manifest()

    # Inline schema validation (programmatic) unless skipped
    if not args.skip_validate:
        schema_path = base_dir.parent / "schema" / "charlie_record.schema.json"
        _validate_schema(schema_path, soft=args.soft_validate)

    print("\nTip: validate schema with: python data_pipeline/scripts/validate_charlie.py --latest")


if __name__ == "__main__":
    main()