본문 바로가기
Deep Learning/밑바닥부터 시작하는 딥러닝

게이트가 추가된 RNN

by 대소기 2022. 2. 10.

RNN의 문제점

기울기 소실(Gradient Vanishing), 기울기 폭발(Gradient Exploding)

 

* 위 그림은 RNN의 역전파 과정을 나타낸 것이다. +의 경우 상류에서 흘러들어온 기울기를 그대로 흘려보내기 때문에 신경쓰지 말고 MatMul과 tanh의 기울기 역전파 과정을 살펴보자.

* y=tanh(x)의 경우 미분 값이 ${\partial y \over \partial x} = 1 - y^2$이다. 기울기를 그래프로 그려보면 다음과 같다.

 

* tanh(x)미분 값은 모두 1 이하이고, x가 0에서 멀어질수록 0에 다가가다가 0으로 수렴한다. 값이 모두 1 이하이기 때문에 역전파를 계속 거치면서 tanh노드의 역전파가 계속되면 기울기가 0이 된다.

* MatMul 연산의 경우 상류로부터 dh라는 기울기가 들어온다고 가정했을 때 $dhW_h^T$가 전달된다. 매번 같은 가중치 $W_h$가 사용된다는 것을 주의해야 한다.

* MatMul 노드 반복해 지난 기울기는 가중치 행렬의 특잇값에 따라 소실될수도 폭발할 수도 있다. 가중치 $W_h$의 특잇값 중 가장 큰 값이 1보다 크면 지수적으로 증가해 폭발하고 1보다 작으면 지수적으로 감소해 소실되는 것이다(모든 경우에 그렇지는 않고 대체로 그렇다).

 

 

기울기 폭발 대책 - Gradient Clipping

 

$if ||\hat g|| \geq threshold : \hat g = {threshold \over || \hat g|| } \hat g$

 

* 위 수식은 gradient clipping을 의사 코드로 작성한 것이다. $\hat g$는 매개변수의 깅루기를 하나로 모은 것이다.

* python code로 구현하면 아래와 같다.

 

import numpy as np


dW1 = np.random.rand(3, 3) * 10
dW2 = np.random.rand(3, 3) * 10
grads = [dW1, dW2]
max_norm = 5.0


def clip_grads(grads, max_norm):
    total_norm = 0
    for grad in grads:
        total_norm += np.sum(grad ** 2)
    # total gradient의 norm
    total_norm = np.sqrt(total_norm)

    rate = max_norm / (total_norm + 1e-6)
    if rate < 1:
        for grad in grads:
            grad *= rate


print('before:', dW1.flatten())
clip_grads(grads, max_norm)
print('after:', dW1.flatten())

 

 

 

기울기 소실과 LSTM

 

* 기울기 소실은 RNN의 아키텍쳐를 수정해야 해결할 수 있다. 이를 위해 LSTM에 대해 알아보자.

 

 

LSTM의 인터페이스

 

* $tanh(h_{t-1} W_h + x_t W_x + b)$ 계산을 tanh이라는 직사각형 노드 하나로 표현할 수 있다.

 

 

* LSTM 계층과 RNN을 비교해보자.

* c는 memory cell을 뜻한다. hidden state와 달리 lstm 계층 내에서만 전달되고 다른 계층으로는 전달되지 않는다.

 

 

LSTM 계층 조립하기

 

 

 

1) output gate

* output gate는 $c_{t-1}$을 이용해 $h_t$를 계산할 때 $c_{t}$을 다음 은닉 상태에 얼마나 반영할지를 출력한다.

* $\sigma$는 시그모이드 함수를 뜻한다. 시그모이드 함수를 이용하는 이유는 반영 정도를 0 ~ 1 사이의 값으로 표현할 수 있기 때문이다.

* 출력된 output gate의 값인 o는 $c_{t}$와 곱해지는데, 이 때 원소별 곱(Hadamard product)를 시행한다.

 

 

2) forget gate

* forget gate는 $c_{t-1}$을 얼마나 잊을 것인지 정도를 출력한다.

 

 

3) 새로운 기억 셀

 

* memory cell을 update하기 위한 부분이다. 입력 $x_t$와 hidden state $h_{t-1}$이 memory cell에 입력된다.

* gate가 아닌 입력이기 때문에 tanh을 사용하였다.

 

 

4) input gate

* 새롭게 입력된 g를 얼마나 반영할지 결정하는 gate이다. 

 

 

 

 

 

LSTM의 기울기 흐름과 기울기 소실

