-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_cf_uwb.py
executable file
·67 lines (52 loc) · 2.1 KB
/
read_cf_uwb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python3
# coding:utf-8
# Peixuan Shu
# First created on 5 Jan. 2023
# Read USB connected crazyflie UWB pose by cflib and publish it in a ROS topic
# Note: cflib is in python3:
# pip3 install cflib # should > 0.1.20
# Get UGV pos from the attached crazyflie uwb:
import logging
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
from cflib.utils import uri_helper
import rospy
import geometry_msgs.msg
# URI to the Crazyflie to connect to
uri = uri_helper.uri_from_env(default='usb://0')
# Only output errors from the logging framework
logging.basicConfig(level=logging.ERROR)
def log_stab_callback(timestamp, data, logconf):
# print('[%d][%s]: %s' % (timestamp, logconf.name, data))
msg = geometry_msgs.msg.PoseStamped()
msg.pose.position.x = data['stateEstimateZ.x'] # mm
msg.pose.position.y = data['stateEstimateZ.y'] # mm
msg.pose.position.z = data['stateEstimateZ.z'] # mm
PosePublisher.publish(msg)
def simple_log_async(scf, logconf):
cf = scf.cf
cf.log.add_config(logconf)
logconf.data_received_cb.add_callback(log_stab_callback)
logconf.start()
# time.sleep(5)
# logconf.stop()
rospy.spin() # block for other threads active
if __name__ == '__main__':
# Initialize the low-level drivers
cflib.crtp.init_drivers()
lg_stab = LogConfig(name='stateEstimateZ', period_in_ms=20)
lg_stab.add_variable('stateEstimateZ.x', 'int16_t') # mm
lg_stab.add_variable('stateEstimateZ.y', 'int16_t') # mm
lg_stab.add_variable('stateEstimateZ.z', 'int16_t') # mm
# group = 'stateEstimateZ'
# name = 'estimator'
rospy.init_node("read_cf_uwb", anonymous=False)
topic_name = "/car20/pos"
PosePublisher = rospy.Publisher(topic_name, geometry_msgs.msg.PoseStamped, queue_size=1)
with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
print("Yeah, I'm connected!")
print("check {0} streaming!".format(topic_name))
simple_log_async(scf, lg_stab)