229 lines
5.3 KiB
Python
229 lines
5.3 KiB
Python
import time
|
|
import traceback
|
|
from collections import defaultdict
|
|
from threading import Thread
|
|
|
|
from Queue import Queue
|
|
|
|
from ZeekControl import ser as json
|
|
from ZeekControl import web
|
|
from ZeekControl.zeekctl import ZeekCtl
|
|
|
|
STOP_RUNNING = object()
|
|
|
|
|
|
class TermUI:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def info(self, msg):
|
|
print(msg)
|
|
|
|
warn = info
|
|
|
|
def error(self, msg):
|
|
print("ERROR", msg)
|
|
|
|
|
|
class Logs:
|
|
def __init__(self):
|
|
self.store = defaultdict(list)
|
|
|
|
def append(self, id, stream, txt):
|
|
self.store[id].append((stream, txt))
|
|
|
|
def get(self, id, since=0):
|
|
msgs = self.store.get(id) or []
|
|
return msgs[since:]
|
|
|
|
|
|
class Common:
|
|
def dump(self, *args):
|
|
return json.dumps(*args)
|
|
|
|
def load(self, msg):
|
|
return json.loads(msg)
|
|
|
|
|
|
class ZeekCtrldWorker(Thread, Common):
|
|
def __init__(self, command_queue):
|
|
self.q = Queue()
|
|
Thread.__init__(self)
|
|
self.daemon = True
|
|
self.command_queue = command_queue
|
|
self._id = None
|
|
|
|
def send(self, id, cmd, *args):
|
|
self.q.put((id, cmd, args))
|
|
|
|
def run(self):
|
|
# FIXME: deepcopy breaks here if i set ui=self
|
|
self.zeekctl = ZeekCtl(ui=TermUI())
|
|
self.zeekctl.ui = self
|
|
self.zeekctl.controller.ui = self
|
|
self.zeekctl.executor.ui = self
|
|
while True:
|
|
if self.iteration():
|
|
return
|
|
|
|
def noop(self, *args, **kwargs):
|
|
return True
|
|
|
|
def iteration(self):
|
|
id, cmd, args = self.q.get()
|
|
self._id = id
|
|
|
|
if cmd is STOP_RUNNING:
|
|
return True
|
|
|
|
func = getattr(self.zeekctl, cmd, self.noop)
|
|
|
|
def respond(r):
|
|
self.call("result", id, r)
|
|
|
|
if not hasattr(func, "api_exposed"):
|
|
return respond("invalid function")
|
|
|
|
try:
|
|
res = func(*args)
|
|
except Exception:
|
|
res = traceback.format_exc()
|
|
respond(res)
|
|
|
|
def call(self, func, *args):
|
|
self.command_queue.put((None, func, args))
|
|
|
|
def info(self, msg):
|
|
return self.call("info", self._id, msg)
|
|
|
|
def error(self, msg):
|
|
return self.call("err", self._id, msg)
|
|
|
|
warn = error
|
|
|
|
|
|
class ZeekCtld(Thread, Common):
|
|
def __init__(self, command_queue, logs):
|
|
Thread.__init__(self)
|
|
self.daemon = True
|
|
self.logs = logs
|
|
self.command_queue = command_queue
|
|
self.worker = ZeekCtrldWorker(self.command_queue)
|
|
|
|
self.results = {}
|
|
self.running = True
|
|
|
|
self.id_gen = iter(range(10000000)).next
|
|
|
|
self.init()
|
|
|
|
def init(self):
|
|
self.worker.start()
|
|
|
|
def recv(self):
|
|
msg = self.command_queue.get()
|
|
print("Received", msg)
|
|
return msg
|
|
|
|
def run(self):
|
|
return self._run()
|
|
|
|
def handle_result(self, id, result):
|
|
print(f"Got result id={id!r} result={result!r}")
|
|
self.results[id] = result
|
|
return "ok"
|
|
|
|
def handle_out(self, id, txt):
|
|
print("Got {} id={!r} result={!r}".format("out", id, txt))
|
|
self.logs.append(id, "out", txt)
|
|
return "ok"
|
|
|
|
def handle_info(self, id, txt):
|
|
print("Got {} id={!r} result={!r}".format("info", id, txt))
|
|
self.logs.append(id, "info", txt)
|
|
return "ok"
|
|
|
|
def handle_err(self, id, txt):
|
|
print("Got {} id={!r} result={!r}".format("err", id, txt))
|
|
self.logs.append(id, "err", txt)
|
|
return "ok"
|
|
|
|
def handle_getresult(self, id):
|
|
result = self.results.get(id)
|
|
if result:
|
|
del self.results[id]
|
|
print(f"sending result={result!r} for id={id!r}")
|
|
return result
|
|
|
|
def handle_getlog(self, id, since):
|
|
result = self.logs.get(id, since)
|
|
print(f"sending log={result!r} for id={id!r}")
|
|
return result
|
|
|
|
def _run(self):
|
|
while self.running:
|
|
(rq, cmd, args) = self.recv()
|
|
func = getattr(self, "handle_" + cmd, None)
|
|
if func:
|
|
res = func(*args)
|
|
if rq:
|
|
rq.put(res)
|
|
else:
|
|
t_id = self.send_to_worker(cmd, *args)
|
|
rq.put(t_id)
|
|
|
|
def send_to_worker(self, cmd, *args):
|
|
t_id = self.id_gen()
|
|
self.worker.send(t_id, cmd, *args)
|
|
print(f"started id={t_id!r} cmd={cmd!r} args={args!r}")
|
|
return t_id
|
|
|
|
|
|
class Client(Common):
|
|
def __init__(self, command_queue):
|
|
self.command_queue = command_queue
|
|
|
|
def call(self, func, *args):
|
|
rq = Queue()
|
|
self.command_queue.put((rq, func, args))
|
|
return rq.get()
|
|
|
|
def sync_call(self, func, *args):
|
|
id = self.call(func, *args)
|
|
print("got id", id)
|
|
while True:
|
|
res = self.getresult(id)
|
|
if res:
|
|
return res
|
|
time.sleep(0.1)
|
|
|
|
def result(self, id, result):
|
|
return self.call("result", id, result)
|
|
|
|
def getresult(self, id):
|
|
return self.call("getresult", id)
|
|
|
|
def getlog(self, id, since=0):
|
|
return self.call("getlog", id, since)
|
|
|
|
|
|
def main(basedir="/zeek"):
|
|
logs = Logs()
|
|
|
|
command_queue = Queue()
|
|
d = ZeekCtld(command_queue, logs)
|
|
|
|
d.start()
|
|
|
|
c = Client(command_queue)
|
|
|
|
ww = Thread(target=web.run_app, args=[c])
|
|
ww.start()
|
|
print("I Got:", c.sync_call("status"))
|
|
for x in range(20):
|
|
time.sleep(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|