-
Notifications
You must be signed in to change notification settings - Fork 0
/
ros2_range_2_points_xyz.py
99 lines (77 loc) · 3 KB
/
ros2_range_2_points_xyz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2024, Ouster, Inc.
# All rights reserved.
"""
Produce the xyz point cloud from the range_image topic.
Author: Ussama Naal
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Header, String
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, LivelinessPolicy
from sensor_msgs.msg import PointCloud2, PointField
import point_cloud2 as pc2
from ouster import client
class ImageSubscriberNode(Node):
def __init__(self):
super().__init__('image_subscriber')
self.br = CvBridge()
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE,
durability=DurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL,
liveliness=LivelinessPolicy.RMW_QOS_POLICY_LIVELINESS_AUTOMATIC,
depth=5
)
self.metadata_sub = self.create_subscription(
String,
'/ouster/metadata',
self.metadata_callback,
qos_profile=qos_profile)
def metadata_callback(self, metadata):
self.sensor_info = client.SensorInfo(s=metadata.data)
self.xyzlut = client.XYZLut(self.sensor_info)
self.create_image_subscriber()
def create_image_subscriber(self):
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
durability=DurabilityPolicy.RMW_QOS_POLICY_DURABILITY_VOLATILE,
liveliness=LivelinessPolicy.RMW_QOS_POLICY_LIVELINESS_AUTOMATIC,
depth=5
)
self.range_image_sub = self.create_subscription(
Image,
'/ouster/range_image',
self.image_callback,
qos_profile=qos_profile)
self.publisher = self.create_publisher(
PointCloud2,
'/ouster/points_xyz',
qos_profile=qos_profile)
def image_callback(self, msg):
cv_image = self.br.imgmsg_to_cv2(msg)
cv_image *= 4 # this is needed because we truncate the values
# xyzlut seems to destagger by default so we re-stagger
cv_image2 = client.destagger(self.sensor_info, cv_image, inverse=True)
xyz = self.xyzlut(cv_image2)
fields = [
PointField(name='x', offset=0,
datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4,
datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8,
datatype=PointField.FLOAT32, count=1)
]
header = Header(stamp=msg.header.stamp, frame_id="os_sensor")
pc_msg = pc2.create_cloud(header, fields, points=xyz.reshape((-1, 3)))
self.publisher.publish(pc_msg)
def main(args=None):
rclpy.init(args=args)
node = ImageSubscriberNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()