import itertools
import json
import re
from decimal import Decimal
import ijson
from ocdsmerge.util import get_tags
from ocdskit.exceptions import UnknownFormatError, UnknownVersionError
# Prefer orjson when it is installed; otherwise fall back to the standard library's json module.
try:
    import orjson
except ImportError:
    jsonlib = json
else:
    jsonlib = orjson
# https://tomwojcik.com/posts/2023-01-02/python-311-str-enum-breaking-change
try:
    from enum import StrEnum
except ImportError:
    from enum import Enum

    class StrEnum(str, Enum):
        """Fallback used when ``enum.StrEnum`` cannot be imported: an ``Enum`` whose members are also ``str``."""
# See `grouper` recipe: https://docs.python.org/3/library/itertools.html#recipes
[docs]
def grouper(iterable, n, fillvalue=None):
    """
    Collect the items of an iterable into tuples of length ``n``, padding the last tuple with ``fillvalue``.

    :param iterable: the items to group
    :param int n: the length of each group
    :param fillvalue: the value with which to pad the final group, if it is incomplete
    :returns: an iterator of ``n``-tuples
    """
    # The same iterator is repeated n times, so that each tuple position advances the shared iterator.
    shared = iter(iterable)
    positions = (shared,) * n
    return itertools.zip_longest(*positions, fillvalue=fillvalue)
# https://stackoverflow.com/questions/21663800/python-make-a-list-generator-json-serializable/46841935#46841935
[docs]
class SerializableGenerator(list):
    """
    Wrap an iterable in a ``list`` subclass, so that it can be passed where a list is expected (for example, as
    the return value of :meth:`json.JSONEncoder.default`) without first reading all its items into memory.

    The first item is read immediately, in order to determine whether the iterable is empty: the instance is falsy
    and has a length of 0 if the iterable is empty, and is truthy and has a length of 1 otherwise.

    The instance can be iterated only once: iterating it a second time yields nothing.

    :param iterable: any iterable (an iterator, generator, list, etc.)
    """

    def __init__(self, iterable):
        # `next()` requires an iterator. `iter()` returns an iterator as-is, and converts any other iterable.
        iterator = iter(iterable)
        try:
            # If `iter()` is omitted, then `__iter__` won't exhaust `head`.
            self.head = iter([next(iterator)])
            # Adding an item to the list ensures `__bool__` and `__len__` work.
            self.append(iterator)
        except StopIteration:
            # `__iter__` requires `head` to be set.
            self.head = []

    def __iter__(self):
        # `*self[:1]` is used, because `self[0]` raises IndexError when `iterable` is empty.
        return itertools.chain(self.head, *self[:1])
[docs]
class JSONEncoder(json.JSONEncoder):
    """A JSON encoder that additionally serializes ``Decimal`` values (as floats) and arbitrary iterables."""

    def default(self, obj):
        """
        Return a serializable version of ``obj``.

        A ``Decimal`` is converted to a ``float``. Any other object that supports ``iter()`` is wrapped in a
        :class:`SerializableGenerator`. Otherwise, the parent implementation is called, which raises ``TypeError``.
        """
        if isinstance(obj, Decimal):
            return float(obj)

        # https://docs.python.org/3/library/json.html#json.JSONEncoder.default
        try:
            iterator = iter(obj)
        except TypeError:
            iterator = None

        if iterator is not None:
            return SerializableGenerator(iterator)
        return super().default(obj)
[docs]
def iterencode(data, *, ensure_ascii=False, **kwargs):
    """
    Return a generator that yields each string representation as available.

    If the ``indent`` keyword argument is omitted, the ``separators`` keyword argument is set to ``(",", ":")``.
    The remaining keyword arguments are passed to the :class:`JSONEncoder` constructor.
    """
    compact = "indent" not in kwargs
    if compact:
        kwargs["separators"] = (",", ":")
    encoder = JSONEncoder(ensure_ascii=ensure_ascii, **kwargs)
    return encoder.iterencode(data)
