
Implementing an RNN with numpy

Javier Cárdenas

30/06/2021


In today’s post we will try to build a Recurrent Neural Network with numpy, in order to get a better understanding of how recurrent algorithms are used in NLP.

The limits of my language mean the limits of my world.

Ludwig Wittgenstein

Natural language processing is a branch of artificial intelligence that focuses on enabling computers to understand and manipulate human language.

It is fascinating to see how fast natural language processing has evolved during the last decade. As an example, the paragraph above in italics was written by GPT-3, one of the most powerful language models available today. I just wrote the phrase “Natural language processing is” and the model returned the rest of the sentence.

But alongside evolution often comes complexity and, similar to what we have seen in other posts, when we use deep learning frameworks to build complex models it sometimes feels as if we were working with a black box.

So I thought that building an RNN from scratch could help us gain some intuition of how these complex models work.

Why an RNN from scratch?

The RNN is one of the simplest algorithms used in NLP, and once we have an intuition of how it works, it will be much easier to understand more complex models.

Furthermore, building a model from scratch, instead of just reading about it, forces you to understand what is happening under the hood.

Our model

For our example, we will build an RNN that will try to create new (or real) names of people. In order to do so, we will build the typical many-to-many RNN. That is, given a sequence of \(n\) inputs, the model iterates over each of them to compute the expected \(n\) outputs.

Representation of forward propagation

In our case, the input will be a name containing \(n\) letters, and the model will recursively try to predict the letter that follows each letter in the word.

I once did a similar example in the DeepLearningAI deep learning course, but I never had the time to try and build one completely from scratch.

Give me the code!

Even though our RNNModel class contains other methods, in this post we will only go through the main ones. These are:

  • Forward propagation.
  • Loss calculation.
  • Backward propagation.
  • Clip method.

Some other methods have been implemented but are not shown in this post, such as:

  • Random initialization of the parameters of the model.
  • Optimization method.

Please note that the full model with the methods used can be found here.

class RNNModel:
    """
    Recurrent neural network implementation.
    """
    def __init__(self, input_dim, output_dim, hidden_dim):
        """
        """
        pass

    def forward(self, input_X):
        """
        Computes the forward propagation of the RNN.
        """
        pass

    def loss(self, Y):
        """
        Computes the Loss of the RNN.
        """
        pass

    def backward(self):  
        """
        Computes the backward propagation of the RNN.
        """
        pass

    def clip(self, clip_value):
        """
        Clips the gradients in order to avoid the problem of 
        exploding gradient.
        """
        pass

    def optimize(self, optim):
        """
        Updates the parameters of the model.
        """
        pass  
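For reference, here is a minimal sketch of how __init__ could set up the model. The parameter shapes are inferred from the forward and backward code shown later in the post; the 0.01 scaling factor and the Softmax helper (sketched further below) are assumptions, and in the full code the random initialization lives in a separate method.

    def __init__(self, input_dim, output_dim, hidden_dim):
        """
        Initializes the dimensions and parameters of the model.
        """
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim

        # small random weights; shapes follow the forward pass below
        self.Wax = np.random.randn(hidden_dim, input_dim) * 0.01
        self.Waa = np.random.randn(hidden_dim, hidden_dim) * 0.01
        self.Wya = np.random.randn(output_dim, hidden_dim) * 0.01
        self.b = np.zeros((hidden_dim, 1))
        self.by = np.zeros((output_dim, 1))

        # softmax helper used at the output of each step
        self.softmax = Softmax()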

The forward propagation

The forward propagation of the RNN is really simple and can be summarized with the following formulas:

Representation of an RNN cell
  • Hidden state: $$ a_{t} = tanh(W_{a,x} * X_{t} + W_{a,a} * a_{t-1} + b_{a}) $$
  • Y prediction: $$ y_{pred, t} = softmax(W_{y,a} * a_{t} + b_{y}) $$

In our example, for each epoch, we will iterate (with the same weights) over these two formulas for each letter in the word. Here \(a_{t}\) denotes the hidden state and \(a_{t-1}\) the hidden state of the previous cell. Note that the first hidden state, \(a_{0}\), is set to 0.

    def forward(self, input_X):
        """
        Computes the forward propagation of the RNN.
        Parameters
        ----------
        input_X : numpy.array or list
            List containing all the inputs that will be used to 
            propagate along the RNN cell.
        Returns
        -------
        y_preds : list
            List containing all the predictions for each input of the
            input_X list.
        """
        self.input_X = input_X

        self.layers_tanh = [Tanh() for x in input_X]
        hidden = np.zeros((self.hidden_dim, 1))
        
        self.hidden_list = [hidden]
        self.y_preds = []

        for input_x, layer_tanh in zip(input_X, self.layers_tanh):
            input_tanh = np.dot(self.Wax, input_x) + np.dot(self.Waa, hidden) + self.b
            hidden = layer_tanh.forward(input_tanh)
            self.hidden_list.append(hidden)

            input_softmax = np.dot(self.Wya, hidden) + self.by
            y_pred = self.softmax.forward(input_softmax)
            self.y_preds.append(y_pred)

        return self.y_preds
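The forward pass above relies on two small helper layers, Tanh and Softmax, that are not shown in this post. A minimal sketch of what they could look like, consistent with how they are called above and with the \( (1 - a^{2}) \) derivative used later in the backward pass (only the class names and the forward/backward calls appear in the original code; the rest is an assumption):

import numpy as np

class Tanh:
    """
    Tanh activation layer.
    """
    def forward(self, x):
        # store the activation to reuse it in the backward pass
        self.a = np.tanh(x)
        return self.a

    def backward(self, da):
        # derivative of tanh: (1 - a^2) * upstream gradient
        return (1 - self.a ** 2) * da

class Softmax:
    """
    Softmax activation layer.
    """
    def forward(self, x):
        # subtract the max for numerical stability
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum(axis=0)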

Cross Entropy loss

Since we are doing a many-to-many algorithm, we will need to sum up all the losses obtained by each prediction (letter) made for the given word.

Following the example, for the word ‘Hello’ we will have an input of 5 letters and an output of 5 letters as well, so we will have to sum up the losses obtained for the 5 predicted letters.

$$
l_{t}(y_{pred, t}, y_{t}) = -\sum_{c} y_{t, c} * log(y_{pred, t, c})
$$

$$
L(y_{pred}, y) = \sum_{t} l_{t}(y_{pred, t}, y_{t})
$$

class CrossEntropyLoss:
    """
    Class that implements the Cross entropy loss function.
    Given a target math:`y` and an estimate math:`\hat{y}` the 
    Cross Entropy loss can be written as:
    .. math::
        \begin{aligned}
            l_{\hat{y}, class} = -\log\left(\frac{\exp(\hat{y_n}[class])}{\sum_j \exp(\hat{y_n}[j])}\right), \\
            L(\hat{y}, y) = \frac{\sum^{N}_{i=1} l_{i, class[i]}}{\sum^{N}_{i=1} weight_{class[i]}},
        \end{aligned}
    References
    ----------
    .. [1] Wikipedia - Cross entropy:
       https://en.wikipedia.org/wiki/Cross_entropy    
    """
    def __init__(self):
        self.type = 'CELoss'
        self.eps = 1e-15
    
    def forward(self, Y_hat, Y):
        """
        Computes the forward propagation.
        Parameters
        ----------
        Y_hat : numpy.array
            Array containing the predictions.
        Y : numpy.array
            Array with the real labels.
        
        Returns
        -------
        numpy.array containing the cost.
        """
        self.Y = Y
        self.Y_hat = Y_hat

        _loss = - Y * np.log(self.Y_hat + self.eps)  # eps avoids log(0)
        loss = np.sum(_loss, axis=0).mean()

        return np.squeeze(loss) 

    def backward(self):
        """
        Computes the backward propagation.
        Returns
        -------
        grad : numpy.array
            Array containing the gradients of the weights.
        """
        grad = self.Y_hat - self.Y
        
        return grad
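
As a quick sanity check of the forward method, we can compute the loss of a single prediction with some made-up numbers (the values below are purely illustrative):

# hypothetical softmax output and one-hot target for a 3-class problem
y_hat = np.array([[0.7], [0.2], [0.1]])
y = np.array([[1.0], [0.0], [0.0]])

loss_layer = CrossEntropyLoss()
print(loss_layer.forward(y_hat, y))  # -log(0.7), roughly 0.357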

Back in our RNNModel class, the loss method simply sums the loss obtained for each predicted letter:

    def loss(self, Y):
        """
        Computes the Cross Entropy Loss for the predicted values.
        Parameters
        ----------
        Y : numpy.array or list
            List containing the real labels to predict.
        Returns
        -------
        cost : float
            Cost of the given model.
        """
        self.Y = Y
        self.layers_loss = [CrossEntropyLoss() for y in self.Y]
        cost = 0
        
        for y_pred, y, layer in zip(self.y_preds, self.Y, self.layers_loss):
            cost += layer.forward(y_pred, y)
        
        return cost

The backward propagation

The backward propagation, also known as backpropagation through time (BPTT), is usually the most difficult part. Using the chain rule, the backward propagation of our model can be described with the following formulas:

Loss function derivative.

$$
\frac{\partial J}{\partial y} = (Y_{pred} - Y)
$$

Derivative with respect to the weights and bias used to compute \(y_{pred}\).

$$
\frac{\partial J}{\partial W_{y, a}} =
\frac{\partial y}{\partial W_{y, a}} *
\frac{\partial J}{\partial y} = a *
\frac{\partial J}{\partial y}
$$

$$
\frac{\partial J}{\partial b_{y}} = \frac{\partial J}{\partial y}
$$

Derivative of the hidden state.

$$
\frac{\partial J}{\partial a} =
\frac{\partial y}{\partial a} *
\frac{\partial J}{\partial y} = W_{y, a} *
\frac{\partial J}{\partial y} +
\frac{\partial J}{\partial a_{t+1}}
$$

$$
\frac{\partial J}{\partial tanh(x, a)} = (1 - a^{2}) *
\frac{\partial J}{\partial a}
$$

Gradients of the weights and bias to compute the hidden state.

$$
\frac{\partial J}{\partial W_{a, x}} =
X *
\frac{\partial J}{\partial tanh(x, a)}
$$

$$
\frac{\partial J}{\partial W_{a, a}} =
a_{t-1} *
\frac{\partial J}{\partial tanh(x, a)}
$$

    def backward(self):  
        """
        Computes the backward propagation of the model.
        Defines and accumulates the gradients of the parameters,
        which will later be used to update the weights.
        """
        gradients = self._define_gradients()
        self.dWax, self.dWaa, self.dWya, self.db, self.dby, dhidden_next = gradients

        for index, layer_loss in reversed(list(enumerate(self.layers_loss))):
            dy = layer_loss.backward()

            # current and previous hidden states
            hidden = self.hidden_list[index + 1]
            hidden_prev = self.hidden_list[index]

            # gradients y
            self.dWya += np.dot(dy, hidden.T)
            self.dby += dy
            dhidden = np.dot(self.Wya.T, dy) + dhidden_next
    
            # gradients a
            dtanh = self.layers_tanh[index].backward(dhidden)
            self.db += dtanh
            self.dWax += np.dot(dtanh, self.input_X[index].T)
            self.dWaa += np.dot(dtanh, hidden_prev.T)
            dhidden_next = np.dot(self.Waa.T, dtanh)
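
The backward method starts from a helper _define_gradients that is not shown in the post. A minimal sketch, assuming it simply zero-initializes arrays with the same shapes as the parameters and the hidden state:

    def _define_gradients(self):
        """
        Initializes the gradients of the parameters, and the gradient of
        the hidden state coming from the next time step, to zero.
        """
        dWax = np.zeros_like(self.Wax)
        dWaa = np.zeros_like(self.Waa)
        dWya = np.zeros_like(self.Wya)
        db = np.zeros_like(self.b)
        dby = np.zeros_like(self.by)
        dhidden_next = np.zeros_like(self.hidden_list[0])

        return dWax, dWaa, dWya, db, dby, dhidden_next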

After doing the backward propagation, we will need to clip the gradients in order to avoid the problem of exploding gradients.

    def clip(self, clip_value):
        """
        Clips the gradients in order to avoid the problem of 
        exploding gradient.
        Parameters
        ----------
        clip_value : int
            Number that will be used to clip the gradients.
        """
        for gradient in [self.dWax, self.dWaa, self.dWya, self.db, self.dby]:
            np.clip(gradient, -clip_value, clip_value, out=gradient)

Finally, we only need to optimize the parameters using stochastic gradient descent (SGD), similar to the previous post, to obtain the new weights and repeat all the steps.
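Neither the SGD optimizer nor the optimize method are shown in this post. A minimal sketch of what they could look like (only SGD(lr=0.01) and model.optimize(optim) appear in the code below; the update method and the interface between the two are assumptions):

class SGD:
    """
    Stochastic gradient descent optimizer.
    """
    def __init__(self, lr=0.01):
        self.lr = lr

    def update(self, param, grad):
        # vanilla SGD step: param <- param - lr * grad
        return param - self.lr * grad

The optimize method of the RNNModel could then apply this update to every parameter:

    def optimize(self, optim):
        """
        Updates the parameters of the model with the given optimizer.
        """
        self.Wax = optim.update(self.Wax, self.dWax)
        self.Waa = optim.update(self.Waa, self.dWaa)
        self.Wya = optim.update(self.Wya, self.dWya)
        self.b = optim.update(self.b, self.db)
        self.by = optim.update(self.by, self.dby)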

Test our model

So, it’s time to check if our model works and can create new (or real) names!

The full code of the example can be found in this notebook.

To train our model we will be using the person_names dataset, which contains around 19,000 names.

person_names = open('person_names.txt', 'r').read()
person_names = person_names.lower()
characters = list(set(person_names))

character_to_index = {character:index for index,character in enumerate(sorted(characters))}
index_to_character = {index:character for index,character in enumerate(sorted(characters))}

with open("person_names.txt") as f:
    person_names = f.readlines()

person_names = [name.lower().strip() for name in person_names]
np.random.shuffle(person_names)

Note that since there are 26 letters in the alphabet and we add the character ‘\n’ to denote the end of the word, our input will have a dimension of \(N \times 27\), where \(N\) is the number of letters in the word and 27 is the size of the one-hot encoded vector indicating the position of each letter in the alphabet. The same applies to the output dimension.
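The one_hot_encoding helper used in the training loop below is not shown in the post. A minimal sketch, assuming each index is turned into a column vector of the given dimension and that the None placed at the start of the input maps to a zero vector:

def one_hot_encoding(indexes, dimension):
    """
    Converts a list of character indexes into a list of one-hot column
    vectors. A None index (used for the first input of the sequence) is
    mapped to a zero vector.
    """
    vectors = []
    for index in indexes:
        vector = np.zeros((dimension, 1))
        if index is not None:
            vector[index] = 1
        vectors.append(vector)
    return vectors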

I have set the hidden state to have a dimension of 50, so the weight matrix \(W_{a,a}\) is \(50 \times 50\).

num_epochs = 100001
input_dim = 27
output_dim = 27
hidden_dim = 50

# initialize and define the model hyperparameters
model = RNNModel(input_dim, output_dim, hidden_dim)
optim = SGD(lr=0.01)
costs = []

Train the model and generate four random names every 10,000 epochs.

# Training
for epoch in range(num_epochs):
    
    # create the X inputs and Y labels
    index = epoch % len(person_names)
    X = [None] + [character_to_index[ch] for ch in person_names[index]] 
    Y = X[1:] + [character_to_index["\n"]]

    # transform the input X and label Y into one hot encoding.
    X = one_hot_encoding(X, input_dim)
    Y = one_hot_encoding(Y, output_dim)
    
    # steps of the model
    model.forward(X)
    cost = model.loss(Y)
    model.backward()
    # clip gradients
    model.clip(clip_value=1)
    # optimize
    model.optimize(optim)

    if epoch % 10000 == 0:
        print ("Cost after iteration %d: %f" % (epoch, cost))
        costs.append(cost)

        print('Names created:', '\n')
        for i in range(4):
            name = model.generate_names(index_to_character)
            print(name)

Cost after iteration 0: 22.388388
Names created: 

wiibjnaswxngbih
msprglszcxepjjg
ntlipdryxt
minjhhsixfxkfo

Cost after iteration 40000: 16.226669
Names created: 

maryan
mily
seryna
shaques

Not bad at all! The names that the model outputs after iteration 40,000 seem quite fine to me. As we can see, the model has improved its name generation since the first iteration.
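The generate_names method used in the training loop is not shown in the post either (the real one is in the linked notebook). A hedged sketch of how such a sampling loop could look, assuming it draws one letter at a time from the softmax output until the newline character that marks the end of a name is sampled:

    def generate_names(self, index_to_character):
        """
        Generates a new name by repeatedly sampling the next letter from
        the softmax output until the end-of-name character is sampled.
        """
        letter = np.zeros((self.input_dim, 1))
        hidden = np.zeros((self.hidden_dim, 1))
        name = ''

        while len(name) < 50:
            # one forward step of the RNN cell
            hidden = np.tanh(np.dot(self.Wax, letter) + np.dot(self.Waa, hidden) + self.b)
            y_pred = self.softmax.forward(np.dot(self.Wya, hidden) + self.by)

            # sample the next letter from the predicted distribution
            index = np.random.choice(len(y_pred), p=y_pred.ravel())
            character = index_to_character[index]
            if character == '\n':
                break

            # feed the sampled letter back as the next input
            name += character
            letter = np.zeros((self.input_dim, 1))
            letter[index] = 1

        return name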

Conclusions

In today’s post, we have built a many-to-many RNN model from scratch and used it to create new names. In future posts, we could try to build more complex models such as an LSTM.

I hope you enjoyed it! Thanks for reading and see you in the next post.
