from __future__ import annotations
import logging
from typing import Iterable, Iterator, List, Literal, Optional, Type
import frictionless as fl
import marshmallow as mm
from dimcat import DimcatConfig, get_class
from dimcat.data.base import Data
from dimcat.data.packages.base import Package, PackageSpecs
from dimcat.data.resources.base import R, Resource
from dimcat.data.resources.dc import FeatureSpecs
from dimcat.dc_exceptions import (
DuplicatePackageNameError,
EmptyCatalogError,
EmptyPackageError,
NoMatchingResourceFoundError,
PackageNotFoundError,
ResourceNotFoundError,
)
from dimcat.utils import treat_basepath_argument
from frictionless import FrictionlessException
from typing_extensions import Self
# Logger named after this module.
module_logger = logging.getLogger(__name__)
[docs]class DimcatCatalog(Data):
"""Has the purpose of collecting and managing a set of :obj:`Package` objects.
Analogous to a :obj:`frictionless.Catalog`, but without intermediate :obj:`frictionless.Dataset` objects.
Nevertheless, a DimcatCatalog can be stored as and created from a Catalog descriptor (ToDo).
"""
class PickleSchema(Data.PickleSchema):
    """Schema declaring the catalog's ``packages`` field as a list of nested :obj:`Package` schemas."""

    packages = mm.fields.List(
        mm.fields.Nested(Package.Schema),
        allow_none=True,
        metadata={"description": "The packages in the catalog."},
    )
class Schema(PickleSchema, Data.Schema):
    """Combines :class:`PickleSchema` with :class:`Data.Schema` without declaring further fields."""
def __init__(
    self,
    basepath: Optional[str] = None,
    packages: Optional[PackageSpecs | List[PackageSpecs]] = None,
) -> None:
    """Creates a DimcatCatalog which is essentially a list of :obj:`Package` objects.

    Args:
        basepath: The basepath for all packages in the catalog.
        packages:
            A single package specification or a list of them. Anything but None is assigned to
            the ``packages`` property after the parent initializer has run.
    """
    # Created before the parent initializer runs; presumably because setting the basepath
    # may already iterate over the packages -- TODO confirm.
    self._packages: List[Package] = []
    super().__init__(basepath=basepath)
    if packages is None:
        return
    self.packages = packages
def __getitem__(self, item: str) -> Package:
    """Returns the package called ``item``.

    Raises:
        KeyError: If :meth:`get_package` raises any exception; the original error is chained.
    """
    try:
        package = self.get_package(item)
    except Exception as e:
        # Subscript access reports every failed lookup as a KeyError.
        raise KeyError(str(e)) from e
    return package
def __iter__(self) -> Iterator[Package]:
    """Yields the catalog's packages in the order in which they are stored."""
    for package in self._packages:
        yield package
def __len__(self) -> int:
    """Returns the number of packages held by the catalog."""
    n_packages = len(self._packages)
    return n_packages
@property
def basepath(self) -> Optional[str]:
    """If specified, the basepath for all packages added to the catalog."""
    return self._basepath

@basepath.setter
def basepath(self, basepath: str) -> None:
    # The packages only receive the new basepath if the catalog did not have one before.
    self._set_basepath(basepath, set_packages=self._basepath is None)
@property
def package_names(self) -> List[str]:
    """The names of all packages in the catalog, in the order in which they are stored."""
    names = []
    for package in self._packages:
        names.append(package.package_name)
    return names
@property
def packages(self) -> List[Package]:
    """The list of packages held by the catalog (the list object itself, not a copy)."""
    return self._packages

@packages.setter
def packages(self, packages: PackageSpecs | List[PackageSpecs]) -> None:
    # Assignment is only allowed as long as the catalog is empty.
    if self._packages:
        raise ValueError("Cannot set packages if packages are already present.")
    # A single specification is treated like a one-element list.
    is_single_spec = isinstance(packages, (Package, fl.Package, str))
    specs = [packages] if is_single_spec else packages
    for package in specs:
        try:
            self.add_package(package)
        except FrictionlessException as e:
            # Best effort: a package rejected with a FrictionlessException is logged and skipped.
            self.logger.error(f"Adding the package {package!r} failed with\n{e!r}")