[docs]
def json_dump(data, io, *, ensure_ascii=False, **kwargs):
    """
    Dump JSON to a file-like object.

    If the ``indent`` keyword argument is omitted, the ``separators`` keyword argument is set to ``(",", ":")``.
    The remaining keyword arguments are passed to :func:`json.dump`, which is called with :class:`JSONEncoder`.
    """
    compact = "indent" not in kwargs
    if compact:
        kwargs["separators"] = (",", ":")
    json.dump(data, io, cls=JSONEncoder, ensure_ascii=ensure_ascii, **kwargs)
[docs]
def json_dumps(data, *, ensure_ascii=False, indent=None, sort_keys=False, **kwargs):
    """
    Dump JSON to a string, and return it.

    The standard library's ``json`` module is used if orjson is not the selected library, or if the arguments are
    not supported by orjson. In that case, if ``indent`` is falsy, the ``separators`` keyword argument is set to
    ``(",", ":")``. Otherwise, orjson is used, and its output is decoded to a string.
    """
    # orjson doesn't support `ensure_ascii` if `True`, `indent` if not `2` or other arguments except for `sort_keys`.
    needs_stdlib = jsonlib == json or ensure_ascii or (indent and indent != 2) or kwargs

    if not needs_stdlib:
        flags = 0
        if sort_keys:
            flags |= orjson.OPT_SORT_KEYS
        if indent:
            flags |= orjson.OPT_INDENT_2
        # orjson dumps to bytes.
        encoded = orjson.dumps(data, default=JSONEncoder().default, option=flags)
        return encoded.decode()

    if not indent:
        kwargs["separators"] = (",", ":")
    return json.dumps(
        data, cls=JSONEncoder, ensure_ascii=ensure_ascii, indent=indent, sort_keys=sort_keys, **kwargs
    )
[docs]
def get_definitions_keyword(schema):
    """
    Return the keyword under which the schema stores its definitions.

    ``$defs`` takes precedence over ``definitions`` if both are present, and is the default if neither is present.

    :param dict schema: a JSON schema
    :returns: ``"$defs"`` or ``"definitions"``
    :rtype: str
    """
    for keyword in ("$defs", "definitions"):
        if keyword in schema:
            return keyword
    return "$defs"
[docs]
def get_ocds_minor_version(data):
    """
    Return the OCDS minor version of the release package, record package, release or record.

    For a package, this is its ``version`` field, or "1.0" if the field is absent. For a record, this is "1.1" if
    any of its releases has a ``parties`` field, or "1.0" otherwise. For anything else (treated as a release), this
    is "1.1" if it has a ``parties`` field, or "1.0" otherwise.
    """
    if is_package(data):
        return data["version"] if "version" in data else "1.0"

    if is_record(data):
        has_parties = any("parties" in release for release in data["releases"])
    else:
        # release
        has_parties = "parties" in data

    return "1.1" if has_parties else "1.0"
[docs]
def get_ocds_patch_tag(version):
    """
    Return the OCDS patch version as a git tag (like ``1__1__4``) for a given minor version (like ``1.1``).

    The tags returned by ``get_tags()`` are searched in reverse order, and the first tag that starts with the
    minor version's prefix (like ``1__1__``) is returned.

    :param str version: a minor version, with its parts separated by periods
    :raises UnknownVersionError: if the OCDS version is not recognized
    """
    tag_prefix = f"{version.replace('.', '__')}__"
    try:
        return next(candidate for candidate in reversed(get_tags()) if candidate.startswith(tag_prefix))
    except StopIteration as e:
        raise UnknownVersionError(version) from e
[docs]
def is_package(data):
    """Return whether the data is a package, of either releases or records."""
    if is_release_package(data):
        return True
    return is_record_package(data)
