본문 바로가기
Deep Learning/밑바닥부터 시작하는 딥러닝

RNN을 사용한 문장 생성

by 대소기 2022. 2. 18.

언어 모델을 사용한 문장 생성

RNN을 사용한 문장 생성의 순서

 

* 위 그림은 LSTM을 사용한 언어모델을 도식화 한 것이다.

* 만약 위 모델을 사용하여 문장을 생성한다고 해보자. ' I '라는 단어를 입력하면 모델은 다음에 올 단어의 확률분포를 출력한다.

 

* 이 때 우리는 다음에 올 단어를 선택하는 2가지 선택에 마주한다.

 

1) 결정적(deterministic) 방법

- 확률이 가장 높은 단어를 선택하는 방법

- 가장 확률이 높은 단어를 선택하기 때문에 동일한 input에 대해 항상 동일한 output을 출력한다.

 

2) 확률적(probabilistic) 방법

- 확률이 가장 높은 단어를 무조건 선택하는 것이 아닌, 확률분포대로 sampling하는 것

- 동일한 input에 대해 다른 output이 출력될 수 있다.

 

* 두 가지 방법 중 확률적 방법이 다양한 문장을 생성하기 좋기 때문에 확률적 방법을 사용해보자(어떤 방법을 사용하더라도 전체적인 작동 원리는 같다).

* 출력된 단어는 다시 다음 time step의 입력으로 사용된다. 

* 이렇게 단어를 출력하다가 <eos> 토큰이 출력되면 문장 생성을 중지한다.

* 이렇게 모델이 문장의 단어 등장 패턴을 학습하여 새로운 문장을 생성하는 것이 가능하다.

 

 

 

문장 생성 구현

 

import sys
sys.path.append('..')
import numpy as np
from common.functions import softmax
from ch06.rnnlm import Rnnlm
from ch06.better_rnnlm import BetterRnnlm


class RnnlmGen(Rnnlm):
	# skip_ids = 최초로 주는 단어의 ID
    def generate(self, start_id, skip_ids=None, sample_size=100):
        word_ids = [start_id]

        x = start_id
        while len(word_ids) < sample_size:
            x = np.array(x).reshape(1, 1)
            score = self.predict(x) # 각 단어의 점수 출력
            p = softmax(score.flatten()) # 점수들을 softmax를 이용해 정규화

            sampled = np.random.choice(len(p), size=1, p=p) #softmax 결과대로 sample
            if (skip_ids is None) or (sampled not in skip_ids):
                x = sampled
                word_ids.append(int(x))

        return word_ids

* 위 RnnlmGen 클래스는 Rnnlm을 계승하여 만든 클래스이다.

* skip ID는 단어 ID의 리스트이고, 리스트에 속하는 단어 ID는 샘플링되지 않도록 해준다. <unk>나 N 등 전처리도니 단어를 샘플링하지 않게 하는 용도로 사용된다.

 

 

* 모델을 만들었으니 문장을 생성해보자.

corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)
corpus_size = len(corpus)

model = RnnlmGen()
# model.load_params('../Rnnlm.pkl')

# 시작(start) 문자와 건너뜀(skip) 문자 설정
start_word = 'you'
start_id = word_to_id[start_word]
skip_words = ['N', '<unk>', '$']
skip_ids = [word_to_id[w] for w in skip_words]

# 문장 생성
word_ids = model.generate(start_id, skip_ids)
# txt가 ID배열을 문장으로 변환해준다.
txt = ' '.join([id_to_word[i] for i in word_ids])
txt = txt.replace(' <eos>', '.\n')
print(txt)

# you numerous mystery diluted incorrectly plummeted manitoba fully painted usx fend korea powerful realist competition invests breed wellington holidays creativity jail impetus rothschilds aware hitachi suggested real powerhouse wage solicitation climbing collateralized laband begin sequester shape del. ...

* 위 모델은 학습이 전혀 되지 않은 랜덤 파라미터를 가진 모델이기 때문에 생성된 문장도 엉망인 것을 볼 수 있다.

* 만약 이전 포스트에서 사용했던 Rnnlm의 가중치를 사용하기 위해서는 model.load_params의 주석을 풀어주면 된다.

 

corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)
corpus_size = len(corpus)

model = RnnlmGen()
model.load_params('../Rnnlm.pkl')

# 시작(start) 문자와 건너뜀(skip) 문자 설정
start_word = 'you'
start_id = word_to_id[start_word]
skip_words = ['N', '<unk>', '$']
skip_ids = [word_to_id[w] for w in skip_words]

