symaware.extra.ros.utils package

Submodules

symaware.extra.ros.utils.ros_client module

class symaware.extra.ros.utils.ros_client.RosClient[source]

Bases: ABC, Generic[TMessage, TSubscriber, TPublisher]

Interface for ROS clients used to communicate with a ROS network. The subclasses specialise the implementation in order to use different ROS versions. Those include roslibpy for [ROS bridge](http://wiki.ros.org/rosbridge_suite), rospy for [ROS1 Python client library](http://wiki.ros.org/rospy) and rclpy for [ROS2 Python client library](https://github.com/ros2/rclpy).

Using this class allows to abstract the communication with the ROS network, making it easier to switch between different ROS versions.

Warning

The message types used in the topics must be compatible with the ROS version used. Use the message types from the libraries if you are using rospy or rclpy. Use the message types provided by this package if you are using roslibpy.

connect()[source]

Connect to the ROS network. Override in subclasses.

disconnect()[source]

Disconnect from the ROS network. Override in subclasses.

abstractmethod publish(publisher, message)[source]

Publish the given message to the provided publisher.

Parameters:
  • publisher (TypeVar(TPublisher)) – Publisher to use to send the message

  • message (TypeVar(TMessage)) – Message to publish to the ROS topics

abstractmethod publish_topic(topic_name, message_type, compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Create and advertise a new ROS topic with the given name and message_type.

Parameters:
  • topic_name (str) – ROS topic the sender publish to, e.g. /my_topic

  • message_type (type[TypeVar(TMessage)]) – Type of messages the sender will publish, e.g. std_msgs/String

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • latch (bool, default: False) – True to latch the topic when publishing, False otherwise

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 0) – Queue size created at bridge side for re-publishing webtopics

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

Return type:

TypeVar(TPublisher)

property publishers

List of created publishers.

abstractmethod subscribe_topic(topic_name, message_type, callback, compression=None, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Subscribe to the ROS topic with the given name and message_type.

Parameters:
  • topic_name (str) – ROS topic the sender publish to, e.g. /my_topic

  • message_type (type[TypeVar(TMessage)]) – Type of messages the sender will publish, e.g. std_msgs/String

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 0) – Queue length at bridge side used when subscribing

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

Return type:

TypeVar(TSubscriber)

property subscribers

List of created subscribers.

symaware.extra.ros.utils.tools module

symaware.extra.ros.utils.tools.azimuth(source_pos, target_pos, source_heading=0)[source]

Calculate relative angle from source to target accounting for source heading.

symaware.extra.ros.utils.tools.calculate_curvature(points)[source]

Calculate curvature from three points using triangle area formula. Returns value clipped to [0, 1].

Return type:

float

symaware.extra.ros.utils.tools.clip(x, min_value, max_value)[source]

Clip value x between min_value and max_value.

Return type:

TypeVar(T)

symaware.extra.ros.utils.tools.dist(source_pos, target_pos)[source]

Calculate Euclidean distance between two positions.

Return type:

float

symaware.extra.ros.utils.tools.euler_to_quaternion(roll, pitch, yaw, degrees=False)[source]

Convert Euler angles to quaternion representation.

Parameters:
  • roll (float) – Roll angle

  • pitch (float) – Pitch angle

  • yaw (float) – Yaw angle

  • degrees (bool, default: False) – If True, input angles are in degrees, otherwise radians

Returns:

ndarray – Quaternion as array (x, y, z, w)

symaware.extra.ros.utils.tools.position(source_pos, distance, az)[source]

Calculate the position of a point given a source position, distance and azimuth.

Parameters:
  • source_pos (Iterable[float]) – Position of the source point

  • distance (Iterable[float]) – Distance from the source point

  • az (float) – Azimuth from the source point

Returns:

Iterable[float] – Position of the point

symaware.extra.ros.utils.tools.quaternion_to_euler(x, y, z, w, degrees=False)[source]

Convert quaternion to Euler angle representation.

Parameters:
  • x (float) – Quaternion x component

  • y (float) – Quaternion y component

  • z (float) – Quaternion z component

  • w (float) – Quaternion w component

  • degrees (bool, default: False) – If True, output angles are in degrees, otherwise radians

Returns:

ndarray – Euler angles as array (roll, pitch, yaw)

symaware.extra.ros.utils.tools.scale(arr, low=-1.0, high=1.0, clip_min=-1.0, clip_max=1.0)[source]

Clip array to [clip_min, clip_max] then scale to [low, high].

Return type:

ndarray

symaware.extra.ros.utils.tools.wrap_angle(angle, wrap_offset=-3.141592653589793, degrees=False)[source]

Wrap angle to range [wrap_offset, wrap_offset + 2π] or degree equivalent.

Module contents