Merged
7 changes: 5 additions & 2 deletions ci/jenkins_tests/run_rllib_tests.sh
@@ -302,7 +302,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_checkpoint_restore.py

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_policy_evaluator.py
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_rollout_worker.py

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_nested_spaces.py
@@ -389,14 +389,17 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_loss.py --iters=2

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/rollout_worker_custom_workflow.py

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_tf_policy.py --iters=2

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_torch_policy.py --iters=2

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/policy_evaluator_custom_workflow.py
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/rollout_worker_custom_workflow.py

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_metrics_and_callbacks.py --num-iters=2
2 changes: 1 addition & 1 deletion doc/source/rllib-concepts.rst
@@ -453,7 +453,7 @@ Policy Evaluation

Given an environment and policy, policy evaluation produces `batches <https://github.com/ray-project/ray/blob/master/python/ray/rllib/policy/sample_batch.py>`__ of experiences. This is your classic "environment interaction loop". Efficient policy evaluation can be burdensome to get right, especially when leveraging vectorization, RNNs, or when operating in a multi-agent environment. RLlib provides a `RolloutWorker <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/rollout_worker.py>`__ class that manages all of this, and this class is used in most RLlib algorithms.

You can use rollout workers standalone to produce batches of experiences. This can be done by calling ``worker.sample()`` on a worker instance, or ``worker.sample.remote()`` in parallel on worker instances created as Ray actors (see ``RolloutWorkers.create_remote``).
You can use rollout workers standalone to produce batches of experiences. This can be done by calling ``worker.sample()`` on a worker instance, or ``worker.sample.remote()`` in parallel on worker instances created as Ray actors (see `WorkerSet <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/worker_set.py>`__).

Here is an example of creating a set of rollout workers and using them to gather experiences in parallel. The trajectories are concatenated, the policy learns on the trajectory batch, and then we broadcast the policy weights to the workers for the next round of rollouts:
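The loop this example implements — sample on each worker, concatenate, learn, broadcast weights — can be sketched in plain Python. Note this is a toy illustration of the pattern only: `ToyWorker` and `ToyPolicy` are stand-ins, not the real `RolloutWorker`/`WorkerSet` API, and the "learning" step is a placeholder.

```python
# Toy sketch of the sample -> learn -> broadcast loop described above.
# ToyPolicy/ToyWorker are simplified stand-ins for RLlib's classes.

class ToyPolicy:
    def __init__(self):
        self.weights = 0.0

    def learn_on_batch(self, batch):
        # Placeholder "learning": nudge weights by the batch mean.
        self.weights += sum(batch) / len(batch)

class ToyWorker:
    def __init__(self):
        self.weights = 0.0

    def sample(self):
        # Each call returns a small batch of fake experience values.
        return [self.weights + i for i in range(3)]

    def set_weights(self, w):
        self.weights = w

policy = ToyPolicy()
workers = [ToyWorker() for _ in range(2)]

for _ in range(2):
    # 1. Gather experiences from all workers and concatenate them.
    batch = [x for w in workers for x in w.sample()]
    # 2. Learn on the concatenated trajectory batch.
    policy.learn_on_batch(batch)
    # 3. Broadcast the updated weights back to the workers.
    for w in workers:
        w.set_weights(policy.weights)
```

In RLlib the same three steps would use `worker.sample.remote()` and `ray.get` to run the sampling in parallel across Ray actors.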

2 changes: 1 addition & 1 deletion doc/source/rllib-config.svg
2 changes: 1 addition & 1 deletion doc/source/rllib-examples.rst
@@ -22,7 +22,7 @@ Training Workflows
Example of how to adjust the configuration of an environment over time.
- `Custom metrics <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_metrics_and_callbacks.py>`__:
Example of how to output custom training metrics to TensorBoard.
- `Using policy evaluators directly for control over the whole training workflow <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/policy_evaluator_custom_workflow.py>`__:
- `Using rollout workers directly for control over the whole training workflow <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/rollout_worker_custom_workflow.py>`__:
Example of how to use RLlib's lower-level building blocks to implement a fully customized training workflow.

Custom Envs and Models
18 changes: 9 additions & 9 deletions doc/source/rllib-training.rst
@@ -178,27 +178,27 @@ Custom Training Workflows

In the `basic training example <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_env.py>`__, Tune will call ``train()`` on your trainer once per iteration and report the new training results. Sometimes, it is desirable to have full control over training, but still run inside Tune. Tune supports `custom trainable functions <tune-usage.html#training-api>`__ that can be used to implement `custom training workflows (example) <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_train_fn.py>`__.

For even finer-grained control over training, you can use RLlib's lower-level `building blocks <rllib-concepts.html>`__ directly to implement `fully customized training workflows <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/policy_evaluator_custom_workflow.py>`__.
For even finer-grained control over training, you can use RLlib's lower-level `building blocks <rllib-concepts.html>`__ directly to implement `fully customized training workflows <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/rollout_worker_custom_workflow.py>`__.

Accessing Policy State
~~~~~~~~~~~~~~~~~~~~~~
It is common to need to access a trainer's internal state, e.g., to set or get internal weights. In RLlib trainer state is replicated across multiple *policy evaluators* (Ray actors) in the cluster. However, you can easily get and update this state between calls to ``train()`` via ``trainer.optimizer.foreach_evaluator()`` or ``trainer.optimizer.foreach_evaluator_with_index()``. These functions take a lambda function that is applied with the evaluator as an arg. You can also return values from these functions and those will be returned as a list.
It is common to need to access a trainer's internal state, e.g., to set or get internal weights. In RLlib trainer state is replicated across multiple *rollout workers* (Ray actors) in the cluster. However, you can easily get and update this state between calls to ``train()`` via ``trainer.workers.foreach_worker()`` or ``trainer.workers.foreach_worker_with_index()``. These functions take a lambda function that is applied with the worker as an arg. You can also return values from these functions and those will be returned as a list.

You can also access just the "master" copy of the trainer state through ``trainer.get_policy()`` or ``trainer.local_evaluator``, but note that updates here may not be immediately reflected in remote replicas if you have configured ``num_workers > 0``. For example, to access the weights of a local TF policy, you can run ``trainer.get_policy().get_weights()``. This is also equivalent to ``trainer.local_evaluator.policy_map["default_policy"].get_weights()``:
You can also access just the "master" copy of the trainer state through ``trainer.get_policy()`` or ``trainer.workers.local_worker()``, but note that updates here may not be immediately reflected in remote replicas if you have configured ``num_workers > 0``. For example, to access the weights of a local TF policy, you can run ``trainer.get_policy().get_weights()``. This is also equivalent to ``trainer.workers.local_worker().policy_map["default_policy"].get_weights()``:

.. code-block:: python

# Get weights of the default local policy
trainer.get_policy().get_weights()

# Same as above
trainer.local_evaluator.policy_map["default_policy"].get_weights()
trainer.workers.local_worker().policy_map["default_policy"].get_weights()

# Get list of weights of each evaluator, including remote replicas
trainer.optimizer.foreach_evaluator(lambda ev: ev.get_policy().get_weights())
# Get list of weights of each worker, including remote replicas
trainer.workers.foreach_worker(lambda ev: ev.get_policy().get_weights())

# Same as above
trainer.optimizer.foreach_evaluator_with_index(lambda ev, i: ev.get_policy().get_weights())
trainer.workers.foreach_worker_with_index(lambda ev, i: ev.get_policy().get_weights())

Global Coordination
~~~~~~~~~~~~~~~~~~~
@@ -299,7 +299,7 @@ Approach 1: Use the Trainer API and update the environment between calls to ``tr
phase = 1
else:
phase = 0
trainer.optimizer.foreach_evaluator(
trainer.workers.foreach_worker(
lambda ev: ev.foreach_env(
lambda env: env.set_phase(phase)))

@@ -333,7 +333,7 @@ Approach 2: Use the callbacks API to update the environment on new training resu
else:
phase = 0
trainer = info["trainer"]
trainer.optimizer.foreach_evaluator(
trainer.workers.foreach_worker(
lambda ev: ev.foreach_env(
lambda env: env.set_phase(phase)))

3 changes: 2 additions & 1 deletion python/ray/rllib/__init__.py
@@ -11,7 +11,7 @@

from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.env.base_env import BaseEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.env.vector_env import VectorEnv
@@ -55,6 +55,7 @@ def _register_all():
"PolicyGraph",
"TFPolicy",
"TFPolicyGraph",
"RolloutWorker",
"PolicyEvaluator",
"SampleBatch",
"BaseEnv",
26 changes: 10 additions & 16 deletions python/ray/rllib/agents/a3c/a2c.py
@@ -2,9 +2,10 @@
from __future__ import division
from __future__ import print_function

from ray.rllib.agents.a3c.a3c import A3CTrainer, DEFAULT_CONFIG as A3C_CONFIG
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.utils.annotations import override
from ray.rllib.agents.a3c.a3c import DEFAULT_CONFIG as A3C_CONFIG, \
validate_config, get_policy_class
from ray.rllib.agents.a3c.a3c_tf_policy import A3CTFPolicy
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.utils import merge_dicts

A2C_DEFAULT_CONFIG = merge_dicts(
@@ -16,16 +17,9 @@
},
)


class A2CTrainer(A3CTrainer):
"""Synchronous variant of the A3CTrainer."""

_name = "A2C"
_default_config = A2C_DEFAULT_CONFIG

@override(A3CTrainer)
def _make_optimizer(self):
return SyncSamplesOptimizer(
self.local_evaluator,
self.remote_evaluators,
train_batch_size=self.config["train_batch_size"])
A2CTrainer = build_trainer(
name="A2C",
default_config=A2C_DEFAULT_CONFIG,
default_policy=A3CTFPolicy,
get_policy_class=get_policy_class,
validate_config=validate_config)
59 changes: 21 additions & 38 deletions python/ray/rllib/agents/a3c/a3c.py
@@ -2,12 +2,10 @@
from __future__ import division
from __future__ import print_function

import time

from ray.rllib.agents.a3c.a3c_tf_policy import A3CTFPolicy
from ray.rllib.agents.trainer import Trainer, with_common_config
from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.optimizers import AsyncGradientsOptimizer
from ray.rllib.utils.annotations import override

# yapf: disable
# __sphinx_doc_begin__
@@ -38,43 +36,28 @@
# yapf: enable


class A3CTrainer(Trainer):
"""A3C implementations in TensorFlow and PyTorch."""
def get_policy_class(config):
if config["use_pytorch"]:
from ray.rllib.agents.a3c.a3c_torch_policy import \
A3CTorchPolicy
return A3CTorchPolicy
else:
return A3CTFPolicy

_name = "A3C"
_default_config = DEFAULT_CONFIG
_policy = A3CTFPolicy

@override(Trainer)
def _init(self, config, env_creator):
if config["use_pytorch"]:
from ray.rllib.agents.a3c.a3c_torch_policy import \
A3CTorchPolicy
policy_cls = A3CTorchPolicy
else:
policy_cls = self._policy
def validate_config(config):
if config["entropy_coeff"] < 0:
raise DeprecationWarning("entropy_coeff must be >= 0")

if config["entropy_coeff"] < 0:
raise DeprecationWarning("entropy_coeff must be >= 0")

self.local_evaluator = self.make_local_evaluator(
env_creator, policy_cls)
self.remote_evaluators = self.make_remote_evaluators(
env_creator, policy_cls, config["num_workers"])
self.optimizer = self._make_optimizer()
def make_async_optimizer(workers, config):
return AsyncGradientsOptimizer(workers, **config["optimizer"])

@override(Trainer)
def _train(self):
prev_steps = self.optimizer.num_steps_sampled
start = time.time()
while time.time() - start < self.config["min_iter_time_s"]:
self.optimizer.step()
result = self.collect_metrics()
result.update(timesteps_this_iter=self.optimizer.num_steps_sampled -
prev_steps)
return result

def _make_optimizer(self):
return AsyncGradientsOptimizer(self.local_evaluator,
self.remote_evaluators,
**self.config["optimizer"])
A3CTrainer = build_trainer(
name="A3C",
default_config=DEFAULT_CONFIG,
default_policy=A3CTFPolicy,
get_policy_class=get_policy_class,
validate_config=validate_config,
make_policy_optimizer=make_async_optimizer)
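The diff above replaces the class-based ``A3CTrainer`` with the declarative ``build_trainer`` factory, which assembles a trainer class from a name, a default config, and hook functions. A rough pure-Python sketch of what such a factory does — a hypothetical simplified version for illustration, not RLlib's actual implementation — looks like this:

```python
# Hypothetical, simplified sketch of a build_trainer-style factory.
# It bundles a name, default config, and hook functions into a class,
# mirroring the declarative pattern used in the diff above.

def build_trainer(name, default_config, default_policy,
                  get_policy_class=None, validate_config=None,
                  make_policy_optimizer=None):
    class Trainer:
        _name = name
        _default_config = default_config

        def __init__(self, config=None):
            # Merge user overrides onto the defaults, then validate.
            self.config = {**default_config, **(config or {})}
            if validate_config:
                validate_config(self.config)
            # Pick the policy class via the hook, if one is given.
            if get_policy_class:
                self.policy_cls = get_policy_class(self.config)
            else:
                self.policy_cls = default_policy

    Trainer.__name__ = name + "Trainer"
    return Trainer

# Usage mirroring the A3C diff, with stand-in policy classes:
class TFPolicy:
    pass

class TorchPolicy:
    pass

def get_policy_class(config):
    return TorchPolicy if config.get("use_pytorch") else TFPolicy

def validate_config(config):
    if config.get("entropy_coeff", 0) < 0:
        raise ValueError("entropy_coeff must be >= 0")

MyTrainer = build_trainer(
    name="A3C",
    default_config={"entropy_coeff": 0.01},
    default_policy=TFPolicy,
    get_policy_class=get_policy_class,
    validate_config=validate_config)

t = MyTrainer({"use_pytorch": True})  # t.policy_cls is TorchPolicy
```

The design win is that per-algorithm files shrink to a few plain functions (``validate_config``, ``get_policy_class``, an optimizer factory) while the shared training-loop machinery lives in one place.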
2 changes: 1 addition & 1 deletion python/ray/rllib/agents/ddpg/apex.py
@@ -48,7 +48,7 @@ def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.optimizer.num_steps_trained
self.num_target_updates += 1
4 changes: 2 additions & 2 deletions python/ray/rllib/agents/ddpg/ddpg.py
@@ -171,9 +171,9 @@ def _train(self):
if pure_expl_steps:
# tell workers whether they should do pure exploration
only_explore = self.global_timestep < pure_expl_steps
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.set_pure_exploration_phase(only_explore))
for e in self.remote_evaluators:
for e in self.workers.remote_workers():
e.foreach_trainable_policy.remote(
lambda p, _: p.set_pure_exploration_phase(only_explore))
return super(DDPGTrainer, self)._train()
2 changes: 1 addition & 1 deletion python/ray/rllib/agents/ddpg/ddpg_policy.py
@@ -515,7 +515,7 @@ def make_uniform_random_actions():

stochastic_actions = tf.cond(
# need to condition on noise_scale > 0 because zeroing
# noise_scale is how evaluator signals no noise should be used
# noise_scale is how a worker signals no noise should be used
# (this is ugly and should be fixed by adding an "eval_mode"
# config flag or something)
tf.logical_and(enable_pure_exploration, noise_scale > 0),
Expand Down
2 changes: 1 addition & 1 deletion python/ray/rllib/agents/dqn/apex.py
@@ -51,7 +51,7 @@ def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.optimizer.num_steps_trained
self.num_target_updates += 1
50 changes: 26 additions & 24 deletions python/ray/rllib/agents/dqn/dqn.py
@@ -196,26 +196,26 @@ def on_episode_end(info):
config["callbacks"]["on_episode_end"] = tune.function(
on_episode_end)

self.local_evaluator = self.make_local_evaluator(
env_creator, self._policy)

def create_remote_evaluators():
return self.make_remote_evaluators(env_creator, self._policy,
config["num_workers"])

if config["optimizer_class"] != "AsyncReplayOptimizer":
self.remote_evaluators = create_remote_evaluators()
self.workers = self._make_workers(
env_creator,
self._policy,
config,
num_workers=self.config["num_workers"])
workers_needed = 0
else:
# Hack to workaround https://github.com/ray-project/ray/issues/2541
self.remote_evaluators = None
self.workers = self._make_workers(
env_creator, self._policy, config, num_workers=0)
workers_needed = self.config["num_workers"]

self.optimizer = getattr(optimizers, config["optimizer_class"])(
self.local_evaluator, self.remote_evaluators,
**config["optimizer"])
# Create the remote evaluators *after* the replay actors
if self.remote_evaluators is None:
self.remote_evaluators = create_remote_evaluators()
self.optimizer._set_evaluators(self.remote_evaluators)
self.workers, **config["optimizer"])

# Create the remote workers *after* the replay actors
if workers_needed > 0:
self.workers.add_workers(workers_needed)
self.optimizer._set_workers(self.workers.remote_workers())

self.last_target_update_ts = 0
self.num_target_updates = 0
@@ -226,9 +226,9 @@ def _train(self):

# Update worker explorations
exp_vals = [self.exploration0.value(self.global_timestep)]
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.set_epsilon(exp_vals[0]))
for i, e in enumerate(self.remote_evaluators):
for i, e in enumerate(self.workers.remote_workers()):
exp_val = self.explorations[i].value(self.global_timestep)
e.foreach_trainable_policy.remote(
lambda p, _: p.set_epsilon(exp_val))
@@ -245,8 +245,8 @@
if self.config["per_worker_exploration"]:
# Only collect metrics from the third of workers with lowest eps
result = self.collect_metrics(
selected_evaluators=self.remote_evaluators[
-len(self.remote_evaluators) // 3:])
selected_workers=self.workers.remote_workers()[
-len(self.workers.remote_workers()) // 3:])
else:
result = self.collect_metrics()

@@ -263,7 +263,7 @@ def _train(self):
def update_target_if_needed(self):
if self.global_timestep - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.global_timestep
self.num_target_updates += 1
@@ -275,11 +275,13 @@ def global_timestep(self):
def _evaluate(self):
logger.info("Evaluating current policy for {} episodes".format(
self.config["evaluation_num_episodes"]))
self.evaluation_ev.restore(self.local_evaluator.save())
self.evaluation_ev.foreach_policy(lambda p, _: p.set_epsilon(0))
self.evaluation_workers.local_worker().restore(
self.workers.local_worker().save())
self.evaluation_workers.local_worker().foreach_policy(
lambda p, _: p.set_epsilon(0))
for _ in range(self.config["evaluation_num_episodes"]):
self.evaluation_ev.sample()
metrics = collect_metrics(self.evaluation_ev)
self.evaluation_workers.local_worker().sample()
metrics = collect_metrics(self.evaluation_workers.local_worker())
return {"evaluation": metrics}

def _make_exploration_schedule(self, worker_index):