# 문장 생성
word_ids = model.generate(start_id, skip_ids)
# txt가 ID배열을 문장으로 변환해준다.
txt = ' '.join([id_to_word[i] for i in word_ids])
txt = txt.replace(' <eos>', '.\n')
print(txt)

# you talks on the institutions ' parent of taxes increases and played buying differ management and ca n't gauge death. mr. jones said remains throughout contrast with arby 's state created for navigation mixte 's west stock to make close on confidential medical....

* 훈련된 가중치를 사용한 결과 더 자연스러운 문장이 생성되었지만, 이 보다 좋은 모델을 만들어보자.

 

 

 

 

더 좋은 문장으로

class BetterRnnlmGen(BetterRnnlm):
    def generate(self, start_id, skip_ids=None, sample_size=100):
        word_ids = [start_id]

        x = start_id
        while len(word_ids) < sample_size:
            x = np.array(x).reshape(1, 1)
            score = self.predict(x).flatten()
            p = softmax(score).flatten()

            sampled = np.random.choice(len(p), size=1, p=p)
            if (skip_ids is None) or (sampled not in skip_ids):
                x = sampled
                word_ids.append(int(x))

        return word_ids

    def get_state(self):
        states = []
        for layer in self.lstm_layers:
            states.append((layer.h, layer.c))
        return states

    def set_state(self, states):
        for layer, state in zip(self.lstm_layers, states):
            layer.set_state(*state)
            
you would pitch attracting die mr. warren says. the most part the firm is willing to take he one. the jittery difference between china and england of the u.k. finally proved more success in the ec in making the of all free barriers in putting the goods together. it would guide the problems where they break a package with greater quality to the west....

* 더 좋은 언어 모델을 만들기 위해 이전 포스팅에서 dropout과 가중치 묶기를 시행하여 perplexity가 더 낮았던 모델을 사용해보자.

 

 

 

* 논문 Regularizing and optimizing LSTM language models에 나온 신기한 실험 하나를 해보자면, 우리는 LSTM 계층에 단어열 정보를 유지할 수 있다.

* 예를 들어 'the meaning of life is'라는 글을 주고 말을 생성할 때 모델에 ['the', 'meaning', 'of', 'life']를 차례로 입력하고, 'is'를 첫 단어로 입력해 문장 생성을 시작시키면 'the meaning of life is'에 이어지는 문장을 생성할 수 있다.

 

start_words = 'the meaning of life is'
start_ids = [word_to_id[w] for w in start_words.split(' ')]

for x in start_ids[:-1]:
    x = np.array(x).reshape(1, 1)
    model.predict(x)

word_ids = model.generate(start_ids[-1], skip_ids)
word_ids = start_ids[:-1] + word_ids
txt = ' '.join([id_to_word[i] for i in word_ids])
txt = txt.replace(' <eos>', '.\n')
print('-' * 50)
print(txt)

# the meaning of life is not a good version of paintings

 

 

 

 

Seq2Seq(Sequence to Sequence)

 

* 시계열 데이터를 다른 시계열 데이터로 변환하는 모델을 seq2seq 모델이라고 한다.

 

 

seq2seq의 구조

* seq2seq은 encoder, decoder로 이뤄져 있다.

 

 

Encoder

* encoder에서는 time step마다 단어 혹은 글자를 입력받는다. 그리고 마지막 hidden state vector h(고정 길이 vector)를 decoder에게 넘겨주게 된다. 이 마지막 time step의 hidden state vector에는 입력되었던 모든 정보들에 대한 응축된 정보가 들어있게 된다.

 

 

Decoder

* decoder는 encoder와 구조가 동일하다. 다만, 최초 LSTM의 입력으로 <eos>와 같은 새로운 input 뿐만 아니라 이전 encoder에서 전해져온 hidden state vector h를 전달받는다는 점이다.

* encoder는 굳이 따지자면 lstm의 입력으로 zero vector에 해당하는 hidden state vector를 입력받는다고 볼 수 있다.

 

 

 

seq2seq의 전체 계층 구성

* seq2seq은 LSTM 두 개(Encoder의 LSTM과 Decoder의 LSTM)로 구성된다.

* 기울기는 decoder에서 encoder로 전달된다.

 

 

 

시계열 데이터 변환용 Toy Problem - seq2seq 구현

* seq2seq을 이용해 덧셈을 수행하는 계산기를 만들어보자.

* 하지만 입력을 '57 + 5'와 같은 string으로 주고 출력을 '62'와 같이 구성하기 때문에 덧셈 로직 없이 문자의 등장 패턴만으로 결과적으로 덧셈을 수행하는 계산기를 생성할 것이다.

