Skip to content

Commit

Permalink
add mavlink tunnel example
Browse files Browse the repository at this point in the history
  • Loading branch information
Huibean committed Apr 24, 2024
1 parent 4b6f3d3 commit 5e7bea0
Showing 1 changed file with 190 additions and 0 deletions.
190 changes: 190 additions & 0 deletions examples/mavlink_tunnel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
import dronecan, time, threading
from pymavlink.dialects.v20 import ardupilotmega as mavlink2

# get command line arguments
from argparse import ArgumentParser
parser = ArgumentParser(description='mavlink DroneCAN tunnel')
parser.add_argument("--node-id", default=100, type=int, help="CAN node ID")
parser.add_argument("--debug", action='store_true', help="enable debug")
parser.add_argument("--rate", type=float, default=10, help="broadcast rate Hz")
parser.add_argument("uri", default=None, type=str, help="CAN URI")
args = parser.parse_args()

#Create mavlink parser instance
mav = mavlink2.MAVLink(None)

#Vehicle State
last_mode = -1
is_armed = False

def handle_Targetted(msg):
'''handle Targetted message'''
global last_mode, is_armed
if args.debug:
# display the message on the console in human readable format
print(dronecan.to_yaml(msg))

if msg.message.target_node == args.node_id:
if msg.message.buffer:
for data in msg.message.buffer:
try:
mav_message = mav.parse_char(bytes([data]))
if mav_message is not None:
if args.debug:
print("Parsed Mavlink message: ", mav_message)
mav_msg_id = mav_message.get_msgId()
if mav_msg_id == mavlink2.MAVLINK_MSG_ID_HEARTBEAT:
if mav_message.type == mavlink2.MAV_TYPE_GCS:
return
mode = mav_message.custom_mode
is_armed = mav_message.base_mode & mavlink2.MAV_MODE_FLAG_SAFETY_ARMED != 0
if mode != last_mode:
if last_mode == -1:
# Request Mavlink messages
mav_pkt = get_MavlinkRequestDataStreamMsg(mavlink2.MAVLINK_MSG_ID_EKF_STATUS_REPORT, 500)
publish_MavlinkMsg(mav_pkt)
mav_pkt = get_MavlinkRequestDataStreamMsg(mavlink2.MAVLINK_MSG_ID_SYS_STATUS, 500)
publish_MavlinkMsg(mav_pkt)

last_mode = mode
elif mav_msg_id == mavlink2.MAVLINK_MSG_ID_EKF_STATUS_REPORT:
pass
elif mav_msg_id == mavlink2.MAVLINK_MSG_ID_SYS_STATUS:
pass

except mavlink2.MAVError as e:
print(f"Error parsing MAVLink message: {e}")
continue

def publish_NodeInfo():
msg = dronecan.uavcan.protocol.NodeStatus()
msg.uptime_sec = int(time.time())
msg.health = msg.HEALTH_OK
msg.mode = msg.MODE_OPERATIONAL
msg.submode = 0
msg.vendor_specific_status_code = 0
node.broadcast(msg)
if args.debug:
# display the message on the console in human readable format
print(dronecan.to_yaml(msg))

def publish_MavlinkMsg(mav_pkt):
'''send Tunnel message'''
buffer = list(mav_pkt)
chunks = [buffer[i:i + 120] for i in range(0, len(buffer), 120)]
for chunk in chunks:
msg = dronecan.uavcan.tunnel.Targetted()
msg.serial_id = 1
msg.target_node = 10
msg.protocol.protocol = 1 # MAVLink2
msg.baudrate = 115200
msg.options = msg.OPTION_LOCK_PORT
msg.buffer = chunk
node.broadcast(msg)
if args.debug:
# display the message on the console in human readable format
print(dronecan.to_yaml(msg))

def get_MavlinkRequestDataStreamMsg(message_id, interval_ms):
'''create a mavlink COMMAND_LONG message to set the interval between messages'''
target_system = 1 # The system ID of your drone
target_component = 0 # The component ID of your drone
command = mavlink2.MAV_CMD_SET_MESSAGE_INTERVAL # The command ID
confirmation = 0 # The confirmation
param1 = message_id # The ID of the message
param2 = interval_ms * 1000 # The new interval between messages, in microseconds
param3 = param4 = param5 = param6 = param7 = 0 # Not used
msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7)
mav_pkt = msg.pack(mav)
return mav_pkt

def get_MavlnikSetModeMsg(mode_id):
target_system = 1 # The system ID of your drone
target_component = 0 # The component ID of your drone
command = mavlink2.MAV_CMD_DO_SET_MODE # The command ID
confirmation = 0 # The confirmation
param1 = 1 # The new base mode (1 means custom mode)
param2 = mode_id # The new custom mode
param3 = param4 = param5 = param6 = param7 = 0 # Not used
msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7)
mav_pkt = msg.pack(mav)
return mav_pkt

def get_MavlinkSetArmMsg(arm):
'''create a mavlink COMMAND_LONG message to arm/disarm'''
target_system = 1 # The system ID of your drone
target_component = 0 # The component ID of your drone
command = mavlink2.MAV_CMD_COMPONENT_ARM_DISARM # The command ID
confirmation = 0 # The confirmation
param1 = 1 if arm else 0 # 1 to arm, 0 to disarm
param2 = param3 = param4 = param5 = param6 = param7 = 0 # Not used
msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7)
mav_pkt = msg.pack(mav)
return mav_pkt

def get_MavlinkTakeoffMsg(alt):
'''create a mavlink COMMAND_LONG message to take off'''
target_system = 1 # The system ID of your drone
target_component = 0 # The component ID of your drone
command = mavlink2.MAV_CMD_NAV_TAKEOFF # The command ID
confirmation = 0 # The confirmation
param1 = 0 # Not used
param2 = 0 # Not used
param3 = 0 # Not used
param4 = 0 # Not used
param5 = 0 # Not used
param6 = 0 # Not used
param7 = alt # The altitude to take off to
msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7)
mav_pkt = msg.pack(mav)
return mav_pkt

def vehicle_Task():
print("Vehicle Task started")

mav_pkt = get_MavlnikSetModeMsg(mavlink2.COPTER_MODE_GUIDED)
publish_MavlinkMsg(mav_pkt)
print("Changed mode to GUIDED")

print("Arming vehicle...")
mav_pkt = get_MavlinkSetArmMsg(True)
publish_MavlinkMsg(mav_pkt)

while not is_armed:
time.sleep(1)

print("Vehicle armed")

mav_pkt = get_MavlinkTakeoffMsg(10)
publish_MavlinkMsg(mav_pkt)
print("Takeoff to 10m")

time.sleep(5)
print("Set mode to RTL")
mav_pkt = get_MavlnikSetModeMsg(mavlink2.COPTER_MODE_RTL)
publish_MavlinkMsg(mav_pkt)

while is_armed:
time.sleep(1)
print("Vehicle disarmed, task finished")

# Initializing a DroneCAN node instance.
node = dronecan.make_node(args.uri, node_id=args.node_id, bitrate=1000000)

# Initializing a node monitor, so we can see what nodes are online
node_monitor = dronecan.app.node_monitor.NodeMonitor(node)

node.periodic(1.0/args.rate, publish_NodeInfo)

node.add_handler(dronecan.uavcan.tunnel.Targetted, handle_Targetted)

#Create Vehicle task thread
vehicle_task_thread = threading.Thread(target=vehicle_Task)
vehicle_task_thread.start()

# Running the node until the application is terminated or until first error.
try:
node.spin()
except KeyboardInterrupt:
pass

0 comments on commit 5e7bea0

Please sign in to comment.