pymavswarm package#
Subpackages#
- pymavswarm.handlers package
- pymavswarm.message package
- pymavswarm.mission package
- pymavswarm.safety package
- pymavswarm.state package
- Submodules
- pymavswarm.state.docker_info module
- pymavswarm.state.ekf_status module
- pymavswarm.state.generic module
- pymavswarm.state.gps_info module
- pymavswarm.state.parameter module
- pymavswarm.state.parameter_list module
- pymavswarm.state.position module
- pymavswarm.state.state module
- pymavswarm.state.telemetry module
- pymavswarm.state.vector module
- pymavswarm.state.velocity module
- Module contents
- pymavswarm.utils package
Submodules#
pymavswarm.agent module#
- class pymavswarm.agent.Agent(system_id: int, component_id: int, timeout_period: float = 30.0, max_params_stored: int = 5)#
Bases:
object
Swarm agent.
Agent represents and stores the state of an agent in the network. The agent’s state is updated as new MAVLink messages are received from the associated message.
- property acceleration: Acceleration#
Acceleration of the agent.
- Returns
agent acceleration
- Return type
swarm_state.Acceleration
- property clock_offset: Generic#
Clock offset from the global clock.
- Returns
clock offset
- Return type
- property component_id: int#
Component ID of the agent.
- Returns
component ID
- Return type
int
- compute_reachable_set(position_error: float, velocity_error: float, reach_time: float, initial_step_size: float = 0.5, reach_timeout: float = 0.001) tuple[pymavswarm.safety.hyperrectangle.HyperRectangle, float] #
Compute the current reachable set of the agent.
In the current state of the system, the reachable set is only computed using the global frame. In the future, if transformations between frames is implemented, reachable set computation for multiple frames may be supported.
- Parameters
position_error (float) – 3D position error to account for in the measurement
velocity_error (float) – 3D velocity error to account for in the measurement
reach_time (float) – time that the reachable set should reach forward to
initial_step_size (float, optional) – initial step to step forward when performing face lifting (lower means higher accuracy but slower; higher means lower accuracy but faster), defaults to 0.5
reach_timeout (float, optional) – maximum amount of time to spend computing the reachable set [s], defaults to 0.001
- Returns
reachable set for the agent, time that the reachable set reaches to from the start time
- Return type
tuple[HyperRectangle, float]
- property current_waypoint: Generic#
Index of the current waypoint that an agent is maneuvering to.
- Returns
current target waypoint
- Return type
int
- property docker_info: DockerInfo#
Deployed docker image info.
- Returns
agent docker information, if any
- Return type
- property home_position: Vector#
Home position of the agent.
- Returns
agent’s home position
- Return type
- property last_heartbeat: Any#
Timestamp of the last heartbeat message received.
- Returns
last heartbeat timestamp
- Return type
Any
- property last_params_read: ParameterList#
Circle buffer containing the most recent parameters read and their values.
- Returns
parameters read from the agent
- Return type
deque
- property mission: SwarmMission#
Mission being completed by an agent.
- Returns
agent’s mission
- Return type
Mission
- property mode: Generic#
Flight mode that an agent is operating in.
- Returns
agent’s flight mode
- Return type
str
- property system_id: int#
System ID of the agent.
- Returns
system ID
- Return type
int
- property system_status: Generic#
System-level status of an agent.
- Returns
agent system status
- Return type
str
- property telemetry: Telemetry#
Telemetry status information.
- Returns
telemetry information
- Return type
- property timeout: Generic#
Timeout state of the agent.
- Returns
whether or not the agent is timed out
- Return type
bool
- property timeout_period: Generic#
Max time between heartbeat messages.
- Returns
max spacing between heartbeat messages
- Return type
float
pymavswarm.connection module#
- class pymavswarm.connection.Connection(log_level: int = 20)#
Bases:
object
Handle interaction with the network and the MAVLink connection.
- connect(port: str, baudrate: int, source_system: int, source_component: int, connection_attempt_timeout: float) bool #
Establish a MAVLink connection.
Attempt to establish a MAVLink connection using the provided configurations.
- Parameters
port (str) – serial port to attempt connection on
baudrate (int) – serial connection baudrate
source_system (int) – system ID for the source system
source_component (int) – component ID for the source system
connection_attempt_timeout (float) – maximum time taken to establish a connection
- Returns
flag indicating whether connection was successful
- Return type
bool
- property connected: bool#
Mavlink connection status.
- Returns
connection status
- Return type
bool
- disconnect() None #
Close the connection and stop all threads.
- property mavlink_connection: Optional[Any]#
Mavlink connection.
- Returns
mavlink connection
- Return type
Any
pymavswarm.mavswarm module#
- class pymavswarm.mavswarm.MavSwarm(max_workers: int = 5, log_level: int = 20, log_to_file: bool = False, log_filename: Optional[str] = None, ignore_ids: Optional[list[tuple[int | None, int | None]]] = None)#
Bases:
object
User-facing pymavswarm interface.
Provides an interface for controlling robot swarms, obtaining swarm state, and configuring agents.
- COLLISION_RESPONSE_FORCE_DISARM = 4#
- COLLISION_RESPONSE_LAND = 1#
- COLLISION_RESPONSE_LOITER = 3#
- COLLISION_RESPONSE_NONE = 0#
- COLLISION_RESPONSE_RTL = 2#
- GLOBAL_FRAME = 0#
- GLOBAL_RELATIVE_FRAME = 10#
- LOCAL_FRAME = 1#
- accelerometer_calibration(simple_calibration: bool = True, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Perform accelerometer calibration on the specified agents.
If the target agent IDs are not provided, the system will attempt to perform accelerometer calibration on all swarm agents.
- Parameters
simple_calibration (bool, optional) – perform simple accelerometer calibration, defaults to True
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry accelerometer calibration on an agent on calibration failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try accelerometer calibration on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a accelerometer calibration attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- add_agent(agent: Agent) None #
Manually add an agent to the swarm.
This is useful if an agent isn’t automatically recognized, but commands still need to be sent to this agent.
- Parameters
agent (Agent) – agent to add
- add_custom_message_handler(message: str, callback: Callable[[Any, dict[tuple[int, int], pymavswarm.agent.Agent]], None]) None #
Add a custom message handler for the specified message.
- Parameters
message (str) – message type to call the handler on
callback (MessageHandler) – function to call when the message is received
- property agent_ids: list[tuple[int, int]]#
List of agent IDs in the swarm.
- Returns
list of agent IDs
- Return type
list[AgentID]
- property agent_list_changed: Event#
Event signaling that the list of agents in the swarm has changed.
- Returns
event
- Return type
- property agents: list[pymavswarm.agent.Agent]#
List of agents in the swarm.
- Returns
swarm agents
- Return type
list[Agent]
- airspeed_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Perform airspeed calibration on the specified agents.
If the target agent IDs are not provided, the system will attempt to perform airspeed calibration on all swarm agents.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry airspeed calibration on an agent on calibration failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try airspeed calibration on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a airspeed calibration attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- arm(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0) Future #
Arm the desired agents.
If the target agent IDs are not provided, the system will attempt to arm all agents in the swarm.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry arming an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try arming an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of an arming attempt, defaults to 0.5 [s]
verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent switched into the armed state, defaults to False
verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent is in the armed state, defaults to 1.0 [s]
- Returns
future message response, if any
- Return type
Future
- barometer_temperature_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Perform barometer temperature calibration on the specified agents.
If the target agent IDs are not provided, the system will attempt to perform barometer calibration on all swarm agents.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry barometer temperature calibration on an agent on calibration failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try barometer temperature calibration on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a barometer temperature calibration attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- connect(port: str, baudrate: Optional[int] = None, source_system: int = 255, source_component: int = 0, connection_attempt_timeout: float = 2.0) bool #
Connect to the network.
Attempt to establish a MAVLink connection using the provided configurations.
- Parameters
port (str) – port over which a connection should be established
baud (int) – baudrate that a connection should be established with
source_system (int, optional) – system ID of the connection, defaults to 255
source_component (int, optional) – component ID of the connection, defaults to 0
connection_attempt_timeout (float, optional) – maximum amount of time allowed to attempt to establish a connection, defaults to 2.0 [s]
- Returns
whether or not the connection attempt was successful
- Return type
bool
- property connected: bool#
Flag indicating whether there is an active MAVLink connection.
- Returns
flag
- Return type
bool
- disable_collision_avoidance() None #
Disable collision detection.
- disarm(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0, force: bool = False) Future #
Disarm the desired agents.
If the target agent IDs are not provided, the system will attempt to disarm all agents in the swarm.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry disarming an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try disarming an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a disarming attempt, defaults to 0.5 [s]
verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent switched into the disarmed state, defaults to False
verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent is in the disarmed state, defaults to 1.0 [s]
force (bool, optional) – force disarm the agents, defaults to False
- Returns
future message response, if any
- Return type
Future
- disconnect() None #
Disconnect from the MAVLink network and shutdown all services.
- enable_collision_avoidance(reach_time: float, position_error: float, velocity_error: float, collision_response: int, use_latency: bool = True, initial_step_size: float = 0.5, reach_timeout: float = 0.001, retry_collision_response: bool = True, verify_collision_response_state: bool = True, max_time_difference: float = 2.0) None #
Enable collision avoidance between agents.
WARNING: THIS IS A HIGHLY EXPERIMENTAL FEATURE!
Please use extreme caution when using collision avoidance! The pymavswarm team does not take any credit for any collisions that occur when using this feature.
- Parameters
reach_time (float) – amount of time to project forward when computing the reachable states of agents [s]
position_error (float) – 3D position error of each agent [m]
velocity_error (float) – 3D velocity error of each agent [m/s]
collision_response (int) – collision response to execute when a potential collision is detected
use_latency (bool, optional) – integrate the current communications latency between the agents into the reach time calculation, defaults to True
initial_step_size (float, optional) – initial step to step forward when performing face lifting (lower means higher accuracy but slower; higher means lower accuracy but faster), defaults to 0.5
reach_timeout (float, optional) – maximum amount of time to spend computing the reachable set [s], defaults to 0.001
retry_collision_response (bool, optional) – retry sending a collision response if an agent does not acknowledge the collision response command, defaults to True
verify_collision_response_state (bool, optional) – verify that an agent properly changes states after receiving a collision response command, defaults to True
max_time_difference (float, optional) – max difference between agent timestamps before the state is considered stale and not checked [s], defaults to 2.0
- get_agent_by_id(agent_id: tuple[int, int]) pymavswarm.agent.Agent | None #
Get the agent with the specified ID (system ID, component ID).
- Parameters
agent_id (AgentID) – (system ID, component ID)
- Returns
agent, if found
- Return type
Agent | None
- get_home_position(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Get the current home position of the swarm agents.
If the target agent IDs are not provided, the system will attempt to get the home position of all swarm agents.
The home position will be updated in the agent’s home position property.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry getting the home position of an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try getting the home position of an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the get home position request message, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- goto(x: float = 0, y: float = 0, z: float = 0, hold: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, config_file: Optional[str] = None, frame: int = 10) Future #
Command the agents to go to the desired location.
Use this cautiously. If multiple agents fly to the same location, there may be a collision at this location.
If using a configuration file for pre-planned goto execution, the system will only send commands to the agents with locations specified in the file.
- Parameters
x (float, optional) – target x position in the specified frame, defaults to 0
y (float, optional) – target y position in the specified frame, defaults to 0
z (float, optional) – target z position in the specified frame, defaults to 0
hold (float, optional) – time to stay at waypoint for rotary wing (ignored by fixed wing), defaults to 0
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry commanding the desired agents to go to the desired location on acknowledgement failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try commanding an agent to fly to the specified location before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the goto message, defaults to 0.5 [s]
config_file (str | None, optional) – full path to file with pre-planned goto locations, defaults to None
frame (int, optional) – coordinate frame that the x, y, and z positions are provided in, defaults to GLOBAL_FRAME_RELATIVE
- Returns
future message response, if any
- Return type
Future
- ground_pressure_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Perform ground pressure calibration on the specified agents.
If the target agent IDs are not provided, the system will attempt to perform ground pressure calibration on all swarm agents.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry ground pressure calibration on an agent on calibration failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try ground pressure calibration on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a ground pressure calibration attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- gyroscope_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Perform gyroscope calibration on the specified agents.
If the target agent IDs are not provided, the system will attempt to perform gyroscope calibration on all swarm agents.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry gyroscope calibration on an agent on calibration failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try gyroscope calibration on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a gyroscope calibration attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- handle_collision(colliding_agents: list[tuple[int, int]], collision_response: int, retry: bool = True, verify_state: bool = True) None #
Execute a collision response on the colliding agents.
- Parameters
colliding_agents (list[AgentID]) – agents that are on track to collide
collision_response (int) – behavior that should be executed on the colliding agents; options are available as MavSwarm constants
retry (bool, optional) – retry sending the collision response on failure, defaults to True
verify_state (bool, optional) – verify that the agent switched into the collision response mode, defaults to True
- Raises
ValueError – an invalid collision response was provided
- magnetometer_calibration(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Perform magnetometer calibration on the specified agents.
If the target agent IDs are not provided, the system will attempt to perform magnetometer calibration on all swarm agents.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry magnetometer calibration on an agent on calibration failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try magnetometer calibration on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a magnetometer calibration attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- parse_yaml_mission(config_file: str) dict #
Parse a pre-planned trajectory/mission.
- Parameters
config_file (str) – configuration file to parse
- Returns
parsed configuration file
- Return type
dict
- read_parameter(parameter_id: str, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Read the parameter value from the desired agents.
- Parameters
parameter_id (str) – ID of the parameter to read
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry reading the parameter on an agent when parameter read fails, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try reading the value of the parameter on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the read parameter message, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- reboot(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Reboot the desired agents.
If the target agent IDs are not provided, the system will attempt to reboot all agents in the swarm.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry rebooting an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try rebooting an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a reboot attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- remove_agent(agent: Agent) None #
Manually remove an agent from the swarm.
- Parameters
agent (Agent) – agent to remove
- send_debug_message(name: str, value: int | float | list, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Send a debug message to the specified agents.
If the target agent IDs are not provided, the system will attempt to send the debug message to all swarm agents.
- Parameters
name (str) – debug message name
value (int | float | list) – debug message value
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry sending the debug message to an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try sending the debug message to an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the debug message, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- set_airspeed(speed: float, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Set the airspeed of the desired agents.
If the target agent IDs are not provided, the system will attempt to set the airspeed of all agents in the swarm.
- Parameters
speed (float) – target airspeed [m/s]
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry setting the airspeed of an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try setting the airspeed of an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a airspeed change attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- set_groundspeed(speed: float, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Set the groundspeed of the desired agents.
If the target agent IDs are not provided, the system will attempt to set the groundspeed of all agents in the swarm.
- Parameters
speed (float) – target groundspeed [m/s]
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry setting the groundspeed of an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try setting the groundspeed of an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a groundspeed change attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- set_home_position(use_current_position: bool = True, latitude: float = 0, longitude: float = 0, altitude: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, lat_lon_deviation_tolerance: float = 0.001, altitude_deviation_tolerance: float = 1.0, verify_state_timeout: float = 1.0) Future #
Set the home position of the swarm agents.
If the target agent IDs are not provided, the system will attempt to set the home position of all swarm agents.
- Parameters
use_current_position (bool, optional) – use the current position of an agent as its home position, defaults to True
latitude (float, optional) – latitude of the home position, defaults to 0
longitude (float, optional) – longitude of the home position, defaults to 0
altitude (float, optional) – altitude of the home position, defaults to 0
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry setting the home position of an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try setting the home position of an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the set home position request message, defaults to 0.5 [s]
verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent’s home position was properly set, defaults to False
lat_lon_deviation_tolerance (float, optional) – maximum lat/lon deviation allowed when verifying that an agent’s home position was set to its current position, defaults to 0.001
altitude_deviation_tolerance (float, optional) – maximum altitude deviation allowed when verifying that an agent’s home position was set to its current position, defaults to 1.0
verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent’s home position has been set properly, defaults to 1.0 [s]
- Returns
future message response, if any
- Return type
Future
- set_message_interval(message: str, frequency: float, response_target: int = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Set the interval that a message should be sent at.
This can be used to increase the frequency or to block messages from being sent.
- Parameters
message (str) – MAVLink message whose message interval should be set
frequency (float) – frequency that the message should be sent at [Hz]
response_target (int, optional) – target address of message stream, defaults to 0 (flight stack default)
agent_ids (list[AgentID] | None, optional) – optional list of agents that should have their message interval set, defaults to None
retry (bool, optional) – retry setting the message interval on acknowledgement failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try setting a message interval before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the message interval setting message, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- set_mode(flight_mode: str, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0) Future #
Set the flight mode of the desired agents.
If the target agent IDs are not provided, the system will attempt to set the flight mode of all agents in the swarm.
- Parameters
flight_mode (str) – flight mode to switch the agents into
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry changing the mode of an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try changing the mode of an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a mode change attempt, defaults to 0.5 [s]
verify_state (bool, optional) – flag indicating whether or not the system should attempt to verify that the agent switched into the target mode, defaults to False
verify_state_timeout (float, optional) – maximum amount of time allowed per attempt to verify that an agent is in the desired mode, defaults to 1.0 [s]
- Returns
future message response, if any
- Return type
Future
- set_parameter(parameter_id: str, parameter_value: Any, parameter_type: int = 9, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Set a parameter on the specified agents.
If the target agent IDs are not provided, the system will attempt to set the specified parameter on all swarm agents.
- Parameters
parameter_id (str) – ID of the parameter to set
parameter_value (Any) – value to set the parameter to
parameter_type (int, optional) – parameter value type, defaults to 9
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry setting the parameter on an agent when parameter setting fails, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try setting the value of the parameter on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the set parameter message, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- shutdown(agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Shutdown the desired agents.
If the target agent IDs are not provided, the system will attempt to shutdown all agents in the swarm.
- Parameters
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry shutting down an agent on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try shutting down an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a shutdown attempt, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- property supported_modes: dict[str, int] | None#
List of supported flight modes.
- Returns
supported flight modes
- Return type
dict[str, int]
- takeoff(altitude: float, latitude: float = 0, longitude: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5) Future #
Switch the specified agents into takeoff mode.
This does NOT perform a full takeoff sequence.
If the target agent IDs are not provided, the system will attempt to switch all swarm agents into takeoff mode.
State verification is not supported for this command.
- Parameters
altitude (float) – altitude to takeoff to
latitude (float, optional) – latitude to takeoff to, defaults to 0
longitude (float, optional) – longitude to takeoff to, defaults to 0
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry switching the agent into takeoff mode on failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try switching an agent into takeoff mode before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of the takeoff message, defaults to 0.5 [s]
- Returns
future message response, if any
- Return type
Future
- takeoff_sequence(altitude: float, latitude: float = 0, longitude: float = 0, agent_ids: Optional[list[tuple[int, int]]] = None, retry: bool = False, message_timeout: float = 2.5, ack_timeout: float = 0.5, verify_state: bool = False, verify_state_timeout: float = 1.0, stage_delay: float = 3.0) list[pymavswarm.message.response.Response] | pymavswarm.message.response.Response #
Command the desired agents to perform a full takeoff sequence.
The full takeoff sequence includes the following stages:
Arm the agent
Command takeoff
Prior to executing this command, ensure that all agents are in the correct flight mode. In the case of ArduPilot, this should be GUIDED mode. If a failure occurs at stage 1, the system will attempt to command all agents to disarm. If a failure occurs at stage 2, the system will attempt to command all agents to land.
This command is a blocking command and does not run asynchronously (i.e., no future will be returned). This command is multi-staged as well; therefore, it may take a while to complete. Furthermore, there is NO altitude verification to ensure that an agent has reached the desired altitude.
- Parameters
altitude (float) – altitude to takeoff to
latitude (float, optional) – latitude to takeoff to, defaults to 0
longitude (float, optional) – longitude to takeoff to, defaults to 0
agent_ids (list[AgentID] | None, optional) – optional list of target agent IDs, defaults to None
retry (bool, optional) – retry performing a stage on stage failure, defaults to False
message_timeout (float, optional) – maximum amount of time allowed to try complete a stage on an agent before a timeout occurs, defaults to 2.5 [s]
ack_timeout (float, optional) – maximum amount of time allowed per attempt to verify acknowledgement of a stage, defaults to 0.5 [s]
verify_state (bool, optional) – verify that the agents successfully changed flight modes and armed; if state verification fails, failure response will be initiaed, defaults to False
verify_state_timeout (float, optional) – maximum amount of time allowed to verify a state change on an agent, defaults to 1.0
stage_delay (float, optional) – amount of time to delay between takeoff sequence stages, defaults to 3.0
- Returns
message response; if a failure occurs, the response(s) will be the stage’s responses (e.g., if any message fails at stage 1, the returned value will be the message responses for that particular stage)
- Return type
- property time_since_boot: int | None#
Time since the connection was established.
- Returns
time since connection was established.
- Return type
int | None