Skip to content

Commit

Permalink
fix for dynamic entry point address
Browse files Browse the repository at this point in the history
  • Loading branch information
pablobuenaposada committed Nov 4, 2024
1 parent 3cb9c08 commit ab92d93
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 55 deletions.
5 changes: 3 additions & 2 deletions src/bench/kpro.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# PYTHONPATH=src python src/backend/bench/kpro.py
# PYTHONPATH=src poetry run python src/bench/kpro.py
from __future__ import print_function

from backend.devices.kpro.kpro import Kpro
from reprint import output

from devices.kpro.kpro import Kpro

kpro = Kpro()
with output(output_type="dict", initial_len=1, interval=0) as output_list:
while True:
Expand Down
5 changes: 3 additions & 2 deletions src/bench/s300.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# PYTHONPATH=src python src/backend/bench/s300.py
# PYTHONPATH=src poetry run python src/bench/s300.py
from __future__ import print_function

from backend.devices.s300.s300 import S300
from reprint import output

from devices.s300.s300 import S300

s300 = S300()
with output(output_type="dict", initial_len=1, interval=0) as output_list:
while True:
Expand Down
12 changes: 7 additions & 5 deletions src/devices/ecu_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ def establish_connection(device):
device.set_configuration()
cfg = device.get_active_configuration()
intf = cfg[(0, 0)]
entry_point = usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_OUT,
return (
usb.util.find_descriptor(
intf,
custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
== usb.util.ENDPOINT_OUT,
),
intf[0].bEndpointAddress,
)
return entry_point


def get_value_from_ecu(version, indexes, data, default=0):
Expand Down
35 changes: 21 additions & 14 deletions src/devices/kpro/kpro.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class Kpro:

def __init__(self):
self.data0 = self.data1 = self.data2 = self.data3 = self.data4 = self.data5 = []
self.kpro_device = self.version = self.entry_point = None
self.kpro_device = self.version = self.entry_point = (
self.entry_point_address
) = None
self.status = False

self.find_and_connect()
Expand All @@ -47,7 +49,9 @@ def find_and_connect(self):

if self.kpro_device: # if kpro device is found
try:
self.entry_point = establish_connection(self.kpro_device)
self.entry_point, self.entry_point_address = establish_connection(
self.kpro_device
)
except usb.core.USBError:
# if there's an error while connecting to the usb device we just want to try again
# so let's ensure that we keep in the while loop
Expand All @@ -58,41 +62,41 @@ def find_and_connect(self):
threading.Thread(target=self._update).start()

@staticmethod
def _read_from_device(version, device, entry_point):
def _read_from_device(version, device, entry_point, entry_point_address):
data0 = data1 = data2 = data3 = data5 = []

if version == constants.KPRO4_ID:
entry_point.write("\x40")
data4 = device.read(0x82, 1000) # kpro v4
data4 = device.read(entry_point_address, 1000) # kpro v4
else:
entry_point.write("\x40")
data4 = device.read(0x81, 1000) # kpro v2 & v3
data4 = device.read(entry_point_address, 1000) # kpro v2 & v3

entry_point.write("\x60")
if version == constants.KPRO23_ID:
data0 = device.read(0x81, 1000) # kpro v2 & v3
data0 = device.read(entry_point_address, 1000) # kpro v2 & v3
elif version == constants.KPRO4_ID:
data0 = device.read(0x82, 1000) # kpro v4
data0 = device.read(entry_point_address, 1000) # kpro v4

entry_point.write("\x61")
# found on kpro2 that sometimes len=44, normally 16
if version == constants.KPRO23_ID:
data1 = device.read(0x81, 1000) # kpro v2 & v3
data1 = device.read(entry_point_address, 1000) # kpro v2 & v3
elif version == constants.KPRO4_ID:
data1 = device.read(0x82, 1000) # kpro v4
data1 = device.read(entry_point_address, 1000) # kpro v4

entry_point.write("\x62")
if version == constants.KPRO23_ID:
data2 = device.read(0x81, 1000) # kpro v2 & v3
data2 = device.read(entry_point_address, 1000) # kpro v2 & v3
elif version == constants.KPRO4_ID:
data2 = device.read(0x82, 1000) # kpro v4
data2 = device.read(entry_point_address, 1000) # kpro v4

if version == constants.KPRO4_ID:
entry_point.write("\x65")
data3 = device.read(0x82, 128, 1000) # kpro v4
data3 = device.read(entry_point_address, 128, 1000) # kpro v4
else: # for v3 only, v2 will not return anything meaningful
entry_point.write("\xb0")
data5 = device.read(0x81, 1000)
data5 = device.read(entry_point_address, 1000)

return data0, data1, data2, data3, data4, data5

Expand All @@ -107,7 +111,10 @@ def _update(self):
self.data4,
self.data5,
) = self._read_from_device(
self.version, self.kpro_device, self.entry_point
self.version,
self.kpro_device,
self.entry_point,
self.entry_point_address,
)
except usb.core.USBError as e:
# error 60 (operation timed out), just continue to try again
Expand Down
19 changes: 12 additions & 7 deletions src/devices/s300/s300.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class S300:

def __init__(self):
self.data4 = self.data5 = self.data6 = []
self.device = self.version = self.entry_point = None
self.device = self.version = self.entry_point = self.entry_point_address = None
self.status = False

self.find_and_connect()
Expand All @@ -41,7 +41,9 @@ def find_and_connect(self):

if self.device: # if s300 device is found
try:
self.entry_point = establish_connection(self.device)
self.entry_point, self.entry_point_address = establish_connection(
self.device
)
except usb.core.USBError:
# if there's an error while connecting to the usb device we just want to try again
# so let's ensure that we keep in the while loop
Expand All @@ -52,23 +54,26 @@ def find_and_connect(self):
threading.Thread(target=self._update).start()

