Mobile wallpaper 1Mobile wallpaper 2Mobile wallpaper 3Mobile wallpaper 4Mobile wallpaper 5Mobile wallpaper 6
1935 字
10 分钟
Ros学习
2025-12-01

Ros#

一、环境安装#

1. ubuntu#

省略了

2. 一键安装 Ros(鱼香Ros)#
wget http://fishros.com/install -O fishros && . fishros
3. 妙妙小工具#
// OpenCV
sudo apt install python3-opencv
// CvBridge —— ROS 与 OpenCV 图像转换(xxx 为 Ros 的版本)
sudo apt install ros-xxx-cv-bridge
// usb 相机标准驱动
sudo apt install ros-xxx-usb-cam
ros2 run usb_cam usb_cam_node_exe

二、Ros 基础功能#

1. 工作空间#

安装 colcon 编译工具

// pip
sudo apt install python3-pip
// colcon
sudo apt install python3-colcon-ros

自动安装所需依赖项

sudo rosdepc init & rosdepc update
rosdepc install -i --from-path src --rosdistro xxx -y

① 创建 xxx_ws/src 文件夹

② 在 xxx_ws 下,colcon build 编译代码

  • build:编译中间文件
  • install:可执行文件
  • log:日志

③ 环境变量

source ~/.../xxx_ws/install/local_setup.sh 添加到 .bashrc 文件中

2. 功能包#

将不同功能代码划分,降低耦合性

① 在 xxx_ws/src 下,创建功能包

ros2 pkg create --build-type ament_cmake xxx_pkg_c
ros2 pkg create --build-type ament_python xxx_pkg_p
3. 节点#

工作细胞,独立运行的可执行文件

① 配置节点(Python)

setup.py 中

'console_scripts': [
'node_xxx = xxx_node.node_xxx:main',
......
],

② 编写节点代码

class HelloWorldNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2 节点父类初始化
while rclpy.ok(): # ROS2 系统是否正常运行
self.get_logger().info("Hello World")
time.sleep(0.5)
def main(args=None):
rclpy.init(args=args) # ROS2 Python 接口初始化
node = HelloWorldNode("node_xxx") # 创建 ROS2 节点对象并进行初始化
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭 ROS2 Python 接口

③ 指令

// 运行节点
ros2 run xxx_node node_xxx
// 查看运行节点列表
ros2 node list
// 查看节点详细信息
ros2 node info /node_xxx
4. 话题#

节点之间传递数据

  • 发布 - 订阅模型(Topic)
  • 异步
  • .msg 文件定义 通信消息 数据结构

① 发布者(Publisher)

class PublisherNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.pub = self.create_publisher(String, "chatter", 10) # 创建发布者对象(消息类型、话题名、队列长度)
self.timer = self.create_timer(0.5, self.timer_callback)
def timer_callback(self):
msg = String() # 创建一个String类型的消息对象
msg.data = 'Hello World' # 填充消息对象中的消息数据
self.pub.publish(msg) # 发布话题消息
self.get_logger().info('Publishing: "%s"' % msg.data) # 输出日志信息,提示已经完成话题发布
def main(args=None):
rclpy.init(args=args) # ROS2 Python接口初始化
node = PublisherNode("topic_helloworld_pub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口

② 订阅者(Subscriber)

class SubscriberNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.sub = self.create_subscription(\
String, "chatter", self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
def listener_callback(self, msg): # 创建回调函数,执行收到话题消息后对数据的处理
self.get_logger().info('I heard: "%s"' % msg.data) # 输出日志信息,提示订阅收到的话题消息
def main(args=None):
rclpy.init(args=args) # ROS2 Python接口初始化
node = SubscriberNode("topic_helloworld_sub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
5. 服务#

节点之间传递数据

  • C/S 模型
  • 同步
  • .srv 文件定义 请求应答 数据结构

① 服务端(Service)

class adderServer(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.adder_callback) # 创建服务器对象(接口类型、服务名、服务器回调函数)
def adder_callback(self, request, response): # 创建回调函数,执行收到请求后对数据的处理
response.sum = request.a + request.b # 完成加法求和计算,将结果放到反馈的数据中
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b)) # 输出日志信息,提示已经完成加法求和计算
return response # 反馈应答信息
def main(args=None):
rclpy.init(args=args) # ROS2 Python接口初始化
node = adderServer("service_adder_server") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口

② 客户端(Client)

class adderClient(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.client = self.create_client(AddTwoInts, 'add_two_ints') # 创建服务客户端对象(服务接口类型,服务名)
while not self.client.wait_for_service(timeout_sec=1.0): # 循环等待服务器端成功启动
self.get_logger().info('service not available, waiting again...')
self.request = AddTwoInts.Request() # 创建服务请求的数据对象
def send_request(self): # 创建一个发送服务请求的函数
self.request.a = int(sys.argv[1])
self.request.b = int(sys.argv[2])
self.future = self.client.call_async(self.request) # 异步方式发送服务请求
def main(args=None):
rclpy.init(args=args) # ROS2 Python接口初始化
node = adderClient("service_adder_client") # 创建ROS2节点对象并进行初始化
node.send_request() # 发送服务请求
while rclpy.ok(): # ROS2系统正常运行
rclpy.spin_once(node) # 循环执行一次节点
if node.future.done(): # 数据是否处理完成
try:
response = node.future.result() # 接收服务器端的反馈数据
except Exception as e:
node.get_logger().info(
'Service call failed %r' % (e,))
else:
node.get_logger().info( # 将收到的反馈信息打印输出
'Result of add_two_ints: for %d + %d = %d' %
(node.request.a, node.request.b, response.sum))
break
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
6. 动作#

节点之间传递数据,基于 话题 + 服务 实现

  • C/S 模型
  • 同步
  • .action 文件定义 目标结果反馈 数据结构

① 服务端

class MoveCircleActionServer(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self._action_server = ActionServer( # 创建动作服务器(接口类型、动作名、回调函数)
self,
MoveCircle,
'move_circle',
self.execute_callback)
def execute_callback(self, goal_handle): # 执行收到动作目标之后的处理函数
self.get_logger().info('Moving circle...')
feedback_msg = MoveCircle.Feedback() # 创建一个动作反馈信息的消息
for i in range(0, 360, 30): # 从0到360度,执行圆周运动,并周期反馈信息
feedback_msg.state = i # 创建反馈信息,表示当前执行到的角度
self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)
goal_handle.publish_feedback(feedback_msg) # 发布反馈信息
time.sleep(0.5)
goal_handle.succeed() # 动作执行成功
result = MoveCircle.Result() # 创建结果消息
result.finish = True
return result # 反馈最终动作执行的结果
def main(args=None):
rclpy.init(args=args) # ROS2 Python接口初始化
node = MoveCircleActionServer("action_move_server") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口

② 客户端

class MoveCircleActionClient(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self._action_client = ActionClient( # 创建动作客户端(接口类型、动作名)
self, MoveCircle, 'move_circle')
def send_goal(self, enable): # 创建一个发送动作目标的函数
goal_msg = MoveCircle.Goal() # 创建一个动作目标的消息
goal_msg.enable = enable # 设置动作目标为使能,希望机器人开始运动
self._action_client.wait_for_server() # 等待动作的服务器端启动
self._send_goal_future = self._action_client.send_goal_async( # 异步方式发送动作的目标
goal_msg, # 动作目标
feedback_callback=self.feedback_callback) # 处理周期反馈消息的回调函数
self._send_goal_future.add_done_callback(self.goal_response_callback) # 设置一个服务器收到目标之后反馈时的回调函数
def goal_response_callback(self, future): # 创建一个服务器收到目标之后反馈时的回调函数
goal_handle = future.result() # 接收动作的结果
if not goal_handle.accepted: # 如果动作被拒绝执行
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)') # 动作被顺利执行
self._get_result_future = goal_handle.get_result_async() # 异步获取动作最终执行的结果反馈
self._get_result_future.add_done_callback(self.get_result_callback) # 设置一个收到最终结果的回调函数
def get_result_callback(self, future): # 创建一个收到最终结果的回调函数
result = future.result().result # 读取动作执行的结果
self.get_logger().info('Result: {%d}' % result.finish) # 日志输出执行结果
def feedback_callback(self, feedback_msg): # 创建处理周期反馈消息的回调函数
feedback = feedback_msg.feedback # 读取反馈的数据
self.get_logger().info('Received feedback: {%d}' % feedback.state)
def main(args=None):
rclpy.init(args=args) # ROS2 Python接口初始化
node = MoveCircleActionClient("action_move_client") # 创建ROS2节点对象并进行初始化
node.send_goal(True) # 发送动作目标
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
7. 通信接口#

通信方式的数据结构

① 话题(.msg)

# 通信数据
int32 x
int32 y

② 服务(.srv)

# 请求数据
int64 x
int64 y
---
# 应答数据
int64 s

③ 动作(.action)

# 目标
bool enable
---
# 结果
bool finish
---
# 反馈
int32 state
8. 参数#

机器人的全局字典,以 key-value 的方式存储

三、Ros 工具#

1.#
Ros学习
https://www.zangmiyu.com/posts/ros/
作者
zangmiyu
发布于
2025-12-01
许可协议
CC 脏谜语

部分信息可能已经过时