# # Testing the data conversion back and forth between broker.Data and Python. # Note that the user will rarely see broker.Data instances directly. We make # sure that the transparent conversion works as expected. # import datetime import ipaddress import math import time import types import unittest import broker class TestDataConstruction(unittest.TestCase): # Given a Python value 'p' and expected broker type 't', convert 'p' to # the corresponding broker type and verify that it has the correct type # and the expected string representation 's' (note this is not necessarily # the same as str(p)). def check_to_broker(self, p, s, t): assert not isinstance(p, broker.Data) b = broker.Data(p) # print("[to_broker] ({} / {}) -> ({} / {}) (expected: {} / {})".format(p, type(p), str(b), b.get_type(), s, t)) if s is not None: self.assertEqual(str(b), s) self.assertEqual(b.get_type(), t) return b # Given a broker value 'b' and a Python value 'p', convert 'b' to the # corresponding python type, then verify that the type and value match # the original python value 'p'. def check_to_py(self, b, p): b2p = broker.Data.to_py(b) # print("[to_py] data({} / {}) -> ({} / {})".format(str(b), b.get_type(), b2p, type(b2p))) self.assertIsInstance(b2p, type(p)) if isinstance(p, datetime.datetime): # Some pythons may have rounding issues converting datetime to # timestamp and back again. https://bugs.python.org/issue23517 if p.tzinfo: # Our test cases were explicit in using UTC self.assertEqual(b2p.tzinfo, p.tzinfo) self.assertEqual(b2p.year, p.year) self.assertEqual(b2p.month, p.month) self.assertEqual(b2p.day, p.day) self.assertEqual(b2p.hour, p.hour) self.assertEqual(b2p.minute, p.minute) self.assertEqual(b2p.second, p.second) us_equal = ( b2p.microsecond == p.microsecond or b2p.microsecond == p.microsecond - 1 or b2p.microsecond == p.microsecond + 1 ) self.assertTrue(us_equal) else: # 'b2p' is in UTC and 'p' is assumed to be local time self.assertEqual(b2p.tzinfo, broker.utc) b2p_us = b2p.microsecond b2p_ts = ( b2p - datetime.datetime(1970, 1, 1, tzinfo=broker.utc) ).total_seconds() b2p_ts = math.trunc(b2p_ts) p_us = p.microsecond p_ts = time.mktime(p.timetuple()) self.assertEqual(b2p_ts, p_ts) us_equal = b2p_us == p_us or b2p_us == p_us - 1 or b2p_us == p_us + 1 self.assertTrue(us_equal) else: self.assertEqual(b2p, p) def check_to_broker_and_back(self, p, s, t, p_final=None): """Given a Python value p, convert to Broker and check that the latter renders to string s and matches Broker type t. Then convert back to Python and verify that type and value match p_final. If p_final is None, use the original input p. """ b = self.check_to_broker(p, s, t) self.check_to_py(b, p if p_final is None else p_final) return b def test_bool(self): self.check_to_broker_and_back(True, "T", broker.Data.Type.Boolean) self.check_to_broker_and_back(False, "F", broker.Data.Type.Boolean) def test_integer(self): self.check_to_broker_and_back(42, "42", broker.Data.Type.Integer) self.check_to_broker_and_back(-42, "-42", broker.Data.Type.Integer) # Test a value that is beyond range of unsigned 32-bit integer self.check_to_broker_and_back( 5123123123, "5123123123", broker.Data.Type.Integer ) def test_count(self): self.check_to_broker_and_back(broker.Count(42), "42", broker.Data.Type.Count) # Test a value that is beyond range of unsigned 32-bit integer self.check_to_broker_and_back( broker.Count(5123123123), "5123123123", broker.Data.Type.Count ) def test_count_overflow(self): with self.assertRaises(Exception) as context: # noqa: F841 # I've seen this raise either OverflowError or SystemError # depending on Python version is seems. self.check_to_broker(broker.Count(-1), "-1", broker.Data.Type.Count) def test_real(self): self.check_to_broker_and_back( 1e18, "1000000000000000000.000000", broker.Data.Type.Real ) self.check_to_broker_and_back(4.2, "4.200000", broker.Data.Type.Real) self.check_to_broker_and_back(-4.2, "-4.200000", broker.Data.Type.Real) def test_timespan(self): to_us = lambda x: x.microseconds + (x.seconds + x.days * 24 * 3600) * 10**6 to_ns = lambda x: to_us(x) * 10**3 # Setup timespan values neg1ms = datetime.timedelta(microseconds=-1000) neg1ms_ns = -1000 * 10**3 self.assertEqual(neg1ms_ns, to_ns(neg1ms)) neg42sec = datetime.timedelta(milliseconds=-42 * 10**3) neg42sec_ns = (-42 * 10**6) * 10**3 self.assertEqual(neg42sec_ns, to_ns(neg42sec)) pos1ms2us = datetime.timedelta(milliseconds=1, microseconds=2) pos1ms2us_ns = (1000 + 2) * 10**3 self.assertEqual(pos1ms2us_ns, to_ns(pos1ms2us)) pos1day2s3us = datetime.timedelta(days=1, seconds=2, microseconds=3) pos1day2s3us_ns = (3 + (2 + 24 * 3600) * 10**6) * 10**3 self.assertEqual(pos1day2s3us_ns, to_ns(pos1day2s3us)) # Verify Timespan only self.assertEqual(broker.Timespan(neg1ms_ns), broker.Timespan(to_ns(neg1ms))) self.assertEqual(broker.Timespan(neg42sec_ns), broker.Timespan(to_ns(neg42sec))) self.assertEqual( broker.Timespan(pos1ms2us_ns), broker.Timespan(to_ns(pos1ms2us)) ) self.assertEqual( broker.Timespan(pos1day2s3us_ns), broker.Timespan(to_ns(pos1day2s3us)) ) # Verify Data self.check_to_broker_and_back(neg1ms, "-1000000ns", broker.Data.Type.Timespan) self.check_to_broker_and_back( neg42sec, "-42000000000ns", broker.Data.Type.Timespan ) self.check_to_broker_and_back(pos1ms2us, "1002000ns", broker.Data.Type.Timespan) self.check_to_broker_and_back( pos1day2s3us, "86402000003000ns", broker.Data.Type.Timespan ) def test_timestamp(self): self.check_to_broker(broker.now(), None, broker.Data.Type.Timestamp) today = datetime.datetime.now(broker.utc) self.check_to_broker_and_back(today, None, broker.Data.Type.Timestamp) today = datetime.datetime.now(datetime.timezone.utc) self.check_to_broker_and_back(today, None, broker.Data.Type.Timestamp) today = datetime.datetime.now() self.check_to_broker_and_back(today, None, broker.Data.Type.Timestamp) past = datetime.datetime(1970, 1, 31, tzinfo=broker.utc) self.check_to_broker_and_back(past, None, broker.Data.Type.Timestamp) # Test a time value with number of seconds since Jan. 1 1970 beyond # the range of a signed 32-bit integer (the "year 2038 problem"). future = datetime.datetime(2040, 1, 31, tzinfo=broker.utc) try: self.check_to_broker_and_back(future, None, broker.Data.Type.Timestamp) except OverflowError: # This test fails on some 32-bit systems (such as Debian 9 i386), # but for now we just ignore this failure. pass def test_string(self): self.check_to_broker_and_back("", "", broker.Data.Type.String) self.check_to_broker_and_back("foo", "foo", broker.Data.Type.String) self.check_to_broker_and_back("\ttab", "\ttab", broker.Data.Type.String) self.check_to_broker_and_back("new\n", "new\n", broker.Data.Type.String) def test_address_v4(self): addr = ipaddress.IPv4Address("0.0.0.0") self.check_to_broker_and_back(addr, "0.0.0.0", broker.Data.Type.Address) addr = ipaddress.IPv4Address("1.2.3.4") self.check_to_broker_and_back(addr, "1.2.3.4", broker.Data.Type.Address) addr = ipaddress.IPv4Address("255.255.255.255") self.check_to_broker_and_back(addr, "255.255.255.255", broker.Data.Type.Address) def test_address_v6(self): addr = ipaddress.IPv6Address("::") self.check_to_broker_and_back(addr, "::", broker.Data.Type.Address) addr = ipaddress.IPv6Address("::1") self.check_to_broker_and_back(addr, "::1", broker.Data.Type.Address) addr = ipaddress.IPv6Address("1:2:3:4:5:6:7:8") self.check_to_broker_and_back(addr, "1:2:3:4:5:6:7:8", broker.Data.Type.Address) addr = ipaddress.IPv6Address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") self.check_to_broker_and_back( addr, "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", broker.Data.Type.Address ) def test_subnet_v4(self): sn = ipaddress.IPv4Network("1.2.3.4/32") self.check_to_broker_and_back(sn, "1.2.3.4/32", broker.Data.Type.Subnet) sn = ipaddress.IPv4Network("10.0.0.0/8") self.check_to_broker_and_back(sn, "10.0.0.0/8", broker.Data.Type.Subnet) sn = ipaddress.IPv4Network("0.0.0.0/0") self.check_to_broker_and_back(sn, "0.0.0.0/0", broker.Data.Type.Subnet) def test_subnet_v6(self): sn = ipaddress.IPv6Network("::1/128") self.check_to_broker_and_back(sn, "::1/128", broker.Data.Type.Subnet) sn = ipaddress.IPv6Network("fc00::/7") self.check_to_broker_and_back(sn, "fc00::/7", broker.Data.Type.Subnet) sn = ipaddress.IPv6Network("::/0") self.check_to_broker_and_back(sn, "::/0", broker.Data.Type.Subnet) def test_port(self): self.check_to_broker_and_back( broker.Port(65535, broker.Port.TCP), "65535/tcp", broker.Data.Type.Port ) self.check_to_broker_and_back( broker.Port(53, broker.Port.UDP), "53/udp", broker.Data.Type.Port ) self.check_to_broker_and_back( broker.Port(8, broker.Port.ICMP), "8/icmp", broker.Data.Type.Port ) self.check_to_broker_and_back( broker.Port(0, broker.Port.Unknown), "0/?", broker.Data.Type.Port ) def _test_set_impl(self, set_itype, set_otype=None): # Common set testing functionality for an input type into Broker and a # corresponding output type (set_itype/set_otype). When the output type # isn't provided, use the input type: set_otype = set_itype if set_otype is None else set_otype # Test an empty set self.check_to_broker_and_back( set_itype(), "{}", broker.Data.Type.Set, set_otype() ) # Test a simple set pi, po = set_itype([1, 2, 3]), set_otype([1, 2, 3]) d = self.check_to_broker_and_back(pi, "{1, 2, 3}", broker.Data.Type.Set, po) for i, x in enumerate(d.as_set()): self.check_to_broker(x, str(i + 1), broker.Data.Type.Integer) self.check_to_py(x, i + 1) # Test a set that contains various data types d = broker.Data(set_itype(["foo", ipaddress.IPv6Address("::1"), None])) for i, x in enumerate(d.as_set()): if i == 1: self.check_to_broker(x, "foo", broker.Data.Type.String) self.check_to_py(x, "foo") elif i == 2: self.check_to_broker(x, "::1", broker.Data.Type.Address) self.check_to_py(x, ipaddress.IPv6Address("::1")) # Test some of our own methods on wrapped sets. d = broker.Data(set_itype([1, 2, 3])).as_set() self.assertEqual(str(d), "Set{1, 2, 3}") d.remove(broker.Data(2)) self.assertEqual(str(d), "Set{1, 3}") d.clear() self.assertEqual(str(d), "Set{}") def test_set(self): self._test_set_impl(set) def test_frozenset(self): # Python frozensets convert to Broker sets, so by default their mapping # back to Python results in regular sets. self._test_set_impl(frozenset, set) def _test_table_impl(self, table_itype, table_otype=None): # Common table testing functionality for an input type into Broker and a # corresponding output type (table_itype/table_otype). When the output # type isn't provided, use the input type: table_otype = table_itype if table_otype is None else table_otype # Test an empty table self.check_to_broker_and_back( table_itype({}), "{}", broker.Data.Type.Table, table_otype() ) # Test a simple table d = {"a": 1, "b": 2, "c": 3} pi, po = table_itype(d), table_otype(d) d = self.check_to_broker_and_back( pi, "{a -> 1, b -> 2, c -> 3}", broker.Data.Type.Table, po ) for i, (k, v) in enumerate(d.as_table().items()): self.check_to_broker(k, ["a", "b", "c"][i], broker.Data.Type.String) self.check_to_py(k, ["a", "b", "c"][i]) self.check_to_broker(v, str(i + 1), broker.Data.Type.Integer) self.check_to_py(v, i + 1) # Test a table that contains different data types p = table_itype( { True: 42, broker.Port(22, broker.Port.TCP): False, (1, 2, 3): [4, 5, 6], broker.Count(13): "test", } ) d = self.check_to_broker( p, "{T -> 42, 13 -> test, 22/tcp -> F, (1, 2, 3) -> (4, 5, 6)}", broker.Data.Type.Table, ) t = d.as_table() self.check_to_broker(t[broker.Data(True)], "42", broker.Data.Type.Integer) self.check_to_py(t[broker.Data(True)], 42) self.check_to_broker( t[broker.Data(broker.Port(22, broker.Port.TCP))], "F", broker.Data.Type.Boolean, ) self.check_to_py(t[broker.Data(broker.Port(22, broker.Port.TCP))], False) self.check_to_broker( t[broker.Data((1, 2, 3))], "(4, 5, 6)", broker.Data.Type.Vector ) self.check_to_py(t[broker.Data([1, 2, 3])], (4, 5, 6)) self.check_to_broker( t[broker.Data(broker.Count(13))], "test", broker.Data.Type.String ) self.check_to_py(t[broker.Data(broker.Count(13))], "test") def test_dict(self): self._test_table_impl(dict) def test_mapping_proxy_type(self): # Python MappingProxyType instances will convert to Broker tables, which # by default convert back to regular dicts. self._test_table_impl(types.MappingProxyType, dict) def test_vector(self): # Test an empty vector self.check_to_broker_and_back((), "()", broker.Data.Type.Vector) # Test a simple vector d = self.check_to_broker((1, 2, 3), "(1, 2, 3)", broker.Data.Type.Vector) # Either a list or a tuple are mapped to a Broker vector. d = self.check_to_broker([1, 2, 3], "(1, 2, 3)", broker.Data.Type.Vector) for i, x in enumerate(d.as_vector()): self.check_to_broker(x, str(i + 1), broker.Data.Type.Integer) self.check_to_py(x, i + 1) # Test a vector that contains various data types d = broker.Data(["foo", [[True]], ipaddress.IPv6Address("::1"), None]) v = d.as_vector() self.check_to_broker(v[0], "foo", broker.Data.Type.String) self.check_to_py(v[0], "foo") v1 = v[1].as_vector() self.check_to_broker(v1, "((T))", broker.Data.Type.Vector) self.check_to_py(v[1], ((True,),)) self.check_to_broker(v1[0], "(T)", broker.Data.Type.Vector) self.check_to_py(v1[0], (True,)) self.check_to_broker(v[2], "::1", broker.Data.Type.Address) self.check_to_py(v[2], ipaddress.IPv6Address("::1")) self.check_to_broker(v[3], "nil", broker.Data.Type.Nil) if __name__ == "__main__": unittest.main(verbosity=3)