Patrick Kelley 8fd444092b initial
2025-05-07 15:35:15 -04:00

130 lines
3.6 KiB
Python

# @TEST-GROUP: web-socket
#
# @TEST-PORT: BROKER_WEB_SOCKET_PORT
#
# @TEST-COPY-FILE: ${CERTS_ROOT}/ca.pem
# @TEST-COPY-FILE: ${CERTS_ROOT}/cert.1.pem
# @TEST-COPY-FILE: ${CERTS_ROOT}/key.1.enc.pem
# @TEST-COPY-FILE: ${CERTS_ROOT}/cert.2.pem
# @TEST-COPY-FILE: ${CERTS_ROOT}/key.2.pem
# @TEST-COPY-FILE: ${API_ROOT}/v1-broker-out.json
#
# @TEST-EXEC: btest-bg-run node "broker-node --config-file=../node.cfg"
# @TEST-EXEC: btest-bg-run recv "python3 ../recv.py >recv.out"
# @TEST-EXEC: $SCRIPTS/wait-for-file recv/ready 15 || (btest-bg-wait -k 1 && false)
#
# @TEST-EXEC: btest-bg-run send "python3 ../send.py"
#
# @TEST-EXEC: $SCRIPTS/wait-for-file recv/done 30 || (btest-bg-wait -k 1 && false)
# @TEST-EXEC: btest-diff recv/recv.out
#
# @TEST-EXEC: btest-bg-wait -k 1
@TEST-START-FILE node.cfg
broker {
ssl {
cafile = "../ca.pem"
certificate = "../cert.1.pem"
key = "../key.1.enc.pem"
passphrase = "12345"
}
}
topics = ["/test"]
verbose = true
@TEST-END-FILE
@TEST-START-FILE recv.py
import asyncio, websockets, os, time, json, sys, ssl
from jsonschema import validate
# Keep only the part of the port variable before the first '/'
# (presumably the value looks like "<port>/<proto>" -- confirm against btest).
ws_port = os.environ['BROKER_WEB_SOCKET_PORT'].split('/')[0]
ws_url = f'wss://localhost:{ws_port}/v1/messages/json'
# TLS context: trust the test CA and present the second test certificate
# as this client's identity.
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
ssl_ctx.load_verify_locations("../ca.pem")
ssl_ctx.load_cert_chain(certfile="../cert.2.pem", keyfile="../key.2.pem")
# JSON schema used to validate every message received from the node.
# Load it through a context manager so the file handle is closed promptly.
with open('../v1-broker-out.json') as schema_file:
    schema = json.load(schema_file)
async def do_run():
    """Subscribe to "/test", validate and print ten messages, then exit.

    Tries to connect to ``ws_url`` up to 30 times, waiting one second
    between attempts. Once connected it sends the topic filter, validates
    the ACK against ``schema``, writes the ``ready`` marker file, prints ten
    validated messages to stdout, writes the ``done`` marker file and exits.

    Exits with status 1 if the ACK or a data message fails schema
    validation, or if no connection could be established at all.
    """
    connected = False
    for _ in range(30):
        try:
            ws = await websockets.connect(ws_url, ssl=ssl_ctx)
            connected = True
            # send filter and wait for ack
            await ws.send('["/test"]')
            ack_json = await ws.recv()
            try:
                validate(json.loads(ack_json), schema)
            except Exception as err:
                print(f'received invalid ack: {err}')
                sys.exit(1)
            # tell btest to start the sender now
            with open('ready', 'w') as f:
                f.write('ready')
            # dump messages to stdout (redirected to recv.out)
            for _ in range(10):
                msg = await ws.recv()
                try:
                    validate(json.loads(msg), schema)
                except Exception as err:
                    print(f'received invalid data message: {err}')
                    sys.exit(1)
                print(f'{msg}')
            # tell btest we're done
            with open('done', 'w') as f:
                f.write('done')
            await ws.close()
            sys.exit()
        except Exception:
            # Only ordinary errors are handled here. SystemExit is not an
            # Exception subclass, so the sys.exit() calls above propagate
            # with their intended status instead of being swallowed.
            if connected:
                # An error after the connection was established is not
                # retried; stop quietly as before.
                sys.exit()
            print(f'failed to connect to {ws_url}, try again', file=sys.stderr)
            # Yield to the event loop while waiting instead of blocking it.
            await asyncio.sleep(1)
    # Every attempt failed: report it rather than returning silently.
    print(f'giving up on {ws_url} after 30 attempts', file=sys.stderr)
    sys.exit(1)
# Run the receiver to completion. asyncio.run() creates and closes its own
# event loop; asyncio.get_event_loop() without a running loop is deprecated.
asyncio.run(do_run())
@TEST-END-FILE
@TEST-START-FILE send.py
import asyncio, websockets, os, json, sys, ssl
# Everything before the first '/' in the port variable is the port number.
ws_port = os.environ['BROKER_WEB_SOCKET_PORT'].partition('/')[0]
ws_url = f'wss://localhost:{ws_port}/v1/messages/json'

# TLS context: trust the test CA and present the second test certificate.
ssl_ctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS)
ssl_ctx.load_verify_locations(cafile="../ca.pem")
ssl_ctx.load_cert_chain("../cert.2.pem", "../key.2.pem")

# Template for the outgoing messages; do_run() bumps 'data' before each send.
msg = {
    'type': 'data-message',
    'topic': '/test',
    '@data-type': 'count',
    'data': 0,
}
async def do_run():
    """Publish ten count messages on "/test" over the TLS WebSocket.

    Sends an empty topic filter, waits for the ACK, then increments
    ``msg['data']`` and sends the JSON-encoded message ten times.
    """
    async with websockets.connect(ws_url, ssl=ssl_ctx) as conn:
        # An empty filter list: this client only publishes.
        await conn.send('[]')
        await conn.recv()  # wait for ACK
        for _ in range(10):
            msg['data'] += 1
            payload = json.dumps(msg)
            await conn.send(payload)
        await conn.close()
# Run the sender to completion. asyncio.run() creates and closes its own
# event loop; asyncio.get_event_loop() without a running loop is deprecated.
asyncio.run(do_run())
@TEST-END-FILE