from __future__ import annotations
import logging
import warnings
from collections import defaultdict
from itertools import repeat
from pprint import pformat
from typing import (
ClassVar,
Iterable,
Iterator,
List,
Literal,
MutableMapping,
Optional,
Tuple,
Type,
TypeAlias,
TypeVar,
Union,
overload,
)
import marshmallow as mm
import pandas as pd
from dimcat.base import (
DimcatConfig,
DimcatObject,
ObjectEnum,
get_schema,
is_instance_of,
)
from dimcat.data.base import Data
from dimcat.data.datasets.base import Dataset
from dimcat.data.datasets.processed import _AnalyzedMixin
from dimcat.data.packages.dc import DimcatPackage
from dimcat.data.resources.base import (
DR,
F,
Resource,
ResourceSpecs,
resource_specs2resource,
)
from dimcat.data.resources.dc import DimcatResource, FeatureSpecs
from dimcat.data.resources.utils import (
check_configs_against_allowed_configs,
feature_specs2config,
features_argument2config_list,
)
from dimcat.dc_exceptions import (
EmptyDatasetError,
EmptyResourceError,
FeatureNotProcessableError,
FeatureUnavailableError,
NoFeaturesActiveError,
ResourceAlreadyTransformed,
ResourceNotProcessableError,
)
from dimcat.dc_warnings import OrderOfPipelineStepsWarning
# Logger named after this module's import path.
module_logger = logging.getLogger(__name__)
# Type variable bound to Data; used in the signatures of PipelineStep.process() so that the
# annotated return type follows the type of the object(s) passed in.
D = TypeVar("D", bound=Data)
class PipelineStep(DimcatObject):
    """
    This base class unites all classes able to transform some data in a pre-defined way.

    The initializer will set some parameters of the processing, and then the
    :meth:`process` method is used to transform an input Data object, returning a copy.
    """

    _new_dataset_type: ClassVar[Optional[Type[Dataset]]] = None
    """If specified, :meth:`process_dataset` will return Datasets of this type, otherwise same as input type."""

    _new_resource_type: ClassVar[Optional[Type[DR]]] = None
    """If specified, :meth:`process_resource` will return Resources of this type, otherwise same as input type."""

    _applicable_to_empty_datasets: ClassVar[bool] = True
    """If False, :meth:`check_dataset` will raise an EmptyDatasetError if no data has been loaded yet. This makes sense
    for PipelineSteps that are dependent on the data, e.g. because they use :meth:`fit_to_dataset`."""

    class Schema(DimcatObject.Schema):
        """PipelineSteps do not depend on previously serialized data, so their serialization can be validated by
        default after dumping them to a dict-like structure. For Data objects, this default is safe only for their
        PickleSchema, which PipelineSteps do not use.
        """

        @mm.post_dump()
        def validate_dump(self, data, **kwargs):
            """Make sure to never return invalid serialization data.

            Args:
                data: The dict-like structure produced by dumping the object.
                **kwargs: Passed by marshmallow; not used here.

            Returns:
                The unchanged ``data`` if it passed validation.

            Raises:
                marshmallow.ValidationError:
                    If ``data`` lacks a 'dtype' field or if the schema retrieved for that dtype reports
                    validation errors.
            """
            if "dtype" not in data:
                msg = (
                    f"{self.name}: The serialized data doesn't have a 'dtype' field, meaning that DiMCAT would "
                    f"not be able to deserialize it."
                )
                raise mm.ValidationError(msg)
            # validate with the schema registered for the dumped dtype, not necessarily with self
            dtype_schema = get_schema(data["dtype"])
            report = dtype_schema.validate(data)
            if report:
                raise mm.ValidationError(
                    f"Dump of {data['dtype']} created with a {self.name} could not be validated by "
                    f"{dtype_schema.name}."
                    f"\n\nDUMP:\n{pformat(data, sort_dicts=False)}"
                    f"\n\nREPORT:\n{pformat(report, sort_dicts=False)}"
                )
            return data

    @property
    def is_transformation(self) -> Literal[False]:
        """True if this PipelineStep transforms features, replacing the dataset.outputs['features'] package.
        Always False for this base class."""
        return False

    def check_dataset(self, dataset: Dataset) -> None:
        """Check if the dataset is eligible for processing.

        Emits an :class:`OrderOfPipelineStepsWarning` if the dataset has already been analyzed and this step
        is not an Analyzer.

        Raises:
            TypeError: if the given dataset is not a Dataset
            EmptyDatasetError: if :attr:`applicable_to_empty_datasets` is False and the given dataset is empty
        """
        if not isinstance(dataset, Dataset):
            raise TypeError(f"Expected Dataset, got {type(dataset)}")
        if isinstance(dataset, _AnalyzedMixin) and not is_instance_of(
            self.name, "Analyzer"
        ):
            warnings.warn(
                f"You're applying a {self.name} to an AnalyzedDataset. As things stand, Analyzers should "
                f"always be the last thing to be applied to a Dataset. Consider a different Pipeline.",
                OrderOfPipelineStepsWarning,
            )
        if not self._applicable_to_empty_datasets:
            if dataset.n_features_available == 0:
                raise EmptyDatasetError

    def check_resource(self, resource: Resource) -> None:
        """Check if the resource is eligible for processing.

        Raises:
            TypeError: if the given resource is not a DimcatResource
            EmptyResourceError: if the given resource is empty
        """
        if not isinstance(resource, DimcatResource):
            raise TypeError(f"Expected DimcatResource, got {type(resource)}")
        if resource.is_empty:
            raise EmptyResourceError

    def fit_to_dataset(self, dataset: Dataset) -> None:
        """Adjust the PipelineStep to the passed dataset. Does nothing in this base class.

        Args:
            dataset: The dataset to adjust to.
        """
        return

    def _make_new_resource(self, resource: DimcatResource) -> DR:
        """Create the output resource from the input resource.

        The constructor is chosen by :meth:`_get_new_resource_type` and the name of the new resource by
        :meth:`resource_name_factory`.
        """
        resource_constructor = self._get_new_resource_type(resource)
        # This is where the input resource is being processed
        resource_name = self.resource_name_factory(resource)
        new_resource = resource_constructor.from_resource(
            resource, resource_name=resource_name
        )
        self.logger.debug(
            f"Created new resource {new_resource} of type {resource_constructor.name}."
        )
        return new_resource

    def _get_new_resource_type(self, resource: DimcatResource) -> Type[DR]:
        """Return :attr:`_new_resource_type` if set, otherwise the class of the given resource."""
        if self._new_resource_type is None:
            return resource.__class__
        return self._new_resource_type

    def _make_new_dataset(self, dataset: Dataset) -> Dataset:
        """Create the output dataset from the input dataset.

        Uses :attr:`_new_dataset_type` as constructor if set, otherwise the class of the given dataset.
        """
        dataset_constructor: Type[Dataset] = (
            dataset.__class__
            if self._new_dataset_type is None
            else self._new_dataset_type
        )
        new_dataset = dataset_constructor.from_dataset(dataset)
        self.logger.debug(
            f"Created new dataset {new_dataset} of type {dataset_constructor.__name__}."
        )
        return new_dataset

    def _post_process_result(
        self,
        result: DR,
        original_resource: DimcatResource,
    ) -> DR:
        """Perform some post-processing on a resource after processing it. Returns ``result`` unchanged here."""
        return result

    def _pre_process_resource(self, resource: DR) -> DR:
        """Perform some pre-processing on a resource before processing it. Returns ``resource`` unchanged here."""
        return resource

    @overload
    def process(self, data: D) -> D:
        ...

    @overload
    def process(self, data: List[D] | Tuple[D]) -> List[D]:
        ...

    @overload
    def process(self, *data: D) -> List[D]:
        ...

    def process(self, *data: D) -> D | List[D]:
        """Same as process_data(), with the difference that arbitrarily many objects are accepted.

        Args:
            *data: One or several objects to process, or a single list/tuple of such objects.

        Returns:
            The processed object if exactly one object was passed that is neither a list nor a tuple,
            otherwise a list of processed objects.

        Raises:
            ValueError: if no argument was passed.
        """
        if not data:
            raise ValueError("Please pass a Dataset or a Resource to process.")
        if len(data) == 1:
            single_obj = data[0]
            if not isinstance(single_obj, (tuple, list)):
                # a single object was given which is neither a list nor a tuple, this is the
                # case where not to return a list
                return self.process_data(single_obj)
            # a single list or tuple was given: process its elements
            data = single_obj
        return [self.process_data(d) for d in data]

    @overload
    def process_data(self, data: Dataset) -> Dataset:
        ...

    @overload
    def process_data(self, data: DimcatResource) -> DR:
        ...

    def process_data(self, data: Dataset | DimcatResource) -> Dataset | DR:
        """
        Perform a transformation on an input Data object. This should never alter the
        Data or its properties in place, instead returning a copy or view of the input.

        Args:
            data: The data to be transformed. Must not be altered in place.

        Returns:
            A copy of the input Data, potentially transformed or enhanced in some way defined by this PipelineStep.

        Raises:
            TypeError: if ``data`` is neither a Dataset nor a DimcatResource.
        """
        if isinstance(data, Dataset):
            return self.process_dataset(data)
        if isinstance(data, DimcatResource):
            return self.process_resource(data)
        raise TypeError(f"Expected Dataset or DimcatResource, got {type(data)}")

    def _process_dataset(self, dataset: Dataset) -> Dataset:
        """Apply this PipelineStep to a :class:`Dataset` and return a copy containing the output(s)."""
        new_dataset = self._make_new_dataset(dataset)
        self.fit_to_dataset(new_dataset)
        # this is where subclasses create a new package and add it to the dataset
        return new_dataset

    def process_dataset(self, dataset: Dataset) -> Dataset:
        """Apply this PipelineStep to a :class:`Dataset` and return a copy containing the output(s).

        The dataset is first passed to :meth:`check_dataset`, which may raise.
        """
        self.check_dataset(dataset)
        return self._process_dataset(dataset)

    def _process_resource(self, resource: Resource) -> DR:
        """Apply this PipelineStep to a :class:`Resource` and return a copy containing the output(s)."""
        resource = self._pre_process_resource(resource)
        result = self._make_new_resource(resource)
        # the (pre-processed) input is handed over together with the result
        return self._post_process_result(result, resource)

    def process_resource(self, resource: ResourceSpecs) -> DR:
        """Apply this PipelineStep to a resource and return the result.

        The argument is first converted with :func:`resource_specs2resource` and then passed to
        :meth:`check_resource`, which may raise.
        """
        resource = resource_specs2resource(resource)
        self.check_resource(resource)
        return self._process_resource(resource)

    def resource_name_factory(self, resource: DR) -> str:
        """Creates a name for the new resource based on the input resource. This base implementation
        returns the input resource's name unchanged."""
        return resource.resource_name
