Source code for jsonargparse.jsonschema

"""Action to support jsonschema and type hint annotations."""

import os
import json
import yaml
import inspect
from enum import Enum
from argparse import Namespace, Action
from typing import Any, Union, Tuple, List, Iterable, Sequence, Set, Dict

from .actions import _is_action_value_list
from .typing import is_optional, annotation_to_schema, type_to_str
from .util import (
    namespace_to_dict,
    Path,
    yamlParserError,
    yamlScannerError,
    ParserError,
    strip_meta,
    import_object,
    _check_unknown_kwargs,
    _issubclass,
)
from .optionals import (
    ModuleNotFound,
    jsonschemaValidationError,
    jsonschema_support,
    import_jsonschema,
    get_config_read_mode,
    files_completer,
    argcomplete_warn_redraw_prompt,
)


__all__ = ['ActionJsonSchema']


typesmap = {
    str: 'string',
    int: 'integer',
    float: 'number',
    bool: 'boolean',
    type(None): 'null',
}

supported_types = {
    bool,
    Any,
    Union,
    List, list, Iterable, Sequence,
    Tuple, tuple,
    Set, set,
    Dict, dict,
}

if jsonschema_support:
    jsonschema, jsonvalidator = import_jsonschema('jsonschema.py')


[docs]class ActionJsonSchema(Action): """Action to parse option as json validated by a jsonschema."""
[docs] def __init__(self, **kwargs): """Initializer for ActionJsonSchema instance. Args: schema (str or dict): Schema to validate values against. annotation (type): Type object from which to generate schema. enable_path (bool): Whether to try to load json from path (def.=True). with_meta (bool): Whether to include metadata (def.=True). Raises: ValueError: If a parameter is invalid. jsonschema.exceptions.SchemaError: If the schema is invalid. """ if 'schema' in kwargs or 'annotation' in kwargs: _check_unknown_kwargs(kwargs, {'schema', 'annotation', 'enable_path', 'with_meta'}) if 'annotation' in kwargs: if 'schema' in kwargs: raise ValueError('Only one of schema or annotation is accepted.') self._annotation = kwargs['annotation'] schema, subschemas = ActionJsonSchema._typing_schema(self._annotation) if schema is None or schema == {'type': 'null'}: raise ValueError('Unable to generate schema from annotation '+str(self._annotation)) self._subschemas = subschemas else: self._annotation = self._subschemas = None schema = kwargs['schema'] if isinstance(schema, str): try: schema = yaml.safe_load(schema) except (yamlParserError, yamlScannerError) as ex: raise ValueError('Problems parsing schema :: '+str(ex)) jsonvalidator.check_schema(schema) self._validator = self._extend_jsonvalidator_with_default(jsonvalidator)(schema) self._enable_path = kwargs.get('enable_path', True) self._with_meta = kwargs.get('with_meta', True) elif '_validator' not in kwargs: raise ValueError('Expected schema or annotation keyword arguments.') else: self._annotation = kwargs.pop('_annotation') self._subschemas = kwargs.pop('_subschemas') self._validator = kwargs.pop('_validator') self._enable_path = kwargs.pop('_enable_path') self._with_meta = kwargs.pop('_with_meta') metavar = self._annotation_metavar() if metavar is not None: kwargs['metavar'] = metavar super().__init__(**kwargs)
[docs] def __call__(self, *args, **kwargs): """Parses an argument validating against the corresponding jsonschema. Raises: TypeError: If the argument is not valid. """ if len(args) == 0: kwargs['_annotation'] = self._annotation kwargs['_subschemas'] = self._subschemas kwargs['_validator'] = self._validator kwargs['_enable_path'] = self._enable_path kwargs['_with_meta'] = self._with_meta if 'help' in kwargs and isinstance(kwargs['help'], str) and '%s' in kwargs['help']: kwargs['help'] = kwargs['help'] % json.dumps(self._validator.schema, sort_keys=True) return ActionJsonSchema(**kwargs) val = self._check_type(args[2]) if not self._with_meta: val = strip_meta(val) setattr(args[1], self.dest, val)
def _check_type(self, value, cfg=None): islist = _is_action_value_list(self) if not islist: value = [value] for num, val in enumerate(value): try: fpath = None if isinstance(val, str) and val.strip() != '': parsed_val = yaml.safe_load(val) if not isinstance(parsed_val, str): val = parsed_val if self._enable_path and isinstance(val, str): try: fpath = Path(val, mode=get_config_read_mode()) except TypeError: pass else: val = yaml.safe_load(fpath.get_content()) if isinstance(val, Namespace): val = namespace_to_dict(val) val = self._adapt_types(val, self._annotation, self._subschemas, reverse=True) path_meta = val.pop('__path__') if isinstance(val, dict) and '__path__' in val else None self._validator.validate(val) val = self._adapt_types(val, self._annotation, self._subschemas) if path_meta is not None: val['__path__'] = path_meta if isinstance(val, dict) and fpath is not None: val['__path__'] = fpath value[num] = val except (TypeError, yamlParserError, yamlScannerError, jsonschemaValidationError) as ex: elem = '' if not islist else ' element '+str(num+1) raise TypeError('Parser key "'+self.dest+'"'+elem+': '+str(ex)) return value if islist else value[0] @staticmethod def _extend_jsonvalidator_with_default(validator_class): """Extends a json schema validator so that it fills in default values.""" validate_properties = validator_class.VALIDATORS['properties'] def set_defaults(validator, properties, instance, schema): for prop, subschema in properties.items(): if 'default' in subschema: instance.setdefault(prop, subschema['default']) for error in validate_properties(validator, properties, instance, schema): yield error return jsonschema.validators.extend(validator_class, {'properties': set_defaults}) def _instantiate_classes(self, val): if self._annotation is not None: val = self._adapt_types(val, self._annotation, self._subschemas, instantiate_classes=True) return val @staticmethod def _adapt_types(val, annotation, subschemas, reverse=False, instantiate_classes=False): def validate_adapt(v, subschema): if subschema is not None: subannotation, subvalidator, subsubschemas = subschema if reverse: v = ActionJsonSchema._adapt_types(v, subannotation, subsubschemas, reverse, instantiate_classes) else: try: if subvalidator is not None and not instantiate_classes: subvalidator.validate(v) v = ActionJsonSchema._adapt_types(v, subannotation, subsubschemas, reverse, instantiate_classes) except jsonschemaValidationError: pass return v if subschemas is None: subschemas = [] if _issubclass(annotation, Enum): if reverse and isinstance(val, annotation): val = val.name elif not reverse and val in annotation.__members__: val = annotation[val] elif _issubclass(annotation, Path): if reverse and isinstance(val, annotation): val = str(val) elif not reverse: val = annotation(val) elif not hasattr(annotation, '__origin__'): if not reverse and \ not _issubclass(annotation, (str, int, float)) and \ isinstance(val, dict) and \ 'class_path' in val: try: val_class = import_object(val['class_path']) assert _issubclass(val_class, annotation), 'Not a subclass of '+annotation.__name__ if 'init_args' in val: from jsonargparse import ArgumentParser parser = ArgumentParser(error_handler=None, parse_as_dict=True) parser.add_class_arguments(val_class) parser.check_config(val['init_args']) if instantiate_classes: init_args = parser.instantiate_subclasses(val['init_args']) val = val_class(**init_args) # pylint: disable=not-a-mapping except (ImportError, ModuleNotFound, AttributeError, AssertionError, ParserError) as ex: raise ParserError('Problem with given class_path "'+val['class_path']+'" :: '+str(ex)) return val elif annotation.__origin__ == Union: for subschema in subschemas: val = validate_adapt(val, subschema) elif annotation.__origin__ in {Tuple, tuple, Set, set} and isinstance(val, (list, tuple, set)): if reverse: val = list(val) for n, v in enumerate(val): if n < len(subschemas) and subschemas[n] is not None: for subschema in subschemas[n]: val[n] = validate_adapt(v, subschema) if not reverse: val = tuple(val) if annotation.__origin__ in {Tuple, tuple} else set(val) elif annotation.__origin__ in {List, list, Set, set, Iterable, Sequence} and isinstance(val, list): for n, v in enumerate(val): for subschema in subschemas: val[n] = validate_adapt(v, subschema) elif annotation.__origin__ in {Dict, dict} and isinstance(val, dict): if annotation.__args__[0] == int: cast = str if reverse else int val = {cast(k): v for k, v in val.items()} if annotation.__args__[1] not in typesmap: for k, v in val.items(): for subschema in subschemas: val[k] = validate_adapt(v, subschema) return val @staticmethod def _typing_schema(annotation): """Generates a schema based on a type annotation.""" if annotation == Any: return {}, None elif annotation in typesmap: return {'type': typesmap[annotation]}, None elif _issubclass(annotation, Enum): return {'type': 'string', 'enum': list(annotation.__members__.keys())}, [(annotation, None, None)] elif _issubclass(annotation, Path): return {'type': 'string'}, [(annotation, None, None)] elif _issubclass(annotation, (str, int, float)): return annotation_to_schema(annotation), None elif not hasattr(annotation, '__origin__'): if annotation != inspect._empty: schema = { 'type': 'object', 'properties': { 'class_path': {'type': 'string'}, 'init_args': {'type': 'object'}, }, 'required': ['class_path'], 'additionalProperties': False, } return schema, [(annotation, jsonvalidator(schema), None)] return None, None elif annotation.__origin__ == Union: members = [] union_subschemas = [] for arg in annotation.__args__: schema, subschemas = ActionJsonSchema._typing_schema(arg) if schema is not None: members.append(schema) if arg not in typesmap: union_subschemas.append((arg, jsonvalidator(schema), subschemas)) if len(members) == 1: return members[0], union_subschemas elif len(members) > 1: return {'anyOf': members}, union_subschemas elif annotation.__origin__ in {Tuple, tuple}: items = [] tuple_subschemas = [] for arg in annotation.__args__: item, subschemas = ActionJsonSchema._typing_schema(arg) items.append(item) tuple_subschemas.append(subschemas) #if any(a is None for a in items): # return None, None return {'type': 'array', 'items': items, 'minItems': len(items), 'maxItems': len(items)}, tuple_subschemas elif annotation.__origin__ in {List, list, Iterable, Sequence, Set, set}: items, subschemas = ActionJsonSchema._typing_schema(annotation.__args__[0]) if items is not None: return {'type': 'array', 'items': items}, subschemas elif annotation.__origin__ in {Dict, dict} and annotation.__args__[0] in {str, int}: pattern = {str: '.*', int: '[0-9]+'}[annotation.__args__[0]] schema, subschemas = ActionJsonSchema._typing_schema(annotation.__args__[1]) if schema is not None: return {'type': 'object', 'patternProperties': {pattern: schema}}, subschemas return None, None def _annotation_metavar(self): """Generates a metavar for some types.""" metavar = None if self._annotation == bool: metavar = '{true,false}' elif is_optional(self._annotation, bool): metavar = '{true,false,null}' elif is_optional(self._annotation, Enum): enum = self._annotation.__args__[0] metavar = '{'+','.join(list(enum.__members__.keys())+['null'])+'}' return metavar
[docs] def completer(self, prefix, **kwargs): """Used by argcomplete, validates value and shows expected type.""" if self._annotation == bool: return ['true', 'false'] elif is_optional(self._annotation, bool): return ['true', 'false', 'null'] elif is_optional(self._annotation, Enum): enum = self._annotation.__args__[0] return list(enum.__members__.keys())+['null'] elif is_optional(self._annotation, Path): return ['null'] + sorted(files_completer(prefix, **kwargs)) elif chr(int(os.environ['COMP_TYPE'])) == '?': try: if prefix.strip() == '': raise ValueError() self._validator.validate(yaml.safe_load(prefix)) msg = 'value already valid, ' except (ValueError, yamlParserError, yamlScannerError, jsonschemaValidationError): msg = 'value not yet valid, ' if self._annotation is not None: msg += 'expected type '+type_to_str(self._annotation) else: schema = json.dumps(self._validator.schema, indent=2, sort_keys=True).replace('\n', '\n ') msg += 'required to be valid according to schema:\n '+schema+'\n' return argcomplete_warn_redraw_prompt(prefix, msg)