Source code for onvif_parsers.util
import typing
import zeep.helpers
from lxml import etree
[docs]
def extract_message(msg: typing.Any) -> tuple[str, typing.Any]:
"""Extract the message content and the topic."""
return msg.Topic._value_1, msg.Message._value_1
_VIDEO_SOURCE_MAPPING = {
"vsconf": "VideoSourceToken",
}
[docs]
def normalize_video_source(source: str) -> str:
"""
Normalize video source.
Some cameras do not set the VideoSourceToken correctly so we get duplicate
sensors, so we need to normalize it to the correct value.
"""
return _VIDEO_SOURCE_MAPPING.get(source, source)
[docs]
def event_to_debug_format(data: typing.Any) -> typing.Any:
"""
Converts an event to a format for debugging.
This is useful because the default repr for zeep event payload doesn't include the
body of unknown XML elements. This will convert the unknown XML into strings that
can then be deserialized back into an event for testing.
"""
# 1. Check if the object is a Zeep CompoundValue.
# Using serialize_object strips away the Zeep classes and leaves native dicts/lists.
if hasattr(data, "__values__"):
data = zeep.helpers.serialize_object(data)
if isinstance(data, dict):
return {k: event_to_debug_format(v) for k, v in data.items()}
if isinstance(data, list):
return [event_to_debug_format(i) for i in data]
if hasattr(data, "tag") and hasattr(data, "text"):
# It's an lxml Element. Convert the tree to a pretty XML string.
try:
return etree.tostring(data, pretty_print=True, encoding="unicode")
except Exception: # noqa: BLE001 - debug helper must never raise
return repr(data)
return data