from __future__ import annotations
import json
from collections.abc import Iterable, Mapping
from typing import Any
import numpy as np
# Prefix prepended to each dataset-level attribute key when it is written.
DATASET_ATTR_PREFIX = "__xarray_dataset_attrs__/"
# Per-variable attribute name holding the variable's dimension names
# (the time dimension is left out by the writer).
VAR_ATTR_PREFIX_DIMENSIONS = "__xarray_dimensions__"
# Per-variable attribute name holding the in-memory dtype (as a string);
# only written when the dtype stored on disk differs from it.
VAR_ATTR_PREFIX_ORIGINAL_DTYPE = "__xarray_original_dtype__"
# [docs]
class Adios2NotInstalledError(ImportError):
    """Raised when the optional ``adios2`` Python package cannot be imported."""
def _safe_int32_cast(arr: np.ndarray) -> np.ndarray:
    """Convert integer data to ``int32``, refusing values that do not fit.

    Parameters
    ----------
    arr
        Array to convert. Empty arrays are converted without a range check.

    Returns
    -------
    np.ndarray
        ``arr`` as ``int32`` (``astype(..., copy=False)``, so an array that
        is already ``int32`` is returned as-is).

    Raises
    ------
    ValueError
        If any value lies outside the ``int32`` range.
    """
    if not arr.size:
        # Nothing to range-check; min()/max() are undefined for empty arrays.
        return arr.astype(np.int32, copy=False)

    bounds = np.iinfo(np.int32)
    values = np.asarray(arr)
    highest = int(values.max())

    if arr.dtype.kind == "u":
        # Unsigned data cannot go below zero, so only the upper bound matters.
        if highest > bounds.max:
            raise ValueError(
                f"Cannot safely cast unsigned integer data to int32: max={highest} > {bounds.max}"
            )
    else:
        lowest = int(values.min())
        if not (bounds.min <= lowest and highest <= bounds.max):
            raise ValueError(
                "Cannot safely cast integer data to int32: "
                f"min={lowest}, max={highest}, int32=[{bounds.min}, {bounds.max}]"
            )

    return arr.astype(np.int32, copy=False)
def _normalize_attr_value(value: Any) -> Any:
    """Reduce an attribute value to a scalar, array, flat list, or JSON text.

    - ``None`` becomes the string ``"null"``.
    - Scalars (``str``, ``bytes``, Python/NumPy numbers, ``bool``) and NumPy
      arrays are returned unchanged.
    - A list or tuple made only of such scalars is returned as a new list.
    - Anything else (nested sequences, mappings, arbitrary objects) is
      serialised with ``json.dumps(..., default=str)``.
    """
    scalar_types = (str, bytes, int, float, bool, np.number)

    if value is None:
        return "null"
    if isinstance(value, scalar_types) or isinstance(value, np.ndarray):
        return value

    is_flat_sequence = isinstance(value, (list, tuple)) and all(
        isinstance(item, scalar_types) for item in value
    )
    if is_flat_sequence:
        return list(value)

    # Nested sequences, mappings and unknown objects all share this fallback.
    return json.dumps(value, default=str)
def _numpy_for_write(data: Any, *, write_ints_as_int32: bool = False) -> np.ndarray:
    """Convert ``data`` to the NumPy array that will be handed to the writer.

    Parameters
    ----------
    data
        Anything accepted by ``np.asarray``.
    write_ints_as_int32
        When true, integer data that is not already ``int32`` is converted
        through ``_safe_int32_cast``.

    Returns
    -------
    np.ndarray
        Booleans become ``uint8``; a 0-d object array holding a ``str`` or
        ``bytes`` becomes a 0-d string array; everything else is returned
        as produced by ``np.asarray`` (optionally cast to ``int32``).

    Raises
    ------
    TypeError
        For datetime64/timedelta64 data, and for object arrays other than a
        scalar string.
    """
    array = np.asarray(data)
    kind = array.dtype.kind

    if kind == "b":
        return array.astype(np.uint8)

    if kind in ("i", "u"):
        if write_ints_as_int32 and array.dtype != np.int32:
            return _safe_int32_cast(array)
        return array

    if kind in ("M", "m"):
        raise TypeError(
            "datetime64/timedelta64 not supported for ADIOS2 writer yet; "
            "encode to int64 with units attrs first"
        )

    if kind == "O":
        # Conservative: only allow scalar string-like objects.
        if array.ndim == 0:
            scalar = array.item()
            if isinstance(scalar, (str, bytes)):
                return np.asarray(scalar)
        raise TypeError(
            "object dtype not supported for ADIOS2 writer (except scalar strings)"
        )

    return array
