Source code for dimcat.data.resources.facets

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple  # , ClassVar, Tuple

import frictionless as fl
import ms3
import numpy as np
import pandas as pd
from dimcat.base import DimcatConfig, ObjectEnum
from dimcat.data.resources import DimcatResource, FeatureName, Metadata, Resource
from dimcat.data.resources.base import D, S
from dimcat.data.resources.dc import HARMONY_FEATURE_NAMES, Playthrough
from dimcat.data.resources.features import (
    CHORD_TONE_INTERVALS_COLUMNS,
    CHORD_TONE_SCALE_DEGREES_COLUMNS,
    HARMONY_FEATURE_COLUMNS,
    CadenceLabels,
    DcmlAnnotations,
    KeyAnnotations,
    PhraseAnnotations,
)
from dimcat.data.resources.utils import (
    apply_playthrough,
    boolean_is_minor_column_to_mode,
    condense_dataframe_by_groups,
    condense_pedal_points,
    drop_rows_with_missing_values,
    make_adjacency_groups,
    make_group_start_mask,
    make_groups_lasts_mask,
    safe_row_tuple,
    tuple2str,
    update_duration_qb,
)
from dimcat.dc_exceptions import DataframeIsMissingExpectedColumnsError
from numpy import typing as npt
from typing_extensions import Self

module_logger = logging.getLogger(__name__)

# region helpers


