diff --git a/.github/workflows/code-qa.yaml b/.github/workflows/code-qa.yaml index efe9cdd..d76f99f 100644 --- a/.github/workflows/code-qa.yaml +++ b/.github/workflows/code-qa.yaml @@ -15,7 +15,7 @@ jobs: uses: actions/checkout@v4 - name: Install deps run: | - pip install mypy==1.8.0 pylint==3.2.5 ruff==0.1.14 remotivelabs-broker>=0.1.8 pytest + pip install mypy==1.8.0 pylint==3.2.5 ruff==0.1.14 pylint-protobuf==0.22.0 remotivelabs-broker>=0.1.8 pytest - name: Run lint run: | cd python diff --git a/python/.gitignore b/python/.gitignore index 0cbb74e..56c42f8 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -1,3 +1,4 @@ .pyc .venv __pycache__ +.* \ No newline at end of file diff --git a/python/=0.1.8 b/python/=0.1.8 new file mode 100644 index 0000000..fc554dd --- /dev/null +++ b/python/=0.1.8 @@ -0,0 +1,38 @@ +Requirement already satisfied: mypy==1.8.0 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (1.8.0) +Requirement already satisfied: pylint==3.2.5 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (3.2.5) +Requirement already satisfied: ruff==0.1.14 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (0.1.14) +Requirement already satisfied: pylint-protobuf==0.22.0 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (0.22.0) +Requirement already satisfied: remotivelabs-broker in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (0.1.26) +Requirement already satisfied: pytest in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (8.2.2) +Requirement already satisfied: tomli>=1.1.0 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from mypy==1.8.0) (2.0.1) +Requirement already satisfied: typing-extensions>=4.1.0 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from mypy==1.8.0) (4.12.2) +Requirement already satisfied: mypy-extensions>=1.0.0 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from mypy==1.8.0) (1.0.0) +Requirement already satisfied: dill>=0.2 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pylint==3.2.5) (0.3.8) +Requirement already satisfied: astroid<=3.3.0-dev0,>=3.2.2 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pylint==3.2.5) (3.2.2) +Requirement already satisfied: platformdirs>=2.2.0 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pylint==3.2.5) (4.2.2) +Requirement already satisfied: mccabe<0.8,>=0.6 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pylint==3.2.5) (0.7.0) +Requirement already satisfied: isort!=5.13.0,<6,>=4.2.5 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pylint==3.2.5) (5.13.2) +Requirement already satisfied: tomlkit>=0.10.1 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pylint==3.2.5) (0.12.5) +Requirement already satisfied: protobuf in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pylint-protobuf==0.22.0) (5.27.2) +Requirement already satisfied: requests~=2.21 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from remotivelabs-broker) (2.32.3) +Requirement already satisfied: grpc-stubs~=1.53.0.5 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from remotivelabs-broker) (1.53.0.5) +Collecting protobuf + Using cached protobuf-3.20.1-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.1 MB) +Requirement already satisfied: grpcio~=1.44 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from remotivelabs-broker) (1.64.1) +Requirement already satisfied: types-protobuf~=4.24.0.20240106 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from remotivelabs-broker) (4.24.0.20240408) +Requirement already satisfied: grpc-interceptor~=0.14 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from remotivelabs-broker) (0.15.4) +Requirement already satisfied: mypy-protobuf~=3.3.0 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from remotivelabs-broker) (3.3.0) +Requirement already satisfied: iniconfig in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pytest) (2.0.0) +Requirement already satisfied: exceptiongroup>=1.0.0rc8 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pytest) (1.2.1) +Requirement already satisfied: pluggy<2.0,>=1.5 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pytest) (1.5.0) +Requirement already satisfied: packaging in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from pytest) (24.1) +Requirement already satisfied: certifi>=2017.4.17 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from requests~=2.21->remotivelabs-broker) (2024.6.2) +Requirement already satisfied: idna<4,>=2.5 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from requests~=2.21->remotivelabs-broker) (3.7) +Requirement already satisfied: urllib3<3,>=1.21.1 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from requests~=2.21->remotivelabs-broker) (2.2.2) +Requirement already satisfied: charset-normalizer<4,>=2 in /mnt/c/Users/Oscar Gren/Documents/RemotiveLabs/lib/python3.10/site-packages (from requests~=2.21->remotivelabs-broker) (3.3.2) +Installing collected packages: protobuf + Attempting uninstall: protobuf + Found existing installation: protobuf 5.27.2 + Uninstalling protobuf-5.27.2: + Successfully uninstalled protobuf-5.27.2 +Successfully installed protobuf-3.20.1 diff --git a/python/cloud-demo/README.md b/python/cloud-demo/README.md index 9dab699..186c60c 100644 --- a/python/cloud-demo/README.md +++ b/python/cloud-demo/README.md @@ -17,7 +17,7 @@ Python 3 is required ``` pip3 install -r requirements.txt -python3 cloud-demo.py \ +python3 cloud_demo.py \ --url \ --api-key \ --signals Speed,SteeringWheel_Position,Accelerator_PedalPosition diff --git a/python/cloud-demo/cloud-demo.py b/python/cloud-demo/cloud-demo.py deleted file mode 100644 index 6dcabbc..0000000 --- a/python/cloud-demo/cloud-demo.py +++ /dev/null @@ -1,77 +0,0 @@ -import threading -import time -import signal as signals - -from lib.broker import Broker -from lib import arguments -import sys - -""" -Simple program designed to be used with our cloud demo. -It is expected that you have followed the steps at -https://demo.remotivelabs.com and started a broker + uploaded -a recording. - -Once you complete the stages in our cloud-demo you will get all -information required to run this program. It will look something like: - -python3 cloud-demo.py \ - --url \ - --api-key \ - --signals VehicleSpeed,ChassisAcceleratorPedalposition - -""" - -expected_available_signals = ['VehicleSpeed', 'ChassisSteeringwheelAngle', 'ChassisAcceleratorPedalposition', - 'VehicleCurrentlocationLongitude', 'VehicleCurrentlocationLatitude'] - - -def print_signals(frame): - for s in frame: - print(s) - - -def main(args): - print(f"Connecting to {args.url}") - - broker = Broker(args.url, args.api_key, args.access_token) - - print("Listing available signals") - available_signals = broker.list_signal_names() - for signal in available_signals: - print(f" - {signal}") - - # Sanity check so we are running against the expected recording - if available_signals != expected_available_signals: - print( - 'It does not look like you have started the demo recording in cloud. \n' - 'Make sure you play turning-torso-drivecycle.zip on this broker from ' - 'https://demo.remotivelabs.com/p/demo/brokers') - exit(0) - - # Start subscribe to selected signals - subscription = broker.subscribe( - signals=args.signals, - on_frame=print_signals, - changed_values_only=True) - - # play demo recording - broker.play(namespace="custom_can", path="turning-torso-drivecycle.zip") - - # Wait 20 seconds and then shutdown. - # Remove the lines below to just have it running - sleep(seconds=20) - - print("Cancelling subscription (20 secs or ctr-c, you can change this to just keep it running)") - subscription.cancel() - - -def sleep(seconds): - lock = threading.Event() - signals.signal(signals.SIGINT, lambda signum, frame: lock.set()) - lock.wait(timeout=seconds) - - -if __name__ == "__main__": - args = arguments.parse(sys.argv[1:]) - main(args) diff --git a/python/cloud-demo/cloud_demo.py b/python/cloud-demo/cloud_demo.py new file mode 100644 index 0000000..cdf1ac8 --- /dev/null +++ b/python/cloud-demo/cloud_demo.py @@ -0,0 +1,78 @@ +import signal as signals +import sys +import threading +from argparse import Namespace +from typing import Any + +from lib import arguments +from lib.broker import Broker + +# Simple program designed to be used with our cloud demo. +# It is expected that you have followed the steps at +# https://demo.remotivelabs.com and started a broker + uploaded +# a recording. + +# Once you complete the stages in our cloud-demo you will get all +# information required to run this program. It will look something like: + +# python3 cloud_demo.py \ +# --url \ +# --api-key \ +# --signals VehicleSpeed,ChassisAcceleratorPedalposition + +expected_available_signals = [ + "VehicleSpeed", + "ChassisSteeringwheelAngle", + "ChassisAcceleratorPedalposition", + "VehicleCurrentlocationLongitude", + "VehicleCurrentlocationLatitude", +] + + +def print_signals(frame: Any) -> None: + for s in frame: + print(s) + + +def main(argv: Namespace) -> None: + print(f"Connecting to {argv.url}") + + broker = Broker(argv.url, argv.api_key, argv.access_token) + + print("Listing available signals") + available_signals = broker.list_signal_names() + for signal in available_signals: + print(f" - {signal}") + + # Sanity check so we are running against the expected recording + if available_signals != expected_available_signals: + print( + "It does not look like you have started the demo recording in cloud. \n" + "Make sure you play turning-torso-drivecycle.zip on this broker from " + "https://demo.remotivelabs.com/p/demo/brokers" + ) + sys.exit(0) + + # Start subscribe to selected signals + subscription = broker.subscribe(signals=argv.signals, on_frame=print_signals, changed_values_only=True) + + # play demo recording + broker.play(namespace="custom_can", path="turning-torso-drivecycle.zip") + + # Wait 20 seconds and then shutdown. + # Remove the lines below to just have it running + sleep(seconds=20) + + print("Cancelling subscription (20 secs or ctr-c, you can change this to just keep it running)") + subscription.cancel() + + +def sleep(seconds: float) -> None: + lock = threading.Event() + signals.signal(signals.SIGINT, lambda signum, frame: lock.set()) + lock.wait(timeout=seconds) + + +if __name__ == "__main__": + args = arguments.parse() + main(args) diff --git a/python/cloud-demo/lib/__init__.py b/python/cloud-demo/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/cloud-demo/lib/arguments.py b/python/cloud-demo/lib/arguments.py index c17576d..09506c2 100644 --- a/python/cloud-demo/lib/arguments.py +++ b/python/cloud-demo/lib/arguments.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import argparse -def parse(argv): +def parse() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Provide address to RemotiveBroker") parser.add_argument( @@ -18,7 +20,7 @@ def parse(argv): type=str, help="API key is required when accessing brokers running in the cloud", required=False, - default=None + default=None, ) parser.add_argument( @@ -36,7 +38,7 @@ def parse(argv): required=False, help="Comma separated list of signal names to subscribe on", default="Speed", - type=lambda s: [item for item in s.split(',')] + type=lambda s: list(s.split(",")), ) args = parser.parse_args() diff --git a/python/cloud-demo/lib/broker.py b/python/cloud-demo/lib/broker.py index c598ff6..02b6718 100644 --- a/python/cloud-demo/lib/broker.py +++ b/python/cloud-demo/lib/broker.py @@ -1,17 +1,20 @@ +from __future__ import annotations + import binascii import queue from threading import Thread -import remotivelabs.broker.sync as br +from typing import Any, Callable -from typing import Optional +import remotivelabs.broker.sync as br +from typing_extensions import Self +# pylint: disable=R0902 class Broker: - - def __init__(self, url, api_key:str = Optional[None], access_token:str = Optional[None]): + def __init__(self, url: str, api_key: str | None = None, access_token: str | None = None) -> None: self.url = url self.api_key = api_key - self.q = queue.Queue() + self.q: queue.Queue[Any] = queue.Queue() """Main function, checking arguments passed to script, setting up stubs, configuration and starting Threads.""" # Setting up stubs and configuration self.intercept_channel = br.create_channel(url, api_key, access_token) @@ -21,37 +24,35 @@ def __init__(self, url, api_key:str = Optional[None], access_token:str = Optiona self.traffic_stub = br.traffic_api_pb2_grpc.TrafficServiceStub(self.intercept_channel) self.signal_creator = br.SignalCreator(self.system_stub) - def play(self, namespace: str, path: str): + def play(self, namespace: str, path: str) -> None: playback_list = [ { "namespace": namespace, "path": path, "mode": br.traffic_api_pb2.Mode.PLAY, - }] + } + ] - status = self.traffic_stub.PlayTraffic( - br.traffic_api_pb2.PlaybackInfos( - playbackInfo=list(map(self.__create_playback_config, playback_list)) - ) + self.traffic_stub.PlayTraffic( + br.traffic_api_pb2.PlaybackInfos(playbackInfo=list(map(self.__create_playback_config, playback_list))) ) - def list_signal_names(self): + def list_signal_names(self) -> list[str]: # Lists available signals configuration = self.system_stub.GetConfiguration(br.common_pb2.Empty()) signal_names = [] - for networkInfo in configuration.networkInfo: - res = self.system_stub.ListSignals(networkInfo.namespace) + for network_info in configuration.networkInfo: + res = self.system_stub.ListSignals(network_info.namespace) for finfo in res.frame: for sinfo in finfo.childInfo: signal_names.append(sinfo.id.name) return signal_names - def subscribe(self, signals: list, on_frame, changed_values_only: bool = True): + def subscribe(self, signals: list[br.network_api_pb2.Signal], on_frame: Callable[..., None], changed_values_only: bool = True) -> Any: client_id = br.common_pb2.ClientId(id="cloud_demo") - signals_to_subscribe_on = \ - map(lambda signal: self.signal_creator.signal(signal, "custom_can"), signals) + signals_to_subscribe_on = map(lambda signal: self.signal_creator.signal(signal, "custom_can"), signals) Thread( target=br.act_on_signal, @@ -65,35 +66,31 @@ def subscribe(self, signals: list, on_frame, changed_values_only: bool = True): ), ).start() # Wait for subscription - ecu, subscription = self.q.get() + _, subscription = self.q.get() return subscription @classmethod - def connect(cls, url, api_key: str = Optional[None], access_token: str = Optional[None]): - return Broker(url, api_key, access_token) + def connect(cls, url: str, api_key: str | None = None, access_token: str | None = None) -> Self: + return Broker(url, api_key, access_token) # type: ignore - def __each_signal(self, signals, callback): - callback(map(lambda s: { - 'timestamp_nanos': s.timestamp, - 'name': s.id.name, - 'value': self.__get_value(s) - }, signals)) + def __each_signal(self, signals: br.network_api_pb2.Signals, callback: Callable[..., Any]) -> None: + callback(map(lambda s: {"timestamp_nanos": s.timestamp, "name": s.id.name, "value": self.__get_value(s)}, signals)) @staticmethod - def __get_value(signal): + def __get_value(signal: br.network_api_pb2.Signal) -> Any: if signal.raw != b"": return "0x" + binascii.hexlify(signal.raw).decode("ascii") - elif signal.HasField("integer"): + if signal.HasField("integer"): return signal.integer - elif signal.HasField("double"): + if signal.HasField("double"): return signal.double - elif signal.HasField("arbitration"): + if signal.HasField("arbitration"): return signal.arbitration - else: - return "empty" + + return "empty" @staticmethod - def __create_playback_config(item): + def __create_playback_config(item: dict[str, Any]) -> br.traffic_api_pb2.PlaybackInfo: """Creating configuration for playback Parameters @@ -107,14 +104,14 @@ def __create_playback_config(item): Object instance of class """ - playbackConfig = br.traffic_api_pb2.PlaybackConfig( + playback_config = br.traffic_api_pb2.PlaybackConfig( fileDescription=br.system_api_pb2.FileDescription(path=item["path"]), namespace=br.common_pb2.NameSpace(name=item["namespace"]), ) return br.traffic_api_pb2.PlaybackInfo( - playbackConfig=playbackConfig, + playbackConfig=playback_config, playbackMode=br.traffic_api_pb2.PlaybackMode(mode=item["mode"]), ) -Broker.connect = classmethod(Broker.connect) +Broker.connect = classmethod(Broker.connect) # type: ignore diff --git a/python/playback-record/playback.py b/python/playback-record/playback.py index b190f6b..eb46716 100644 --- a/python/playback-record/playback.py +++ b/python/playback-record/playback.py @@ -1,17 +1,14 @@ +from __future__ import annotations + import argparse -import getopt -import grpc -import os -import signal -import sys +import signal as sig import time +from threading import Event +from typing import Any, Optional +import grpc import remotivelabs.broker.sync as br -from threading import Thread, Timer, Event -from typing import Optional - - exit_event = Event() playbacklist = [ @@ -32,7 +29,8 @@ }, ] -def read_signal(stub, signal): + +def read_signal(stub: br.network_api_pb2_grpc.NetworkServiceStub, signal: br.common_pb2.SignalId) -> br.network_api_pb2.Signals: """Read signals Parameters @@ -52,7 +50,7 @@ def read_signal(stub, signal): return stub.ReadSignals(read_info) -def ecu_B_read(stub, pause): +def ecu_b_read(stub: br.network_api_pb2_grpc.NetworkServiceStub, pause: int) -> None: """Read some value published by ecu_A Parameters @@ -65,19 +63,17 @@ def ecu_B_read(stub, pause): """ while not exit_event.is_set(): namespace = "custom_can" - client_id = br.common_pb2.ClientId(id="id_ecu_B") + # client_id = br.common_pb2.ClientId(id="id_ecu_B") # Read value 'SteerAngle' - steer_angle = br.common_pb2.SignalId( - name="SteerAngle", namespace=br.common_pb2.NameSpace(name=namespace) - ) + steer_angle = br.common_pb2.SignalId(name="SteerAngle", namespace=br.common_pb2.NameSpace(name=namespace)) response = read_signal(stub, steer_angle) print("ecu_B, (read) SteerAngle is ", response.signal[0].double) time.sleep(pause) -def ecu_B_subscribe_(stub): +def ecu_b_subscribe_(stub: br.network_api_pb2_grpc.NetworkServiceStub) -> None: """Subscribe to a value published by ecu_A and output value Parameters @@ -91,9 +87,7 @@ def ecu_B_subscribe_(stub): client_id = br.common_pb2.ClientId(id="id_ecu_B") # Subscribe to value 'SteerAngle' - steer_angle = br.common_pb2.SignalId( - name="SteerAngle", namespace=br.common_pb2.NameSpace(name=namespace) - ) + steer_angle = br.common_pb2.SignalId(name="SteerAngle", namespace=br.common_pb2.NameSpace(name=namespace)) sub_info = br.network_api_pb2.SubscriberConfig( clientId=client_id, signals=br.network_api_pb2.SignalIds(signalId=[steer_angle]), @@ -107,11 +101,11 @@ def ecu_B_subscribe_(stub): if exit_event.is_set(): break print("ecu_B, (subscribe) SteerAngle is ", subs_counter.signal[0]) - except grpc._channel._Rendezvous as err: + except grpc.RpcError as err: print(err) -def read_on_timer(stub, signals, pause): +def read_on_timer(stub: br.network_api_pb2_grpc.NetworkServiceStub, signals: br.network_api_pb2.Signals, pause: int) -> None: """Simple reading with timer, logs on purpose tabbed with double space Parameters @@ -129,16 +123,14 @@ def read_on_timer(stub, signals, pause): try: response = stub.ReadSignals(read_info) for signal in response.signal: - print( - " read_on_timer " + signal.id.name + " value " + str(signal.double) - ) - except grpc._channel._Rendezvous as err: + print(" read_on_timer " + signal.id.name + " value " + str(signal.double)) + except grpc.RpcError as err: print(err) time.sleep(pause) -def create_playback_config(item): +def create_playback_config(item: dict[str, Any]) -> br.traffic_api_pb2.PlaybackInfo: """Creating configuration for playback Parameters @@ -152,38 +144,34 @@ def create_playback_config(item): Object instance of class """ - playbackConfig = br.traffic_api_pb2.PlaybackConfig( + playback_config = br.traffic_api_pb2.PlaybackConfig( fileDescription=br.system_api_pb2.FileDescription(path=item["path"]), namespace=br.common_pb2.NameSpace(name=item["namespace"]), ) return br.traffic_api_pb2.PlaybackInfo( - playbackConfig=playbackConfig, + playbackConfig=playback_config, playbackMode=br.traffic_api_pb2.PlaybackMode(mode=item["mode"]), ) -def stop_playback(url, x_api_key, access_token): +def stop_playback(url: str, x_api_key: str | None, access_token: str | None) -> None: """Stop ongoing playback""" intercept_channel = br.create_channel(url, x_api_key, access_token) traffic_stub = br.traffic_api_pb2_grpc.TrafficServiceStub(intercept_channel) for playback in playbacklist: playback["mode"] = br.traffic_api_pb2.Mode.STOP - status = traffic_stub.PlayTraffic( - br.traffic_api_pb2.PlaybackInfos( - playbackInfo=list(map(create_playback_config, playbacklist)) - ) - ) + status = traffic_stub.PlayTraffic(br.traffic_api_pb2.PlaybackInfos(playbackInfo=list(map(create_playback_config, playbacklist)))) print("Stop traffic status is ", status) -def exit_handler(url, x_api_key, access_token): +def exit_handler(url: str, x_api_key: str | None, access_token: str | None) -> None: exit_event.set() time.sleep(0.5) stop_playback(url, x_api_key, access_token) -def main(argv): +def main() -> None: parser = argparse.ArgumentParser(description="Provide address to Beambroker") parser.add_argument( "-url", @@ -200,7 +188,7 @@ def main(argv): type=str, help="API key is required when accessing brokers running in the cloud", required=False, - default=None + default=None, ) parser.add_argument( @@ -218,7 +206,7 @@ def main(argv): type=str, metavar="DIRECTORY", help="Configure broker with specified configuration directory", - default="configuration_custom_udp" + default="configuration_custom_udp", ) args = parser.parse_args() @@ -226,16 +214,13 @@ def main(argv): run(args.url, args.configure, args.x_api_key, args.access_token) -def run(url: str, - configure:str, - x_api_key: Optional[str] = None, - access_token: Optional[str] = None): +def run(url: str, configure: str, x_api_key: Optional[str] = None, access_token: Optional[str] = None) -> None: # To do a clean exit of the script on CTRL+C - signal.signal(signal.SIGINT, lambda signum, frame: exit_handler(url, x_api_key, access_token)) + sig.signal(sig.SIGINT, lambda signum, frame: exit_handler(url, x_api_key, access_token)) # Setting up stubs and configuration intercept_channel = br.create_channel(url, x_api_key, access_token) - network_stub = br.network_api_pb2_grpc.NetworkServiceStub(intercept_channel) + # network_stub = br.network_api_pb2_grpc.NetworkServiceStub(intercept_channel) traffic_stub = br.traffic_api_pb2_grpc.TrafficServiceStub(intercept_channel) system_stub = br.system_api_pb2_grpc.SystemServiceStub(intercept_channel) br.check_license(system_stub) @@ -247,11 +232,11 @@ def run(url: str, # Lists available signals configuration = system_stub.GetConfiguration(br.common_pb2.Empty()) - for networkInfo in configuration.networkInfo: + for network_info in configuration.networkInfo: print( "signals in namespace ", - networkInfo.namespace.name, - system_stub.ListSignals(networkInfo.namespace), + network_info.namespace.name, + system_stub.ListSignals(network_info.namespace), ) # Optonally start threads @@ -280,19 +265,11 @@ def run(url: str, "mode": br.traffic_api_pb2.Mode.RECORD, }, ] - status_record = traffic_stub.PlayTraffic( - br.traffic_api_pb2.PlaybackInfos( - playbackInfo=list(map(create_playback_config, recordlist)) - ) - ) + status_record = traffic_stub.PlayTraffic(br.traffic_api_pb2.PlaybackInfos(playbackInfo=list(map(create_playback_config, recordlist)))) print("record traffic result is ", status_record) # expect candump_.log does not exist, thus error string will be returned - status = traffic_stub.PlayTraffic( - br.traffic_api_pb2.PlaybackInfos( - playbackInfo=list(map(create_playback_config, playbacklist)) - ) - ) + status = traffic_stub.PlayTraffic(br.traffic_api_pb2.PlaybackInfos(playbackInfo=list(map(create_playback_config, playbacklist)))) print("play traffic result is ", status) time.sleep(5) @@ -304,11 +281,7 @@ def run(url: str, "mode": br.traffic_api_pb2.Mode.STOP, }, ] - status_record = traffic_stub.PlayTraffic( - br.traffic_api_pb2.PlaybackInfos( - playbackInfo=list(map(create_playback_config, recordlist)) - ) - ) + status_record = traffic_stub.PlayTraffic(br.traffic_api_pb2.PlaybackInfos(playbackInfo=list(map(create_playback_config, recordlist)))) # now stop recording and download the recorded file br.download_file( @@ -321,11 +294,11 @@ def run(url: str, # ecu_B_thread_subscribe = Thread(target = ecu_B_subscribe_, args = (network_stub,)) # ecu_B_thread_subscribe.start() - # read_signals = [br.common_pb2.SignalId(name="SteerAngle", namespace=br.common_pb2.NameSpace(name = "custom_can")), br.common_pb2.SignalId(name="SteerAngleSpeed", namespace=br.common_pb2.NameSpace(name = "custom_can"))] + # read_signals = [br.common_pb2.SignalId(name="SteerAngle", namespace=br.common_pb2.NameSpace(name = "custom_can")), + # br.common_pb2.SignalId(name="SteerAngleSpeed", namespace=br.common_pb2.NameSpace(name = "custom_can"))] # ecu_read_on_timer = Thread(target = read_on_timer, args = (network_stub, read_signals, 2)) # ecu_read_on_timer.start() if __name__ == "__main__": - main(sys.argv[1:]) - + main() diff --git a/python/pyproject.toml b/python/pyproject.toml index 3973adf..81ccc83 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -75,6 +75,7 @@ ignore=[ '__pycache__' ] recursive=true +load-plugins = ["pylint_protobuf"] [tool.pylint.format] max-line-length=140 diff --git a/python/pytest/test_sample.py b/python/pytest/test_sample.py index ea8e9c3..54120eb 100644 --- a/python/pytest/test_sample.py +++ b/python/pytest/test_sample.py @@ -1,13 +1,14 @@ -import pytest import remotivelabs.broker.sync as br +import pytest + # Server address: -_SERVER_URL = 'http://127.0.0.1:50051' +_SERVER_URL = "http://127.0.0.1:50051" _SERVER_APIKEY = None -class Broker: - def __init__(self): +class Broker: # pylint: disable=R0903 + def __init__(self) -> None: self.channel = br.create_channel(_SERVER_URL, _SERVER_APIKEY) self.network_stub = br.network_api_pb2_grpc.NetworkServiceStub(self.channel) self.system_stub = br.system_api_pb2_grpc.SystemServiceStub(self.channel) @@ -17,51 +18,54 @@ def __init__(self): # Setup broker with predefined settings @pytest.fixture -def broker(): +def broker() -> Broker: return Broker() -def test_check_license(broker): + +def test_check_license(local_broker: Broker) -> None: """Check valid license""" - br.check_license(broker.system_stub) + br.check_license(local_broker.system_stub) -def test_serverInfo(broker): + +def test_server_info(local_broker: Broker) -> None: """Validate server information""" - conf = broker.system_stub.GetConfiguration(br.common_pb2.Empty()) + conf = local_broker.system_stub.GetConfiguration(br.common_pb2.Empty()) # Major version should be 1 - assert conf.serverVersion.startswith('v1.') + assert conf.serverVersion.startswith("v1.") # Should have 1 namespace assert len(conf.networkInfo) == 1 assert conf.networkInfo[0].namespace.name == "mynamespace" -def test_listSignals(broker): + +def test_list_signals(local_broker: Broker) -> None: """List and valitade signals.""" - ns = br.common_pb2.NameSpace(name='mynamespace') - signals = broker.system_stub.ListSignals(ns) + ns = br.common_pb2.NameSpace(name="mynamespace") + signals = local_broker.system_stub.ListSignals(ns) assert len(signals.frame) == 1 -def test_metaFields(broker): - """Validate signal meta information.""" - sc = br.SignalCreator(broker.system_stub) - metaSignal = sc.get_meta('mysignal1', 'mynamespace') - frame = sc.frame_by_signal('mysignal1', 'mynamespace') - assert frame.name == 'myframe1' - metaFrame = sc.get_meta(frame.name, 'mynamespace') - - assert metaSignal.getDescription() == "My signal 1" - assert metaSignal.getMax() == 255.0 - assert metaSignal.getMin() == 0 - assert metaSignal.getUnit() == "My unit" - assert metaSignal.getSize() == 8 - assert metaSignal.getIsRaw() == False - assert metaFrame.getIsRaw() == True - assert metaSignal.getFactor() == 1.0 - assert metaSignal.getOffset() == 0.0 - assert metaSignal.getSenders() == ['NONE'] - assert metaFrame.getSenders() == ['NONE'] - assert metaSignal.getReceivers() == ['NONE1', 'NONE2'] +def test_meta_fields(local_broker: Broker) -> None: + """Validate signal meta information.""" + sc = br.SignalCreator(local_broker.system_stub) + meta_signal = sc.get_meta("mysignal1", "mynamespace") + frame = sc.frame_by_signal("mysignal1", "mynamespace") + assert frame.name == "myframe1" + meta_frame = sc.get_meta(frame.name, "mynamespace") + + assert meta_signal.getDescription() == "My signal 1" + assert meta_signal.getMax() == 255.0 + assert meta_signal.getMin() == 0 + assert meta_signal.getUnit() == "My unit" + assert meta_signal.getSize() == 8 + assert meta_signal.getIsRaw() is False + assert meta_frame.getIsRaw() is True + assert meta_signal.getFactor() == 1.0 + assert meta_signal.getOffset() == 0.0 + assert meta_signal.getSenders() == ["NONE"] + assert meta_frame.getSenders() == ["NONE"] + assert meta_signal.getReceivers() == ["NONE1", "NONE2"] diff --git a/python/reflector-ecu/reflector.py b/python/reflector-ecu/reflector.py index 25a3a53..5e62355 100644 --- a/python/reflector-ecu/reflector.py +++ b/python/reflector-ecu/reflector.py @@ -1,22 +1,19 @@ +from __future__ import annotations + import argparse -import binascii -import getopt -import grpc -import os import queue import sys import time +from threading import Thread +from typing import Any, Callable, Optional, Sequence, Tuple -from threading import Thread, Timer -from typing import Optional - +import grpc import remotivelabs.broker.sync as br -signal_creator = None -q = queue.Queue() +q: queue.Queue[Any] = queue.Queue() -def read_signals(stub, signal): +def read_signals(stub: br.network_api_pb2_grpc.NetworkServiceStub, signal: br.common_pb2.SignalId) -> br.network_api_pb2.Signals: """Read signals Parameters @@ -35,36 +32,39 @@ def read_signals(stub, signal): try: read_info = br.network_api_pb2.SignalIds(signalId=[signal]) return stub.ReadSignals(read_info) - except grpc._channel._Rendezvous as err: + except grpc.RpcError as err: print(err) + sys.exit() -def ecu_A(stub, pause): +def ecu_a(stub: br.network_api_pb2_grpc.NetworkServiceStub, signal_creator: br.SignalCreator, pause: int) -> None: """Publishes a value, read other value (published by ecu_B) Parameters ---------- stub : NetworkServiceStub Object instance of class + signal_creator: SignalCreator + Object instance of class pause : int Amount of time to pause, in seconds """ increasing_counter = 0 namespace = "ecu_A" - clientId = br.common_pb2.ClientId(id="id_ecu_A") - while True: + client_id = br.common_pb2.ClientId(id="id_ecu_A") + if signal_creator is None: + return + while True: print("\necu_A, seed/counter is ", increasing_counter) # Publishes value 'counter' br.publish_signals( - clientId, + client_id, stub, [ - signal_creator.signal_with_payload( - "counter", namespace, ("integer", increasing_counter) - ), + signal_creator.signal_with_payload("counter", namespace, ("integer", increasing_counter)), # signal_creator.signal_with_payload( # "TestFr07_Child01_UB", namespace, ("integer", 1) # ), @@ -82,15 +82,13 @@ def ecu_A(stub, pause): # Read the other value 'counter_times_2' and output result - read_signal_response = read_signals( - stub, signal_creator.signal("counter_times_2", namespace) - ) + read_signal_response = read_signals(stub, signal_creator.signal("counter_times_2", namespace)) for signal in read_signal_response.signal: - print(f"ecu_A, (result) {signal.id.name} is {get_value(signal)}") + print(f"ecu_A, (result) {signal.id.name} is {get_value_pair(signal)[1]}") increasing_counter = (increasing_counter + 1) % 4 -def read_on_timer(stub, signals, pause): +def read_on_timer(stub: br.network_api_pb2_grpc.NetworkServiceStub, signals: br.network_api_pb2.Signals, pause: int) -> None: """Simple reading with timer Parameters @@ -108,28 +106,36 @@ def read_on_timer(stub, signals, pause): try: response = stub.ReadSignals(read_info) for signal in response.signal: - print(f"ecu_B, (read) {signal.id.name} is {get_value(signal)}") - except grpc._channel._Rendezvous as err: + print(f"ecu_B, (read) {signal.id.name} is {get_value_pair(signal)[1]}") + except grpc.RpcError as err: print(err) time.sleep(pause) -def get_value_pair(signal): +def get_value_pair(signal: br.network_api_pb2.Signal) -> Tuple[str, Any]: if signal.raw != b"": - raise Exception(f"not a valid signal, probably a frame {signal}") - elif signal.HasField("integer"): + raise ValueError(f"not a valid signal, probably a frame {signal}") + if signal.HasField("integer"): return ("integer", signal.integer) - elif signal.HasField("double"): + if signal.HasField("double"): return ("double", signal.double) - elif signal.HasField("arbitration"): + if signal.HasField("arbitration"): return ("arbitration", signal.arbitration) - elif signal.HasField("empty"): + if signal.HasField("empty"): return ("empty", signal.empty) - else: - raise Exception(f"not a valid signal {signal}") + raise ValueError(f"not a valid signal {signal}") -def act_on_signal(client_id, stub, sub_signals, on_change, fun, on_subcribed=None): + +def act_on_signal( + client_id: br.common_pb2.ClientId, + stub: br.network_api_pb2_grpc.NetworkServiceStub, + sub_signals: br.common_pb2.SignalId, + on_change: bool, + fun: Callable[[Any], None], + on_subcribed: Callable[[Any], None] | None = None, +) -> None: + # pylint: disable=R0913 sub_info = br.network_api_pb2.SubscriberConfig( clientId=client_id, signals=br.network_api_pb2.SignalIds(signalId=sub_signals), @@ -143,19 +149,17 @@ def act_on_signal(client_id, stub, sub_signals, on_change, fun, on_subcribed=Non for subs_counter in subscripton: fun(subs_counter.signal) - except grpc.RpcError as e: + except grpc.RpcError: try: subscripton.cancel() - except grpc.RpcError as e2: - pass + except grpc.RpcError as err: + print(err) - except grpc._channel._Rendezvous as err: - print(err) # reload, alternatively non-existing signal print("subscription terminated") -def main(argv): +def main() -> None: parser = argparse.ArgumentParser(description="Provide address to Beambroker") parser.add_argument( @@ -172,7 +176,7 @@ def main(argv): type=str, help="API key is required when accessing brokers running in the cloud", required=False, - default=None + default=None, ) parser.add_argument( @@ -188,18 +192,25 @@ def main(argv): run(args.url, args.x_api_key, args.access_token) -def double_and_publish(network_stub, client_id, trigger, signals): +def double_and_publish( + network_stub: br.network_api_pb2_grpc.NetworkServiceStub, + client_id: br.common_pb2.ClientId, + trigger: Any, + signals: br.network_api_pb2.Signals, + signal_creator: br.SignalCreator, +) -> None: + if signal_creator is None: + return + for signal in signals: # print(f"signals contains {signals}") - print(f"ecu_B, (subscribe) {signal.id.name} {get_value(signal)}") + print(f"ecu_B, (subscribe) {signal.id.name} {get_value_pair(signal)[1]}") if signal.id == trigger: br.publish_signals( client_id, network_stub, [ - signal_creator.signal_with_payload( - "counter_times_2", "ecu_B", ("integer", get_value(signal) * 2) - ), + signal_creator.signal_with_payload("counter_times_2", "ecu_B", ("integer", get_value_pair(signal)[1] * 2)), # add any number of signals/frames here # signal_creator.signal_with_payload( # "TestFr04", "ecu_B", ("raw", binascii.unhexlify("0a0b0c0d")), False @@ -208,16 +219,18 @@ def double_and_publish(network_stub, client_id, trigger, signals): ) -def all_siblings(name, namespace_name): +def all_siblings(name: str, namespace_name: str, signal_creator: br.SignalCreator) -> Sequence[br.common_pb2.SignalId]: + if signal_creator is None: + return [] frame_name = signal_creator.frame_by_signal(name, namespace_name) return signal_creator.signals_in_frame(frame_name.name, frame_name.namespace.name) -def some_function_to_calculate_crc(a, b, c): +def some_function_to_calculate_crc(a: Any, b: Any, c: Any) -> int: # pylint: disable=W0613 return 1 -def change_namespace(signals, namespace_name): +def change_namespace(signals: list[br.network_api_pb2.Signal], namespace_name: str) -> None: for signal in signals: signal.id.namespace.name = namespace_name @@ -228,9 +241,7 @@ def change_namespace(signals, namespace_name): # TestFr07 is split into all signals. Some signals are modified and then dispatched on ecu_B # # refer to interfaces.json for reflector configuration. -def run(url, - x_api_key: Optional[str] = None, - access_token: Optional[str] = None): +def run(url: str, x_api_key: Optional[str] = None, access_token: Optional[str] = None) -> None: """Main function, checking arguments passed to script, setting up stubs, configuration and starting Threads.""" # Setting up stubs and configuration intercept_channel = br.create_channel(url, x_api_key, access_token) @@ -242,39 +253,41 @@ def run(url, br.upload_folder(system_stub, "configuration_can") br.reload_configuration(system_stub) - global signal_creator signal_creator = br.SignalCreator(system_stub) # ecu a, we do this with lambda refering to modify_signal_publish_frame. reflector_client_id = br.common_pb2.ClientId(id="reflector_client_id") def modify_signals_publish_frame( - network_stub, client_id, destination_namespace_name, signals - ): + network_stub: br.network_api_pb2_grpc.NetworkServiceStub, + client_id: br.common_pb2.ClientId, + destination_namespace_name: str, + signals: br.network_api_pb2.Signals, + ) -> None: """Modifiy recieved signals and publish them.""" # work in dictonary domain for easier access. - signal_dict = {signal.id.name: signal for signal in signals} + signal_dict: dict[str, br.network_api_pb2.Signal] = {signal.id.name: signal for signal in signals} # example, lets update TestFr07_Child02 - (type, value) = get_value_pair(signal_dict["TestFr07_Child02"]) + (signal_type, value) = get_value_pair(signal_dict["TestFr07_Child02"]) signal_dict["TestFr07_Child02"] = signal_creator.signal_with_payload( - "TestFr07_Child02", destination_namespace_name, (type, value + 1) + "TestFr07_Child02", destination_namespace_name, (signal_type, value + 1) ) # example, lets update TestFr07_Child01_UB just invert this single bit - (type, value) = get_value_pair(signal_dict["TestFr07_Child01_UB"]) + (signal_type, value) = get_value_pair(signal_dict["TestFr07_Child01_UB"]) signal_dict["TestFr07_Child01_UB"] = signal_creator.signal_with_payload( - "TestFr07_Child01_UB", destination_namespace_name, (type, 1 - value) + "TestFr07_Child01_UB", destination_namespace_name, (signal_type, 1 - value) ) # example, lets compute counter_times_2 using some formula - (type, value) = get_value_pair(signal_dict["counter_times_2"]) + (signal_type, value) = get_value_pair(signal_dict["counter_times_2"]) signal_dict["counter_times_2"] = signal_creator.signal_with_payload( "counter_times_2", destination_namespace_name, ( - type, + signal_type, some_function_to_calculate_crc( id, destination_namespace_name, @@ -292,7 +305,7 @@ def modify_signals_publish_frame( ), ) - publish_list = signal_dict.values() + publish_list = list(signal_dict.values()) # update destination namespace for all entrys in list change_namespace(publish_list, destination_namespace_name) # print(f"updates lists {publish_list}") @@ -317,4 +330,4 @@ def modify_signals_publish_frame( if __name__ == "__main__": - main(sys.argv[1:]) + main() diff --git a/python/restbus/restbus.py b/python/restbus/restbus.py index e20aaf7..257a4ee 100644 --- a/python/restbus/restbus.py +++ b/python/restbus/restbus.py @@ -1,19 +1,23 @@ +from __future__ import annotations + import argparse import math -import time import re -from typing import Generator, Tuple, TypeAlias, Iterable, Optional +import time +from typing import Any, Generator, Iterable, Optional, Tuple, TypeAlias +import google.protobuf.internal.containers # type: ignore import remotivelabs.broker.sync as br +from grpc import Channel -class SignalValue: - def __init__(self, name: str, values: list[br.network_api_pb2.Signal]): +class SignalValue: # pylint: disable=R0903 + def __init__(self, name: str, values: list[br.network_api_pb2.Signal]) -> None: self.index = 0 self.name = name self.values = values - def next(self): + def next(self) -> br.network_api_pb2.Signal: ret = self.values[self.index] self.index = (self.index + 1) % len(self.values) return ret @@ -24,34 +28,32 @@ def next(self): OverrideValues: TypeAlias = dict[str, list[float]] -def genDefaultPublishValues( - signal_creator, manual_sets, child_info +def gen_default_publish_values( + signal_creator: br.SignalCreator, + manual_sets: OverrideValues, + child_info: google.protobuf.internal.containers.RepeatedCompositeFieldContainer[Any], ) -> Generator[SignalValue, None, None]: - for ci in child_info: - signalId = ci.id - meta_data = signal_creator.get_meta(signalId.name, signalId.namespace.name) + signal_id = ci.id + meta_data = signal_creator.get_meta(signal_id.name, signal_id.namespace.name) default_values = [meta_data.getStartValue(0.0)] - if signalId.name in manual_sets: - default_values = manual_sets[signalId.name] + if signal_id.name in manual_sets: + default_values = manual_sets[signal_id.name] - def _yield_values(): - for value in default_values: - yield signal_creator.signal_with_payload( - signalId.name, signalId.namespace.name, ("double", value) - ) + def _yield_values() -> Any: + for value in default_values: # pylint: disable=W0640 + yield signal_creator.signal_with_payload(signal_id.name, signal_id.namespace.name, ("double", value)) # pylint: disable=W0640 - yield SignalValue(signalId.name, list(_yield_values())) + yield SignalValue(signal_id.name, list(_yield_values())) -def selectRestBusFrames( +def select_rest_bus_frames( signal_creator: br.SignalCreator, manual_sets: OverrideValues, frame_infos: Iterable[br.common_pb2.FrameInfo], match_frames: list[str], exclude: bool, ) -> Generator[SchedulingTuple, None, None]: - for fi in frame_infos: si = fi.signalInfo @@ -63,21 +65,19 @@ def selectRestBusFrames( frame_id = si.id meta_data = signal_creator.get_meta(frame_id.name, frame_id.namespace.name) cycle_time = meta_data.getCycleTime(0.0) - publish_values = list( - genDefaultPublishValues(signal_creator, manual_sets, fi.childInfo) - ) + publish_values = list(gen_default_publish_values(signal_creator, manual_sets, fi.childInfo)) yield (cycle_time, frame_id.name, publish_values) -def selectE2eCounters( +def select_e2e_counters( frame_infos: Iterable[br.common_pb2.FrameInfo], ) -> Generator[str, None, None]: - def _yield_all_e2e(): + def _yield_all_e2e() -> Generator[str, None, None]: for frame in frame_infos: - metaData = frame.signalInfo.metaData - if metaData.e2e and metaData.e2e.signalCounter: - yield metaData.e2e.signalCounter - for group in metaData.groups: + meta_data = frame.signalInfo.metaData + if meta_data.e2e and meta_data.e2e.signalCounter: + yield meta_data.e2e.signalCounter + for group in meta_data.groups: if group.e2e and group.e2e.signalCounter: yield group.e2e.signalCounter @@ -86,13 +86,13 @@ def _yield_all_e2e(): yield opt_e2e_counter -def restBusSchedule( - frameSelection: list[SchedulingTuple], - e2eCounters: E2eCounterStates, +# pylint: disable=R0914,R1702 +def rest_bus_schedule( + frame_selection: list[SchedulingTuple], + e2e_counters: E2eCounterStates, network_stub: br.network_api_pb2_grpc.NetworkServiceStub, verbose: bool, ) -> None: - # Use a monotonic timer for scheduling clock: float = time.monotonic() @@ -100,18 +100,18 @@ def restBusSchedule( # - Next schedule trigger # - Cycle time # - Publishable RemoviteBroker values - schedule: list[Tuple[float, float, list[br.network_api_pb2.Signal]]] = [] + schedule: list[Tuple[float, float, list[SignalValue]]] = [] # Put all signals from frame selection in scheduling array - for cycle_time_ms, _, publish_values in frameSelection: + for cycle_time_ms, _, publish_values in frame_selection: cycle_time: float = cycle_time_ms * 0.001 schedule.append((clock, cycle_time, publish_values)) # Client ID of our problam to use in our publish operation - clientId: br.common_pb2.ClientId = br.common_pb2.ClientId(id="MyRestbus") + client_id: br.common_pb2.ClientId = br.common_pb2.ClientId(id="MyRestbus") # Counter only used to print verbose information - sentFramesCount: int = 0 + sent_frames_count: int = 0 # Scheduling loop, run as long as there are cyclic frames to publish while len(schedule) > 0: @@ -123,8 +123,8 @@ def restBusSchedule( # For debugging, print what's going on in scheduler if verbose: ms: int = math.ceil((next_publish - clock) * 1000.0) - print("Sent {} frames, sleeping for {} ms".format(sentFramesCount, ms)) - sentFramesCount = 0 + print(f"Sent {sent_frames_count} frames, sleeping for {ms} ms") + sent_frames_count = 0 # Sleep scheduler until next scheduled event sleep_time: float = next_publish - clock @@ -143,29 +143,26 @@ def restBusSchedule( if len(triggers) > 0: # Collect values to be published publish_combined: list[br.network_api_pb2.Signal] = [] - for next_sleep, cycle_time, signalValues in triggers: - - for signalValue in signalValues: - name = signalValue.name - if name in e2eCounters: - next = e2eCounters[name] - next += 1 - if next > 14: - next = 0 + for next_sleep, cycle_time, signal_values in triggers: + for signal_value in signal_values: + name = signal_value.name + if name in e2e_counters: + next_value = e2e_counters[name] + next_value += 1 + if next_value > 14: + next_value = 0 # Update E2E counter - signalValue.values[0].integer = e2eCounters[name] = next + signal_value.values[0].integer = e2e_counters[name] = next_value - publish_data = list( - map(lambda signalValue: signalValue.next(), signalValues) - ) + publish_data = list(map(lambda signal_value: signal_value.next(), signal_values)) publish_combined += publish_data - sentFramesCount += 1 + sent_frames_count += 1 if cycle_time > 0.0: new_next_sleep: float = next_sleep + cycle_time - schedule.append((new_next_sleep, cycle_time, signalValues)) + schedule.append((new_next_sleep, cycle_time, signal_values)) # Publish values - br.publish_signals(clientId, network_stub, publish_combined) + br.publish_signals(client_id, network_stub, publish_combined) # Sort schedule by upcoming publish time, first signals in array are upcoming in schedule schedule.sort(key=lambda s: s[0]) @@ -174,82 +171,82 @@ def restBusSchedule( print("No more schedules...") -def run( - url: str, - namespace_name: str, - frames: list[str], - exclude: bool, - verbose: bool, - configure: Optional[str], - manual_sets: OverrideValues, - x_api_key: Optional[str] = None, - access_token: Optional[str] = None, -) -> None: +class RunInfo: + # pylint: disable=R0903,R0913 + # Contains properties used for a restbus run, such as url and keys. + + def __init__( + self, url: str, namespace_name: str, frames: list[str], x_api_key: Optional[str] = None, access_token: Optional[str] = None + ) -> None: + self.url: str = url + self.namespace_name: str = namespace_name + self.frames: list[str] = frames + self.x_api_key: Optional[str] = x_api_key + self.access_token: Optional[str] = access_token + + +def get_frame_selection( + run_info: RunInfo, intercept_channel: Channel, configure: Optional[str], manual_sets: OverrideValues, exclude: bool +) -> Tuple[list[SchedulingTuple], E2eCounterStates]: + """Get the frame selection and E2eCounterStates for a chosen run""" - # gRPC connection to RemotiveBroker - intercept_channel = br.create_channel(url, x_api_key, access_token) system_stub = br.system_api_pb2_grpc.SystemServiceStub(intercept_channel) - network_stub = br.network_api_pb2_grpc.NetworkServiceStub(intercept_channel) if configure: - print("Configuring broker with {}".format(configure)) + print(f"Configuring broker with {configure}") br.upload_folder(system_stub, configure) br.reload_configuration(system_stub) # Get all signals available on broker - namespace = br.common_pb2.NameSpace(name=namespace_name) + namespace = br.common_pb2.NameSpace(name=run_info.namespace_name) signals = system_stub.ListSignals(namespace) - if len(frames) == 0: + if len(run_info.frames) == 0: # Exit if no frames selected - print( - "No frames specified, selecting all frames in namespace {}".format( - namespace_name - ) - ) - frames = [] + print(f"No frames specified, selecting all frames in namespace {run_info.namespace_name}") + run_info.frames = [] exclude = True # Generate a list of values ready for publish sc = br.SignalCreator(system_stub) - frameSelection: list[SchedulingTuple] = list( - selectRestBusFrames(sc, manual_sets, signals.frame, frames, exclude) - ) - e2eCounters: E2eCounterStates = dict( - [(signal_name, 0) for signal_name in selectE2eCounters(signals.frame)] - ) - if len(frameSelection) > 0: - print( - "Running restbus for {} frames on namespace {}".format( - len(frameSelection), namespace_name - ) - ) - if verbose: + e2e_counters: E2eCounterStates = dict([(signal_name, 0) for signal_name in select_e2e_counters(signals.frame)]) # pylint: disable=R1717 + frame_selection: list[SchedulingTuple] = list(select_rest_bus_frames(sc, manual_sets, signals.frame, run_info.frames, exclude)) + # Return both the frame selection and counters to use for running the restbus + return frame_selection, e2e_counters + + +def run( + run_info: RunInfo, + exclude: bool, + verbose: bool, + configure: Optional[str], + manual_sets: OverrideValues, +) -> None: + # gRPC connection to RemotiveBroker + intercept_channel = br.create_channel(run_info.url, run_info.x_api_key, run_info.access_token) + network_stub = br.network_api_pb2_grpc.NetworkServiceStub(intercept_channel) + + e2e_counters: E2eCounterStates + frame_selection: list[SchedulingTuple] + frame_selection, e2e_counters = get_frame_selection(run_info, intercept_channel, configure, manual_sets, exclude) - for cycle_time, frame_id, signalValues in frameSelection: + # Run restbus with chosen frames + if len(frame_selection) > 0: + print(f"Running restbus for {len(frame_selection)} frames on namespace {run_info.namespace_name}") + if verbose: + for cycle_time, frame_id, signal_values in frame_selection: if cycle_time > 0.0: - print( - "- Frame {} with cycle time {} ms.".format(frame_id, cycle_time) - ) + print(f"- Frame {frame_id} with cycle time {cycle_time} ms.") else: - print("- Frame {} without cycle time.".format(frame_id)) - for signalValue in signalValues: - valuesMsg = ", ".join( - map( - lambda value: str(value.double), - signalValue.values - ) - ) - print( - " - Signal {}, default value(s): {}.".format( - signalValue.name, valuesMsg - ) - ) + print(f"- Frame {frame_id} without cycle time.") + for signal_value in signal_values: + values_msg = ", ".join(map(lambda value: str(value.double), signal_value.values)) + print(f" - Signal {signal_value.name}, default value(s): {values_msg}.") try: # Run scheduler loop - restBusSchedule(frameSelection, e2eCounters, network_stub, verbose) + rest_bus_schedule(frame_selection, e2e_counters, network_stub, verbose) except KeyboardInterrupt: print("Keyboard interrupt received. Closing scheduler.") else: @@ -265,8 +262,8 @@ def __override_argument_to_tuple(argument: str) -> Tuple[str, list[float]]: name = res.group(1) values = list(map(float, res.group(2).split(","))) return (name, values) - else: - raise Exception("Use pattern SIGNAL_NAME=VALUE") + + raise ValueError("Use pattern SIGNAL_NAME=VALUE") def main() -> None: @@ -343,29 +340,14 @@ def main() -> None: ) parser.add_argument( - "-s", - "--set", - type=str, - action="append", - metavar="NAME=VALUE", - default=[], - help="Manually the value of a given signal" + "-s", "--set", type=str, action="append", metavar="NAME=VALUE", default=[], help="Manually the value of a given signal" ) args = parser.parse_args() manual_sets = dict(map(__override_argument_to_tuple, args.set)) - run( - args.url, - args.namespace, - args.frame, - args.exclude, - args.verbose, - args.configure, - manual_sets, - args.x_api_key, - args.access_token - ) + run_info = RunInfo(args.url, args.namespace, args.frame, args.x_api_key, args.access_token) + run(run_info, args.exclude, args.verbose, args.configure, manual_sets) if __name__ == "__main__": diff --git a/python/simple-ecu/ecu.py b/python/simple-ecu/ecu.py index 5ce69e4..eb93d44 100644 --- a/python/simple-ecu/ecu.py +++ b/python/simple-ecu/ecu.py @@ -1,22 +1,20 @@ +from __future__ import annotations + import argparse import binascii -import grpc -import os import queue -import sys, getopt +import sys import time +from threading import Thread +from typing import Any, Callable, Optional, Tuple +import grpc import remotivelabs.broker.sync as br -from typing import Callable, Sequence, Optional -from threading import Thread, Timer +q: queue.Queue[Any] = queue.Queue() -signal_creator = None -q = queue.Queue() - - -def read_signals(stub, signal): +def read_signals(stub: br.network_api_pb2_grpc.NetworkServiceStub, signal: br.common_pb2.SignalId) -> br.network_api_pb2.Signals: """Read signals Parameters @@ -35,11 +33,12 @@ def read_signals(stub, signal): try: read_info = br.network_api_pb2.SignalIds(signalId=[signal]) return stub.ReadSignals(read_info) - except grpc._channel._Rendezvous as err: + except grpc.RpcError as err: print(err) + return sys.exit() -def ecu_A(stub): +def ecu_a(stub: br.network_api_pb2_grpc.NetworkServiceStub, signal_creator: br.SignalCreator) -> None: """Publishes a value with set frequncy in database or default to 1000ms, read other value (published by ecu_B) Parameters @@ -48,28 +47,24 @@ def ecu_A(stub): Object instance of class """ + if signal_creator is None: + return + namespace = "ecu_A" increasing_counter = 0 - counter_start_value = int( - signal_creator.get_meta("counter", namespace).getStartValue(0) - ) - clientId = br.common_pb2.ClientId(id="id_ecu_A") + counter_start_value = int(signal_creator.get_meta("counter", namespace).getStartValue(0)) + client_id = br.common_pb2.ClientId(id="id_ecu_A") counter_frame = signal_creator.frame_by_signal("counter", namespace) - pause = 0.001 * signal_creator.get_meta( - counter_frame.name, counter_frame.namespace.name - ).getCycleTime(1000.0) + pause = 0.001 * signal_creator.get_meta(counter_frame.name, counter_frame.namespace.name).getCycleTime(1000.0) while True: - print("\necu_A, seed is ", increasing_counter) # Publishes value 'counter' br.publish_signals( - clientId, + client_id, stub, [ - signal_creator.signal_with_payload( - "counter", namespace, ("integer", increasing_counter) - ), + signal_creator.signal_with_payload("counter", namespace, ("integer", increasing_counter)), # add any number of signals here, make sure that all signals/frames are unique. # signal_creator.signal_with_payload( # "TestFr04", namespace, ("raw", binascii.unhexlify("0a0b0c0d")), False @@ -81,15 +76,13 @@ def ecu_A(stub): # Read the other value 'counter_times_2' and output result - read_signal_response = read_signals( - stub, signal_creator.signal("counter_times_2", namespace) - ) + read_signal_response = read_signals(stub, signal_creator.signal("counter_times_2", namespace)) for signal in read_signal_response.signal: print(f"ecu_A, (result) {signal.id.name} is {get_value(signal)}") increasing_counter = counter_start_value + (increasing_counter + 1) % 4 -def read_on_timer(stub, signals, pause): +def read_on_timer(stub: br.network_api_pb2_grpc.NetworkServiceStub, signals: br.network_api_pb2.Signals, pause: int) -> None: """Simple reading with timer Parameters @@ -108,35 +101,27 @@ def read_on_timer(stub, signals, pause): response = stub.ReadSignals(read_info) for signal in response.signal: print(f"ecu_B, (read) {signal.id.name} is {get_value(signal)}") - except grpc._channel._Rendezvous as err: + except grpc.RpcError as err: print(err) time.sleep(pause) -def get_value(signal): +def get_value(signal: br.network_api_pb2.Signal) -> Any: if signal.raw != b"": return "0x" + binascii.hexlify(signal.raw).decode("ascii") - elif signal.HasField("integer"): + if signal.HasField("integer"): return signal.integer - elif signal.HasField("double"): + if signal.HasField("double"): return signal.double - elif signal.HasField("arbitration"): + if signal.HasField("arbitration"): return signal.arbitration - else: - return "empty" + return "empty" -def main(argv): +def main() -> None: parser = argparse.ArgumentParser(description="Provide address to Beambroker") - parser.add_argument( - "-url", - "--url", - type=str, - help="URL of the RemotiveBroker", - default="http://127.0.0.1:50051", - required=False - ) + parser.add_argument("-url", "--url", type=str, help="URL of the RemotiveBroker", default="http://127.0.0.1:50051", required=False) parser.add_argument( "-x_api_key", @@ -161,14 +146,23 @@ def main(argv): type=str, metavar="DIRECTORY", help="Configure broker with specified configuration directory", - default="configuration_udp" + default="configuration_udp", ) args = parser.parse_args() - run(args.url, args.configure, args.x_api_key,args.access_token ) + run(args.url, args.configure, args.x_api_key, args.access_token) + +def double_and_publish( + network_stub: br.network_api_pb2_grpc.NetworkServiceStub, + client_id: br.common_pb2.ClientId, + trigger: Any, + signals: br.network_api_pb2.Signals, + signal_creator: br.SignalCreator, +) -> None: + if signal_creator is None: + return -def double_and_publish(network_stub, client_id, trigger, signals): for signal in signals: print(f"ecu_B, (subscribe) {signal.id.name} {get_value(signal)}") if signal.id == trigger: @@ -176,9 +170,7 @@ def double_and_publish(network_stub, client_id, trigger, signals): client_id, network_stub, [ - signal_creator.signal_with_payload( - "counter_times_2", "ecu_B", ("integer", get_value(signal) * 2) - ), + signal_creator.signal_with_payload("counter_times_2", "ecu_B", ("integer", get_value(signal) * 2)), # add any number of signals/frames here # signal_creator.signal_with_payload( # "TestFr04", "ecu_B", ("raw", binascii.unhexlify("0a0b0c0d")), False @@ -187,35 +179,34 @@ def double_and_publish(network_stub, client_id, trigger, signals): ) +# pylint: disable=R0913 def subscribe( - broker, + broker: Any, client_id: br.common_pb2.ClientId, network_stub: br.network_api_pb2_grpc.NetworkServiceStub, - signals: br.network_api_pb2.Signals, - on_subscribe: Callable[[Sequence[br.network_api_pb2.Signal]], None], + script: list[br.common_pb2.SignalId], + on_subscribe: Callable[[br.network_api_pb2.Signals], None], on_change: bool = False, -) -> grpc.RpcContext: - sync = queue.Queue() - Thread( - target=broker.act_on_signal, +) -> Tuple[Any, Thread]: + sync: queue.Queue[Any] = queue.Queue() + thread: Thread = Thread( + target=broker.act_on_scripted_signal, args=( client_id, network_stub, - signals, + script, on_change, # True: only report when signal changes on_subscribe, - lambda subscription: (sync.put(subscription)), + sync.put, ), - ).start() + ) + thread.start() # wait for subscription to settle subscription = sync.get() - return subscription + return subscription, thread -def run(url: str, - configuration: str, - x_api_key: Optional[str] = None, - access_token: Optional[str] = None): +def run(url: str, configuration: str, x_api_key: Optional[str] = None, access_token: Optional[str] = None) -> None: """Main function, checking arguments passed to script, setting up stubs, configuration and starting Threads.""" # Setting up stubs and configuration intercept_channel = br.create_channel(url, x_api_key, access_token) @@ -224,27 +215,26 @@ def run(url: str, system_stub = br.system_api_pb2_grpc.SystemServiceStub(intercept_channel) br.check_license(system_stub) - print("Using configuration {}".format(configuration)) + print(f"Using configuration {configuration}") br.upload_folder(system_stub, configuration) br.reload_configuration(system_stub) - global signal_creator signal_creator = br.SignalCreator(system_stub) # Lists available signals configuration = system_stub.GetConfiguration(br.common_pb2.Empty()) - for networkInfo in configuration.networkInfo: + for network_info in configuration.networkInfo: print( "signals in namespace ", - networkInfo.namespace.name, - system_stub.ListSignals(networkInfo.namespace), + network_info.namespace.name, + system_stub.ListSignals(network_info.namespace), ) # ecu b, we do this with lambda refering to double_and_publish. ecu_b_client_id = br.common_pb2.ClientId(id="id_ecu_B") # Starting subscription thread - subscription = subscribe( + _ = subscribe( br, ecu_b_client_id, network_stub, @@ -254,16 +244,13 @@ def run(url: str, # signal_creator.signal("TestFr04", "ecu_B"), ], lambda signals: double_and_publish( - network_stub, - ecu_b_client_id, - signal_creator.signal("counter", "ecu_B"), - signals, + network_stub, ecu_b_client_id, signal_creator.signal("counter", "ecu_B"), signals, signal_creator ), ) # ecu a, this is where we publish, and Thread( - target=ecu_A, + target=ecu_a, args=(network_stub,), ).start() @@ -280,4 +267,4 @@ def run(url: str, if __name__ == "__main__": - main(sys.argv[1:]) + main() diff --git a/python/subscribe-to-scripted-signal/subscribe_demo.py b/python/subscribe-to-scripted-signal/subscribe_demo.py index df8b255..8ccbebe 100644 --- a/python/subscribe-to-scripted-signal/subscribe_demo.py +++ b/python/subscribe-to-scripted-signal/subscribe_demo.py @@ -1,24 +1,26 @@ +from __future__ import annotations + import argparse import math -import time -import remotivelabs.broker.sync as br import queue -from threading import Thread, Timer -import grpc +import time +from threading import Thread +from typing import Any, Callable, Generator, Optional, Tuple -from typing import Callable, Generator, Iterable, Optional, TypeVar, Sequence, Tuple +import remotivelabs.broker.sync as br def subscribe( - broker, + broker: Any, client_id: br.common_pb2.ClientId, network_stub: br.network_api_pb2_grpc.NetworkServiceStub, script: bytes, - on_subscribe: Callable[[Sequence[br.network_api_pb2.Signal]], None], + on_subscribe: Callable[[br.network_api_pb2.Signals], None], on_change: bool = False, -) -> grpc.RpcContext: - sync = queue.Queue() - thread = Thread( +) -> Tuple[Any, Thread]: + # pylint: disable=R0913 + sync: queue.Queue[Any] = queue.Queue() + thread: Thread = Thread( target=broker.act_on_scripted_signal, args=( client_id, @@ -26,7 +28,7 @@ def subscribe( script, on_change, # True: only report when signal changes on_subscribe, - lambda subscription: (sync.put(subscription)), + sync.put, ), ) thread.start() @@ -35,42 +37,35 @@ def subscribe( return subscription, thread -def subscribe_list( - signal_creator, signals: list[Tuple[str, str]] -) -> Generator[br.common_pb2.SignalId, None, None]: +def subscribe_list(signal_creator: Any, signals: list[Tuple[str, str]]) -> Generator[br.common_pb2.SignalId, None, None]: for namespace, signal in signals: yield signal_creator.signal(signal, namespace) def _get_value_str(signal: br.network_api_pb2.Signal) -> str: if signal.raw != b"": - return signal.raw - elif signal.HasField("integer"): - return signal.integer - elif signal.HasField("double"): - return signal.double - elif signal.HasField("arbitration"): - return signal.arbitration - else: - return "empty" + return str(signal.raw) + if signal.HasField("integer"): + return str(signal.integer) + if signal.HasField("double"): + return str(signal.double) + if signal.HasField("arbitration"): + return str(signal.arbitration) + return "empty" def printer(signals: br.network_api_pb2.Signals) -> None: for signal in signals: - print( - "{} {} {}".format( - signal.id.name, signal.id.namespace.name, _get_value_str(signal) - ) - ) + print(f"{signal.id.name} {signal.id.namespace.name} {_get_value_str(signal)}") -def read_script_file(file_path: str) -> str: +def read_script_file(file_path: str) -> bytes: try: with open(file_path, "rb") as file: return file.read() except FileNotFoundError: print("File not found. Please check your file path.") - return "" + return b"" def run( @@ -78,7 +73,6 @@ def run( script_path: str, x_api_key: str, access_token: Optional[str] = None, - ) -> None: # gRPC connection to RemotiveBroker intercept_channel = br.create_channel(url, x_api_key, access_token) @@ -93,7 +87,7 @@ def run( print("The script file is empty.") return - client_id_name = "MySubscriber_{}".format(math.floor(time.monotonic())) + client_id_name = f"MySubscriber_{math.floor(time.monotonic())}" client_id: br.common_pb2.ClientId = br.common_pb2.ClientId(id=client_id_name) print("Subscribing on signals...") @@ -107,12 +101,13 @@ def run( class ScriptPathArgument(argparse.Action): - def __call__(self, _parser, namespace, value, _option): + # pylint: disable=W0222 + def __call__(self, _parser: Any, namespace: Any, value: Any, _option: Any) -> None: # type: ignore print("Script path in use:", value) setattr(namespace, "script_path", value) -def main(): +def main() -> None: parser = argparse.ArgumentParser(description="Provide address to RemotiveBroker") parser.add_argument( @@ -153,9 +148,10 @@ def main(): try: args = parser.parse_args() - except Exception as e: - return print("Error specifying script to use:", e) - run(args.url, args.script_path, args.x_api_key,args.access_token) + except argparse.ArgumentError as e: + print("Error specifying script to use:", e) + return + run(args.url, args.script_path, args.x_api_key, args.access_token) if __name__ == "__main__": diff --git a/python/subscribe-to-scripted-signal/subscribe_standalone.py b/python/subscribe-to-scripted-signal/subscribe_standalone.py index 6165092..1300c5e 100644 --- a/python/subscribe-to-scripted-signal/subscribe_standalone.py +++ b/python/subscribe-to-scripted-signal/subscribe_standalone.py @@ -1,24 +1,27 @@ +from __future__ import annotations + import argparse import math -import time -import remotivelabs.broker.sync as br import queue -from threading import Thread, Timer -import grpc +import sys +import time +from threading import Thread +from typing import Any, Callable, Generator, Optional, Tuple -from typing import Callable, Generator, Iterable, Optional, TypeVar, Sequence, Tuple +import remotivelabs.broker.sync as br def subscribe( - broker, - client_id: br.common_pb2.ClientId, - network_stub: br.network_api_pb2_grpc.NetworkServiceStub, - script: bytes, - on_subscribe: Callable[[Sequence[br.network_api_pb2.Signal]], None], - on_change: bool = False, -) -> grpc.RpcContext: - sync = queue.Queue() - thread = Thread( + broker: Any, + client_id: br.common_pb2.ClientId, + network_stub: br.network_api_pb2_grpc.NetworkServiceStub, + script: bytes, + on_subscribe: Callable[[br.network_api_pb2.Signals], None], + on_change: bool = False, +) -> Tuple[Any, Thread]: + # pylint: disable=R0913 + sync: queue.Queue[Any] = queue.Queue() + thread: Thread = Thread( target=broker.act_on_scripted_signal, args=( client_id, @@ -26,7 +29,7 @@ def subscribe( script, on_change, # True: only report when signal changes on_subscribe, - lambda subscription: (sync.put(subscription)), + sync.put, ), ) thread.start() @@ -35,45 +38,38 @@ def subscribe( return subscription, thread -def subscribe_list( - signal_creator, signals: list[Tuple[str, str]] -) -> Generator[br.common_pb2.SignalId, None, None]: +def subscribe_list(signal_creator: Any, signals: list[Tuple[str, str]]) -> Generator[br.common_pb2.SignalId, None, None]: for namespace, signal in signals: yield signal_creator.signal(signal, namespace) def _get_value_str(signal: br.network_api_pb2.Signal) -> str: if signal.raw != b"": - return signal.raw - elif signal.HasField("integer"): - return signal.integer - elif signal.HasField("double"): - return signal.double - elif signal.HasField("arbitration"): - return signal.arbitration - else: - return "empty" + return str(signal.raw) + if signal.HasField("integer"): + return str(signal.integer) + if signal.HasField("double"): + return str(signal.double) + if signal.HasField("arbitration"): + return str(signal.arbitration) + return "empty" def printer(signals: br.network_api_pb2.Signals) -> None: for signal in signals: - print( - "{} {} {}".format( - signal.id.name, signal.id.namespace.name, _get_value_str(signal) - ) - ) + print(f"{signal.id.name} {signal.id.namespace.name} {_get_value_str(signal)}") -def read_script_file(file_path: str) -> str: +def read_script_file(file_path: str) -> bytes: try: with open(file_path, "rb") as file: return file.read() except FileNotFoundError: print("File not found. Please check your file path.") - return "" + return b"" -def create_playback_config(item): +def create_playback_config(item: dict[str, Any]) -> br.traffic_api_pb2.PlaybackInfo: """Creating configuration for playback Parameters @@ -87,22 +83,21 @@ def create_playback_config(item): Object instance of class """ - playbackConfig = br.traffic_api_pb2.PlaybackConfig( + playback_config = br.traffic_api_pb2.PlaybackConfig( fileDescription=br.system_api_pb2.FileDescription(path=item["path"]), namespace=br.common_pb2.NameSpace(name=item["namespace"]), ) return br.traffic_api_pb2.PlaybackInfo( - playbackConfig=playbackConfig, + playbackConfig=playback_config, playbackMode=br.traffic_api_pb2.PlaybackMode(mode=item["mode"]), ) def run( - url: str, - script_path: str, - x_api_key: str, - access_token: Optional[str] = None, - + url: str, + script_path: str, + x_api_key: str, + access_token: Optional[str] = None, ) -> None: # gRPC connection to RemotiveBroker intercept_channel = br.create_channel(url, x_api_key, access_token) @@ -133,11 +128,7 @@ def run( }, ] - status_record = traffic_stub.PlayTraffic( - br.traffic_api_pb2.PlaybackInfos( - playbackInfo=list(map(create_playback_config, record_list)) - ) - ) + status_record = traffic_stub.PlayTraffic(br.traffic_api_pb2.PlaybackInfos(playbackInfo=list(map(create_playback_config, record_list)))) print("Recording playback status: ", status_record) script = read_script_file(script_path) @@ -147,7 +138,7 @@ def run( print("The script file is empty.") return - client_id_name = "MySubscriber_{}".format(math.floor(time.monotonic())) + client_id_name = f"MySubscriber_{math.floor(time.monotonic())}" client_id: br.common_pb2.ClientId = br.common_pb2.ClientId(id=client_id_name) print("Subscribing on signals...") @@ -161,12 +152,13 @@ def run( class ScriptPathArgument(argparse.Action): - def __call__(self, _parser, namespace, value, _option): + # pylint: disable=W0222 + def __call__(self, _parser: Any, namespace: argparse.Namespace, value: Any, _option: Any) -> None: # type: ignore print("Script path in use:", value) setattr(namespace, "script_path", value) -def main(): +def main() -> None: parser = argparse.ArgumentParser(description="Provide address to RemotiveBroker") parser.add_argument( @@ -207,8 +199,9 @@ def main(): try: args = parser.parse_args() - except Exception as e: - return print("Error specifying script to use:", e) + except argparse.ArgumentError as e: + print("Error specifying script to use:", e) + sys.exit() run(args.url, args.script_path, args.x_api_key, args.access_token) diff --git a/python/subscribe/subscribe.py b/python/subscribe/subscribe.py index 23dfe43..25d8e49 100644 --- a/python/subscribe/subscribe.py +++ b/python/subscribe/subscribe.py @@ -1,21 +1,23 @@ from __future__ import annotations import argparse +import sys import time from typing import Optional + from remotivelabs.broker.sync import ( - Client, - SignalsInFrame, BrokerException, + Client, SignalIdentifier, + SignalsInFrame, ) -def run_subscribe_sample(url: str, signals: list[str], secret: Optional[str] = None): +def run_subscribe_sample(url: str, signals: list[str], secret: Optional[str] = None) -> None: client = Client(client_id="Sample client") client.connect(url=url, api_key=secret) - def on_signals(signals_in_frame: SignalsInFrame): + def on_signals(signals_in_frame: SignalsInFrame) -> None: for signal in signals_in_frame: print(signal.to_json()) @@ -23,11 +25,11 @@ def on_signals(signals_in_frame: SignalsInFrame): try: - def to_signal_id(signal: str): + def to_signal_id(signal: str) -> SignalIdentifier: s = signal.split(":") if len(s) != 2: print("--signals must be in format namespace:signal_name") - exit(1) + sys.exit(1) return SignalIdentifier(s[1], s[0]) subscription = client.subscribe( @@ -36,15 +38,13 @@ def to_signal_id(signal: str): ) except BrokerException as e: print(e) - exit(1) - except Exception as e: + sys.exit(1) + except Exception as e: # pylint: disable=W0718 print(e) - exit(1) + sys.exit(1) try: - print( - "Broker connection and subscription setup completed, waiting for signals..." - ) + print("Broker connection and subscription setup completed, waiting for signals...") while True: time.sleep(1) except KeyboardInterrupt: @@ -52,7 +52,7 @@ def to_signal_id(signal: str): print("Keyboard interrupt received, closing") -def main(): +def main() -> None: parser = argparse.ArgumentParser(description="Provide address to RemotiveBroker") parser.add_argument( @@ -82,18 +82,17 @@ def main(): default=None, ) - parser.add_argument( - "-s", "--signals", help="Signal to subscribe to", required=True, nargs="*" - ) + parser.add_argument("-s", "--signals", help="Signal to subscribe to", required=True, nargs="*") try: args = parser.parse_args() - except Exception as e: - return print("Error specifying signals to use:", e) + except argparse.ArgumentError as e: + print("Error specifying signals to use:", e) + sys.exit(1) if len(args.signals) == 0: print("You must subscribe to at least one signal with --signals namespace:somesignal") - exit(1) + sys.exit(1) secret = args.x_api_key if args.x_api_key is not None else args.access_token run_subscribe_sample(args.url, args.signals, secret)