import functools
import json
import zlib
from collections import defaultdict
from ocdskit.util import _get_prop_name, get_definitions_keyword
# Keywords that carry validation constraints or annotations rather than structure. `get_normal_schema` drops these
# keys from every mapping it visits when `remove_nontype_keywords` is set. The names directly under a `properties`
# mapping are never compared against this set.
VALIDATION_AND_METADATA_KEYWORDS = {  # except `type`
    # https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.6
    # Any
    "enum",
    "const",
    # Numeric
    "multipleOf",
    "maximum",
    "exclusiveMaximum",
    "minimum",
    "exclusiveMinimum",
    # String
    "maxLength",
    "minLength",
    "pattern",
    # Array
    "maxItems",
    "minItems",
    "uniqueItems",
    "maxContains",
    "minContains",
    # Object
    "minProperties",
    "maxProperties",
    "required",
    "dependentRequired",
    # https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.7
    "format",
    # https://json-schema.org/draft/2020-12/draft-bhutton-json-schema-validation-00#rfc.section.9
    "title",
    "description",
    "default",
    "deprecated",
    "readOnly",
    "writeOnly",
    "examples",
    # https://swagger.io/specification/v3/#fixed-fields-21
    "nullable",
    "discriminator",
    "xml",
    "externalDocs",
    "example",
}
# Keywords that apply subschemas. `hoist_deep_properties` does not treat these keys as property names when it
# tracks the nearest parent property, which it uses to name a hoisted schema.
# https://json-schema.org/draft/2020-12/meta/applicator
APPLICATOR_KEYWORDS = {
    "prefixItems",
    "additionalItems",  # removed in draft 2020-12
    "items",
    "contains",
    "additionalProperties",
    "properties",
    "patternProperties",
    "dependentSchemas",
    "propertyNames",
    "if",
    "then",
    "else",
    "allOf",
    "anyOf",
    "oneOf",
    "not",
}
def _traverse_in_place(block):
def _method(value):
if isinstance(value, dict):
block(value)
for v in value.values():
_method(v)
elif isinstance(value, list):
for v in value:
_method(v)
return _method
def get_schema_hash(schema, normalizer):
    """
    Calculate a checksum of a JSON Schema's normalized form.

    The normalized schema is serialized as JSON with sorted keys, so that key order does not affect the checksum.

    :param dict schema: a JSON schema
    :param normalizer: a function that accepts a JSON Schema and returns a JSON Schema,
                       with all structurally-irrelevant properties removed
    :returns: a CRC-32 checksum
    :rtype: int
    """
    normalized = normalizer(schema)
    serialized = json.dumps(normalized, sort_keys=True)
    return zlib.crc32(serialized.encode())
def convert_from_oas3(schema, *, get_only=False):
    """
    Convert from OpenAPI Specification 3.0 to JSON Schema draft 4.

    .. warning::

       Modifies schema ``$ref`` values in-place.

    .. admonition:: Limitations

       Unsupported:

       - Schema Objects not under ``#/components/schemas``
       - External ``$ref`` to Schema Objects
       - Any ``$ref`` to Response Objects
       - Any ``$ref`` to Path Item Objects

    :param dict schema: an OpenAPI 3.0 document
    :param bool get_only: whether to convert only schemas used by GET paths
    :returns: a schema using JSON Schema draft 4, whose only members are ``$schema`` and ``definitions``
    :rtype: dict
    :raises KeyError: if a ``$ref`` names a schema that is missing from ``#/components/schemas``
    """
    oas3_prefix = "#/components/schemas/"

    def _replace_refs(value):
        ref = value.get("$ref")
        # A non-string "$ref" is a member that happens to be named "$ref" (for example, under `properties`),
        # not a reference.
        if isinstance(ref, str) and ref.startswith(oas3_prefix):
            name = ref[len(oas3_prefix) :]
            # Update the reference.
            value["$ref"] = f"#/definitions/{name}"
            if name not in definitions:
                definition = schemas[name]
                # Add to the schema's definitions, before recursing, so that circular references terminate.
                definitions[name] = definition
                # Recurse into the referenced definition.
                replace_refs(definition)

    replace_refs = _traverse_in_place(_replace_refs)

    # https://swagger.io/specification/v3/#openapi-object
    schemas = schema.get("components", {}).get("schemas", {})

    if not get_only:
        # Keep every component schema.
        definitions = schemas
        replace_refs(schemas)
    else:
        # Keep only the component schemas that are reachable from GET responses.
        definitions = {}
        # `paths` is required, but the schema can be invalid.
        for path_item_object in schema.get("paths", {}).values():
            if operation := path_item_object.get("get"):
                # `responses` is required, but the schema can be invalid.
                for response in operation.get("responses", {}).values():
                    for media_type in response.get("content", {}).values():
                        if schema_object := media_type.get("schema"):
                            replace_refs(schema_object)

    return {"$schema": "http://json-schema.org/draft-04/schema#", "definitions": definitions}
