Skip to content

Commit

Permalink
Port the g.Tec driver to PyUSB 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tdy committed Aug 8, 2016
1 parent b0f2818 commit 7a68066
Showing 1 changed file with 59 additions and 66 deletions.
125 changes: 59 additions & 66 deletions libmushu/driver/gtec.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.


# TODO: update to new version of pyusb

import struct
import time
from exceptions import Exception
import logging

import usb
import usb.core
import usb.util
from scipy.signal import iirfilter
import numpy as np

Expand All @@ -36,7 +35,7 @@
logger.info('Logger started')

ID_VENDOR_GTEC = 0x153c
# I saw an am with this vendorid too
# I saw an amp with this vendorid too
ID_VENDOR_GTEC2 = 0x15c3
ID_PRODUCT_GUSB_AMP = 0x0001

Expand All @@ -47,49 +46,50 @@ class GUSBamp(Amplifier):

def __init__(self):
logger.info('Initializing GUSBamp instance')
# list of available amps
self.amps = []
for bus in usb.busses():
for device in bus.devices:
if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and
device.idProduct == ID_PRODUCT_GUSB_AMP):
self.amps.append(device)
self.devh = None
# find the amplifier
self.dev = usb.core.find(custom_match=
lambda d:
d.idVendor in (ID_VENDOR_GTEC, ID_VENDOR_GTEC2) and
d.idProduct == ID_PRODUCT_GUSB_AMP)
assert(self.dev is not None)
self.mode = None
# Initialize the amplifier and make it ready.
device = self.amps[0]
self.devh = device.open()
# detach kernel driver if nessecairy
config = device.configurations[0]
self.devh.setConfiguration(config)
assert(len(config.interfaces) > 0)
# sometimes it is the other one
first_interface = config.interfaces[0][0]
if first_interface is None:
first_interface = config.interfaces[0][1]
first_setting = first_interface.alternateSetting
self.devh.claimInterface(first_interface)
self.devh.setAltInterface(first_interface)
# initialization straight from the usb-dump
# activate its first configuration
self.dev.set_configuration()
cfg = self.dev[0]
assert(cfg.bNumInterfaces > 0)
# access its interface
intf = cfg[(0,0)]
if intf is None:
intf = cfg[(0,1)]
# detach if necessary
intf_num = intf.bInterfaceNumber
if self.dev.is_kernel_driver_active(intf_num):
self.dev.detach_kernel_driver(intf_num)
usb.util.claim_interface(self.dev, intf_num)
try:
intf.set_altsetting()
except USBError:
pass
# initialize straight from the usb-dump
self.set_mode('data')
self.devh.controlMsg(CX_OUT, 0xb6, value=0x80, buffer=0)
self.devh.controlMsg(CX_OUT, 0xb5, value=0x80, buffer=0)
self.devh.controlMsg(CX_OUT, 0xb9, value=0x00, buffer="\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10")
self.dev.ctrl_transfer(CX_OUT, 0xb6, wValue=0x80, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xb5, wValue=0x80, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xb9, wValue=0x00, data_or_wLength="\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10")
self.set_slave_mode(False)
self.devh.controlMsg(CX_OUT, 0xd3, value=0x01, buffer=0)
self.devh.controlMsg(CX_OUT, 0xca, value=0x01, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc8, value=0x01, buffer="\x00"*16)
self.dev.ctrl_transfer(CX_OUT, 0xd3, wValue=0x01, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xca, wValue=0x01, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc8, wValue=0x01, data_or_wLength="\x00"*16)
self.set_common_reference()
self.set_common_ground()
self.set_calibration_mode('sine')
self.set_sampling_ferquency(128, [False for i in range(16)], None, None)
self.set_sampling_frequency(128, [False for i in range(16)], None, None)

def start(self):
self.devh.controlMsg(CX_OUT, 0xb5, value=0x08, buffer=0)
self.devh.controlMsg(CX_OUT, 0xf7, value=0x00, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xb5, wValue=0x08, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xf7, wValue=0x00, data_or_wLength=0)

def stop(self):
self.devh.controlMsg(CX_OUT, 0xb8, [])
self.dev.ctrl_transfer(CX_OUT, 0xb8, data_or_wLength=[])

def get_data(self):
"""Get data."""
Expand All @@ -101,7 +101,7 @@ def get_data(self):
size = 2028 #512
try:
# TODO what is the optimal timeout here?
data = self.devh.bulkRead(endpoint, size, 100)
data = self.dev.read(endpoint, size, timeout=100)
except usb.USBError:
data = []
data = ''.join(map(chr, data))
Expand All @@ -125,7 +125,7 @@ def get_channels(self):
def is_available():
for bus in usb.busses():
for device in bus.devices:
if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and
if (device.idVendor in (ID_VENDOR_GTEC, ID_VENDOR_GTEC2) and
device.idProduct == ID_PRODUCT_GUSB_AMP):
return True
return False
Expand All @@ -135,24 +135,23 @@ def is_available():
###########################################################################

