본문 바로가기

연구/딥러닝

[강화학습] tensorflow 2로 DQN

전에 tf1으로 dqn을 작성했다.

 

tf2에서 제공하는 Eager execution + gradienttape 연습할겸 다시 짜봤다.

 

env는 간단하게 cartpole 했다. 코드를 보자

 

 

1. replay buffer

 

class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = deque(maxlen=self.buffer_size)
    
    def sample(self, batch_size):
        size = batch_size if len(self.buffer) > batch_size else len(self.buffer)
        return random.sample(self.buffer, size)

    def clear(self):
        self.buffer.clear()
    
    def append(self, transition):
        self.buffer.append(transition)

    def __len__(self):
        return len(self.buffer)

순차적으로 들어오는 데이터를 학습시 correlation 때문에 (가까운 데이터는 비슷한 분포를 가지므로 전체 데이터에 대한 분포를 학습하기 어려움), 그리고 한 번 학습에 사용한 데이터를 바로 버리는게 아니라 계속해서 사용함으로써 data efficiency 향상시킴 그래서 replay buffer를 쓴다. deque으로 append랑 sample만 짜면된다. sample은 random 모듈사용

 

random: docs.python.org/3/library/random.html

 

2. network

 

class QNetworkDense(tf.keras.Model):
    def __init__(self, obs_dim, acs_dim):
        super(QNetworkDense, self).__init__()
        self.layer1 = Dense(24, activation='relu', kernel_initializer='he_uniform')
        self.layer2 = Dense(24, activation='relu', kernel_initializer='he_uniform')
        self.output_layer = Dense(acs_dim, activation='linear', kernel_initializer='he_uniform')
        self.build(input_shape=(None,) + obs_dim)

    def call(self, x):
        x = self.layer1(x)
        x = self.layer2(x)

엄청 간단한 네트워크긴한데 그냥 Model 상속받아서 class로 짯다. class는 입력 shape을 어떻게 줘야하나 고민을 좀 했다. 일단 build 함수로 입력 차원 넣긴했는데 변수 여러개의 입력일 때 어떻게 해야할지 생각해봐야 겠다. 각각 입력에 대해서 케라스 Model을 구현하고, 모델을 빌드해야하나?? 이런것들을...

고수들의 코드를 보려고 keras에서 구현한 resnet, denset 보면 데코레이터로 레이어블록을 짜고 class로 network를 딱히 구현을 안했다. 담에 ddpg 짤때는 network 안쓰고 구현해봐야겠다.

 

keras application: github.com/tensorflow/tensorflow/blob/b8d2a994e4fda783f27894689b85c73581998407/tensorflow/python/keras/applications/resnet.py#L60

 

 

3. Agent

 

class Agent(object):
    def __init__(self, env, obs_dim, acs_dim, steps, 
                gamma=0.99, epsilon=1.0, epsilon_decay=0.999, 
                buffer_size=2000, batch_size=64, target_update_step=100):
        self.env = env
        self.args = args
        self.obs_dim = obs_dim
        self.acs_dim = acs_dim
        self.steps = steps
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.buffer_size = buffer_size
        self.target_update_step = target_update_step
        self.batch_size = batch_size

        self.replay_buffer = ReplayBuffer(self.buffer_size)
        
        self.q_fn = QNetworkDense(self.obs_dim, self.acs_dim)
        self.q_fn_target = QNetworkDense(self.obs_dim, self.acs_dim)

        self.q_fn.compile(optimizer='adam', loss='mse')
        self.target_update()
        
        self.loss_fn = tf.keras.losses.MeanSquaredError()
        self.optimizer = tf.keras.optimizers.Adam()

dqn Agent, env와 env 정보, hyperparameter를 받는다. env 정보를 밖에서 받고, discrete, continous action or state space에 대해서 바뀌는 네트워크 같은것도 추가로 작성해주긴 해야한다.

 

4. agent - learn

 

    @tf.function
    def learn(self, obs, acs, next_obs, rewards, dones):
        q_target = rewards + (1 - dones) * self.gamma * tf.reduce_max(self.q_fn_target(next_obs), axis=1, keepdims=True)

        with tf.GradientTape() as tape:
            qt = self.q_fn(obs)
            acs_onehot = tf.one_hot(tf.cast(tf.reshape(acs, [-1]), tf.int32), self.acs_dim)
            qt = tf.reduce_sum(qt * acs_onehot, axis=1, keepdims=True)
            loss = self.loss_fn(q_target, qt)
        grads = tape.gradient(loss, self.q_fn.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.q_fn.trainable_weights))

agent의 학습하는 부분 transition을 받고, gradienttape로 학습하게끔 했다.

 

acs_onehot = tf.one_hot(tf.cast(tf.reshape(acs, [-1]), tf.int32), self.acs_dim)
qt = tf.reduce_sum(qt * acs_onehot, axis=1, keepdims=True)

