版權 2023 TF-Agents 作者。
開始使用
![]() |
![]() |
![]() |
![]() |
設定
如果您尚未安裝以下依賴項,請執行
pip install tf-agents
pip install tf-keras
import os
# Keep using keras-2 (tf-keras) rather than keras-3 (keras).
os.environ['TF_USE_LEGACY_KERAS'] = '1'
匯入
import abc
import numpy as np
import tensorflow as tf
from tf_agents.agents import tf_agent
from tf_agents.drivers import driver
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.policies import tf_policy
from tf_agents.specs import array_spec
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import time_step as ts
from tf_agents.trajectories import trajectory
from tf_agents.trajectories import policy_step
nest = tf.nest
簡介
多臂老虎機問題 (MAB) 是強化學習的一個特例:代理程式透過在觀察環境的某些狀態後採取一些動作,在環境中收集獎勵。一般 RL 和 MAB 之間的主要區別在於,在 MAB 中,我們假設代理程式採取的動作不會影響環境的下一個狀態。因此,代理程式不對狀態轉換進行建模,不將獎勵歸功於過去的動作,也不會「提前計劃」以達到獎勵豐富的狀態。
如同在其他 RL 領域中,MAB 代理程式的目標是找到一個策略,以盡可能多地收集獎勵。然而,總是試圖利用承諾最高獎勵的動作將是一個錯誤,因為如果我們探索得不夠,我們可能會錯過更好的動作。這是 (MAB) 中要解決的主要問題,通常稱為探索-利用兩難。
MAB 的 Bandit 環境、策略和代理程式可以在 tf_agents/bandits 的子目錄中找到。
環境
在 TF-Agents 中,環境類別的作用是提供有關當前狀態(稱為觀察或情境)的資訊、接收作為輸入的動作、執行狀態轉換以及輸出獎勵。此類別還負責在 episode 結束時重置,以便可以開始新的 episode。這是透過在狀態標記為 episode 的「last」時呼叫 reset
函數來實現的。
更多詳細資訊,請參閱 TF-Agents 環境教學課程。
如上所述,MAB 與一般 RL 的不同之處在於,動作不會影響下一次觀察。另一個區別是,在 Bandit 中,沒有「episode」:每個時間步都以新的觀察開始,獨立於先前的時間步。
為了確保觀察是獨立的並抽象化 RL episode 的概念,我們引入了 PyEnvironment
和 TFEnvironment
的子類別:BanditPyEnvironment 和 BanditTFEnvironment。這些類別公開了兩個私有成員函數,仍有待使用者實作
@abc.abstractmethod
def _observe(self):
以及
@abc.abstractmethod
def _apply_action(self, action):
_observe
函數返回一個觀察。然後,策略根據此觀察選擇一個動作。_apply_action
接收該動作作為輸入,並返回相應的獎勵。這些私有成員函數分別由 reset
和 step
函數呼叫。
class BanditPyEnvironment(py_environment.PyEnvironment):
def __init__(self, observation_spec, action_spec):
self._observation_spec = observation_spec
self._action_spec = action_spec
super(BanditPyEnvironment, self).__init__()
# Helper functions.
def action_spec(self):
return self._action_spec
def observation_spec(self):
return self._observation_spec
def _empty_observation(self):
return tf.nest.map_structure(lambda x: np.zeros(x.shape, x.dtype),
self.observation_spec())
# These two functions below should not be overridden by subclasses.
def _reset(self):
"""Returns a time step containing an observation."""
return ts.restart(self._observe(), batch_size=self.batch_size)
def _step(self, action):
"""Returns a time step containing the reward for the action taken."""
reward = self._apply_action(action)
return ts.termination(self._observe(), reward)
# These two functions below are to be implemented in subclasses.
@abc.abstractmethod
def _observe(self):
"""Returns an observation."""
@abc.abstractmethod
def _apply_action(self, action):
"""Applies `action` to the Environment and returns the corresponding reward.
"""
上述臨時抽象類別實作了 PyEnvironment
的 _reset
和 _step
函數,並公開了抽象函數 _observe
和 _apply_action
以供子類別實作。
一個簡單的環境類別範例
以下類別提供了一個非常簡單的環境,其中觀察是 -2 到 2 之間的隨機整數,有 3 個可能的動作 (0、1、2),獎勵是動作和觀察的乘積。
class SimplePyEnvironment(BanditPyEnvironment):
def __init__(self):
action_spec = array_spec.BoundedArraySpec(
shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
observation_spec = array_spec.BoundedArraySpec(
shape=(1,), dtype=np.int32, minimum=-2, maximum=2, name='observation')
super(SimplePyEnvironment, self).__init__(observation_spec, action_spec)
def _observe(self):
self._observation = np.random.randint(-2, 3, (1,), dtype='int32')
return self._observation
def _apply_action(self, action):
return action * self._observation
現在我們可以利用這個環境來獲取觀察,並接收我們動作的獎勵。
environment = SimplePyEnvironment()
observation = environment.reset().observation
print("observation: %d" % observation)
action = 2
print("action: %d" % action)
reward = environment.step(action).reward
print("reward: %f" % reward)
observation: -2 action: 2 reward: -4.000000 /tmpfs/tmp/ipykernel_30068/1543604332.py:3: DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.) print("observation: %d" % observation) /tmpfs/tmp/ipykernel_30068/1543604332.py:9: DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.) print("reward: %f" % reward)
TF 環境
可以透過子類別化 BanditTFEnvironment
來定義 bandit 環境,或者,類似於 RL 環境,可以定義 BanditPyEnvironment
並使用 TFPyEnvironment
包裝它。為了簡單起見,我們在本教學課程中選擇後者。
tf_environment = tf_py_environment.TFPyEnvironment(environment)
策略
bandit 問題中的策略與 RL 問題中的策略工作方式相同:它在給定觀察作為輸入的情況下,提供一個動作(或動作的分佈)。
更多詳細資訊,請參閱 TF-Agents 策略教學課程。
與環境一樣,有兩種建構策略的方法:可以建立 PyPolicy
並使用 TFPyPolicy
包裝它,或直接建立 TFPolicy
。在這裡,我們選擇直接方法。
由於這個範例非常簡單,我們可以手動定義最佳策略。動作僅取決於觀察的符號,負數時為 0,正數時為 2。
class SignPolicy(tf_policy.TFPolicy):
def __init__(self):
observation_spec = tensor_spec.BoundedTensorSpec(
shape=(1,), dtype=tf.int32, minimum=-2, maximum=2)
time_step_spec = ts.time_step_spec(observation_spec)
action_spec = tensor_spec.BoundedTensorSpec(
shape=(), dtype=tf.int32, minimum=0, maximum=2)
super(SignPolicy, self).__init__(time_step_spec=time_step_spec,
action_spec=action_spec)
def _distribution(self, time_step):
pass
def _variables(self):
return ()
def _action(self, time_step, policy_state, seed):
observation_sign = tf.cast(tf.sign(time_step.observation[0]), dtype=tf.int32)
action = observation_sign + 1
return policy_step.PolicyStep(action, policy_state)
現在我們可以從環境請求觀察,呼叫策略以選擇動作,然後環境將輸出獎勵
sign_policy = SignPolicy()
current_time_step = tf_environment.reset()
print('Observation:')
print (current_time_step.observation)
action = sign_policy.action(current_time_step).action
print('Action:')
print (action)
reward = tf_environment.step(action).reward
print('Reward:')
print(reward)
Observation: tf.Tensor([[-1]], shape=(1, 1), dtype=int32) Action: tf.Tensor([0], shape=(1,), dtype=int32) Reward: tf.Tensor([[0.]], shape=(1, 1), dtype=float32)
bandit 環境的實作方式確保每次我們採取步驟時,我們不僅收到我們所採取動作的獎勵,還收到下一次觀察。
step = tf_environment.reset()
action = 1
next_step = tf_environment.step(action)
reward = next_step.reward
next_observation = next_step.observation
print("Reward: ")
print(reward)
print("Next observation:")
print(next_observation)
Reward: tf.Tensor([[0.]], shape=(1, 1), dtype=float32) Next observation: tf.Tensor([[1]], shape=(1, 1), dtype=int32)
Agents
現在我們有了 bandit 環境和 bandit 策略,現在是時候也定義 bandit 代理程式了,它們負責根據訓練樣本更改策略。
bandit 代理程式的 API 與 RL 代理程式的 API 沒有區別:代理程式只需要實作 _initialize
和 _train
方法,並定義一個 policy
和一個 collect_policy
。
一個更複雜的環境
在我們編寫我們的 bandit 代理程式之前,我們需要有一個更難以理解的環境。為了讓事情更有趣一點,下一個環境將始終給出 reward = observation * action
或 reward = -observation * action
。這將在初始化環境時決定。
class TwoWayPyEnvironment(BanditPyEnvironment):
def __init__(self):
action_spec = array_spec.BoundedArraySpec(
shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
observation_spec = array_spec.BoundedArraySpec(
shape=(1,), dtype=np.int32, minimum=-2, maximum=2, name='observation')
# Flipping the sign with probability 1/2.
self._reward_sign = 2 * np.random.randint(2) - 1
print("reward sign:")
print(self._reward_sign)
super(TwoWayPyEnvironment, self).__init__(observation_spec, action_spec)
def _observe(self):
self._observation = np.random.randint(-2, 3, (1,), dtype='int32')
return self._observation
def _apply_action(self, action):
return self._reward_sign * action * self._observation[0]
two_way_tf_environment = tf_py_environment.TFPyEnvironment(TwoWayPyEnvironment())
reward sign: 1
一個更複雜的策略
更複雜的環境需要更複雜的策略。我們需要一個策略來檢測底層環境的行為。策略需要處理三種情況
- 代理程式尚未偵測到正在運行的環境版本。
- 代理程式偵測到原始版本的環境正在運行。
- 代理程式偵測到翻轉版本的環境正在運行。
我們定義一個名為 _situation
的 tf_variable
來儲存此資訊,編碼為 [0, 2]
中的值,然後使策略相應地運作。
class TwoWaySignPolicy(tf_policy.TFPolicy):
def __init__(self, situation):
observation_spec = tensor_spec.BoundedTensorSpec(
shape=(1,), dtype=tf.int32, minimum=-2, maximum=2)
action_spec = tensor_spec.BoundedTensorSpec(
shape=(), dtype=tf.int32, minimum=0, maximum=2)
time_step_spec = ts.time_step_spec(observation_spec)
self._situation = situation
super(TwoWaySignPolicy, self).__init__(time_step_spec=time_step_spec,
action_spec=action_spec)
def _distribution(self, time_step):
pass
def _variables(self):
return [self._situation]
def _action(self, time_step, policy_state, seed):
sign = tf.cast(tf.sign(time_step.observation[0, 0]), dtype=tf.int32)
def case_unknown_fn():
# Choose 1 so that we get information on the sign.
return tf.constant(1, shape=(1,))
# Choose 0 or 2, depending on the situation and the sign of the observation.
def case_normal_fn():
return tf.constant(sign + 1, shape=(1,))
def case_flipped_fn():
return tf.constant(1 - sign, shape=(1,))
cases = [(tf.equal(self._situation, 0), case_unknown_fn),
(tf.equal(self._situation, 1), case_normal_fn),
(tf.equal(self._situation, 2), case_flipped_fn)]
action = tf.case(cases, exclusive=True)
return policy_step.PolicyStep(action, policy_state)
代理程式
現在是時候定義代理程式了,它可以偵測環境的符號並適當地設定策略。
class SignAgent(tf_agent.TFAgent):
def __init__(self):
self._situation = tf.Variable(0, dtype=tf.int32)
policy = TwoWaySignPolicy(self._situation)
time_step_spec = policy.time_step_spec
action_spec = policy.action_spec
super(SignAgent, self).__init__(time_step_spec=time_step_spec,
action_spec=action_spec,
policy=policy,
collect_policy=policy,
train_sequence_length=None)
def _initialize(self):
return tf.compat.v1.variables_initializer(self.variables)
def _train(self, experience, weights=None):
observation = experience.observation
action = experience.action
reward = experience.reward
# We only need to change the value of the situation variable if it is
# unknown (0) right now, and we can infer the situation only if the
# observation is not 0.
needs_action = tf.logical_and(tf.equal(self._situation, 0),
tf.not_equal(reward, 0))
def new_situation_fn():
"""This returns either 1 or 2, depending on the signs."""
return (3 - tf.sign(tf.cast(observation[0, 0, 0], dtype=tf.int32) *
tf.cast(action[0, 0], dtype=tf.int32) *
tf.cast(reward[0, 0], dtype=tf.int32))) / 2
new_situation = tf.cond(needs_action,
new_situation_fn,
lambda: self._situation)
new_situation = tf.cast(new_situation, tf.int32)
tf.compat.v1.assign(self._situation, new_situation)
return tf_agent.LossInfo((), ())
sign_agent = SignAgent()
在上面的程式碼中,代理程式定義了策略,變數 situation
由代理程式和策略共享。
此外,_train
函數的參數 experience
是一個 trajectory
Trajectories (軌跡)
在 TF-Agents 中,trajectories
是具名元組,其中包含從先前採取的步驟中採樣的樣本。然後,代理程式使用這些樣本來訓練和更新策略。在 RL 中,trajectories 必須包含有關當前狀態、下一個狀態以及當前 episode 是否已結束的資訊。由於在 Bandit 世界中我們不需要這些東西,因此我們設定了一個輔助函數來建立 trajectory
# We need to add another dimension here because the agent expects the
# trajectory of shape [batch_size, time, ...], but in this tutorial we assume
# that both batch size and time are 1. Hence all the expand_dims.
def trajectory_for_bandit(initial_step, action_step, final_step):
return trajectory.Trajectory(observation=tf.expand_dims(initial_step.observation, 0),
action=tf.expand_dims(action_step.action, 0),
policy_info=action_step.info,
reward=tf.expand_dims(final_step.reward, 0),
discount=tf.expand_dims(final_step.discount, 0),
step_type=tf.expand_dims(initial_step.step_type, 0),
next_step_type=tf.expand_dims(final_step.step_type, 0))
訓練代理程式
現在所有組件都已準備就緒,可以訓練我們的 bandit 代理程式了。
step = two_way_tf_environment.reset()
for _ in range(10):
action_step = sign_agent.collect_policy.action(step)
next_step = two_way_tf_environment.step(action_step.action)
experience = trajectory_for_bandit(step, action_step, next_step)
print(experience)
sign_agent.train(experience)
step = next_step
Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[2]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[1]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[2.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-1]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-2]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[2]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[4.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[2.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-1]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[1]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[2.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[2]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[4.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[0]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[1]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>}) Trajectory( {'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-1]]], dtype=int32)>, 'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>, 'policy_info': (), 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>, 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>, 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>})
從輸出中可以看出,在第二步之後(除非第一步中的觀察值為 0),策略以正確的方式選擇動作,因此收集的獎勵始終是非負數。
一個真實的情境 Bandit 範例
在本教學課程的其餘部分,我們使用 TF-Agents Bandits 函式庫的預先實作的環境和代理程式。
# Imports for example.
from tf_agents.bandits.agents import lin_ucb_agent
from tf_agents.bandits.environments import stationary_stochastic_py_environment as sspe
from tf_agents.bandits.metrics import tf_metrics
from tf_agents.drivers import dynamic_step_driver
from tf_agents.replay_buffers import tf_uniform_replay_buffer
import matplotlib.pyplot as plt
具有線性收益函數的靜態隨機環境
本範例中使用的環境是 StationaryStochasticPyEnvironment。此環境將(通常是有雜訊的)函數作為參數,用於提供觀察(情境),並且對於每個臂,採用(也是有雜訊的)函數,該函數基於給定的觀察計算獎勵。在我們的範例中,我們從 d 維立方體均勻採樣情境,獎勵函數是情境的線性函數,加上一些高斯雜訊。
batch_size = 2 # @param
arm0_param = [-3, 0, 1, -2] # @param
arm1_param = [1, -2, 3, 0] # @param
arm2_param = [0, 0, 1, 1] # @param
def context_sampling_fn(batch_size):
"""Contexts from [-10, 10]^4."""
def _context_sampling_fn():
return np.random.randint(-10, 10, [batch_size, 4]).astype(np.float32)
return _context_sampling_fn
class LinearNormalReward(object):
"""A class that acts as linear reward function when called."""
def __init__(self, theta, sigma):
self.theta = theta
self.sigma = sigma
def __call__(self, x):
mu = np.dot(x, self.theta)
return np.random.normal(mu, self.sigma)
arm0_reward_fn = LinearNormalReward(arm0_param, 1)
arm1_reward_fn = LinearNormalReward(arm1_param, 1)
arm2_reward_fn = LinearNormalReward(arm2_param, 1)
environment = tf_py_environment.TFPyEnvironment(
sspe.StationaryStochasticPyEnvironment(
context_sampling_fn(batch_size),
[arm0_reward_fn, arm1_reward_fn, arm2_reward_fn],
batch_size=batch_size))
LinUCB 代理程式
下面的代理程式實作了 LinUCB 演算法。
observation_spec = tensor_spec.TensorSpec([4], tf.float32)
time_step_spec = ts.time_step_spec(observation_spec)
action_spec = tensor_spec.BoundedTensorSpec(
dtype=tf.int32, shape=(), minimum=0, maximum=2)
agent = lin_ucb_agent.LinearUCBAgent(time_step_spec=time_step_spec,
action_spec=action_spec)
遺憾值指標
Bandit 最重要的指標是遺憾值,計算為代理程式收集的獎勵與可以訪問環境獎勵函數的 oracle 策略的預期獎勵之間的差異。RegretMetric 因此需要一個 baseline_reward_fn 函數,該函數計算給定觀察值的最佳可實現預期獎勵。對於我們的範例,我們需要取我們已經為環境定義的獎勵函數的無雜訊等效值的最大值。
def compute_optimal_reward(observation):
expected_reward_for_arms = [
tf.linalg.matvec(observation, tf.cast(arm0_param, dtype=tf.float32)),
tf.linalg.matvec(observation, tf.cast(arm1_param, dtype=tf.float32)),
tf.linalg.matvec(observation, tf.cast(arm2_param, dtype=tf.float32))]
optimal_action_reward = tf.reduce_max(expected_reward_for_arms, axis=0)
return optimal_action_reward
regret_metric = tf_metrics.RegretMetric(compute_optimal_reward)
訓練
現在我們將上面介紹的所有組件放在一起:環境、策略和代理程式。我們在環境中運行策略,並在驅動程式的幫助下輸出訓練資料,並根據資料訓練代理程式。
請注意,有兩個參數一起指定採取的步驟數。num_iterations
指定我們運行訓練器迴圈的次數,而驅動程式將在每次迭代中執行 steps_per_loop
步驟。保留這兩個參數的主要原因是某些操作是按迭代完成的,而某些操作是由驅動程式在每個步驟中完成的。例如,代理程式的 train
函數在每次迭代中僅呼叫一次。這裡的權衡是,如果我們更頻繁地訓練,那麼我們的策略就會「更新鮮」,另一方面,以更大的批次進行訓練可能會更有效率。
num_iterations = 90 # @param
steps_per_loop = 1 # @param
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
data_spec=agent.policy.trajectory_spec,
batch_size=batch_size,
max_length=steps_per_loop)
observers = [replay_buffer.add_batch, regret_metric]
driver = dynamic_step_driver.DynamicStepDriver(
env=environment,
policy=agent.collect_policy,
num_steps=steps_per_loop * batch_size,
observers=observers)
regret_values = []
for _ in range(num_iterations):
driver.run()
loss_info = agent.train(replay_buffer.gather_all())
replay_buffer.clear()
regret_values.append(regret_metric.result())
plt.plot(regret_values)
plt.ylabel('Average Regret')
plt.xlabel('Number of Iterations')
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_30068/3138849230.py:21: ReplayBuffer.gather_all (from tf_agents.replay_buffers.replay_buffer) is deprecated and will be removed in a future version. Instructions for updating: Use `as_dataset(..., single_deterministic_pass=True)` instead. Text(0.5, 0, 'Number of Iterations')
運行最後的程式碼片段後,結果圖(希望)顯示,隨著代理程式的訓練以及策略在弄清楚給定觀察值的正確動作方面變得更好,平均遺憾值正在下降。
下一步?
要查看更多工作範例,請參閱 bandits/agents/examples,其中包含適用於不同代理程式和環境的現成範例。
TF-Agents 函式庫也能够處理具有單臂特徵的多臂老虎機。為此,我們建議讀者參考單臂 bandit 教學課程。