from __future__ import annotations
from dataclasses import dataclass
LANGUAGE_CODE_SUFFIX = "_(((([A-Za-z]{2,3}(-([A-Za-z]{3}(-[A-Za-z]{3}){0,2}))?)|[A-Za-z]{4}|[A-Za-z]{5,8})(-([A-Za-z]{4}))?(-([A-Za-z]{2}|[0-9]{3}))?(-([A-Za-z0-9]{5,8}|[0-9][A-Za-z0-9]{3}))*(-([0-9A-WY-Za-wy-z](-[A-Za-z0-9]{2,8})+))*(-(x(-[A-Za-z0-9]{1,8})+))?)|(x(-[A-Za-z0-9]{1,8})+))" # noqa: E501
LANGUAGE_CODE_SUFFIX_LEN = len(LANGUAGE_CODE_SUFFIX)
[docs]
@dataclass
class Field:
"""Initialize a schema field object."""
#: The field's name.
name: str
#: The field's schema.
schema: dict
#: The ``deprecated`` property of the field.
deprecated_self: dict
#: The ``deprecated`` property of the field, or an ancestor of the field.
deprecated: dict
#: The JSON pointer to the field in the schema, e.g. ``/properties/tender/properties/id``.
#: Used, for example, to look up a modified field's original schema in the release schema.
pointer: str
#: The path to the field in data, e.g. ``('tender', 'id')``.
path_components: tuple
#: The definition in which the field is defined, e.g. ``'Item'``.
definition: str
#: Whether the field is defined under ``patternProperties``.
pattern: bool = False
#: Whether the field has a corresponding field in the schema's ``patternProperties`` (like in OCDS 1.1).
multilingual: bool = False
#: Whether the field is listed under ``required``.
required: bool = False
#: Whether the field's name is ``id`` and isn't under a ``wholeListMerge`` array.
merge_by_id: bool = False
#: The field's codelist.
codelist: str = ""
#: Whether the field's codelist is open.
open_codelist: bool = False
#: The separator to use in string representations of paths.
sep = "."
@property
def path(self):
"""Return the path to the field in data with ``self.sep`` as separator, e.g. ``tender.id``."""
return self.sep.join(self.path_components)
def __repr__(self):
return repr(self.asdict())
[docs]
def asdict(self, sep=None, exclude=()):
"""
Return the field as a dict, with keys for all properties except ``path_components``.
:param list sep: the separator to use in string representations of paths, overriding ``self.sep``
:param list exclude: a list of keys to exclude from the dict
"""
sep = sep or self.sep
return {k: v for k, v in self.__dict__.items() if k not in exclude and k != "path_components"} | (
{} if "path" in exclude else {"path": sep.join(self.path_components)}
)
[docs]
def get_schema_fields(
schema: dict,
pointer: str = "",
path_components: tuple = (),
definition: str = "",
deprecated: dict | None = None,
*,
whole_list_merge: bool = False,
array: bool = False,
):
"""
Yield a :class:`~ocdskit.schema.Field` for each name under ``properties`` or ``patternProperties``.
:param schema: A dereferenced JSON schema. If using ``jsonref``, and if subschemas set both ``$ref`` and other
properties, the schema must be dereferenced with either ``proxies=True`` or ``merge_props=True``.
:param pointer: The JSON pointer to the field in the schema, e.g. ``/properties/tender/properties/id``.
:param path_components: The path to the field in data, e.g. ``('tender', 'id')``.
:param definition: The definition in which the field is defined, e.g. ``'Item'``.
:param deprecated: If the field, or an ancestor of the field, sets ``deprecated``, the ``deprecated`` object.
:param whole_list_merge: Whether the field, or an ancestor of the field, sets ``wholelistMerge``.
:param array: Whether the field is under ``items/properties`` or ``items/patternProperties``.
"""
multilingual = set()
nonmultilingual_pattern_properties = {}
required = schema.get("required", [])
# `deprecated` and `whole_list_merge` are inherited.
deprecated = deprecated or _deprecated(schema)
whole_list_merge = whole_list_merge or schema.get("wholeListMerge", False)
if pattern_properties := schema.get("patternProperties"):
for pattern, subschema in pattern_properties.items():
# The pattern might have an extra set of parentheses (like in OCDS 1.1). Assumes the final character is $.
for offset in (2, 1):
end = -LANGUAGE_CODE_SUFFIX_LEN - offset
# The pattern must be anchored and the suffix must occur at the end.
if (
pattern[end:-offset] == LANGUAGE_CODE_SUFFIX
and pattern[:offset] == "^("[:offset]
and pattern[-offset:] == ")$"[-offset:]
):
multilingual.add(pattern[offset:end])
break
# Set `multilingual` on corresponding `properties`. Yield remaining `patternProperties`.
else:
nonmultilingual_pattern_properties[pattern] = subschema
if items := schema.get("items"):
# `items` advances the pointer and sets array context (for the next level only).
if isinstance(items, dict):
yield from get_schema_fields(
items,
f"{pointer}/items",
path_components,
definition,
deprecated,
whole_list_merge=whole_list_merge,
array=True,
)
else:
for i, subschema in enumerate(items):
yield from get_schema_fields(
subschema,
f"{pointer}/items/{i}",
path_components,
definition,
deprecated,
whole_list_merge=whole_list_merge,
array=True,
)
for keyword in ("anyOf", "allOf", "oneOf"):
if elements := schema.get(keyword):
for i, subschema in enumerate(elements):
# These keywords advance the pointer.
yield from get_schema_fields(
subschema,
f"{pointer}/{keyword}/{i}",
path_components,
definition,
deprecated,
whole_list_merge=whole_list_merge,
)
for keyword in ("then", "else"):
if subschema := schema.get(keyword):
# These keywords advance the pointer.
yield from get_schema_fields(
subschema,
f"{pointer}/{keyword}",
path_components,
definition,
deprecated,
whole_list_merge=whole_list_merge,
)
if properties := schema.get("properties"):
for name, subschema in properties.items():
prop_pointer = f"{pointer}/properties/{name}"
prop_path_components = (*path_components, name)
prop_deprecated = _deprecated(subschema)
prop_codelist, prop_open_codelist = _codelist(subschema)
# To date, codelist and openCodelist in OCDS aren't set on `items`.
yield Field(
name=name,
schema=subschema,
pointer=prop_pointer,
path_components=prop_path_components,
definition=definition,
deprecated_self=prop_deprecated,
deprecated=deprecated or prop_deprecated,
codelist=prop_codelist,
open_codelist=prop_open_codelist,
multilingual=name in multilingual,
required=name in required,
merge_by_id=name == "id" and array and not whole_list_merge,
)
# `properties` advances the pointer and path.
yield from get_schema_fields(
subschema,
prop_pointer,
prop_path_components,
definition,
deprecated,
whole_list_merge=whole_list_merge,
)
# Yield `patternProperties` after `properties`, to be interpreted in context.
for name, subschema in nonmultilingual_pattern_properties.items():
# The duplication across `properties` and `patternProperties` can be avoided, but is >5% slower.
prop_pointer = f"{pointer}/patternProperties/{name}"
prop_path_components = (*path_components, name)
prop_deprecated = _deprecated(subschema)
prop_codelist, prop_open_codelist = _codelist(subschema)
yield Field(
name=name,
schema=subschema,
pointer=prop_pointer,
path_components=prop_path_components,
definition=definition,
deprecated_self=prop_deprecated,
deprecated=deprecated or prop_deprecated,
codelist=prop_codelist,
open_codelist=prop_open_codelist,
pattern=True,
# `patternProperties` can't be multilingual, required, or "id".
)
# `patternProperties` advances the pointer and path.
yield from get_schema_fields(
subschema,
prop_pointer,
prop_path_components,
definition,
deprecated,
whole_list_merge=whole_list_merge,
)
# `definitions` is canonically only at the top level.
if not pointer:
# Yield definitions after `properties` and `patternProperties`, to be interpreted in context.
for keyword in ("$defs", "definitions"):
if definitions := schema.get(keyword):
for name, subschema in definitions.items():
# These keywords advance the pointer and set the definition.
yield from get_schema_fields(subschema, f"/{keyword}/{name}", definition=name)
def _codelist(subschema):
default = "enum" not in subschema
if codelist := subschema.get("codelist"):
return codelist, subschema.get("openCodelist", default)
# The behavior hasn't been decided if `items` is an array (e.g. with conflicting codelist-related values).
if (items := subschema.get("items")) and isinstance(items, dict):
return items.get("codelist", ""), items.get("openCodelist", default)
return "", default
def _deprecated(value):
return value.get("deprecated") or (hasattr(value, "__reference__") and value.__reference__.get("deprecated")) or {}
[docs]
def add_validation_properties(schema, *, unique_items=True, coordinates=False):
"""
Add "minItems" and "uniqueItems" if an array, add "minProperties" if an object, and add "minLength" if a string
and if "enum", "format" and "pattern" aren't set.
:param dict schema: a JSON schema
:param bool unique_items: whether to add "uniqueItems" properties to array fields
:param bool coordinates: whether the parent is a geospatial coordinates field
"""
if isinstance(schema, list):
for item in schema:
add_validation_properties(item, unique_items=unique_items)
elif isinstance(schema, dict):
if "type" in schema:
if (
"string" in schema["type"]
# "enum" is more strict than "minLength".
and "enum" not in schema
# The defined formats do not match zero-length strings.
# https://datatracker.ietf.org/doc/html/draft-fge-json-schema-validation-00#section-7.3
and "format" not in schema
# The pattern is assumed to not match zero-length strings.
and "pattern" not in schema
):
schema.setdefault("minLength", 1)
if "array" in schema["type"]:
# Allow non-unique items for coordinates fields (e.g. closed polygons).
if sorted(schema.get("items", {}).get("type", [])) == ["array", "number"]:
coordinates = True
if unique_items and not coordinates:
schema.setdefault("uniqueItems", True)
schema.setdefault("minItems", 1)
if "object" in schema["type"]:
schema.setdefault("minProperties", 1)
for value in schema.values():
add_validation_properties(value, unique_items=unique_items, coordinates=coordinates)