importtypingfromcollections.abcimportCallablefrom.importmodel# Type alias for a parser callable. It should be an async function.# Args:# str: The uid of the entity.# Any: The raw event data. zeep.xsd.ComplexType or zeep.xsd.AnySimpleType.# TODO: could we make this zeep.Type or zeep.AnyType?# Returns:# Awaitable[model.EventEntity]: The parsed EventEntity.ParserCallable:typing.TypeAlias=Callable[[str,typing.Any],typing.Awaitable[list[model.EventEntity]]]
class Registry:
    """Mapping of string keys to parser callables.

    Each key may be bound to at most one parser; the underlying mapping is
    exposed as the ``registry`` attribute.
    """

    def __init__(self) -> None:
        # Starts empty; entries are added through register().
        self.registry: dict[str, ParserCallable] = {}

    def register(self, key: str, f: ParserCallable) -> None:
        """Bind the parser ``f`` to ``key``.

        Args:
            key: Identifier under which the parser is stored.
            f: The parser callable to store.

        Raises:
            ValueError: If ``key`` already has a parser bound to it.
        """
        if key not in self.registry:
            self.registry[key] = f
            return
        # Duplicate keys are rejected rather than silently overwritten.
        raise ValueError(f"Key {key} already registered")

    def get(self, key: str) -> ParserCallable | None:
        """Look up the parser bound to ``key``.

        Returns:
            The stored parser callable, or ``None`` when ``key`` is unknown.
        """
        parser = self.registry.get(key, None)
        return parser


# Module-level Registry instance shared by the register() decorator factory
# and get_parser() defined below.
_REGISTRY = Registry()


def register(topic: str) -> Callable[[ParserCallable], ParserCallable]:
    """Build a decorator that registers an onvif parser callable for ``topic``.

    Args:
        topic: The key under which the decorated parser is stored in the
            module-level registry.

    Returns:
        A decorator that stores the callable it receives and hands it back
        unchanged.

    Raises:
        ValueError: Raised by the returned decorator (not by this call) when
            ``topic`` already has a parser registered.
    """

    def decorator(func: ParserCallable) -> ParserCallable:
        # Registration happens when the decorator is applied.
        _REGISTRY.register(topic, func)
        return func

    return decorator


def get_parser(topic: str) -> ParserCallable | None:
    """Look up the parser callable registered for ``topic``.

    Args:
        topic: The key the parser was registered under.

    Returns:
        The registered parser callable, or ``None`` if no parser has been
        registered for ``topic``.
    """
    parser = _REGISTRY.get(topic)
    return parser