요부분에서 좀 헤맸다. q table을 보고 acs에 해당하는 q 값을 가져오도록 하는 코드인데.. acs를 indices로 보고 가져오고 싶은데, tf.function으로 데코레이팅 해줘서 tf ops만 써야해서 어려웠다... tf.gather 이거나 tf.gather_nd 이거 쓰면 되는 것 같은데.. 아래 글이 내가 하고싶은 작업이긴한데.. 

 

tf.gather : stackoverflow.com/questions/36764791/in-tensorflow-how-to-use-tf-gather-for-the-last-dimension

 

x = tf.constant([[1, 2, 3],
                 [4, 5, 6],
                 [7, 8, 9]])
idx = tf.constant([1, 0, 2])
idx_flattened = tf.range(0, x.shape[0]) * x.shape[1] + idx
y = tf.gather(tf.reshape(x, [-1]),  # flatten input
              idx_flattened)  # use flattened indices [2 4 9]

요롷게..? 하기에는 조금 읽기 어려워 보여서!

 

그냥 acs를 one_hot encoding해주고 qtable과 곱해서 해당 dim을 다 더해줬다(reduce_sum)

 

* 그리고 tf.function으로 데코레이팅 된 부분에는 다른 ops를 쓰면 안된다..

 

tf.function 사용법 페이지.. : https://www.tensorflow.org/guide/function?hl=ko

단순히 넘파이와 파이썬 호출이 상수로 된다고해서 그냥 사용했는데, (처음에 buffer sampling을 learn 코드에서 했음) 학습이 아에 안되었다. (이거 때문에 2시간 날림^^) 무조건 tf.function 안에는 tf ops를 쓰자..

 

5. agent - target update, select_action

    def target_update(self):
        self.q_fn_target.set_weights(self.q_fn.get_weights())

    def select_action(self, ob):
        self.epsilon *= self.epsilon_decay
        self.epsilon = max(self.epsilon, 0.01)
        if np.random.rand() <= self.epsilon: # exploration
            return np.random.randint(self.acs_dim)
        else:
            action = self.q_fn(ob[np.newaxis])
            return np.argmax(action[0])

- td error 계산시, 학습에 사용할 q값 학습하는 q값이 같으면 td error가 많이 차이가 안남 학습하는데 unstable 해짐 그래서 target network를 두고 특정 주기마다 target network의 param을 업데이트 해줌

 

- select action은 eplision 확률만큼 exploit, exploration 해줌 그리고 가장 큰 q값을 선택하는 greedy함으로서 엡실론-그리디 부분

 

6. agent - train

    def train(self):
        epochs = 0
        global_step = 0
        reward_list = []
        while global_step < self.steps:
            ob = env.reset()
            rewards = 0
            while True:
                ac = self.select_action(ob)
                next_ob, reward, done, _ = env.step(ac)

                transition = (ob, ac, next_ob, reward, done)
                self.replay_buffer.append(transition)

                rewards += reward
                ob = next_ob

                if done:
                    reward_list.append(rewards)
                    self.target_update()
                    print(f"epochs #{epochs+1} end, score is {rewards}")
                    break

                # self.env.render()

                if global_step > 1000:
                    transitions = self.replay_buffer.sample(batch_size=self.batch_size)
                    self.learn(*map(lambda x: np.vstack(x).astype('float32'), np.transpose(transitions)))

                global_step += 1
            epochs += 1
        plt.title('Cartpole-v1')
        plt.xlabel('epoch')
        plt.ylabel('rewards')
        plt.plot(reward_list)
        plt.show()

미리 설정해준 step만큼만 학습진행, gym env interface에 따라 매 스탭마다 ob, ac, next_ob, reward, done의 transition을 버퍼에 넣어주고, replay buffer가 어느정도 쌓이면 (저는 1000으로 설정했습니다) 학습을 시작한다.

transition을 dict로 만들어서 넣어줄까도 고민했는데.. 최대한 간결하게 짜려고 그냥 tuple사용

 

 

7. main

 

if __name__ == '__main__':
    tf.keras.backend.set_floatx('float32')
    env = gym.make('CartPole-v0')

    obs_dim = env.observation_space.shape
    acs_dim = None
    
    if isinstance(env.action_space, spaces.Box):
        acs_type = 'continuous'
        acs_dim = env.action_space.shape
    elif isinstance(env.action_space, spaces.Discrete):
        acs_type = 'discrete'
        acs_dim = env.action_space.n
    else:
        raise NotImplementedError('Not implemented ㅎㅎ')

    agent = Agent(env, obs_dim, acs_dim, 10000)
    agent.train()

env 설정해주고, env 정보를 찾아주고 학습만 시키면 된당

 

 

결과

 

 

 

학습초기 (막대기가 쓰러지면 죽음)
학습후

 

전체코드

 

import random
from collections import deque, defaultdict

import numpy as np

import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, Flatten, Dense, MaxPooling2D, Input

import matplotlib.pyplot as plt

import gym
from gym import spaces


