Source code for train.agents.base

"""
All agents should extend the base :class:`~train.Agent` class and implement the :func:`~train.Agent.act` method:

.. code:: python

    from train import Agent

    class MyAgent(Agent):

        def act(self, state):
            ...

When :func:`~train.Agent.train` or :func:`~train.Agent.test` methods are called, an action is selected by calling the :func:`~train.Agent.act` method and passed to the environment. Then the environment returns a reward and observation. This entire transition (S, A, R, S') is saved in a :class:`~train.Transitions` object which can be accessed using ``self.transitions``. When an episode terminates, a new episode is started by resetting the environment and agent.

During training, the following callback methods on agent are called at respective stages:

.. code:: python

    on_step_begin
    on_step_end
    on_episode_begin
    on_episode_end

These methods combined with the :class:`~train.Transitions` object in ``self.transitions`` can be used to implement various algorithms. ``on_step_end()`` can be used to implement online algorithms such as TD(0) and ``on_episode_end()`` can be used to implement algorithms such as Monte Carlo methods:

.. code:: python

    class MyAgent(Agent):

        def on_step_end(self):
            # DQN
            S, A, R, Snext, dones = self.transitions.sample(32) # randomly sample transitions
            ...

        def on_episode_end(self):
            # REINFORCE
            S, A, R, Snext, dones = self.transitions.get() # get all recent transitions
            self.transitions.reset() # reset transitions for next episode
            ...

.. note::

   Transitions are not recorded when running :func:`~train.Agent.test`.
"""

from itertools import count

import numpy as np

from .. import utils as U
from ..policy import epsilon_greedy, greedy
from ..state import State, Transition, Transitions


class Utils():

    def compute_returns(self, R, gamma=None):
        if gamma is None:
            gamma = self.gamma
        G, T = np.array(R), len(R)
        for t in reversed(range(T - 1)):
            G[t] += gamma * G[t + 1]
        U.check_shape(G, R)
        return G

    def compute_td_zero(self, data, V, R=None, gamma=None):
        if gamma is None:
            gamma = self.gamma
        S, A, rewards, Snext, dones = data
        if R is None:
            R = rewards
        batch_size = len(S)
        batch_shape = (batch_size, )
        targets = R + gamma * V(Snext).flatten() * (1 - dones)
        U.check_shape(targets, batch_shape)
        deltas = targets - V(S).flatten()
        U.check_shape(deltas, batch_shape)
        return targets, deltas

    def compute_gae(self, deltas, dones, gamma=None, lambd=None):
        if gamma is None:
            gamma = self.gamma
        if lambd is None:
            lambd = self.lambd
        T = len(dones)
        batch_shape = (T, )
        advantages = np.array(deltas)
        for t in reversed(range(T - 1)):
            advantages[t] += gamma * lambd * advantages[t + 1] * (1 - dones[t])
        U.check_shape(advantages, batch_shape)
        return advantages

    def epsilon_greedy(self, values, epsilon=None):
        if self.training:
            if epsilon is None:
                epsilon = 1 / self.episode
            return epsilon_greedy(values, epsilon=epsilon)
        else:
            return greedy(values)


class BaseAgent(Utils):

    def __init__(self,
                 env=None,
                 gamma=.99,
                 alpha=.1,
                 lambd=.95,
                 parameters=None):
        self.env = env
        self.gamma = gamma
        self.alpha = alpha
        self.lambd = lambd
        self._parameters = parameters
        self.training = False
        self.episode = 0
        self.episode_step = 0
        self.global_step = 0
        self.init()

    def init(self):
        pass

    def train(self, *args, **kwargs):
        """Run the agent in training mode by setting ``self.training = True``.

        See: :func:`~train.Agent.run`
        """
        self.training = True
        return self.run(*args, **kwargs)

    def test(self, *args, **kwargs):
        """Run the agent in test mode by setting ``self.training = False``.

        See: :func:`~train.Agent.run`
        """
        self.training = False
        return self.run(*args, **kwargs)

    def run(self,
            episodes,
            env=None,
            max_steps=-1,
            max_episode_steps=-1,
            render=False):
        """Run the agent in environment.

        Args:
            episodes (int): Maximum number of episodes to run.
            env: OpenAI Gym like environment object.
            max_steps (int): Maximum number of total steps to run.
            max_episode_steps (int): Maximum number steps to run in each episode.
            render (bool): Visualize interaction of agent in environment.

        Returns:
            list: List of cumulative rewards in each episode.
        """
        env = env or self.env
        max_episode_steps -= 1
        scores = []
        for _ in range(episodes):
            observation = env.reset()
            self._reset(observation)
            score = 0
            self.episode += 1
            self.trigger('episode_begin')

            for self.episode_step in count():
                self.trigger('step_begin')
                action = self._act(observation)
                next_observation, reward, done, info = env.step(action)
                transition = Transition(state=observation,
                                        action=action,
                                        next_state=next_observation,
                                        reward=reward,
                                        done=done)
                self._observe(transition, info)
                self.trigger('step_end')
                self.global_step += 1
                observation = next_observation
                score += reward
                max_steps -= 1
                if render:
                    env.render()
                if done or max_steps == 0 or self.episode_step == max_episode_steps:
                    break

            self.trigger('episode_end')
            scores.append(score)
            if max_steps == 0:
                break

        return scores

    def _act(self, observation):
        return self.act(observation)

    def act(self, state):
        """Select an action by reading the current state.

        Args:
            state (array_like): Current state of agent based on past observations.

        Returns:
            An action to take in the environment.
        """
        raise NotImplementedError()

    def _reset(self, observation):
        return self.reset(observation)

    def reset(self, state):
        pass

    def _observe(self, transition, info):
        return self.observe(transition)

    def observe(self, transition):
        pass

    @property
    def parameters(self):
        return self._parameters

    def trigger(self, name, *args, **kwargs):
        names = []
        if self.training:
            names.append(name)
            names.append(f'train_{name}')
        else:
            names.append(f'test_{name}')
        self.trigger_events(names, *args, **kwargs)

    def trigger_events(self, names, *args, **kwargs):
        if not isinstance(names, list):
            names = [names]
        for name in names:
            name = f'on_{name}'
            if hasattr(self, name):
                getattr(self, name)(*args, **kwargs)


[docs]class Agent(BaseAgent): """Base class for all agents. Args: state (int, State): A number representing the number of recent observations to save in state or a custom :class:`~train.State` object. transitions (int, Transitions): A number representing the number of recent transitions to save in history or a custom :class:`~train.Transitions` object. env: OpenAI Gym like environment object. gamma (float): A custom parameter that can be used as discount factor, alpha (float): A custom parameter that can be used as learning rate , lambd (float): A custom parameter that can be used by various algorithms such as TD(lambda), parameters: List of trainable variables used by agent. """ def __init__(self, state=0, transitions=1, **kwargs): if isinstance(state, int): state = State(state) if isinstance(transitions, int): transitions = Transitions(transitions) self.state = state self.transitions = transitions super(Agent, self).__init__(**kwargs) def _act(self, observation): return self.act(self.state.get()) def _reset(self, observation): self.state.reset() self.state.update(observation) return self.reset(self.state.get()) def _observe(self, transition, info): state = self.state.get() self.state.update(transition.next_state) next_state = self.state.get() transition = Transition(state=state, action=transition.action, next_state=next_state, reward=transition.reward, done=transition.done) if self.training: self.transitions.append(transition) return self.observe(transition)