dimcat.steps.analyzers package#

Submodules#

dimcat.steps.analyzers.base module#

Analyzers are PipelineSteps that process data and store the results in Data.processed.

class dimcat.steps.analyzers.base.Analyzer(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: FeatureProcessingStep

Analyzers are PipelineSteps that process data and store the results in Data.processed. The base class performs no analysis; instantiating it serves testing purposes only.

class Schema(*, only: Optional[Union[Sequence[str], AbstractSet[str]]] = None, exclude: Union[Sequence[str], AbstractSet[str]] = (), many: Optional[bool] = None, load_only: Union[Sequence[str], AbstractSet[str]] = (), dump_only: Union[Sequence[str], AbstractSet[str]] = (), partial: Optional[Union[bool, Sequence[str], AbstractSet[str]]] = None, unknown: Optional[Literal['exclude', 'include', 'raise']] = None)[source]#

Bases: Schema

dump_fields: dict[str, Field]#
exclude: set[Any] | MutableSet[Any]#
features_as_list(obj, **kwargs)[source]#

Ensure that features is a list.
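As a minimal sketch of what "ensure that features is a list" can mean for a parameter that accepts either a single specification or an iterable of them: the helper name as_list and the placeholder feature names are assumptions made for illustration, not DiMCAT's implementation.

```python
from collections.abc import Iterable


def as_list(features):
    """Return `features` as a list, wrapping single values (hypothetical helper)."""
    if features is None:
        return []
    # Strings and mappings are iterable but denote a single feature specification.
    if isinstance(features, (str, dict)) or not isinstance(features, Iterable):
        return [features]
    return list(features)


single = as_list("notes")  # placeholder feature name
several = as_list(("notes", "harmonylabels"))  # placeholder feature names
```

Normalizing early in this way lets downstream code iterate over features without checking for the single-value case.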

fields: dict[str, Field]#

Dictionary mapping field_names -> Field objects

load_fields: dict[str, Field]#
opts: Any = <marshmallow.schema.SchemaOpts object>#
unknown: types.UnknownOption#
static aggregate(result_a: R, result_b: R) R[source]#

Static method that combines two results of compute().

This needs to be equivalent to calling self.compute on the concatenation of the respective data resulting in the two arguments.
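The equivalence required here can be sketched with a simple counting analysis. The functions below are plain-Python stand-ins that operate on lists rather than Feature objects; they are assumptions for illustration and not part of DiMCAT.

```python
from collections import Counter


def compute(values):
    """Count occurrences in one unit of analysis (stand-in for Analyzer.compute)."""
    return Counter(values)


def aggregate(result_a, result_b):
    """Combine two results of compute() (stand-in for Analyzer.aggregate)."""
    return result_a + result_b


unit_a = ["C", "G", "C"]
unit_b = ["G", "E"]

# Aggregating two partial results must match computing on the concatenated data.
combined = aggregate(compute(unit_a), compute(unit_b))
direct = compute(unit_a + unit_b)
contract_holds = combined == direct
```

Counting satisfies the contract because addition is associative; an analysis such as a mean would need to carry extra state (for example sums and counts) to be aggregated correctly.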

static compute(feature: Feature, **kwargs) Any[source]#

Static method that performs the actual computation on a single unit of analysis (slice, piece, or group). The result of analyzing a resource should be tantamount to a concatenation of the results of applying self.compute() to each contained unit, turned into a Feature object in its own right. In practice, the analyzer's .groupby_apply() method re-implements the same computation and performs it on the entire DataFrame at once using .groupby(). In other words, it would be redundant to turn each group into a Feature first. self.compute(), however, cannot take a DataFrame as input because it is a static method that needs to rely on the Feature object to know which column(s) to process.
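The relationship between the per-unit computation and the grouped computation described above can be illustrated as follows. This is a plain-Python sketch under the assumption of a counting analysis; the rows, the function names per_unit and grouped_pass, and the use of dictionaries instead of DataFrames are all illustrative stand-ins.

```python
from collections import Counter, defaultdict

# Hypothetical rows of (piece, value); in DiMCAT these would be DataFrame rows.
rows = [("piece1", "C"), ("piece1", "G"), ("piece2", "C"), ("piece1", "C")]


def compute(values):
    """Count the values of a single unit (stand-in for a static compute())."""
    return Counter(values)


def per_unit(rows):
    """Split the rows into units first, then call compute() on each unit."""
    units = defaultdict(list)
    for piece, value in rows:
        units[piece].append(value)
    return {piece: compute(values) for piece, values in units.items()}


def grouped_pass(rows):
    """Count in one pass over all rows, keyed by group (stand-in for groupby_apply())."""
    counts = defaultdict(Counter)
    for piece, value in rows:
        counts[piece][value] += 1
    return dict(counts)


same_result = per_unit(rows) == grouped_pass(rows)
```

Both routes yield the same per-group counts; the grouped pass simply avoids materializing each unit as a separate object.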

property dimension_column: Optional[str]#

Name of a column, contained in the Results produced by this analyzer, containing some dimension, e.g. one to be interpreted as quantity (durations, counts, etc.) or as color.

groupby_apply(feature: Feature, groupby: Optional[Series] = None, **kwargs)[source]#

Performs the computation on a groupby. The value of groupby needs to be a Series of the same length as feature or otherwise work as a positional argument to feature.groupby().

resource_name_factory(resource: DimcatResource) str[source]#

Returns a name for the resource based on its name and the name of the pipeline step.

property smallest_unit: UnitOfAnalysis#
property strategy: DispatchStrategy#
class dimcat.steps.analyzers.base.AnalyzerName(value)[source]#

Bases: ObjectEnum

Identifies the available analyzers.

Analyzer = 'Analyzer'#
BigramAnalyzer = 'BigramAnalyzer'#
CadenceCounter = 'CadenceCounter'#
Counter = 'Counter'#
PhraseDataAnalyzer = 'PhraseDataAnalyzer'#
PitchClassVectors = 'PitchClassVectors'#
PrevalenceAnalyzer = 'PrevalenceAnalyzer'#
Proportions = 'Proportions'#
class dimcat.steps.analyzers.base.DispatchStrategy(value)[source]#

Bases: str, Enum

An enumeration.

GROUPBY_APPLY = 'GROUPBY_APPLY'#
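Because the enumeration derives from both str and Enum, its members behave like plain strings in comparisons and can be looked up by value. The class below is a hypothetical re-creation for illustration only (hence the different name); it contains just the member listed above.

```python
from enum import Enum


class DispatchStrategySketch(str, Enum):
    """Hypothetical re-creation; only the member listed above is included."""

    GROUPBY_APPLY = "GROUPBY_APPLY"


# Lookup by value returns the member itself.
by_value = DispatchStrategySketch("GROUPBY_APPLY")
# The str mixin makes members compare equal to their string value.
equals_plain_string = DispatchStrategySketch.GROUPBY_APPLY == "GROUPBY_APPLY"
```

This pattern is commonly chosen when enum values need to survive serialization to and from configuration files as ordinary strings.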

dimcat.steps.analyzers.counters module#

class dimcat.steps.analyzers.counters.BigramAnalyzer(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, n: int = 2, format: NgramTableFormat = NgramTableFormat.CONVENIENCE, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: NgramAnalyzer

property n: int#
resource_name_factory(resource: DR) str[source]#

Returns a name for the resource based on its name and the name of the pipeline step.

class dimcat.steps.analyzers.counters.CadenceCounter(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: Counter

class dimcat.steps.analyzers.counters.Counter(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: Analyzer

class Schema(*, only: Optional[Union[Sequence[str], AbstractSet[str]]] = None, exclude: Union[Sequence[str], AbstractSet[str]] = (), many: Optional[bool] = None, load_only: Union[Sequence[str], AbstractSet[str]] = (), dump_only: Union[Sequence[str], AbstractSet[str]] = (), partial: Optional[Union[bool, Sequence[str], AbstractSet[str]]] = None, unknown: Optional[Literal['exclude', 'include', 'raise']] = None)[source]#

Bases: Schema

dump_fields: dict[str, Field]#
exclude: set[Any] | MutableSet[Any]#
fields: dict[str, Field]#

Dictionary mapping field_names -> Field objects

load_fields: dict[str, Field]#
opts: Any = <marshmallow.schema.SchemaOpts object>#
unknown: types.UnknownOption#
static compute(feature: Feature, **kwargs) D[source]#

Static method that performs the actual computation on a single unit of analysis (slice, piece, or group). The result of analyzing a resource should be tantamount to a concatenation of the results of applying self.compute() to each contained unit, turned into a Feature object in its own right. In practice, the analyzer's .groupby_apply() method re-implements the same computation and performs it on the entire DataFrame at once using .groupby(). In other words, it would be redundant to turn each group into a Feature first. self.compute(), however, cannot take a DataFrame as input because it is a static method that needs to rely on the Feature object to know which column(s) to process.

groupby_apply(feature: Feature, groupby: Optional[Series] = None, **kwargs)[source]#

Performs the computation on a groupby. The value of groupby needs to be a Series of the same length as feature or otherwise work as a positional argument to feature.groupby().

resource_name_factory(resource: DR) str[source]#

Returns a name for the resource based on its name and the name of the pipeline step.

class dimcat.steps.analyzers.counters.NgramAnalyzer(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, n: int = 2, format: NgramTableFormat = NgramTableFormat.CONVENIENCE, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: Analyzer

class Schema(*, only: Optional[Union[Sequence[str], AbstractSet[str]]] = None, exclude: Union[Sequence[str], AbstractSet[str]] = (), many: Optional[bool] = None, load_only: Union[Sequence[str], AbstractSet[str]] = (), dump_only: Union[Sequence[str], AbstractSet[str]] = (), partial: Optional[Union[bool, Sequence[str], AbstractSet[str]]] = None, unknown: Optional[Literal['exclude', 'include', 'raise']] = None)[source]#

Bases: Schema

dump_fields: dict[str, Field]#
exclude: set[Any] | MutableSet[Any]#
fields: dict[str, Field]#

Dictionary mapping field_names -> Field objects

load_fields: dict[str, Field]#
opts: Any = <marshmallow.schema.SchemaOpts object>#
unknown: types.UnknownOption#
static compute(feature: dimcat.data.resources.dc.DimcatResource | pandas.core.frame.DataFrame, **kwargs) int[source]#

Static method that performs the actual computation on a single unit of analysis (slice, piece, or group). The result of analyzing a resource should be tantamount to a concatenation of the results of applying self.compute() to each contained unit, turned into a Feature object in its own right. In practice, the analyzer's .groupby_apply() method re-implements the same computation and performs it on the entire DataFrame at once using .groupby(). In other words, it would be redundant to turn each group into a Feature first. self.compute(), however, cannot take a DataFrame as input because it is a static method that needs to rely on the Feature object to know which column(s) to process.

property format: NgramTableFormat#
groupby_apply(feature: Feature, groupby: Optional[Series] = None, **kwargs)[source]#

Performs the computation on a groupby. The value of groupby needs to be a Series of the same length as feature or otherwise work as a positional argument to feature.groupby().

property n: int#
resource_name_factory(resource: DR) str[source]#

Returns a name for the resource based on its name and the name of the pipeline step.
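To illustrate the role of the n parameter, here is a minimal sketch of n-gram extraction over a sequence of labels. The function ngrams and the example labels are assumptions made for illustration; the sketch does not reproduce NgramAnalyzer's output format or its handling of NgramTableFormat.

```python
def ngrams(sequence, n=2):
    """Return all runs of n consecutive items as tuples."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    return [tuple(sequence[i:i + n]) for i in range(len(sequence) - n + 1)]


labels = ["I", "IV", "V", "I"]  # hypothetical chord labels
bigrams = ngrams(labels)  # n=2 mirrors the default shown in the signature
trigrams = ngrams(labels, n=3)
```

A sequence shorter than n yields an empty list, since no complete n-gram fits into it.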

dimcat.steps.analyzers.phrases module#

class dimcat.steps.analyzers.phrases.PhraseDataAnalyzer(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, columns: Union[str, List[str]] = 'label', components: Union[PhraseComponentName, Literal['phrase'], Iterable[PhraseComponentName]] = 'body', query: Optional[str] = None, reverse: bool = False, level_name: str = 'i', format: PhraseDataFormat = PhraseDataFormat.LONG, drop_levels: Union[bool, int, str, Iterable[str | int]] = False, drop_duplicated_ultima_rows: bool = False, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: Analyzer

class Schema(*, only: Optional[Union[Sequence[str], AbstractSet[str]]] = None, exclude: Union[Sequence[str], AbstractSet[str]] = (), many: Optional[bool] = None, load_only: Union[Sequence[str], AbstractSet[str]] = (), dump_only: Union[Sequence[str], AbstractSet[str]] = (), partial: Optional[Union[bool, Sequence[str], AbstractSet[str]]] = None, unknown: Optional[Literal['exclude', 'include', 'raise']] = None)[source]#

Bases: Schema

dump_fields: dict[str, Field]#
exclude: set[Any] | MutableSet[Any]#
fields: dict[str, Field]#

Dictionary mapping field_names -> Field objects

load_fields: dict[str, Field]#
opts: Any = <marshmallow.schema.SchemaOpts object>#
unknown: types.UnknownOption#
property columns: List[str]#
property components: List[PhraseComponentName]#
property format: PhraseDataFormat#
groupby_apply(feature: Feature, groupby: Optional[Series] = None, **kwargs)[source]#

Performs the computation on a groupby. The value of groupby needs to be a Series of the same length as feature or otherwise work as a positional argument to feature.groupby().

resource_name_factory(resource: DR) str[source]#

Returns a name for the resource based on its name and the name of the pipeline step.

dimcat.steps.analyzers.prevalence module#

class dimcat.steps.analyzers.prevalence.PrevalenceAnalyzer(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, columns: Optional[Union[str, Iterable[str]]] = None, index: Optional[Union[str, Iterable[str]]] = None, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: Analyzer

Creates the equivalent of NLP’s “frequency matrix”, except that in the case of music the coefficients are not restricted to count frequencies (when created from a Counts object) but can also represent durations (when created from a Durations object). When the analyzer is applied to a Feature, its default analysis will be used.

class Schema(*, only: Optional[Union[Sequence[str], AbstractSet[str]]] = None, exclude: Union[Sequence[str], AbstractSet[str]] = (), many: Optional[bool] = None, load_only: Union[Sequence[str], AbstractSet[str]] = (), dump_only: Union[Sequence[str], AbstractSet[str]] = (), partial: Optional[Union[bool, Sequence[str], AbstractSet[str]]] = None, unknown: Optional[Literal['exclude', 'include', 'raise']] = None)[source]#

Bases: Schema

dump_fields: dict[str, Field]#
exclude: set[Any] | MutableSet[Any]#
fields: dict[str, Field]#

Dictionary mapping field_names -> Field objects

load_fields: dict[str, Field]#
opts: Any = <marshmallow.schema.SchemaOpts object>#
unknown: types.UnknownOption#
property columns: List[str]#
static compute(resource: Union[D, DimcatResource], index: Optional[Union[str, Iterable[str]]] = None, columns: Optional[Union[str, Iterable[str]]] = None, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None, **kwargs) D[source]#

Computes the prevalence matrix from the given resource. This is basically a wrapper around pandas.DataFrame.pivot_table() with aggfunc="sum".

Parameters:
  • resource – A dataframe, Feature or Result which will be pivoted to produce a prevalence matrix whose index level(s) are given by index and whose column level(s) are given by columns, summing up the respective values contained in dimension_column.

  • index – Column(s) and/or index level name(s) that will make up the index values of the PrevalenceMatrix (akin to a groupby). By default, all but the last level will be used.

  • columns – Column(s) and/or index level name(s) that will make up the column names of the PrevalenceMatrix. By default, the value_column will be used.

  • smallest_unit – The smallest unit to consider for analysis. Relevant only when index is not specified and resource is a DimcatResource.

  • dimension_column – Name of the column that represents absolute prevalence values, typically “duration_qb” or “count”. Required only when resource is a dataframe.

  • **kwargs

Returns:

A pivot table with summed (=absolute) prevalence coefficients. For the analogy with NLP’s frequency matrix, the index will correspond to documents and the columns to the vocabulary (words/tokens).
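The pivot-and-sum operation described above can be sketched without pandas as follows. This is a plain-Python stand-in, not the library's implementation: the records, the function name prevalence_matrix, and the choice to fill absent combinations with 0.0 are assumptions made for illustration.

```python
from collections import defaultdict

# Hypothetical long-format records: (piece, pitch class, duration in quarter notes).
records = [
    ("piece1", "C", 2.0),
    ("piece1", "G", 1.0),
    ("piece1", "C", 0.5),
    ("piece2", "G", 4.0),
]


def prevalence_matrix(records):
    """Sum the dimension values for every (index, column) pair."""
    sums = defaultdict(float)
    for index_key, column_key, value in records:
        sums[(index_key, column_key)] += value
    rows = sorted({r[0] for r in records})
    columns = sorted({r[1] for r in records})
    # Combinations that never occur are filled with 0.0 here for simplicity.
    return {row: {col: sums.get((row, col), 0.0) for col in columns} for row in rows}


matrix = prevalence_matrix(records)
```

In the NLP analogy, each outer key plays the role of a document and each inner key the role of a vocabulary item, with summed durations taking the place of word counts.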

groupby_apply(feature: dimcat.data.resources.results.Result | dimcat.data.resources.dc.Feature, groupby: Optional[Union[str, Iterable[str]]] = None, **kwargs) D[source]#

Performs the computation on a groupby. The value of groupby needs to be a Series of the same length as feature or otherwise work as a positional argument to feature.groupby().

property index: List[str]#

dimcat.steps.analyzers.proportions module#

class dimcat.steps.analyzers.proportions.PitchClassVectors(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: Proportions

resource_name_factory(resource: DR) str[source]#

Returns a name for the resource based on its name and the name of the pipeline step.

class dimcat.steps.analyzers.proportions.Proportions(features: Optional[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str, Iterable[Union[Feature, Type[Feature], DimcatConfig, MutableMapping, FeatureName, str]]]] = None, strategy: DispatchStrategy = DispatchStrategy.GROUPBY_APPLY, smallest_unit: UnitOfAnalysis = UnitOfAnalysis.SLICE, dimension_column: Optional[str] = None)[source]#

Bases: Analyzer

class Schema(*, only: Optional[Union[Sequence[str], AbstractSet[str]]] = None, exclude: Union[Sequence[str], AbstractSet[str]] = (), many: Optional[bool] = None, load_only: Union[Sequence[str], AbstractSet[str]] = (), dump_only: Union[Sequence[str], AbstractSet[str]] = (), partial: Optional[Union[bool, Sequence[str], AbstractSet[str]]] = None, unknown: Optional[Literal['exclude', 'include', 'raise']] = None)[source]#

Bases: Schema

dump_fields: dict[str, Field]#
exclude: set[Any] | MutableSet[Any]#
fields: dict[str, Field]#

Dictionary mapping field_names -> Field objects

load_fields: dict[str, Field]#
opts: Any = <marshmallow.schema.SchemaOpts object>#
unknown: types.UnknownOption#
check_resource(resource: DimcatResource) None[source]#

Check if the resource has a value column.

static compute(feature: Feature, **kwargs) D[source]#

Static method that performs the actual computation on a single unit of analysis (slice, piece, or group). The result of analyzing a resource should be tantamount to a concatenation of the results of applying self.compute() to each contained unit, turned into a Feature object in its own right. In practice, the analyzer's .groupby_apply() method re-implements the same computation and performs it on the entire DataFrame at once using .groupby(). In other words, it would be redundant to turn each group into a Feature first. self.compute(), however, cannot take a DataFrame as input because it is a static method that needs to rely on the Feature object to know which column(s) to process.

groupby_apply(feature: Feature, groupby: Optional[Series] = None, **kwargs)[source]#

Performs the computation on a groupby. The value of groupby needs to be a Series of the same length as feature or otherwise work as a positional argument to feature.groupby().

resource_name_factory(resource: DR) str[source]#

Returns a name for the resource based on its name and the name of the pipeline step.
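This page does not spell out how Proportions derives its values. Assuming it means dividing each value within a unit by that unit's total, the idea can be sketched as below; the function name proportions and the example counts are hypothetical and do not reflect DiMCAT's actual code.

```python
def proportions(counts):
    """Divide each value by the total so that the results sum to 1."""
    total = sum(counts.values())
    if total == 0:
        # Avoid division by zero for empty or all-zero units.
        return {key: 0.0 for key in counts}
    return {key: value / total for key, value in counts.items()}


shares = proportions({"C": 3, "G": 1})  # hypothetical pitch-class counts
```

Normalizing per unit makes units of different length comparable, which is the usual motivation for working with proportions rather than absolute values.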

Module contents#