Skip to content

Commit

Permalink
Issue NASA-AMMOS#517 - TCP input support
Browse files Browse the repository at this point in the history
	- Adds a TCP client and server option to input streams
	- Updates docunmentation with new input specifications
	- Adds client specific tests
	- Adds additional stream tests
	- Formatting
  • Loading branch information
cjjacks committed Mar 14, 2024
1 parent 70d6ed4 commit 2cb4163
Show file tree
Hide file tree
Showing 7 changed files with 694 additions and 115 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
rev: v2.6.0
hooks:
- id: reorder-python-imports
files: ^src/|test/
files: ^ait/|tests/

- repo: local
hooks:
Expand All @@ -27,7 +27,7 @@ repos:
- id: black
name: black
entry: black
files: ^src/|test/
files: ^ait/|tests/
language: system
types: [python]

Expand Down
212 changes: 203 additions & 9 deletions ait/core/server/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import gevent
import gevent.socket
import gevent.server as gs
import gevent.monkey
import gevent.server as gs
import gevent.socket

gevent.monkey.patch_all()

Expand All @@ -27,13 +26,12 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):

self.context = zmq_context
# open PUB socket & connect to broker
self.pub = self.context.socket(zmq.PUB)
self.pub.connect(zmq_proxy_xsub_url.replace("*", "localhost"))
if 'listener' in kwargs and isinstance(kwargs['listener'], int) :
kwargs['listener'] = "127.0.0.1:"+str(kwargs['listener'])
if "listener" in kwargs and isinstance(kwargs["listener"], int):
kwargs["listener"] = "127.0.0.1:" + str(kwargs["listener"])
# calls gevent.Greenlet or gs.DatagramServer __init__
super(ZMQClient, self).__init__(**kwargs)

Expand Down Expand Up @@ -89,7 +87,6 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):

super(ZMQInputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
Expand Down Expand Up @@ -134,7 +131,6 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):

super(PortOutputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
Expand Down Expand Up @@ -162,7 +158,6 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):

if "input" in kwargs and type(kwargs["input"][0]) is int:
super(PortInputClient, self).__init__(
zmq_context,
Expand All @@ -180,3 +175,202 @@ def handle(self, packet, address):
# This function provided for gs.DatagramServer class
log.debug("{} received message from port {}".format(self, address))
self.process(packet)


class TCPInputServer(ZMQClient, gs.StreamServer):
"""
This class is similar to PortInputClient except its TCP instead of UDP.
"""

def __init__(
self,
zmq_context,
zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL,
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
buffer=1024,
**kwargs,
):
self.cur_socket = None
self.buffer = buffer
if "input" in kwargs:
if (
type(kwargs["input"]) not in [tuple, list]
or kwargs["input"][0].lower() != "server"
or type(kwargs["input"][1]) != int
):
raise (
ValueError(
"TCPInputServer input must be tuple|list of (str,int) e.g. ('server',1234)"
)
)

self.sub = gevent.socket.socket(
gevent.socket.AF_INET, gevent.socket.SOCK_STREAM
)
super(TCPInputServer, self).__init__(
zmq_context,
zmq_proxy_xsub_url,
zmq_proxy_xpub_url,
listener=("127.0.0.1", kwargs["input"][1]),
)
else:
raise (
ValueError(
"TCPInputServer input must be tuple|list of (str,int) e.g. ('server',1234)"
)
)

def handle(self, socket, address):
self.cur_socket = socket
with socket:
while True:
data = socket.recv(self.buffer)
if not data:
break
log.debug("{} received message from port {}".format(self, address))
self.process(data)


class TCPInputClient(ZMQClient):
"""
This class creates a TCP input client. Unlike TCPInputServer and PortInputClient,
this class will proactively initiate a connection with an input source and begin
receiving data from that source. This class does not inherit directly from gevent
servers and thus implements its own housekeeping functions. It also implements a
start function that spawns a process to stay consistent with the behavior of
TCPInputServer and PortInputClient.
"""

def __init__(
self,
zmq_context,
zmq_proxy_xsub_url=ait.SERVER_DEFAULT_XSUB_URL,
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
connection_reattempts=5,
buffer=1024,
**kwargs,
):
self.connection_reattempts = connection_reattempts
self.buffer = buffer
self.connection_status = -1
self.proc = None

if "buffer" in kwargs and type(kwargs["buffer"]) == int:
self.buffer = kwargs["buffer"]

if "input" in kwargs:
super(TCPInputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
if (
type(kwargs["input"]) not in [tuple, list]
or type(kwargs["input"][0]) != str
or type(kwargs["input"][1]) != int
):
raise (
ValueError(
"TCPInputClient 'input' must be tuple|list of (str,int) e.g. ('127.0.0.1',1234)"
)
)
self.sub = gevent.socket.socket(
gevent.socket.AF_INET, gevent.socket.SOCK_STREAM
)

self.host = kwargs["input"][0]
self.port = kwargs["input"][1]
self.address = tuple(kwargs["input"])

else:
raise (
ValueError(
"TCPInputClient 'input' must be tuple of (str,int) e.g. ('127.0.0.1',1234)"
)
)

def __exit__(self):
try:
if self.sub:
self.sub.close()
if self.proc:
self.proc.kill()
except Exception as e:
log.error(e)

def __del__(self):
try:
if self.sub:
self.sub.close()
if self.proc:
self.proc.kill()
except Exception as e:
log.error(e)

def __repr__(self):
return "<%s at %s %s>" % (
type(self).__name__,
hex(id(self)),
self._formatinfo(),
)

def __str__(self):
return "<%s %s>" % (type(self).__name__, self._formatinfo())

def start(self):
self.proc = gevent.spawn(self._client).join()

def _connect(self):
while self.connection_reattempts:
try:
res = self.sub.connect_ex((self.host, self.port))
if res == 0:
self.connection_reattempts = 5
return res
else:
self.connection_reattempts -= 1
gevent.sleep(1)
except Exception as e:
log.error(e)
self.connection_reattempts -= 1
gevent.sleep(1)

def _exit(self):
try:
if self.sub:
self.sub.close()
if self.proc:
self.proc.kill()
except Exception as e:
log.error(e)

def _client(self):
self.connection_status = self._connect()
if self.connection_status != 0:
log.error(
f"Unable to connect to client: {self.address[0]}:{self.address[1]}"
)
self._exit()
while True:
packet = self.sub.recv(self.buffer)
if not packet:
gevent.sleep(1)
log.info(
f"Trying to reconnect to client: {self.address[0]}:{self.address[1]}"
)
if self._connect() != 0:
log.error(
f"Unable to connect to client: {self.address[0]}:{self.address[1]}"
)
self._exit()
self.process(packet)

def _formatinfo(self):
result = ""
try:
if isinstance(self.address, tuple) and len(self.address) == 2:
result += "address=%s:%s" % self.address
else:
result += "address=%s" % (self.address,)
except Exception as ex:
result += str(ex) or "<error>"
return result
Loading

0 comments on commit 2cb4163

Please sign in to comment.