def _remove_private_fields(value):
    properties = value.get("properties")
    if not properties:
        return
    public = {}
    for name, subschema in properties.items():
        if name.startswith("_"):
            continue
        public[name] = subschema
    value["properties"] = public


remove_private_fields = _traverse_in_place(_remove_private_fields)
"""
Remove ``properties`` members whose names start with an underscore, in-place.

.. warning::

   Assumes ``properties`` is never a direct member of the ``properties`` validation keyword.

:param dict value: a JSON schema
"""
def remove_fields(schema, fields):
    """
    Remove the given fields from every ``properties`` mapping in the schema, in-place.

    .. warning::

       Assumes ``properties`` is never a direct member of the ``properties`` validation keyword.

    :param dict schema: a JSON schema
    :param set[str] fields: the names of the fields to remove
    """

    def _prune(node):
        current = node.get("properties")
        if not current:
            return
        node["properties"] = {name: subschema for name, subschema in current.items() if name not in fields}

    prune = _traverse_in_place(_prune)
    prune(schema)
def remove_unreachable_definitions(schema, pattern):
    """
    Remove any definitions that are unreachable from definitions whose names contain ``pattern``, in-place.

    Performs a depth-first search from the root definitions, following ``$ref`` edges between definitions.
    References that don't point to an existing definition are ignored.

    :param dict schema: a JSON schema
    :param str pattern: a substring to case-sensitively match against definition names
    """

    def _build_graph(value, source):
        if isinstance(value, dict):
            ref = value.get("$ref")
            # A non-string "$ref" is a member that happens to be named "$ref" (for example, under `properties`),
            # not a reference.
            if isinstance(ref, str) and ref.startswith(ref_prefix):
                target = ref[len_prefix:]
                if target in definitions:
                    graph[source].add(target)
            for v in value.values():
                _build_graph(v, source)
        elif isinstance(value, list):
            for v in value:
                _build_graph(v, source)

    definitions_keyword = get_definitions_keyword(schema)
    definitions = schema.get(definitions_keyword, {})

    # Map each definition's name to the names of the definitions it references.
    graph = defaultdict(set)
    ref_prefix = f"#/{definitions_keyword}/"
    len_prefix = len(ref_prefix)
    for name, definition in definitions.items():
        _build_graph(definition, name)

    # Depth-first search: the most recently discovered definition is expanded first.
    keep = set()
    stack = [name for name in definitions if pattern in name]
    while stack:
        node = stack.pop()
        if node not in keep:
            keep.add(node)
            stack.extend(target for target in graph[node] if target not in keep)

    # Copy the names, to delete from the mapping while iterating.
    for name in list(definitions):
        if name not in keep:
            del definitions[name]
def fix_validation_errors(schema, normalizer=None):
    """
    Fix validation errors in a JSON Schema, in-place.

    Replaces any ``anyOf`` that is a non-empty object with an array of the object's values. Values with equal
    normalized forms are deduplicated, keeping the first occurrence.

    .. warning::

       Assumes ``anyOf`` is never a direct member of the ``properties`` validation keyword.

    :param dict schema: a JSON schema
    :param normalizer: a function that accepts a JSON Schema and returns a JSON Schema,
                       with all structurally-irrelevant properties removed, for deduplication.
                       If omitted, values are compared as-is.
    """

    def _anyof_to_array(node):
        anyof = node.get("anyOf")
        if not (anyof and isinstance(anyof, dict)):
            return
        fingerprints = []  # a list, not a set, because normalized schemas are unhashable
        unique = node["anyOf"] = []
        for candidate in anyof.values():
            if normalizer:
                fingerprint = normalizer(candidate)
            else:
                fingerprint = candidate
            if fingerprint in fingerprints:
                continue
            fingerprints.append(fingerprint)
            unique.append(candidate)

    fix = _traverse_in_place(_anyof_to_array)
    fix(schema)
