emmi_inference.writer
=====================

.. py:module:: emmi_inference.writer


Classes
-------

.. autoapisummary::

   emmi_inference.writer.AsyncWriter


Module Contents
---------------

.. py:class:: AsyncWriter(out_root, save_format = SaveFormat.PYTORCH_PT, dict_mode = DictMode.SINGLE, workers = 4, max_inflight = 256, small_sync_bytes = 64000, upload_queue = None)

   Asynchronous, atomic writer for inference outputs.

   The writer schedules filesystem writes on a thread pool and applies backpressure when too many writes are in flight. Small outputs are written synchronously on the calling thread, which avoids the overhead of enqueuing them. All writes are crash-safe: data is first written to a temporary file, which is then atomically moved into place with `os.replace`.

   :param out_root: Root directory where results are written.
   :param save_format: Output format (see `SaveFormat`).
   :param dict_mode: How to serialize mapping outputs (see `DictMode`).
   :param workers: Thread-pool size for background writes.
   :param max_inflight: Maximum number of in-flight futures before backpressure blocks further writes.
   :param small_sync_bytes: Size threshold in bytes; outputs smaller than this are written synchronously instead of being enqueued.

   .. rubric:: Examples

   Write a single tensor:

   .. code-block:: python

      from pathlib import Path

      import torch
      from emmi_inference.writer import AsyncWriter, SaveFormat

      tensor = torch.randn(3, 4)
      writer = AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT)
      writer.write("runs/sample_0001", tensor)  # -> out/runs/sample_0001.pt
      writer.close()

      # or, as a context manager:
      with AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT) as writer:
          writer.write("runs/sample_0001", tensor)  # -> out/runs/sample_0001.pt

   Write a mapping as a single NPZ file:

   .. code-block:: python

      writer = AsyncWriter(Path("out"), save_format=SaveFormat.NUMPY_NPZ, dict_mode=DictMode.SINGLE)
      writer.write("preds/scene_42", {"flow": flow, "pressure": p})  # -> out/preds/scene_42.npz
      writer.close()

   Split a mapping into one PT file per key:

   .. code-block:: python

      writer = AsyncWriter(Path("out"), save_format=SaveFormat.PYTORCH_PT, dict_mode=DictMode.SPLIT)
      writer.write(
          "preds/scene_42", {"flow": flow, "pressure": p}
      )  # -> out/preds/scene_42_flow.pt, out/preds/scene_42_pressure.pt
      writer.close()

   .. py:attribute:: root

   .. py:attribute:: save_format

   .. py:attribute:: dict_mode

   .. py:attribute:: max_inflight
      :value: 256

   .. py:attribute:: small_sync_bytes
      :value: 64000

   .. py:attribute:: upload_queue
      :value: None

   .. py:attribute:: pool

   .. py:method:: write(relative_path_no_ext, to_save)

      Schedule persistence of a tensor or a collection of tensors.

      :param relative_path_no_ext: Relative path **without** extension, joined to `out_root`.
      :param to_save: One of:

          - `torch.Tensor`
          - `Mapping[str, torch.Tensor]`
          - `Sequence[torch.Tensor]`

      :raises TypeError: If the type of `to_save` is unsupported.

   .. py:method:: close()

      Wait for all in-flight writes to complete and shut down the pool.

      Propagates any exceptions raised by background tasks.
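
   The mechanisms described for `AsyncWriter` (atomic temp-file writes, a bounded number of in-flight writes, and a synchronous path for small outputs) can be illustrated with the standard library alone. This is a minimal, hypothetical sketch and not the actual `AsyncWriter` implementation: it writes raw `bytes` rather than tensors, the names `atomic_write_bytes`, `BoundedAsyncWriter`, and `write_bytes` are invented for illustration, and backpressure is assumed to be a `threading.BoundedSemaphore` sized to `max_inflight`.

   .. code-block:: python

      import os
      import tempfile
      import threading
      from concurrent.futures import Future, ThreadPoolExecutor
      from pathlib import Path


      def atomic_write_bytes(path: Path, data: bytes) -> None:
          """Write data to a temp file next to path, then rename it over path."""
          path.parent.mkdir(parents=True, exist_ok=True)
          # Same directory => same filesystem, which os.replace needs to rename atomically.
          fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), prefix=path.name + ".", suffix=".tmp")
          try:
              with os.fdopen(fd, "wb") as f:
                  f.write(data)
                  f.flush()
                  os.fsync(f.fileno())  # persist the bytes before the rename
              os.replace(tmp_name, path)  # readers see the old or the new file, never a partial one
          except BaseException:
              try:
                  os.unlink(tmp_name)  # drop the partial temp file; the target is untouched
              except FileNotFoundError:
                  pass
              raise


      class BoundedAsyncWriter:
          """Hypothetical sketch, not the real AsyncWriter: payloads are raw bytes."""

          def __init__(self, out_root, workers=4, max_inflight=256, small_sync_bytes=64_000):
              self.root = Path(out_root)
              self.small_sync_bytes = small_sync_bytes
              self.pool = ThreadPoolExecutor(max_workers=workers)
              # Assumed backpressure mechanism: one semaphore slot per background write.
              self._slots = threading.BoundedSemaphore(max_inflight)
              self._futures: list[Future] = []  # kept until close() so errors can surface

          def write_bytes(self, relative_path: str, data: bytes) -> None:
              path = self.root / relative_path
              if len(data) < self.small_sync_bytes:
                  atomic_write_bytes(path, data)  # small payload: write on the caller's thread
                  return
              self._slots.acquire()  # blocks while max_inflight writes are pending
              try:
                  fut = self.pool.submit(atomic_write_bytes, path, data)
              except BaseException:
                  self._slots.release()
                  raise
              fut.add_done_callback(lambda _: self._slots.release())  # free the slot when done
              self._futures.append(fut)

          def close(self) -> None:
              self.pool.shutdown(wait=True)  # wait for every queued write to finish
              futures, self._futures = self._futures, []
              for fut in futures:
                  fut.result()  # re-raises the first failed write, in submission order

          def __enter__(self):
              return self

          def __exit__(self, *exc_info):
              self.close()

   Creating the temporary file in the destination directory is deliberate: `os.replace` only succeeds, and is only atomic, when source and destination live on the same filesystem, so a reader sees either the previous file or the complete new one.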