# Source code for dimcat.steps.pipelines.base

from __future__ import annotations

import logging
from typing import Iterable, Iterator, List, Literal, Optional, Type, overload

from dimcat import Dataset, DimcatConfig
from dimcat.base import (
    DimcatObject,
    DimcatObjectField,
    get_class,
    make_config_from_specs,
    make_object_from_specs,
)
from dimcat.data.resources.base import DR
from dimcat.data.resources.dc import DimcatResource
from dimcat.dc_exceptions import NoMatchingPipelineStepFoundError
from dimcat.steps.base import PipelineStep, StepSpecs
from marshmallow import fields

# Module-level logger, named after this module via ``__name__``.
module_logger = logging.getLogger(__name__)


class Pipeline(PipelineStep):
    """A :class:`PipelineStep` that stores an ordered list of steps and applies them in sequence.

    The pipeline can be iterated over, indexed, and passed to :func:`len`; all three operate on
    the stored steps.
    """

    @classmethod
    def from_step_configs(cls, configs: Iterable[DimcatConfig]) -> Pipeline:
        """Create a pipeline from an iterable of step configurations.

        Args:
            configs:
                One config per step. Items that are not :class:`DimcatConfig` instances are
                passed to the :class:`DimcatConfig` constructor before the step is created.

        Returns:
            A new pipeline containing one step per config, in the given order.
        """
        steps = []
        for config in configs:
            if not isinstance(config, DimcatConfig):
                config = DimcatConfig(config)
            steps.append(config.create())
        return cls(steps)

    @classmethod
    def from_pipeline(cls, pipeline: Pipeline) -> Pipeline:
        """Create a new pipeline holding the same step objects as ``pipeline``."""
        return cls(steps=pipeline.steps)

    class Schema(PipelineStep.Schema):
        # The steps are (de)serialized as a list of DimcatObject fields.
        steps = fields.List(DimcatObjectField())

    def __init__(
        self,
        steps: Optional[StepSpecs | Iterable[StepSpecs]] = None,
        **kwargs,
    ):
        """
        Args:
            steps: A single step specification or an iterable of them. None yields an empty pipeline.
            **kwargs: Passed on to the parent class' constructor.
        """
        super().__init__(**kwargs)
        self._steps: List[PipelineStep] = []
        if steps is not None:
            self.steps = steps

    def __iter__(self) -> Iterator[PipelineStep]:
        yield from self._steps

    def __len__(self):
        return len(self._steps)

    def __getitem__(self, item):
        return self._steps[item]

    @property
    def steps(self) -> List[PipelineStep]:
        """The pipeline steps as a list. Modifying the list does not affect the pipeline."""
        return list(self._steps)

    @steps.setter
    def steps(
        self,
        steps: StepSpecs | Iterable[StepSpecs],
    ) -> None:
        """Replace the pipeline's steps with the given one(s).

        A single specification (object, dict, type, or string) is treated as a one-element list.
        If any specification cannot be added, the previous steps are restored and the
        exception is re-raised.
        """
        if isinstance(steps, (DimcatObject, dict, type, str)):
            specs = [steps]
        else:
            # Materialize first: the iterable may be derived from this very pipeline
            # (e.g. ``iter(pipeline)``) and must not observe the mutation below.
            specs = list(steps)
        previous_steps = self._steps
        self._steps = []
        try:
            for spec in specs:
                # go through add_step() so that overriding subclasses stay in control
                self.add_step(spec)
        except Exception:
            self._steps = previous_steps
            raise

    def add_step(self, step: StepSpecs) -> None:
        """Convert the specification into a :class:`PipelineStep` and append it to the pipeline."""
        step = make_object_from_specs(step, PipelineStep)
        self._steps.append(step)

    def get_last_step(
        self,
        step_specs: Optional[StepSpecs] = None,
        allow_subclasses: bool = True,
    ) -> PipelineStep:
        """Returns the last step that matches the given specs.

        Args:
            step_specs:
                Specification that can be converted to a :class:`DimcatConfig` describing a
                :class:`PipelineStep`. If None, the last step is returned.
            allow_subclasses:
                By default, matches the last applied :class:`PipelineStep` of the type described by
                ``step_specs`` or one of its subclasses. Set to ``False`` to return the last step
                that matches exactly.

        Returns:
            PipelineStep object that matches the given specs.

        Raises:
            NoMatchingPipelineStepFoundError: If no matching step is found.
        """
        if step_specs is None:
            if not self._steps:
                raise NoMatchingPipelineStepFoundError
            return self._steps[-1]
        matching_steps = self.get_steps(step_specs, allow_subclasses=allow_subclasses)
        if not matching_steps:
            raise NoMatchingPipelineStepFoundError(step_specs)
        return matching_steps[-1]

    def get_steps(
        self,
        step_specs: Optional[StepSpecs] = None,
        allow_subclasses: bool = True,
    ) -> List[PipelineStep]:
        """Returns all steps that match the given specs.

        Args:
            step_specs:
                Specification that can be converted to a :class:`DimcatConfig` describing a
                :class:`PipelineStep`. If None, all steps are returned (equivalent to :attr:`steps`).
            allow_subclasses:
                By default, matching subclasses of the :class:`PipelineStep` described by
                ``step_specs`` are also included. Set to ``False`` to only return steps that
                match exactly.

        Returns:
            PipelineStep objects that match the given specs, in pipeline order.
        """
        if step_specs is None:
            return self.steps
        step_config = make_config_from_specs(step_specs, "PipelineStep")
        return [
            step
            for step in self._steps
            if step.to_config().matches(step_config, covariant=allow_subclasses)
        ]

    @overload
    def info(self, return_str: Literal[False] = False) -> None:
        ...

    @overload
    def info(self, return_str: Literal[True]) -> str:
        ...

    def info(self, return_str: bool = False) -> Optional[str]:
        """Show the names of the included steps.

        Args:
            return_str: If True, the summary is returned instead of being printed.
        """
        info_str = f"Pipeline([{', '.join(step.name for step in self._steps)}])"
        if return_str:
            return info_str
        print(info_str)

    def _process_dataset(self, dataset: Dataset) -> Dataset:
        """Apply all steps to the dataset, feeding each step the output of the previous one.

        An empty pipeline returns the given dataset unchanged.
        """
        if not self._steps:
            self.logger.info("Nothing to do.")
            return dataset
        processed_dataset = dataset
        for step in self._steps:
            processed_dataset = step.process(processed_dataset)
            # ToDo: checks?
        return processed_dataset

    def _process_resource(
        self,
        resource: DimcatResource,
        ignore_exceptions: bool = False,
        skip_step_types: Optional[Iterable[Type[PipelineStep] | str]] = None,
    ) -> DR:
        """Apply the steps to a single resource, feeding each step the output of the previous one.

        Args:
            resource: The resource to be processed.
            ignore_exceptions:
                If True, a step that raises is logged and skipped, and the following step
                receives the resource as it was before the failing step. If False (default),
                the exception propagates.
            skip_step_types:
                Step types (or their names, resolved via :func:`get_class`) whose instances
                are left out. A single type or name is accepted as well.

        Returns:
            The processed resource, or the given resource if there was nothing to apply.
        """
        if skip_step_types:
            if isinstance(skip_step_types, (str, type)):
                skip_step_types = [skip_step_types]
            ignored_types = [
                get_class(dtype) if isinstance(dtype, str) else dtype
                for dtype in skip_step_types
            ]
            # isinstance() requires a tuple; the set merely removes duplicates
            ignored_types = tuple(set(ignored_types))
            pipeline_steps = [
                step for step in self._steps if not isinstance(step, ignored_types)
            ]
        else:
            pipeline_steps = self._steps
        if not pipeline_steps:
            self.logger.info("Nothing to do.")
            return resource
        processed_resource = resource
        for step in pipeline_steps:
            previous_resource = processed_resource
            try:
                processed_resource = step.process_resource(previous_resource)
            except Exception as e:
                if not ignore_exceptions:
                    raise
                # processed_resource still holds the input of the failed step
                self.logger.info(
                    f"{step.name!r}._process_resource() failed on {previous_resource.resource_name!r} and "
                    f"ignore_exceptions=True, so the {previous_resource.name} will not have been processed by "
                    f"this PipelineStep. Exception:\n{e!r}"
                )
                continue
            # ToDo: Check the processed resource and handle errors
        return processed_resource