[docs]
def is_record_package(data):
    """
    Return whether the data is a record package.

    The ``records`` field is required in a record package, and is the only field that is tested, because the other
    required fields are shared with release packages.
    """
    has_records = "records" in data
    return has_records
[docs]
def is_record(data):
    """
    Return whether the data is a record.

    Both the ``releases`` field and the ``ocid`` field, which are required in a record, must be present.
    """
    return all(field in data for field in ("releases", "ocid"))
[docs]
def is_release_package(data):
    """
    Return whether the data is a release package.

    The ``releases`` field is required in a release package. Its other required fields are shared with record
    packages. Since a record also has a ``releases`` field, data with an ``ocid`` field is not considered to be a
    release package.
    """
    if "ocid" in data:
        return False
    return "releases" in data
[docs]
def is_release(data):
    """
    Return whether the data is a release (embedded or linked, individual or compiled).

    Only the presence of the ``date`` field is tested.
    """
    has_date = "date" in data
    return has_date
[docs]
def is_compiled_release(data):
    """
    Return whether the data is a compiled release (embedded or linked).

    The ``tag`` field must be present, be a list, and contain the value "compiled".
    """
    if "tag" not in data:
        return False
    tags = data["tag"]
    return isinstance(tags, list) and "compiled" in tags
[docs]
def is_linked_release(data, maximum_properties=3):
    """
    Return whether the data is a linked release.

    A linked release has required ``url`` and ``date`` fields and an optional ``tag`` field. An embedded release has
    required ``date`` and ``tag`` fields (among others), and it can have a ``url`` field as an additional field.

    To distinguish a linked release from an embedded release, we test for the presence of the required ``url`` field
    and test whether the number of fields is at most ``maximum_properties`` (three, by default).

    :param data: the release to test
    :param int maximum_properties: the maximum number of fields that a linked release can have
    """
    if "url" not in data:
        return False
    return len(data) <= maximum_properties
def _detect_format_result(
    is_concatenated, is_array, has_records, has_releases, has_ocid, has_tag, is_compiled, metadata_count
):
    """
    Select a ``Format`` member based on the given flags, and return it with the two structural flags.

    The flags are tested in order of precedence: ``has_records``, ``has_releases`` (with or without ``has_ocid``),
    ``is_compiled``, ``has_tag``, ``has_ocid`` and, lastly, whether ``metadata_count`` is at least 4.

    :returns: the detected format, ``is_concatenated`` and ``is_array``, as a tuple
    :raises UnknownFormatError: if none of the tests succeeds
    """
    if has_records:
        detected = Format.record_package
    elif has_releases:
        # A record has an `ocid` field in addition to a `releases` field; a release package doesn't.
        detected = Format.record if has_ocid else Format.release_package
    elif is_compiled:
        detected = Format.compiled_release
    elif has_tag:
        detected = Format.release
    elif has_ocid:
        detected = Format.versioned_release
    elif metadata_count >= 4:
        detected = Format.empty_package
    else:
        if is_array:
            infix = "array"
        else:
            infix = "object"
        raise UnknownFormatError(f"top-level JSON value is a non-OCDS {infix}")

    return detected, is_concatenated, is_array
def _empty_record_package(uri="", publisher=None, published_date="", version=None):
    """Return the skeleton of a record package: the common package metadata, and empty ``packages`` and ``records`` lists."""
    skeleton = _empty_package(uri, publisher, published_date, version)
    skeleton.update(packages=[], records=[])
    return skeleton
def _empty_release_package(uri="", publisher=None, published_date="", version=None):
    """Return the skeleton of a release package: the common package metadata, and an empty ``releases`` list."""
    skeleton = _empty_package(uri, publisher, published_date, version)
    skeleton.update(releases=[])
    return skeleton
