Skip to content

Commit

Permalink
Migrate pygstc to asyncio based sockets.
Browse files Browse the repository at this point in the history
Signed-off-by: James Hilliard <[email protected]>
  • Loading branch information
jameshilliard committed Aug 8, 2022
1 parent a06ad84 commit a2fb4ff
Show file tree
Hide file tree
Showing 32 changed files with 330 additions and 374 deletions.
154 changes: 77 additions & 77 deletions libgstc/python/pygstc/gstc.py

Large diffs are not rendered by default.

100 changes: 29 additions & 71 deletions libgstc/python/pygstc/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
# OF THE POSSIBILITY OF SUCH DAMAGE.

import json
import select
import asyncio
import socket
from contextlib import asynccontextmanager

"""
GstClient - Ipc Class
Expand All @@ -54,7 +54,7 @@ def __init__(
ip,
port,
maxsize=None,
terminator='\x00'.encode('utf-8'),
terminator=b'\x00',
):
"""
Initialize new Ipc
Expand All @@ -81,7 +81,23 @@ def __init__(
self._maxsize = maxsize
self._terminator = terminator

def send(self, line, timeout=None):
@asynccontextmanager
async def gstd_conn(self):
kwargs = {
'host': self._ip,
'port': self._port
}
if self._maxsize is not None:
kwargs['limit'] = self._maxsize
reader, writer = await asyncio.open_connection(**kwargs)
try:
yield reader, writer
finally:
if not writer.is_closing():
writer.close()
await writer.wait_closed()

async def send(self, line, timeout=None):
"""
Create a socket and sends a message through it
Expand All @@ -103,87 +119,29 @@ def send(self, line, timeout=None):
data : string
Decoded JSON string with the response
"""
data = None
self._logger.debug('GSTD socket sending line: {}'.format(line))
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.connect((self._ip, self._port))
s.sendall(' '.join(line).encode('utf-8'))
data = self._recvall(s, timeout)
if not data:
raise socket.error("Socket read error happened")
data = data.decode('utf-8')
s.close()
return data

async with self.gstd_conn() as (reader, writer):
writer.write(' '.join(line).encode('utf-8'))
await writer.drain()
fut = reader.readuntil(separator=self._terminator)
data = await asyncio.wait_for(fut, timeout=timeout)
if not data:
raise socket.error("Socket read error happened")
data = data[:-1].decode('utf-8')
return data
except BufferError as e:
s.close()
error_msg = 'Server response too long'
self._logger.error(error_msg)
raise BufferError(error_msg)\
from e
except TimeoutError as e:
s.close()
error_msg = 'Server took too long to respond'
self._logger.error(error_msg)
raise TimeoutError(error_msg)\
from e
except socket.error as e:
s.close()
error_msg = 'Server did not respond. Is it up?'
self._logger.error(error_msg)
raise ConnectionRefusedError(error_msg)\
from e

def _recvall(self, sock, timeout):
"""
Wait for a response message from the socket
Parameters
----------
sock : string
The socket to poll
timeout : float
Timeout in seconds to wait for a response. 0: non-blocking, None: blocking
Raises
------
socket.error
Error is triggered when Gstd IPC fails
BufferError
When the incoming buffer is too big.
Returns
-------
buf : string
Raw socket response
"""
buf = b''
newbuf = ''
try:
sock.settimeout(timeout)
except socket.error as e:
raise TimeoutError from e

while True:
if (self._maxsize and self._maxsize > len(newbuf)):
raise BufferError

try:
newbuf = sock.recv(self._socket_read_size)
# Raise an exception timeout
except socket.error as e:
raise TimeoutError from e

# When a connection dies, the socket does not close properly and it
# returns immediately with an empty string. So, check that first.
if len(newbuf) == 0:
break

if self._terminator in newbuf:
buf += newbuf[:newbuf.find(self._terminator)]
break
else:
buf += newbuf
return buf
8 changes: 4 additions & 4 deletions libgstc/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
'tests']),
scripts=[],
classifiers=['Development Status :: 3 - Alpha',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7'],
python_requires='>=3.5',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9'],
python_requires='>=3.7',
install_requires=[],
command_options={},
extras_require={},
Expand Down
38 changes: 22 additions & 16 deletions tests/libgstc/python/gstd_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
# OF THE POSSIBILITY OF SUCH DAMAGE.

import asyncio
import pathlib
import socket
import subprocess
import unittest


DEFAULT_TEAR_DOWN_TIMEOUT = 1

class GstdTestRunner(unittest.TestCase):
class GstdTestRunner(unittest.IsolatedAsyncioTestCase):

def get_open_port(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -48,24 +48,30 @@ def get_open_port(self):
s.close()
return port

def setUp(self):
async def asyncSetUp(self):
self.port = self.get_open_port()
self.gstd_path = (pathlib.Path(__file__).parent.parent.parent.parent
.joinpath('gstd').joinpath('gstd').resolve())
self.gstd = subprocess.Popen([self.gstd_path, '-p', str(self.port)])
connected = -1
while connected != 0:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connected = sock.connect_ex(("", self.port))
sock.close()
self.gstd = await asyncio.create_subprocess_exec(self.gstd_path, '-p', str(self.port))
asyncio.get_event_loop().call_later(5, self.gstd.kill)
connected = False
while not connected:
try:
reader, writer = await asyncio.open_connection(port=self.port)
writer.close()
await writer.wait_closed()
connected = True
except OSError:
pass

def tearDown(self):
self.gstd.terminate()
try:
self.gstd.wait(DEFAULT_TEAR_DOWN_TIMEOUT)
except subprocess.TimeoutExpired:
self.gstd.kill()
self.gstd.wait()
async def asyncTearDown(self):
if self.gstd.returncode is None:
self.gstd.terminate()
try:
await asyncio.wait_for(self.gstd.wait(), timeout=DEFAULT_TEAR_DOWN_TIMEOUT)
except asyncio.TimeoutError:
self.gstd.kill()
await self.gstd.wait()


if __name__ == '__main__':
Expand Down
16 changes: 8 additions & 8 deletions tests/libgstc/python/test_libgstc_python_bus_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@

class TestGstcBusFilterMethods(GstdTestRunner):

def test_bus_filter_eos(self):
async def test_bus_filter_eos(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.pipeline_create('p0', pipeline)
self.gstd_client.pipeline_play('p0')
self.gstd_client.event_eos('p0')
self.gstd_client.bus_filter('p0', 'eos')
ret = self.gstd_client.bus_read('p0')
await self.gstd_client.pipeline_create('p0', pipeline)
await self.gstd_client.pipeline_play('p0')
await self.gstd_client.event_eos('p0')
await self.gstd_client.bus_filter('p0', 'eos')
ret = await self.gstd_client.bus_read('p0')
self.assertEqual(ret['type'], 'eos')
self.gstd_client.pipeline_stop('p0')
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_stop('p0')
await self.gstd_client.pipeline_delete('p0')


if __name__ == '__main__':
Expand Down
32 changes: 16 additions & 16 deletions tests/libgstc/python/test_libgstc_python_bus_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,32 @@

class TestGstcBusTimeoutMethods(GstdTestRunner):

def test_bus_timeout_eos(self):
async def test_bus_timeout_eos(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.pipeline_create('p0', pipeline)
self.gstd_client.pipeline_play('p0')
self.gstd_client.event_eos('p0')
self.gstd_client.bus_filter('p0', 'eos')
self.gstd_client.bus_timeout('p0', 1000)
ret = self.gstd_client.bus_read('p0')
await self.gstd_client.pipeline_create('p0', pipeline)
await self.gstd_client.pipeline_play('p0')
await self.gstd_client.event_eos('p0')
await self.gstd_client.bus_filter('p0', 'eos')
await self.gstd_client.bus_timeout('p0', 1000)
ret = await self.gstd_client.bus_read('p0')
if ret:
self.assertEqual(ret['type'], 'eos')
self.gstd_client.pipeline_stop('p0')
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_stop('p0')
await self.gstd_client.pipeline_delete('p0')

def test_bus_timeout_no_response(self):
async def test_bus_timeout_no_response(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.pipeline_create('p0', pipeline)
self.gstd_client.pipeline_play('p0')
self.gstd_client.bus_timeout('p0', 1000)
ret = self.gstd_client.bus_read('p0')
await self.gstd_client.pipeline_create('p0', pipeline)
await self.gstd_client.pipeline_play('p0')
await self.gstd_client.bus_timeout('p0', 1000)
ret = await self.gstd_client.bus_read('p0')
self.assertEqual(ret, None)
self.gstd_client.pipeline_stop('p0')
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_stop('p0')
await self.gstd_client.pipeline_delete('p0')


if __name__ == '__main__':
Expand Down
14 changes: 7 additions & 7 deletions tests/libgstc/python/test_libgstc_python_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,24 @@

class TestGstcCreateMethods(GstdTestRunner):

def test_create_pipeline(self):
async def test_create_pipeline(self):
pipeline = 'videotestsrc name=v0 ! fakesink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
ret = self.gstd_client.read('pipelines')
ret = await self.gstd_client.read('pipelines')
initial_n_pipes = len(ret['nodes'])
self.gstd_client.create('pipelines', 'p0', pipeline)
ret = self.gstd_client.read('pipelines')
await self.gstd_client.create('pipelines', 'p0', pipeline)
ret = await self.gstd_client.read('pipelines')
final_n_pipes = len(ret['nodes'])
self.assertEqual(final_n_pipes, initial_n_pipes + 1)
self.gstd_client.pipeline_delete('p0')
await self.gstd_client.pipeline_delete('p0')

def test_create_bad_pipeline(self):
async def test_create_bad_pipeline(self):
pipeline = 'source sink'
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
with self.assertRaises(GstdError):
self.gstd_client.create('pipelines', 'p0', pipeline)
await self.gstd_client.create('pipelines', 'p0', pipeline)


if __name__ == '__main__':
Expand Down
8 changes: 4 additions & 4 deletions tests/libgstc/python/test_libgstc_python_debug_color.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@

class TestGstcDebugColorMethods(GstdTestRunner):

def test_debug_color_true(self):
async def test_debug_color_true(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_color(True)
await self.gstd_client.debug_color(True)

def test_debug_color_false(self):
async def test_debug_color_false(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_color(False)
await self.gstd_client.debug_color(False)


if __name__ == '__main__':
Expand Down
8 changes: 4 additions & 4 deletions tests/libgstc/python/test_libgstc_python_debug_enable.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@

class TestGstcDebugEnableMethods(GstdTestRunner):

def test_debug_enable_true(self):
async def test_debug_enable_true(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_enable(True)
await self.gstd_client.debug_enable(True)

def test_debug_enable_false(self):
async def test_debug_enable_false(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_enable(False)
await self.gstd_client.debug_enable(False)


if __name__ == '__main__':
Expand Down
8 changes: 4 additions & 4 deletions tests/libgstc/python/test_libgstc_python_debug_reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@

class TestGstcDebugResetMethods(GstdTestRunner):

def test_debug_reset_true(self):
async def test_debug_reset_true(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_reset(True)
await self.gstd_client.debug_reset(True)

def test_debug_reset_false(self):
async def test_debug_reset_false(self):
self.gstd_logger = CustomLogger('test_libgstc', loglevel='DEBUG')
self.gstd_client = GstdClient(port=self.port, logger=self.gstd_logger)
self.gstd_client.debug_reset(False)
await self.gstd_client.debug_reset(False)


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit a2fb4ff

Please sign in to comment.