Source code for ocdskit.packager

import itertools
import os
from abc import ABC, abstractmethod
from collections import defaultdict
from tempfile import NamedTemporaryFile

from ocdskit.exceptions import InconsistentVersionError, MissingOcidKeyError
from ocdskit.util import (_empty_record_package, _remove_empty_optional_metadata, _resolve_metadata,
                          _update_package_metadata, get_ocds_minor_version, is_release, json_dumps, jsonlib)

try:
    import sqlite3

    USING_SQLITE = True

    def adapt_json(data):
        return json_dumps(data)

    def convert_json(string):
        return jsonlib.loads(string)

    sqlite3.register_adapter(dict, adapt_json)
    sqlite3.register_converter('json', convert_json)
except ImportError:
    USING_SQLITE = False
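
# Illustrative example (not part of the module), assuming sqlite3 is available: with the adapter
# and converter registered above, a dict bound as a query parameter is stored as JSON text, and a
# column declared as `json` is converted back to a dict when the connection uses PARSE_DECLTYPES.
#
#     conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
#     conn.execute("CREATE TABLE example (data json)")
#     conn.execute("INSERT INTO example VALUES (?)", ({'ocid': 'ocds-213czf-1'},))
#     row = conn.execute("SELECT data FROM example").fetchone()  # row[0] is a dict again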


class Packager:
    """
    The Packager context manager helps to build a single record package, or a stream of compiled releases or merged
    releases. Release packages and/or individual releases can be added to the packager. All releases should use the
    same version of OCDS.
    """

    def __init__(self):
        self.package = _empty_record_package()
        self.version = None

        if USING_SQLITE:
            self.backend = SQLiteBackend()
        else:
            self.backend = PythonBackend()

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        self.backend.close()
    def add(self, data):
        """
        Adds release packages and/or individual releases to be merged.

        :param data: an iterable of release packages and individual releases
        :raises InconsistentVersionError: if the versions are inconsistent across packages to merge
        """
        for i, item in enumerate(data):
            version = get_ocds_minor_version(item)
            if self.version:
                if version != self.version:
                    # OCDS 1.1 and OCDS 1.0 have different merge rules for `awards.suppliers`. Also, mixing new and
                    # deprecated fields can lead to inconsistencies (e.g. transaction `amount` and `value`).
                    # https://standard.open-contracting.org/latest/en/schema/changelog/#advisories
                    raise InconsistentVersionError(
                        f'item {i}: version error: this item uses version {version}, but earlier items used '
                        f'version {self.version}', self.version, version)
            else:
                self.version = version

            if is_release(item):
                self.backend.add_release(item, '')
            else:  # release package
                uri = item.get('uri', '')

                _update_package_metadata(self.package, item)

                # Note: If there are millions of packages to merge, we should use SQLite to store the packages
                # instead.
                if uri and version < '1.2':
                    self.package['packages'].append(uri)

                for release in item['releases']:
                    self.backend.add_release(release, uri)

        self.backend.flush()
    def output_package(self, merger, return_versioned_release=False, use_linked_releases=False, streaming=False):
        """
        Yields a record package.

        :param ocdsmerge.merge.Merger merger: a merger
        :param bool return_versioned_release: whether to include a versioned release in each record
        :param bool use_linked_releases: whether to use linked releases instead of full releases, if possible
        :param bool streaming: whether to set the package's records to a generator instead of a list
        """
        records = self.output_records(merger, return_versioned_release=return_versioned_release,
                                      use_linked_releases=use_linked_releases)

        # If a user wants to stream data but can't exhaust records right away, we can add an `autoclose=True`
        # argument. If set to `False`, `__exit__` will do nothing, and the user will need to call
        # `packager.backend.close()`.
        if not streaming:
            records = list(records)

        self.package['records'] = records

        _resolve_metadata(self.package, 'packages')
        _resolve_metadata(self.package, 'extensions')
        _remove_empty_optional_metadata(self.package)

        yield self.package
    def output_records(self, merger, return_versioned_release=False, use_linked_releases=False):
        """
        Yields records, ordered by OCID.

        :param ocdsmerge.merge.Merger merger: a merger
        :param bool return_versioned_release: whether to include a versioned release in the record
        :param bool use_linked_releases: whether to use linked releases instead of full releases, if possible
        """
        for ocid, rows in self.backend.get_releases_by_ocid():
            record = {
                'ocid': ocid,
                'releases': [],
            }

            releases = []
            for _, uri, release in rows:
                releases.append(release)

                if use_linked_releases and uri:
                    package_release = {
                        'url': uri + '#' + release['id'],
                        'date': release['date'],
                        'tag': release['tag'],
                    }
                else:
                    package_release = release
                record['releases'].append(package_release)

            record['compiledRelease'] = merger.create_compiled_release(releases)
            if return_versioned_release:
                record['versionedRelease'] = merger.create_versioned_release(releases)

            yield record
    def output_releases(self, merger, return_versioned_release=False):
        """
        Yields compiled releases or versioned releases, ordered by OCID.

        :param ocdsmerge.merge.Merger merger: a merger
        :param bool return_versioned_release: whether to yield versioned releases instead of compiled releases
        """
        for _, rows in self.backend.get_releases_by_ocid():
            releases = (row[-1] for row in rows)
            if return_versioned_release:
                yield merger.create_versioned_release(releases)
            else:
                yield merger.create_compiled_release(releases)
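
# A minimal usage sketch (an editorial illustration, not part of ocdskit itself): build a record
# package from an iterable of parsed release packages, `release_packages`, using a Merger from the
# ocdsmerge library (how the Merger is configured is assumed here).
#
#     from ocdsmerge import Merger
#
#     with Packager() as packager:
#         packager.add(release_packages)
#         for record_package in packager.output_package(Merger()):
#             print(json_dumps(record_package))
#
# `output_releases(Merger())` can be iterated the same way to stream compiled releases instead of
# a record package.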


# The backend's responsibilities (for now) are exclusively to:
#
# * Group releases by OCID
# * Store each release's package URI
#
# For a PostgreSQL backend, see https://github.com/open-contracting/ocdskit/issues/116
class AbstractBackend(ABC):
    def add_release(self, release, package_uri):
        """
        Adds a release to the backend. (The release might be added to an internal buffer.)

        :raises MissingOcidKeyError: if the release is missing an ``ocid`` field
        """
        try:
            self._add_release(release['ocid'], package_uri, release)
        except KeyError as e:
            raise MissingOcidKeyError('ocid') from e
    @abstractmethod
    def _add_release(self, ocid, package_uri, release):
        pass
    @abstractmethod
    def get_releases_by_ocid(self):
        """
        Yields an OCID and an iterable of tuples of ``(ocid, package_uri, release)``.

        OCIDs are yielded in alphabetical order. The iterable is in any order.
        """
    def flush(self):
        """
        Flushes the internal buffer of releases. This may be a no-op on some backends.
        """
    def close(self):
        """
        Tidies up any resources used by the backend. This may be a no-op on some backends.
        """


class PythonBackend(AbstractBackend):
    def __init__(self):
        self.groups = defaultdict(list)

    def _add_release(self, ocid, package_uri, release):
        self.groups[ocid].append((ocid, package_uri, release))
    def get_releases_by_ocid(self):
        for ocid in sorted(self.groups):
            yield ocid, self.groups[ocid]


class SQLiteBackend(AbstractBackend):
    # "The sqlite3 module internally uses a statement cache to avoid SQL parsing overhead."
    # https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
    #
    # Note: We never commit changes. SQLite manages the memory usage of uncommitted changes.
    # https://sqlite.org/atomiccommit.html#_cache_spill_prior_to_commit
    def __init__(self):
        self.file = NamedTemporaryFile(delete=False)

        # https://docs.python.org/3/library/sqlite3.html#sqlite3.PARSE_DECLTYPES
        self.connection = sqlite3.connect(self.file.name, detect_types=sqlite3.PARSE_DECLTYPES)

        # https://sqlite.org/tempfiles.html#temp_databases
        self.connection.execute("CREATE TEMP TABLE releases (ocid text, uri text, release json)")

        self.buffer = []

    def _add_release(self, ocid, package_uri, release):
        self.buffer.append((ocid, package_uri, release))
    def flush(self):
        # https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.executemany
        self.connection.executemany("INSERT INTO releases VALUES (?, ?, ?)", self.buffer)

        self.buffer = []
    def get_releases_by_ocid(self):
        self.connection.execute("CREATE INDEX IF NOT EXISTS ocid_idx ON releases(ocid)")

        results = self.connection.execute("SELECT * FROM releases ORDER BY ocid")
        for ocid, rows in itertools.groupby(results, lambda row: row[0]):
            yield ocid, rows
    def close(self):
        self.file.close()
        self.connection.close()

        os.unlink(self.file.name)