Source code for ocdskit.combine

import warnings

from ocdsextensionregistry import ProfileBuilder
from ocdsmerge import Merger
from ocdsmerge.util import get_release_schema_url

from ocdskit.exceptions import MissingRecordsWarning, MissingReleasesWarning
from ocdskit.packager import Packager
from ocdskit.util import (
    _empty_record_package,
    _empty_release_package,
    _remove_empty_optional_metadata,
    _resolve_metadata,
    _update_package_metadata,
    get_ocds_patch_tag,
)

DEFAULT_VERSION = '1.1'  # fields might be deprecated


def _package(key, items, uri, publisher, published_date, version, extensions=None):
    if publisher is None:
        publisher = {}
    publisher.setdefault('name', '')

    output = {
        'uri': uri,
        'publisher': publisher,
        'publishedDate': published_date,
        'version': version,
    }

    if extensions:
        output['extensions'] = extensions

    output[key] = items

    return output


[docs] def package_records(records, uri='', publisher=None, published_date='', version=DEFAULT_VERSION, extensions=None): """ Wraps records in a record package. :param list records: a list of records :param str uri: the record package's ``uri`` :param dict publisher: the record package's ``publisher`` :param str published_date: the record package's ``publishedDate`` :param str version: the record package's ``version`` :param list extensions: the record package's ``extensions`` """ return _package('records', records, uri, publisher, published_date, version, extensions)
[docs] def package_releases(releases, uri='', publisher=None, published_date='', version=DEFAULT_VERSION, extensions=None): """ Wraps releases in a release package. :param list releases: a list of releases :param str uri: the release package's ``uri`` :param dict publisher: the release package's ``publisher`` :param str published_date: the release package's ``publishedDate`` :param str version: the release package's ``version`` :param list extensions: the release package's ``extensions`` """ return _package('releases', releases, uri, publisher, published_date, version, extensions)
[docs] def combine_record_packages(packages, uri='', publisher=None, published_date='', version=DEFAULT_VERSION): """ Collects the packages and records from the record packages into one record package. Warns ``~ocdskit.exceptions.MissingRecordsWarning`` if the "records" field is missing from a record package. :param packages: an iterable of record packages :param str uri: the record package's ``uri`` :param dict publisher: the record package's ``publisher`` :param str published_date: the record package's ``publishedDate`` :param str version: the record package's ``version`` """ # See options for not buffering all inputs into memory: https://github.com/open-contracting/ocdskit/issues/119 output = _empty_record_package(uri, publisher, published_date, version) output['packages'] = {} for i, package in enumerate(packages): _update_package_metadata(output, package) if 'records' in package: output['records'].extend(package['records']) else: warnings.warn(MissingRecordsWarning(i)) if 'packages' in package: output['packages'].update(dict.fromkeys(package['packages'])) if publisher: output['publisher'] = publisher _resolve_metadata(output, 'packages') _resolve_metadata(output, 'extensions') _remove_empty_optional_metadata(output) return output
[docs] def combine_release_packages(packages, uri='', publisher=None, published_date='', version=DEFAULT_VERSION): """ Collects the releases from the release packages into one release package. Warns ``~ocdskit.exceptions.MissingReleasesWarning`` if the "releases" field is missing from a release package. :param packages: an iterable of release packages :param str uri: the release package's ``uri`` :param dict publisher: the release package's ``publisher`` :param str published_date: the release package's ``publishedDate`` :param str version: the release package's ``version`` """ # See options for not buffering all inputs into memory: https://github.com/open-contracting/ocdskit/issues/119 output = _empty_release_package(uri, publisher, published_date, version) for i, package in enumerate(packages): _update_package_metadata(output, package) if 'releases' in package: output['releases'].extend(package['releases']) else: warnings.warn(MissingReleasesWarning(i)) if publisher: output['publisher'] = publisher _resolve_metadata(output, 'extensions') _remove_empty_optional_metadata(output) return output
[docs] def merge(data, uri='', publisher=None, published_date='', version=DEFAULT_VERSION, schema=None, return_versioned_release=False, return_package=False, use_linked_releases=False, streaming=False): """ Merges release packages and individual releases. By default, yields compiled releases. If ``return_versioned_release`` is ``True``, yields versioned releases. If ``return_package`` is ``True``, wraps the compiled releases (and versioned releases if ``return_versioned_release`` is ``True``) in a record package. If ``return_package`` is set and ``publisher`` isn't set, the output record package will have the same publisher as the last input release package. :param data: an iterable of release packages and individual releases :param str uri: if ``return_package`` is ``True``, the record package's ``uri`` :param dict publisher: if ``return_package`` is ``True``, the record package's ``publisher`` :param str published_date: if ``return_package`` is ``True``, the record package's ``publishedDate`` :param str version: if ``return_package`` is ``True``, the record package's ``version`` :param dict schema: the URL, path or dict of the patched release schema to use :param bool return_package: wrap the compiled releases in a record package :param bool use_linked_releases: if ``return_package`` is ``True``, use linked releases instead of full releases, if the input is a release package :param bool return_versioned_release: if ``return_package`` is ``True``, include versioned releases in the record package; otherwise, yield versioned releases instead of compiled releases :param bool streaming: if ``return_package`` is ``True``, set the package's records to a generator (this only works if the calling code exhausts the generator before ``merge`` returns) :raises InconsistentVersionError: if the versions are inconsistent across packages to merge :raises MissingOcidKeyError: if the release is missing an ``ocid`` field :raises UnknownVersionError: if the OCDS version is not recognized """ with Packager() as packager: packager.add(data) if not schema and packager.version: tag = get_ocds_patch_tag(packager.version) if packager.package['extensions']: # `extensions` is an insertion-ordered dict at this point. builder = ProfileBuilder(tag, list(packager.package['extensions'])) schema = builder.patched_release_schema() else: schema = get_release_schema_url(tag) merger = Merger(schema) if return_package: packager.package['uri'] = uri packager.package['publishedDate'] = published_date packager.package['version'] = version if publisher: packager.package['publisher'] = publisher yield from packager.output_package(merger, return_versioned_release=return_versioned_release, use_linked_releases=use_linked_releases, streaming=streaming) else: yield from packager.output_releases(merger, return_versioned_release=return_versioned_release)
[docs] def compile_release_packages(*args, **kwargs): warnings.warn('compile_release_packages() is deprecated. Use merge() instead.', DeprecationWarning, stacklevel=2) yield from merge(*args, **kwargs)