def get_normal_schema(value, *, remove_nontype_keywords=False, remove_x_keywords=False, remove_fields=()):
    """
    Return a copy of a schema without metadata and validation keywords, ``x-*`` keywords and/or specific fields.

    Dicts and lists are rebuilt recursively. Any other value is returned as-is.

    .. warning::

       Assumes ``properties`` is never a direct member of the ``properties`` validation keyword.

    :param dict value: a JSON schema
    :param bool remove_nontype_keywords: whether to remove metadata and validation keywords
    :param bool remove_x_keywords: whether to remove ``x-*`` keywords
    :param set[str] remove_fields: fields to remove
    :returns: a new schema with keywords removed
    :rtype: object
    """

    def _normalize(node):
        if isinstance(node, list):
            return [_normalize(item) for item in node]
        if not isinstance(node, dict):
            return node

        normal = {}
        for keyword, member in node.items():
            if remove_nontype_keywords and keyword in VALIDATION_AND_METADATA_KEYWORDS:
                continue
            if remove_x_keywords and keyword.startswith("x-"):
                continue
            if keyword != "properties":
                normal[keyword] = _normalize(member)
                continue
            # The keys of `properties` are field names, not keywords: filter them by `remove_fields` only, to
            # avoid removing properties with the same names as keywords.
            normal[keyword] = {
                name: _normalize(subschema)
                for name, subschema in member.items()
                if not remove_fields or name not in remove_fields
            }
        return normal

    return _normalize(value)
def hoist_deep_properties(schema, normalizer):
    """
    Move any sub-schema with a ``properties`` keyword to the definitions location, in-place.

    If neither ``$defs`` nor ``definitions`` exists, ``$defs`` is used.

    The schema is named using its ``title`` keyword, or its parent property. Sub-schemas outside the definitions
    that have neither are named "Root". If the name is already used by another definition, the sub-schema's hash
    is appended to the name, as 8 hexadecimal digits.

    A sub-schema whose hash matches that of a known definition isn't added again: it is replaced by a ``$ref`` to
    that definition.

    .. warning::

       Assumes ``properties`` is never a direct member of the ``properties`` validation keyword.

    .. admonition:: Limitations

       The schema is named after an earlier ancestor if the parent property has the same name as an
       `applicator keyword <https://json-schema.org/draft/2020-12/meta/applicator>`__.

    :param dict schema: a JSON schema
    :param normalizer: a function that accepts a JSON Schema and returns a JSON Schema,
                       with all structurally-irrelevant properties removed
    """

    # value: the node being visited; key and parent: where to write its replacement (parent[key]).
    # definition and definition_name: the definition being processed, which isn't itself hoisted.
    # prop: the nearest enclosing key that isn't an applicator keyword, used to name the hoisted schema.
    def _hoist(value, key, parent, definition=None, definition_name=None, prop=""):
        if isinstance(value, dict):
            if "properties" in value and value is not definition:  # don't hoist at top level
                hashed = hasher(value)
                name = hashes.get(hashed)
                # Hoist if no match.
                if not name:
                    name = value.get("title")
                    if not name:  # don't use default argument to `get` in case prop is empty
                        name = prop[0].upper() + prop[1:] if prop else definition_name
                    # Disambiguate from a different definition with the same name.
                    if name in definitions:
                        name = f"{name}_{format(hashed & 0xFFFFFFFF, '08x')}"
                    definitions[name] = value
                    hashes[hashed] = name
                # Replace the properties with a $ref.
                parent[key] = {"$ref": f"#/{definition_keyword}/{name}"}
                # Recalculate the current definition's hash.
                if definition is not None:
                    hashes[hasher(definition)] = definition_name
            # Special case for allOf inheritance (note the `definition` argument).
            # Each member is passed as its own `definition`, so that the member itself isn't hoisted.
            if value is definition and "allOf" in value and len(value) == 1:
                for i, v in enumerate(definition["allOf"]):
                    _hoist(v, i, value["allOf"], v, definition_name, prop)
            else:
                # This descends into `value` even if it was hoisted above, to hoist its own sub-schemas.
                for k, v in value.items():
                    _hoist(v, k, value, definition, definition_name, prop if k in APPLICATOR_KEYWORDS else k)
        elif isinstance(value, list):
            for i, v in enumerate(value):
                _hoist(v, i, value, definition, definition_name, prop)

    hasher = functools.partial(get_schema_hash, normalizer=normalizer)
    definition_keyword = get_definitions_keyword(schema)
    definitions = schema.setdefault(definition_keyword, {})
    # Map each known definition's hash to its name, to detect sub-schemas that duplicate a definition.
    hashes = {hasher(definition): definition_name for definition_name, definition in definitions.items()}

    # Iterate over a copy, because hoisting adds definitions.
    for definition_name, definition in list(definitions.items()):
        _hoist(definition, definition_name, definitions, definition, definition_name)

    schema.pop(definition_keyword)  # avoid re-processing definitions
    for key, value in schema.items():
        _hoist(value, key, schema, definition_name="Root")
    # Re-attach the definitions, including any hoisted from outside the definitions.
    schema[definition_keyword] = definitions
