Source code for ocdskit.upgrade

import json
import logging
from collections import OrderedDict  # for move_to_end()
from copy import deepcopy
from hashlib import md5

from ocdskit.util import (
    _cast_as_list,
    get_ocds_minor_version,
    is_package,
    is_record,
    is_record_package,
    is_release_package,
)

logger = logging.getLogger("ocdskit")

# See https://standard.open-contracting.org/1.0/en/schema/reference/#identifier
organization_identification_1_0 = (
    (None, ("name",)),
    ("identifier", ("scheme", "id", "legalName", "uri")),
    ("address", ("streetAddress", "locality", "region", "postalCode", "countryName")),
    ("contactPoint", ("name", "email", "telephone", "faxNumber", "url")),
)


def _move_to_top(data, fields):
    for field in reversed(fields):
        if field in data:
            data.move_to_end(field, last=False)


def _in(obj, field):
    return field in obj and obj[field] is not None


[docs] def upgrade_10_10(data): """Upgrade a release package, record package, release or record from 1.0 to 1.0 (no-op).""" return data
[docs] def upgrade_11_11(data): """Upgrade a release package, record package, release or record from 1.1 to 1.1 (no-op).""" return data
[docs] def upgrade_10_11(data): """ Upgrade a release package, record package, release or record from 1.0 to 1.1. Retain the deprecated Amendment.changes, Budget.source and Milestone.documents fields. ``data`` must be an ``OrderedDict``. If you have only the parsed JSON, re-parse it with: ``upgrade_10_11(json.loads(json.dumps(data), object_pairs_hook=OrderedDict))`` """ version = get_ocds_minor_version(data) if version != "1.0": return data if is_package(data): data["version"] = "1.1" _move_to_top(data, ("uri", "version")) if is_record_package(data): for record in data["records"]: upgrade_record_10_11(record) elif is_release_package(data): for release in data["releases"]: upgrade_release_10_11(release) elif is_record(data): upgrade_record_10_11(data) else: # release upgrade_release_10_11(data) return data
[docs] def upgrade_record_10_11(record): """Upgrade a record from 1.0 to 1.1.""" if "releases" in record: for release in record["releases"]: upgrade_release_10_11(release) if "compiledRelease" in record: upgrade_release_10_11(record["compiledRelease"])
[docs] def upgrade_release_10_11(release): """Apply upgrades for organization handling, amendment handling and transactions terminology.""" upgrade_parties_10_to_11(release) upgrade_amendments_10_11(release) upgrade_transactions_10_11(release)
[docs] def upgrade_parties_10_to_11(release): """Convert organizations to organization references and fill in the ``parties`` array.""" parties = _get_parties(release) if _in(release, "buyer"): buyer = release["buyer"] release["buyer"] = _add_party(parties, buyer, "buyer") if _in(release, "tender"): if _in(release["tender"], "procuringEntity"): procuring_entity = release["tender"]["procuringEntity"] release["tender"]["procuringEntity"] = _add_party(parties, procuring_entity, "procuringEntity") if _in(release["tender"], "tenderers"): for i, tenderer in enumerate(release["tender"]["tenderers"]): release["tender"]["tenderers"][i] = _add_party(parties, tenderer, "tenderer") if _in(release, "awards"): for award in release["awards"]: if _in(award, "suppliers"): for i, supplier in enumerate(award["suppliers"]): award["suppliers"][i] = _add_party(parties, supplier, "supplier") if parties: if "parties" not in release: release["parties"] = [] _move_to_top(release, ("ocid", "id", "date", "tag", "initiationType", "parties")) for party in parties.values(): if party not in release["parties"]: release["parties"].append(party)
def _get_parties(release): parties = {} if "parties" in release: for party in release["parties"]: _id = party["id"] if "id" in party else _create_party_id(party) parties[_id] = party return parties def _add_party(parties, party, role): """ Add an ``id`` to the party, add the party to the ``parties`` array, set the party's role, and return an OrganizationReference. Warn if there is any data loss from differences in non-identifying fields. """ party = deepcopy(party) if "id" not in party: party["id"] = _create_party_id(party) _move_to_top(party, ("id")) _id = party["id"] if _id not in parties: parties[_id] = party else: # Warn about data loss. survivor = deepcopy(parties[_id]) roles = survivor.pop("roles", []) if any(key not in survivor or party[key] != survivor[key] for key in party): logger.warning( 'party in "%s" role differs from party in %s roles:\n%s\n%s', role, json.dumps(roles), json.dumps(party), json.dumps(survivor), ) if "roles" not in parties[_id]: parties[_id]["roles"] = [] _move_to_top(parties[_id], ("id", "roles")) # In case the data is invalid. parties[_id]["roles"] = _cast_as_list(parties[_id]["roles"]) if role not in parties[_id]["roles"]: # Update the `roles` of the party in the `parties` array. parties[_id]["roles"].append(role) # Create the OrganizationReference. organization_reference = {"id": _id} if "name" in party: organization_reference["name"] = party["name"] return organization_reference def _create_party_id(party): parts = [] for parent, fields in organization_identification_1_0: if not parent: parts.extend(_get_bytes(party, field) for field in fields) elif parent in party: parts.extend(_get_bytes(party[parent], field) for field in fields) return md5(b"-".join(parts)).hexdigest() # noqa: S324 # non-cryptographic def _get_bytes(obj, field): # Handle null and integers. return str(obj.get(field) or "").encode("utf-8")
[docs] def upgrade_amendments_10_11(release): """ Rename ``amendment`` to ``amendments`` under ``tender``, ``awards`` and ``contracts``. If ``amendments`` already exists, append the ``amendment`` value to the ``amendments`` array, unless it already contains it. """ if _in(release, "tender"): _upgrade_amendment_10_11(release["tender"]) for field in ("awards", "contracts"): if _in(release, field): for block in release[field]: _upgrade_amendment_10_11(block)
def _upgrade_amendment_10_11(block): if "amendment" in block: block.setdefault("amendments", []) if block["amendment"] not in block["amendments"]: block["amendments"].append(block["amendment"]) del block["amendment"]
[docs] def upgrade_transactions_10_11(release): """ Rename ``providerOrganization`` to ``payer``, ``receiverOrganization`` to ``payee``, and ``amount`` to ``value`` under ``contracts.implementation.transactions``, unless they already exist. Convert ``providerOrganization`` and ``receiverOrganization`` from an Identifier to an OrganizationReference and fill in the ``parties`` array. """ parties = _get_parties(release) if _in(release, "contracts"): for contract in release["contracts"]: if _in(contract, "implementation") and _in(contract["implementation"], "transactions"): for transaction in contract["implementation"]["transactions"]: if "value" not in transaction: transaction["value"] = transaction["amount"] del transaction["amount"] for old, new in (("providerOrganization", "payer"), ("receiverOrganization", "payee")): if old in transaction and new not in transaction: party = OrderedDict([("identifier", transaction[old])]) if "legalName" in transaction[old]: party["name"] = transaction[old]["legalName"] transaction[new] = _add_party(parties, party, new) del transaction[old]