Source code for xbout.xarraybackend

"""License:
Distributed under the OSI-approved Apache License, Version 2.0.  See
accompanying file Copyright.txt for details.
"""

from __future__ import annotations

import os

# import warnings

from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ItemsView

import numpy as np

from xarray import Dataset, Variable
from xarray.backends.common import (
    BackendArray,
    BackendEntrypoint,
    _normalize_path,
)

from xarray.core import indexing

if TYPE_CHECKING:
    from adios2 import FileReader
    from io import BufferedIOBase
    from xarray.backends.common import AbstractDataStore


# need some special secret attributes to tell us the dimensions
DIMENSION_KEY = "time_dimension"

adios_to_numpy_type = {
    "char": np.char,
    "int8_t": np.int8,
    "int16_t": np.int16,
    "int32_t": np.int32,
    "int64_t": np.int64,
    "uint8_t": np.uint8,
    "uint16_t": np.uint16,
    "uint32_t": np.uint32,
    "uint64_t": np.uint64,
    "float": float,
    "double": np.double,
    "long double": np.longdouble,
    "float complex": np.complex64,
    "double complex": np.complex128,
    "string": np.char,
}


[docs] class BoutADIOSBackendArray(BackendArray): """ADIOS2 backend for lazily indexed arrays"""
[docs] def __init__( self, shape: list, dtype: np.dtype, lock, adiosfile: FileReader, varname: str ): self.shape = shape self.dtype = dtype self.lock = lock self.fh = adiosfile self.varname = varname self.adiosvar = self.fh.inquire_variable(varname) self.steps = self.adiosvar.steps()
# print(f"BoutADIOSBackendArray.__init__: {dtype} {varname} {shape} {dtype.itemsize}") def __getitem__(self, key: indexing.ExplicitIndexer) -> np.typing.ArrayLike: # print(f"**** BoutADIOSBackendArray.__getitem__: {self.varname} key = {key}") return indexing.explicit_indexing_adapter( key, self.shape, indexing.IndexingSupport.BASIC, self._raw_indexing_method, ) def _raw_indexing_method(self, key: tuple) -> np.typing.ArrayLike: # print(f"****BoutADIOSBackendArray._raw_indexing_method: {self.varname} " # f"key = {key} steps = {self.steps}") # print(f" data shape {data.shape}") # thread safe method that access to data on disk needed because # adios is not thread safe even for reading # with self.lock: start = [] count = [] dimid = 0 first_sl = True for sl in key: if isinstance(sl, slice): if sl.start is None: st = 0 else: st = sl.start if sl.stop is None: ct = self.shape[dimid] - st else: ct = sl.stop - st if sl.step != 1 and sl.step is not None: msg = ( "The indexing operation with step != 1 you are attempting to perform " "is not valid on ADIOS2.Variable object. " ) raise IndexError(msg) else: st = sl - 1 ct = 1 if self.steps > 1 and first_sl: # key[0] is the step selection # print(f" data step selection start = {st} count = {ct}") self.adiosvar.set_step_selection([st, ct]) else: start.append(st) count.append(ct) dimid += 1 first_sl = False # print(f" data selection start = {start} count = {count}") self.adiosvar.set_selection([start, count]) data = self.fh.read(self.adiosvar) if self.steps > 1: # ADIOS does not have time dimension. Read returns n-dim array # with the steps included in the first dimension dim0 = int(data.shape[0] / self.steps) if data.shape[0] % self.steps != 0: print( f"ERROR in BoutADIOSBackendArray: first dimension problem " f"with handling steps. Variable name={self.varname} " f"shape={data.shape}, steps={self.steps}" ) data = data.reshape((self.steps, dim0) + data.shape[1:]) return data
[docs] def attrs_of_var(varname: str, items: ItemsView, separator: str = "/"): """Return attributes whose name starts with a variable's name""" return [(key, value) for key, value in items if key.startswith(varname + separator)]
# pylint: disable=R0902 # Too many instance attributes # pylint: disable=R0912 # Too many branches # pylint: disable=E1121 # too-many-function-args
[docs] class BoutAdiosBackendEntrypoint(BackendEntrypoint): """ Backend for ".bp" folders based on the adios2 package. For more information about the underlying library, visit: https://adios2.readthedocs.io/en/stable See Also -------- backends.AdiosStore """ description = "Open ADIOS2 files/folders (.bp) using adios2 in Xarray" url = "https://docs.xarray.dev/en/stable/generated/xarray.backends.ZarrBackendEntrypoint.html"
[docs] def __init__(self): self._fh = None
[docs] def close(): """Close the ADIOS file"""
# print("BoutAdiosBackendEntrypoint.close() called") # Note that this is a strange method without 'self', so we cannot close the file because # we don't have any handle to it # if self._fh is not None: # self._fh.close()
[docs] def guess_can_open( self, filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore, ) -> bool: if isinstance(filename_or_obj, (str, os.PathLike)): _, ext = os.path.splitext(filename_or_obj) return ext in {".bp"} return False
[docs] def open_dataset( # type: ignore[override] # allow LSP violation, not supporting **kwargs self, filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore, *, # mask_and_scale=True, # decode_times=True, # concat_characters=True, # decode_coords=True, drop_variables: str | Iterable[str] | None = None, # use_cftime=None, # decode_timedelta=None, # group=None, # mode="r", # synchronizer=None, # consolidated=None, # chunk_store=None, # storage_options=None, # stacklevel=3, # adios_version=None, ) -> Dataset: from adios2 import FileReader filename_or_obj = _normalize_path(filename_or_obj) # print(f"BoutAdiosBackendEntrypoint: path = {filename_or_obj} type = {type(filename_or_obj)}") # if isinstance(filename_or_obj, os.PathLike): # print(f" os.PathLike: {os.fspath(filename_or_obj)}") # # if isinstance(filename_or_obj, str): # print(f" str: {os.path.abspath(os.path.expanduser(filename_or_obj))}") # if isinstance(filename_or_obj, BufferedIOBase): # raise ValueError("ADIOS2 does not support BufferedIOBase input") # # if isinstance(filename_or_obj, AbstractDataStore): # raise ValueError("ADIOS2 does not support AbstractDataStore input") self._fh = FileReader(filename_or_obj) vars = self._fh.available_variables() attrs = self._fh.available_attributes() attr_items = attrs.items() # print(f"BoutAdiosBackendEntrypoint: {len(vars)} variables, {len(attrs)} attributes") xvars = {} for varname, varinfo in vars.items(): if drop_variables is not None and varname in drop_variables: continue shape_str = varinfo["Shape"].split(", ") if shape_str[0]: shape_list = list(map(int, shape_str)) else: shape_list = [] shape_str = [] steps = int(varinfo["AvailableStepsCount"]) # print(f"{varinfo['Type']} {varname}\t {shape_list}") varattrs = attrs_of_var(varname, attr_items, "/") dims = None vlen = len(varname) + 1 # include / xattrs = {} for aname, ainfo in varattrs: # print(f"\t{ainfo['Type']} {aname}\t = {ainfo['Value']}") attr_value = self._fh.read_attribute(aname) if aname == varname + "/__xarray_dimensions__": dims = attr_value # print(f"\t\tDIMENSIONS = {dims}") else: xattrs[aname[vlen:]] = attr_value attrs.pop(aname) # print(f"\txattrs = {xattrs}") # Create the xarray variable if dims is None: dims = shape_str if shape_list != []: # for i in range(len(shape_str)): # shape_str[i] = "d" + shape_str[i] if steps > 1: shape_list.insert(0, steps) dims.insert(0, "t") # print(f"\tAdd time to shape {shape_list} {dims}") nptype = np.dtype(adios_to_numpy_type[varinfo["Type"]]) xdata = indexing.LazilyIndexedArray( BoutADIOSBackendArray(shape_list, nptype, None, self._fh, varname) ) # print(f"\tDefine VARIABLE {varname} with dims {dims}") xvar = Variable(dims, xdata, attrs=xattrs, encoding={"dtype": nptype}) # print(f"{xvar.dtype} {xvar.attrs["name"]} {xvar.dims} {xvar.encoding}") else: if steps > 1: avar = self._fh.inquire_variable(varname) avar.set_step_selection([0, avar.steps()]) data = self._fh.read(avar) # print(f"\tCreate timed scalar variable {varname}") xvar = Variable( "t", data, attrs=xattrs, encoding={"dtype": data.dtype} ) else: data = self._fh.read(varname) if varinfo["Type"] == "string": # print(f"\tCreate string scalar variable {varname}") xvar = Variable([], data, attrs=xattrs, encoding=None) else: # print(f"\tCreate scalar variable {varname}") xvar = Variable([], data, attrs=xattrs, encoding=None) xvars[varname] = xvar # print(f"--- {xvar}") for attname, attinfo in attrs.items(): print(f"{attinfo['Type']} {attname}\t = {attinfo['Value']}") ds = Dataset(xvars, None, None) ds.set_close(BoutAdiosBackendEntrypoint.close) return ds