class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = deque(maxlen=self.buffer_size)
    
    def sample(self, batch_size):
        size = batch_size if len(self.buffer) > batch_size else len(self.buffer)
        return random.sample(self.buffer, size)

    def clear(self):
        self.buffer.clear()
    
    def append(self, transition):
        self.buffer.append(transition)

    def __len__(self):
        return len(self.buffer)


class QNetworkDense(tf.keras.Model):
    def __init__(self, obs_dim, acs_dim):
        super(QNetworkDense, self).__init__()
        self.layer1 = Dense(24, activation='relu', kernel_initializer='he_uniform')
        self.layer2 = Dense(24, activation='relu', kernel_initializer='he_uniform')
        self.output_layer = Dense(acs_dim, activation='linear', kernel_initializer='he_uniform')
        self.build(input_shape=(None,) + obs_dim)

    def call(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        return self.output_layer(x)
        

class Agent(object):
    def __init__(self, env, obs_dim, acs_dim, steps, 
                gamma=0.99, epsilon=1.0, epsilon_decay=0.999, 
                buffer_size=2000, batch_size=64, target_update_step=100):
        self.env = env
        self.obs_dim = obs_dim
        self.acs_dim = acs_dim
        self.steps = steps
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.buffer_size = buffer_size
        self.target_update_step = target_update_step
        self.batch_size = batch_size

        self.replay_buffer = ReplayBuffer(self.buffer_size)
        
        self.q_fn = QNetworkDense(self.obs_dim, self.acs_dim)
        self.q_fn_target = QNetworkDense(self.obs_dim, self.acs_dim)

        self.q_fn.compile(optimizer='adam', loss='mse')
        self.target_update()
        
        self.loss_fn = tf.keras.losses.MeanSquaredError()
        self.optimizer = tf.keras.optimizers.Adam()


    @tf.function
    def learn(self, obs, acs, next_obs, rewards, dones):
        q_target = rewards + (1 - dones) * self.gamma * tf.reduce_max(self.q_fn_target(next_obs), axis=1, keepdims=True)

        with tf.GradientTape() as tape:
            qt = self.q_fn(obs)
            acs_onehot = tf.one_hot(tf.cast(tf.reshape(acs, [-1]), tf.int32), self.acs_dim)
            qt = tf.reduce_sum(qt * acs_onehot, axis=1, keepdims=True)
            loss = self.loss_fn(q_target, qt)
        grads = tape.gradient(loss, self.q_fn.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.q_fn.trainable_weights))

    def train(self):
        epochs = 0
        global_step = 0
        reward_list = []
        while global_step < self.steps:
            ob = env.reset()
            rewards = 0
            while True:
                ac = self.select_action(ob)
                next_ob, reward, done, _ = env.step(ac)

                transition = (ob, ac, next_ob, reward, done)
                self.replay_buffer.append(transition)

                rewards += reward
                ob = next_ob

                if done:
                    reward_list.append(rewards)
                    self.target_update()
                    print(f"epochs #{epochs+1} end, score is {rewards}")
                    break

                self.env.render()

                if global_step > 1000:
                    transitions = self.replay_buffer.sample(batch_size=self.batch_size)
                    self.learn(*map(lambda x: np.vstack(x).astype('float32'), np.transpose(transitions)))

                global_step += 1
            epochs += 1
        plt.title('Cartpole-v1')
        plt.xlabel('epoch')
        plt.ylabel('rewards')
        plt.plot(reward_list)
        plt.show()


    def test(self):
        pass

    def target_update(self):
        self.q_fn_target.set_weights(self.q_fn.get_weights())

    def select_action(self, ob):
        self.epsilon *= self.epsilon_decay
        self.epsilon = max(self.epsilon, 0.01)
        if np.random.rand() <= self.epsilon: # exploration
            return np.random.randint(self.acs_dim)
        else:
            action = self.q_fn(ob[np.newaxis])
            return np.argmax(action[0])


if __name__ == '__main__':
    tf.keras.backend.set_floatx('float32')
    env = gym.make('CartPole-v0')

    obs_dim = env.observation_space.shape
    acs_dim = None
    
    if isinstance(env.action_space, spaces.Box):
        acs_type = 'continuous'
        acs_dim = env.action_space.shape
    elif isinstance(env.action_space, spaces.Discrete):
        acs_type = 'discrete'
        acs_dim = env.action_space.n
    else:
        raise NotImplementedError('Not implemented ㅎㅎ')

    agent = Agent(env, obs_dim, acs_dim, 10000)
    agent.train()

 

 

 

ddpg, drqn, rdpg, ppo도 tf2로 짜봐야겠당

 

 

 

'연구 > 딥러닝' 카테고리의 다른 글

[강화학습] tensorflow2로 ddpg  (0) 2021.02.14
[Colab] Colab 빠르게 사용해보기  (0) 2019.11.15
[cs294-112] 1.Supervised Learning of Behaviors  (0) 2019.08.03