import itertools
import os
from abc import ABC, abstractmethod
from collections import defaultdict
from tempfile import NamedTemporaryFile
from ocdskit.exceptions import InconsistentVersionError, MissingOcidKeyError
from ocdskit.util import (_empty_record_package, _remove_empty_optional_metadata, _resolve_metadata,
_update_package_metadata, get_ocds_minor_version, is_release, json_dumps, jsonlib)
try:
import sqlite3
USING_SQLITE = True
    def adapt_json(data):
return json_dumps(data)
    def convert_json(string):
return jsonlib.loads(string)
sqlite3.register_adapter(dict, adapt_json)
sqlite3.register_converter('json', convert_json)
except ImportError:
USING_SQLITE = False
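# A minimal illustration (a sketch) of what the adapter/converter registrations above enable: dict
# parameters are serialized to JSON text on write, and columns declared with the ``json`` type are
# deserialized back to dicts on read, provided the connection is opened with
# ``detect_types=sqlite3.PARSE_DECLTYPES`` (as SQLiteBackend does below). For example:
#
#     connection = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
#     connection.execute("CREATE TABLE example (data json)")
#     connection.execute("INSERT INTO example VALUES (?)", ({'ocid': 'ocds-213czf-000-00001'},))
#     assert connection.execute("SELECT data FROM example").fetchone()[0] == {'ocid': 'ocds-213czf-000-00001'}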
class Packager:
"""
The Packager context manager helps to build a single record package, or a stream of compiled releases or merged
releases. Release packages and/or individual releases can be added to the packager. All releases should use the
same version of OCDS.
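
    For example (a sketch; ``release_packages`` is assumed to be an iterable of release package
    dicts, and ocdsmerge to be installed)::

        import ocdsmerge

        with Packager() as packager:
            packager.add(release_packages)
            for record_package in packager.output_package(ocdsmerge.Merger()):
                print(json_dumps(record_package))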
"""
def __init__(self):
self.package = _empty_record_package()
self.version = None
if USING_SQLITE:
self.backend = SQLiteBackend()
else:
self.backend = PythonBackend()
def __enter__(self):
return self
def __exit__(self, type_, value, traceback):
self.backend.close()
    def add(self, data):
"""
Adds release packages and/or individual releases to be merged.
:param data: an iterable of release packages and individual releases
:raises InconsistentVersionError: if the versions are inconsistent across packages to merge
"""
for i, item in enumerate(data):
version = get_ocds_minor_version(item)
if self.version:
if version != self.version:
# OCDS 1.1 and OCDS 1.0 have different merge rules for `awards.suppliers`. Also, mixing new and
# deprecated fields can lead to inconsistencies (e.g. transaction `amount` and `value`).
# https://standard.open-contracting.org/latest/en/schema/changelog/#advisories
raise InconsistentVersionError(f'item {i}: version error: this item uses version {version}, but '
f'earlier items used version {self.version}', self.version, version)
else:
self.version = version
if is_release(item):
self.backend.add_release(item, '')
else: # release package
uri = item.get('uri', '')
_update_package_metadata(self.package, item)
# Note: If there are millions of packages to merge, we should use SQLite to store the packages instead.
if uri and version < '1.2':
self.package['packages'].append(uri)
for release in item['releases']:
self.backend.add_release(release, uri)
self.backend.flush()
    def output_package(self, merger, return_versioned_release=False, use_linked_releases=False, streaming=False):
"""
Yields a record package.
:param ocdsmerge.merge.Merger merger: a merger
:param bool return_versioned_release: whether to include a versioned release in each record
:param bool use_linked_releases: whether to use linked releases instead of full releases, if possible
:param bool streaming: whether to set the package's records to a generator instead of a list
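
        For example (a sketch): with ``streaming=True``, ``package['records']`` is a generator, so
        consume it before the context manager exits::

            for package in packager.output_package(merger, streaming=True):
                for record in package['records']:
                    ...  # serialize each record incrementally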
"""
records = self.output_records(merger, return_versioned_release=return_versioned_release,
use_linked_releases=use_linked_releases)
# If a user wants to stream data but can’t exhaust records right away, we can add an `autoclose=True` argument.
# If set to `False`, `__exit__` will do nothing, and the user will need to call `packager.backend.close()`.
if not streaming:
records = list(records)
self.package['records'] = records
_resolve_metadata(self.package, 'packages')
_resolve_metadata(self.package, 'extensions')
_remove_empty_optional_metadata(self.package)
yield self.package
    def output_records(self, merger, return_versioned_release=False, use_linked_releases=False):
"""
Yields records, ordered by OCID.
:param ocdsmerge.merge.Merger merger: a merger
:param bool return_versioned_release: whether to include a versioned release in the record
:param bool use_linked_releases: whether to use linked releases instead of full releases, if possible
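
        Each yielded record is shaped like (a sketch)::

            {
                'ocid': 'ocds-213czf-000-00001',
                'releases': [...],          # full or linked releases
                'compiledRelease': {...},
                'versionedRelease': {...},  # only if return_versioned_release is true
            }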
"""
for ocid, rows in self.backend.get_releases_by_ocid():
record = {
'ocid': ocid,
'releases': [],
}
releases = []
for _, uri, release in rows:
releases.append(release)
if use_linked_releases and uri:
package_release = {
'url': uri + '#' + release['id'],
'date': release['date'],
'tag': release['tag'],
}
else:
package_release = release
record['releases'].append(package_release)
record['compiledRelease'] = merger.create_compiled_release(releases)
if return_versioned_release:
record['versionedRelease'] = merger.create_versioned_release(releases)
yield record
    def output_releases(self, merger, return_versioned_release=False):
"""
Yields compiled releases or versioned releases, ordered by OCID.
:param ocdsmerge.merge.Merger merger: a merger
:param bool return_versioned_release: whether to yield versioned releases instead of compiled releases
"""
for _, rows in self.backend.get_releases_by_ocid():
releases = (row[-1] for row in rows)
if return_versioned_release:
yield merger.create_versioned_release(releases)
else:
yield merger.create_compiled_release(releases)
# The backend's responsibilities (for now) are exclusively to:
#
# * Group releases by OCID
# * Store each release's package URI
#
# For a PostgreSQL backend, see https://github.com/open-contracting/ocdskit/issues/116
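# A new backend (such as the proposed PostgreSQL one) would only need to implement ``_add_release``
# and ``get_releases_by_ocid``, overriding ``flush`` and ``close`` if it buffers writes or holds
# resources; ``PythonBackend`` below is the smallest working example.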
class AbstractBackend(ABC):
    def add_release(self, release, package_uri):
"""
Adds a release to the backend. (The release might be added to an internal buffer.)
:raises MissingOcidKeyError: if the release is missing an ``ocid`` field
"""
try:
self._add_release(release['ocid'], package_uri, release)
except KeyError as e:
raise MissingOcidKeyError('ocid') from e
@abstractmethod
def _add_release(self, ocid, package_uri, release):
pass
    @abstractmethod
def get_releases_by_ocid(self):
"""
        Yields an OCID and an iterable of tuples of ``(ocid, package_uri, release)``.
OCIDs are yielded in alphabetical order. The iterable is in any order.
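
        For example, a single yielded pair might look like (a sketch)::

            ('ocds-213czf-000-00001',
             [('ocds-213czf-000-00001', 'https://example.com/package.json', {...})])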
"""
    def flush(self):
"""
Flushes the internal buffer of releases. This may be a no-op on some backends.
"""
    def close(self):
"""
Tidies up any resources used by the backend. This may be a no-op on some backends.
"""
class PythonBackend(AbstractBackend):
def __init__(self):
self.groups = defaultdict(list)
def _add_release(self, ocid, package_uri, release):
self.groups[ocid].append((ocid, package_uri, release))
    def get_releases_by_ocid(self):
for ocid in sorted(self.groups):
yield ocid, self.groups[ocid]
class SQLiteBackend(AbstractBackend):
# "The sqlite3 module internally uses a statement cache to avoid SQL parsing overhead."
# https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
# Note: We never commit changes. SQLite manages the memory usage of uncommitted changes.
# https://sqlite.org/atomiccommit.html#_cache_spill_prior_to_commit
def __init__(self):
self.file = NamedTemporaryFile(delete=False)
# https://docs.python.org/3/library/sqlite3.html#sqlite3.PARSE_DECLTYPES
self.connection = sqlite3.connect(self.file.name, detect_types=sqlite3.PARSE_DECLTYPES)
# https://sqlite.org/tempfiles.html#temp_databases
self.connection.execute("CREATE TEMP TABLE releases (ocid text, uri text, release json)")
self.buffer = []
def _add_release(self, ocid, package_uri, release):
self.buffer.append((ocid, package_uri, release))
    def flush(self):
# https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.executemany
self.connection.executemany("INSERT INTO releases VALUES (?, ?, ?)", self.buffer)
self.buffer = []
    def get_releases_by_ocid(self):
self.connection.execute("CREATE INDEX IF NOT EXISTS ocid_idx ON releases(ocid)")
results = self.connection.execute("SELECT * FROM releases ORDER BY ocid")
for ocid, rows in itertools.groupby(results, lambda row: row[0]):
yield ocid, rows
    def close(self):
self.file.close()
self.connection.close()
os.unlink(self.file.name)