# Source code for ocdskit.cli.commands.base

import codecs
import os
import sys
from abc import ABC, abstractmethod

import ijson

from ocdskit.util import iterencode, json_dumps

class StandardInputReader:
    """
    A minimal file-like object that reads bytes from standard input and returns them as UTF-8 bytes.

    Only ``read()`` is implemented. If ``encoding`` is ``None`` or ``'utf-8'``, bytes are passed through unchanged.
    Otherwise, they are decoded from ``encoding`` and re-encoded as UTF-8.
    """

    def __init__(self, encoding):
        """
        :param str encoding: the encoding of standard input, or ``None`` to pass bytes through unchanged
        """
        self.encoding = encoding
        # Created on the first read that needs transcoding.
        self._decoder = None

    def read(self, buf_size):
        """
        Reads from standard input and returns the data as UTF-8 bytes.

        :param int buf_size: the number of bytes to request from standard input per underlying read
        :returns: UTF-8 encoded bytes, or ``b''`` once standard input is exhausted (when transcoding, the length
                  of the result can differ from ``buf_size``)
        :rtype: bytes
        :raises UnicodeDecodeError: if the input is not valid in the configured encoding
        """
        # NOTE(review): this statement was truncated in the source as received ("data = if ..."). Reading from
        # the binary standard input buffer is the reconstruction consistent with the class name and with the
        # bytes-level decode/encode below - confirm against upstream.
        data = sys.stdin.buffer.read(buf_size)
        if self.encoding is None or self.encoding == 'utf-8':
            return data

        if self._decoder is None:
            self._decoder = codecs.getincrementaldecoder(self.encoding)()

        # Decode incrementally, so that a multi-byte character (or a byte order mark) split across two reads is
        # decoded correctly instead of raising UnicodeDecodeError or being mis-decoded.
        while True:
            text = self._decoder.decode(data, final=not data)
            # An empty result must be returned only at end of input, since callers treat b'' as end-of-file.
            # If the decoder is still waiting for the rest of a character, read more instead.
            if text or not data:
                return text.encode('utf-8')
            data = sys.stdin.buffer.read(buf_size)
class BaseCommand(ABC):
    """
    Abstract base class for a command that is registered as an argparse subparser.

    Subclasses must implement :meth:`handle`, and can override :meth:`add_arguments`, :meth:`add_base_arguments`
    and :meth:`prefix`. ``self.args`` is ``None`` after initialization; the methods that read it (``items`` and
    ``print``) expect it to have been set to the parsed arguments beforehand.
    """

    # Extra keyword arguments passed to ``subparsers.add_parser()``. Subclasses can override this.
    kwargs = {}

    def __init__(self, subparsers):
        """
        Initializes the subparser and adds arguments.

        :param subparsers: the object on which ``add_parser()`` is called to create this command's subparser
        """
        # NOTE(review): the first two arguments to add_parser() were lost in the source as received
        # ("add_parser(,, **self.kwargs)"). ``self.name`` and ``description=self.help`` are a reconstruction that
        # assumes subclasses define ``name`` and ``help`` attributes - confirm against upstream.
        self.subparser = subparsers.add_parser(self.name, description=self.help, **self.kwargs)
        self.add_base_arguments()
        self.add_arguments()
        self.args = None

    def add_base_arguments(self):
        """
        Adds default arguments to all commands.

        Does nothing by default.
        """

    def add_arguments(self):
        """
        Adds arguments specific to this command.

        Does nothing by default.
        """

    def add_argument(self, *args, **kwargs):
        """
        Adds an argument to the subparser.

        All arguments are passed through to the subparser's ``add_argument()`` method.
        """
        self.subparser.add_argument(*args, **kwargs)

    @abstractmethod
    def handle(self):
        """
        Runs the command.
        """

    def prefix(self):
        """
        Returns the path to the items to process within each input.

        The default is the empty string.
        """
        return ''

    def items(self, **kwargs):
        """
        Yields the items in the input.

        Items are read from standard input (using ``self.args.encoding``) and parsed with ``ijson.items()`` at
        the path returned by :meth:`prefix`, with ``multiple_values=True``. Additional keyword arguments are
        passed to ``ijson.items()``.
        """
        file = StandardInputReader(self.args.encoding)
        yield from ijson.items(file, self.prefix(), multiple_values=True, **kwargs)

    def print(self, data, streaming=False):
        """
        Prints JSON data.

        Output is indented if ``self.args.pretty`` is set, and ``ensure_ascii`` is enabled if ``self.args.ascii``
        is set. If the output pipe is closed by the reader, exits with status 1.

        :param data: the data to serialize and print
        :param bool streaming: whether to stream output using ``json.JSONEncoder().iterencode()`` (it is only more
                               memory efficient if ``data`` contains iterators)
        """
        kwargs = {}
        if self.args.pretty:
            kwargs['indent'] = 2
        if self.args.ascii:
            kwargs['ensure_ascii'] = True

        try:
            if streaming:
                for chunk in iterencode(data, **kwargs):
                    print(chunk, end='')
                print()
            else:
                print(json_dumps(data, **kwargs))
            # Flush inside the try block, so that a closed pipe is detected here rather than at shutdown.
            sys.stdout.flush()
        # See the "Note on SIGPIPE" in the documentation of Python's signal module.
        except BrokenPipeError:
            # Point stdout at devnull, so that flushing the remaining output at exit can't raise the error again.
            # NOTE(review): this call was truncated in the source as received ("devnull =, os.O_WRONLY)");
            # os.open(os.devnull, ...) follows the recipe in the Python documentation - confirm against upstream.
            devnull = os.open(os.devnull, os.O_WRONLY)
            os.dup2(devnull, sys.stdout.fileno())
            sys.exit(1)
