Source code for ocdskit.packager

from __future__ import annotations

import itertools
import os
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from ocdsmerge.exceptions import InconsistentTypeError

from ocdskit.exceptions import InconsistentVersionError, MergeErrorWarning, MissingOcidKeyError, NonObjectReleaseError
from ocdskit.util import (
    _empty_record_package,
    _remove_empty_optional_metadata,
    _resolve_metadata,
    _update_package_metadata,
    get_ocds_minor_version,
    is_release,
    json_dumps,
    jsonlib,
)

if TYPE_CHECKING:
    import ocdsmerge

try:
    import sqlite3

    USING_SQLITE = True

[docs] def adapt_json(data): return json_dumps(data)
[docs] def convert_json(string): return jsonlib.loads(string)
sqlite3.register_adapter(dict, adapt_json) sqlite3.register_converter("json", convert_json) except ImportError: USING_SQLITE = False # The `warnings.catch_warnings()` context manager resets the `showwarning` method to the module's definition. # Accept a `showwarning` method as an argument, to preserve any earlier override (e.g. by `__main__.py`). def _showwarning(showwarning, ocid): def function(message, category, filename, lineno, file=None, line=None): showwarning(f"{ocid}: {message}", category, filename, lineno, file=file, line=line) return function
[docs] class Packager: """ The Packager context manager helps to build a single record package, or a stream of compiled releases or merged releases. Release packages and/or individual releases can be added to the packager. All releases should use the same version of OCDS. """ def __init__(self, force_version: str | None = None): """ :param force_version: version to use instead of the version of the first release package or individual release """ self.package = _empty_record_package() self.version = force_version if USING_SQLITE: self.backend = SQLiteBackend() else: self.backend = PythonBackend() def __enter__(self): return self def __exit__(self, type_, value, traceback): self.backend.close()
[docs] def add(self, data, *, ignore_version: bool = False): """ Add release packages and/or individual releases to be merged. :param data: an iterable of release packages and individual releases :param ignore_version: do not raise an error if the versions are inconsistent across items to merge :raises InconsistentVersionError: if the versions are inconsistent across items to merge """ for i, item in enumerate(data): version = get_ocds_minor_version(item) if self.version: if not ignore_version and version != self.version: # OCDS 1.1 and OCDS 1.0 have different merge rules for `awards.suppliers`. Also, mixing new and # deprecated fields can lead to inconsistencies (e.g. transaction `amount` and `value`). # https://standard.open-contracting.org/latest/en/schema/changelog/#advisories raise InconsistentVersionError( f"item {i}: version error: this item uses version {version}, " f"but earlier items used version {self.version}", self.version, version, ) else: self.version = version if is_release(item): self.backend.add_release(item, "") else: # release package uri = item.get("uri", "") _update_package_metadata(self.package, item) # Note: If there are millions of packages to merge, we should use SQLite to store the packages instead. if uri and version < "1.2": self.package["packages"].append(uri) for release in item["releases"]: if release is not None: # observed in some release packages self.backend.add_release(release, uri) self.backend.flush()
[docs] def output_package( self, merger: ocdsmerge.merge.Merger, *, return_versioned_release: bool = False, use_linked_releases: bool = False, streaming: bool = False, convert_exceptions_to_warnings: bool = False, ): """ Yield a record package. :param merger: a merger :param return_versioned_release: whether to include a versioned release in each record :param use_linked_releases: whether to use linked releases instead of full releases, if possible :param streaming: whether to set the package's records to a generator instead of a list :param convert_exceptions_to_warnings: whether to convert inconsistent type errors from OCDS Merge to warnings """ records = self.output_records( merger, return_versioned_release=return_versioned_release, use_linked_releases=use_linked_releases, convert_exceptions_to_warnings=convert_exceptions_to_warnings, ) # If a user wants to stream data but can't exhaust records right away, we can add an `autoclose=True` argument. # If set to `False`, `__exit__` will do nothing, and the user will need to call `packager.backend.close()`. if not streaming: records = list(records) self.package["records"] = records _resolve_metadata(self.package, "packages") _resolve_metadata(self.package, "extensions") _remove_empty_optional_metadata(self.package) yield self.package
[docs] def output_records( self, merger: ocdsmerge.merge.Merger, *, return_versioned_release: bool = False, use_linked_releases: bool = False, convert_exceptions_to_warnings: bool = False, ): """ Yield records, ordered by OCID. :param merger: a merger :param return_versioned_release: whether to include a versioned release in the record :param use_linked_releases: whether to use linked releases instead of full releases, if possible :param convert_exceptions_to_warnings: whether to convert inconsistent type errors from OCDS Merge to warnings """ for ocid, rows in self.backend.get_releases_by_ocid(): record = { "ocid": ocid, "releases": [], } releases = [] for _, uri, release in rows: releases.append(release) if use_linked_releases and uri: package_release = { "url": uri + "#" + release["id"], "date": release["date"], "tag": release["tag"], } else: package_release = release record["releases"].append(package_release) showwarning = warnings.showwarning with warnings.catch_warnings(): warnings.showwarning = _showwarning(showwarning, ocid) try: record["compiledRelease"] = merger.create_compiled_release(releases) if return_versioned_release: record["versionedRelease"] = merger.create_versioned_release(releases) except InconsistentTypeError as e: if convert_exceptions_to_warnings: warnings.warn(str(e), category=MergeErrorWarning, stacklevel=2) else: raise yield record
[docs] def output_releases( self, merger: ocdsmerge.merge.Merger, *, return_versioned_release: bool = False, convert_exceptions_to_warnings: bool = False, ): """ Yield compiled releases or versioned releases, ordered by OCID. :param merger: a merger :param return_versioned_release: whether to yield versioned releases instead of compiled releases :param convert_exceptions_to_warnings: whether to convert inconsistent type errors from OCDS Merge to warnings """ for ocid, rows in self.backend.get_releases_by_ocid(): releases = (row[-1] for row in rows) showwarning = warnings.showwarning with warnings.catch_warnings(): warnings.showwarning = _showwarning(showwarning, ocid) try: if return_versioned_release: yield merger.create_versioned_release(releases) else: yield merger.create_compiled_release(releases) except InconsistentTypeError as e: if convert_exceptions_to_warnings: warnings.warn(str(e), category=MergeErrorWarning, stacklevel=2) else: raise
# The backend's responsibilities (for now) are exclusively to: # # * Group releases by OCID # * Store each release's package URI # # For a PostgreSQL backend, see https://github.com/open-contracting/ocdskit/issues/116
[docs] class AbstractBackend(ABC):
[docs] def add_release(self, release, package_uri): """ Add a release to the backend. (The release might be added to an internal buffer.) :raises MissingOcidKeyError: if the release is missing an ``ocid`` field """ try: ocid = release["ocid"] except KeyError as e: raise MissingOcidKeyError("ocid") from e except TypeError as e: raise NonObjectReleaseError(type(release).__name__) from e self._add_release(ocid, package_uri, release)
@abstractmethod def _add_release(self, ocid, package_uri, release): pass
[docs] @abstractmethod def get_releases_by_ocid(self): """ Yield an OCIDs and an iterable of tuples of ``(ocid, package_uri, release)``. OCIDs are yielded in alphabetical order. The iterable is in any order. """
[docs] def flush(self): # noqa: B027 # noop """Flushes the internal buffer of releases. This may be a no-op on some backends."""
[docs] def close(self): # noqa: B027 # noop """Tidies up any resources used by the backend. This may be a no-op on some backends."""
[docs] class PythonBackend(AbstractBackend): def __init__(self): self.groups = defaultdict(list) def _add_release(self, ocid, package_uri, release): self.groups[ocid].append((ocid, package_uri, release))
[docs] def get_releases_by_ocid(self): for ocid in sorted(self.groups): yield ocid, self.groups[ocid]
[docs] class SQLiteBackend(AbstractBackend): # "The sqlite3 module internally uses a statement cache to avoid SQL parsing overhead." # https://docs.python.org/3/library/sqlite3.html#sqlite3.connect # Note: We never commit changes. SQLite manages the memory usage of uncommitted changes. # https://sqlite.org/atomiccommit.html#_cache_spill_prior_to_commit def __init__(self): self.file = NamedTemporaryFile(delete=False) # noqa: SIM115 # https://docs.python.org/3/library/sqlite3.html#sqlite3.PARSE_DECLTYPES self.connection = sqlite3.connect(self.file.name, detect_types=sqlite3.PARSE_DECLTYPES) # https://sqlite.org/tempfiles.html#temp_databases self.connection.execute("CREATE TEMP TABLE releases (ocid text, uri text, release json)") self.buffer = [] def _add_release(self, ocid, package_uri, release): self.buffer.append((ocid, package_uri, release))
[docs] def flush(self): # https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.executemany self.connection.executemany("INSERT INTO releases VALUES (?, ?, ?)", self.buffer) self.buffer = []
[docs] def get_releases_by_ocid(self): self.connection.execute("CREATE INDEX IF NOT EXISTS ocid_idx ON releases(ocid)") results = self.connection.execute("SELECT * FROM releases ORDER BY ocid") yield from itertools.groupby(results, lambda row: row[0])
[docs] def close(self): self.file.close() self.connection.close() os.unlink(self.file.name)