Source code for lexrpc.server

"""XRPC server implementation."""
from functools import wraps
import logging

from .base import fail, METHOD_TYPES, NSID_RE, Base

logger = logging.getLogger(__name__)


[docs] class Redirect(Exception): """Raised by XRPC handlers to direct the server to serve an HTTP redirect. Whether this is official supported by the XRPC spec is still TBD: https://github.com/bluesky-social/atproto/discussions/1228 Attributes: to (str): URL to redirect to status (int): HTTP status code, defaults to 302 """ def __init__(self, to, status=302, headers=None): assert to assert status self.to = to self.status = status self.headers = headers or {}
[docs] class Server(Base): """XRPC server base class. Subclass to implement specific methods.""" _methods = None # dict, maps string NSID to Python callable def __init__(self, **kwargs): """Constructor. Args: kwargs: passed through to :class:`Base` Raises: ValidationError: if any schema is invalid """ super().__init__(**kwargs) self._methods = {}
[docs] def method(self, nsid): """XRPC method decorator. Use on each function that implements a method. Args: nsid (str) """ def decorated(fn): self.register(nsid, fn) @wraps(fn) def wrapped(*args, **kwargs): return fn(*args, **kwargs) return wrapped return decorated
[docs] def register(self, nsid, fn): """Registers an XRPC method decorator. Alternative to :meth:`method`. Args: nsid (str) fn (callable) """ assert NSID_RE.fullmatch(nsid) existing = self._methods.get(nsid) if existing: fail(f'{nsid} already registered with {existing}', AssertionError) self._methods[nsid] = fn
[docs] def call(self, nsid, input=None, **params): """Calls an XRPC query or procedure method. For subscriptions, returns a generator that yields ``(header dict, payload dict)`` tuples to be DAG-CBOR encoded and sent to the websocket client. Args: nsid (str): method NSID input (dict or bytes): input body, optional for subscriptions params: optional parameters Returns: dict: output Raises: NotImplementedError: if the given NSID is not implemented or found in any of the loaded lexicons ValidationError: if the parameters, input, or returned output don't validate against the method's schemas """ logger.debug(f'{nsid}: {params} {self.loggable(input)}') fn = self._methods.get(nsid) if not fn: fail(f'{nsid} not implemented', NotImplementedError) subscription = self.defs[nsid]['type'] == 'subscription' # validate params and input, then encode params params = self.validate(nsid, 'parameters', params) input = self.validate(nsid, 'input', input) args = [] if subscription else [input] output = fn(*args, **params) if subscription: def validator(): for header, payload in output: payload = self.validate(nsid, 'message', payload) yield header, payload return validator() else: logger.debug(f'Returning {self.loggable(output)}') return self.validate(nsid, 'output', output)