[docs]def add_chord_tone_scale_degrees( feature_df, ): """Turns 'chord_tones' column into multiple scale-degree columns.""" columns_to_add = CHORD_TONE_SCALE_DEGREES_COLUMNS if all(col in feature_df.columns for col in columns_to_add): return feature_df expected_columns = ("chord_tones", "localkey_is_minor", "localkey_mode") if not all(col in feature_df.columns for col in expected_columns): raise DataframeIsMissingExpectedColumnsError( [col for col in expected_columns if col not in feature_df.columns], feature_df.columns.to_list(), ) concatenate_this = [feature_df] if "scale_degrees" not in feature_df.columns: concatenate_this.append( ms3.transform( feature_df, ms3.fifths2sd, ["chord_tones", "localkey_is_minor"] ).rename("scale_degrees") ) if "scale_degrees_major" not in feature_df.columns: concatenate_this.append( ms3.transform(feature_df.chord_tones, ms3.fifths2sd, minor=False).rename( "scale_degrees_major" ) ) if "scale_degrees_minor" not in feature_df.columns: concatenate_this.append( ms3.transform(feature_df.chord_tones, ms3.fifths2sd, minor=True).rename( "scale_degrees_minor" ) ) feature_df = pd.concat(concatenate_this, axis=1) if "scale_degrees_and_mode" not in feature_df.columns: sd_and_mode = pd.Series( feature_df[["scale_degrees", "localkey_mode"]].itertuples( index=False, name=None ), index=feature_df.index, name="scale_degrees_and_mode", ) concatenate_this = [feature_df, sd_and_mode.apply(tuple2str)] feature_df = pd.concat(concatenate_this, axis=1) return feature_df
[docs]def add_chord_tone_intervals( feature_df, ): """Turns 'chord_tones' column into one or two additional columns, depending on whether a 'root' column is present, where the chord_tones (which come as fifths) are represented as strings representing intervals over the bass_note and above the root, if present. """ columns_to_add = CHORD_TONE_INTERVALS_COLUMNS if all(col in feature_df.columns for col in columns_to_add): return feature_df expected_columns = ("chord_tones",) # "root" is optional if not all(col in feature_df.columns for col in expected_columns): raise DataframeIsMissingExpectedColumnsError( [col for col in expected_columns if col not in feature_df.columns], feature_df.columns.to_list(), ) concatenate_this = [feature_df] if "intervals_over_bass" not in feature_df.columns: concatenate_this.append( ms3.transform( feature_df.chord_tones, chord_tones2interval_structure ).rename("intervals_over_bass") ) if "intervals_over_root" not in feature_df.columns and "root" in feature_df.columns: concatenate_this.append( ms3.transform( feature_df, chord_tones2interval_structure, ["chord_tones", "root"] ).rename("intervals_over_root") ) feature_df = pd.concat(concatenate_this, axis=1) return feature_df
[docs]def chord_tones2interval_structure( fifths: Iterable[int], reference: Optional[int] = None ) -> Tuple[str]: """The fifth are interpreted as intervals expressing distances from the local tonic ("neutral degrees"). The result will be a tuple of strings that express the same intervals but expressed with respect to the given reference (neutral degree), removing unisons. If no reference is specified, the first degree (usually, the bass note) is used as such. """ try: fifths = tuple(fifths) if len(fifths) == 0: return () except Exception: return () if reference is None: reference = fifths[0] elif reference in fifths: position = fifths.index(reference) if position > 0: fifths = fifths[position:] + fifths[:position] adapted_intervals = [ ms3.fifths2iv(adapted) for interval in fifths if (adapted := interval - reference) != 0 ] return tuple(adapted_intervals)
[docs]def extend_cadence_feature( feature_df, ): columns_to_add = ( "cadence_type", "cadence_subtype", ) if all(col in feature_df.columns for col in columns_to_add): return feature_df if "cadence" not in feature_df.columns: raise DataframeIsMissingExpectedColumnsError( "cadence", feature_df.columns.to_list(), ) split_labels = feature_df.cadence.str.split(".", expand=True).rename( columns={0: "cadence_type", 1: "cadence_subtype"} ) feature_df = pd.concat([feature_df, split_labels], axis=1) return feature_df
[docs]def extend_keys_feature( feature_df, ): columns_to_add = ( "globalkey_mode", "localkey_mode", "localkey_resolved", # resolves relative keys such as V/V (to II) "localkey_and_mode", ) if all(col in feature_df.columns for col in columns_to_add): return feature_df expected_columns = ("localkey", "localkey_is_minor", "globalkey_is_minor") if not all(col in feature_df.columns for col in expected_columns): raise DataframeIsMissingExpectedColumnsError( [col for col in expected_columns if col not in feature_df.columns], feature_df.columns.to_list(), ) concatenate_this = [ feature_df, boolean_is_minor_column_to_mode(feature_df.globalkey_is_minor).rename( "globalkey_mode" ), boolean_is_minor_column_to_mode(feature_df.localkey_is_minor).rename( "localkey_mode" ), ms3.transform( feature_df, ms3.resolve_relative_keys, ["localkey", "globalkey_is_minor"] ).rename("localkey_resolved"), ] feature_df = pd.concat(concatenate_this, axis=1) concatenate_this = [ feature_df, feature_df[["localkey", "globalkey_mode"]] .apply(safe_row_tuple, axis=1) .rename("localkey_and_mode"), ] feature_df = pd.concat(concatenate_this, axis=1) return feature_df
[docs]def extend_harmony_feature( feature_df, ): """Requires previous application of :func:`transform_keys_feature`.""" columns_to_add = HARMONY_FEATURE_COLUMNS if all(col in feature_df.columns for col in columns_to_add): return feature_df expected_columns = ( "chord", "form", "figbass", "pedal", "numeral", "relativeroot", "globalkey_is_minor", "localkey_is_minor", "localkey_mode", "localkey_resolved", ) if not all(col in feature_df.columns for col in expected_columns): raise DataframeIsMissingExpectedColumnsError( [col for col in expected_columns if col not in feature_df.columns], feature_df.columns.to_list(), ) concatenate_this = [feature_df] if "root_roman" not in feature_df.columns: concatenate_this.append( (feature_df.numeral + ("/" + feature_df.relativeroot).fillna("")).rename( "root_roman" ) ) if "relativeroot_resolved" not in feature_df.columns: concatenate_this.append( ms3.transform( feature_df, ms3.resolve_relative_keys, ["relativeroot", "localkey_is_minor"], ).rename("relativeroot_resolved") ) if "effective_localkey" not in feature_df.columns: concatenate_this.append( ( effective_localkey := ( (feature_df.relativeroot + "/").fillna("") + feature_df.localkey_resolved ).rename("effective_localkey") ) ) effective_localkey_and_mode = pd.concat( [effective_localkey, feature_df.globalkey_is_minor], axis=1 ) concatenate_this.append( ( effective_localkey_resolved := ms3.transform( effective_localkey_and_mode, ms3.resolve_relative_keys ).rename("effective_localkey_resolved") ) ) else: effective_localkey_resolved = feature_df.effective_localkey_resolved if "effective_localkey_is_minor" not in feature_df.columns: concatenate_this.append( effective_localkey_resolved.str.islower() .fillna(feature_df.localkey_is_minor) .rename("effective_localkey_is_minor") ) if "chord_reduced" not in feature_df.columns: concatenate_this.append( ( reduced_col := make_chord_col( feature_df, cols=["numeral", "form", "figbass", "relativeroot"], name="chord_reduced", ) ) ) else: reduced_col = feature_df.chord_reduced if "chord_reduced_and_mode" not in feature_df.columns: concatenate_this.append( (reduced_col + ", " + feature_df.localkey_mode).rename( "chord_reduced_and_mode" ) ) if "pedal_resolved" not in feature_df.columns: concatenate_this.append( ms3.transform( feature_df, ms3.resolve_relative_keys, ["pedal", "localkey_is_minor"] ).rename("pedal_resolved") ) if "chord_and_mode" not in feature_df.columns: concatenate_this.append( feature_df[["chord", "localkey_mode"]] .apply(safe_row_tuple, axis=1) .rename("chord_and_mode") ) if "applied_to_numeral" not in feature_df.columns: applied_to_numeral = feature_df.relativeroot.str.split("/").map( lambda lst: lst[-1], na_action="ignore" ) concatenate_this.append(applied_to_numeral.rename("applied_to_numeral")) else: applied_to_numeral = feature_df.applied_to_numeral if "numeral_or_applied_to_numeral" not in feature_df.columns: concatenate_this.append( applied_to_numeral.copy() .fillna(feature_df.numeral) .rename("numeral_or_applied_to_numeral") ) # if "root_roman_resolved" not in feature_df.columns: # concatenate_this.append( # ms3.transform( # feature_df, # ms3.rel2abs_key, # ["numeral", "localkey_resolved", "localkey_resolved_is_minor"], # ).rename("root_roman_resolved") # ) feature_df = pd.concat(concatenate_this, axis=1) return feature_df
def _get_body_end_positions_from_raw_phrases(phrase_df: D) -> List[int]: """Returns for each phrase body the index position of the last row. Typical input is a dataframe representing a MultiIndex. Expects the columns 'phrase_id' and 'phrase_component'. If the latter is present, all components except 'body' are disregarded. If not, phrase sequences are expected to be bodies only. """ body_end_positions = [] if "phrase_component" in phrase_df.columns: for (phrase_id, phrase_component), idx in phrase_df.groupby( ["phrase_id", "phrase_component"] ).indices.items(): if phrase_component != "body": continue body_end_positions.append(idx[-1]) else: for idx in phrase_df.groupby("phrase_id").indices.values(): body_end_positions.append(idx[-1]) return body_end_positions def _get_body_start_positions_from_raw_phrases(phrase_df: D) -> List[int]: """Returns for each phrase body the index position of the first row.""" body_start_positions = [] phrase_df = phrase_df.reset_index() if "phrase_component" in phrase_df.columns: for (phrase_id, phrase_component), idx in phrase_df.groupby( ["phrase_id", "phrase_component"] ).indices.items(): if phrase_component != "body": continue body_start_positions.append(idx[0]) else: for phrase_id, idx in phrase_df.groupby("phrase_id").indices.items(): body_start_positions.append(idx[0]) return body_start_positions def _get_index_intervals_for_phrases( markers: S, n_ante: int = 0, n_post: int = 0, logger: Optional[logging.Logger] = None, ) -> List[Tuple[int, int, Optional[int], int, int]]: """Expects a Series with a RangeIndex and computes (from, to) index position intervals based on the presence of either the start_symbol or the end_symbol. If both are found, an error is thrown. If None is found, the result is an empty list. The function operates based on the constants start_symbol ``"{"`` If this symbol is present in any of the series' strings, intervals will be formed starting from one to the next occurrences (within strings). The interval for the last symbol reaches until the end of the series (that is, the last index position + 1). end_symbol ``"\\"`` If this symbol is present in any of the series' strings, intervals will be formed starting from the first index position to the position of the first end_symbol + 1, and from there until one after the next, and so on. Args: markers: A Series containing either start or end symbols of phrases. Expected to have a RangeIndex. When the series corresponds to a chunk of a larger one, the RangeIndex should correspond to the respective positions in the original series. n_ante: Pass a positive integer to have the intervals include n earlier positions. n_post: Pass a positive integer > 0 to have the intervals include n subsequent positions. The minimum is 1 because for new-style phrase endings (``}``) the end_symbol may actually appear only with the beginning of the subsequent phrase in the case of ``}{``. logger: Returns: A list of (first_i, start_i, end_i, subsequent_i, stop_i) index positions that can be used for slicing rows of the dataframe from which the series was taken. The meaning of the included slice intervals is as follows: * ``[start_i:start_i)``: The n_ante positions before the phrase. * ``[start_i:end_i]``: The body of the phrase, including end symbol. * ``[end_i:subsequent_i)``: The codetta, i.e., the part between the end_symbol and the subsequent phrase. In the case of phrase overlap, the two are identical and the codetta is empty. * ``[subsequent_i:stop_i)``: The n_post positions after the phrase. """ if logger is None: logger = module_logger present_symbols = markers.unique() start_symbol, end_symbol = "{", r"\\" has_start = start_symbol in present_symbols has_end = end_symbol in present_symbols if not (has_start or has_end): return [] if has_start and has_end: logger.warning( f"Currently I can create phrases either based on end symbols or on start symbols, but this df has both:" f":\n{markers.value_counts().to_dict()}\nUsing {start_symbol}, ignoring {end_symbol}..." ) ix_min = markers.index.min() ix_max = markers.index.max() + 1 if has_start: end_symbol = "}" start_symbol_mask = markers.str.contains(start_symbol).fillna(False) starts_ix = start_symbol_mask.index[start_symbol_mask].to_list() end_symbol_mask = markers.str.contains(end_symbol).fillna(False) ends_ix = end_symbol_mask.index[end_symbol_mask].to_list() def include_end_ix(fro, to): potential = range(fro + 1, to + 1) included_ends = [ix for ix in ends_ix if ix in potential] n_ends = len(included_ends) if not n_ends: inspect_series = markers.loc[fro + 1 : to + 1].dropna() logger.warning( f"Phrase [{fro}:{to}] was expected to have an end symbol within [{fro+1}:{to+1}]:\n{inspect_series}" ) return (fro, None, to) elif n_ends > 2: inspect_series = markers.loc[fro + 1 : to + 1].dropna() logger.warning( f"Phrase [{fro}:{to}] has multiple end symbols within [{fro+1}:{to+1}]:\n{inspect_series}" ) return (fro, None, to) end_ix = included_ends[0] return (fro, end_ix, to) start_end_subsequent = [ include_end_ix(fro, to) for fro, to in zip(starts_ix, starts_ix[1:] + [ix_max]) ] else: end_symbol_mask = markers.str.contains(end_symbol).fillna(False) subsequent_ix = (end_symbol_mask.index[end_symbol_mask] + 1).to_list() start_end_subsequent = [ (fro, to - 1, to) for fro, to in zip([ix_min] + subsequent_ix[:-1], subsequent_ix) ] result = [] for start_i, end_i, subsequent_i in start_end_subsequent: first_i = start_i if n_ante: new_first_i = start_i - n_ante if new_first_i >= ix_min: first_i = new_first_i else: first_i = ix_min stop_i = subsequent_i if n_post: new_stop_i = subsequent_i + n_post if new_stop_i <= ix_max: stop_i = new_stop_i else: stop_i = ix_max result.append((first_i, start_i, end_i, subsequent_i, stop_i)) return result
[docs]def get_index_intervals_for_phrases( harmony_labels: D, group_cols: List[str], n_ante: int = 0, n_post: int = 0, logger: Optional[logging.Logger] = None, ) -> Dict[Any, List[Tuple[int, int]]]: """Returns a list of slice intervals for selecting the rows belonging to a phrase.""" if logger is None: logger = module_logger phraseends_reset = harmony_labels.reset_index() group_intervals = {} groupby = phraseends_reset.groupby(group_cols) for group, markers in groupby.phraseend: first_start_end_sbsq_last = _get_index_intervals_for_phrases( markers, n_ante=n_ante, n_post=n_post, logger=logger ) group_intervals[group] = first_start_end_sbsq_last return group_intervals
[docs]def make_chord_col(df: D, cols: Optional[List[str]] = None, name: str = "chord"): """The 'chord' column contains the chord part of a DCML label, i.e. without indications of key, pedal, cadence, or phrase. This function can re-create this column, e.g. if the feature columns were changed. To that aim, the function takes a DataFrame and the column names that it adds together, creating new strings. """ if cols is None: cols = ["numeral", "form", "figbass", "changes", "relativeroot"] cols = [c for c in cols if c in df.columns] summing_cols = [c for c in cols if c not in ("changes", "relativeroot")] if len(summing_cols) == 1: chord_col = df[summing_cols[0]].fillna("").astype("string") else: chord_col = df[summing_cols].fillna("").astype("string").sum(axis=1) if "changes" in cols: chord_col += ("(" + df.changes.astype("string") + ")").fillna("") if "relativeroot" in cols: chord_col += ("/" + df.relativeroot.astype("string")).fillna("") return chord_col.rename(name)
def _make_concatenated_ranges( starts: npt.NDArray[np.int64], stops: npt.NDArray[np.int64], counts: npt.NDArray[np.int64], ): """Helper function that is a vectorized version of the equivalent but roughly 100x slower .. code-block:: python np.array([np.arange(start, stop) for start, stop in zip(starts, stops)]).flatten() Solution adapted from Warren Weckesser's via https://stackoverflow.com/a/20033438 Args: starts: Array of index range starts. stops: Array of index range stops (exclusive). counts: Corresponds to stops - starts. 0-count ranges need to be excluded beforehand. Returns: """ counts1 = counts[:-1] reset_index = np.cumsum(counts1) reset_values = 1 + starts[1:] - stops[:-1] incr = np.ones(counts.sum(), dtype=int) incr[0] = starts[0] incr[reset_index] = reset_values incr.cumsum(out=incr) return incr def _make_range_boundaries( first: int, start: int, end: int, sbsq: int, stop: int ) -> npt.NDArray[np.int64]: """Turns the individual tuples output by :func:`_get_index_intervals_for_phrases` into four range boundaries. The four intervals are [first:start), [start:end], [end:sbsq), [sbsq:stop), which correspond to the components (ante, body, codetta, post) of the phrase. The body interval is right-inclusive, which means that the end symbol is included both in the body and, in the beginning of the 'codetta' or 'post' component. """ return np.array([[first, start], [start, end + 1], [end, sbsq], [sbsq, stop]])
[docs]def make_raw_phrase_df( feature_df: D, ix_intervals: List[Tuple[int, int, Optional[int], int, int]], logger: Optional[logging.Logger] = None, ): """Takes the intervals generated by :meth:`get_index_intervals_for_phrases` and returns a dataframe with two additional index levels, one expressing a running count of phrases used as IDs, and one exhibiting for each phrase between one and four of the phrase_component names (ante, body, codetta, post), where 'body' is guaranteed to be present. """ if logger is None: logger = module_logger take_mask, id_level, name_level = make_take_mask_and_index( ix_intervals, logger=logger ) phrase_df = feature_df.take(take_mask) old_index = phrase_df.index.to_frame(index=False) new_levels = pd.DataFrame( dict( phrase_id=id_level, phrase_component=name_level, ) ) nlevels = phrase_df.index.nlevels new_index = pd.concat( [ old_index.take(range(nlevels - 1), axis=1), new_levels, old_index.take([-1], axis=1), ], axis=1, ) phrase_df.index = pd.MultiIndex.from_frame(new_index) # here we correct durations for the fact that the end symbol is included both as last symbol of the body and the # first symbol of the codetta or subsequent phrase. At the end of the body, the duration is set to 0. body_end_positions = _get_body_end_positions_from_raw_phrases(new_index) duration_col_position = phrase_df.columns.get_loc("duration_qb") phrase_df.iloc[body_end_positions, duration_col_position] = 0.0 components_lasts = make_groups_lasts_mask( new_index, ["phrase_id", "phrase_component"] ) # ToDo: add to documentation the fact that the duration of terminal harmonies is not amended. This allows for # inspecting the duration of the last harmony but leads to the fact that the summed duration of all phrases in a # piece may be longer than the piece itself, namely when a long terminal harmony is 'interrupted' by the beginning # of the next phrase: the following code duplicates the duration following the { update_duration_qb( phrase_df, ~components_lasts, logger ) # ToDo: check 0-durations in codetta, e.g. for } labels; overhaul phrase duration update (condense_pedal_points) return phrase_df
[docs]def make_take_mask_and_index( ix_intervals: List[Tuple[int, int, Optional[int], int, int]], logger: logging.Logger, ) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray]: """Takes a list of (first_i, start_i, end_i, subsequent_i, stop_i) index positions and turns them into * an array of corresponding index positions that can be used as argument for :meth:`pandas.DataFrame.take` * an array of equal length that specifies the corresponding phrase IDs (which come from an integer range) * an array of equal length that specifies the corresponding phrase components (ante, body, codetta, post) """ range_boundaries = [] for first, start, end, sbsq, last in ix_intervals: if end is None: logger.info("Skipping phrase with undefined end symbol.") continue range_boundaries.append(_make_range_boundaries(first, start, end, sbsq, last)) ranges = np.vstack(range_boundaries) starts, stops = ranges.T counts = stops - starts not_empty_mask = counts > 0 if not_empty_mask.any(): take_mask = _make_concatenated_ranges( starts[not_empty_mask], stops[not_empty_mask], counts[not_empty_mask] ) else: take_mask = _make_concatenated_ranges(starts, stops, counts) n_repeats = int(counts.shape[0] / 4) phrase_ids = np.repeat(np.arange(n_repeats), 4) names = np.tile(np.array(["ante", "body", "codetta", "post"]), n_repeats) id_level = phrase_ids.repeat(counts) name_level = names.repeat(counts) return take_mask, id_level, name_level
# endregion helpers
[docs]class MuseScoreFacetName(ObjectEnum): MuseScoreChords = "MuseScoreChords" MuseScoreFacet = "MuseScoreFacet" MuseScoreHarmonies = "MuseScoreHarmonies" MuseScoreMeasures = "MuseScoreMeasures" MuseScoreNotes = "MuseScoreNotes"
[docs]class Facet(DimcatResource): """A facet is one aspect of a score that can sensibly ordered and conceived of along the score's timeline. The format of a facet depends on the score format and tries to stay as close to the original as possible, using only the necessary minimum of standardization. Content and format of a facet define which features can be extracted, based on which configuration options. """ pass
[docs]class EventsFacet(Facet): """A facet that represents sounding events and/or rests. Events specify 'what' is to be performed.""" pass
[docs]class ControlsFacet(Facet): """A facet that represents 'control events' in MEI parlance; i.e. elements that depend on events to exist, such as dynamics, ties, phrase marks, pedal marks, etc. Controls define 'how' something is to be performed. """ pass
[docs]class AnnotationsFacet(Facet): """A facet that represents one or several annotation layers.""" pass
[docs]class StructureFacet(Facet): """A facet that describes structural elements of a score, pertaining to its timeline, segmentations, or its repeat structure.""" pass
[docs]class MuseScoreFacet(Facet): """A single facet of a MuseScore package as created by the ms3 MuseScore parsing library. Contains a single TSV facet one or several corpora. Naming format ``<name>.<facet>[.tsv]``.""" _enum_type = MuseScoreFacetName
[docs] @classmethod def from_descriptor( cls, descriptor: dict | Resource, descriptor_filename: Optional[str] = None, basepath: Optional[str] = None, auto_validate: bool = False, default_groupby: Optional[str | list[str]] = None, ) -> Self: if isinstance(descriptor, (str, Path)): raise TypeError( f"This method expects a descriptor dictionary. In order to create a " f"{cls.name} from a path, use {cls.__name__}.from_descriptor_path() instead." ) if cls.name == "MuseScoreFacet": # dispatch to the respective facet based on the resource name if isinstance(descriptor, fl.Resource): fl_resource = descriptor else: fl_resource = fl.Resource.from_descriptor(descriptor) facet_name2constructor = dict( chords=MuseScoreChords, expanded=MuseScoreHarmonies, harmonies=MuseScoreHarmonies, measures=MuseScoreMeasures, metadata=Metadata, notes=MuseScoreNotes, ) resource_name = fl_resource.name try: _, facet_name = resource_name.rsplit(".", 1) Klass = facet_name2constructor.get(facet_name) if Klass is None: raise NotImplementedError( f"MuseScoreFacet {facet_name} is not implemented." ) except ValueError: if any( resource_name.endswith(f_name) for f_name in facet_name2constructor ): Klass = next( klass for f_name, klass in facet_name2constructor.items() if resource_name.endswith(f_name) ) return Klass.from_descriptor( descriptor=descriptor, descriptor_filename=descriptor_filename, basepath=basepath, auto_validate=auto_validate, default_groupby=default_groupby, ) return super().from_descriptor( descriptor=descriptor, descriptor_filename=descriptor_filename, basepath=basepath, auto_validate=auto_validate, default_groupby=default_groupby, )
[docs]class MuseScoreChords(MuseScoreFacet, ControlsFacet): _extractable_features = (FeatureName.Articulation,)
[docs]class MuseScoreHarmonies(MuseScoreFacet, AnnotationsFacet): _extractable_features = ( FeatureName.DcmlAnnotations, FeatureName.CadenceLabels, FeatureName.PhraseAnnotations, FeatureName.PhraseComponents, FeatureName.PhraseLabels, ) + HARMONY_FEATURE_NAMES def _prepare_feature_df(self, feature_config: DimcatConfig) -> D: Constructor = feature_config.options_class columns_to_load_if_available = Constructor.get_default_column_names() feature_df = self.get_dataframe(usecols=tuple(columns_to_load_if_available)) return feature_df def _transform_df_for_extraction( self, feature_df: D, feature_config: DimcatConfig ) -> D: feature_name = FeatureName(feature_config.options_dtype) feature_settings = dict(feature_config.complete()) if playthrough_value := feature_settings.get("playthrough"): playthrough = Playthrough(playthrough_value) if playthrough != Playthrough.RAW: feature_df = apply_playthrough( feature_df, playthrough, logger=self.logger ) cls = feature_name.get_class() feature_column_names = cls._feature_column_names if issubclass(cls, DcmlAnnotations): feature_df = extend_keys_feature(feature_df) if issubclass(cls, CadenceLabels): feature_df = drop_rows_with_missing_values( feature_df, feature_column_names, logger=self.logger ) feature_df = extend_cadence_feature(feature_df) elif issubclass(cls, KeyAnnotations): groupby_levels = feature_df.index.names[:-1] group_keys, _ = make_adjacency_groups( feature_df.localkey, groupby=groupby_levels ) feature_df = condense_dataframe_by_groups( feature_df, group_keys, logger=self.logger ) else: # issubclass(cls, (HarmonyLabels, PhraseAnnotations)) if issubclass(cls, PhraseAnnotations): missing_mask = feature_df.chord.isna() groupby_levels = feature_df.index.names[:-1] group_start_mask = make_group_start_mask(feature_df, groupby_levels) feature_df.loc[missing_mask, ["chord_tones", "added_tones"]] = pd.NA ffill_mask = missing_mask | ( missing_mask.shift(-1).fillna(False) & ~group_start_mask ) harmony_fill_columns = [ col for col in ( "pedal", "chord", "special", "numeral", "form", "figbass", "changes", "relativeroot", "chord_type", "chord_tones", "root", "bass_note", "alt_label", "pedalend", ) if col in feature_df.columns ] feature_df.loc[ffill_mask, harmony_fill_columns] = ( feature_df.loc[ffill_mask, harmony_fill_columns] .groupby(groupby_levels) .ffill() ) if issubclass(cls, PhraseAnnotations): group_intervals = get_index_intervals_for_phrases( harmony_labels=feature_df, group_cols=groupby_levels, n_ante=feature_settings.get("n_ante"), n_post=feature_settings.get("n_post"), logger=self.logger, ) ix_intervals = sum(group_intervals.values(), []) feature_df = make_raw_phrase_df( feature_df, ix_intervals, self.logger ) feature_df = condense_pedal_points(feature_df) else: feature_df = drop_rows_with_missing_values( feature_df, feature_column_names, logger=self.logger ) feature_df = extend_harmony_feature(feature_df) feature_df = add_chord_tone_intervals(feature_df) feature_df = add_chord_tone_scale_degrees(feature_df) return feature_df
[docs]class MuseScoreMeasures(MuseScoreFacet, StructureFacet): _extractable_features = (FeatureName.Measures,)
[docs]class MuseScoreNotes(MuseScoreFacet, EventsFacet): _extractable_features = (FeatureName.Notes,)