* 문장의 입력은 문자 단위로 분할되어 진행된다.

 

 

가변 길이 시계열 데이터

* 모델을 만들기 이전에 입력되는 문자의 길이(timestep)를 생각해보자. '57+5'는 length가 4이고, '62+1149'는 length가 7이다. 이렇게 문장별로 길이가 다르기 때문에 lstm에 들어갈 때 문장별로 시간 방향의 크기가 다르게 된다.

* 이를 해결하기 위해 문장을 고정 길이 시계열 데이터로 바꿀 것이며 zero-padding을 집어넣을 것이다.

* zero padding은 문장의 가장 긴 길이인 '999+999' 즉, 7로 설정한다. 출력의 경우 입력과 구분을 위해 앞에 구분자로서 ' _ '를 붙이기로 한다. 이 구분자는 decoder에 입력될 경우 문자열을 생성하라고 알리는 토큰과 같이 활용된다.

* 하지만 이 zero padding은 seq2seq에 전용 처리를 추가해줘야 한다. 입력된 데이터가 padding이라면 mask를 이용하여 이전 시각의 입력을 그대로 출력하게 해줘야 한다. 이런 과정을 통해 zero padding을 입력으로서 loss에 반영하지 않을 수 있다.

* 실제로는 이렇게 해줘야 하지만, 구현의 용이성을 위해 zero padding도 입력으로 처리해보자.

 

from dataset import sequence
  
(x_train, t_train), (x_test, t_test) = \
    sequence.load_data('addition.txt', seed=1984)
char_to_id, id_to_char = sequence.get_vocab()
# char_to_id : {'1': 0, '6': 1, '+': 2, '7': 3, '5': 4, ' ': 5, '_': 6, '9': 7, '2': 8, '0': 9, '3': 10, '8': 11, '4': 12}
print(x_train.shape, t_train.shape)
print(x_test.shape, t_test.shape)
# (45000, 7) (45000, 5)
# (5000, 7) (5000, 5)

print(x_train[0])
print(t_train[0])
# [ 3 0 2 0 0 11 5 ]
# [ 6 0 11 7 5 ]

print(''.join([id_to_char[c] for c in x_train[0]]))
print(''.join([id_to_char[c] for c in t_train[0]]))
# 71+118
# _189

 

Encoder class

 

* encoder class는 embedding과 lstm으로 이뤄져 있다.

 

class Encoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        # vocab_size 문자 종류 0~9, '+', '공백 문자', '_'
        V, D ,H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')
        
        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=False)
        
        self.params = self.embed.params + self.lstm.params
        self.grads = self.embed.grads + self.lstm.grads
        self.hs = None
        
    def forward(self, xs):
        xs = self.embed.forward(xs)
        hs = self.lstm.forward(xs)
        self.hs = hs
        return hs[:, -1, :]
    
    def backward(self, dh):
        dhs = np.zeros_like(self.hs)
        dhs[:, -1, :] = dh
        
        dout = self.lstm.backward(dhs)
        dout = self.embed.backward(dout)
        return dout

 

 

Decoder class

* Encoder와 마찬가지로 LSTM 계층을 사용하면 되고, 이 때 Decoder의 계층 구성은 위와 같다.

* 학습을 자세히 살펴보자면 아래와 같다.

* 위와 같이 첫 번째 입력으로 _ 를 입력하게 된다. 입력으로 ['_' , '6', '2', ' ']가 입력되면 출력으로 [6, 2, ' ', ' ']가 출력되는 구조이다.

* 앞서 문장을 생성하는 언어모델에서 살펴봤던 것과 같이 출력에 대한 결정은 '결정적 방법', '확률적 방법' 두 가지를 사용할 수 있다. 여기서는 랜덤한 문장을 생성하는 것과 같은 task가 아니므로 결정적 방법을 통해 확실한 답을 출력할 것이다.

 

* 때문에 학습시와 다르게 test시에는 argmax를 통해 가장 높은 문자 ID만 출력하도록 한다.

 

 

class Decoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.affine = TimeAffine(affine_W, affine_b)

        self.params, self.grads = [], []
        for layer in (self.embed, self.lstm, self.affine):
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, xs, h):
        self.lstm.set_state(h)

        out = self.embed.forward(xs)
        out = self.lstm.forward(out)
        score = self.affine.forward(out)
        return score

    def backward(self, dscore):
        dout = self.affine.backward(dscore)
        dout = self.lstm.backward(dout)
        dout = self.embed.backward(dout)
        dh = self.lstm.dh
        return dh

    def generate(self, h, start_id, sample_size):
        sampled = []
        sample_id = start_id
        self.lstm.set_state(h)

        for _ in range(sample_size):
            x = np.array(sample_id).reshape((1, 1))
            out = self.embed.forward(x)
            out = self.lstm.forward(out)
            score = self.affine.forward(out)

            sample_id = np.argmax(score.flatten())
            sampled.append(int(sample_id))

        return sampled

* 학습 시에는 forward와 backward를 사용하고, test시에는 generate 함수를 사용해 argmax를 사용해 가장 높은 값을 출력한다.

 

 

seq2seq class

class Seq2seq(BaseModel):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        self.encoder = Encoder(V, D, H)
        self.decoder = Decoder(V, D, H)
        self.softmax = TimeSoftmaxWithLoss()

        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads

    def forward(self, xs, ts):
        decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:] 

        h = self.encoder.forward(xs)
        score = self.decoder.forward(decoder_xs, h)
        loss = self.softmax.forward(score, decoder_ts)
        return loss

    def backward(self, dout=1):
        dout = self.softmax.backward(dout)
        dh = self.decoder.backward(dout)
        dout = self.encoder.backward(dh)
        return dout

    def generate(self, xs, start_id, sample_size):
        h = self.encoder.forward(xs)
        sampled = self.decoder.generate(h, start_id, sample_size)
        return sampled

 

 

 

 

seq2seq 평가

import sys
sys.path.append('..')
import numpy as np
import matplotlib.pyplot as plt
from dataset import sequence
from common.optimizer import Adam
from common.trainer import Trainer
from common.util import eval_seq2seq
from seq2seq import Seq2seq
from peeky_seq2seq import PeekySeq2seq


# 데이터셋 읽기
(x_train, t_train), (x_test, t_test) = sequence.load_data('addition.txt')
char_to_id, id_to_char = sequence.get_vocab()

# 입력 반전 여부 설정 =============================================
is_reverse = False  # True
if is_reverse:
    x_train, x_test = x_train[:, ::-1], x_test[:, ::-1]
# ================================================================

# 하이퍼파라미터 설정
vocab_size = len(char_to_id)
wordvec_size = 16
hidden_size = 128
batch_size = 128
max_epoch = 25
max_grad = 5.0

# 일반 혹은 엿보기(Peeky) 설정 =====================================
model = Seq2seq(vocab_size, wordvec_size, hidden_size)
# model = PeekySeq2seq(vocab_size, wordvec_size, hidden_size)
# ================================================================
optimizer = Adam()
trainer = Trainer(model, optimizer)

acc_list = []
for epoch in range(max_epoch):
    trainer.fit(x_train, t_train, max_epoch=1,
                batch_size=batch_size, max_grad=max_grad)

    correct_num = 0
    for i in range(len(x_test)):
        question, correct = x_test[[i]], t_test[[i]]
        verbose = i < 10
        correct_num += eval_seq2seq(model, question, correct,
                                    id_to_char, verbose, is_reverse)

    acc = float(correct_num) / len(x_test)
    acc_list.append(acc)
    print('검증 정확도 %.3f%%' % (acc * 100))

 

* 학습이 끝난 시점까지 정확도를 확인해보면 상승 추세인 것은 확실한 것 같다. 하지만 이보다 좋은 모델을 생성할 수 있다. 다음을 살펴보자.

 

 

 

 

seq2seq 개선

 

 

 

입력 데이터 반전(Reverse)

* 논문 [41]에서 제시된 방법으로 입력 데이터를 반전시켜 학습하는 것이 있다.

 

# 입력 반전 여부 설정 =============================================
is_reverse = False  # True
if is_reverse:
    x_train, x_test = x_train[:, ::-1], x_test[:, ::-1]
# ================================================================

 

* 위 코드를 통해 입력 데이터셋을 반전시킨 후 학습해보면 아래와 같이 훨씬 정확도가 상승하는 것을 볼 수 있다.

 

* 이는 입력 데이터의 시점과 대응되는 출력 데이터의 시점이 더 가까워지는 것 때문이라고 직관적으로 이해할 수 있다.

* 예를 들어 'I am a cat'을 '나는 고양이 이다.' 로 변환할 때 뒤집은 입력인 'cat a am I' 의 ' I '가 '나'와 더 가까워지기 때문에 기울기가 직접 전해져 학습 효율이 더 높아진다고 이해할 수 있다. 

