emmi_inference.writer

Classes

AsyncWriter

Asynchronous, atomic writer for inference outputs.

Module Contents

class emmi_inference.writer.AsyncWriter(out_root, save_format=SaveFormat.PYTORCH_PT, dict_mode=DictMode.SINGLE, workers=4, max_inflight=256, small_sync_bytes=64000, upload_queue=None)

Asynchronous, atomic writer for inference outputs.

The writer schedules filesystem writes on a thread pool and applies backpressure when too many writes are in flight. Outputs below a size threshold (see small_sync_bytes) are written synchronously on the calling thread to avoid scheduling overhead. All writes are crash-safe: data is first written to a temporary file, which is then moved into place atomically with `os.replace`.
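The temporary-file-then-`os.replace` pattern described above can be sketched as follows. This is a minimal illustration using only the standard library, not the library's actual implementation; the helper name `atomic_write_bytes` and the `.tmp` suffix are assumptions made for the example.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(target: Path, payload: bytes) -> None:
    """Hypothetical helper: write payload so readers never see a partial file."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file in the destination directory so that os.replace
    # stays on a single filesystem, where the rename is atomic.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
            handle.flush()
            os.fsync(handle.fileno())  # push the data to disk before renaming
        os.replace(tmp_name, target)  # atomically swap the file into place
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

If the process dies before `os.replace` runs, the target path still holds either its previous contents or nothing at all; only a stray temporary file may be left behind.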

Parameters:
  • out_root (pathlib.Path) – Root directory where results are written.

  • save_format (emmi_inference.common.SaveFormat) – Output format (see SaveFormat).

  • dict_mode (emmi_inference.common.DictMode) – How to serialize mapping outputs (see DictMode).

  • workers (int) – Thread-pool size for background writes.

  • max_inflight (int) – Maximum number of in-flight write futures; once this limit is reached, backpressure blocks further scheduling.

  • small_sync_bytes (int) – Size threshold in bytes; writes smaller than this are performed synchronously instead of being enqueued.

  • upload_queue (queue.Queue[pathlib.Path] | None)
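How workers, max_inflight, and small_sync_bytes might interact can be sketched with a semaphore-bounded thread pool. This is an assumption-laden illustration, not the writer's real code: the class `BoundedSubmitter` and its `submit` method are hypothetical, and the strict `<` comparison against the threshold is a guess.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


class BoundedSubmitter:
    """Hypothetical helper: caps in-flight tasks and runs small jobs inline."""

    def __init__(self, workers: int = 4, max_inflight: int = 256,
                 small_sync_bytes: int = 64000) -> None:
        self.pool = ThreadPoolExecutor(max_workers=workers)
        self.small_sync_bytes = small_sync_bytes
        self._slots = threading.BoundedSemaphore(max_inflight)
        self.futures: List[Future] = []

    def submit(self, nbytes: int, job: Callable[[], None]) -> None:
        if nbytes < self.small_sync_bytes:
            job()  # small payload: run directly on the calling thread
            return
        self._slots.acquire()  # blocks while max_inflight tasks are pending
        try:
            future = self.pool.submit(job)
        except BaseException:
            self._slots.release()  # give the slot back if scheduling failed
            raise
        # Free the slot as soon as the task finishes, whether or not it failed.
        future.add_done_callback(lambda _f: self._slots.release())
        self.futures.append(future)

    def close(self) -> None:
        self.pool.shutdown(wait=True)
        for future in self.futures:
            future.result()  # surface any background exception
```

Blocking the producer in `submit` keeps memory bounded: a fast inference loop cannot queue an unlimited number of pending tensors while the disk catches up.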

Examples

Write a single tensor:

from pathlib import Path

from emmi_inference.common import SaveFormat
from emmi_inference.writer import AsyncWriter

writer = AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT)
writer.write("runs/sample_0001", tensor)  # -> out/runs/sample_0001.pt
writer.close()
# or, as a context manager:
with AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT) as writer:
    writer.write("runs/sample_0001", tensor)  # -> out/runs/sample_0001.pt

Write a mapping as a single NPZ:

writer = AsyncWriter(Path("out"), save_format=SaveFormat.NUMPY_NPZ, dict_mode=DictMode.SINGLE)
writer.write("preds/scene_42", {"flow": flow, "pressure": p})  # -> out/preds/scene_42.npz
writer.close()

Split mapping into multiple PT files:

writer = AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT, dict_mode=DictMode.SPLIT)
writer.write(
    "preds/scene_42", {"flow": flow, "pressure": p}
)  # -> out/preds/scene_42_flow.pt, out/preds/scene_42_pressure.pt
writer.close()

root
save_format
dict_mode
max_inflight = 256
small_sync_bytes = 64000
upload_queue = None
pool

write(relative_path_no_ext, to_save)

Schedule a tensor, or a mapping or sequence of tensors, to be written to disk.

Parameters:
  • relative_path_no_ext (str) – Relative path without extension (joined to out_root).

  • to_save (torch.Tensor | collections.abc.Mapping[str, torch.Tensor] | collections.abc.Sequence[torch.Tensor]) – The object to write: a single torch.Tensor, a Mapping[str, torch.Tensor], or a Sequence[torch.Tensor].

Raises:

TypeError – If to_save type is unsupported.

Return type:

None

close()

Wait for all in-flight writes to complete and shut down the pool.

Propagates any exceptions raised by background tasks.
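One common way to get this behavior with `concurrent.futures` is to call `result()` on every stored future after shutting the pool down. The sketch below assumes that approach; the function `drain` and the simulated `failing_write` task are hypothetical and only illustrate how a background error reaches the caller.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def drain(pool: ThreadPoolExecutor, futures: List[Future]) -> None:
    """Hypothetical helper: wait for every task, then re-raise the first failure."""
    pool.shutdown(wait=True)  # blocks until all submitted tasks have finished
    for future in futures:
        future.result()  # re-raises the exception stored in the future, if any


def failing_write() -> None:
    raise OSError("disk full")  # simulated background write failure


pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(failing_write)]
try:
    drain(pool, futures)
except OSError as exc:
    caught = str(exc)  # the background error surfaces here, at drain time
```

Under this assumption, a failed write goes unnoticed until close() is called, so error handling for background writes belongs around the close() call rather than around individual write() calls.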

Return type:

None