import os
import sys
from abc import ABC, abstractmethod
from ocdskit.util import ijson, iterencode, json_dumps
[docs]
class BaseCommand(ABC):
kwargs = {} # noqa: RUF012
def __init__(self, subparsers):
"""Initialize the subparser and add arguments."""
self.subparser = subparsers.add_parser(self.name, description=self.help, **self.kwargs)
self.add_base_arguments()
self.add_arguments()
self.args = None
[docs]
def add_base_arguments(self): # noqa: B027 # noop
"""Add default arguments to all commands."""
[docs]
def add_arguments(self): # noqa: B027 # noop
"""Add arguments specific to this command."""
[docs]
def add_argument(self, *args, **kwargs):
"""Add an argument to the subparser."""
self.subparser.add_argument(*args, **kwargs)
[docs]
@abstractmethod
def handle(self):
"""Run the command."""
[docs]
def prefix(self):
"""Return the path to the items to process within each input."""
return ""
[docs]
def items(self, **kwargs):
"""Yield the items in the input."""
file = StandardInputReader(self.args.encoding)
yield from ijson.items(file, self.prefix(), multiple_values=True, **kwargs)
[docs]
def print(self, data, *, streaming=False):
"""
Print JSON data.
:param bool streaming: whether to stream output using ``json.JSONEncoder().iterencode()`` (it is only more
memory efficient if ``data`` contains iterators)
"""
kwargs = {}
if self.args.pretty:
kwargs["indent"] = 2
if self.args.ascii:
kwargs["ensure_ascii"] = True
try:
if streaming:
for chunk in iterencode(data, **kwargs):
print(chunk, end="")
print()
else:
print(json_dumps(data, **kwargs))
sys.stdout.flush()
# https://docs.python.org/3/library/signal.html#note-on-sigpipe
except BrokenPipeError:
devnull = os.open(os.devnull, os.O_WRONLY)
os.dup2(devnull, sys.stdout.fileno())
sys.exit(1)
[docs]
class OCDSCommand(BaseCommand, ABC):
[docs]
def add_base_arguments(self):
self.add_argument(
"--root-path", type=str, default="", help="the path to the items to process within each input"
)
[docs]
def prefix(self):
return self.args.root_path
[docs]
def items(self, **kwargs):
"""Yield the items in the input. If an item is an array, yield each entry of the array."""
for item in super().items(**kwargs):
if isinstance(item, list):
yield from item
else:
yield item
[docs]
def add_package_arguments(self, infix, prefix="", version="1.1"):
"""Add arguments for setting package metadata to the subparser."""
template = f"{prefix}set the {infix} package's {{}} to this value"
self.add_argument("--uri", type=str, default="", help=template.format("uri"))
self.add_argument("--published-date", type=str, default="", help=template.format("publishedDate"))
self.add_argument("--version", type=str, default=version, help=template.format("version"))
self.add_argument("--publisher-name", type=str, default="", help=template.format("publisher's name"))
self.add_argument("--publisher-uri", type=str, default="", help=template.format("publisher's uri"))
self.add_argument("--publisher-scheme", type=str, default="", help=template.format("publisher's scheme"))
self.add_argument("--publisher-uid", type=str, default="", help=template.format("publisher's uid"))
self.add_argument(
"--fake", action="store_true", help=f"{prefix}set the {infix} package's required metadata to dummy values"
)
[docs]
def parse_package_arguments(self):
"""Return package metadata as a dictionary to be used as keyword arguments."""
kwargs = {
"uri": self.args.uri,
"publisher": {},
"published_date": self.args.published_date,
"version": self.args.version,
}
if self.args.fake:
if not kwargs["uri"]:
kwargs["uri"] = "placeholder:"
if not kwargs["published_date"]:
kwargs["published_date"] = "9999-01-01T00:00:00Z"
for key in ("publisher_name", "publisher_uri", "publisher_scheme", "publisher_uid"):
value = getattr(self.args, key)
if value:
kwargs["publisher"][key[10:]] = value
return kwargs