Batch transforming 10k+ shapefiles without memory leaks
Processing legacy spatial datasets at scale requires strict resource isolation and deterministic schema enforcement. Government-scale ingestion volumes routinely trigger resident set size (RSS) growth when GDAL/OGR driver caches, lingering C-level file descriptors, and unbounded schema drift compound across sequential reads. Batch transforming 10k+ shapefiles without memory leaks demands a departure from monolithic DataFrame loads and reliance on implicit Python garbage collection.
The architecture below enforces explicit teardown, chunked I/O, and schema-first validation. It maintains stable memory footprints across multi-day ETL runs while preserving auditability and compliance alignment.
Driver Configuration & Memory Thresholds
GDAL/OGR keeps driver-level buffers and caches that can outlive individual fiona or ogr sessions unless explicitly cleared. Set the following configuration options as environment variables at process initialization, before the first dataset is opened, to keep caching conservative (GDAL_CACHEMAX is interpreted in megabytes here):
import os
os.environ["GDAL_CACHEMAX"] = "128"
os.environ["OGR_ENABLE_PARTIAL_REPROJECTION"] = "NO"
os.environ["CPL_DEBUG"] = "OFF"
os.environ["VSI_CACHE"] = "FALSE"
os.environ["GDAL_DISABLE_READDIR_ON_OPEN"] = "EMPTY_DIR"
Apply strict operational thresholds before iteration begins:
- Hard RSS ceiling: 2.0 GB per worker process
- Soft GC trigger: 1.5 GB sustained allocation
- Open descriptor limit: 256 concurrent file handles
- Cache flush interval: Every 75 processed files
Exceeding these limits on memory-constrained CI runners or state-managed VMs triggers swap thrashing. Explicit garbage collection and descriptor flushing prevent silent degradation.
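The thresholds above could be checked with a small helper like the following. This is a minimal sketch, not part of the original pipeline: the function names (current_rss_mb, open_fd_count, classify_rss, enforce_thresholds) are hypothetical, the GB values are converted at 1024 MB per GB, and the /proc paths assume Linux.

```python
import gc
import os
import resource

HARD_RSS_MB = 2048   # hard RSS ceiling (2.0 GB)
SOFT_RSS_MB = 1536   # soft GC trigger (1.5 GB)
MAX_OPEN_FDS = 256   # open descriptor limit


def current_rss_mb() -> float:
    """Current RSS in MB from /proc (Linux); falls back to peak RSS elsewhere."""
    try:
        with open("/proc/self/statm") as f:
            resident_pages = int(f.read().split()[1])
        return resident_pages * os.sysconf("SC_PAGE_SIZE") / (1024 * 1024)
    except (OSError, ValueError, IndexError):
        # ru_maxrss is the *peak* RSS; this fallback assumes Linux units (KB).
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024


def open_fd_count() -> int:
    """Number of open descriptors for this process, or -1 if /proc is unavailable."""
    try:
        return len(os.listdir("/proc/self/fd"))
    except OSError:
        return -1


def classify_rss(rss_mb: float, soft: float = SOFT_RSS_MB, hard: float = HARD_RSS_MB) -> str:
    """Map an RSS reading to an action: 'ok', 'collect', or 'abort'."""
    if rss_mb >= hard:
        return "abort"
    if rss_mb >= soft:
        return "collect"
    return "ok"


def enforce_thresholds() -> str:
    """Apply the soft and hard limits; raise when a hard limit is exceeded."""
    action = classify_rss(current_rss_mb())
    if action == "collect":
        gc.collect()
    elif action == "abort":
        raise MemoryError("RSS exceeded the hard ceiling")
    if open_fd_count() > MAX_OPEN_FDS:
        raise OSError("Open descriptor limit exceeded")
    return action
```

Calling enforce_thresholds() after each file, or after each batch, keeps the check cheap while still catching growth before the runner starts swapping.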
Schema Mapping & Type Coercion Enforcement
Shapefiles lack native schema versioning. Field drift across directories is inevitable. Load a deterministic mapping configuration that defines target field names, data types, and fallback coercion rules. Reject files that deviate beyond acceptable mismatch thresholds to prevent silent data corruption.
# schema_mapping.yaml
target_schema:
  - name: parcel_id
    type: str
    source_fields: ["PARCEL_ID", "PID", "APN"]
  - name: land_use_code
    type: str
    source_fields: ["LU_CODE", "LANDUSE", "ZONING"]
  - name: area_sqm
    type: float
    source_fields: ["AREA", "SQ_METERS", "ACRES"]
    # NOTE: the transform runs on whichever source field matches first.
    # 4046.86 converts acres to square meters, so it is only correct for ACRES.
    transform: "lambda x: x * 4046.86 if x else None"
validation:
  max_field_mismatch_pct: 15
  required_geometry: "Polygon"
Parse this configuration into a lookup dictionary before iteration. Validate each input file against the expected schema signature. Files failing the mismatch threshold are quarantined with a structured error log rather than forcing type coercion that breaks downstream joins. This deterministic mapping approach aligns with established practices in Automated Attribute Transformation & ETL Workflows where heuristic guessing is replaced by explicit rule evaluation.
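One way to build that lookup dictionary and evaluate the mismatch threshold is sketched below. To stay self-contained, the configuration is inlined as the dictionary that yaml.safe_load would return for schema_mapping.yaml; build_lookup, mismatch_pct, and should_quarantine are illustrative names, not functions from the original pipeline.

```python
from typing import Any, Dict, Iterable

# Parsed form of schema_mapping.yaml, inlined so the sketch runs on its own.
CONFIG: Dict[str, Any] = {
    "target_schema": [
        {"name": "parcel_id", "type": "str",
         "source_fields": ["PARCEL_ID", "PID", "APN"]},
        {"name": "land_use_code", "type": "str",
         "source_fields": ["LU_CODE", "LANDUSE", "ZONING"]},
        {"name": "area_sqm", "type": "float",
         "source_fields": ["AREA", "SQ_METERS", "ACRES"]},
    ],
    "validation": {"max_field_mismatch_pct": 15, "required_geometry": "Polygon"},
}


def build_lookup(config: Dict[str, Any]) -> Dict[str, str]:
    """Map every known source field name to its target field name."""
    lookup: Dict[str, str] = {}
    for field in config["target_schema"]:
        for src in field["source_fields"]:
            lookup[src] = field["name"]
    return lookup


def mismatch_pct(source_fields: Iterable[str], config: Dict[str, Any]) -> float:
    """Percentage of target fields that no source field can populate."""
    lookup = build_lookup(config)
    targets = {f["name"] for f in config["target_schema"]}
    covered = {lookup[s] for s in source_fields if s in lookup}
    return (1 - len(covered) / len(targets)) * 100


def should_quarantine(source_fields: Iterable[str], config: Dict[str, Any]) -> bool:
    """True when the file deviates beyond the configured mismatch threshold."""
    limit = config["validation"]["max_field_mismatch_pct"]
    return mismatch_pct(source_fields, config) > limit
```

With only three target fields, a single unmatched field is already a 33% mismatch, so the 15% threshold in this configuration rejects any file that cannot populate every target field.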
Chunked Execution & Explicit Teardown
Process files in fixed batches of 75. This chunk size balances throughput with memory pressure. Each batch must complete with deterministic cleanup:
- Close dataset handles immediately after write completion
- Invoke gc.collect() to reclaim cyclic references
- Clear fiona driver cache via fiona.Env context exit
- Log batch completion with memory delta metrics
Relying on Python’s reference counting alone leaves C-level allocations intact. Explicit teardown ensures Batch Schema Processing Pipelines remain stable during long-running government data harmonization cycles.
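The batching and teardown steps can be sketched independently of any GIS library. In this hypothetical helper, process_one stands in for the per-file work and measure_mb for whatever RSS probe the pipeline uses; both names are assumptions made for illustration.

```python
import gc
import logging
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")
logger = logging.getLogger("batch_sketch")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


def run_batches(
    items: Sequence[T],
    size: int,
    process_one: Callable[[T], None],
    measure_mb: Callable[[], float],
) -> int:
    """Process items in fixed batches; collect garbage and log the memory delta
    after each batch. Returns the number of batches processed."""
    batches = 0
    for batch in chunked(items, size):
        before = measure_mb()
        try:
            for item in batch:
                process_one(item)
        finally:
            # Teardown runs even when a file in the batch raises.
            gc.collect()
        batches += 1
        logger.info(
            "batch %d done: %d items, RSS delta %+.1f MB",
            batches, len(batch), measure_mb() - before,
        )
    return batches
```

Keeping the cleanup in a finally block means a failing file cannot skip the teardown for the rest of its batch.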
CRS Normalization & Geometry Validation
CRS mismatches and invalid geometries are primary causes of pipeline stalls. Enforce normalization rules before transformation:
- Reject files with missing or malformed .prj files
- Transform non-standard CRS to EPSG:3857 or EPSG:4326 using pyproj (the script below relies on fiona.transform.transform_geom for the same step)
- Validate geometry type against required_geometry
- Skip features with self-intersections or empty coordinates
Adhere to the OGC Simple Features specification for geometry validity checks. Invalid features must be logged and quarantined rather than silently dropped or forced into compliance.
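A few of these checks can run before any reprojection, using only the standard library. The sketch below is deliberately partial: prj_problem and polygon_problem are hypothetical helpers that operate on GeoJSON-like mappings, and they cover only structural problems (missing .prj, empty coordinates, unclosed or too-short rings). Detecting self-intersections requires a geometry library, for example shapely's is_valid, and is not attempted here.

```python
from pathlib import Path
from typing import Any, Mapping, Optional


def prj_problem(shp_path: Path) -> Optional[str]:
    """Return a reason string if the sibling .prj is missing or empty, else None."""
    prj = shp_path.with_suffix(".prj")
    if not prj.is_file():
        return "missing .prj"
    if not prj.read_text(errors="replace").strip():
        return "empty .prj"
    return None


def polygon_problem(geom: Optional[Mapping[str, Any]]) -> Optional[str]:
    """Structural checks for a GeoJSON-like Polygon; returns None when none fail.

    This does not prove OGC validity: self-intersections are not detected.
    """
    if geom is None:
        return "null geometry"
    if geom.get("type") != "Polygon":
        return f"unexpected type {geom.get('type')!r}"
    rings = geom.get("coordinates") or []
    if not rings:
        return "empty coordinates"
    for ring in rings:
        if len(ring) < 4:
            return "ring has fewer than 4 points"
        if tuple(ring[0]) != tuple(ring[-1]):
            return "ring is not closed"
    return None
```

Returning a reason string rather than a boolean gives the quarantine log something specific to record for each rejected file or feature.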
Production-Ready Implementation
The following script integrates the constraints above: it handles missing fields, CRS mismatches, and memory thresholds, and it quarantines files that fail. It targets Unix/Linux CI environments (the resource module is Unix-only) and expects fiona and PyYAML to be installed.
import os
import gc
import yaml
import logging
import resource
from pathlib import Path
from typing import Dict, List, Any

import fiona
from fiona.transform import transform_geom

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("shapefile_etl")

# 1. Driver & Memory Configuration
os.environ["GDAL_CACHEMAX"] = "128"
os.environ["OGR_ENABLE_PARTIAL_REPROJECTION"] = "NO"
os.environ["CPL_DEBUG"] = "OFF"
os.environ["VSI_CACHE"] = "FALSE"
os.environ["GDAL_DISABLE_READDIR_ON_OPEN"] = "EMPTY_DIR"

SOFT_RSS_LIMIT_MB = 1500
SIDECAR_EXTS = (".shp", ".shx", ".dbf", ".prj", ".cpg")


def get_rss_mb() -> float:
    """Return current RSS in MB (Linux /proc; falls back to peak RSS elsewhere)."""
    try:
        with open("/proc/self/statm") as f:
            resident_pages = int(f.read().split()[1])
        return resident_pages * os.sysconf("SC_PAGE_SIZE") / (1024 * 1024)
    except (OSError, ValueError, IndexError):
        # ru_maxrss is the peak (not current) RSS, reported in KB on Linux
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024


def load_schema_config(config_path: str) -> Dict[str, Any]:
    with open(config_path, "r") as f:
        return yaml.safe_load(f)


def sidecars(shp_path: Path) -> List[Path]:
    """Return the existing component files that belong to one shapefile."""
    candidates = (shp_path.with_suffix(ext) for ext in SIDECAR_EXTS)
    return [p for p in candidates if p.exists()]


def validate_schema_match(src_schema: Dict[str, str], config: Dict[str, Any]) -> bool:
    """Check if source fields meet the mismatch threshold."""
    source_fields = set(src_schema.keys())
    targets = config["target_schema"]
    # A target counts as matched when any of its source aliases is present
    matched = sum(
        1 for f in targets
        if any(s in source_fields for s in f["source_fields"])
    )
    mismatch_pct = (1 - (matched / len(targets))) * 100
    return mismatch_pct <= config["validation"]["max_field_mismatch_pct"]


def transform_record(record: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Apply field mapping and type coercion."""
    mapped = {}
    props = record["properties"]
    for field in config["target_schema"]:
        value = None
        for src in field["source_fields"]:
            if src in props:
                value = props[src]
                break
        # Coerce first so numeric transforms never receive a string
        if field["type"] == "float" and value is not None:
            try:
                value = float(value)
            except (ValueError, TypeError):
                value = None
        elif field["type"] == "str" and value is not None:
            value = str(value)
        if "transform" in field and value is not None:
            # eval() executes arbitrary code: only load mapping files from trusted sources
            value = eval(field["transform"])(value)
        mapped[field["name"]] = value
    return mapped


def process_batch(file_batch: List[Path], config: Dict[str, Any], output_dir: Path, quarantine_dir: Path) -> None:
    """Process a chunk of shapefiles with explicit teardown."""
    target_crs = "EPSG:4326"
    out_schema = {
        "geometry": config["validation"]["required_geometry"],
        "properties": {f["name"]: f["type"] for f in config["target_schema"]}
    }
    for src_file in file_batch:
        out_path = output_dir / f"{src_file.stem}_standardized.shp"
        try:
            with fiona.open(str(src_file), "r") as src:
                if not validate_schema_match(src.schema["properties"], config):
                    raise ValueError("Schema mismatch exceeds threshold")

                # CRS validation: a missing .prj yields None or an empty CRS
                src_crs = src.crs
                if not src_crs:
                    raise RuntimeError("Missing .prj file")

                skipped = 0
                with fiona.open(
                    str(out_path), "w",
                    driver="ESRI Shapefile",
                    schema=out_schema,
                    crs=target_crs
                ) as dst:
                    for feat in src:
                        # Geometry validation & CRS transform
                        geom = feat["geometry"]
                        if geom is None or not geom.get("coordinates"):
                            skipped += 1
                            continue
                        if src_crs != target_crs:
                            geom = transform_geom(src_crs, target_crs, geom)

                        # Field mapping
                        mapped_props = transform_record(feat, config)
                        dst.write({"type": "Feature", "geometry": geom, "properties": mapped_props})
            logger.info(f"Completed: {src_file.name} ({skipped} empty features skipped)")
        except Exception as e:
            # Both datasets are closed here; drop partial output, then
            # quarantine the .shp together with its sidecar files
            for part in sidecars(out_path):
                part.unlink()
            for part in sidecars(src_file):
                part.rename(quarantine_dir / part.name)
            logger.error(f"Quarantined {src_file.name}: {e}")
        finally:
            # Explicit teardown & memory check
            gc.collect()
            rss = get_rss_mb()
            if rss > SOFT_RSS_LIMIT_MB:
                logger.warning(f"High RSS ({rss:.0f} MB) after forced GC.")


def main() -> None:
    config = load_schema_config("schema_mapping.yaml")
    input_dir = Path("input_shapefiles")
    output_dir = Path("output_standardized")
    quarantine_dir = Path("quarantine")
    output_dir.mkdir(exist_ok=True)
    quarantine_dir.mkdir(exist_ok=True)

    files = sorted(input_dir.glob("*.shp"))
    batch_size = 75
    for i in range(0, len(files), batch_size):
        batch = files[i:i + batch_size]
        logger.info(f"Processing batch {i // batch_size + 1} ({len(batch)} files)")
        rss_before = get_rss_mb()
        # Scope fiona's GDAL environment to this batch
        with fiona.Env():
            process_batch(batch, config, output_dir, quarantine_dir)
        logger.info(f"Batch RSS delta: {get_rss_mb() - rss_before:+.1f} MB")
    logger.info("Batch transformation complete.")


if __name__ == "__main__":
    main()
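The script above hands the transform string from the mapping file to eval(), which executes whatever the configuration contains. Where the mapping file is not fully trusted, a named-transform registry is one possible alternative. The sketch below is an assumption rather than part of the original design: TRANSFORMS, apply_transform, and the idea of storing a name such as acres_to_sqm in the YAML in place of a lambda string are all hypothetical.

```python
from typing import Any, Callable, Dict, Optional

ACRES_TO_SQM = 4046.86  # same factor as the lambda in schema_mapping.yaml

# Whitelist of transforms that a mapping file may reference by name.
TRANSFORMS: Dict[str, Callable[[float], float]] = {
    "identity": lambda x: x,
    "acres_to_sqm": lambda x: x * ACRES_TO_SQM,
}


def apply_transform(name: Optional[str], value: Any) -> Optional[float]:
    """Coerce value to float, then apply the named transform if one is given.

    Returns None for missing or non-numeric values; raises ValueError for
    transform names that are not in the registry.
    """
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    if name is None:
        return number
    try:
        func = TRANSFORMS[name]
    except KeyError:
        raise ValueError(f"Unknown transform: {name!r}") from None
    return func(number)
```

Because transforms are looked up by name, the registry could also be keyed per source field (for example, applying acres_to_sqm only when the value came from ACRES), which avoids scaling values that are already in square meters.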
Compliance & CI Resilience
Government spatial pipelines require deterministic outputs and auditable failure states. The script above enforces strict procedural clarity:
- Missing fields resolve to None rather than raising KeyError
- CRS mismatches trigger explicit transformation or quarantine
- CI failures are isolated via structured logging and directory quarantine
- Memory thresholds surface RSS growth before the runner reaches an OOM kill
Reference the official GDAL Shapefile Driver documentation for driver-specific limitations regarding field name truncation and attribute type constraints. Align transformation rules with agency data governance standards to ensure downstream interoperability.
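Field name truncation matters for this schema: DBF attribute names are limited to 10 characters, and land_use_code has 13. The sketch below previews truncated names and flags collisions before any file is written. It applies a plain 10-character cut; the helper names are hypothetical, and the way GDAL actually renames colliding fields may differ from this simple rule.

```python
from collections import defaultdict
from typing import Dict, Iterable, List

DBF_FIELD_NAME_LIMIT = 10  # maximum attribute name length in a .dbf file


def truncated_names(names: Iterable[str], limit: int = DBF_FIELD_NAME_LIMIT) -> Dict[str, str]:
    """Map each target field name to its naive truncated form."""
    return {name: name[:limit] for name in names}


def truncation_collisions(names: Iterable[str], limit: int = DBF_FIELD_NAME_LIMIT) -> Dict[str, List[str]]:
    """Return truncated names that two or more target fields would share."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        groups[name[:limit]].append(name)
    return {short: full for short, full in groups.items() if len(full) > 1}
```

Running truncation_collisions over the target_schema names at startup turns a silent rename into an explicit configuration error.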