@staticmethod
def _read_from_device(version, device, entry_point):
def _read_from_device(version, device, entry_point, entry_point_address):
entry_point.write(b"\x90")
data6 = device.read(0x82, 1000, 1000)
data6 = device.read(entry_point_address, 1000, 1000)

entry_point.write(b"\xB0")
data5 = device.read(0x82, 128, 1000)
data5 = device.read(entry_point_address, 128, 1000)

entry_point.write(b"\x40")
data4 = device.read(0x82, 1000)
data4 = device.read(entry_point_address, 1000)

return data4, data5, data6

def _update(self):
while self.status:
try:
self.data4, self.data5, self.data6 = self._read_from_device(
self.version, self.device, self.entry_point
self.version,
self.device,
self.entry_point,
self.entry_point_address,
)

except usb.core.USBError as e:
Expand Down
35 changes: 20 additions & 15 deletions src/tests/devices/kpro/test_kpro.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from unittest import mock
from unittest.mock import MagicMock, call
from unittest.mock import ANY, MagicMock, call

import pytest
import usb
Expand Down Expand Up @@ -84,11 +84,11 @@ def test_init_no_kpro_connected(self):
(
constants.KPRO4_ID,
[
call.read(0x82, 1000),
call.read(0x82, 1000),
call.read(0x82, 1000),
call.read(0x82, 1000),
call.read(0x82, 128, 1000),
call.read(ANY, 1000),
call.read(ANY, 1000),
call.read(ANY, 1000),
call.read(ANY, 1000),
call.read(ANY, 128, 1000),
],
[
call.write("\x40"),
Expand All @@ -101,11 +101,11 @@ def test_init_no_kpro_connected(self):
(
constants.KPRO23_ID,
[
call.read(0x81, 1000),
call.read(0x81, 1000),
call.read(0x81, 1000),
call.read(0x81, 1000),
call.read(0x81, 1000),
call.read(ANY, 1000),
call.read(ANY, 1000),
call.read(ANY, 1000),
call.read(ANY, 1000),
call.read(ANY, 1000),
],
[
call.write("\x40"),
Expand All @@ -117,7 +117,7 @@ def test_init_no_kpro_connected(self):
),
(
None,
[call.read(0x81, 1000), call.read(0x81, 1000)],
[call.read(ANY, 1000), call.read(ANY, 1000)],
[
call.write("\x40"),
call.write("\x60"),
Expand All @@ -132,8 +132,11 @@ def test__read_from_device(self, version, read_calls, write_calls):
device = MagicMock()
device.read = MagicMock(return_value=[])
entry_point = MagicMock()
entry_point_address = MagicMock()

assert self.kpro._read_from_device(version, device, entry_point) == (
assert self.kpro._read_from_device(
version, device, entry_point, entry_point_address
) == (
[],
[],
[],
Expand Down Expand Up @@ -166,10 +169,12 @@ def test_find_and_connect_exception(self):
# "(usb.core.USBError("foo", errno=60), True, False)
),
)
def test__update_excption(self, error, status_result, init_called_result):
def test__update_exception(self, error, status_result, init_called_result):
"""in case any usb error while fetching the data"""
self.kpro.status = True
self.kpro.version = self.kpro.kpro_device = self.kpro.entry_point = None
self.kpro.version = self.kpro.kpro_device = self.kpro.entry_point = (
self.kpro.entry_point_address
) = None
with mock.patch(
"devices.kpro.kpro.Kpro._read_from_device"
) as m__read_from_device, mock.patch(
Expand Down
25 changes: 15 additions & 10 deletions src/tests/devices/s300/test_s300.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from unittest import mock
from unittest.mock import MagicMock, call
from unittest.mock import ANY, MagicMock, call

import pytest
import usb
Expand Down Expand Up @@ -75,9 +75,9 @@ def test_init_no_s300_connected(self):
(
constants.S3003_ID,
[
call.read(0x82, 1000, 1000),
call.read(0x82, 128, 1000),
call.read(0x82, 1000),
call.read(ANY, 1000, 1000),
call.read(ANY, 128, 1000),
call.read(ANY, 1000),
],
[
call.write(b"\x90"),
Expand All @@ -88,9 +88,9 @@ def test_init_no_s300_connected(self):
(
None,
[
call.read(0x82, 1000, 1000),
call.read(0x82, 128, 1000),
call.read(0x82, 1000),
call.read(ANY, 1000, 1000),
call.read(ANY, 128, 1000),
call.read(ANY, 1000),
],
[
call.write(b"\x90"),
Expand All @@ -104,8 +104,11 @@ def test__read_from_device(self, version, read_calls, write_calls):
device = MagicMock()
device.read = MagicMock(return_value=[])
entry_point = MagicMock()
entry_point_address = MagicMock()

assert self.s300._read_from_device(version, device, entry_point) == ([], [], [])
assert self.s300._read_from_device(
version, device, entry_point, entry_point_address
) == ([], [], [])
assert device.mock_calls == read_calls
assert entry_point.mock_calls == write_calls

Expand All @@ -131,10 +134,12 @@ def test_find_and_connect_exception(self):
# "(usb.core.USBError("foo", errno=60), True, False)
),
)
def test__update_excption(self, error, status_result, init_called_result):
def test__update_exception(self, error, status_result, init_called_result):
"""in case any usb error while fetching the data"""
self.s300.status = True
self.s300.version = self.s300.device = self.s300.entry_point = None
self.s300.version = self.s300.device = self.s300.entry_point = (
self.s300.entry_point_address
) = None
with mock.patch(
"devices.s300.s300.S300._read_from_device"
) as m__read_from_device, mock.patch(
Expand Down

0 comments on commit ab92d93

Please sign in to comment.