diff --git a/examples/mavlink_tunnel.py b/examples/mavlink_tunnel.py new file mode 100755 index 0000000..45c3aa9 --- /dev/null +++ b/examples/mavlink_tunnel.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +import dronecan, time, threading +from pymavlink.dialects.v20 import ardupilotmega as mavlink2 + +# get command line arguments +from argparse import ArgumentParser +parser = ArgumentParser(description='mavlink DroneCAN tunnel') +parser.add_argument("--node-id", default=100, type=int, help="CAN node ID") +parser.add_argument("--debug", action='store_true', help="enable debug") +parser.add_argument("--rate", type=float, default=10, help="broadcast rate Hz") +parser.add_argument("uri", default=None, type=str, help="CAN URI") +args = parser.parse_args() + +#Create mavlink parser instance +mav = mavlink2.MAVLink(None) + +#Vehicle State +last_mode = -1 +is_armed = False + +def handle_Targetted(msg): + '''handle Targetted message''' + global last_mode, is_armed + if args.debug: + # display the message on the console in human readable format + print(dronecan.to_yaml(msg)) + + if msg.message.target_node == args.node_id: + if msg.message.buffer: + for data in msg.message.buffer: + try: + mav_message = mav.parse_char(bytes([data])) + if mav_message is not None: + if args.debug: + print("Parsed Mavlink message: ", mav_message) + mav_msg_id = mav_message.get_msgId() + if mav_msg_id == mavlink2.MAVLINK_MSG_ID_HEARTBEAT: + if mav_message.type == mavlink2.MAV_TYPE_GCS: + return + mode = mav_message.custom_mode + is_armed = mav_message.base_mode & mavlink2.MAV_MODE_FLAG_SAFETY_ARMED != 0 + if mode != last_mode: + if last_mode == -1: + # Request Mavlink messages + mav_pkt = get_MavlinkRequestDataStreamMsg(mavlink2.MAVLINK_MSG_ID_EKF_STATUS_REPORT, 500) + publish_MavlinkMsg(mav_pkt) + mav_pkt = get_MavlinkRequestDataStreamMsg(mavlink2.MAVLINK_MSG_ID_SYS_STATUS, 500) + publish_MavlinkMsg(mav_pkt) + + last_mode = mode + elif mav_msg_id == mavlink2.MAVLINK_MSG_ID_EKF_STATUS_REPORT: + pass + elif mav_msg_id == mavlink2.MAVLINK_MSG_ID_SYS_STATUS: + pass + + except mavlink2.MAVError as e: + print(f"Error parsing MAVLink message: {e}") + continue + +def publish_NodeInfo(): + msg = dronecan.uavcan.protocol.NodeStatus() + msg.uptime_sec = int(time.time()) + msg.health = msg.HEALTH_OK + msg.mode = msg.MODE_OPERATIONAL + msg.submode = 0 + msg.vendor_specific_status_code = 0 + node.broadcast(msg) + if args.debug: + # display the message on the console in human readable format + print(dronecan.to_yaml(msg)) + +def publish_MavlinkMsg(mav_pkt): + '''send Tunnel message''' + buffer = list(mav_pkt) + chunks = [buffer[i:i + 120] for i in range(0, len(buffer), 120)] + for chunk in chunks: + msg = dronecan.uavcan.tunnel.Targetted() + msg.serial_id = 1 + msg.target_node = 10 + msg.protocol.protocol = 1 # MAVLink2 + msg.baudrate = 115200 + msg.options = msg.OPTION_LOCK_PORT + msg.buffer = chunk + node.broadcast(msg) + if args.debug: + # display the message on the console in human readable format + print(dronecan.to_yaml(msg)) + +def get_MavlinkRequestDataStreamMsg(message_id, interval_ms): + '''create a mavlink COMMAND_LONG message to set the interval between messages''' + target_system = 1 # The system ID of your drone + target_component = 0 # The component ID of your drone + command = mavlink2.MAV_CMD_SET_MESSAGE_INTERVAL # The command ID + confirmation = 0 # The confirmation + param1 = message_id # The ID of the message + param2 = interval_ms * 1000 # The new interval between messages, in microseconds + param3 = param4 = param5 = param6 = param7 = 0 # Not used + msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7) + mav_pkt = msg.pack(mav) + return mav_pkt + +def get_MavlnikSetModeMsg(mode_id): + target_system = 1 # The system ID of your drone + target_component = 0 # The component ID of your drone + command = mavlink2.MAV_CMD_DO_SET_MODE # The command ID + confirmation = 0 # The confirmation + param1 = 1 # The new base mode (1 means custom mode) + param2 = mode_id # The new custom mode + param3 = param4 = param5 = param6 = param7 = 0 # Not used + msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7) + mav_pkt = msg.pack(mav) + return mav_pkt + +def get_MavlinkSetArmMsg(arm): + '''create a mavlink COMMAND_LONG message to arm/disarm''' + target_system = 1 # The system ID of your drone + target_component = 0 # The component ID of your drone + command = mavlink2.MAV_CMD_COMPONENT_ARM_DISARM # The command ID + confirmation = 0 # The confirmation + param1 = 1 if arm else 0 # 1 to arm, 0 to disarm + param2 = param3 = param4 = param5 = param6 = param7 = 0 # Not used + msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7) + mav_pkt = msg.pack(mav) + return mav_pkt + +def get_MavlinkTakeoffMsg(alt): + '''create a mavlink COMMAND_LONG message to take off''' + target_system = 1 # The system ID of your drone + target_component = 0 # The component ID of your drone + command = mavlink2.MAV_CMD_NAV_TAKEOFF # The command ID + confirmation = 0 # The confirmation + param1 = 0 # Not used + param2 = 0 # Not used + param3 = 0 # Not used + param4 = 0 # Not used + param5 = 0 # Not used + param6 = 0 # Not used + param7 = alt # The altitude to take off to + msg = mavlink2.MAVLink_command_long_message(target_system, target_component, command, confirmation, param1, param2, param3, param4, param5, param6, param7) + mav_pkt = msg.pack(mav) + return mav_pkt + +def vehicle_Task(): + print("Vehicle Task started") + + mav_pkt = get_MavlnikSetModeMsg(mavlink2.COPTER_MODE_GUIDED) + publish_MavlinkMsg(mav_pkt) + print("Changed mode to GUIDED") + + print("Arming vehicle...") + mav_pkt = get_MavlinkSetArmMsg(True) + publish_MavlinkMsg(mav_pkt) + + while not is_armed: + time.sleep(1) + + print("Vehicle armed") + + mav_pkt = get_MavlinkTakeoffMsg(10) + publish_MavlinkMsg(mav_pkt) + print("Takeoff to 10m") + + time.sleep(5) + print("Set mode to RTL") + mav_pkt = get_MavlnikSetModeMsg(mavlink2.COPTER_MODE_RTL) + publish_MavlinkMsg(mav_pkt) + + while is_armed: + time.sleep(1) + print("Vehicle disarmed, task finished") + +# Initializing a DroneCAN node instance. +node = dronecan.make_node(args.uri, node_id=args.node_id, bitrate=1000000) + +# Initializing a node monitor, so we can see what nodes are online +node_monitor = dronecan.app.node_monitor.NodeMonitor(node) + +node.periodic(1.0/args.rate, publish_NodeInfo) + +node.add_handler(dronecan.uavcan.tunnel.Targetted, handle_Targetted) + +#Create Vehicle task thread +vehicle_task_thread = threading.Thread(target=vehicle_Task) +vehicle_task_thread.start() + +# Running the node until the application is terminated or until first error. +try: + node.spin() +except KeyboardInterrupt: + pass \ No newline at end of file