def _attr_to_numpy(value: Any) -> Any:
    """Normalise an attribute value and return it as a scalar or NumPy array.

    The value is first passed through ``_normalize_attr_value``. A 0-d array
    is unwrapped to its Python scalar, other arrays and plain scalars are
    returned unchanged, and anything else (a flat list) is converted with
    ``np.asarray``.
    """
    scalar_types = (str, bytes, int, float, bool, np.number)
    normalized = _normalize_attr_value(value)

    if isinstance(normalized, np.ndarray):
        return normalized if normalized.ndim else normalized.item()

    return normalized if isinstance(normalized, scalar_types) else np.asarray(normalized)
# [docs]
def write_dataset_bp(
    ds,
    path: str,
    *,
    time_dim: str = "t",
    engine: str = "BP4",
    parameters: Mapping[str, str] | None = None,
    overwrite: bool = True,
    variables: Iterable[str] | None = None,
    write_ints_as_int32: bool = False,
) -> None:
    """
    Write an xarray Dataset to an ADIOS2 .bp output.

    Design choices:

    - Map ``time_dim`` to ADIOS2 steps (variables containing time_dim are written
      one slice per step; all other variables are written once, in step 0).
    - Store per-variable dimension names as attribute ``{var}/__xarray_dimensions__``
      excluding ``time_dim``.
    - Store dataset attributes under ``__xarray_dataset_attrs__/{key}``.
    - Optionally store all integer variables as int32 on disk (see
      ``write_ints_as_int32``), while preserving the original dtype via the
      ``__xarray_original_dtype__`` attribute for round-tripping.

    Parameters
    ----------
    ds
        Dataset to write; its ``variables`` and ``attrs`` are read.
    path
        Output path handed to ``adios2.Stream``.
    time_dim
        Name of the dimension mapped to ADIOS2 steps. It must be the first
        dimension of every variable that has it.
    engine
        Only ``"BP4"`` is accepted.
    parameters
        Engine parameters; any non-empty mapping is rejected.
    overwrite
        Open the stream in mode ``"w"`` when true, ``"a"`` otherwise.
    variables
        Names of the variables to write; defaults to every variable in ``ds``.
    write_ints_as_int32
        Store integer variables as ``int32`` (range-checked).

    Raises
    ------
    Adios2NotInstalledError
        If ``adios2`` cannot be imported.
    NotImplementedError
        If ``engine`` is not ``"BP4"`` or ``parameters`` is non-empty.
    ValueError
        If a variable has ``time_dim`` somewhere other than first.

    Errors raised by ``_numpy_for_write`` (unsupported dtypes, int32 overflow)
    propagate unchanged.
    """
    try:
        import adios2  # type: ignore
    except ImportError as e:  # pragma: no cover
        raise Adios2NotInstalledError(
            "adios2 is required to write .bp files; install adios2-python"
        ) from e

    if engine != "BP4":
        raise NotImplementedError(
            "Only BP4 is supported by the current writer implementation"
        )
    if parameters:
        raise NotImplementedError(
            "ADIOS2 engine parameters are not supported by the current writer implementation"
        )

    names_to_write = list(ds.variables) if variables is None else list(variables)

    # Determine step count across all time-dependent variables.
    step_count: int | None = None
    for name in names_to_write:
        var = ds.variables[name]
        if time_dim not in var.dims:
            continue
        if var.dims[0] != time_dim:
            raise ValueError(
                f"Only supports {time_dim!r} as the first dimension for {name!r}; "
                f"got dims={var.dims!r}"
            )
        nsteps = int(var.sizes[time_dim])
        step_count = nsteps if step_count is None else max(step_count, nsteps)
    if step_count is None:
        step_count = 1

    mode = "w" if overwrite else "a"
    stream = adios2.Stream(path, mode)
    try:
        # Dataset attrs.
        for k, v in ds.attrs.items():
            stream.write_attribute(DATASET_ATTR_PREFIX + str(k), _attr_to_numpy(v))

        # Define all variables up-front to avoid Stream.write(name, ndarray) inference,
        # which is unreliable on some adios2-python builds.
        io = stream.io
        adios_vars: dict[str, Any] = {}
        sample_buffers: dict[str, np.ndarray] = {}
        # In-memory and on-disk dtypes, recorded here so the attribute pass
        # below does not have to materialise every variable again.
        original_dtypes: dict[str, np.dtype] = {}
        write_dtypes: dict[str, np.dtype] = {}
        for name in names_to_write:
            var = ds.variables[name]
            source = np.asarray(var.data)
            # Converting the full array once also validates dtype support and
            # the int32 range for every step before anything is written.
            write_dtype = _numpy_for_write(
                source, write_ints_as_int32=write_ints_as_int32
            ).dtype
            original_dtypes[name] = source.dtype
            write_dtypes[name] = write_dtype

            if time_dim in var.dims:
                shape = [int(var.sizes[d]) for d in var.dims if d != time_dim]
            else:
                shape = [int(s) for s in source.shape]
            start = [0] * len(shape)
            count = shape[:]

            if write_dtype.kind in {"U", "S"}:
                sample = np.asarray([""], dtype=write_dtype)
            else:
                sample = np.array([0], dtype=write_dtype)
            # Keep a reference to the sample buffer alive for the lifetime of the
            # stream. Some adios2-python builds appear to keep a view/pointer to the
            # provided numpy object when defining variables.
            sample_buffers[name] = sample

            adios_var = io.define_variable(name, sample, shape, start, count)
            if shape:
                adios_var.set_shape(shape)
                adios_var.set_selection([start, count])
            adios_vars[name] = adios_var

        # Per-variable attrs/dims (now that variables exist).
        for name in names_to_write:
            var = ds.variables[name]
            dims_wo_time = [d for d in var.dims if d != time_dim]
            stream.write_attribute(
                VAR_ATTR_PREFIX_DIMENSIONS,
                _attr_to_numpy(dims_wo_time),
                variable_name=name,
                separator="/",
            )
            for ak, av in var.attrs.items():
                stream.write_attribute(
                    str(ak),
                    _attr_to_numpy(av),
                    variable_name=name,
                    separator="/",
                )
            original_dtype = original_dtypes[name]
            if write_dtypes[name] != original_dtype:
                stream.write_attribute(
                    VAR_ATTR_PREFIX_ORIGINAL_DTYPE,
                    _attr_to_numpy(str(original_dtype)),
                    variable_name=name,
                    separator="/",
                )

        for step in range(step_count):
            stream.begin_step()
            try:
                for name in names_to_write:
                    var = ds.variables[name]
                    adios_var = adios_vars[name]
                    if time_dim in var.dims:
                        # Variables shorter than the longest one simply stop
                        # being written once their own steps run out.
                        if step >= int(var.sizes[time_dim]):
                            continue
                        # Slice first, then convert: only the current step is
                        # materialised/cast instead of the whole variable on
                        # every iteration. The full array was already validated
                        # in the define pass, so the values are identical.
                        data = _numpy_for_write(
                            var.data[step], write_ints_as_int32=write_ints_as_int32
                        )
                    elif step == 0:
                        data = _numpy_for_write(
                            var.data, write_ints_as_int32=write_ints_as_int32
                        )
                    else:
                        continue

                    data_arr = np.asarray(data)
                    if data_arr.ndim:
                        # Use an owned, contiguous buffer for ADIOS2 Put().
                        # Some adios2-python builds appear to segfault when passed
                        # non-owned views.
                        data_arr = np.array(data_arr, copy=True, order="C")
                        # Ensure selection matches the provided buffer.
                        adios_var.set_selection(
                            [[0] * data_arr.ndim, list(data_arr.shape)]
                        )
                    else:
                        # Ensure a stable 0-d buffer with the intended dtype.
                        data_arr = np.asarray(
                            data_arr.item(), dtype=data_arr.dtype
                        ).reshape(())
                    stream.write(adios_var, data_arr)
            finally:
                stream.end_step()
    finally:
        stream.close()