def _empty_package(uri, publisher, published_date, version):
    """
    Return the metadata that is common to release packages and record packages.

    ``publisher`` is set to a new empty dict if it is ``None``. ``license`` and ``publicationPolicy`` are set to
    ``None``, and ``extensions`` is set to an empty dict.
    """
    package = {
        "uri": uri,
        "publisher": {} if publisher is None else publisher,
        "publishedDate": published_date,
        "license": None,
        "publicationPolicy": None,
        "version": version,
        "extensions": {},
    }
    return package
def _update_package_metadata(output, package):
    """
    Copy the ``publisher``, ``license`` and ``publicationPolicy`` fields that are present in ``package`` to
    ``output``, and add the items of the package's ``extensions`` field, if present, as keys of the output's
    ``extensions`` dict.
    """
    copied = ("publisher", "license", "publicationPolicy")
    output.update((field, package[field]) for field in copied if field in package)

    # We use an insertion-ordered dict to keep extensions in order without duplication.
    if "extensions" in package:
        known = output["extensions"]
        known.update(dict.fromkeys(package["extensions"]))
def _resolve_metadata(output, field):
    """Replace the field's value with a list of its items if the value is truthy, or remove the field otherwise."""
    collected = output[field]
    if not collected:
        del output[field]
        return
    output[field] = list(collected)
def _remove_empty_optional_metadata(output):
    """Remove the ``license``, ``publicationPolicy`` and ``version`` fields whose value is ``None``."""
    for optional in ("license", "publicationPolicy", "version"):
        if output[optional] is not None:
            continue
        output.pop(optional)
def _cast_as_list(value):
    """Return a one-item list if the value is a string, or a sorted list of the value's items otherwise."""
    return [value] if isinstance(value, str) else sorted(value)
def _get_prop_name(pair):
    """Return the part of a ``prop:hash`` string that precedes the first colon (or the entire string, if none)."""
    prop_name, _, _ = pair.partition(":")
    return prop_name
# Matches one or more separators (space, dot, underscore, dash), the position between a lowercase letter and an
# uppercase letter, or the position between an uppercase letter and an uppercase letter followed by a lowercase letter.
WORD_BOUNDARIES = re.compile(r"[ ._-]+|(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")


def _split_camel_case(name):
    """Split the name at space, dot, underscore, dash and camelCase boundaries, and capitalize each word."""
    words = WORD_BOUNDARIES.split(name)
    return list(map(str.capitalize, words))
def _dedupe_with_counter(name, names):
    """
    Return the name if it is not among the names. Otherwise, return the name with the lowest integer suffix,
    starting from 2, for which the result is not among the names.
    """
    base, suffix = name, 2
    # The loop body is skipped entirely if the name is already unique.
    while name in names:
        name = f"{base}{suffix}"
        suffix += 1
    return name
[docs]
def longest_common_subsequence(x, y):
    """
    Return the longest common subsequence of two word lists.

    :param x: the first list of words
    :param y: the second list of words
    :returns: the words, in order, of a longest subsequence that is common to both lists
    :rtype: list
    """
    # https://en.wikipedia.org/wiki/Longest_common_subsequence#Computing_the_length_of_the_LCS
    rows, cols = len(x), len(y)

    # lengths[i][j] is the length of the longest common subsequence of x[:i] and y[:j].
    lengths = [[0] * (cols + 1) for _ in range(rows + 1)]
    for i, left in enumerate(x, start=1):
        for j, right in enumerate(y, start=1):
            if left == right:
                lengths[i][j] = lengths[i - 1][j - 1] + 1
            else:
                lengths[i][j] = max(lengths[i][j - 1], lengths[i - 1][j])

    # Walk back from the bottom-right corner of the table, collecting the matching words in reverse order.
    common = []
    i, j = rows, cols
    while i > 0 and j > 0:
        if x[i - 1] == y[j - 1]:
            common.append(x[i - 1])
            i -= 1
            j -= 1
        elif lengths[i][j - 1] >= lengths[i - 1][j]:
            j -= 1
        else:
            i -= 1

    common.reverse()
    return common