본문 바로가기
Deep Learning/Hands On Machine Learning

PG(Policy Gradient) Algorithm

by 대소기 2022. 1. 8.

* Policy Gradient Algorithm(이하 PG 알고리즘) 은 높은 reward를 얻는 방향의 gradient를 따르도록 policy의 parameter를 최하는 알고리즘이다.

REINFORCE 알고리즘

* 1992년 Ronald Williams가 소개한 가장 많이 사용되는 알고리즘 중 하나이다. 알고리즘은 아래와 같다.

1) NN(Neural Network) policy에 따라 여러 번에 걸쳐 게임을 플레이하고 매 step마다 선택된 action이 더 높은 가능성을 가지는 gradient를 계산한다. 이 때 gradient는 NN의 출력 $hat{p}$의 log값 $log(hat{p})$가 더 커지는 방향으로 gradient를 update한다. 이는 경사 하강법의 반대 기법을 사용하는 것(gradient가 더 커지는 방향으로 update)이기 때문에 gradient descent optimizer를 사용하되, 목적 함수를 반대로 하여 적용할 것이다.

2) episode를 몇 번 실행한 다음 각 action의 Advantage을 계산한다(Discount Factor를 통하여).

3) 한 행동의 Advantage가 양수면 좋은 것이므로 미래에 선택될 가능성이 높도록 앞서 계산한 gradient를 적용한다. 반대로 음수의 경우 gradient의 반대를 적용한다(gradient vector와 상응하는 행동의 Advantage를 곱함 - 음수인 advantage를 곱하면 자동으로 반대의 radient가 됨).

4) 모든 결과 gradient vector를 평균 내어 gradient descent step을 수행한다.

def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis]) # model에 obs를 집어넣어 left로 움직일 확률 추출
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(int(action[0, 0].numpy()))
    return obs, reward, done, grads

* environment는 우리가 사용하던 cartpole이다.

* 위 함수는 1 step을 진행하는 함수이다. action에 대한 loss와 gradient계산을 위해 어떤 action을 하더라도 옳다고 가정한다.

* left proba는 왼쪽으로 움직일 확률이다.

* action은 left_proba 확률로 False가 될 것이고(0이 되어 action으로 0(왼쪽으로 움직임)이 입력됨), 1 - left_proba 확률로 True가 될 것이다(1이 되어 action으로 1(오른쪽으로 움직임)이 입력됨).

* y_target은 왼쪽으로 이동할 target 확률을 의미한다. action이 0이면 왼쪽으로 이동할 target 확률이 1이 될 것이다. 반대의 경우 0이 된다.

* parameter로 입력된 loss function을 통해 loss를 계산하고, trainable variable에 대해 gradient를 계산한다. 이 때 loss는 y_target값과 left_proba의 차이이다. 우리가 사용할 모델의 예측 목표는 왼쪽으로 이동할지 아닐지(오른쪽으로 이동할지)를 정확히 맞추는 것이다. 만약 left_proba 확률로 왼쪽으로 이동한다고 예측했고, 실제로 random 한 확률을 sampling한 결과 왼쪽으로 이동했다고 해보자. 그러면 y_target은 1이 될 것이고 이 1과 우리 모델이 예측한 확률 p의 차이를 loss function을 통해 계산해 gradient를 계산하게 된다.

* 마지막으로 선택한 action을 step()메소드를 통해 실행한다.

* reutrn 값으로 obs, reward, done, grads(info가 아님)를 반환한다.

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

* 여러번의 episode를 실행하는 함수이다.

def discount_rewards(rewards, discount_rate):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_rate #현재 reward에 discount된 미래 reward의 합 더해줌
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_rate): #reward 정규화 함수
    all_discounted_rewards = [discount_rewards(rewards, discount_rate)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards) # episode의 reward들을 flatten해 하나의 array로
    # flatten된 모든 reward들로 mean, std 구함
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

* discount_reward함수는 reward를 받아 각 step의 reward에 discount된 미래 reward의 합을 더해줌. discount_and_normalize_rewards는 각 episode의 reward를 모아 discount_reward함수를 사용해 각 reward에 discount된 미래 reward의 합을 더해주고, 각 episode의 reward들을 정규화 해 주는 함수이다.

print(discount_rewards([10, 0, -50], discount_rate=0.8))
print(discount_and_normalize_rewards([[10, 0, -50],[10, 20]], discount_rate=0.8))

# [-22 -40 -50]
# [array([-0.28435071, -0.86597718, -1.18910299]), array([1.26665318, 1.0727777 ])]

* 위에 설명한 두 함수를 test해보았다. normalize된 rewards의 경우 첫 번째 episode의 각 step의 discount된 reward는 보상이 너무 안 좋으므로, 정규화 결과 모두 음수의 값을 띠는 것을 볼 수 있다. 이를 통해 첫 번째 episode는 모든 action들은 나쁜 것으로 간주된다.

n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_rate = 0.95

optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.binary_crossentropy

keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[4]),
    keras.layers.Dense(1, activation="sigmoid"),
])

env = gym.make("CartPole-v1")
env.seed(42);

for iteration in range(n_iterations): # 150epoch 훈련
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    total_rewards = sum(map(sum, all_rewards))                   
    print("\rIteration: {}, mean rewards: {:.1f}".format(        
        iteration, total_rewards / n_episodes_per_update), end="")
    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_rate) #rewards 정규화

    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)): # 가중치 행렬 개수만큼 반복
        # episode의 reward list의 element마다 var_index 번째의 variable의 gradient 곱해주고 episode별로 mean 구함.
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                 for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    #optimizer에 gradient 적용. 각 weight matrix별로 gradient의 mean값을 적용.    
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables)) 

env.close()

# Iteration: 149, mean rewards: 199.9

* 위 NN policy를 통해 에피소드마다 거의 200에 가까운 평균 보상을 얻을 수 있도록 학습되었다. 다음 포스팅에서는 다른 종류의 알고리즘을 살펴보기 전에 마르코프 결정 과정에 대해 살펴보겠다.