* 물론 뒤집기만 했으므로 대응되는 단어 사이의 '평균' 거리는 동일하다.

 

 

 

Peeky Decoder

 

* peeky decoder는 decoder의 모든 시점에서 encoder에서 전달된 hidden state vector h를 활용하는 것을 뜻한다.

* 그런데 이렇게 되면 affine 계층에서는 encoder로 부터 전해져 온 hidden state vector h와 LSTM의 출력 vector 두 개의 vector가 입력된다. 이 때 입력은 concatenate되어 softmax with loss 계층으로 들어가게 된다.

 

 

class PeekyDecoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn

        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(H + D, 4 * H) / np.sqrt(H + D)).astype('f')
        lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4 * H).astype('f')
        affine_W = (rn(H + H, V) / np.sqrt(H + H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.affine = TimeAffine(affine_W, affine_b)

        self.params, self.grads = [], []
        for layer in (self.embed, self.lstm, self.affine):
            self.params += layer.params
            self.grads += layer.grads
        self.cache = None

    def forward(self, xs, h):
        N, T = xs.shape
        N, H = h.shape

        self.lstm.set_state(h)

        out = self.embed.forward(xs)
        hs = np.repeat(h, T, axis=0).reshape(N, T, H) # 시계열 T만큼 h vector를 복제
        out = np.concatenate((hs, out), axis=2) # embedding계층의 출력과 hs 연결

        out = self.lstm.forward(out)
        out = np.concatenate((hs, out), axis=2) # lstm계층의 출력과 hs 연결

        score = self.affine.forward(out)
        self.cache = H
        return score

    def backward(self, dscore):
        H = self.cache

        dout = self.affine.backward(dscore)
        dout, dhs0 = dout[:, :, H:], dout[:, :, :H]
        dout = self.lstm.backward(dout)
        dembed, dhs1 = dout[:, :, H:], dout[:, :, :H]
        self.embed.backward(dembed)

        dhs = dhs0 + dhs1
        dh = self.lstm.dh + np.sum(dhs, axis=1)
        return dh

    def generate(self, h, start_id, sample_size):
        sampled = []
        char_id = start_id
        self.lstm.set_state(h)

        H = h.shape[1]
        peeky_h = h.reshape(1, 1, H)
        for _ in range(sample_size):
            x = np.array([char_id]).reshape((1, 1))
            out = self.embed.forward(x)

            out = np.concatenate((peeky_h, out), axis=2)
            out = self.lstm.forward(out)
            out = np.concatenate((peeky_h, out), axis=2)
            score = self.affine.forward(out)

            char_id = np.argmax(score.flatten())
            sampled.append(char_id)

        return sampled

 

* peeky decoder를 생성하였으니 이를 활용해 peeky seq2seq을 생성해보자.

 

from seq2seq import Seq2seq, Encoder

# peekyseq2seq은 seq2seq class를 계승
class PeekySeq2seq(Seq2seq):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        self.encoder = Encoder(V, D, H)
        self.decoder = PeekyDecoder(V, D, H)
        self.softmax = TimeSoftmaxWithLoss()

        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads

* 이 peeky seq2seq을 사용했을 때의 훈련 결과를 확인해보자. 추가적으로 입력 반전까지 시행한 상태이다.

 

* 한 가지 peeky seq2seq에서 주의할 점은 입력 벡터의 크기가 증가하기 때문에 훈련 가중치의 크기도 증가한다는 점이다. 때문에 훈련이 더 오래걸릴 수 있다.

 

 

 

 

 

7.5 seq2seq을 이용하는 애플리케이션

 

 

seq2seq의 활용분야

* 한 시계열 데이터를 다른 시계열 데이터로 변경하는 task들에 적용할 수 있다.

- 기계번역 : '한 언어의 문장'을 '다른 언어의 문장'으로 변환

- 자동요약 : '긴 문장'을 '짧은 문장'으로 변환

- 질의응답 : '질문'을 '응답'으로 변환

- 메일 자동 응답 : '받은 메일의 문장'을 '답변 글'로 변환

 

* 이 외에도 음성, 영상 등에도 적용이 가능하다.

- 이미지 캡셔닝 : encoder에 CNN을 사용함. 이미지를 문장으로 변환

'Deep Learning > 밑바닥부터 시작하는 딥러닝' 카테고리의 다른 글

어텐션(Attention)  (0) 2022.02.18
게이트가 추가된 RNN  (0) 2022.02.10
순환신경망(RNN)  (0) 2022.02.10
word2vec 속도개선  (0) 2022.02.10
Word2Vec  (0) 2022.02.03