class OCDSCommand(BaseCommand, ABC):
    """
    Base class for commands that adds a ``--root-path`` argument, flattens array inputs, and provides helpers for
    package metadata arguments.
    """

    def add_base_arguments(self):
        """
        Adds the ``--root-path`` argument, which defaults to the empty string.
        """
        self.add_argument(
            '--root-path',
            type=str,
            default='',
            help='the path to the items to process within each input',
        )

    def prefix(self):
        """
        Returns the value of the ``--root-path`` argument.
        """
        return self.args.root_path

    def items(self, **kwargs):
        """
        Yields the items in the input.

        An item that is a list is expanded: each of its entries is yielded in turn. Any other item is yielded as-is.
        """
        for item in super().items(**kwargs):
            entries = item if isinstance(item, list) else [item]
            for entry in entries:
                yield entry

    def add_package_arguments(self, infix, prefix='', version='1.1'):
        """
        Adds arguments for setting package metadata to the subparser.

        :param str infix: the text inserted before "package's" in each help message
        :param str prefix: the text inserted at the start of each help message
        :param str version: the default value of the ``--version`` argument
        """
        names = {'infix': infix, 'prefix': prefix}
        template = "{prefix}set the {infix} package's {{}} to this value".format(**names)

        # (flag, default, description of the field in the help message), in the order the arguments are added.
        options = (
            ('--uri', '', 'uri'),
            ('--published-date', '', 'publishedDate'),
            ('--version', version, "version"),
            ('--publisher-name', '', "publisher's name"),
            ('--publisher-uri', '', "publisher's uri"),
            ('--publisher-scheme', '', "publisher's scheme"),
            ('--publisher-uid', '', "publisher's uid"),
        )
        for flag, default, label in options:
            self.add_argument(flag, type=str, default=default, help=template.format(label))

        fake_help = "{prefix}set the {infix} package's required metadata to dummy values".format(**names)
        self.add_argument('--fake', action='store_true', help=fake_help)

    def parse_package_arguments(self):
        """
        Returns package metadata as a dictionary to be used as keyword arguments.

        The dictionary has the keys ``uri``, ``publisher``, ``published_date`` and ``version``. If ``--fake`` is
        set, an empty ``uri`` or ``published_date`` is replaced with a dummy value. ``publisher`` contains only
        the ``name``, ``uri``, ``scheme`` and ``uid`` entries whose arguments are non-empty.
        """
        args = self.args
        publisher = {}
        metadata = {
            'uri': args.uri,
            'publisher': publisher,
            'published_date': args.published_date,
            'version': args.version,
        }

        if args.fake:
            metadata['uri'] = metadata['uri'] or 'placeholder:'
            metadata['published_date'] = metadata['published_date'] or '9999-01-01T00:00:00Z'

        for field in ('name', 'uri', 'scheme', 'uid'):
            value = getattr(args, 'publisher_' + field)
            if value:
                publisher[field] = value

        return metadata