Python CSV Pipelines Need Schema Guards, Not Hope
CSV imports fail silently when columns shift or names drift. A schema gate catches structural problems before corrupted data reaches downstream jobs.
Step 1: Declare expected columns and types
EXPECTED = {
    "user_id": str,
    "event_time": str,
    "score": float,
}
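With the schema declared, row parsing reduces to applying each column's type. This is one plausible shape for the `parse` helper that Step 3 calls; the original doesn't define it, so treat this as an assumption:

```python
EXPECTED = {
    "user_id": str,
    "event_time": str,
    "score": float,
}

def parse(row):
    # Hypothetical helper: coerce each expected field to its declared type.
    # Raises KeyError on a missing field and ValueError on a bad value,
    # so a quarantine loop (Step 3) can catch and record the failure.
    return {col: typ(row[col]) for col, typ in EXPECTED.items()}
```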
Step 2: Validate header and parse failures explicitly
def validate_header(cols):
    missing = [c for c in EXPECTED if c not in cols]
    if missing:
        raise ValueError(f"missing columns: {missing}")
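The header gate pairs naturally with `csv.DictReader`, which exposes the header row as `fieldnames`. A minimal sketch of failing fast before any data rows are read (the sample CSV is illustrative):

```python
import csv
import io

EXPECTED = {"user_id": str, "event_time": str, "score": float}

def validate_header(cols):
    missing = [c for c in EXPECTED if c not in cols]
    if missing:
        raise ValueError(f"missing columns: {missing}")

# Sample file whose header has drifted: the "score" column is gone.
data = io.StringIO("user_id,event_time\nu1,2024-01-01\n")
reader = csv.DictReader(data)
try:
    validate_header(reader.fieldnames)  # check before touching data rows
except ValueError as e:
    print(e)  # missing columns: ['score']
```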
Step 3: Quarantine bad rows instead of dropping silently
good, bad = [], []
for row in rows:
    try:
        good.append(parse(row))
    except (ValueError, KeyError) as e:
        # Catch the specific parse failures, not bare Exception,
        # so genuine bugs still surface instead of being quarantined.
        bad.append((row, str(e)))
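The three steps compose into one ingest function. A minimal end-to-end sketch, assuming the `parse` coercion helper is built from `EXPECTED` as described above:

```python
import csv
import io

EXPECTED = {"user_id": str, "event_time": str, "score": float}

def validate_header(cols):
    missing = [c for c in EXPECTED if c not in cols]
    if missing:
        raise ValueError(f"missing columns: {missing}")

def parse(row):
    # Hypothetical coercion helper: apply each column's declared type.
    return {col: typ(row[col]) for col, typ in EXPECTED.items()}

def ingest(fileobj):
    reader = csv.DictReader(fileobj)
    validate_header(reader.fieldnames)  # structural drift fails the run early
    good, bad = [], []
    for row in reader:
        try:
            good.append(parse(row))
        except (ValueError, KeyError) as e:
            bad.append((row, str(e)))  # quarantine, never drop silently
    return good, bad

raw = "user_id,event_time,score\nu1,2024-01-01,0.9\nu2,2024-01-02,oops\n"
good, bad = ingest(io.StringIO(raw))
# good holds the validated first row; bad holds the row whose score
# failed float coercion, paired with the error message.
```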
Common pitfall
Auto-filling missing fields with defaults during ingest. It hides upstream breakage.
What to check
- Header mismatches fail the run early.
- Bad-row report is stored for review.
- Downstream jobs receive only validated rows.
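For the second checklist item, the quarantined rows need to land somewhere reviewable. One sketch is a JSON-lines sidecar report; the `bad` list of `(row, error)` tuples comes from Step 3, and the report path and format here are assumptions, not anything the original prescribes:

```python
import json

def write_bad_row_report(bad, path="bad_rows.jsonl"):
    # Hypothetical report format: one JSON object per quarantined row,
    # pairing the raw row with its parse error for later review.
    with open(path, "w", encoding="utf-8") as f:
        for row, err in bad:
            f.write(json.dumps({"row": row, "error": err}) + "\n")
```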