Speed up OSM parsing with multiprocessing in Python
OpenStreetMap .pbf extraction pipelines routinely encounter CPU-bound bottlenecks during tag deserialization, geometry validation, and attribute mapping. Single-threaded parsers stream the binary protobuf data efficiently, but the Python-level work that follows (heavy regex normalization, cross-referencing, graph conversion) runs on one core. Adding threads does not help, because the Python Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time. Implementing a robust multiprocessing architecture requires precise worker isolation, chunk-aware memory budgeting, and deterministic error propagation. This guide details production-grade multiprocessing patterns for OSM ETL pipelines, focusing on edge-case resilience, diagnostic profiling, and exact API configurations.
Process Pool Architecture & Memory Isolation
```mermaid
flowchart LR
    G["chunk_generator<br/>(main process)"] --> S{submit}
    S --> F1["Future · chunk 1"] --> W1["Worker process 1<br/>parse_chunk()"]
    S --> F2["Future · chunk 2"] --> W2["Worker process 2<br/>parse_chunk()"]
    S --> Fn["Future · chunk N"] --> Wn["Worker process N<br/>parse_chunk()"]
    W1 --> AC["completion-order iterator<br/>(as_completed / wait)"]
    W2 --> AC
    Wn --> AC
    AC --> Y["yield normalized chunk"]
```
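Naive multiprocessing.Pool setups frequently trigger worker OOM kills or pickle serialization failures when large OSM feature dictionaries are passed to workers. Every task argument and return value is pickled across the process boundary (ProcessPoolExecutor behaves the same way), so the fix is to keep heavy I/O in the main process, send workers bounded chunks, and run only the CPU-bound transformations in a concurrent.futures.ProcessPoolExecutor. Recycling each worker after a fixed number of tasks (max_tasks_per_child, Python 3.11+) bounds the memory growth that fragmentation and C-extension leaks cause in long-running processes. Disabling the cyclic garbage collector in workers can reduce overhead during CPU-intensive transformations (reference counting still frees most objects); if you do, call gc.collect() explicitly at chunk boundaries, not inside the inner loop. Because the pool uses the spawn start method, worker functions must be defined at module level and the calling script must guard its entry point with if __name__ == "__main__":.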
```python
import gc
import logging
import multiprocessing as mp
import os
from concurrent.futures import FIRST_COMPLETED, Future, ProcessPoolExecutor, wait
from typing import Any, Dict, Iterator, List

import psutil

logger = logging.getLogger(__name__)


def worker_initializer() -> None:
    """Disable automatic cyclic GC in workers; we trigger it manually at chunk boundaries."""
    gc.disable()


def parse_chunk(chunk_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    CPU-bound transformation: geometry validation + tag normalization (placeholder logic).
    chunk_data contains pre-filtered OSM elements from a single bounding box or feature class.
    Returns a dict with 'normalized' and 'errors' lists so per-element failures are
    surfaced without crashing the worker.
    """
    normalized = []
    errors = []
    for idx, elem in enumerate(chunk_data):
        try:
            if not elem.get("tags"):
                continue  # Untagged elements carry no attributes to normalize.
            normalized.append({
                "osm_id": elem["id"],
                "geometry": elem.get("geometry"),
                "tags": elem["tags"],
                "worker_pid": os.getpid(),
            })
        except Exception as e:
            errors.append({"index": idx, "osm_id": elem.get("id"), "error": str(e)})
    gc.collect()  # Manual collection at the chunk boundary to stabilize RSS.
    return {"normalized": normalized, "errors": errors}


def run_parallel_pipeline(
    chunk_generator: Iterator[List[Dict[str, Any]]],
    max_workers: int | None = None,
    max_in_flight: int | None = None,
) -> Iterator[Dict[str, Any]]:
    """Submit chunks to a process pool and yield results in completion order.

    Requires Python 3.11+ (max_tasks_per_child). Call it under an
    `if __name__ == "__main__":` guard: the spawn context re-imports the
    main module in every worker.
    """
    if max_workers is None:
        max_workers = min(os.cpu_count() or 1, 8)
    if max_in_flight is None:
        # Cap queued chunks so the generator is consumed lazily instead of being
        # drained and pickled into the executor's call queue all at once.
        max_in_flight = 2 * max_workers
    available_mb = psutil.virtual_memory().available // (1024 ** 2)
    logger.info("Spawning %d workers; %d MB RAM available.", max_workers, available_mb)
    # OMP_NUM_THREADS=1 keeps C/BLAS libraries inside each worker from spawning extra
    # threads that compete with the process pool. Spawned workers inherit it at startup.
    os.environ["OMP_NUM_THREADS"] = "1"
    with ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=worker_initializer,
        mp_context=mp.get_context("spawn"),
        max_tasks_per_child=50,  # Recycle workers to mitigate C-extension memory leaks.
    ) as executor:
        pending: Dict[Future, int] = {}
        chunks = enumerate(chunk_generator)
        while True:
            # Top up the in-flight window; each pass resumes the same enumerate iterator.
            for chunk_idx, chunk in chunks:
                pending[executor.submit(parse_chunk, chunk)] = chunk_idx
                if len(pending) >= max_in_flight:
                    break
            if not pending:
                break  # Generator exhausted and every submitted chunk handled.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                chunk_idx = pending.pop(future)
                try:
                    result = future.result()
                except Exception:
                    logger.exception("Chunk %d failed with an unrecoverable error.", chunk_idx)
                    continue
                result["chunk_index"] = chunk_idx  # Results arrive out of submission order.
                yield result
```
Memory-Efficient Chunk Processing & Generator Pipelines
Loading entire continental extracts into memory is unsustainable for production ETL. Instead, pipelines must leverage generator-based streaming that yields bounded feature arrays. When integrating with Parsing & Tag Normalization Workflows, chunk sizing should be dynamically calculated against available RAM rather than hardcoded. A safe baseline for pyrosm or osmium is 50,000–200,000 elements per chunk, scaling down for relation-heavy rural datasets where each element resolves more node references.
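A minimal sketch of RAM-aware chunk sizing, assuming a per-element memory estimate is available. The est_bytes_per_element default of 1,000 bytes, the 50% RAM share, and the factor of 2 for input plus output are placeholders to replace with measurements from a sample of your own extract. The result is capped at the top of the 50,000–200,000 baseline and is allowed to fall below it when memory is tight; chunked() then turns any element iterator into bounded lists without materializing the extract.

```python
import itertools
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def budget_chunk_size(
    available_bytes: int,
    workers: int,
    est_bytes_per_element: int = 1_000,  # Hypothetical; measure on a sample.
    ram_fraction: float = 0.5,           # Hypothetical share of free RAM for the pool.
    ceiling: int = 200_000,              # Upper end of the baseline range.
) -> int:
    """Elements per chunk so that every worker's chunk fits in the RAM budget."""
    # Assumption: each worker briefly holds its input chunk and its output, hence 2.
    per_worker_bytes = available_bytes * ram_fraction / (workers * 2)
    size = int(per_worker_bytes // est_bytes_per_element)
    return max(1, min(ceiling, size))


def chunked(elements: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` elements without materializing the input."""
    it = iter(elements)
    while batch := list(itertools.islice(it, size)):
        yield batch
```

In a pipeline like the one above, available_bytes would come from psutil.virtual_memory().available, and chunked(element_stream, budget_chunk_size(...)) would serve as the chunk generator.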
Implement a size-bounded (sliding window) buffer that flushes to disk or to downstream queues once the buffered payload exceeds roughly 128 MB. This keeps multiprocessing from serializing multi-gigabyte payloads across IPC boundaries: every submitted chunk is pickled in the parent and unpickled in a worker, so both sides briefly hold extra copies, and a worker killed by the Linux OOM killer surfaces in the parent as a BrokenProcessPool exception.
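One way to implement that buffer, sketched under the assumption that pickled size is a fair proxy for IPC cost (pickling is what ProcessPoolExecutor does to every task argument). The function name is hypothetical, and pickling each element an extra time costs CPU, so in practice you might measure a sample and extrapolate instead.

```python
import pickle
from typing import Any, Dict, Iterable, Iterator, List


def byte_bounded_chunks(
    elements: Iterable[Dict[str, Any]],
    max_bytes: int = 128 * 1024 ** 2,
) -> Iterator[List[Dict[str, Any]]]:
    """Flush the buffer downstream before its pickled size would exceed max_bytes."""
    buffer: List[Dict[str, Any]] = []
    buffered = 0
    for elem in elements:
        # Per-element pickled size approximates what actually crosses the IPC boundary.
        size = len(pickle.dumps(elem, protocol=pickle.HIGHEST_PROTOCOL))
        if buffer and buffered + size > max_bytes:
            yield buffer
            buffer, buffered = [], 0
        # An element larger than max_bytes on its own is still emitted, alone.
        buffer.append(elem)
        buffered += size
    if buffer:
        yield buffer
```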
CPU-Bound Transformations: Regex Cleaning & Batch Attribute Mapping
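Tag deserialization and value standardization represent the heaviest CPU load in OSM parsing. Compiling regex patterns inside the per-element function adds avoidable overhead (the re module caches recently compiled patterns, but every call still pays for the cache lookup). Pre-compile patterns at module level instead: forked workers inherit them from the parent, and spawned workers (Windows, macOS, and the spawn context used above) re-import the module and compile them once per worker, so no initializer or functools.partial plumbing is needed: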
```python
import re

# Module level: compiled once per interpreter, i.e. once per worker process.
# Accepts "50", "50.5", "50 km/h", "50kmh"; values in mph or words like "signals" yield None.
_SPEED_RE = re.compile(r"^(\d+(?:\.\d+)?)(?:\s*(?:km/h|kmh|kph))?$", re.IGNORECASE)
# Matches every character that is not a letter, digit, or underscore.
_SURFACE_RE = re.compile(r"[^a-z0-9_]", re.IGNORECASE)

HIGHWAY_MAP = {
    "motorway": "motorway", "trunk": "trunk",
    "primary": "arterial", "secondary": "arterial",
    "tertiary": "collector", "residential": "local",
    "unclassified": "local", "service": "access",
}


def normalize_tags(tags: dict) -> dict:
    out = {}
    highway = tags.get("highway", "")
    out["road_class"] = HIGHWAY_MAP.get(highway)  # Unmapped classes (e.g. *_link) become None.
    raw_speed = tags.get("maxspeed", "")
    m = _SPEED_RE.match(str(raw_speed).strip())
    out["maxspeed_kmh"] = float(m.group(1)) if m else None
    surface = _SURFACE_RE.sub("", tags.get("surface", "").lower())
    out["surface_clean"] = surface or None
    return out
```
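For batch attribute mapping, convert tag columns with many repeated values (highway, surface) to pandas or Polars categoricals before distribution, and apply transformations to each finished chunk as a whole rather than element by element. A categorical stores each distinct value once, so a dictionary mapping runs once per distinct value instead of once per row, and chunk-level calls keep the per-row work inside the library's compiled routines where it has them. This aligns with Async PBF Parsing with Pyrosm paradigms, where I/O and CPU stages are decoupled.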
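A minimal pandas sketch of chunk-level mapping, assuming each chunk has already been flattened into records with highway and maxspeed keys (that layout, the trimmed mapping table, and the function name are assumptions). The categorical map touches each distinct highway value once, and the maxspeed extraction reuses the km/h pattern from normalize_tags across the whole column.

```python
import re

import pandas as pd

# Trimmed, hypothetical mapping table; in practice use the full HIGHWAY_MAP.
HIGHWAY_MAP = {"motorway": "motorway", "primary": "arterial", "residential": "local"}
# Same km/h pattern as normalize_tags, applied to a whole column at once.
SPEED_PATTERN = r"^(\d+(?:\.\d+)?)(?:\s*(?:km/h|kmh|kph))?$"


def map_chunk_vectorized(records: list[dict]) -> pd.DataFrame:
    """Map tags for a whole chunk of flat records with 'highway' and 'maxspeed' keys."""
    df = pd.DataFrame.from_records(records)
    # Categorical dtype keeps each distinct highway value once; map() then
    # translates the categories rather than every row. Unmapped values become NaN.
    df["road_class"] = df["highway"].astype("category").map(HIGHWAY_MAP)
    # Missing speeds become "" so the .str accessor only sees strings.
    speeds = df["maxspeed"].fillna("").astype(str)
    extracted = speeds.str.extract(SPEED_PATTERN, flags=re.IGNORECASE, expand=False)
    # Non-matching values (e.g. "20 mph") come back as NaN.
    df["maxspeed_kmh"] = pd.to_numeric(extracted, errors="coerce")
    return df
```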
Cross-Region Tag Harmonization & Deterministic Error Handling
OSM tagging conventions vary significantly across regions (e.g., highway=trunk vs highway=motorway, or localized name:* keys). Multiprocessing workers should receive a region-specific configuration dictionary at startup via the initializer + initargs pattern. This enables parallel application of localized normalization rules without conditional branching overhead in the hot path.
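A minimal sketch of the initializer + initargs pattern. The configuration keys (highway_map, name_keys) and the function names are hypothetical; the point is that each worker receives its region's rules once at startup and keeps them in a module-level global, so the per-element function only reads them.

```python
from typing import Any, Dict, Optional

# Per-process global, populated once by the pool initializer in each worker.
_REGION_CONFIG: Optional[Dict[str, Any]] = None


def init_region_worker(region_config: Dict[str, Any]) -> None:
    """Pool initializer: store the region's rules for this worker process."""
    global _REGION_CONFIG
    _REGION_CONFIG = region_config


def apply_region_rules(tags: Dict[str, str]) -> Dict[str, Any]:
    """Hot path: no per-element branching on region, just lookups in the stored config."""
    cfg = _REGION_CONFIG
    if cfg is None:
        raise RuntimeError("init_region_worker() was not called in this process")
    # First matching key wins, e.g. prefer name:en over name for an English-language output.
    name = next((tags[k] for k in cfg["name_keys"] if k in tags), None)
    return {"road_class": cfg["highway_map"].get(tags.get("highway")), "name": name}


# Wiring with a pool (not executed here):
# ProcessPoolExecutor(max_workers=4, initializer=init_region_worker, initargs=(region_config,))
```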
Error handling must be deterministic. Instead of letting exceptions escape a task (the rest of that chunk's output is then lost, and a worker that dies outright, for example under the OOM killer, breaks the whole pool with BrokenProcessPool), workers return structured error payloads containing osm_id, chunk_index, and sanitized stack traces. Implement a dead-letter queue (DLQ) that writes malformed geometries or invalid tag combinations to a separate Parquet partition. This allows GIS analysts to audit edge cases without halting the primary pipeline. Refer to the official Python concurrent.futures documentation for robust as_completed error-trapping patterns.
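A sketch of what such a payload could look like. "Sanitized" is interpreted here as file basenames only (no host paths), a bounded number of frames, and a truncated message; those rules, the field names beyond osm_id and chunk_index, and both function names are assumptions. The dead list could then be written to its Parquet partition, for example with pandas.DataFrame.to_parquet, which needs pyarrow or fastparquet installed.

```python
import os
import traceback
from typing import Any, Callable, Dict, Iterable, List, Tuple


def build_error_payload(
    exc: BaseException, osm_id: Any, chunk_index: int, max_frames: int = 5
) -> Dict[str, Any]:
    """Turn an exception into a picklable, log-friendly DLQ record."""
    frames = traceback.extract_tb(exc.__traceback__)[-max_frames:]
    # Keep only file basenames so payloads do not leak host paths.
    trace = [f"{os.path.basename(f.filename)}:{f.lineno} in {f.name}" for f in frames]
    return {
        "osm_id": osm_id,
        "chunk_index": chunk_index,
        "error_type": type(exc).__name__,
        "message": str(exc)[:500],
        "trace": trace,
    }


def normalize_with_dlq(
    elements: Iterable[Dict[str, Any]],
    chunk_index: int,
    transform: Callable[[Dict[str, Any]], Any],
) -> Tuple[List[Any], List[Dict[str, Any]]]:
    """Apply transform per element; failures go to the dead-letter list instead of raising."""
    ok: List[Any] = []
    dead: List[Dict[str, Any]] = []
    for elem in elements:
        try:
            ok.append(transform(elem))
        except Exception as exc:
            dead.append(build_error_payload(exc, elem.get("id"), chunk_index))
    return ok, dead
```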
Integration with Graph Conversion & Async PBF Workflows
Once normalized, OSM features often feed into osmnx for topological graph construction. Graph conversion is memory-intensive because networkx stores graphs as nested adjacency dictionaries (not matrices), with a Python object for every node, edge, and attribute. Parallelize this by partitioning the normalized dataset into spatial tiles (e.g., H3 resolution 5 or S2 level 8), processing each tile independently, and merging edge lists post-conversion. Tile boundaries must include a buffer overlap (typically 500 m) so that ways crossing tile edges are represented in both tiles during edge list construction; the merge step then deduplicates edges emitted by more than one tile.
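The sketch below substitutes a plain latitude/longitude grid for H3 or S2 so it runs with the standard library alone; the grid, the metres-to-degrees approximation (about 111,320 m per degree of latitude), and all names are assumptions. A way is assigned to every tile its buffered bounding box touches, and the merge deduplicates edges keyed by (u, v, key), the edge identity osmnx uses in its MultiDiGraph.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple

BBox = Tuple[float, float, float, float]  # (min_lon, min_lat, max_lon, max_lat)
Edge = Tuple[int, int, int]               # (u, v, key)


def buffered_tiles(bbox: BBox, tile_deg: float, buffer_m: float = 500.0) -> Set[Tuple[int, int]]:
    """Grid tiles touched by the bounding box after expanding it by buffer_m."""
    min_lon, min_lat, max_lon, max_lat = bbox
    # Approximate metres -> degrees; longitude degrees shrink with cos(latitude).
    dlat = buffer_m / 111_320.0
    mid_lat = math.radians((min_lat + max_lat) / 2)
    dlon = buffer_m / (111_320.0 * max(math.cos(mid_lat), 1e-6))
    x0, x1 = math.floor((min_lon - dlon) / tile_deg), math.floor((max_lon + dlon) / tile_deg)
    y0, y1 = math.floor((min_lat - dlat) / tile_deg), math.floor((max_lat + dlat) / tile_deg)
    return {(x, y) for x in range(x0, x1 + 1) for y in range(y0, y1 + 1)}


def partition_ways(
    ways: Iterable[Tuple[int, BBox]], tile_deg: float, buffer_m: float = 500.0
) -> Dict[Tuple[int, int], List[int]]:
    """Map each tile to the way ids it must process; border ways land in several tiles."""
    tiles: Dict[Tuple[int, int], List[int]] = defaultdict(list)
    for way_id, bbox in ways:
        for tile in buffered_tiles(bbox, tile_deg, buffer_m):
            tiles[tile].append(way_id)
    return dict(tiles)


def merge_edge_lists(per_tile_edges: Iterable[Iterable[Edge]]) -> List[Edge]:
    """Concatenate per-tile edge lists, dropping duplicates from the buffer zones."""
    seen: Set[Edge] = set()
    merged: List[Edge] = []
    for edges in per_tile_edges:
        for edge in edges:
            if edge not in seen:
                seen.add(edge)
                merged.append(edge)
    return merged
```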
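When combining multiprocessing with async I/O, hand CPU-bound work to the process pool with loop.run_in_executor(executor, func, *args), or wrap the futures returned by executor.submit() with asyncio.wrap_future(), and await the result; asyncio.to_thread() also works for wrapping a blocking call such as executor.map(), at the cost of an extra thread. This prevents the event loop from stalling during heavy tag validation or coordinate reprojection. Avoid shipping networkx graph objects between processes: they are nested Python dicts, so the out-of-band buffers of pickle protocol 5 (Python 3.8+, PEP 574) do not help them, and ProcessPoolExecutor does not use out-of-band buffers in any case. Returning compact edge lists from workers and building the graph in the parent keeps IPC payloads small.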
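A minimal sketch of awaiting process-pool work from a coroutine. validate_chunk is a hypothetical stand-in for the real CPU-bound step; because run_in_executor accepts any concurrent.futures.Executor, the same coroutine works with a ProcessPoolExecutor in production and a ThreadPoolExecutor in unit tests.

```python
import asyncio
from concurrent.futures import Executor
from typing import Dict, Iterable, List


def validate_chunk(chunk: List[Dict]) -> int:
    """Hypothetical CPU-bound step: count elements that carry tags."""
    return sum(1 for elem in chunk if elem.get("tags"))


async def validate_chunks(executor: Executor, chunks: Iterable[List[Dict]]) -> List[int]:
    loop = asyncio.get_running_loop()
    # run_in_executor wraps each concurrent.futures.Future in an asyncio future,
    # so awaiting the results never blocks the event loop.
    tasks = [loop.run_in_executor(executor, validate_chunk, chunk) for chunk in chunks]
    return await asyncio.gather(*tasks)
```

With a process pool, validate_chunk must live at module level so it can be pickled, and the asyncio.run(...) call belongs under the script's __main__ guard.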
Emergency Pipeline Scaling & Production Diagnostics
When extraction jobs stall or encounter sudden memory spikes:
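- Dynamic Worker Adjustment: Monitor each worker's resident set size, for example psutil.Process(pid).memory_info().rss for every child returned by psutil.Process().children(). (memory_percent() reports usage relative to total system memory, not to a per-worker budget.) If RSS exceeds 85% of the per-worker allocation, stop submitting new chunks, let in-flight chunks drain, and recreate the pool with max_tasks_per_child=1 so every chunk runs in a fresh process.
- Checkpointing: Write chunk offsets to a lightweight SQLite database in WAL mode. On failure, resume from the last committed offset rather than restarting the full PBF stream. Because chunks complete out of order, only commit an offset once every earlier chunk has also finished.
- Diagnostic Profiling: Use py-spy (with --native to include C-extension frames) to see where workers spend CPU time, and tracemalloc to track Python-level allocations; RSS growth that tracemalloc does not account for points to native allocations in osmium or shapely bindings. Pin shapely>=2.0 to avoid known GEOS serialization regressions.
- GIL Contention Mitigation: Each worker process has its own GIL, so workers never contend with one another; contention only reappears when a worker also runs threads, in which case heavy math should use NumPy routines that release the GIL. Protobuf decoding is safe when each worker opens its own reader; never share parser objects or file handles across processes. String interning is likewise per process, so interned tag values are not shared between workers.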
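A sketch of offset checkpointing with the standard-library sqlite3 module, assuming chunk indices serve as offsets and a run_id string identifies the extract (both assumptions, as are the class and table names). Because results complete out of order, Watermark only advances past an index once every lower index is done, so a resume never skips an unfinished chunk.

```python
import sqlite3


class ChunkCheckpoint:
    """Persist the next chunk index to resume from, in a SQLite database in WAL mode."""

    def __init__(self, path: str, run_id: str) -> None:
        self.run_id = run_id
        self.conn = sqlite3.connect(path)
        # WAL lets readers inspect progress while the pipeline keeps writing.
        self.conn.execute("PRAGMA journal_mode=WAL")
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "run_id TEXT PRIMARY KEY, next_chunk INTEGER NOT NULL)"
        )
        self.conn.commit()

    def resume_from(self) -> int:
        row = self.conn.execute(
            "SELECT next_chunk FROM checkpoints WHERE run_id = ?", (self.run_id,)
        ).fetchone()
        return row[0] if row else 0

    def commit(self, next_chunk: int) -> None:
        with self.conn:  # Commits on success, rolls back on error.
            self.conn.execute(
                "INSERT OR REPLACE INTO checkpoints (run_id, next_chunk) VALUES (?, ?)",
                (self.run_id, next_chunk),
            )

    def close(self) -> None:
        self.conn.close()


class Watermark:
    """Advance only past chunk indices whose predecessors have all completed."""

    def __init__(self, start: int) -> None:
        self.next_chunk = start
        self._done: set[int] = set()

    def mark_done(self, chunk_index: int) -> int:
        self._done.add(chunk_index)
        while self.next_chunk in self._done:
            self._done.remove(self.next_chunk)
            self.next_chunk += 1
        return self.next_chunk
```

On restart, skip the committed chunks, e.g. with itertools.islice(chunk_generator, checkpoint.resume_from(), None), and add the same offset to the chunk indices reported by the pool.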
By enforcing strict memory budgets, isolating CPU-bound transformations, and implementing deterministic error routing, mapping engineers can reliably parse multi-gigabyte OSM extracts at scale without compromising data quality or reproducibility.