* $c_t$의 역전파를 보면 더하기 노드에서는 기울기가 그대로 전해지기 때문에 신경쓰지 않아도 되고 곱셈 노드만 신경쓰면 된다.

* LSTM의 역전파에서는 행렬 곱이 아닌 원소별 곱이 이뤄진다. RNN에서는 행렬 곱이 반복적으로 계산되며 기울기 소실 문제가 발생하였지만, LSTM의 경우 원소별 곱이 반복적으로 이뤄지고, 매번 새로운 gate의 값과 곱해지기 때문에 기울기 소실이 일어나지 않는다(정확히는 일어나기 어렵다).

 

 

 

LSTM 구현

* gate 계산에서는 행렬 변환과 평행 이동(편향)을 동시에 시행하는($xW_x + hW_h + b$) Affine transformation이 일어난다.  

* 이렇게 동일한 계산이 반복적으로 이뤄지기 때문에 이를 하나의 식으로 변환하자면 아래와 같아진다.

 

 

* 위와 같이 4번의 행렬 계산을 한 번에 끝낼 수 있다. 한 번에 큰 행렬을 계산하는게 작은 행렬을 여러 번 계산하는 것 보다 속도가 빠르다.

 

 

* 이 과정을 도식화 하면 위와 같다. slice는 affine transformation의 결과를 균등하게 4조각으로 나누는 역할을 한다.

 

 

class LSTM:
    def __init__(self, Wx, Wh, b):
        '''
        Parameters
        ----------
        Wx: 입력 x에 대한 가중치 매개변수(4개분의 가중치가 담겨 있음)
        Wh: 은닉 상태 h에 대한 가장추 매개변수(4개분의 가중치가 담겨 있음)
        b: 편향(4개분의 편향이 담겨 있음)
        '''
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.cache = None

    def forward(self, x, h_prev, c_prev):
        Wx, Wh, b = self.params
        N, H = h_prev.shape

        A = np.dot(x, Wx) + np.dot(h_prev, Wh) + b

        f = A[:, :H]
        g = A[:, H:2*H]
        i = A[:, 2*H:3*H]
        o = A[:, 3*H:]

        f = sigmoid(f)
        g = np.tanh(g)
        i = sigmoid(i)
        o = sigmoid(o)

        c_next = f * c_prev + g * i
        h_next = o * np.tanh(c_next)

        self.cache = (x, h_prev, c_prev, i, f, g, o, c_next)
        return h_next, c_next

    def backward(self, dh_next, dc_next):
        Wx, Wh, b = self.params
        x, h_prev, c_prev, i, f, g, o, c_next = self.cache

        tanh_c_next = np.tanh(c_next)

        ds = dc_next + (dh_next * o) * (1 - tanh_c_next ** 2)

        dc_prev = ds * f

        di = ds * g
        df = ds * c_prev
        do = dh_next * tanh_c_next
        dg = ds * i

        di *= i * (1 - i)
        df *= f * (1 - f)
        do *= o * (1 - o)
        dg *= (1 - g ** 2)

        dA = np.hstack((df, dg, di, do))

        dWh = np.dot(h_prev.T, dA)
        dWx = np.dot(x.T, dA)
        db = dA.sum(axis=0)

        self.grads[0][...] = dWx
        self.grads[1][...] = dWh
        self.grads[2][...] = db

        dx = np.dot(dA, Wx.T)
        dh_prev = np.dot(dA, Wh.T)

        return dx, dh_prev, dc_prev

 

 

Time LSTM 구현

 

class TimeLSTM:
    def __init__(self, Wx, Wh, b, stateful=False):
        self.params = [Wx, Wh, b]
        self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
        self.layers = None

        self.h, self.c = None, None
        self.dh = None
        self.stateful = stateful

    def forward(self, xs):
        Wx, Wh, b = self.params
        N, T, D = xs.shape
        H = Wh.shape[0]

        self.layers = []
        hs = np.empty((N, T, H), dtype='f')

        if not self.stateful or self.h is None:
            self.h = np.zeros((N, H), dtype='f')
        if not self.stateful or self.c is None:
            self.c = np.zeros((N, H), dtype='f')

        for t in range(T):
            layer = LSTM(*self.params)
            self.h, self.c = layer.forward(xs[:, t, :], self.h, self.c)
            hs[:, t, :] = self.h

            self.layers.append(layer)

        return hs

    def backward(self, dhs):
        Wx, Wh, b = self.params
        N, T, H = dhs.shape
        D = Wx.shape[0]

        dxs = np.empty((N, T, D), dtype='f')
        dh, dc = 0, 0

        grads = [0, 0, 0]
        for t in reversed(range(T)):
            layer = self.layers[t]
            dx, dh, dc = layer.backward(dhs[:, t, :] + dh, dc)
            dxs[:, t, :] = dx
            for i, grad in enumerate(layer.grads):
                grads[i] += grad

        for i, grad in enumerate(grads):
            self.grads[i][...] = grad
        self.dh = dh
        return dxs

    def set_state(self, h, c=None):
        self.h, self.c = h, c

    def reset_state(self):
        self.h, self.c = None, None