def add_package(
    self,
    package: PackageSpecs,
    basepath: Optional[str] = None,
    copy: bool = False,
):
    """Adds a :obj:`Package` to the catalog.

    Args:
        package:
            A :obj:`frictionless.Package` (converted via ``Package.from_descriptor``), a string
            (passed to ``Package.from_descriptor_path``), or a :obj:`Package`.
        basepath: If given, it is assigned to the package before the package is stored.
        copy:
            If True, a :obj:`Package` is stored as a copy instead of the object itself.
            Has no effect on the other accepted input types.

    Raises:
        TypeError: If ``package`` has none of the accepted types.
        DuplicatePackageNameError: If the catalog already holds a package with the same name.
    """
    if isinstance(package, fl.Package):
        dc_package = Package.from_descriptor(package)
    elif isinstance(package, str):
        dc_package = Package.from_descriptor_path(package)
    elif isinstance(package, Package):
        dc_package = package.copy() if copy else package
    else:
        msg = f"{self.name}.add_package() takes a package, not {type(package)!r}."
        raise TypeError(msg)
    new_name = dc_package.package_name
    if new_name in self.package_names:
        raise DuplicatePackageNameError(new_name)
    if basepath is not None:
        dc_package.basepath = basepath
    self._packages.append(dc_package)
def add_resource(
    self,
    resource: Resource,
    package_name: Optional[str] = None,
):
    """Adds a resource to the package with the given name.

    The package is retrieved via :meth:`get_package_by_name` with ``create=True``.

    Args:
        resource: The resource to be added.
        package_name: Name handed to :meth:`get_package_by_name`.
    """
    self.get_package_by_name(package_name, create=True).add_resource(resource=resource)
def check_feature_availability(self, feature: FeatureSpecs) -> bool:
    """Checks whether the given feature is potentially available.

    This implementation does not inspect ``feature`` and always returns True.
    """
    is_available = True
    return is_available
def copy(self) -> Self:
    """Returns a new catalog of the same class with the same basepath and packages.

    The current packages are assigned to the new catalog's ``packages`` property as they are.
    """
    catalog_class = self.__class__
    duplicate = catalog_class(basepath=self.basepath)
    duplicate.packages = self.packages
    return duplicate
def extend(self, catalog: Iterable[Package]) -> None:
    """Adds all packages from another catalog to this one.

    A package whose name is not present yet is added as a copy. Otherwise, the existing package
    of the same name is extended with the incoming one.
    """
    for package in catalog:
        incoming_name = package.package_name
        if incoming_name in self.package_names:
            self.get_package_by_name(incoming_name).extend(package)
        else:
            self.add_package(package.copy())
def extend_package(self, package: Package) -> None:
    """Adds all resources from the given package to the catalog's package with the same name.

    The target is retrieved via :meth:`get_package_by_name` with ``create=True``.
    """
    self.get_package_by_name(package.package_name, create=True).extend(package)
def get_package(self, name: Optional[str] = None) -> Package:
    """If a name is given, calls :meth:`get_package_by_name`, otherwise returns the last loaded package.

    Raises:
        EmptyCatalogError: If no name is given and the catalog does not contain any package.
    """
    if name is None:
        if not self._packages:
            raise EmptyCatalogError
        return self._packages[-1]
    return self.get_package_by_name(name=name)
def get_package_by_name(self, name: str, create: bool = False) -> Package:
    """Returns the first package with the given name.

    Args:
        name: The package name to look for.
        create:
            If True and no package has the given name, a new package with that name and the
            catalog's basepath is made via :meth:`make_new_package` and returned.

    Raises:
        PackageNotFoundError: If none of the loaded packages has the given name and ``create`` is False.
    """
    found = next(
        (package for package in self._packages if package.package_name == name),
        None,
    )
    if found is not None:
        return found
    if not create:
        raise PackageNotFoundError(name)
    self.make_new_package(
        package_name=name,
        basepath=self.basepath,
    )
    self.logger.info(f"Automatically added new empty package {name!r}")
    # The new package is retrieved through get_package(), i.e. as the last one in the catalog.
    return self.get_package()
def get_resource_by_config(self, config: DimcatConfig) -> Resource:
    """Returns the first resource that matches the given config.

    The packages are queried in order. A package that raises an EmptyPackageError or a
    ResourceNotFoundError is skipped.

    Raises:
        EmptyCatalogError: If the catalog does not contain any package.
        NoMatchingResourceFoundError: If none of the packages returned a matching resource.
    """
    if not self._packages:
        raise EmptyCatalogError
    for package in self._packages:
        try:
            resource = package.get_resource_by_config(config)
        except (EmptyPackageError, ResourceNotFoundError):
            continue
        return resource
    raise NoMatchingResourceFoundError(config)
