Patrick Kelley 8fd444092b initial
2025-05-07 15:35:15 -04:00

409 lines
16 KiB
Python

#
# Testing the data conversion back and forth between broker.Data and Python.
# Note that the user will rarely see broker.Data instances directly. We make
# sure that the transparent conversion works as expected.
#
import datetime
import ipaddress
import math
import time
import types
import unittest
import broker
class TestDataConstruction(unittest.TestCase):
    """Round-trip tests for converting values between Python and broker.Data."""

    def check_to_broker(self, p, s, t):
        """Convert the Python value ``p`` to ``broker.Data`` and verify it.

        Args:
            p: Python value to convert; must not already be a ``broker.Data``.
            s: Expected ``str()`` rendering of the converted value (this is
                not necessarily the same as ``str(p)``), or ``None`` to skip
                the string comparison.
            t: Expected ``broker.Data.Type`` of the converted value.

        Returns:
            The resulting ``broker.Data`` instance.
        """
        # Use a unittest assertion rather than a bare ``assert`` so the check
        # is not stripped when Python runs with -O.
        self.assertNotIsInstance(p, broker.Data)
        b = broker.Data(p)
        if s is not None:
            self.assertEqual(str(b), s)
        self.assertEqual(b.get_type(), t)
        return b

    def _assert_microseconds_close(self, actual, expected):
        """Assert that two microsecond fields differ by at most one.

        Some Pythons may have rounding issues converting datetime to
        timestamp and back again (https://bugs.python.org/issue23517), so an
        off-by-one microsecond is tolerated.
        """
        self.assertAlmostEqual(actual, expected, delta=1)

    def check_to_py(self, b, p):
        """Convert the Broker value ``b`` to Python and compare it with ``p``.

        The converted value must be an instance of ``type(p)``. Values other
        than ``datetime.datetime`` must compare equal to ``p``; datetimes are
        compared field by field with a one-microsecond tolerance.

        Args:
            b: Broker value to convert via ``broker.Data.to_py``.
            p: Python value the conversion result is expected to match.
        """
        b2p = broker.Data.to_py(b)
        self.assertIsInstance(b2p, type(p))

        if not isinstance(p, datetime.datetime):
            self.assertEqual(b2p, p)
            return

        if p.tzinfo:
            # Our test cases were explicit in using UTC
            self.assertEqual(b2p.tzinfo, p.tzinfo)
            self.assertEqual(b2p.year, p.year)
            self.assertEqual(b2p.month, p.month)
            self.assertEqual(b2p.day, p.day)
            self.assertEqual(b2p.hour, p.hour)
            self.assertEqual(b2p.minute, p.minute)
            self.assertEqual(b2p.second, p.second)
            self._assert_microseconds_close(b2p.microsecond, p.microsecond)
        else:
            # 'b2p' is in UTC and 'p' is assumed to be local time, so compare
            # whole seconds since the epoch instead of individual fields.
            self.assertEqual(b2p.tzinfo, broker.utc)
            epoch = datetime.datetime(1970, 1, 1, tzinfo=broker.utc)
            b2p_ts = math.trunc((b2p - epoch).total_seconds())
            p_ts = time.mktime(p.timetuple())
            self.assertEqual(b2p_ts, p_ts)
            self._assert_microseconds_close(b2p.microsecond, p.microsecond)

    def check_to_broker_and_back(self, p, s, t, p_final=None):
        """Convert ``p`` to Broker, verify it, then convert back and verify.

        Args:
            p: Python value to convert.
            s: Expected string rendering of the Broker value, or ``None`` to
                skip the string comparison.
            t: Expected ``broker.Data.Type`` of the Broker value.
            p_final: Python value expected after converting back. When
                ``None``, the original input ``p`` is used.

        Returns:
            The intermediate ``broker.Data`` instance.
        """
        b = self.check_to_broker(p, s, t)
        self.check_to_py(b, p if p_final is None else p_final)
        return b

    def test_bool(self):
        """Booleans render as "T"/"F" and round-trip unchanged."""
        self.check_to_broker_and_back(True, "T", broker.Data.Type.Boolean)
        self.check_to_broker_and_back(False, "F", broker.Data.Type.Boolean)

    def test_integer(self):
        """Plain Python ints map to Broker integers."""
        self.check_to_broker_and_back(42, "42", broker.Data.Type.Integer)
        self.check_to_broker_and_back(-42, "-42", broker.Data.Type.Integer)

        # Test a value that is beyond range of unsigned 32-bit integer
        self.check_to_broker_and_back(
            5123123123, "5123123123", broker.Data.Type.Integer
        )

    def test_count(self):
        """broker.Count values map to Broker counts."""
        self.check_to_broker_and_back(broker.Count(42), "42", broker.Data.Type.Count)

        # Test a value that is beyond range of unsigned 32-bit integer
        self.check_to_broker_and_back(
            broker.Count(5123123123), "5123123123", broker.Data.Type.Count
        )

    def test_count_overflow(self):
        """A negative count must be rejected with an exception."""
        # I've seen this raise either OverflowError or SystemError depending
        # on the Python version, it seems, hence the broad exception type.
        with self.assertRaises(Exception):
            self.check_to_broker(broker.Count(-1), "-1", broker.Data.Type.Count)

    def test_real(self):
        """Floats map to Broker reals rendered with six decimal places."""
        self.check_to_broker_and_back(
            1e18, "1000000000000000000.000000", broker.Data.Type.Real
        )
        self.check_to_broker_and_back(4.2, "4.200000", broker.Data.Type.Real)
        self.check_to_broker_and_back(-4.2, "-4.200000", broker.Data.Type.Real)

    def test_timespan(self):
        """timedelta values map to Broker timespans rendered in nanoseconds."""

        def to_us(delta):
            # Total microseconds of a timedelta, built from its normalized
            # days/seconds/microseconds fields.
            return (
                delta.microseconds
                + (delta.seconds + delta.days * 24 * 3600) * 10**6
            )

        def to_ns(delta):
            return to_us(delta) * 10**3

        # (timedelta, expected nanoseconds, expected Broker rendering)
        cases = (
            (
                datetime.timedelta(microseconds=-1000),
                -1000 * 10**3,
                "-1000000ns",
            ),
            (
                datetime.timedelta(milliseconds=-42 * 10**3),
                (-42 * 10**6) * 10**3,
                "-42000000000ns",
            ),
            (
                datetime.timedelta(milliseconds=1, microseconds=2),
                (1000 + 2) * 10**3,
                "1002000ns",
            ),
            (
                datetime.timedelta(days=1, seconds=2, microseconds=3),
                (3 + (2 + 24 * 3600) * 10**6) * 10**3,
                "86402000003000ns",
            ),
        )

        for delta, expected_ns, rendered in cases:
            with self.subTest(timespan=rendered):
                # Sanity-check the test's own nanosecond arithmetic first.
                self.assertEqual(expected_ns, to_ns(delta))
                # Verify Timespan only
                self.assertEqual(
                    broker.Timespan(expected_ns), broker.Timespan(to_ns(delta))
                )
                # Verify Data
                self.check_to_broker_and_back(
                    delta, rendered, broker.Data.Type.Timespan
                )

    def test_timestamp(self):
        """datetime values (aware and naive) map to Broker timestamps."""
        self.check_to_broker(broker.now(), None, broker.Data.Type.Timestamp)

        round_trips = (
            datetime.datetime.now(broker.utc),
            datetime.datetime.now(datetime.timezone.utc),
            datetime.datetime.now(),  # naive datetime
            datetime.datetime(1970, 1, 31, tzinfo=broker.utc),
        )
        for moment in round_trips:
            with self.subTest(moment=moment):
                self.check_to_broker_and_back(
                    moment, None, broker.Data.Type.Timestamp
                )

        # Test a time value with number of seconds since Jan. 1 1970 beyond
        # the range of a signed 32-bit integer (the "year 2038 problem").
        future = datetime.datetime(2040, 1, 31, tzinfo=broker.utc)
        try:
            self.check_to_broker_and_back(future, None, broker.Data.Type.Timestamp)
        except OverflowError:
            # This test fails on some 32-bit systems (such as Debian 9 i386),
            # but for now we just ignore this failure.
            pass

    def test_string(self):
        """Strings round-trip and render verbatim, including whitespace."""
        for text in ("", "foo", "\ttab", "new\n"):
            with self.subTest(text=text):
                self.check_to_broker_and_back(text, text, broker.Data.Type.String)

    def test_address_v4(self):
        """IPv4Address values map to Broker addresses."""
        for text in ("0.0.0.0", "1.2.3.4", "255.255.255.255"):
            with self.subTest(address=text):
                addr = ipaddress.IPv4Address(text)
                self.check_to_broker_and_back(addr, text, broker.Data.Type.Address)

    def test_address_v6(self):
        """IPv6Address values map to Broker addresses."""
        for text in (
            "::",
            "::1",
            "1:2:3:4:5:6:7:8",
            "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
        ):
            with self.subTest(address=text):
                addr = ipaddress.IPv6Address(text)
                self.check_to_broker_and_back(addr, text, broker.Data.Type.Address)

    def test_subnet_v4(self):
        """IPv4Network values map to Broker subnets."""
        for text in ("1.2.3.4/32", "10.0.0.0/8", "0.0.0.0/0"):
            with self.subTest(subnet=text):
                sn = ipaddress.IPv4Network(text)
                self.check_to_broker_and_back(sn, text, broker.Data.Type.Subnet)

    def test_subnet_v6(self):
        """IPv6Network values map to Broker subnets."""
        for text in ("::1/128", "fc00::/7", "::/0"):
            with self.subTest(subnet=text):
                sn = ipaddress.IPv6Network(text)
                self.check_to_broker_and_back(sn, text, broker.Data.Type.Subnet)

    def test_port(self):
        """broker.Port values render as "<number>/<protocol>"."""
        cases = (
            (65535, broker.Port.TCP, "65535/tcp"),
            (53, broker.Port.UDP, "53/udp"),
            (8, broker.Port.ICMP, "8/icmp"),
            (0, broker.Port.Unknown, "0/?"),
        )
        for number, protocol, rendered in cases:
            with self.subTest(port=rendered):
                self.check_to_broker_and_back(
                    broker.Port(number, protocol), rendered, broker.Data.Type.Port
                )

    def _test_set_impl(self, set_itype, set_otype=None):
        """Common set tests for a given input and output container type.

        Args:
            set_itype: Python type used to build the values handed to Broker.
            set_otype: Python type expected when converting back. When not
                provided, the input type is used.
        """
        set_otype = set_itype if set_otype is None else set_otype

        # Test an empty set
        self.check_to_broker_and_back(
            set_itype(), "{}", broker.Data.Type.Set, set_otype()
        )

        # Test a simple set
        pi, po = set_itype([1, 2, 3]), set_otype([1, 2, 3])
        d = self.check_to_broker_and_back(pi, "{1, 2, 3}", broker.Data.Type.Set, po)
        for i, x in enumerate(d.as_set()):
            self.check_to_broker(x, str(i + 1), broker.Data.Type.Integer)
            self.check_to_py(x, i + 1)

        # Test a set that contains various data types. Only the elements at
        # iteration index 1 and 2 are verified; index 0 is left unchecked
        # (presumably the nil element -- confirm against Broker's ordering).
        d = broker.Data(set_itype(["foo", ipaddress.IPv6Address("::1"), None]))
        for i, x in enumerate(d.as_set()):
            if i == 1:
                self.check_to_broker(x, "foo", broker.Data.Type.String)
                self.check_to_py(x, "foo")
            elif i == 2:
                self.check_to_broker(x, "::1", broker.Data.Type.Address)
                self.check_to_py(x, ipaddress.IPv6Address("::1"))

        # Test some of our own methods on wrapped sets.
        wrapped = broker.Data(set_itype([1, 2, 3])).as_set()
        self.assertEqual(str(wrapped), "Set{1, 2, 3}")
        wrapped.remove(broker.Data(2))
        self.assertEqual(str(wrapped), "Set{1, 3}")
        wrapped.clear()
        self.assertEqual(str(wrapped), "Set{}")

    def test_set(self):
        """Python sets convert to Broker sets and back to sets."""
        self._test_set_impl(set)

    def test_frozenset(self):
        """Python frozensets convert to Broker sets and back to sets."""
        # Python frozensets convert to Broker sets, so by default their mapping
        # back to Python results in regular sets.
        self._test_set_impl(frozenset, set)

    def _test_table_impl(self, table_itype, table_otype=None):
        """Common table tests for a given input and output mapping type.

        Args:
            table_itype: Python type used to build the values handed to
                Broker.
            table_otype: Python type expected when converting back. When not
                provided, the input type is used.
        """
        table_otype = table_itype if table_otype is None else table_otype

        # Test an empty table
        self.check_to_broker_and_back(
            table_itype({}), "{}", broker.Data.Type.Table, table_otype()
        )

        # Test a simple table
        simple = {"a": 1, "b": 2, "c": 3}
        pi, po = table_itype(simple), table_otype(simple)
        d = self.check_to_broker_and_back(
            pi, "{a -> 1, b -> 2, c -> 3}", broker.Data.Type.Table, po
        )
        expected_keys = ["a", "b", "c"]
        for i, (k, v) in enumerate(d.as_table().items()):
            self.check_to_broker(k, expected_keys[i], broker.Data.Type.String)
            self.check_to_py(k, expected_keys[i])
            self.check_to_broker(v, str(i + 1), broker.Data.Type.Integer)
            self.check_to_py(v, i + 1)

        # Test a table that contains different data types
        p = table_itype(
            {
                True: 42,
                broker.Port(22, broker.Port.TCP): False,
                (1, 2, 3): [4, 5, 6],
                broker.Count(13): "test",
            }
        )
        d = self.check_to_broker(
            p,
            "{T -> 42, 13 -> test, 22/tcp -> F, (1, 2, 3) -> (4, 5, 6)}",
            broker.Data.Type.Table,
        )

        t = d.as_table()

        self.check_to_broker(t[broker.Data(True)], "42", broker.Data.Type.Integer)
        self.check_to_py(t[broker.Data(True)], 42)

        self.check_to_broker(
            t[broker.Data(broker.Port(22, broker.Port.TCP))],
            "F",
            broker.Data.Type.Boolean,
        )
        self.check_to_py(t[broker.Data(broker.Port(22, broker.Port.TCP))], False)

        # The vector key is looked up once via a tuple and once via a list.
        self.check_to_broker(
            t[broker.Data((1, 2, 3))], "(4, 5, 6)", broker.Data.Type.Vector
        )
        self.check_to_py(t[broker.Data([1, 2, 3])], (4, 5, 6))

        self.check_to_broker(
            t[broker.Data(broker.Count(13))], "test", broker.Data.Type.String
        )
        self.check_to_py(t[broker.Data(broker.Count(13))], "test")

    def test_dict(self):
        """Python dicts convert to Broker tables and back to dicts."""
        self._test_table_impl(dict)

    def test_mapping_proxy_type(self):
        """MappingProxyType instances convert to tables and back to dicts."""
        # Python MappingProxyType instances will convert to Broker tables, which
        # by default convert back to regular dicts.
        self._test_table_impl(types.MappingProxyType, dict)

    def test_vector(self):
        """Tuples and lists convert to Broker vectors."""
        # Test an empty vector
        self.check_to_broker_and_back((), "()", broker.Data.Type.Vector)

        # Test a simple vector
        self.check_to_broker((1, 2, 3), "(1, 2, 3)", broker.Data.Type.Vector)

        # Either a list or a tuple are mapped to a Broker vector.
        d = self.check_to_broker([1, 2, 3], "(1, 2, 3)", broker.Data.Type.Vector)
        for i, x in enumerate(d.as_vector()):
            self.check_to_broker(x, str(i + 1), broker.Data.Type.Integer)
            self.check_to_py(x, i + 1)

        # Test a vector that contains various data types
        d = broker.Data(["foo", [[True]], ipaddress.IPv6Address("::1"), None])
        v = d.as_vector()

        self.check_to_broker(v[0], "foo", broker.Data.Type.String)
        self.check_to_py(v[0], "foo")

        v1 = v[1].as_vector()
        self.check_to_broker(v1, "((T))", broker.Data.Type.Vector)
        self.check_to_py(v[1], ((True,),))
        self.check_to_broker(v1[0], "(T)", broker.Data.Type.Vector)
        self.check_to_py(v1[0], (True,))

        self.check_to_broker(v[2], "::1", broker.Data.Type.Address)
        self.check_to_py(v[2], ipaddress.IPv6Address("::1"))

        self.check_to_broker(v[3], "nil", broker.Data.Type.Nil)
if __name__ == "__main__":
    # Run every test case in this module through the unittest runner when
    # the file is executed directly as a script.
    unittest.main(verbosity=3)