def normalize_schema(schema, normalizer, get_base_classes):
    """
    Extract base classes from a schema's definitions. Rewrite definitions to use ``allOf`` inheritance.

    Hashes each ``properties`` member using ``normalizer``, calls ``get_base_classes``, then performs greedy
    set-cover to determine multiple inheritance for both base classes and original definitions.

    Base classes that no other class ends up inheriting from are not added to the definitions.

    .. warning::

       Modifies ``schema`` in-place.

    .. admonition:: Limitations

       All ``properties`` mappings must be at each definition's top-level.
       See :func:`~ocdskit.normalize.hoist_deep_properties`.

    :param dict schema: a JSON schema, with its definitions under the keyword returned by
                        ``get_definitions_keyword``
    :param normalizer: a function that accepts a JSON Schema and returns a JSON Schema,
                       with all structurally-irrelevant properties removed
    :param get_base_classes: a function that accepts the schema's definitions as a mapping of definition names to
                             sets of ``{prop}:{hash}`` strings, and returns base classes as a list of dicts with
                             the keys:

                             ``name``
                               The name of the base class
                             ``members``
                               A sequence of child classes
                             ``props``
                               A set of ``{prop}:{hash}`` strings
    """
    definitions_keyword = get_definitions_keyword(schema)
    definitions = schema[definitions_keyword]
    ref_prefix = f"#/{definitions_keyword}/"

    # Base class calculation requires hashable values.
    classes = defaultdict(set)
    # Map each `{prop}:{hash}` string back to a subschema, to build the base classes' properties.
    hashed_to_schema = {}
    for name, definition in definitions.items():
        if "properties" in definition:
            for prop, subschema in definition["properties"].items():
                hashed = f"{prop}:{get_schema_hash(subschema, normalizer)}"
                classes[name].add(hashed)
                hashed_to_schema[hashed] = subschema

    # Calculate base classes.
    base_classes = get_base_classes(classes)

    # Invert base classes.
    subclass_bases = defaultdict(list)
    for base_class in base_classes:
        for subclass in base_class["members"]:
            subclass_bases[subclass].append(base_class)

    # Greedy set-cover: for each class, find the fewest bases that cover its properties.
    # Base class dicts are unhashable, so the bases that are inherited from are tracked by `id()`.
    used_bases = set()
    specificity_order = sorted(base_classes, key=lambda base_class: -len(base_class["props"]))

    # Inheritance between base classes.
    base_allofs = {}
    for i, base_class in enumerate(specificity_order):
        allof = []
        covered = set()
        # Only base classes with as many or fewer properties can be strict subsets.
        for other in specificity_order[i + 1 :]:
            # Inherit from a strict subset, only if it contributes a property that isn't yet covered.
            if other["props"] < base_class["props"] and other["props"] - covered:
                allof.append(other)
                covered |= other["props"]
        if allof:
            base_allofs[base_class["name"]] = allof
            used_bases.update(id(base) for base in allof)

    # Inheritance between original classes and base classes.
    subclass_allofs = {}
    for subclass, bases in subclass_bases.items():
        allof = []
        covered = set()
        # Try the base classes with the most properties first.
        for base_class in sorted(bases, key=lambda base_class: -len(base_class["props"])):
            if base_class["props"] - covered:
                allof.append(base_class)
                covered |= base_class["props"]
        subclass_allofs[subclass] = allof
        used_bases.update(id(base) for base in allof)

    # Add base classes to schema definitions.
    for base_class in base_classes:
        if id(base_class) not in used_bases:
            continue
        name = base_class["name"]
        if allof := base_allofs.get(name):
            subschema = {"allOf": [{"$ref": f"{ref_prefix}{base['name']}"} for base in allof]}
            # Declare only the properties that aren't inherited.
            if remaining := base_class["props"] - set().union(*(base["props"] for base in allof)):
                properties = {_get_prop_name(p): hashed_to_schema[p] for p in remaining}
                subschema["allOf"].append({"properties": properties})
        else:
            properties = {_get_prop_name(p): hashed_to_schema[p] for p in base_class["props"]}
            subschema = {"properties": properties}
        definitions[name] = subschema

    # Modify existing definitions to reference base classes.
    for subclass, allof in subclass_allofs.items():
        definition = definitions[subclass]
        properties = definition["properties"]
        # Remove properties covered by base classes.
        for base in allof:
            for prop in base["props"]:
                if prop in classes[subclass]:
                    properties.pop(_get_prop_name(prop), None)  # a property can be covered by multiple base classes
        if not properties:
            del definition["properties"]
        # Build allOf value.
        value = [{"$ref": f"{ref_prefix}{base['name']}"} for base in allof]
        # The definition's remaining keywords become the last `allOf` member. The same dict object is reused, so
        # that existing references to it see the rewritten definition.
        value.append(definition.copy())
        definition.clear()
        definition["allOf"] = value