habitat.core.vector_env.ThreadedVectorEnv class

Provides the same functionality as VectorEnv; the only difference is that it runs in a multi-threaded setup inside a single process.

VectorEnv itself runs in a multi-process setup. Running the environments in threads instead makes debugging much easier than with VectorEnv, because you can set breakpoints directly in the environment methods. ThreadedVectorEnv should not be used when best performance is required.
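
The thread-versus-process distinction can be illustrated with a minimal sketch that uses only the standard library. It is not habitat's implementation: ToyEnv, thread_worker, and the "step"/"close" command strings are hypothetical names chosen for illustration. The point is that a worker thread shares the caller's process, so a breakpoint placed inside ToyEnv.step is reachable from an ordinary debugger session.

```python
import os
import queue
import threading


class ToyEnv:
    """Hypothetical stand-in for an environment; not a habitat class."""

    def __init__(self):
        self.steps = 0

    def step(self, action):
        # A breakpoint placed here is hit by a normal debugger, because
        # this code runs in a thread of the calling process.
        self.steps += 1
        return {"action": action, "pid": os.getpid()}


def thread_worker(commands, results):
    """Serve commands from a queue until "close" arrives."""
    env = ToyEnv()
    while True:
        command, data = commands.get()
        if command == "close":
            break
        if command == "step":
            results.put(env.step(data))


commands, results = queue.Queue(), queue.Queue()
worker = threading.Thread(target=thread_worker, args=(commands, results))
worker.start()

commands.put(("step", 1))
reply = results.get()

commands.put(("close", None))
worker.join()

# The worker ran in the same process as the caller.
same_process = reply["pid"] == os.getpid()
print(same_process)
```

With a process-based worker the reported pid would differ from the caller's, which is why a debugger attached to the main process would not stop inside the environment.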

Methods

def _worker_env(connection_read_fn: typing.Callable, connection_write_fn: typing.Callable, env_fn: typing.Callable, env_fn_args: typing.Tuple[typing.Any], auto_reset_done: bool, mask_signals: bool = False, child_pipe: typing.Optional[multiprocessing.connection.Connection] = None, parent_pipe: typing.Optional[multiprocessing.connection.Connection] = None) -> None
Process worker for creating and interacting with the environment.
def async_step(self, data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> None
Asynchronously step in the environments.
def async_step_at(self, index_env: int, action: typing.Union[int, numpy.ndarray]) -> None
def call(self, function_names: typing.List[str], function_args_list: typing.Optional[typing.List[typing.Any]] = None) -> typing.List[typing.Any]
Calls a list of functions (which are passed by name) on the corresponding env (by index).
def call_at(self, index: int, function_name: str, function_args: typing.Optional[typing.Dict[str, typing.Any]] = None) -> typing.Any
Calls a function or retrieves a property/member variable (which is passed by name) on the selected env and returns the result.
def close(self) -> None
def count_episodes(self)
def current_episodes(self)
def episode_over(self)
def get_metrics(self)
def initialize_batch_renderer(self, config: DictConfig)
Provides VectorEnv with batch rendering capability. Refer to the EnvBatchRenderer class.
def pause_at(self, index: int) -> None
Pauses computation on this env without destroying the env.
def post_step(self, observations) -> typing.List[typing.OrderedDict]
Performs batch transformations on step outputs.
def render(self, mode: str = 'human', *args, **kwargs) -> typing.Optional[numpy.ndarray]
Render observations from all environments in a tiled image.
def reset(self)
Reset all the vectorized environments.
def reset_at(self, index_env: int)
Reset the index_env environment in the vector.
def resume_all(self) -> None
Resumes any paused envs.
def step(self, data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> typing.List[typing.Any]
Perform actions in the vectorized environments.
def step_at(self, index_env: int, action: typing.Union[int, numpy.ndarray])
Step in the index_env environment in the vector.
def wait_step(self) -> typing.List[typing.Any]
Wait until all the asynchronous environments have synchronized.
def wait_step_at(self, index_env: int) -> typing.Any

Special methods

def __del__(self)
def __enter__(self)
def __exit__(self, exc_type, exc_val, exc_tb)
def __init__(self, make_env_fn: typing.Callable[[...], gym.core.Env], env_fn_args: typing.Sequence[typing.Tuple], auto_reset_done: bool = True, multiprocessing_start_method: str = 'forkserver', workers_ignore_signals: bool = False) -> None

Properties

num_envs get
Number of individual environments.

Data

observation_spaces: typing.List[gym.spaces.dict.Dict] = None
number_of_episodes: typing.List[typing.Optional[int]] = None
action_spaces: typing.List[gym.spaces.dict.Dict] = None

Method documentation

def habitat.core.vector_env.ThreadedVectorEnv.async_step(self, data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> None

Asynchronously step in the environments.

Parameters
data list of size _num_envs containing the action (an int or numpy.ndarray) to pass to the step() method of each environment. For example, [1, 3, 5, ...].
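
The split between issuing actions and collecting results can be sketched as follows. This is an illustration of the general pattern under stated assumptions, not the library's code: ToyEnv and ToyAsyncVector are hypothetical, and a ThreadPoolExecutor stands in for whatever worker mechanism the real class uses. Only the method names async_step, wait_step, and step mirror the documented API.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyEnv:
    """Hypothetical environment whose output is action + offset."""

    def __init__(self, offset):
        self.offset = offset

    def step(self, action):
        return action + self.offset


class ToyAsyncVector:
    """Illustrative only; method names mirror the documented ones."""

    def __init__(self, envs):
        self.envs = envs
        self.pool = ThreadPoolExecutor(max_workers=len(envs))
        self.pending = []

    def async_step(self, data):
        # One action per environment, matched by position.
        assert len(data) == len(self.envs)
        self.pending = [
            self.pool.submit(env.step, action)
            for env, action in zip(self.envs, data)
        ]

    def wait_step(self):
        # Block until every environment has produced its result.
        results = [future.result() for future in self.pending]
        self.pending = []
        return results

    def step(self, data):
        # In this sketch, a synchronous step chains the two calls.
        self.async_step(data)
        return self.wait_step()

    def close(self):
        self.pool.shutdown()


vec = ToyAsyncVector([ToyEnv(0), ToyEnv(10), ToyEnv(20)])
vec.async_step([1, 3, 5])
outputs = vec.wait_step()
sync_outputs = vec.step([2, 2, 2])
vec.close()
print(outputs)  # [1, 13, 25]
```

Separating the two calls lets the caller do other work, such as running a policy forward pass, while the environments are stepping.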

def habitat.core.vector_env.ThreadedVectorEnv.call(self, function_names: typing.List[str], function_args_list: typing.Optional[typing.List[typing.Any]] = None) -> typing.List[typing.Any]

Calls a list of functions (which are passed by name) on the corresponding env (by index).

Parameters
function_names the name of the functions to call on the envs.
function_args_list list of function args for each function. If provided, function_args_list should be the same length as function_names.
Returns result of calling the function.

def habitat.core.vector_env.ThreadedVectorEnv.call_at(self, index: int, function_name: str, function_args: typing.Optional[typing.Dict[str, typing.Any]] = None) -> typing.Any

Calls a function or retrieves a property/member variable (which is passed by name) on the selected env and returns the result.

Parameters
index which env to call the function on.
function_name the name of the function to call or property to retrieve on the env.
function_args optional function args.
Returns result of calling the function.
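
Dispatch by name, for both call_at and the list form call, can be sketched with getattr. The free functions and ToyEnv below are hypothetical illustrations that assume function_args is a dict of keyword arguments, as the type hint suggests; they are not the methods of the real class.

```python
class ToyEnv:
    """Hypothetical environment; not a habitat class."""

    def __init__(self, name):
        self.name = name

    def describe(self, prefix="env"):
        return f"{prefix}:{self.name}"


def call_at(envs, index, function_name, function_args=None):
    """Call a method, or read an attribute, selected by name."""
    target = getattr(envs[index], function_name)
    if callable(target):
        return target(**(function_args or {}))
    # Not callable: treat it as a property or member variable.
    return target


def call(envs, function_names, function_args_list=None):
    """Apply function_names[i] to envs[i] and collect the results."""
    if function_args_list is None:
        function_args_list = [None] * len(function_names)
    assert len(function_names) == len(function_args_list)
    pairs = zip(function_names, function_args_list)
    return [
        call_at(envs, index, name, args)
        for index, (name, args) in enumerate(pairs)
    ]


envs = [ToyEnv("a"), ToyEnv("b")]
print(call_at(envs, 1, "describe", {"prefix": "x"}))  # x:b
print(call_at(envs, 0, "name"))  # a
print(call(envs, ["describe", "name"]))  # ['env:a', 'b']
```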

def habitat.core.vector_env.ThreadedVectorEnv.initialize_batch_renderer(self, config: DictConfig)

Provides VectorEnv with batch rendering capability. Refer to the EnvBatchRenderer class.

Parameters
config Base configuration.

def habitat.core.vector_env.ThreadedVectorEnv.pause_at(self, index: int) -> None

Pauses computation on this env without destroying the env.

Parameters
index which env to pause. All indexes after this one will be shifted down by one.

This is useful for not needing to call steps on all environments when only some are active (for example during the last episodes of running eval episodes).
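
The index shift described above can be shown with plain lists. ToyPausableVector is a hypothetical sketch of the bookkeeping only; it holds names rather than real environments and makes no claim about how the library stores paused workers.

```python
class ToyPausableVector:
    """Illustrative index bookkeeping; not habitat's implementation."""

    def __init__(self, names):
        self.active = list(names)
        self.paused = []

    def pause_at(self, index):
        # Remember the position at pause time so it can be restored later.
        self.paused.append((index, self.active.pop(index)))

    def resume_all(self):
        # Re-insert in reverse pause order so recorded indexes stay valid.
        for index, name in reversed(self.paused):
            self.active.insert(index, name)
        self.paused = []


vec = ToyPausableVector(["env0", "env1", "env2", "env3"])

vec.pause_at(1)
after_pause = list(vec.active)
print(after_pause)  # ['env0', 'env2', 'env3']

vec.pause_at(1)  # index 1 now refers to what was originally env2
after_second_pause = list(vec.active)

vec.resume_all()
after_resume = list(vec.active)
print(after_resume)  # ['env0', 'env1', 'env2', 'env3']
```

Because later indexes shift down after each pause, callers that track per-environment state (for example recurrent hidden states) need to drop the matching entry from their own batches as well.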

def habitat.core.vector_env.ThreadedVectorEnv.post_step(self, observations) -> typing.List[typing.OrderedDict]

Performs batch transformations on step outputs.

Parameters
observations Observation dicts for each environment.
Returns Processed observation dicts for each environment.

def habitat.core.vector_env.ThreadedVectorEnv.reset(self)

Reset all the vectorized environments.

Returns list of outputs from the reset method of envs.

def habitat.core.vector_env.ThreadedVectorEnv.reset_at(self, index_env: int)

Reset the index_env environment in the vector.

Parameters
index_env index of the environment to be reset.
Returns list containing the output of the reset method of the indexed env.

def habitat.core.vector_env.ThreadedVectorEnv.step(self, data: typing.Sequence[typing.Union[int, numpy.ndarray]]) -> typing.List[typing.Any]

Perform actions in the vectorized environments.

Parameters
data list of size _num_envs containing the action (an int or numpy.ndarray) to pass to the step() method of each environment. For example, [1, 3, 5, ...].
Returns list of outputs from the step method of envs.

def habitat.core.vector_env.ThreadedVectorEnv.step_at(self, index_env: int, action: typing.Union[int, numpy.ndarray])

Step in the index_env environment in the vector.

Parameters
index_env index of the environment to be stepped.
action action to be taken.
Returns list containing the output of the step method of the indexed env.

def habitat.core.vector_env.ThreadedVectorEnv.__init__(self, make_env_fn: typing.Callable[[...], gym.core.Env], env_fn_args: typing.Sequence[typing.Tuple], auto_reset_done: bool = True, multiprocessing_start_method: str = 'forkserver', workers_ignore_signals: bool = False) -> None

Parameters
make_env_fn function which creates a single environment. An environment can be of type env.Env or env.RLEnv.
env_fn_args tuple of tuples of args to pass to make_gym_from_config.
auto_reset_done automatically reset the environment when done. This functionality is provided for seamless training of vectorized environments.
multiprocessing_start_method the multiprocessing method used to spawn worker processes. Valid methods are {'spawn', 'forkserver', 'fork'}; 'forkserver' is the recommended method as it works well with CUDA. If 'fork' is used, the subprocess must be started before any other GPU usage.
workers_ignore_signals whether or not workers will ignore SIGINT and SIGTERM and instead only exit when close() is called.
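
The effect of auto_reset_done can be sketched on a single toy environment. The sketch assumes a gym-style step that returns (observation, reward, done, info); ToyEpisodeEnv and step_with_auto_reset are hypothetical names, and the real worker may differ in detail.

```python
class ToyEpisodeEnv:
    """Hypothetical environment whose episodes last two steps."""

    def __init__(self):
        self.t = 0
        self.episodes = 0

    def reset(self):
        self.t = 0
        self.episodes += 1
        return {"t": self.t, "episode": self.episodes}

    def step(self, action):
        self.t += 1
        done = self.t >= 2
        obs = {"t": self.t, "episode": self.episodes}
        return obs, 0.0, done, {}


def step_with_auto_reset(env, action, auto_reset_done=True):
    obs, reward, done, info = env.step(action)
    if auto_reset_done and done:
        # Replace the terminal observation with the first observation
        # of the next episode, so the caller never has to call reset().
        obs = env.reset()
    return obs, reward, done, info


env = ToyEpisodeEnv()
env.reset()
first = step_with_auto_reset(env, 0)
second = step_with_auto_reset(env, 0)
print(first[0], first[2])
print(second[0], second[2])
```

In this sketch the done flag still reports the end of the episode, while the returned observation already belongs to the next one, which is what lets a training loop keep stepping without explicit resets.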