* truncated BPTT를 위해 hidden state와 cell state를 인스턴스 변수로 유지한다.

* staetful을 통해 상태를 유지할 것인지를 지정한다.

 

LSTM을 사용한 언어 모델

 

* 앞서 구현한 모델과 동일하지만 lstm을 사용한다는 것만 다르다.

import sys
sys.path.append('..')
from common.time_layers import *
from common.base_model import BaseModel


class Rnnlm(BaseModel):
    def __init__(self, vocab_size=10000, wordvec_size=100, hidden_size=100):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        # 가중치 초기화
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        # 계층 생성
        self.layers = [
            TimeEmbedding(embed_W),
            TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True),
            TimeAffine(affine_W, affine_b)
        ]
        self.loss_layer = TimeSoftmaxWithLoss()
        self.lstm_layer = self.layers[1]

        # 모든 가중치와 기울기를 리스트에 모은다.
        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

    def predict(self, xs):
        for layer in self.layers:
            xs = layer.forward(xs)
        return xs

    def forward(self, xs, ts):
        score = self.predict(xs)
        loss = self.loss_layer.forward(score, ts)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def reset_state(self):
        self.lstm_layer.reset_state()

 

* 훈련을 위한 코드는 아래와 같다.

 

# coding: utf-8
import sys
sys.path.append('..')
from common.optimizer import SGD
from common.trainer import RnnlmTrainer
from common.util import eval_perplexity
from dataset import ptb
from rnnlm import Rnnlm


# 하이퍼파라미터 설정
batch_size = 20
wordvec_size = 100
hidden_size = 100  # RNN의 은닉 상태 벡터의 원소 수
time_size = 35     # RNN을 펼치는 크기
lr = 20.0
max_epoch = 4
max_grad = 0.25

# 학습 데이터 읽기
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_test, _, _ = ptb.load_data('test')
vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]

# 모델 생성
model = Rnnlm(vocab_size, wordvec_size, hidden_size)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)

# 기울기 클리핑을 적용하여 학습
trainer.fit(xs, ts, max_epoch, batch_size, time_size, max_grad,
            eval_interval=20)
trainer.plot(ylim=(0, 500))

# 테스트 데이터로 평가
model.reset_state()
ppl_test = eval_perplexity(model, corpus_test)
print('테스트 퍼플렉서티: ', ppl_test)

# 매개변수 저장
model.save_params()

 

 

RNNLM 추가 개선

 

 

LSTM 계층 다층화

 

* RNNLM을 통해 정확한 모델을 만들고자 한다면 LSTM을 깊게 쌓는 방법이 있다.

 

 

 

드롭아웃에 의한 overfitting 억제

 

* 층을 깊게 쌓을 수록 overfitting의 위험성은 커진다. 이를 방지하기 위해 훈련 데이터의 양을 늘리거나 모델의 복잡도를 줄일 수 있지만, 우리는 dropout을 사용할 것이다.

 

* 위와 같이 시계열 방향으로 dropout을 삽입해야 할까? 안된다. dropout을 시계열 방향으로 삽입하면 시간의 흐름에 따라 정보가 사라질 위험성이 있다. 때문에 아래와 같이 상하 방향으로 dropout을 삽입해야 한다.

 

 

* 혹은 variational dropout 구조를 사용할 수도 있다. 이 구조는 같은 계층에 적용되는 dropout끼리는 동일한 마스크를 공유하는 방식으로 시간 방향에 dropout을 적용할 수도 있다는 장점이 있다.

 

 

 

가중치 공유

 

* weight tying을 사용하면 학습해야할 매개변수의 수를 줄일 수 있다. 이는 학습시간을 줄여줄 뿐만 아니라 학습해야 할 매개변수의 개수도 줄여주기 떄문에 과적합 억제의 기능도 하게 된다.

 

