pymavswarm package#

Subpackages#

Submodules#

pymavswarm.agent module#

class pymavswarm.agent.Agent(system_id: int, component_id: int, timeout_period: float = 30.0, max_params_stored: int = 5)#

Bases: object

Swarm agent.

Agent represents and stores the state of an agent in the network. The agent’s state is updated as new MAVLink messages are received from the associated message.

property acceleration: Acceleration#

Acceleration of the agent.

Returns

agent acceleration

Return type

swarm_state.Acceleration

property armed: Generic#

Arm state of an agent.

Returns

agent’s arm state

Return type

bool

property attitude: Attitude#

Attitude of the agent.

Returns

agent’s attitude

Return type

Attitude

property battery: Battery#

Battery state of the agent.

Returns

agent battery state

Return type

Battery

property clock_offset: Generic#

Clock offset from the global clock.

Returns

clock offset

Return type

Generic

property component_id: int#

Component ID of the agent.

Returns

component ID

Return type

int

compute_reachable_set(position_error: float, velocity_error: float, reach_time: float, initial_step_size: float = 0.5, reach_timeout: float = 0.001) tuple[pymavswarm.safety.hyperrectangle.HyperRectangle, float]#

Compute the current reachable set of the agent.

In the current state of the system, the reachable set is only computed using the global frame. In the future, if transformations between frames is implemented, reachable set computation for multiple frames may be supported.

Parameters
  • position_error (float) – 3D position error to account for in the measurement

  • velocity_error (float) – 3D velocity error to account for in the measurement

  • reach_time (float) – time that the reachable set should reach forward to

  • initial_step_size (float, optional) – initial step to step forward when performing face lifting (lower means higher accuracy but slower; higher means lower accuracy but faster), defaults to 0.5

  • reach_timeout (float, optional) – maximum amount of time to spend computing the reachable set [s], defaults to 0.001

Returns

reachable set for the agent, time that the reachable set reaches to from the start time

Return type

tuple[HyperRectangle, float]

property current_waypoint: Generic#

Index of the current waypoint that an agent is maneuvering to.

Returns

current target waypoint

Return type

int

property docker_info: DockerInfo#

Deployed docker image info.

Returns

agent docker information, if any

Return type

DockerInfo

property ekf: EKFStatus#

EKF status of the agent.

Returns

agent EKF

Return type

EKFStatus

property gps_info: GPSInfo#

Agent GPS information.

Returns

agent GPS information

Return type

GPSInfo

property home_position: Vector#

Home position of the agent.

Returns

agent’s home position

Return type

Position

property hrl_state: Generic#

HRL state that the agent is in.

Returns

hrl state

Return type

str

property last_heartbeat: Any#

Timestamp of the last heartbeat message received.

Returns

last heartbeat timestamp

Return type

Any

property last_params_read: ParameterList#

Circle buffer containing the most recent parameters read and their values.

Returns

parameters read from the agent

Return type

deque

property mission: SwarmMission#

Mission being completed by an agent.

Returns

agent’s mission

Return type

Mission

property mode: Generic#

Flight mode that an agent is operating in.

Returns

agent’s flight mode

Return type

str

property ping: Generic#

Agent latency [ms].

Returns

latency

Return type

Generic

property position: Position#

Position of an agent.

Returns

agent position

Return type

Position

property system_id: int#

System ID of the agent.

Returns

system ID

Return type

int

property system_status: Generic#

System-level status of an agent.

Returns

agent system status

Return type

str

property telemetry: Telemetry#

Telemetry status information.

Returns

telemetry information

Return type

Telemetry

property timeout: Generic#

Timeout state of the agent.

Returns

whether or not the agent is timed out

Return type

bool

property timeout_period: Generic#

Max time between heartbeat messages.

Returns

max spacing between heartbeat messages

Return type

float

property vehicle_type: Generic#

Type of vehicle that the agent exists as.

Returns

agent vehicle type

Return type

str

property velocity: Velocity#

Velocity of the agent.

return: agent velocity :rtype: Velocity

pymavswarm.connection module#

class pymavswarm.connection.Connection(log_level: int = 20)#

Bases: object

Handle interaction with the network and the MAVLink connection.

connect(port: str, baudrate: int, source_system: int, source_component: int, connection_attempt_timeout: float) bool#

Establish a MAVLink connection.

