From a0ad8f1fd75e14950d0d4fdac8333c2abbe193de Mon Sep 17 00:00:00 2001 From: Stefan `Sec` Zehl Date: Sun, 3 Nov 2024 21:39:15 +0100 Subject: [PATCH] [parser] zmq is sadly not thread-safe --- iridium-parser.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/iridium-parser.py b/iridium-parser.py index 0bb4a18..d1907fe 100755 --- a/iridium-parser.py +++ b/iridium-parser.py @@ -176,6 +176,8 @@ def __call__(self, parser, ns, values, option): cl=[] sl=[] +poller = None + if args.output == "zmq": import zmq @@ -187,10 +189,13 @@ def __call__(self, parser, ns, values, option): socket.bind(url) stats['clients']=0 - def zmq_thread(socket, stats): + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + + def zmq_xpub(poller, stats): try: - while True: - event = socket.recv() + while len(rv:=poller.poll(0))>0: + event = rv[0][0].recv() # Event is one byte 0=unsub or 1=sub, followed by topic if event[0] == 1: log("new subscriber for", event[1:]) @@ -205,10 +210,6 @@ def log(*msg): s=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) print("%s:"%s,*msg, end=eolnl, file=statsfile) - from threading import Thread - zthread = Thread(target = zmq_thread, args = [socket, stats], daemon= True, name='zmq') - zthread.start() - def stats_thread(stats): ltime=time.time() lline=0 @@ -278,12 +279,14 @@ def do_input(): stats['files']=len(args.remainder) stats['fileno']=0 for line in fileinput.input(args.remainder, openhook=openhook): - if args.do_stats: + if args.do_stats or poller is not None: if fileinput.isfirstline(): stats['fileno']+=1 stat=os.fstat(fileinput.fileno()) stats['size']=stat.st_size stats['in']+=1 + if poller is not None and len(poller.poll(0))>0: + zmq_xpub(poller, stats) if args.min_confidence is not None: q=bitsparser.Message(line.strip()) try: