Async PBF Parsing with Pyrosm

OpenStreetMap PBF extracts routinely run to several gigabytes, which turns synchronous parsing into a bottleneck in production spatial ETL pipelines. Within the broader Parsing & Tag Normalization Workflows architecture, concurrent I/O patterns can decouple disk reads from CPU-bound tag validation. Pyrosm is a Cython-backed library that reads PBF files into GeoDataFrames. Its native API is synchronous and reads the full file in a single pass; it does not support seeking to arbitrary byte offsets mid-stream. However, wrapping the parse step in a ProcessPoolExecutor and feeding results through an asyncio queue enables a producer-consumer architecture where parsing and downstream transformation overlap.

Concurrency Architecture & Memory-Efficient Chunk Processing

The fundamental constraint in PBF ingestion is balancing memory residency with I/O concurrency. Because Pyrosm materializes one GeoDataFrame per feature type, the practical approach for large files is to split the extract by bounding box using osmium extract before parsing, then process each regional slice in an isolated worker process. Each worker hands back a per-tile Arrow table rather than one large GeoDataFrame, which keeps the cost of serializing results across IPC boundaries down while still achieving parallel throughput.
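The splitting step can be scripted. The sketch below builds a config file for `osmium extract` that cuts a bounding box into a regular grid of tiles. The function name `grid_extract_config`, the `tiles` output directory, the tile file names, and the example coordinates are illustrative assumptions; the JSON layout (a `directory` key plus an `extracts` list whose entries carry `output` and a `bbox` in left, bottom, right, top order) follows the config format that `osmium extract` reads with its `-c` option.

```python
import json


def grid_extract_config(
    bbox: tuple[float, float, float, float],
    rows: int,
    cols: int,
    out_dir: str = "tiles",  # hypothetical output directory
) -> dict:
    """Build an `osmium extract` config splitting bbox into rows x cols tiles.

    bbox is (left, bottom, right, top) in degrees.
    """
    left, bottom, right, top = bbox
    dx = (right - left) / cols
    dy = (top - bottom) / rows
    extracts = []
    for r in range(rows):
        for c in range(cols):
            extracts.append(
                {
                    "output": f"tile_r{r}_c{c}.osm.pbf",  # hypothetical naming
                    "bbox": [
                        left + c * dx,
                        bottom + r * dy,
                        left + (c + 1) * dx,
                        bottom + (r + 1) * dy,
                    ],
                }
            )
    return {"directory": out_dir, "extracts": extracts}


# Example: a 2 x 2 grid over an illustrative bounding box.
config = grid_extract_config((5.9, 47.3, 15.0, 55.1), rows=2, cols=2)
config_json = json.dumps(config, indent=2)
# Save config_json to a file, then run outside Python, for example:
#   osmium extract -c extract_config.json input.osm.pbf
```

A regular grid is the simplest choice; tiles over sparse areas will be much smaller than tiles over dense cities, so worker load can be uneven.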

The architecture below uses asyncio to orchestrate a bounded queue of futures. Each worker process handles one regional tile; results are yielded as pyarrow.Table objects to the downstream consumer:

```mermaid
sequenceDiagram
    autonumber
    participant P as Producer task
    participant Q as Bounded asyncio.Queue
    participant E as ProcessPool worker
    participant C as Async consumer
    P->>E: executor.submit(parse_tile, tile_path)
    E-->>P: concurrent.futures.Future
    P->>Q: put(future)  (awaits if full)
    Q-->>C: await get() → future
    C->>C: await asyncio.to_thread(future.result)
    E-->>C: pyarrow.Table
    C-->>C: yield Table to downstream
```
```python
import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import AsyncIterator

import pyarrow as pa
from pyrosm import OSM

MAX_WORKERS = 4
QUEUE_MAXSIZE = 8

logger = logging.getLogger(__name__)


def _parse_tile_worker(tile_path: str) -> pa.Table:
    """Isolated worker: parse one PBF tile and return an Arrow Table.

    Each worker process owns its own OSM instance and GeoDataFrame, so
    there is no shared-memory contention. Tags are preserved as columns.
    """
    try:
        reader = OSM(tile_path)
        gdf = reader.get_network(network_type="driving")
        if gdf is None or gdf.empty:
            return pa.table({})
        # Arrow has no native geometry type, so encode geometry as WKB
        # bytes; callers can reconstruct shapes from that column if needed.
        df = gdf.to_wkb()
        return pa.Table.from_pandas(df, preserve_index=False)
    except Exception as e:
        logger.error("Worker failed for tile %s: %s", tile_path, e)
        return pa.table({})


async def async_tile_stream(
    tile_paths: list[Path],
) -> AsyncIterator[pa.Table]:
    """Yield an Arrow table per non-empty tile; the queue holds at most QUEUE_MAXSIZE futures."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)
    # Created outside a `with` block: the context manager's exit waits for
    # all workers synchronously, which would stall the event loop.
    executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)

    async def producer() -> None:
        for path in tile_paths:
            future = executor.submit(_parse_tile_worker, str(path))
            await queue.put(future)  # suspends while the queue is full
        await queue.put(None)  # sentinel

    # Hold a reference so the task is not garbage-collected mid-run.
    producer_task = asyncio.create_task(producer())

    try:
        while True:
            future = await queue.get()
            if future is None:
                break
            try:
                result = await asyncio.to_thread(future.result)
            except Exception as e:
                logger.warning("Tile processing failed: %s", e)
                continue
            if result.num_rows:
                yield result
    finally:
        # Runs on normal completion and when the consumer stops early.
        producer_task.cancel()
        executor.shutdown(wait=False, cancel_futures=True)
```
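To exercise the flow control without any PBF data, the same producer-consumer pattern can be run against a stand-in parser. This is a minimal sketch under stated assumptions: `fake_parse` and `stream` are hypothetical names, a ThreadPoolExecutor replaces the process pool so the example stays self-contained, and `asyncio.wrap_future` is used as one way to await a `concurrent.futures.Future` from the event loop.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncIterator


def fake_parse(tile: str) -> dict:
    """Stand-in for a tile parser: sleeps briefly, returns a small record."""
    time.sleep(0.01)
    return {"tile": tile, "rows": len(tile)}


async def stream(tiles: list[str], maxsize: int = 2) -> AsyncIterator[dict]:
    """Yield results in submission order, with a bounded queue of futures."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    executor = ThreadPoolExecutor(max_workers=2)

    async def producer() -> None:
        for tile in tiles:
            # put() suspends the producer whenever the queue is full.
            await queue.put(executor.submit(fake_parse, tile))
        await queue.put(None)  # sentinel: no more work

    task = asyncio.create_task(producer())
    try:
        while (future := await queue.get()) is not None:
            yield await asyncio.wrap_future(future)
    finally:
        task.cancel()
        executor.shutdown(wait=False)


async def main() -> list[str]:
    return [record["tile"] async for record in stream(["a", "bb", "ccc", "dddd"])]


order = asyncio.run(main())
```

Because the consumer awaits futures in the order they were queued, results arrive in submission order even when a later tile finishes first. A real consumer drives the tile stream the same way, with `async for`.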

Tag Normalization & QA Enforcement

Raw OSM tags exhibit inconsistent casing, localized abbreviations, and deprecated keys. Apply normalization to the Arrow Table before yielding it downstream: this is the cheapest point because the table is already in columnar memory.

```python
import pyarrow as pa


def normalize_highway_column(table: pa.Table) -> pa.Table:
    """Map raw highway values to canonical routing classes.

    Values are lower-cased and stripped before lookup; anything missing
    from HIGHWAY_MAP becomes null.
    """
    HIGHWAY_MAP = {
        "motorway": "motorway", "trunk": "trunk",
        "primary": "arterial", "secondary": "arterial",
        "tertiary": "collector", "residential": "local",
        "unclassified": "local", "service": "access",
    }
    if "highway" not in table.schema.names:
        return table
    raw = table.column("highway").to_pylist()
    # pc.replace_with_mask is not a dictionary map, so use a plain dict
    # lookup. None (not NaN) marks unmapped values so Arrow stores nulls.
    mapped = [
        HIGHWAY_MAP.get(v.strip().lower()) if isinstance(v, str) else None
        for v in raw
    ]
    new_col = pa.array(mapped, type=pa.string())
    idx = table.schema.get_field_index("highway")
    return table.set_column(idx, "highway", new_col)
```

When encountering malformed geometries or missing mandatory attributes, log the failure, quarantine the record to a dead-letter Parquet partition, and continue processing without halting the event loop. This approach directly supports robust Batch Attribute Mapping Strategies by decoupling schema enforcement from raw ingestion.
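The quarantine split itself can be sketched as follows. This is an illustration under assumptions, not a fixed schema: it works on plain row dictionaries (the shape `Table.to_pylist()` produces), `REQUIRED_FIELDS` and the `reject_reason` column are hypothetical names, and only the missing-attribute check is shown.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)

REQUIRED_FIELDS = ("id", "highway")  # assumed mandatory attributes


def split_valid_rejected(
    rows: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Partition rows into (valid, rejected); rejected rows carry a reason."""
    valid: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    for row in rows:
        missing = [f for f in REQUIRED_FIELDS if row.get(f) in (None, "")]
        if missing:
            reason = "missing: " + ",".join(missing)
            logger.warning("Quarantined row %r (%s)", row.get("id"), reason)
            rejected.append({**row, "reject_reason": reason})
        else:
            valid.append(row)
    return valid, rejected


rows = [
    {"id": 1, "highway": "primary"},
    {"id": 2, "highway": None},
    {"id": None, "highway": ""},
]
valid, rejected = split_valid_rejected(rows)
```

The rejected list can then be converted back to an Arrow table and written to the dead-letter Parquet partition, while the valid rows continue downstream. Nothing in the split raises, so one bad record never interrupts the consumer loop.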

Graph Conversion & Scaling

Once normalized, streaming Arrow tables transition into network analysis frameworks. Integrating Pyrosm outputs with OSMnx Graph Conversion Techniques requires reconstructing GeoDataFrames from the Arrow tables (re-attaching geometry from a WKB column or re-running the parse with nodes=True), then passing them to ox.graph_from_gdfs.

For regional emergencies or rapid-response scenarios requiring fast turnaround, tile granularity controls the parallelism-to-overhead ratio. H3 resolution 5 (average cell area ~252.9 km²) provides a reasonable default for European country-scale extracts; drop to resolution 4 (average cell area ~1,770 km²) for continental processing. Use osmium extract --strategy=smart to create the tiles: like the default complete_ways strategy, it keeps every node of a way that crosses a tile boundary, and it additionally keeps multipolygon relations complete, so geometries can be reconstructed at each tile boundary.
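A quick estimate shows how resolution changes the tile count. The sketch below divides a region's area by the approximate average H3 cell area. The helper `estimate_tiles` and the 357,000 km² example region are illustrative assumptions, and the cell areas are rounded averages. The result is a lower bound, since cells that straddle the region's border are only partly covered.

```python
import math

# Approximate average H3 cell areas in km², rounded.
H3_AVG_AREA_KM2 = {3: 12393.0, 4: 1770.3, 5: 252.9, 6: 36.1}


def estimate_tiles(region_area_km2: float, resolution: int) -> int:
    """Rough lower bound on the number of H3 cells covering a region."""
    return math.ceil(region_area_km2 / H3_AVG_AREA_KM2[resolution])


# Hypothetical country-scale region of about 357,000 km².
region_km2 = 357_000
tiles_by_resolution = {
    res: estimate_tiles(region_km2, res) for res in sorted(H3_AVG_AREA_KM2)
}
```

For this example region, resolution 5 gives roughly seven times as many tiles as resolution 4. More tiles mean finer-grained parallelism, but each tile adds process startup and extract overhead.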

For authoritative guidance on asyncio queue back-pressure and executor lifecycle management, consult the official Python asyncio documentation and the Pyrosm documentation.