vmas.simulator.scenario.BaseScenario
- class BaseScenario[source]
Bases: ABC
Base class for scenarios.
This is the class that scenarios inherit from.
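The methods that are compulsory to implement are:
- make_world()
- reset_world_at()
- observation()
- reward()
The methods that are optional to implement are:
- done()
- info()
- extra_render()
- process_action()
- pre_step()
- post_step()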
- viewer_size
The size of the rendering viewer window. This can be changed in the make_world function.
- viewer_zoom
The zoom of the rendering camera (a lower value means more zoom). This can be changed in the make_world function.
- render_origin
The origin of the rendering camera when agent_index_to_focus is None in the render() arguments. This can be changed in the make_world function.
- plot_grid
Whether to plot a grid in the scenario rendering background. This can be changed in the make_world function.
- grid_spacing
If plot_grid is set, the distance between lines in the background grid. This can be changed in the make_world function.
- visualize_semidims
Whether to display boundaries in dimension-limited environments. This can be changed in the make_world function.
- property world
The World associated with this scenario.
- to(device: device)[source]
Casts the scenario to a different device.
- Parameters:
device (Union[str, int, torch.device]) – the device to cast to
- abstract make_world(batch_dim: int, device: device, **kwargs) → World[source]
This function needs to be implemented when creating a scenario. In this function the user should instantiate the world and insert agents and landmarks in it.
- Parameters:
batch_dim (int) – the number of vectorized environments.
device (Union[str, int, torch.device], optional) – the device of the environment.
kwargs (dict, optional) – named arguments passed from environment creation
- Returns:
the World instance, which is automatically set in world.
- Return type:
World
Examples
>>> import torch
>>> from vmas.simulator.core import Agent, World, Landmark, Sphere, Box
>>> from vmas.simulator.scenario import BaseScenario
>>> from vmas.simulator.utils import Color
>>> class Scenario(BaseScenario):
...     def make_world(self, batch_dim: int, device: torch.device, **kwargs):
...         # Pass any kwargs you desire when creating the environment
...         n_agents = kwargs.get("n_agents", 5)
...
...         # Create world
...         world = World(batch_dim, device, dt=0.1, drag=0.25, dim_c=0)
...         # Add agents
...         for i in range(n_agents):
...             agent = Agent(
...                 name=f"agent {i}",
...                 collide=True,
...                 mass=1.0,
...                 shape=Sphere(radius=0.04),
...                 max_speed=None,
...                 color=Color.BLUE,
...                 u_range=1.0,
...             )
...             world.add_agent(agent)
...         # Add landmarks
...         for i in range(5):
...             landmark = Landmark(
...                 name=f"landmark {i}",
...                 collide=True,
...                 movable=False,
...                 shape=Box(length=0.3, width=0.1),
...                 color=Color.RED,
...             )
...             world.add_landmark(landmark)
...         return world
- abstract reset_world_at(env_index: int | None = None)[source]
Resets the world at the specified env_index.
When a None index is passed, the world should make a vectorized (batched) reset. The entity.set_x() methods already have this logic integrated and will perform batched operations when index is None.
When this function is called, all entities have already had their state reset to zeros according to the env_index. In this function you should change the values of the reset states according to your task. For example, some functions you might want to use are: entity.set_pos(), entity.set_vel(), entity.set_rot(), entity.set_ang_vel().
Implementors can access the world at world.
To increase performance, torch tensors should be created with the device already set, like: torch.tensor(..., device=self.world.device)
- Parameters:
env_index (int, optional) – index of the environment to reset. If None, a vectorized reset should be performed.
Spawning at fixed positions
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> import torch
>>> class Scenario(BaseScenario):
...     def reset_world_at(self, env_index):
...         for i, agent in enumerate(self.world.agents):
...             agent.set_pos(
...                 torch.tensor(
...                     [-0.2 + 0.1 * i, 1.0],
...                     dtype=torch.float32,
...                     device=self.world.device,
...                 ),
...                 batch_index=env_index,
...             )
...         for i, landmark in enumerate(self.world.landmarks):
...             landmark.set_pos(
...                 torch.tensor(
...                     [0.2 if i % 2 else -0.2, 0.6 - 0.3 * i],
...                     dtype=torch.float32,
...                     device=self.world.device,
...                 ),
...                 batch_index=env_index,
...             )
...             landmark.set_rot(
...                 torch.tensor(
...                     [torch.pi / 4 if i % 2 else -torch.pi / 4],
...                     dtype=torch.float32,
...                     device=self.world.device,
...                 ),
...                 batch_index=env_index,
...             )
Spawning at random positions
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> from vmas.simulator.utils import ScenarioUtils
>>> class Scenario(BaseScenario):
...     def reset_world_at(self, env_index):
...         ScenarioUtils.spawn_entities_randomly(
...             self.world.agents + self.world.landmarks,
...             self.world,
...             env_index,
...             min_dist_between_entities=0.02,
...             x_bounds=(-1.0, 1.0),
...             y_bounds=(-1.0, 1.0),
...         )
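The difference between a single-index reset and a vectorized reset (index None) can be illustrated with plain torch, without VMAS. The sketch below assumes a state tensor of shape (batch_dim, 2). The helper set_batched is hypothetical and written only for this illustration; it is not the library's entity.set_x() implementation.

```python
from typing import Optional

import torch


def set_batched(
    state: torch.Tensor, value: torch.Tensor, batch_index: Optional[int] = None
) -> torch.Tensor:
    """Hypothetical helper: write `value` into one env or into all envs."""
    if batch_index is None:
        # Vectorized reset: broadcast the value over the whole batch dimension
        state[:] = value
    else:
        # Single-env reset: only the row of the requested environment changes
        state[batch_index] = value
    return state


batch_dim = 4
pos = torch.ones(batch_dim, 2)

# Reset only environment 2; the other rows keep their values
set_batched(pos, torch.tensor([0.5, -0.5]), batch_index=2)
single_reset = pos.clone()

# Reset every environment at once by passing no index
set_batched(pos, torch.zeros(2))
```

Handling both cases in one code path is what lets a single reset_world_at implementation serve both per-environment and full-batch resets.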
- abstract observation(agent: Agent) → Tensor | Dict[str, Tensor][source]
This function computes the observations for agent in a vectorized way.
The returned tensor should contain the observations for agent in all envs and should have shape (self.world.batch_dim, n_agent_obs), or be a dict with leaves following that shape.
Implementors can access the world at world.
To increase performance, torch tensors should be created with the device already set, like: torch.tensor(..., device=self.world.device)
- Parameters:
agent (Agent) – the agent to compute the observations for
- Returns:
the observation
- Return type:
Union[torch.Tensor, Dict[str, torch.Tensor]]
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> import torch
>>> class Scenario(BaseScenario):
...     def observation(self, agent):
...         # get positions of all landmarks in this agent's reference frame
...         landmark_rel_poses = []
...         for landmark in self.world.landmarks:
...             landmark_rel_poses.append(landmark.state.pos - agent.state.pos)
...         return torch.cat([agent.state.pos, agent.state.vel, *landmark_rel_poses], dim=-1)
You can also return observations in a dictionary
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> from vmas.simulator.utils import Color
>>> class Scenario(BaseScenario):
...     def observation(self, agent):
...         return {"pos": agent.state.pos, "vel": agent.state.vel}
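To make the expected observation shape concrete, the following sketch reproduces the concatenation with stand-in tensors instead of a real world. The values of batch_dim and n_landmarks and all tensor contents are assumptions chosen for illustration.

```python
import torch

batch_dim, n_landmarks = 8, 3
device = torch.device("cpu")

# Stand-ins for agent.state.pos, agent.state.vel and landmark.state.pos,
# created directly on the target device
agent_pos = torch.zeros(batch_dim, 2, device=device)
agent_vel = torch.zeros(batch_dim, 2, device=device)
landmark_pos = [
    torch.full((batch_dim, 2), float(i), device=device) for i in range(n_landmarks)
]

# Tensor observation: concatenate along the last (feature) dimension
landmark_rel_poses = [p - agent_pos for p in landmark_pos]
obs = torch.cat([agent_pos, agent_vel, *landmark_rel_poses], dim=-1)

# Dict observation: every leaf keeps the batch dimension first
obs_dict = {"pos": agent_pos, "vel": agent_vel}
```

Here n_agent_obs is 2 + 2 + 2 * n_landmarks, so obs has shape (batch_dim, 10).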
- abstract reward(agent: Agent) → Tensor[source]
This function computes the reward for agent in a vectorized way.
The returned tensor should contain the reward for agent in all envs and should have shape (self.world.batch_dim) and dtype torch.float.
Implementors can access the world at world.
To increase performance, torch tensors should be created with the device already set, like: torch.tensor(..., device=self.world.device)
- Parameters:
agent (Agent) – the agent to compute the reward for
- Returns:
reward tensor of shape (self.world.batch_dim)
- Return type:
Tensor
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> import torch
>>> class Scenario(BaseScenario):
...     def reward(self, agent):
...         # reward every agent proportionally to distance from first landmark
...         rew = -torch.linalg.vector_norm(agent.state.pos - self.world.landmarks[0].state.pos, dim=-1)
...         return rew
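As a quick check of the shape and dtype requirements, the same distance-based reward can be computed on stand-in tensors. The positions below are made-up values, not output from a real scenario.

```python
import torch

batch_dim = 3
# Stand-ins for agent.state.pos and the first landmark's state.pos
agent_pos = torch.tensor([[3.0, 4.0], [0.0, 0.0], [1.0, 0.0]])
landmark_pos = torch.zeros(batch_dim, 2)

# Reducing over the last dimension leaves one scalar reward per environment
rew = -torch.linalg.vector_norm(agent_pos - landmark_pos, dim=-1)
```

The norm reduces the coordinate dimension, so the result has one entry per environment and inherits the floating-point dtype of the positions.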
- done() → Tensor[source]
This function computes the done flag for each env in a vectorized way.
The returned tensor should contain the done flag for all envs and should have shape (n_envs) and dtype torch.bool.
Implementors can access the world at world.
To increase performance, torch tensors should be created with the device already set, like: torch.tensor(..., device=self.world.device)
By default, this function returns False for all envs. The scenario can still be done if max_steps has been set at environment construction.
- Returns:
done tensor of shape (self.world.batch_dim)
- Return type:
Tensor
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> import torch
>>> class Scenario(BaseScenario):
...     def done(self):
...         # return done when all agents have battery level lower than a threshold
...         # (battery_level and threshold are illustrative, scenario-defined values)
...         return torch.stack([a.battery_level < threshold for a in self.world.agents], dim=-1).all(-1)
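The reduction over agents can be traced on stand-in data. In this sketch the battery levels and the threshold are invented values, and default_done only illustrates what an all-False tensor of the right shape and dtype looks like; it is not necessarily how the library builds its default.

```python
import torch

batch_dim = 4
threshold = 0.2  # illustrative value, defined by the scenario author

# One tensor of shape (batch_dim,) per agent (stand-in for a.battery_level)
battery_levels = [
    torch.tensor([0.1, 0.5, 0.1, 0.9]),
    torch.tensor([0.1, 0.1, 0.3, 0.9]),
]

# An all-False done tensor with the expected shape and dtype
default_done = torch.zeros(batch_dim, dtype=torch.bool)

# Stack per-agent flags into (batch_dim, n_agents), then require all agents
done = torch.stack([b < threshold for b in battery_levels], dim=-1).all(-1)
```

Only the first environment is done here, because it is the only one in which both agents are below the threshold.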
- info(agent: Agent) → Dict[str, Tensor][source]
This function computes the info dict for agent in a vectorized way.
The returned dict should have a key for each info of interest and the corresponding value should be a tensor of shape (n_envs, info_size).
By default this function returns an empty dictionary.
Implementors can access the world at world.
To increase performance, torch tensors should be created with the device already set, like: torch.tensor(..., device=self.world.device)
- Parameters:
agent (Agent) – the agent to compute the info for
- Returns:
the info
- Return type:
Dict[str, torch.Tensor]
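A minimal sketch of an info dict with the described (n_envs, info_size) shape, built from stand-in tensors. The key names dist_to_goal and pos, and the goal position, are hypothetical choices for this example.

```python
import torch

batch_dim = 5
# Stand-ins for agent.state.pos and a goal position
agent_pos = torch.rand(batch_dim, 2)
goal_pos = torch.zeros(batch_dim, 2)

dist = torch.linalg.vector_norm(agent_pos - goal_pos, dim=-1)  # (batch_dim,)

info = {
    # Add a trailing dimension so a scalar info has shape (batch_dim, 1)
    "dist_to_goal": dist.unsqueeze(-1),
    # Vector-valued info already has shape (batch_dim, 2)
    "pos": agent_pos,
}
```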
- extra_render(env_index: int = 0) → List[Geom][source]
This function facilitates additional user/scenario-level rendering for a specific environment index.
The returned list is a list of geometries. It is the user’s responsibility to set attributes such as color, position and rotation.
- Parameters:
env_index (int, optional) – index of the environment to render. Defaults to 0.
- Returns:
A list of geometries to render for the current time step.
Examples
>>> from vmas.simulator.utils import Color
>>> from vmas.simulator.scenario import BaseScenario
>>> class Scenario(BaseScenario):
...     def extra_render(self, env_index):
...         from vmas.simulator import rendering
...         color = Color.BLACK.value
...         line = rendering.Line(
...             (self.world.agents[0].state.pos[env_index]),
...             (self.world.agents[1].state.pos[env_index]),
...             width=1,
...         )
...         xform = rendering.Transform()
...         line.add_attr(xform)
...         line.set_color(*color)
...         return [line]
- process_action(agent: Agent)[source]
This function can be overridden to process the agent actions before the simulation step.
It has access to the world through the world attribute.
For example, here you can manage additional actions before passing them to the dynamics.
- Parameters:
agent (Agent) – the agent whose action is processed
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> from vmas.simulator.utils import TorchUtils
>>> class Scenario(BaseScenario):
...     def process_action(self, agent):
...         # Clamp square to circle
...         agent.action.u = TorchUtils.clamp_with_norm(agent.action.u, agent.u_range)
...         # Can use a PID controller to turn velocity actions into forces
...         # (e.g., from vmas.simulator.controllers.velocity_controller)
...         agent.controller.process_force()
...         return
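The "clamp square to circle" step limits the norm of each action vector rather than each component separately. The sketch below shows one way to do this in plain torch. The function clamp_by_norm is a hypothetical stand-in written for this illustration and is not the implementation of TorchUtils.clamp_with_norm.

```python
import torch


def clamp_by_norm(u: torch.Tensor, max_norm: float) -> torch.Tensor:
    """Hypothetical helper: rescale rows of `u` whose norm exceeds `max_norm`."""
    norm = torch.linalg.vector_norm(u, dim=-1, keepdim=True)
    # Scale factor is 1 for vectors inside the ball and max_norm / norm outside;
    # clamp_min avoids a division by zero for all-zero actions
    scale = torch.clamp(max_norm / norm.clamp_min(1e-12), max=1.0)
    return u * scale


# Batch of three 2D actions: too large, already small, and zero
actions = torch.tensor([[3.0, 4.0], [0.1, 0.2], [0.0, 0.0]])
clamped = clamp_by_norm(actions, max_norm=1.0)
```

The first action has norm 5 and is rescaled to norm 1 while keeping its direction; the other two are left unchanged.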
- pre_step()[source]
This function can be overridden to perform any computation that has to happen before the simulation step. Its intended use is for computation that has to happen only once before the simulation step has occurred.
For example, you can store temporal data before letting the world step.
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> class Scenario(BaseScenario):
...     def pre_step(self):
...         for agent in self.world.agents:
...             agent.prev_state = agent.state
...             return
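When storing data from before the step, it matters whether a reference or a copy is kept. This page does not say whether the simulator updates state tensors in place. Assuming it might, the following plain-torch sketch shows how an alias and a clone behave after an in-place update; pos is a stand-in tensor, not a real agent state.

```python
import torch

# Stand-in for a state tensor of shape (batch_dim, 2)
pos = torch.zeros(2, 2)

prev_ref = pos           # alias: shares storage with pos
prev_copy = pos.clone()  # snapshot: independent copy of the current values

# Simulate an in-place update such as a physics step might perform
pos += 1.0
```

After the update prev_ref reflects the new values while prev_copy still holds the old ones, so cloning is the safer choice when a true snapshot is needed.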
- post_step()[source]
This function can be overridden to perform any computation that has to happen after the simulation step. Its intended use is for computation that has to happen only once after the simulation step has occurred.
For example, you can store temporal sensor data in this function.
Examples
>>> from vmas.simulator.scenario import BaseScenario
>>> class Scenario(BaseScenario):
...     def post_step(self):
...         for agent in self.world.agents:
...             # Let the sensor take a measurement
...             measurements = agent.sensors[0].measure()
...             # Store sensor data in agent.sensor_history
...             agent.sensor_history.append(measurements)
...         return
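The relative timing of process_action, pre_step and post_step can be sketched with a stand-in class that only records calls. Both RecordingScenario and the step function below are hypothetical: the driver loop is an assumption based on the descriptions on this page (per-agent action processing, then one pre_step, the world step, then one post_step), and the real environment may differ in its details.

```python
class RecordingScenario:
    """Stand-in scenario that records the order in which its hooks are called."""

    def __init__(self, n_agents: int):
        self.agents = [f"agent_{i}" for i in range(n_agents)]
        self.calls = []

    def process_action(self, agent: str) -> None:
        # Called once per agent, before the simulation step
        self.calls.append(f"process_action({agent})")

    def pre_step(self) -> None:
        # Called once, before the simulation step
        self.calls.append("pre_step")

    def post_step(self) -> None:
        # Called once, after the simulation step
        self.calls.append("post_step")


def step(scenario: RecordingScenario) -> None:
    """Hypothetical driver loop, assumed for illustration only."""
    for agent in scenario.agents:
        scenario.process_action(agent)
    scenario.pre_step()
    scenario.calls.append("world.step")  # placeholder for the physics step
    scenario.post_step()


scenario = RecordingScenario(n_agents=2)
step(scenario)
```

Inspecting scenario.calls after one step shows the per-agent hook firing once for each agent, while pre_step and post_step each fire exactly once around the world step.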