Source code for lexrpc.base

"""Base code shared by both server and client."""
import copy
import logging
import re
import urllib.parse

import jsonschema
from jsonschema.validators import validator_for

logger = logging.getLogger(__name__)

# Values accepted for a lexicon's ``type`` field; Base.__init__ asserts that
# every registered lexicon uses one of these.
LEXICON_TYPES = frozenset({'query', 'procedure', 'record', 'token'})

# Subset of LEXICON_TYPES. Presumably the types that are callable XRPC
# methods -- not referenced in this part of the module, confirm against users.
LEXICON_METHOD_TYPES = frozenset({'query', 'procedure'})

# Schema types allowed for method parameters; decode_params asserts that each
# parameter's declared type is one of these.
PARAMETER_TYPES = frozenset({'boolean', 'integer', 'number', 'string'})

# Namespaced identifier (NSID) patterns, see https://atproto.com/specs/nsid
# A single segment: one or more ASCII letters, digits, or hyphens.
NSID_SEGMENT = '[a-zA-Z0-9-]+'
NSID_SEGMENT_RE = re.compile(f'^{NSID_SEGMENT}$')
# One or more segments joined by literal dots. Raw f-string so that ``\.``
# reaches the regex engine as-is instead of being an invalid str escape.
NSID_RE = re.compile(rf'^{NSID_SEGMENT}(\.{NSID_SEGMENT})*$')


def fail(msg, exc=NotImplementedError):
    """Reports a failure: logs ``msg`` at error level, then raises ``exc(msg)``.

    Never returns normally.

    Args:
      msg: str, used for both the log record and the exception message
      exc: exception class to instantiate with ``msg`` and raise. Defaults to
        :class:`NotImplementedError`.
    """
    logger.error(msg)
    error = exc(msg)
    raise error


class Base:
    """Base class for both XRPC client and server.

    Stores the registered lexicons, keyed by NSID, and provides the schema
    validation and parameter encoding/decoding helpers defined below.
    """

    _lexicons = None  # dict mapping NSID to lexicon object

    def __init__(self, lexicons):
        """Constructor.

        Deep copies the given lexicons, so the caller's objects are never
        modified, then normalizes each one: a ``defs.main`` definition, if
        present, replaces the top-level lexicon object, and the ``parameters``
        properties are wrapped into a full JSON Schema object stored under
        ``parameters.schema``.

        Args:
          lexicons: sequence (list or tuple) of dict lexicons

        Raises:
          :class:`jsonschema.SchemaError` if any schema is invalid
        """
        assert isinstance(lexicons, (list, tuple))

        self._lexicons = {l['id']: l for l in copy.deepcopy(lexicons)}
        logger.debug(f'Got lexicons for: {self._lexicons.keys()}')

        # validate schemas, convert parameters field into full JSON schema.
        # iterate over a snapshot because entries may be replaced below.
        for i, (nsid, lexicon) in enumerate(list(self._lexicons.items())):
            assert nsid, f'Lexicon {i} missing id field'

            # support new defs format
            defs_main = lexicon.get('defs', {}).get('main')
            if defs_main:
                lexicon = self._lexicons[nsid] = defs_main

            lexicon_type = lexicon.get('type')
            assert lexicon_type in LEXICON_TYPES, \
                f'Bad type for lexicon {nsid}: {lexicon_type}'

            # preprocess parameters properties into full JSON Schema
            props = lexicon.get('parameters', {})
            lexicon['parameters'] = {
                'schema': {
                    'type': 'object',
                    'required': [],
                    'properties': props,
                },
            }
            # per-property ``required: true`` flags are moved into the
            # object-level ``required`` list that JSON Schema expects
            for name, schema in props.items():
                if schema.pop('required', False):
                    lexicon['parameters']['schema']['required'].append(name)

            # validate schemas
            for field in 'input', 'output', 'parameters', 'record':
                logger.debug(f'Validating {nsid} {field} schema')
                schema = lexicon.get(field, {}).get('schema')
                if schema:
                    validator_for(schema).check_schema(schema)

    def _get_lexicon(self, nsid):
        """Returns the given lexicon object.

        Args:
          nsid: str, lexicon NSID

        Returns:
          dict, the stored lexicon object

        Raises:
          NotImplementedError if no lexicon exists for the given NSID
        """
        lexicon = self._lexicons.get(nsid)
        if not lexicon:
            fail(f'{nsid} not found')

        return lexicon

    def _validate(self, nsid, type, obj):
        """Validates a JSON object against a given schema.

        If the lexicon has no schema for ``type`` and ``obj`` is empty, there
        is nothing to check and this returns without error.

        Args:
          nsid: str, method NSID
          type: one of 'input', 'output', 'parameters', or 'record'
          obj: decoded JSON object

        Returns: None

        Raises:
          NotImplementedError if no lexicon exists for the given NSID, or the
            lexicon does not define a schema for the given type and ``obj``
            is non-empty
          :class:`jsonschema.ValidationError` if the object is invalid
        """
        assert type in ('input', 'output', 'parameters', 'record'), type

        schema = self._get_lexicon(nsid).get(type, {}).get('schema')
        if not schema:
            if not obj:
                return
            fail(f'{nsid} has no schema for {type}')

        logger.debug(f'Validating {nsid} {type}')
        try:
            jsonschema.validate(obj, schema)
        except jsonschema.ValidationError as e:
            # prefix the message so callers can tell which schema failed
            e.message = f'Error validating {nsid} {type}: {e.message}'
            raise

    def encode_params(self, params):
        """Encodes decoded parameter values.

        Booleans become the literal strings ``true`` and ``false``; every other
        value is converted with ``str()`` and percent-encoded.

        Based on https://atproto.com/specs/xrpc#path

        Args:
          params: dict mapping str names to boolean, number, or str values

        Returns:
          dict mapping str names to str encoded values
        """
        return {name: ('true' if val is True
                       else 'false' if val is False
                       else urllib.parse.quote(str(val)))
                for name, val in params.items()}

    def decode_params(self, method_nsid, params):
        """Decodes encoded parameter values.

        Each value is converted according to the type declared for that
        parameter in the method's lexicon. Parameters that the lexicon does not
        declare, or declares without a type, are treated as strings.

        Based on https://atproto.com/specs/xrpc#path

        Args:
          method_nsid: str
          params: dict mapping str names to encoded str values

        Returns:
          dict mapping str names to decoded boolean, number, and str values

        Raises:
          ValueError if a parameter value can't be decoded
          NotImplementedError if no method lexicon is registered for the given NSID
        """
        lexicon = self._get_lexicon(method_nsid)
        params_schema = (lexicon.get('parameters', {})
                         .get('schema', {})
                         .get('properties', {}))

        decoded = {}
        for name, val in params.items():
            param_type = params_schema.get(name, {}).get('type') or 'string'
            assert param_type in PARAMETER_TYPES

            if param_type == 'boolean':
                if val == 'true':
                    decoded[name] = True
                elif val == 'false':
                    decoded[name] = False
                else:
                    raise ValueError(
                        f'Got {val!r} for boolean parameter {name}, expected true or false')
            elif param_type in ('number', 'integer'):
                # the JSON Schema type name is 'integer', not 'int'
                convert = float if param_type == 'number' else int
                try:
                    decoded[name] = convert(val)
                except ValueError as e:
                    e.args = (f'{e.args[0]} for {param_type} parameter {name}',)
                    raise
            else:  # string
                decoded[name] = val

        return decoded