* weight tying은 그저 Embedding 계층의 V x H 크기의 가중치를 전치해 H x V 크기의 가중치를 Affine 계층에 사용하는 것으로 구현할 수 있다.

 

 

 

개선된 RNNLM 구현

 

import sys
sys.path.append('..')
from common.time_layers import *
from common.np import *  # import numpy as np
from common.base_model import BaseModel


class BetterRnnlm(BaseModel):
    '''
     LSTM 계층을 2개 사용하고 각 층에 드롭아웃을 적용한 모델이다.
     아래 [1]에서 제안한 모델을 기초로 하였고, [2]와 [3]의 가중치 공유(weight tying)를 적용했다.
     [1] Recurrent Neural Network Regularization (https://arxiv.org/abs/1409.2329)
     [2] Using the Output Embedding to Improve Language Models (https://arxiv.org/abs/1608.05859)
     [3] Tying Word Vectors and Word Classifiers (https://arxiv.org/pdf/1611.01462.pdf)
    '''
    def __init__(self, vocab_size=10000, wordvec_size=650,
                 hidden_size=650, dropout_ratio=0.5):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx1 = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh1 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b1 = np.zeros(4 * H).astype('f')
        lstm_Wx2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_Wh2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b2 = np.zeros(4 * H).astype('f')
        affine_b = np.zeros(V).astype('f')

        self.layers = [
            TimeEmbedding(embed_W),
            TimeDropout(dropout_ratio),
            TimeLSTM(lstm_Wx1, lstm_Wh1, lstm_b1, stateful=True),
            TimeDropout(dropout_ratio),
            TimeLSTM(lstm_Wx2, lstm_Wh2, lstm_b2, stateful=True),
            TimeDropout(dropout_ratio),
            TimeAffine(embed_W.T, affine_b)  # weight tying!!
        ]
        self.loss_layer = TimeSoftmaxWithLoss()
        self.lstm_layers = [self.layers[2], self.layers[4]]
        self.drop_layers = [self.layers[1], self.layers[3], self.layers[5]]

        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

    def predict(self, xs, train_flg=False):
        for layer in self.drop_layers:
            layer.train_flg = train_flg

        for layer in self.layers:
            xs = layer.forward(xs)
        return xs

    def forward(self, xs, ts, train_flg=True):
        score = self.predict(xs, train_flg)
        loss = self.loss_layer.forward(score, ts)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def reset_state(self):
        for layer in self.lstm_layers:
            layer.reset_state()

 

* 학습 코드는 다음과 같다.

 

import sys
sys.path.append('..')
from common import config
# GPU에서 실행하려면 아래 주석을 해제하세요(CuPy 필요).
# ==============================================
# config.GPU = True
# ==============================================
from common.optimizer import SGD
from common.trainer import RnnlmTrainer
from common.util import eval_perplexity, to_gpu
from dataset import ptb
from better_rnnlm import BetterRnnlm


# 하이퍼파라미터 설정
batch_size = 20
wordvec_size = 650
hidden_size = 650
time_size = 35
lr = 20.0
max_epoch = 40
max_grad = 0.25
dropout = 0.5

# 학습 데이터 읽기
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_val, _, _ = ptb.load_data('val')
corpus_test, _, _ = ptb.load_data('test')

if config.GPU:
    corpus = to_gpu(corpus)
    corpus_val = to_gpu(corpus_val)
    corpus_test = to_gpu(corpus_test)

vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]

model = BetterRnnlm(vocab_size, wordvec_size, hidden_size, dropout)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)

best_ppl = float('inf')
for epoch in range(max_epoch):
    trainer.fit(xs, ts, max_epoch=1, batch_size=batch_size,
                time_size=time_size, max_grad=max_grad)

    model.reset_state()
    ppl = eval_perplexity(model, corpus_val)
    print('검증 퍼플렉서티: ', ppl)

    if best_ppl > ppl:
        best_ppl = ppl
        model.save_params()
    else:
        lr /= 4.0
        optimizer.lr = lr

    model.reset_state()
    print('-' * 50)


# 테스트 데이터로 평가
model.reset_state()
ppl_test = eval_perplexity(model, corpus_test)
print('테스트 퍼플렉서티: ', ppl_test)

 

'Deep Learning > 밑바닥부터 시작하는 딥러닝' 카테고리의 다른 글

어텐션(Attention)  (0) 2022.02.18
RNN을 사용한 문장 생성  (0) 2022.02.18
순환신경망(RNN)  (0) 2022.02.10
word2vec 속도개선  (0) 2022.02.10
Word2Vec  (0) 2022.02.03