class FeatureProcessingStep(PipelineStep):
    """
    This class unites all PipelineSteps that work on one or all features that can be or have been extracted from a
    Dataset. They can be instantiated with the ``features`` argument, with the behaviour defined by class variables.
    """

    _allowed_features: ClassVar[Optional[Tuple[FeatureSpecs, ...]]] = None
    """If set, this FeatureProcessingStep can only be initialized with features that are in this tuple."""

    _output_package_name: ClassVar[Optional[str]] = None
    """Name of the package in which to store the outputs of this step. If None, the PipeLine step will replace the
    'features' package of the given dataset. FeatureProcessingSteps that replace the 'features' packages are called
    transformations internally."""

    _requires_at_least_one_feature: ClassVar[bool] = False
    """If set to True, this PipelineStep cannot be initialized without specifying at least one feature."""

    class Schema(PipelineStep.Schema):
        features = mm.fields.List(
            mm.fields.Nested(DimcatConfig.Schema),
            allow_none=True,
            metadata=dict(
                expose=True,
                description="The Feature objects you want this PipelineStep to process. If not specified, "
                "the step will try to process all features in a given Dataset's Outputs catalog.",
            ),
        )

        @mm.pre_load
        def deal_with_single_item(self, data, **kwargs):
            """Normalize the 'features' entry to a list of configs before loading.

            A value that is not a list is wrapped in a list, and every item is converted with
            :func:`feature_specs2config`. The incoming mapping is not modified in place.
            """
            if isinstance(data, MutableMapping) and "features" in data:
                features = data["features"]
                if not isinstance(features, list):
                    # NOTE(review): a None value ends up as [None] here although the field
                    # declares allow_none=True -- confirm that feature_specs2config handles None.
                    features = [features]
                feature_list = [feature_specs2config(feature) for feature in features]
                data = dict(
                    data, features=feature_list
                )  # make sure to not modify data inplace
            return data

    def __init__(
        self, features: Optional[FeatureSpecs | Iterable[FeatureSpecs]] = None, **kwargs
    ):
        """
        Args:
            features: The feature(s) this step should process. Converted to a list of configs by the
                :attr:`features` setter.
            **kwargs: Not used. A warning is logged if any are passed.
        """
        # needs to exist before the setter runs because the setter inspects the previous value
        self._features: List[DimcatConfig] = []
        self.features = features
        if kwargs:
            self.logger.warning(f"Ignored unknown keyword arguments: {kwargs}")

    @property
    def features(self) -> List[DimcatConfig]:
        """The Feature objects you want this PipelineStep to process. If not specified, the step will try to process
        all features in a given Dataset's Outputs catalog."""
        return self._features

    @features.setter
    def features(self, features):
        try:
            configs = features_argument2config_list(
                features, allowed_configs=self._allowed_features
            )
        except ResourceNotProcessableError as e:
            raise FeatureNotProcessableError(
                features, self.name, self._allowed_features
            ) from e
        if self._features:
            self.logger.info(
                f"Previously selected features {self._features} overwritten by {configs}."
            )
        self._features = configs

    @property
    def is_transformation(self) -> bool:
        """True if this PipelineStep replaces the :attr:`output_package_name` in dataset.outputs rather than extending
        it. Currently, this is the case only if :attr:`output_package_name` is 'features' or None (defaulting to
        'features')."""
        return (
            self._output_package_name is None or self._output_package_name == "features"
        )

    def check_dataset(self, dataset: Dataset) -> None:
        """Check if the dataset is eligible for processing.

        Raises:
            TypeError: if the given dataset is not a Dataset
            EmptyDatasetError: if :attr:`applicable_to_empty_datasets` is False and the given dataset is empty
            NoFeaturesActiveError:
                if :attr:`requires_at_least_one_feature` is True, no features have been specified for this step,
                and the dataset has no active features
            FeatureUnavailableError: if any of the required features is not available in the dataset.
        """
        super().check_dataset(dataset)
        required_features = self.get_feature_specs()
        if self._requires_at_least_one_feature:
            if not required_features and dataset.n_active_features == 0:
                raise NoFeaturesActiveError
        for feature in required_features:
            if not dataset.check_feature_availability(feature):
                raise FeatureUnavailableError

    def check_resource(self, resource: DimcatResource) -> None:
        """Check if the resource is eligible for processing.

        Raises:
            TypeError: if the given resource is not a DimcatResource
            EmptyResourceError: if the given resource is empty
            ResourceNotProcessableError:
                if :attr:`_allowed_features` is set and the given resource's config is rejected by
                :func:`check_configs_against_allowed_configs`
        """
        super().check_resource(resource)
        if self._allowed_features:
            try:
                check_configs_against_allowed_configs(
                    resource.to_config(), self._allowed_features
                )
            except ResourceNotProcessableError as e:
                # re-raise with information on the resource and on this step
                raise ResourceNotProcessableError(
                    resource.resource_name,
                    self.name,
                    resource.name,
                    self._allowed_features,
                ) from e

    def _iter_features(self, dataset: Dataset) -> Iterator[DimcatResource]:
        """Iterate over all features that are required for this PipelineStep.

        The result of :meth:`get_feature_specs` is passed on to :meth:`Dataset.iter_features`. If no features
        have been specified (empty list), the Dataset is expected to iterate over all active features.
        """
        feature_specs = self.get_feature_specs()
        return dataset.iter_features(feature_specs)

    def _iter_resources(self, dataset: Dataset) -> Iterator[Tuple[str, DimcatResource]]:
        """Iterate over all resources in the dataset's OutputCatalog."""
        # ToDo: Pull down to ResourceTransformation and rename to iter_output_resources()
        return dataset.outputs.iter_resources()

    def get_feature_specs(self) -> List[DimcatConfig]:
        """Return the list of feature configs required for this PipelineStep (same as :attr:`features`)."""
        return self.features

    def _make_new_package(self, package_name: Optional[str] = None) -> DimcatPackage:
        """Create a new package for the output of this PipelineStep.

        Args:
            package_name:
                Name of the new package. If None, :attr:`_output_package_name` is used, and if that is None
                too, the name defaults to 'features'.
        """
        if package_name is None:
            package_name = (
                "features"
                if self._output_package_name is None
                else self._output_package_name
            )
        return DimcatPackage(package_name=package_name)

    def _process_dataset(self, dataset: Dataset) -> Dataset:
        """Apply this PipelineStep to a :class:`Dataset` and return a copy containing the output(s).

        Every feature yielded by :meth:`_iter_features` is processed and the results are collected in a new
        package, which either replaces or extends the corresponding package in the new dataset's outputs,
        depending on :attr:`is_transformation`. Finally, this step is added to the new dataset's pipeline.
        """
        new_dataset = super()._process_dataset(dataset)
        resources = self._iter_features(new_dataset)
        new_package = self._make_new_package()
        n_processed = 0  # remains 0 if there are no features to iterate over
        for n_processed, resource in enumerate(resources, 1):
            new_resource = self.process_resource(resource)
            new_package.add_resource(new_resource)
        n_added = new_package.n_resources
        if n_added < n_processed:
            if n_added == 0:
                self.logger.warning(
                    f"None of the {n_processed} features were successfully transformed."
                )
            else:
                self.logger.warning(
                    f"Transformation was successful only on {new_package.n_resources} of the "
                    f"{n_processed} features."
                )
        if self.is_transformation:
            new_dataset.outputs.replace_package(new_package)
        else:
            new_dataset.outputs.extend_package(new_package)
        new_dataset._pipeline.add_step(self)
        return new_dataset
# Union of the types listed below; presumably the accepted ways of specifying a PipelineStep.
StepSpecs: TypeAlias = Union[
    PipelineStep,
    Type[PipelineStep],
    DimcatConfig,
    dict,
    ObjectEnum,
    str,
]