Source code for earthkit.utils.array.namespace.unknown

# (C) Copyright 2025 ECMWF.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.


[docs] class UnknownPatchedNamespace:
[docs] def __init__(self, xp): self._xp = xp
@property def xp(self): return self._xp def __getattr__(self, name): return getattr(self.xp, name) # Delegate to underlying namespace # TODO: come up with a better way to compare namespaces def __eq__(self, other): return self._earthkit_array_namespace_name == other._earthkit_array_namespace_name @property def _earthkit_array_namespace_name(self): return self.xp.__name__
[docs] def polyval(self, x, c): """Evaluation of a polynomial using Horner's scheme. If ``c`` is of length ``n + 1``, this function returns the value .. math:: p(x) = c_0 + c_1 * x + ... + c_n * x^n Parameters ---------- x: array-like The values(s) at which to evaluate the polynomial. Its elements must support addition and multiplication with with themselves and with the elements of ``c``. c: array-like Array of coefficients ordered so that the coefficients for terms of degree n are contained in c[n]. Returns ------- values : array-like The value(s) of the polynomial at the given point(s). Comments -------- Based on the ``numpy.polynomal.polynomial.polyval`` function. """ c0 = c[-1] + x * 0 for i in range(2, len(c) + 1): c0 = c[-i] + c0 * x return c0
[docs] def percentile(self, a, q, axis=None): """Compute percentiles by calling the quantile function.""" if axis is None: axis = 0 a = self.reshape(a, (-1,)) a = self.sort(a, axis=axis) n = self.shape(a)[axis] rank = (q / 100) * (n - 1) low = int(self.floor(rank)) high = int(self.ceil(rank)) weight = rank - low a_low = self.take(a, low, axis=axis) a_high = self.take(a, high, axis=axis) return (1 - weight) * a_low + weight * a_high
[docs] def quantile(self, a, q, axis=None): return self.percentile(a, q * 100, axis=axis)
[docs] def histogram2d(self, x, y, *, bins=10): """Compute a 2D histogram. Parameters ---------- x: array-like An array containing the x coordinates of the points to be histogrammed. y: array-like An array containing the y coordinates of the points to be histogrammed. bins: int or list of int, optional The number of bins for the histogram in each dimension. If bins is an int, it is used for both dimensions. """ return self.xp.histogramdd(self.xp.stack([x, y]).T, bins=bins)
[docs] def histogramdd(self, x, *, bins=10): N, D = x.shape if isinstance(bins, int): bins = [bins] * D elif len(bins) != D: raise ValueError("bins must have length equal to number of dimensions") mins = self.xp.min(x, axis=0) maxs = self.xp.max(x, axis=0) edges = [self.xp.linspace(mins[d], maxs[d], bins[d] + 1) for d in range(D)] idx = [] for d in range(D): bin_width = edges[d][1] - edges[d][0] i = self.xp.floor((x[:, d] - edges[d][0]) / bin_width).astype(int) i = self.xp.clip(i, 0, bins[d] - 1) idx.append(i) idx = self.xp.stack(idx, axis=1) # TODO: do we need to handle device here? H = self.xp.zeros(bins, dtype=float) for i in range(N): H[tuple(idx[i])] += 1 return H, edges
[docs] def size(self, x): """Return the size of an array.""" # array.size is part of array api spec # but in practice not all backends implement it yet # therefore we provide this function # in order to be able to patch it if needed return x.size
[docs] def shape(self, x): """Return the shape of an array.""" # array.shape is part of array api spec # but in practice not all backends implement it yet # therefore we provide this function # in order to be able to patch it if needed return x.shape
[docs] def to_device(self, x, device, **kwargs): # array.to_device(device, **kwargs) is part of array api spec # but in practice not all backends implement it yet # therefore we provide this function # in order to be able to patch it if needed import array_api_compat return array_api_compat.to_device(x, device, **kwargs)
[docs] def device(self, x): # array.device is part of array api spec # but in practice not all backends implement it yet # therefore we provide this function # in order to be able to patch it if needed import array_api_compat return array_api_compat.device(x)
[docs] def dtype(self, x): # array.device is part of array api spec # but in practice not all backends implement it yet # therefore we provide this function # in order to be able to patch it if needed return x.dtype
[docs] def isclose(self, x, y, *, rtol=1e-5, atol=1e-8, equal_nan=False): result = self.xp.abs(x - y) <= atol + rtol * self.xp.abs(y) if equal_nan: nan_mask = self.xp.isnan(x) & self.xp.isnan(y) result = result | nan_mask return result
[docs] def allclose(self, x, y, *, rtol=1e-5, atol=1e-8, equal_nan=False): return self.xp.all(self.isclose(x, y, rtol=rtol, atol=atol, equal_nan=equal_nan))
[docs] def deg2rad(self, x): from earthkit.utils.constants import radian return x * radian
[docs] def rad2deg(self, x): from earthkit.utils.constants import degree return x * degree