Attempt to establish a MAVLink connection using the provided configurations.

Parameters
  • port (str) – serial port to attempt connection on

  • baudrate (int) – serial connection baudrate

  • source_system (int) – system ID for the source system

  • source_component (int) – component ID for the source system

  • connection_attempt_timeout (float) – maximum time taken to establish a connection

Returns

flag indicating whether connection was successful

Return type

bool

property connected: bool#

Mavlink connection status.

Returns

connection status

Return type

bool

disconnect() None#

Close the connection and stop all threads.

Mavlink connection.

Returns

mavlink connection

Return type

Any

pymavswarm.mavswarm module#

class pymavswarm.mavswarm.MavSwarm(max_workers: int = 5, log_level: int = 20, log_to_file: bool = False, log_filename: Optional[str] = None, ignore_ids: Optional[list[tuple[int | None, int | None]]] = None)#

Bases: object

User-facing pymavswarm interface.

Provides an interface for controlling robot swarms, obtaining swarm state, and configuring agents.

COLLISION_RESPONSE_FORCE_DISARM = 4#
COLLISION_RESPONSE_LAND = 1#
COLLISION_RESPONSE_LOITER = 3#
COLLISION_RESPONSE_NONE = 0#
COLLISION_RESPONSE_RTL = 2#
GLOBAL_FRAME = 0#
GLOBAL_RELATIVE_FRAME = 10#
LOCAL_FRAME = 1#
accelerometer_calibration(simple_calibration: bool = True, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Perform accelerometer calibration on the specified agents.

If the target agent IDs are not provided, the system will attempt to perform accelerometer calibration on all swarm agents.

Parameters
  • simple_calibration (bool, optional) – perform simple accelerometer calibration, defaults to True

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry accelerometer calibration on an agent on calibration failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try accelerometer calibration on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a accelerometer calibration attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

add_agent(agent: Agent) None#

Manually add an agent to the swarm.

This is useful if an agent isn’t automatically recognized, but commands still need to be sent to this agent.

Parameters

agent (Agent) – agent to add

add_custom_message_handler(message: str, callback: Callable[[Any, dict[tuple[int, int], pymavswarm.agent.Agent]], None]) None#

Add a custom message handler for the specified message.

Parameters
  • message (str) – message type to call the handler on

  • callback (MessageHandler) – function to call when the message is received

property agent_ids: list[tuple[int, int]]#

List of agent IDs in the swarm.

Returns

list of agent IDs

Return type

list[AgentID]

property agent_list_changed: Event#

Event signaling that the list of agents in the swarm has changed.

Returns

event

Return type

Event

property agents: list[pymavswarm.agent.Agent]#

List of agents in the swarm.

Returns

swarm agents

Return type

list[Agent]

airspeed_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Perform airspeed calibration on the specified agents.

If the target agent IDs are not provided, the system will attempt to perform airspeed calibration on all swarm agents.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry airspeed calibration on an agent on calibration failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try airspeed calibration on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a airspeed calibration attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

arm(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0) Future#

Arm the desired agents.

If the target agent IDs are not provided, the system will attempt to arm all agents in the swarm.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry arming an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try arming an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of an arming attempt, defaults to 0.5 [s]

  • verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent switched into the armed state, defaults to False

  • verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent is in the armed state, defaults to 1.0 [s]

Returns

future message response, if any

Return type

Future

barometer_temperature_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Perform barometer temperature calibration on the specified agents.

If the target agent IDs are not provided, the system will attempt to perform barometer calibration on all swarm agents.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry barometer temperature calibration on an agent on calibration failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try barometer temperature calibration on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a barometer temperature calibration attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

connect(port: str, baudrate: Optional[int] = None, source_system: int = 255, source_component: int = 0, connection_attempt_timeout: float = 2.0) bool#

Connect to the network.

Attempt to establish a MAVLink connection using the provided configurations.

Parameters
  • port (str) – port over which a connection should be established

  • baud (int) – baudrate that a connection should be established with

  • source_system (int, optional) – system ID of the connection, defaults to 255

  • source_component (int, optional) – component ID of the connection, defaults to 0

  • connection_attempt_timeout (float, optional) – maximum amount of time allowed to attempt to establish a connection, defaults to 2.0 [s]

Returns

whether or not the connection attempt was successful

Return type

bool

property connected: bool#

Flag indicating whether there is an active MAVLink connection.

Returns

flag

Return type

bool

disable_collision_avoidance() None#

Disable collision detection.

disarm(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0, force: bool = False) Future#

Disarm the desired agents.

If the target agent IDs are not provided, the system will attempt to disarm all agents in the swarm.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry disarming an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try disarming an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a disarming attempt, defaults to 0.5 [s]

  • verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent switched into the disarmed state, defaults to False

  • verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent is in the disarmed state, defaults to 1.0 [s]

  • force (bool, optional) – force disarm the agents, defaults to False

Returns

future message response, if any

Return type

Future

disconnect() None#

Disconnect from the MAVLink network and shutdown all services.

enable_collision_avoidance(reach_time: float, position_error: float, velocity_error: float, collision_response: int, use_latency: bool = True, initial_step_size: float = 0.5, reach_timeout: float = 0.001, retry_collision_response: bool = True, verify_collision_response_state: bool = True, max_time_difference: float = 2.0) None#

Enable collision avoidance between agents.

WARNING: THIS IS A HIGHLY EXPERIMENTAL FEATURE!

Please use extreme caution when using collision avoidance! The pymavswarm team does not take any credit for any collisions that occur when using this feature.

Parameters
  • reach_time (float) – amount of time to project forward when computing the reachable states of agents [s]

  • position_error (float) – 3D position error of each agent [m]

  • velocity_error (float) – 3D velocity error of each agent [m/s]

  • collision_response (int) – collision response to execute when a potential collision is detected

  • use_latency (bool, optional) – integrate the current communications latency between the agents into the reach time calculation, defaults to True

  • initial_step_size (float, optional) – initial step to step forward when performing face lifting (lower means higher accuracy but slower; higher means lower accuracy but faster), defaults to 0.5

  • reach_timeout (float, optional) – maximum amount of time to spend computing the reachable set [s], defaults to 0.001

  • retry_collision_response (bool, optional) – retry sending a collision response if an agent does not acknowledge the collision response command, defaults to True

  • verify_collision_response_state (bool, optional) – verify that an agent properly changes states after receiving a collision response command, defaults to True

  • max_time_difference (float, optional) – max difference between agent timestamps before the state is considered stale and not checked [s], defaults to 2.0

get_agent_by_id(agent_id: tuple[int, int]) pymavswarm.agent.Agent | None#

Get the agent with the specified ID (system ID, component ID).

Parameters

agent_id (AgentID) – (system ID, component ID)

Returns

agent, if found

Return type

Agent | None

get_home_position(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Get the current home position of the swarm agents.

If the target agent IDs are not provided, the system will attempt to get the home position of all swarm agents.

The home position will be updated in the agent’s home position property.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry getting the home position of an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try getting the home position of an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the get home position request message, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

goto(x: float = 0, y: float = 0, z: float = 0, hold: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, config_file: Optional[str] = None, frame: int = 10) Future#

Command the agents to go to the desired location.

Use this cautiously. If multiple agents fly to the same location, there may be a collision at this location.

If using a configuration file for pre-planned goto execution, the system will only send commands to the agents with locations specified in the file.

Parameters
  • x (float, optional) – target x position in the specified frame, defaults to 0

  • y (float, optional) – target y position in the specified frame, defaults to 0

  • z (float, optional) – target z position in the specified frame, defaults to 0

  • hold (float, optional) – time to stay at waypoint for rotary wing (ignored by fixed wing), defaults to 0

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry commanding the desired agents to go to the desired location on acknowledgement failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try commanding an agent to fly to the specified location before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the goto message, defaults to 0.5 [s]

  • config_file (str | None, optional) – full path to file with pre-planned goto locations, defaults to None

  • frame (int, optional) – coordinate frame that the x, y, and z positions are provided in, defaults to GLOBAL_FRAME_RELATIVE

Returns

future message response, if any

Return type

Future

ground_pressure_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Perform ground pressure calibration on the specified agents.

If the target agent IDs are not provided, the system will attempt to perform ground pressure calibration on all swarm agents.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry ground pressure calibration on an agent on calibration failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try ground pressure calibration on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a ground pressure calibration attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

gyroscope_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Perform gyroscope calibration on the specified agents.

If the target agent IDs are not provided, the system will attempt to perform gyroscope calibration on all swarm agents.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry gyroscope calibration on an agent on calibration failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try gyroscope calibration on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a gyroscope calibration attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

handle_collision(colliding_agents: list[tuple[int, int]], collision_response: int, retry: bool = True, verify_state: bool = True) None#

Execute a collision response on the colliding agents.

Parameters
  • colliding_agents (list[AgentID]) – agents that are on track to collide

  • collision_response (int) – behavior that should be executed on the colliding agents; options are available as MavSwarm constants

  • retry (bool, optional) – retry sending the collision response on failure, defaults to True

  • verify_state (bool, optional) – verify that the agent switched into the collision response mode, defaults to True

Raises

ValueError – an invalid collision response was provided

magnetometer_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Perform magnetometer calibration on the specified agents.

If the target agent IDs are not provided, the system will attempt to perform magnetometer calibration on all swarm agents.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry magnetometer calibration on an agent on calibration failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try magnetometer calibration on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a magnetometer calibration attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

parse_yaml_mission(config_file: str) dict#

Parse a pre-planned trajectory/mission.

Parameters

config_file (str) – configuration file to parse

Returns

parsed configuration file

Return type

dict

read_parameter(parameter_id: str, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Read the parameter value from the desired agents.

Parameters
  • parameter_id (str) – ID of the parameter to read

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry reading the parameter on an agent when parameter read fails, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try reading the value of the parameter on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the read parameter message, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

reboot(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Reboot the desired agents.

If the target agent IDs are not provided, the system will attempt to reboot all agents in the swarm.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry rebooting an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try rebooting an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a reboot attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

remove_agent(agent: Agent) None#

Manually remove an agent from the swarm.

Parameters

agent (Agent) – agent to remove

send_debug_message(name: str, value: int | float | list, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Send a debug message to the specified agents.

If the target agent IDs are not provided, the system will attempt to send the debug message to all swarm agents.

Parameters
  • name (str) – debug message name

  • value (int | float | list) – debug message value

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry sending the debug message to an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try sending the debug message to an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the debug message, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

set_airspeed(speed: float, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Set the airspeed of the desired agents.

If the target agent IDs are not provided, the system will attempt to set the airspeed of all agents in the swarm.

Parameters
  • speed (float) – target airspeed [m/s]

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry setting the airspeed of an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try setting the airspeed of an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a airspeed change attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

set_groundspeed(speed: float, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Set the groundspeed of the desired agents.

If the target agent IDs are not provided, the system will attempt to set the groundspeed of all agents in the swarm.

Parameters
  • speed (float) – target groundspeed [m/s]

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry setting the groundspeed of an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try setting the groundspeed of an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a groundspeed change attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

set_home_position(use_current_position: bool = True, latitude: float = 0, longitude: float = 0, altitude: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, lat_lon_deviation_tolerance: float = 0.001, altitude_deviation_tolerance: float = 1.0, verify_state_timeout: float = 1.0) Future#

Set the home position of the swarm agents.

If the target agent IDs are not provided, the system will attempt to set the home position of all swarm agents.

Parameters
  • use_current_position (bool, optional) – use the current position of an agent as its home position, defaults to True

  • latitude (float, optional) – latitude of the home position, defaults to 0

  • longitude (float, optional) – longitude of the home position, defaults to 0

  • altitude (float, optional) – altitude of the home position, defaults to 0

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry setting the home position of an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try setting the home position of an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the set home position request message, defaults to 0.5 [s]

  • verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent’s home position was properly set, defaults to False

  • lat_lon_deviation_tolerance (float, optional) – maximum lat/lon deviation allowed when verifying that an agent’s home position was set to its current position, defaults to 0.001

  • altitude_deviation_tolerance (float, optional) – maximum altitude deviation allowed when verifying that an agent’s home position was set to its current position, defaults to 1.0

  • verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent’s home position has been set properly, defaults to 1.0 [s]

Returns

future message response, if any

Return type

Future

set_message_interval(message: str, frequency: float, response_target: int = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Set the interval that a message should be sent at.

This can be used to increase the frequency or to block messages from being sent.

Parameters
  • message (str) – MAVLink message whose message interval should be set

  • frequency (float) – frequency that the message should be sent at [Hz]

  • response_target (int, optional) – target address of message stream, defaults to 0 (flight stack default)

  • agent_ids (list[AgentID] | None, optional) – optional list of agents that should have their message interval set, defaults to None

  • retry (bool, optional) – retry setting the message interval on acknowledgement failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try setting a message interval before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the message interval setting message, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

set_mode(flight_mode: str, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0) Future#

Set the flight mode of the desired agents.

If the target agent IDs are not provided, the system will attempt to set the flight mode of all agents in the swarm.

Parameters
  • flight_mode (str) – flight mode to switch the agents into

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry changing the mode of an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try changing the mode of an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a mode change attempt, defaults to 0.5 [s]

  • verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent switched into the target mode, defaults to False

  • verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent is in the desired mode, defaults to 1.0 [s]

Returns

future message response, if any

Return type

Future

set_parameter(parameter_id: str, parameter_value: Any, parameter_type: int = 9, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Set a parameter on the specified agents.

If the target agent IDs are not provided, the system will attempt to set the specified parameter on all swarm agents.

Parameters
  • parameter_id (str) – ID of the parameter to set

  • parameter_value (Any) – value to set the parameter to

  • parameter_type (int, optional) – parameter value type, defaults to 9

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry setting the parameter on an agent when parameter setting fails, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try setting the value of the parameter on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the set parameter message, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

shutdown(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Shutdown the desired agents.

If the target agent IDs are not provided, the system will attempt to shutdown all agents in the swarm.

Parameters
  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry shutting down an agent on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try shutting down an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a shutdown attempt, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

property supported_modes: dict[str, int] | None#

List of supported flight modes.

Returns

supported flight modes

Return type

dict[str, int]

takeoff(altitude: float, latitude: float = 0, longitude: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future#

Switch the specified agents into takeoff mode.

This does NOT perform a full takeoff sequence.

If the target agent IDs are not provided, the system will attempt to switch all swarm agents into takeoff mode.

State verification is not supported for this command.

Parameters
  • altitude (float) – altitude to takeoff to

  • latitude (float, optional) – latitude to takeoff to, defaults to 0

  • longitude (float, optional) – longitude to takeoff to, defaults to 0

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry switching the agent into takeoff mode on failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try switching an agent into takeoff mode before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the takeoff message, defaults to 0.5 [s]

Returns

future message response, if any

Return type

Future

takeoff_sequence(altitude: float, latitude: float = 0, longitude: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0, stage_delay: float = 3.0) list[pymavswarm.message.response.Response] | pymavswarm.message.response.Response#

Command the desired agents to perform a full takeoff sequence.

The full takeoff sequence includes the following stages:

  1. Arm the agent

  2. Command takeoff

Prior to executing this command, ensure that all agents are in the correct flight mode. In the case of ArduPilot, this should be GUIDED mode. If a failure occurs at stage 1, the system will attempt to command all agents to disarm. If a failure occurs at stage 2, the system will attempt to command all agents to land.

This command is a blocking command and does not run asynchronously (i.e., no future will be returned). This command is multi-staged as well; therefore, it may take a while to complete. Furthermore, there is NO altitude verification to ensure that an agent has reached the desired altitude.

Parameters
  • altitude (float) – altitude to takeoff to

  • latitude (float, optional) – latitude to takeoff to, defaults to 0

  • longitude (float, optional) – longitude to takeoff to, defaults to 0

  • agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None

  • retry (bool, optional) – retry performing a stage on stage failure, defaults to False

  • message_timeout (float, optional) – maximum amount of time allowed to try complete a stage on an agent before a timeout occurs, defaults to 2.5 [s]

  • ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a stage, defaults to 0.5 [s]

  • verify_state (bool, optional) – verify that the agents successfully changed flight modes and armed; if state verification fails, failure response will be initiaed, defaults to False

  • verify_state_timeout (float, optional) – maximum amount of time allowed to verify a state change on an agent, defaults to 1.0

  • stage_delay (float, optional) – amount of time to delay between takeoff sequence stages, defaults to 3.0

Returns

message response; if a failure occurs, the response(s) will be the stage’s responses (e.g., if any message fails at stage 1, the returned value will be the message responses for that particular stage)

Return type

list[Response] | Response

property time_since_boot: int | None#

Time since the connection was established.

Returns

time since connection was established.

Return type

int | None

Module contents#