-
Notifications
You must be signed in to change notification settings - Fork 4
课程2.让底盘动起来!差速控制模型及串口驱动编写
本节将编写一个坦克底盘的驱动程序,采用串口通信方式,主要实现以下功能:
- 订阅ROS其它节点发布的速度指令,控制底盘运行;
- 读取底盘两个轮子的编码器数据,并解算成底盘的位置坐标。
首先,在tank_driver包的src目录下编写一个用Python写的ROS节点,命名为tank_driver.py,此节点以10Hz的频率,将ROS中nav_msgs包下的Odometry类型消息发布到名为/odom的主题上。
Odometry消息类型定义可查看ROS官方文档http://docs.ros.org/api/nav_msgs/html/msg/Odometry.html。
注意:需将tank_driver.py文件添加执行权限后才可运行:chmod +x tank_driver.py。
#!/usr/bin/python2
import rospy
from nav_msgs.msg import Odometry
if __name__=='__main__':
rospy.init_node('tank_driver001')
pub = rospy.Publisher('/odom', Odometry, queue_size=100)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
current_odom = Odometry()
pub.publish(current_odom)
rate.sleep()
此时,先启动roscore,然后打开新的终端,执行:
$ cd ~/catkin_ws/src/tank_driver/src/
$ ./tank_driver.py
另一种执行方式是用rosrun指令,此命令可在任意路径下执行,无需进入到tank_driver.py所在路径。
$ rosrun tank_driver tank_driver.py
运行此节点后,利用rostopic命令查看节点发布的主题:
$ rostopic list
$ rostopic echo /odom
此时,可看到终端不停地打印出/odom主题,由于未对其赋值,所以默认值均为0。
然后,在节点文件中加入一个订阅者,订阅名为/cmd_vel主题的消息,消息类型为geometry_msgs包下的Twist类型,为ROS中专门描述速度的消息。
Twist消息类型定义可查看ROS官方文档http://docs.ros.org/api/geometry_msgs/html/msg/Twist.html。
#!/usr/bin/python2
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
def vel_callback(msg):
print(msg.linear.x, msg.angular.z)
if __name__=='__main__':
rospy.init_node('tank_driver001')
pub = rospy.Publisher('/odom', Odometry, queue_size=100)
sub = rospy.Subscriber('/cmd_vel', Twist, vel_callback)
rate = rospy.Rate(10)
while not rospy.is_shutdown():
current_odom = Odometry()
pub.publish(current_odom)
rate.sleep()
然后,在src目录下新建名为keyboard_teleop.py的ROS节点(记得修改执行权限),作用是将键盘控制信号发布到/cmd_vel速度主题。 其源代码到此处下载。
在两个终端中分别启动tank_driver.py和keyboard_teleop.py,并通过键盘的i
,
j
l
键盘发布速度消息,观察tank_driver.py的输出。
$ rosrun tank_driver tank_driver.py
$ rosrun tank_driver keyboard_teleop.py
为了便于下一步开发,将所有功能封装成类比较好。
#!/usr/bin/python2
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
class TankDriver():
def __init__(self):
self.pub = rospy.Publisher('/odom', Odometry, queue_size=100)
self.sub = rospy.Subscriber('/cmd_vel', Twist, self.vel_callback)
def run(self):
rate = rospy.Rate(10)
while not rospy.is_shutdown():
current_odom = Odometry()
self.pub.publish(current_odom)
rate.sleep()
def vel_callback(self,msg):
print(msg.linear.x, msg.angular.z)
if __name__=='__main__':
rospy.init_node('tank_driver001')
tank_driver = TankDriver()
tank_driver.run()
将坦克底盘的串口转USB线连接到电脑的USB接口,此时会出现"/dev/ttyUSB0"的串口设备。 在节点中,引入python的serial串口驱动包,打开"/dev/ttyUSB0"串口,波特率为115200。
在本节示例中,只用到了设置速度和读编码器的功能,其接口为:
功能 | 命令 | 说明 |
---|---|---|
设置速度 | "!M 100 110\r" | 设置2电机速度分别为100,110 |
读编码器 | "?C\r" | 查询两2电机的编码器数值 返回值为?C\rC=104692:29694\r,即2电机编码器值 |
再此,先发送一个速度指令进行测试,此时电机应该转动。
#!/usr/bin/python2
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
import serial
class TankDriver():
def __init__(self):
self.pub = rospy.Publisher('/odom', Odometry, queue_size=100)
self.sub = rospy.Subscriber('/cmd_vel', Twist, self.vel_callback)
self.ser = serial.Serial('/dev/ttyUSB0', 115200)
def send(self, cmd):
print('send cmd:', cmd)
self.ser.write(cmd)
def set_speed(self, v1, v2):
cmd = '!M {} {}\r'.format(v1, v2)
self.send(cmd)
def run(self):
rate = rospy.Rate(10)
while not rospy.is_shutdown():
current_odom = Odometry()
self.pub.publish(current_odom)
rate.sleep()
def vel_callback(self,msg):
print(msg.linear.x, msg.angular.z)
if __name__=='__main__':
rospy.init_node('tank_driver001')
tank_driver = TankDriver()
tank_driver.set_speed(100,110)
tank_driver.run()
在订阅/cmd_vel的回调函数中,根据接收到的速度数据控制底盘运动。
其中,msg.linear.x表示前进或后退速度,单位为m/s。msg.angular.z表示旋转速度,单位为rad/s。
#!/usr/bin/python2
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
import serial
import time
from threading import Lock
lock = Lock()
class TankDriver():
def __init__(self):
self.pub = rospy.Publisher('/odom', Odometry, queue_size=100)
self.sub = rospy.Subscriber('/cmd_vel', Twist, self.vel_callback)
self.ser = serial.Serial('/dev/ttyUSB0', 115200)
def send(self, cmd):
print('send cmd:', cmd)
self.ser.write(cmd)
def read_buffer(self):
time.sleep(0.01)
res = ''
while self.ser.inWaiting() > 0:
res += self.ser.read(1)
res = bytearray(res)
print('got res:', res)
return res
def get_encoder(self):
cmd = '?C\r'
self.send(cmd)
def set_speed(self, v1, v2):
cmd = '!M {} {}\r'.format(v1, v2)
self.send(cmd)
def run(self):
rate = rospy.Rate(10)
while not rospy.is_shutdown():
lock.acquire()
self.get_encoder()
self.read_buffer()
lock.release()
current_odom = Odometry()
self.pub.publish(current_odom)
rate.sleep()
def vel_callback(self,msg):
print(msg.linear.x, msg.angular.z)
lock.acquire()
v1 = 200*msg.linear.x
v1 += 100*msg.angular.z
v2 = 200*msg.linear.x
v2 -= 100*msg.angular.z
self.set_speed(v1, -v2)
self.read_buffer()
lock.release()
if __name__=='__main__':
rospy.init_node('tank_driver001')
tank_driver = TankDriver()
tank_driver.run()
在两个终端中分别启动tank_driver.py和keyboard_teleop.py,并通过键盘的i
,
j
l
键盘发布速度消息,观察底盘运动情况。
$ rosrun tank_driver tank_driver.py
$ rosrun tank_driver keyboard_teleop.py
差速模型示意图:
根据此模型,实现update_odom方法,可根据获取到的编码器值,计算底盘实时里程,并发布到/odom主题。
#!/usr/bin/python2
# coding: utf-8
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
import tf
import serial
import time
import math
from threading import Lock
lock = Lock()
class TankDriver():
def __init__(self, serialport, baudrate):
self.pub = rospy.Publisher('/odom', Odometry, queue_size=100)
self.sub = rospy.Subscriber('/cmd_vel', Twist, self.vel_callback)
self.ser = serial.Serial(serialport, baudrate)
self.wheel_diameter = 0.1
self.base_width = 0.5
self.encoder_ticks_per_rev = 4400
self.encoder1 = 0
self.encoder2 = 0
self.encoder1_prev = 0
self.encoder2_prev = 0
self.x = 0
self.y = 0
self.theta = 0
self.odom = Odometry()
self.odom.header.frame_id = 'odom'
self.odom.child_frame_id = 'base_link'
self.time_prev = rospy.Time.now()
def send(self, cmd):
print('send cmd:', cmd)
self.ser.write(cmd)
def read_buffer(self):
time.sleep(0.05)
res = ''
while self.ser.inWaiting() > 0:
res += self.ser.read(1)
res = bytearray(res)
print('got res:', res)
if res[0:2]=='?C' and res[-1]==13:
self.encoder1 = int(res.split(':')[0].split('=')[1])
self.encoder2 = int(res.split(':')[1][:-1])
print('encoder', self.encoder1, self.encoder2)
return res
def get_encoder(self):
cmd = '?C\r'
self.send(cmd)
def set_speed(self, v1, v2):
cmd = '!M {} {}\r'.format(v1, v2)
self.send(cmd)
def update_odom(self):
encoder1 = self.encoder1
encoder2 = self.encoder2
time_current = rospy.Time.now()
time_elapsed = (time_current - self.time_prev).to_sec()
self.time_prev = time_current
dleft = math.pi * self.wheel_diameter * \
(encoder1 - self.encoder1_prev) / self.encoder_ticks_per_rev
dright = math.pi * self.wheel_diameter * \
(encoder2 - self.encoder2_prev) / self.encoder_ticks_per_rev
self.encoder1_prev = encoder1
self.encoder2_prev = encoder2
d = (dleft + dright) / 2
dtheta = (dright - dleft) / self.base_width
if d != 0:
dx = math.cos(dtheta) * d
dy = -math.sin(dtheta) * d
self.x += dx*math.cos(self.theta)-dy*math.sin(self.theta)
self.y += dx*math.sin(self.theta)+dy*math.cos(self.theta)
self.theta += dtheta
self.odom.header.stamp = time_current
self.odom.pose.pose.position.x = self.x
self.odom.pose.pose.position.y = self.y
q = tf.transformations.quaternion_from_euler(0,0,self.theta)
self.odom.pose.pose.orientation.x = q[0]
self.odom.pose.pose.orientation.y = q[1]
self.odom.pose.pose.orientation.z = q[2]
self.odom.pose.pose.orientation.w = q[3]
self.odom.twist.twist.linear.x = d / time_elapsed
self.odom.twist.twist.angular.z = dtheta / time_elapsed
def run(self):
rate = rospy.Rate(10)
while not rospy.is_shutdown():
# 读编码器值
lock.acquire()
self.get_encoder()
self.read_buffer()
lock.release()
# 更新计算并更新里程信息
self.update_odom()
# 发布里程信息
self.pub.publish(self.odom)
rate.sleep()
def vel_callback(self,msg):
lock.acquire()
v1 = 200*msg.linear.x
v1 += 100*msg.angular.z
v2 = 200*msg.linear.x
v2 -= 100*msg.angular.z
self.set_speed(v1, -v2)
self.read_buffer()
lock.release()
if __name__=='__main__':
rospy.init_node('tank_driver001')
serialport = rospy.get_param('~serialport', default='/dev/ttyUSB0')
baudrate = rospy.get_param('~baudrate', default=115200)
tank_driver = TankDriver(serialport, baudrate)
tank_driver.run()
在两个终端中分别启动tank_driver.py和keyboard_teleop.py,并通过键盘的i
,
j
l
键盘发布速度消息。
并查看/odom消息内容更新情况。
$ rosrun tank_driver tank_driver.py
$ rosrun tank_driver keyboard_teleop.py
$ rostopic echo /odom
在tanker_driver包下新建launch文件夹,并在launch文件夹下新建tank_control.launch文件。
这样,可以将串口名和波特率传入节点。
<?xml version="1.0"?>
<launch>
<node name="tank_driver" pkg="tank_driver" type="tank_driver.py">
<param name="serialport" value="/dev/ttyUSB0" />
<param name="baudrate" value="115200" />
</node>
</launch>
考虑到tank_control.launch文件可能会被其它launch文件调用,所以最好采用arg标签指定。
<?xml version="1.0"?>
<launch>
<arg name="serialport" default="/dev/ttyUSB0" />
<arg name="baudrate" default="115200" />
<node name="tank_driver" pkg="tank_driver" type="tank_driver.py">
<param name="serialport" value="$(arg serialport)" />
<param name="baudrate" value="$(arg baudrate)" />
</node>
</launch>
让创造机器人更便捷!