def get_resource_by_name(self, name: str) -> R:
    """Returns the Resource with the given name.

    The packages are queried in order. A package that raises an EmptyPackageError or a
    ResourceNotFoundError is skipped.

    Raises:
        EmptyCatalogError: If the catalog does not contain any package.
        ResourceNotFoundError: If the resource with the given name is not found in any package.
    """
    if len(self._packages) == 0:
        raise EmptyCatalogError
    for package in self._packages:
        try:
            return package.get_resource_by_name(name=name)
        except (EmptyPackageError, ResourceNotFoundError):
            pass
    # NOTE(review): no ``catalog_name`` attribute is defined in the visible part of this class,
    # whereas its other messages use ``self.name``. Falling back to ``self.name`` makes sure that
    # a failed lookup raises ResourceNotFoundError rather than an AttributeError.
    try:
        catalog_name = self.catalog_name
    except AttributeError:
        catalog_name = self.name
    raise ResourceNotFoundError(name, catalog_name)
def get_resources_by_regex(self, regex: str) -> List[Resource]:
    """Returns the Resource objects whose names contain the given regex.

    The results of all packages are concatenated in package order.
    """
    return [
        resource
        for package in self._packages
        for resource in package.get_resources_by_regex(regex=regex)
    ]
def get_resources_by_type(
    self,
    resource_type: Type[Resource] | str,
) -> List[Resource]:
    """Returns the Resource objects of the given type.

    Args:
        resource_type: A resource class, or a string which is resolved via ``get_class()`` first.

    Returns:
        The results of all packages, concatenated in package order.
    """
    if isinstance(resource_type, str):
        resource_type = get_class(resource_type)
    return [
        resource
        for package in self._packages
        for resource in package.get_resources_by_type(resource_type=resource_type)
    ]
def has_package(self, name: str) -> bool:
    """Returns True if a package with the given name is loaded, False otherwise."""
    return any(package.package_name == name for package in self._packages)
def iter_resources(self):
    """Iterates over all resources in all packages, package by package."""
    for package in self:
        yield from package
def make_new_package(
    self,
    package: Optional[PackageSpecs] = None,
    package_name: Optional[str] = None,
    basepath: Optional[str] = None,
    auto_validate: bool = False,
):
    """Adds a package to the catalog, creating a new empty one if none is given.

    Args:
        package:
            If None, a new :obj:`Package` is created from ``package_name``, ``basepath`` and
            ``auto_validate``. A :obj:`Package`, :obj:`frictionless.Package` or descriptor path
            is handed to :meth:`add_package` instead of being discarded.
        package_name: Name of the newly created package. Used only if ``package`` is None.
        basepath: Passed to the newly created package and to :meth:`add_package`.
        auto_validate: Passed to the newly created package. Used only if ``package`` is None.

    Raises:
        ValueError: If ``package`` is neither None nor one of the accepted types.
    """
    if package is None:
        package = Package(
            package_name=package_name,
            basepath=basepath,
            auto_validate=auto_validate,
        )
    elif not isinstance(package, (Package, fl.Package, str)):
        msg = f"{self.name} takes a Package, not {type(package)!r}."
        raise ValueError(msg)
    # add_package() converts frictionless packages and descriptor paths itself, so a given
    # specification is no longer silently replaced by an empty package.
    self.add_package(package, basepath=basepath)
def replace_package(self, package: Package) -> None:
    """Replaces the package with the same name as the given package with the given package.

    If the catalog holds no package of that name, the given package is added via :meth:`add_package`.

    Raises:
        TypeError: If ``package`` is not a :obj:`Package`.
    """
    if not isinstance(package, Package):
        msg = (
            f"{self.name}.replace_package() takes a Package, not {type(package)!r}."
        )
        raise TypeError(msg)
    for i, p in enumerate(self._packages):
        if p.package_name == package.package_name:
            self.logger.info(
                f"Replacing package {p.package_name!r} ({p.n_resources} resources) with "
                f"package {package.package_name!r} ({package.n_resources} resources)"
            )
            self._packages[i] = package
            break
    else:
        # The loop ended without a match, so there was nothing to replace.
        self.add_package(package)
def _set_basepath(
    self,
    basepath: str | Literal[None],
    set_packages: bool = True,
) -> None:
    """Sets the basepath for all packages in the catalog (if set_packages=True).

    The catalog stores whatever ``treat_basepath_argument()`` returns for the given value.
    """
    self._basepath = treat_basepath_argument(basepath, self.logger)
    if set_packages:
        for package in self._packages:
            package.basepath = self.basepath
def summary_dict(self, include_type: bool = True) -> dict:
    """Returns a summary of the catalog.

    Args:
        include_type:
            If True, each resource is rendered as a string containing the repr of its name and
            its dtype. Otherwise each package is mapped to its ``resource_names``.

    Returns:
        A dict with the keys "basepath" and "packages", the latter mapping package names to
        their resources.
    """
    summary = {}
    for p in self._packages:
        if include_type:
            summary[p.package_name] = [f"{r.resource_name!r} ({r.dtype})" for r in p]
        else:
            summary[p.package_name] = p.resource_names
    return {"basepath": self.basepath, "packages": summary}