class ThreadedVectorEnv
Provides the same functionality as VectorEnv; the only difference is that it runs in a multi-thread setup inside a single process, whereas VectorEnv runs in a multi-process setup. This makes ThreadedVectorEnv much easier to debug than VectorEnv, because you can set breakpoints in the environment methods. It should not be used when best performance is required.
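The debugging benefit of a thread-based setup can be illustrated with a minimal sketch. This is not the habitat implementation: `ToyEnv`, `thread_worker`, and the command names are hypothetical stand-ins chosen for this example. It runs an environment in a worker thread and exchanges messages over `queue.Queue` objects. Because the worker shares the caller's process, a breakpoint placed in `ToyEnv.step` is reachable from a debugger attached to the main process.

```python
import os
import queue
import threading


class ToyEnv:
    """Hypothetical stand-in for an environment; not part of habitat."""

    def __init__(self):
        self.steps = 0
        self.pid_seen = None

    def step(self, action):
        # A breakpoint placed here is reachable from the main process's
        # debugger, because this code runs in a thread, not a subprocess.
        self.steps += 1
        self.pid_seen = os.getpid()
        return {"action": action, "steps": self.steps}


def thread_worker(env, commands, results):
    """Serve (command, payload) requests until a 'close' command arrives."""
    while True:
        command, payload = commands.get()
        if command == "close":
            break
        if command == "step":
            results.put(env.step(payload))


env = ToyEnv()
commands, results = queue.Queue(), queue.Queue()
worker = threading.Thread(target=thread_worker, args=(env, commands, results))
worker.start()

commands.put(("step", 1))
observation = results.get()  # blocks until the worker has stepped

commands.put(("close", None))
worker.join()

# The worker ran in this process, so it mutated the very same object.
same_process = env.pid_seen == os.getpid()
```

With a process-based worker, the environment would live in a separate interpreter and the caller's `env` object would not reflect the step.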
Methods
- def _worker_env(connection_read_fn: typing.Callable, connection_write_fn: typing.Callable, env_fn: typing.Callable, env_fn_args: typing.Tuple[typing.Any], auto_reset_done: bool, mask_signals: bool = False, child_pipe: typing.Optional[multiprocessing.connection.Connection] = None, parent_pipe: typing.Optional[multiprocessing.connection.Connection] = None) -> None
- Process worker for creating and interacting with the environment.
- def async_step(self, data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> None
- Asynchronously step in the environments.
- def async_step_at(self, index_env: int, action: typing.Union[int, numpy.ndarray]) -> None
- def call(self, function_names: typing.List[str], function_args_list: typing.Optional[typing.List[typing.Any]] = None) -> typing.List[typing.Any]
- Calls a list of functions (which are passed by name) on the corresponding env (by index).
- def call_at(self, index: int, function_name: str, function_args: typing.Optional[typing.Dict[str, typing.Any]] = None) -> typing.Any
- Calls a function or retrieves a property/member variable (which is passed by name) on the selected env and returns the result.
- def close(self) -> None
- def count_episodes(self)
- def current_episodes(self)
- def episode_over(self)
- def get_metrics(self)
- def initialize_batch_renderer(self, config: DictConfig)
- Provides VectorEnv with batch rendering capability. Refer to the EnvBatchRenderer class.
- def pause_at(self, index: int) -> None
- Pauses computation on this env without destroying the env.
- def post_step(self, observations) -> typing.List[typing.OrderedDict]
- Performs batch transformations on step outputs.
- def render(self, mode: str = 'human', *args, **kwargs) -> typing.Optional[numpy.ndarray]
- Render observations from all environments in a tiled image.
- def reset(self)
- Reset all the vectorized environments.
- def reset_at(self, index_env: int)
- Reset the environment at index_env in the vector.
- def resume_all(self) -> None
- Resumes any paused envs.
- def step(self, data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> typing.List[typing.Any]
- Perform actions in the vectorized environments.
- def step_at(self, index_env: int, action: typing.Union[int, numpy.ndarray])
- Step in the index_env environment in the vector.
- def wait_step(self) -> typing.List[typing.Any]
- Wait until all the asynchronous environments have synchronized.
- def wait_step_at(self, index_env: int) -> typing.Any
Special methods
- def __del__(self)
- def __enter__(self)
- def __exit__(self, exc_type, exc_val, exc_tb)
- def __init__(self, make_env_fn: typing.Callable[[...], gym.core.Env], env_fn_args: typing.Sequence[typing.Tuple], auto_reset_done: bool = True, multiprocessing_start_method: str = 'forkserver', workers_ignore_signals: bool = False) -> None
Properties
- num_envs get
- number of individual environments.
Data
- observation_spaces: typing.List[gym.spaces.dict.Dict] = None
- number_of_episodes: typing.List[typing.Optional[int]] = None
- action_spaces: typing.List[gym.spaces.dict.Dict] = None
Method documentation
def habitat.core.vector_env.ThreadedVectorEnv.async_step(self,
data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> None
Asynchronously step in the environments.
Parameters | |
---|---|
data | list of size _num_envs containing keyword arguments to pass to the step() method of each environment. For example, [1, 3, 5, ...]. |
def habitat.core.vector_env.ThreadedVectorEnv.call(self,
function_names: typing.List[str],
function_args_list: typing.Optional[typing.List[typing.Any]] = None) -> typing.List[typing.Any]
Calls a list of functions (which are passed by name) on the corresponding env (by index).
Parameters | |
---|---|
function_names | the names of the functions to call on the envs. |
function_args_list | list of function args for each function. If provided, len(function_args_list) should equal len(function_names). |
Returns | result of calling the function. |
def habitat.core.vector_env.ThreadedVectorEnv.call_at(self,
index: int,
function_name: str,
function_args: typing.Optional[typing.Dict[str, typing.Any]] = None) -> typing.Any
Calls a function or retrieves a property/member variable (which is passed by name) on the selected env and returns the result.
Parameters | |
---|---|
index | which env to call the function on. |
function_name | the name of the function to call or property to retrieve on the env. |
function_args | optional function args. |
Returns | result of calling the function. |
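The name-based dispatch described for `call_at` and `call` can be sketched with `getattr`. This is a minimal illustration under assumptions, not the library's code: `ToyEnv` and the free functions below are hypothetical, and the envs are held in a plain list rather than behind worker connections.

```python
from typing import Any, Dict, List, Optional


class ToyEnv:
    """Hypothetical environment with one method and one plain attribute."""

    def __init__(self, name: str):
        self.name = name

    def greet(self, greeting: str = "hello") -> str:
        return f"{greeting} from {self.name}"


def call_at(
    envs: List[ToyEnv],
    index: int,
    function_name: str,
    function_args: Optional[Dict[str, Any]] = None,
) -> Any:
    """Look up a member by name; call it if callable, else return its value."""
    target = getattr(envs[index], function_name)
    if callable(target):
        return target(**(function_args or {}))
    return target


def call(
    envs: List[ToyEnv],
    function_names: List[str],
    function_args_list: Optional[List[Any]] = None,
) -> List[Any]:
    """Apply function_names[i] to envs[i], one name per env."""
    if function_args_list is None:
        function_args_list = [None] * len(function_names)
    if len(function_args_list) != len(function_names):
        raise ValueError("function_args_list must match function_names")
    return [
        call_at(envs, i, name, args)
        for i, (name, args) in enumerate(zip(function_names, function_args_list))
    ]


envs = [ToyEnv("env0"), ToyEnv("env1")]
single = call_at(envs, 1, "greet", {"greeting": "hi"})  # method call
attribute = call_at(envs, 0, "name")                     # attribute lookup
batch = call(envs, ["greet", "name"])                    # one name per env
```

Passing the arguments as a dict of keyword arguments mirrors the `typing.Dict[str, typing.Any]` annotation on `function_args`.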
def habitat.core.vector_env.ThreadedVectorEnv.initialize_batch_renderer(self,
config: DictConfig)
Provides VectorEnv with batch rendering capability. Refer to the EnvBatchRenderer class.
Parameters | |
---|---|
config | Base configuration. |
def habitat.core.vector_env.ThreadedVectorEnv.pause_at(self,
index: int) -> None
Pauses computation on this env without destroying the env.
Parameters | |
---|---|
index | which env to pause. All indexes after this one will be shifted down by one. |
This is useful for not needing to call steps on all environments when only some are active (for example during the last episodes of running eval episodes).
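The index shift caused by pausing can be made concrete with a small bookkeeping sketch. `ToyPool` is a hypothetical class that tracks env names in plain lists; it only illustrates how indexes move and makes no claim about how the real workers are stored. Resuming in reverse order of pausing is an assumption that restores the original ordering.

```python
class ToyPool:
    """Hypothetical bookkeeping for active and paused envs."""

    def __init__(self, names):
        self.active = list(names)
        self.paused = []  # (index at pause time, name), in pause order

    def pause_at(self, index):
        # Removing an entry shifts every later index down by one.
        self.paused.append((index, self.active.pop(index)))

    def resume_all(self):
        # Undo the pauses in reverse order so each env returns to the
        # index it held when it was paused.
        for index, name in reversed(self.paused):
            self.active.insert(index, name)
        self.paused = []


pool = ToyPool(["a", "b", "c", "d"])

pool.pause_at(1)                 # pauses "b"
after_first = list(pool.active)  # "c" now sits at index 1

pool.pause_at(1)                 # pauses "c", not "b"
after_second = list(pool.active)

pool.resume_all()
restored = list(pool.active)
```

A caller that keeps per-env state in parallel lists (for example, hidden states during evaluation) would need to drop the same index from those lists when pausing.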
def habitat.core.vector_env.ThreadedVectorEnv.post_step(self, observations) -> typing.List[typing.OrderedDict]
Performs batch transformations on step outputs.
Parameters | |
---|---|
observations | Observation dicts for each environment. |
Returns | Processed observation dicts for each environment. |
def habitat.core.vector_env.ThreadedVectorEnv.reset(self)
Reset all the vectorized environments.
Returns | list of outputs from the reset method of envs. |
---|---|
def habitat.core.vector_env.ThreadedVectorEnv.step(self,
data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> typing.List[typing.Any]
Perform actions in the vectorized environments.
Parameters | |
---|---|
data | list of size _num_envs containing keyword arguments to pass to the step() method of each environment. For example, [1, 3, 5, ...]. |
Returns | list of outputs from the step method of envs. |
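One plausible reading of how `step`, `async_step`, and `wait_step` relate is that `step` dispatches all actions and then blocks for the results. The sketch below assumes that relationship and uses `concurrent.futures.ThreadPoolExecutor` as the threading mechanism; `ToyEnv` and `ToyThreadedVector` are hypothetical names, and the actual class may be organized differently.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyEnv:
    """Hypothetical env whose step output is action + offset."""

    def __init__(self, offset):
        self.offset = offset

    def step(self, action):
        return action + self.offset


class ToyThreadedVector:
    """Hypothetical vector wrapper: one pool thread per env."""

    def __init__(self, envs):
        self._envs = envs
        self._pool = ThreadPoolExecutor(max_workers=len(envs))
        self._pending = []

    def async_step(self, data):
        # Dispatch one action per env and return without waiting.
        self._pending = [
            self._pool.submit(env.step, action)
            for env, action in zip(self._envs, data)
        ]

    def wait_step(self):
        # Block until every dispatched step has finished, keeping env order.
        results = [future.result() for future in self._pending]
        self._pending = []
        return results

    def step(self, data):
        # Assumed equivalence: dispatch, then wait.
        self.async_step(data)
        return self.wait_step()

    def close(self):
        self._pool.shutdown()


vec = ToyThreadedVector([ToyEnv(0), ToyEnv(10), ToyEnv(100)])
outputs = vec.step([1, 3, 5])
vec.close()
```

Splitting the call into `async_step` and `wait_step` lets the caller do other work, such as preparing the next batch, while the environments are stepping.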
def habitat.core.vector_env.ThreadedVectorEnv.step_at(self,
index_env: int,
action: typing.Union[int, numpy.ndarray])
Step in the index_env environment in the vector.
Parameters | |
---|---|
index_env | index of the environment to step. |
action | action to be taken. |
Returns | list containing the output of the step method of the indexed env. |
def habitat.core.vector_env.ThreadedVectorEnv.__init__(self,
make_env_fn: typing.Callable[[...], gym.core.Env],
env_fn_args: typing.Sequence[typing.Tuple],
auto_reset_done: bool = True,
multiprocessing_start_method: str = 'forkserver',
workers_ignore_signals: bool = False) -> None
Parameters | |
---|---|
make_env_fn | function which creates a single environment. An environment can be of type env.Env or env.RLEnv. |
env_fn_args | tuple of tuples of args to pass to make_gym_from_config. |
auto_reset_done | automatically reset the environment when done. This functionality is provided for seamless training of vectorized environments. |
multiprocessing_start_method | the multiprocessing method used to spawn worker processes. Valid methods are {'spawn', 'forkserver', 'fork'}; 'forkserver' is the recommended method as it works well with CUDA. If 'fork' is used, the subprocess must be started before any other GPU usage. |
workers_ignore_signals | whether or not workers will ignore SIGINT and SIGTERM and instead exit only when close() is called. |
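The effect of `auto_reset_done` can be sketched as follows. The example assumes a gym-style `step` that returns `(observation, reward, done, info)`; `CountdownEnv` and `worker_step` are hypothetical names for illustration and do not reproduce the worker's actual code.

```python
class CountdownEnv:
    """Hypothetical env whose episode ends after `length` steps."""

    def __init__(self, length):
        self.length = length
        self.remaining = length
        self.resets = 0

    def reset(self):
        self.remaining = self.length
        self.resets += 1
        return {"remaining": self.remaining}

    def step(self, action):
        self.remaining -= 1
        done = self.remaining == 0
        return {"remaining": self.remaining}, 0.0, done, {}


def worker_step(env, action, auto_reset_done):
    """Step once; if the episode ended and auto-reset is on, reset at once."""
    observation, reward, done, info = env.step(action)
    if auto_reset_done and done:
        # The caller receives the first observation of the next episode,
        # together with done=True for the episode that just finished.
        observation = env.reset()
    return observation, reward, done, info


env = CountdownEnv(length=2)
env.reset()
first = worker_step(env, 0, auto_reset_done=True)
second = worker_step(env, 0, auto_reset_done=True)
```

With this pattern a training loop never has to call `reset` explicitly between episodes, which is what makes vectorized training seamless.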