emmi_inference.writer¶
Classes¶
AsyncWriter: Asynchronous, atomic writer for inference outputs.
Module Contents¶
- class emmi_inference.writer.AsyncWriter(out_root, save_format=SaveFormat.PYTORCH_PT, dict_mode=DictMode.SINGLE, workers=4, max_inflight=256, small_sync_bytes=64000, upload_queue=None)¶
Asynchronous, atomic writer for inference outputs.
The writer schedules filesystem writes on a thread pool and applies backpressure when too many writes are in flight. Small outputs are written synchronously on the calling thread to reduce overhead. All writes are crash-safe: data is first written to a temporary file, which is then moved into place atomically with `os.replace`.
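The temp-file-then-`os.replace` pattern described above can be sketched as follows. This is a minimal illustration, not the library's actual code: the helper name `atomic_write_bytes` and the `fsync` call are assumptions added for the example.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(target: Path, payload: bytes) -> None:
    """Hypothetical helper: readers see the old file or the new one, never a partial one."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to the target so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(
        dir=target.parent, prefix=target.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
            handle.flush()
            os.fsync(handle.fileno())  # assumption: flush to disk before the rename
        os.replace(tmp_name, target)  # atomic swap into the final location
    except BaseException:
        # Remove the temp file if anything failed before the rename completed.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Keeping the temporary file in the target's own directory matters because `os.replace` is only atomic within a single filesystem.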
- Parameters:
out_root (pathlib.Path) – Root directory where results are written.
save_format (emmi_inference.common.SaveFormat) – Output format (see SaveFormat).
dict_mode (emmi_inference.common.DictMode) – How to serialize mapping outputs (see DictMode).
workers (int) – Thread-pool size for background writes.
max_inflight (int) – Maximum number of in-flight futures; once reached, backpressure blocks further scheduling.
small_sync_bytes (int) – Threshold (bytes) under which writes are performed synchronously instead of enqueued.
upload_queue (queue.Queue[pathlib.Path] | None)
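How `workers`, `max_inflight`, and `small_sync_bytes` could interact is sketched below. The class `BoundedSubmitter` and its semaphore-based design are hypothetical, chosen only to illustrate backpressure and the small-write fast path; the real AsyncWriter may be implemented differently.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


class BoundedSubmitter:
    """Hypothetical helper: caps the number of unfinished background tasks."""

    def __init__(self, workers: int = 4, max_inflight: int = 256,
                 small_sync_bytes: int = 64000) -> None:
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._slots = threading.BoundedSemaphore(max_inflight)
        self._small_sync_bytes = small_sync_bytes
        self._futures: List[Future] = []  # kept so close() can surface errors

    def submit(self, task: Callable[[], None], size_bytes: int) -> None:
        if size_bytes < self._small_sync_bytes:
            task()  # small payload: run on the calling thread
            return
        self._slots.acquire()  # blocks while max_inflight tasks are pending
        try:
            future = self._pool.submit(task)
        except BaseException:
            self._slots.release()
            raise
        # Free the slot when the task finishes, whether it succeeded or failed.
        future.add_done_callback(lambda _f: self._slots.release())
        self._futures.append(future)

    def close(self) -> None:
        self._pool.shutdown(wait=True)
        for future in self._futures:
            future.result()  # re-raises the first background exception, if any
```

In this sketch the caller blocks inside `submit` once `max_inflight` tasks are pending, which bounds memory held by queued payloads.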
Examples
Write a single tensor:
writer = AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT)
writer.write("runs/sample_0001", tensor)  # -> out/runs/sample_0001.pt
writer.close()
# or:
with AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT) as writer:
    writer.write("runs/sample_0001", tensor)  # -> out/runs/sample_0001.pt
Write a mapping as a single NPZ:
writer = AsyncWriter(Path("out"), save_format=SaveFormat.NUMPY_NPZ, dict_mode=DictMode.SINGLE)
writer.write("preds/scene_42", {"flow": flow, "pressure": p})  # -> preds/scene_42.npz
writer.close()
Split mapping into multiple PT files:
writer = AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT, dict_mode=DictMode.SPLIT)
writer.write("preds/scene_42", {"flow": flow, "pressure": p})
# -> preds/scene_42_flow.pt, preds/scene_42_pressure.pt
writer.close()
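The file names in the SPLIT example above follow a `<base>_<key><ext>` pattern. A minimal sketch of that naming rule, inferred only from the example (the helper `split_targets` is hypothetical and not part of the module):

```python
from pathlib import Path
from typing import Iterable, List


def split_targets(out_root: Path, relative_path_no_ext: str,
                  keys: Iterable[str], ext: str) -> List[Path]:
    """Hypothetical helper: one target path per mapping key."""
    base = out_root / relative_path_no_ext
    # Append "_<key>" and the extension to the final path component.
    return [base.with_name(f"{base.name}_{key}{ext}") for key in keys]
```

For instance, `split_targets(Path("out"), "preds/scene_42", ["flow", "pressure"], ".pt")` yields paths ending in `scene_42_flow.pt` and `scene_42_pressure.pt` under `out/preds`.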
- root¶
- save_format¶
- dict_mode¶
- max_inflight = 256¶
- small_sync_bytes = 64000¶
- upload_queue = None¶
- pool¶
- write(relative_path_no_ext, to_save)¶
Schedule persistence of tensors or collections.
- Parameters:
relative_path_no_ext (str) – Relative path without extension (joined to out_root).
to_save (torch.Tensor | collections.abc.Mapping[str, torch.Tensor] | collections.abc.Sequence[torch.Tensor]) – The object to persist: a single tensor, a mapping of names to tensors, or a sequence of tensors.
- Raises:
TypeError – If to_save type is unsupported.
- Return type:
None
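The three accepted shapes of `to_save` and the `TypeError` for anything else suggest a type dispatch along these lines. This is a sketch under assumptions: `classify_payload` is a hypothetical name, `leaf_type` stands in for `torch.Tensor` so the example runs without PyTorch, and whether the real `write` validates individual elements is not stated in this page.

```python
from collections.abc import Mapping, Sequence
from typing import Any


def classify_payload(to_save: Any, leaf_type: type) -> str:
    """Hypothetical dispatch: return 'single', 'mapping', or 'sequence'."""
    if isinstance(to_save, leaf_type):
        return "single"
    if isinstance(to_save, Mapping):
        if all(isinstance(k, str) and isinstance(v, leaf_type)
               for k, v in to_save.items()):
            return "mapping"
    # str and bytes are Sequences too, so exclude them explicitly.
    elif isinstance(to_save, Sequence) and not isinstance(to_save, (str, bytes)):
        if all(isinstance(v, leaf_type) for v in to_save):
            return "sequence"
    raise TypeError(f"Unsupported type for to_save: {type(to_save).__name__}")
```

Checking the single-tensor case first avoids misclassifying a leaf type that also happens to behave like a sequence.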
- close()¶
Wait for all in-flight writes to complete and shut down the pool.
Propagates any exceptions raised by background tasks.
- Return type:
None