전에 tf1으로 dqn을 작성했다.
tf2에서 제공하는 Eager execution + gradienttape 연습할겸 다시 짜봤다.
env는 간단하게 cartpole 했다. 코드를 보자
1. replay buffer
class ReplayBuffer:
def __init__(self, buffer_size):
self.buffer_size = buffer_size
self.buffer = deque(maxlen=self.buffer_size)
def sample(self, batch_size):
size = batch_size if len(self.buffer) > batch_size else len(self.buffer)
return random.sample(self.buffer, size)
def clear(self):
self.buffer.clear()
def append(self, transition):
self.buffer.append(transition)
def __len__(self):
return len(self.buffer)
순차적으로 들어오는 데이터를 학습시 correlation 때문에 (가까운 데이터는 비슷한 분포를 가지므로 전체 데이터에 대한 분포를 학습하기 어려움), 그리고 한 번 학습에 사용한 데이터를 바로 버리는게 아니라 계속해서 사용함으로써 data efficiency 향상시킴 그래서 replay buffer를 쓴다. deque으로 append랑 sample만 짜면된다. sample은 random 모듈사용
random: docs.python.org/3/library/random.html
2. network
class QNetworkDense(tf.keras.Model):
def __init__(self, obs_dim, acs_dim):
super(QNetworkDense, self).__init__()
self.layer1 = Dense(24, activation='relu', kernel_initializer='he_uniform')
self.layer2 = Dense(24, activation='relu', kernel_initializer='he_uniform')
self.output_layer = Dense(acs_dim, activation='linear', kernel_initializer='he_uniform')
self.build(input_shape=(None,) + obs_dim)
def call(self, x):
x = self.layer1(x)
x = self.layer2(x)
엄청 간단한 네트워크긴한데 그냥 Model 상속받아서 class로 짯다. class는 입력 shape을 어떻게 줘야하나 고민을 좀 했다. 일단 build 함수로 입력 차원 넣긴했는데 변수 여러개의 입력일 때 어떻게 해야할지 생각해봐야 겠다. 각각 입력에 대해서 케라스 Model을 구현하고, 모델을 빌드해야하나?? 이런것들을...
고수들의 코드를 보려고 keras에서 구현한 resnet, denset 보면 데코레이터로 레이어블록을 짜고 class로 network를 딱히 구현을 안했다. 담에 ddpg 짤때는 network 안쓰고 구현해봐야겠다.
keras application: github.com/tensorflow/tensorflow/blob/b8d2a994e4fda783f27894689b85c73581998407/tensorflow/python/keras/applications/resnet.py#L60
3. Agent
class Agent(object):
def __init__(self, env, obs_dim, acs_dim, steps,
gamma=0.99, epsilon=1.0, epsilon_decay=0.999,
buffer_size=2000, batch_size=64, target_update_step=100):
self.env = env
self.args = args
self.obs_dim = obs_dim
self.acs_dim = acs_dim
self.steps = steps
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.buffer_size = buffer_size
self.target_update_step = target_update_step
self.batch_size = batch_size
self.replay_buffer = ReplayBuffer(self.buffer_size)
self.q_fn = QNetworkDense(self.obs_dim, self.acs_dim)
self.q_fn_target = QNetworkDense(self.obs_dim, self.acs_dim)
self.q_fn.compile(optimizer='adam', loss='mse')
self.target_update()
self.loss_fn = tf.keras.losses.MeanSquaredError()
self.optimizer = tf.keras.optimizers.Adam()
dqn Agent, env와 env 정보, hyperparameter를 받는다. env 정보를 밖에서 받고, discrete, continous action or state space에 대해서 바뀌는 네트워크 같은것도 추가로 작성해주긴 해야한다.
4. agent - learn
@tf.function
def learn(self, obs, acs, next_obs, rewards, dones):
q_target = rewards + (1 - dones) * self.gamma * tf.reduce_max(self.q_fn_target(next_obs), axis=1, keepdims=True)
with tf.GradientTape() as tape:
qt = self.q_fn(obs)
acs_onehot = tf.one_hot(tf.cast(tf.reshape(acs, [-1]), tf.int32), self.acs_dim)
qt = tf.reduce_sum(qt * acs_onehot, axis=1, keepdims=True)
loss = self.loss_fn(q_target, qt)
grads = tape.gradient(loss, self.q_fn.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.q_fn.trainable_weights))
agent의 학습하는 부분 transition을 받고, gradienttape로 학습하게끔 했다.
acs_onehot = tf.one_hot(tf.cast(tf.reshape(acs, [-1]), tf.int32), self.acs_dim)
qt = tf.reduce_sum(qt * acs_onehot, axis=1, keepdims=True)
요부분에서 좀 헤맸다. q table을 보고 acs에 해당하는 q 값을 가져오도록 하는 코드인데.. acs를 indices로 보고 가져오고 싶은데, tf.function으로 데코레이팅 해줘서 tf ops만 써야해서 어려웠다... tf.gather 이거나 tf.gather_nd 이거 쓰면 되는 것 같은데.. 아래 글이 내가 하고싶은 작업이긴한데..
tf.gather : stackoverflow.com/questions/36764791/in-tensorflow-how-to-use-tf-gather-for-the-last-dimension
x = tf.constant([[1, 2, 3],
[4, 5, 6],
[7, 8, 9]])
idx = tf.constant([1, 0, 2])
idx_flattened = tf.range(0, x.shape[0]) * x.shape[1] + idx
y = tf.gather(tf.reshape(x, [-1]), # flatten input
idx_flattened) # use flattened indices [2 4 9]
요롷게..? 하기에는 조금 읽기 어려워 보여서!
그냥 acs를 one_hot encoding해주고 qtable과 곱해서 해당 dim을 다 더해줬다(reduce_sum)
* 그리고 tf.function으로 데코레이팅 된 부분에는 다른 ops를 쓰면 안된다..
단순히 넘파이와 파이썬 호출이 상수로 된다고해서 그냥 사용했는데, (처음에 buffer sampling을 learn 코드에서 했음) 학습이 아에 안되었다. (이거 때문에 2시간 날림^^) 무조건 tf.function 안에는 tf ops를 쓰자..
5. agent - target update, select_action
def target_update(self):
self.q_fn_target.set_weights(self.q_fn.get_weights())
def select_action(self, ob):
self.epsilon *= self.epsilon_decay
self.epsilon = max(self.epsilon, 0.01)
if np.random.rand() <= self.epsilon: # exploration
return np.random.randint(self.acs_dim)
else:
action = self.q_fn(ob[np.newaxis])
return np.argmax(action[0])
- td error 계산시, 학습에 사용할 q값 학습하는 q값이 같으면 td error가 많이 차이가 안남 학습하는데 unstable 해짐 그래서 target network를 두고 특정 주기마다 target network의 param을 업데이트 해줌
- select action은 eplision 확률만큼 exploit, exploration 해줌 그리고 가장 큰 q값을 선택하는 greedy함으로서 엡실론-그리디 부분
6. agent - train
def train(self):
epochs = 0
global_step = 0
reward_list = []
while global_step < self.steps:
ob = env.reset()
rewards = 0
while True:
ac = self.select_action(ob)
next_ob, reward, done, _ = env.step(ac)
transition = (ob, ac, next_ob, reward, done)
self.replay_buffer.append(transition)
rewards += reward
ob = next_ob
if done:
reward_list.append(rewards)
self.target_update()
print(f"epochs #{epochs+1} end, score is {rewards}")
break
# self.env.render()
if global_step > 1000:
transitions = self.replay_buffer.sample(batch_size=self.batch_size)
self.learn(*map(lambda x: np.vstack(x).astype('float32'), np.transpose(transitions)))
global_step += 1
epochs += 1
plt.title('Cartpole-v1')
plt.xlabel('epoch')
plt.ylabel('rewards')
plt.plot(reward_list)
plt.show()
미리 설정해준 step만큼만 학습진행, gym env interface에 따라 매 스탭마다 ob, ac, next_ob, reward, done의 transition을 버퍼에 넣어주고, replay buffer가 어느정도 쌓이면 (저는 1000으로 설정했습니다) 학습을 시작한다.
transition을 dict로 만들어서 넣어줄까도 고민했는데.. 최대한 간결하게 짜려고 그냥 tuple사용
7. main
if __name__ == '__main__':
tf.keras.backend.set_floatx('float32')
env = gym.make('CartPole-v0')
obs_dim = env.observation_space.shape
acs_dim = None
if isinstance(env.action_space, spaces.Box):
acs_type = 'continuous'
acs_dim = env.action_space.shape
elif isinstance(env.action_space, spaces.Discrete):
acs_type = 'discrete'
acs_dim = env.action_space.n
else:
raise NotImplementedError('Not implemented ㅎㅎ')
agent = Agent(env, obs_dim, acs_dim, 10000)
agent.train()
env 설정해주고, env 정보를 찾아주고 학습만 시키면 된당
결과
전체코드
import random
from collections import deque, defaultdict
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, Flatten, Dense, MaxPooling2D, Input
import matplotlib.pyplot as plt
import gym
from gym import spaces
class ReplayBuffer:
def __init__(self, buffer_size):
self.buffer_size = buffer_size
self.buffer = deque(maxlen=self.buffer_size)
def sample(self, batch_size):
size = batch_size if len(self.buffer) > batch_size else len(self.buffer)
return random.sample(self.buffer, size)
def clear(self):
self.buffer.clear()
def append(self, transition):
self.buffer.append(transition)
def __len__(self):
return len(self.buffer)
class QNetworkDense(tf.keras.Model):
def __init__(self, obs_dim, acs_dim):
super(QNetworkDense, self).__init__()
self.layer1 = Dense(24, activation='relu', kernel_initializer='he_uniform')
self.layer2 = Dense(24, activation='relu', kernel_initializer='he_uniform')
self.output_layer = Dense(acs_dim, activation='linear', kernel_initializer='he_uniform')
self.build(input_shape=(None,) + obs_dim)
def call(self, x):
x = self.layer1(x)
x = self.layer2(x)
return self.output_layer(x)
class Agent(object):
def __init__(self, env, obs_dim, acs_dim, steps,
gamma=0.99, epsilon=1.0, epsilon_decay=0.999,
buffer_size=2000, batch_size=64, target_update_step=100):
self.env = env
self.obs_dim = obs_dim
self.acs_dim = acs_dim
self.steps = steps
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.buffer_size = buffer_size
self.target_update_step = target_update_step
self.batch_size = batch_size
self.replay_buffer = ReplayBuffer(self.buffer_size)
self.q_fn = QNetworkDense(self.obs_dim, self.acs_dim)
self.q_fn_target = QNetworkDense(self.obs_dim, self.acs_dim)
self.q_fn.compile(optimizer='adam', loss='mse')
self.target_update()
self.loss_fn = tf.keras.losses.MeanSquaredError()
self.optimizer = tf.keras.optimizers.Adam()
@tf.function
def learn(self, obs, acs, next_obs, rewards, dones):
q_target = rewards + (1 - dones) * self.gamma * tf.reduce_max(self.q_fn_target(next_obs), axis=1, keepdims=True)
with tf.GradientTape() as tape:
qt = self.q_fn(obs)
acs_onehot = tf.one_hot(tf.cast(tf.reshape(acs, [-1]), tf.int32), self.acs_dim)
qt = tf.reduce_sum(qt * acs_onehot, axis=1, keepdims=True)
loss = self.loss_fn(q_target, qt)
grads = tape.gradient(loss, self.q_fn.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.q_fn.trainable_weights))
def train(self):
epochs = 0
global_step = 0
reward_list = []
while global_step < self.steps:
ob = env.reset()
rewards = 0
while True:
ac = self.select_action(ob)
next_ob, reward, done, _ = env.step(ac)
transition = (ob, ac, next_ob, reward, done)
self.replay_buffer.append(transition)
rewards += reward
ob = next_ob
if done:
reward_list.append(rewards)
self.target_update()
print(f"epochs #{epochs+1} end, score is {rewards}")
break
self.env.render()
if global_step > 1000:
transitions = self.replay_buffer.sample(batch_size=self.batch_size)
self.learn(*map(lambda x: np.vstack(x).astype('float32'), np.transpose(transitions)))
global_step += 1
epochs += 1
plt.title('Cartpole-v1')
plt.xlabel('epoch')
plt.ylabel('rewards')
plt.plot(reward_list)
plt.show()
def test(self):
pass
def target_update(self):
self.q_fn_target.set_weights(self.q_fn.get_weights())
def select_action(self, ob):
self.epsilon *= self.epsilon_decay
self.epsilon = max(self.epsilon, 0.01)
if np.random.rand() <= self.epsilon: # exploration
return np.random.randint(self.acs_dim)
else:
action = self.q_fn(ob[np.newaxis])
return np.argmax(action[0])
if __name__ == '__main__':
tf.keras.backend.set_floatx('float32')
env = gym.make('CartPole-v0')
obs_dim = env.observation_space.shape
acs_dim = None
if isinstance(env.action_space, spaces.Box):
acs_type = 'continuous'
acs_dim = env.action_space.shape
elif isinstance(env.action_space, spaces.Discrete):
acs_type = 'discrete'
acs_dim = env.action_space.n
else:
raise NotImplementedError('Not implemented ㅎㅎ')
agent = Agent(env, obs_dim, acs_dim, 10000)
agent.train()
ddpg, drqn, rdpg, ppo도 tf2로 짜봐야겠당
'연구 > 딥러닝' 카테고리의 다른 글
[강화학습] tensorflow2로 ddpg (0) | 2021.02.14 |
---|---|
[Colab] Colab 빠르게 사용해보기 (0) | 2019.11.15 |
[cs294-112] 1.Supervised Learning of Behaviors (0) | 2019.08.03 |