def set_mode(self, mode):
"""Set mode, 'impedance', 'data'."""
"""Set mode, 'impedance', 'calibrate', 'data'."""
if mode == 'impedance':
self.devh.controlMsg(CX_OUT, 0xc9, value=0x00, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc2, value=0x03, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xc9, wValue=0x00, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x03, data_or_wLength=0)
self.mode = 'impedance'
elif mode == 'calibrate':
self.devh.controlMsg(CX_OUT, 0xc1, value=0x00, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc2, value=0x02, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xc1, wValue=0x00, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x02, data_or_wLength=0)
self.mode = 'calibration'
elif mode == 'data':
self.devh.controlMsg(CX_OUT, 0xc0, value=0x00, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc2, value=0x01, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xc0, wValue=0x00, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x01, data_or_wLength=0)
self.mode = 'data'
else:
raise AmpError('Unknown mode: %s' % mode)


def set_sampling_ferquency(self, fs, channels, bpfilter, notchfilter):
def set_sampling_frequency(self, fs, channels, bpfilter, notchfilter):
""" Set the sampling frequency and filters for individual channels.
Parameters:
Expand Down Expand Up @@ -198,40 +197,38 @@ def set_sampling_ferquency(self, fs, channels, bpfilter, notchfilter):

# set the filters for all channels
if bpfilter == notchfilter == None:
self.devh.controlMsg(CX_OUT, 0xc6, value=0x01, buffer=bp_filter)
self.devh.controlMsg(CX_OUT, 0xc7, value=0x01, buffer=bs_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc6, wValue=0x01, data_or_wLength=bp_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc7, wValue=0x01, data_or_wLength=bs_filter)
else:
idx = 1
for i in channels:
if i:
self.devh.controlMsg(CX_OUT, 0xc6, value=idx, buffer=bp_filter)
self.devh.controlMsg(CX_OUT, 0xc7, value=idx, buffer=bs_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc6, wValue=idx, data_or_wLength=bp_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc7, wValue=idx, data_or_wLength=bs_filter)
idx += 1

# set the sampling frequency
self.devh.controlMsg(CX_OUT, 0xb6, value=fs, buffer=0)

self.dev.ctrl_transfer(CX_OUT, 0xb6, wValue=fs, data_or_wLength=0)

def set_calibration_mode(self, mode):
# buffer: [0x03, 0xd0, 0x07, 0x02, 0x00, 0xff, 0x07]
# ==== ==========
# (1) mode:
# (2) amplitude: little endian (0x07d0 = 2000)
if mode == 'sine':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x03\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x03\xd0\x07\x02\x00\xff\x07")
elif mode == 'sawtooth':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x02\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x02\xd0\x07\x02\x00\xff\x07")
elif mode == 'whitenoise':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x05\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x05\xd0\x07\x02\x00\xff\x07")
elif mode == 'square':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x01\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x01\xd0\x07\x02\x00\xff\x07")
else:
raise AmpError('Unknown mode: %s' % mode)

def calculate_impedance(self, u_measured, u_applied=1e4):
return (u_measured * 1e6) / (u_applied - u_measured) - 1e4


def set_common_ground(self, a=False, b=False, c=False, d=False):
"""Set common ground for the electrodes.
Expand All @@ -241,8 +238,7 @@ def set_common_ground(self, a=False, b=False, c=False, d=False):
"""
v = (d << 3) + (c << 2) + (b << 1) + a
self.devh.controlMsg(CX_OUT, 0xbe, value=v, buffer=0)

self.dev.ctrl_transfer(CX_OUT, 0xbe, wValue=v, data_or_wLength=0)

def set_common_reference(self, a=False, b=False, c=False, d=False):
"""Set common reference for the electrodes.
Expand All @@ -253,8 +249,7 @@ def set_common_reference(self, a=False, b=False, c=False, d=False):
"""
v = (d << 3) + (c << 2) + (b << 1) + a
self.devh.controlMsg(CX_OUT, 0xbf, value=v, buffer=0)

self.dev.ctrl_transfer(CX_OUT, 0xbf, wValue=v, data_or_wLength=0)

def set_slave_mode(self, slave):
"""Set amp into slave or master mode.
Expand All @@ -264,15 +259,13 @@ def set_slave_mode(self, slave):
"""
v = 1 if slave else 0
self.devh.controlMsg(CX_OUT, 0xcd, value=v, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xcd, wValue=v, data_or_wLength=0)


class AmpError(Exception):
pass




def main():
amp = GUSBamp()
amp.start()
Expand Down

0 comments on commit 7a68066

Please sign in to comment.