Source code for ampform.data

# cspell:ignore csqrt ndmin ufunc vstack
# pylint: disable=too-many-ancestors
"""Data containers for working with four-momenta.

.. seealso:: :doc:`numpy:user/basics.dispatch`
"""

from collections import abc
from typing import (
    Dict,
    ItemsView,
    Iterable,
    Iterator,
    KeysView,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Union,
    ValuesView,
)

import numpy as np
from numpy.lib.mixins import NDArrayOperatorsMixin
from numpy.lib.scimath import sqrt as csqrt

try:
    # pyright: reportMissingImports=false
    from numpy.typing import ArrayLike, DTypeLike
except ImportError:
    ArrayLike = Union[Sequence, np.ndarray]  # type: ignore
    DTypeLike = object  # type: ignore


[docs]class ScalarSequence(NDArrayOperatorsMixin, abc.Sequence): """`numpy.array` data container of rank 1.""" def __init__( self, data: ArrayLike, dtype: Optional[DTypeLike] = None ) -> None: self.__data = np.array(data, dtype) if len(self.__data.shape) != 1: raise ValueError( f"{self.__class__.__name__} has to be of rank 1," f" but input data is of rank {len(self.__data.shape)}" ) def __array__(self, _: Optional[DTypeLike] = None) -> np.ndarray: return self.__data
[docs] def __getitem__(self, i: Union[int, slice]) -> np.ndarray: # type: ignore return self.__data[i]
def __len__(self) -> int: return len(self.__data) def __repr__(self) -> str: return f"{self.__class__.__name__}({np.array(self)}"
[docs]class ThreeMomentum(NDArrayOperatorsMixin, abc.Sequence): def __init__( self, data: ArrayLike, dtype: Optional[DTypeLike] = None ) -> None: self.__data = np.array(data, dtype=dtype, ndmin=2) if len(self.__data.shape) != 2: raise ValueError( f"{self.__class__.__name__} has to be of rank 2," f" but this data is of rank {len(self.__data.shape)}" ) if self.__data.shape[1] != 3: raise ValueError( f"{self.__class__.__name__} has to be of shape (N, 3)," f" but this data sample is of shape {self.__data.shape}" ) def __array__(self, _: Optional[DTypeLike] = None) -> np.ndarray: return self.__data
[docs] def __getitem__( # type: ignore self, i: Union[Tuple[Union[int, slice], Union[int, slice]], int, slice], ) -> np.ndarray: return self.__data[i]
def __len__(self) -> int: return len(self.__data) def __repr__(self) -> str: return f"{self.__class__.__name__}({np.array(self)})"
[docs]class FourMomentumSequence(NDArrayOperatorsMixin, abc.Sequence): """Container for a `numpy.array` of four-momentum tuples. The input data has to be of shape (N, 4) and the order of the items has to be :math:`(E, p)` (energy first). """ def __init__(self, data: ArrayLike) -> None: self.__data = np.array(data) if len(self.__data.shape) != 2: raise ValueError( f"{self.__class__.__name__} has to be of rank 2," f" but this data is of rank {len(self.__data.shape)}" ) if self.__data.shape[1] != 4: raise ValueError( f"{self.__class__.__name__} has to be of shape (N, 4)," f" but this data sample is of shape {self.__data.shape}" ) if np.min(self.energy) < 0: raise ValueError( "Energy column contains entries that are less than 0." " Did you order the four-momentum tuples as (E, p)?" ) def __array__(self, _: Optional[DTypeLike] = None) -> np.ndarray: return self.__data
[docs] def __getitem__( # type: ignore self, i: Union[Tuple[Union[int, slice], Union[int, slice]], int, slice], ) -> np.ndarray: return self.__data[i]
def __len__(self) -> int: return len(self.__data) def __repr__(self) -> str: return f"{self.__class__.__name__}({np.array(self)})" @property def energy(self) -> ScalarSequence: return ScalarSequence(self[:, 0]) @property def three_momentum(self) -> ThreeMomentum: return ThreeMomentum(self[:, 1:]) @property def p_x(self) -> ScalarSequence: return ScalarSequence(self[:, 1]) @property def p_y(self) -> ScalarSequence: return ScalarSequence(self[:, 2]) @property def p_z(self) -> ScalarSequence: return ScalarSequence(self[:, 3])
[docs] def p_norm(self) -> ScalarSequence: """Norm of `.three_momentum`.""" return ScalarSequence(np.sqrt(self.p_squared()))
[docs] def p_squared(self) -> ScalarSequence: """Squared norm of `.three_momentum`.""" return ScalarSequence(np.sum(self.three_momentum ** 2, axis=1))
[docs] def phi(self) -> ScalarSequence: return ScalarSequence(np.arctan2(self.p_y, self.p_x))
[docs] def theta(self) -> ScalarSequence: return ScalarSequence(np.arccos(self.p_z / self.p_norm()))
[docs] def mass(self) -> ScalarSequence: mass_squared = self.mass_squared(dtype=np.float64) return ScalarSequence(csqrt(mass_squared))
[docs] def mass_squared( self, dtype: Optional[DTypeLike] = None ) -> ScalarSequence: return ScalarSequence( self.energy ** 2 - self.p_norm() ** 2, dtype=dtype )
[docs]class MatrixSequence(NDArrayOperatorsMixin, abc.Sequence): """Safe data container for a sequence of 4x4-matrices.""" def __init__(self, data: ArrayLike) -> None: self.__data = np.array(data) if len(self.__data.shape) != 3: raise ValueError( f"{self.__class__.__name__} has to be of rank 3," f" but this data is of rank {len(self.__data.shape)}" ) if self.__data.shape[1:] != (4, 4): raise ValueError( f"{self.__class__.__name__} has to be of shape (N, 4, 4)," f" but this data sample is of shape {self.__data.shape}" ) def __array__(self, _: Optional[DTypeLike] = None) -> np.ndarray: return self.__data
[docs] def __getitem__(self, i: Union[int, slice]) -> np.ndarray: # type: ignore return self.__data[i]
def __len__(self) -> int: return len(self.__data) def __repr__(self) -> str: return f"{self.__class__.__name__}({np.array(self)})"
[docs] def dot(self, vector: FourMomentumSequence) -> FourMomentumSequence: return FourMomentumSequence( np.einsum( "ij...,j...", np.transpose(self, axes=(1, 2, 0)), np.transpose(vector), ) )
[docs]class EventCollection(abc.Mapping): """A mapping of state IDs to their `FourMomentumSequence` data samples. An `EventCollection` has to be converted to `DataSet` so that it can be used to evaluate a `.HelicityModel`. """ def __init__(self, data: Mapping[int, ArrayLike]) -> None: self.__data = {i: FourMomentumSequence(v) for i, v in data.items()} n_events = self.n_events if any(map(lambda v: len(v) != n_events, self.values())): raise ValueError( f"Not all {FourMomentumSequence.__name__} items" f" are of length {n_events}" )
[docs] def __getitem__(self, i: int) -> FourMomentumSequence: return self.__data[i]
def __iter__(self) -> Iterator[int]: return iter(self.__data) def __len__(self) -> int: return len(self.__data) def __repr__(self) -> str: return f"{self.__class__.__name__}({self.__data})" @property def n_events(self) -> int: if len(self) == 0: return 0 return len(next(iter(self.values())))
[docs] def sum( # noqa: A003 self, indices: Iterable[int] ) -> FourMomentumSequence: return FourMomentumSequence(sum(self.__data[i] for i in indices)) # type: ignore
[docs] def keys(self) -> KeysView[int]: return self.__data.keys()
[docs] def items(self) -> ItemsView[int, FourMomentumSequence]: return self.__data.items()
[docs] def values(self) -> ValuesView[FourMomentumSequence]: return self.__data.values()
[docs] def append(self, other: Mapping[int, ArrayLike]) -> None: if not isinstance(other, EventCollection): other = EventCollection(other) if self.n_events != 0 and set(self) != set(other): raise ValueError( f"Trying to append a momentum pool with state IDs {set(other)}" f" to a momentum pool with state IDs {set(self)}" ) self.__data = { i: FourMomentumSequence(np.vstack((values, other[i]))) for i, values in self.items() }
[docs] def select_events(self, selection: Union[int, slice]) -> "EventCollection": return EventCollection( {i: values[selection] for i, values in self.items()} )
[docs] def to_pandas( self, _: Optional[DTypeLike] = None ) -> Dict[Tuple[int, str], np.ndarray]: """Converter for the :code:`data` argument of `pandas.DataFrame`. The resulting `~pandas.DataFrame` has multi-columns (see :doc:`pandas:user_guide/advanced`) where the first column layer represents the state IDs and the second column layer represents each of the four-momentum entries (:math:`E, p_x, p_y, p_z`). """ return { (k, label): np.transpose(v)[i] for k, v in self.items() for i, label in enumerate(["E", "px", "py", "pz"]) }
[docs]class DataSet(abc.Mapping): """A mapping of variable names to their `ScalarSequence`. The `~.DataSet.keys` of `DataSet` represent variable names in a `.HelicityModel`, while its `~.DataSet.values` are inserted in their place. """ def __init__( self, data: Mapping[str, ArrayLike], dtype: Optional[DTypeLike] = None ) -> None: self.__data = { name: ScalarSequence(v, dtype=dtype) for name, v in data.items() } if not all(map(lambda k: isinstance(k, str), self.__data)): raise TypeError(f"Not all keys {set(data)} are strings") n_events = self.n_events if any(map(lambda v: len(v) != n_events, self.values())): raise ValueError( f"Not all {FourMomentumSequence.__name__} items" f" are of length {n_events}" ) def __repr__(self) -> str: return f"{self.__class__.__name__}({self.__data})"
[docs] def __getitem__(self, i: str) -> ScalarSequence: return self.__data[i]
def __iter__(self) -> Iterator[str]: return iter(self.__data) def __len__(self) -> int: return len(self.__data) @property def n_events(self) -> int: if len(self) == 0: return 0 return len(next(iter(self.values())))
[docs] def keys(self) -> KeysView[str]: return self.__data.keys()
[docs] def items(self) -> ItemsView[str, ScalarSequence]: return self.__data.items()
[docs] def values(self) -> ValuesView[ScalarSequence]: return self.__data.values()
[docs] def append(self, other: Mapping[str, ArrayLike]) -> None: if not isinstance(other, DataSet): other = DataSet(other) if self.n_events != 0 and set(self) != set(other): raise ValueError( f"Trying to append a data set with state IDs {set(other)}" f" to a data set with state IDs {set(self)}" ) self.__data = { i: ScalarSequence(np.vstack((values, other[i]))) for i, values in self.items() }
[docs] def select_events(self, selection: Union[int, slice]) -> "DataSet": return DataSet({i: values[selection] for i, values in self.items()})
[docs] def to_pandas( self, _: Optional[DTypeLike] = None ) -> Dict[str, np.ndarray]: """Converter for the :code:`data` argument of `pandas.DataFrame`.""" return {k: np.array(v) for k, v in self.items()}