forked from minhnh/mas_perception_libs
-
Notifications
You must be signed in to change notification settings - Fork 8
/
cloud_processing_python_test
executable file
·79 lines (63 loc) · 3.61 KB
/
cloud_processing_python_test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python
import rospy
from tf import TransformListener
from dynamic_reconfigure.server import Server as ParamServer
from sensor_msgs.msg import PointCloud2
from visualization_msgs.msg import Marker
from mas_perception_libs.utils import PlaneSegmenter, transform_cloud_with_listener
from mas_perception_libs.cfg import PlaneFittingConfig
from mas_perception_libs.visualization import plane_msg_to_marker
class CloudProcessingTest(object):
_extract_planes = None # type: bool
_target_frame = None # type: str
_cloud_sub = None # type: rospy.Subscriber
_processed_cloud_pub = None # type: rospy.Publisher
_plane_marker_pub = None # type: rospy.Publisher
_plane_segmenter = None # type: PlaneSegmenter
_plane_fitting_param_server = None # type: ParamServer
_tf_listener = None # type: TransformListener
def __init__(self, cloud_topic, processed_cloud_topic, target_frame, extract_planes):
self._extract_planes = extract_planes
self._target_frame = target_frame
self._cloud_sub = rospy.Subscriber(cloud_topic, PointCloud2, self._cloud_cb)
self._processed_cloud_pub = rospy.Publisher(processed_cloud_topic, PointCloud2, queue_size=1)
self._plane_marker_pub = rospy.Publisher('plane_convex_hull', Marker, queue_size=1)
self._plane_segmenter = PlaneSegmenter()
self._plane_fitting_param_server = ParamServer(PlaneFittingConfig, self._plane_fitting_config_cb)
self._tf_listener = TransformListener()
def _cloud_cb(self, cloud_msg):
# don't process cloud if there's no subscriber
if self._processed_cloud_pub.get_num_connections() < 1:
return
transformed_cloud = transform_cloud_with_listener(cloud_msg, self._target_frame, self._tf_listener)
if self._extract_planes:
plane_list, processed_cloud = self._plane_segmenter.find_planes(transformed_cloud)
if len(plane_list.planes) != 0:
plane_point = plane_list.planes[0].plane_point
coefficients = plane_list.planes[0].coefficients
rospy.loginfo('plane point: x = %.3f, y = %.3f, z = %.3f; normal: x = %.3f, y = %.3f, z = %.3f'
% (plane_point.x, plane_point.y, plane_point.z,
coefficients[0], coefficients[1], coefficients[2]))
if self._plane_marker_pub.get_num_connections() > 0:
marker = plane_msg_to_marker(plane_list.planes[0], 'plane_convex')
self._plane_marker_pub.publish(marker)
else:
rospy.logerr('found no plane')
else:
processed_cloud = self._plane_segmenter.filter_cloud(transformed_cloud)
self._processed_cloud_pub.publish(processed_cloud)
def _plane_fitting_config_cb(self, config, _):
self._plane_segmenter.set_params(config)
return config
if __name__ == '__main__':
rospy.init_node('~cloud_processing_python_test')
# get cloud topic
param_cloud_topic = rospy.get_param('~cloud_topic', None)
if not param_cloud_topic:
raise ValueError('param "cloud_topic" not specified')
param_processed_cloud_topic = rospy.get_param('~processed_cloud_topic', 'processed_cloud')
param_target_frame = rospy.get_param('~target_frame', '/base_link')
# get option to extract plane or not
param_extract_planes = rospy.get_param('~extract_planes', False)
_ = CloudProcessingTest(param_cloud_topic, param_processed_cloud_topic, param_target_frame, param_extract_planes)
rospy.spin()