Skip to content

Draft Design doc about Supporting PyTorch

Kelang edited this page Aug 3, 2020 · 1 revision

Supporting PyTorch

Introduction

ElasticDL is an open-source distributed deep learning programming framework based on TensorFlow and Kubernetes. Considering that PyTorch is more widely used in academia, this project will support PyTorch.

How PyTorch is Used

The training of most neural networks can be simplified to this process:

  1. Define the network: Define the Class of the network, declare the instance of the network net=Net().
  2. Define the optimizer: optimizer=optim.xxx(net.parameters(),lr=xxx)
  3. Define the loss function: compute_loss=nn.MSELoss()
  4. training loop: (a) Clear the gradient information in the optimizer: optimizer.zero_grad() (b) Forward: output=net(input) (c) Calculate the loss: loss=compute_loss(target,output) (d) Backward: loss.backward() (e) Update parameters: optimizer.step()

Define a Class

If you want to build a network, you need to define a Class first which inherits nn.Module. nn is a very useful toolbox, import torch.nn as nn is needed. For example, there are mainly two functions written in this class called Net, one is the initialized __init__ function, and the other is the forward function.

class Net(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(         # input shape (1, 28, 28)
            nn.Conv2d(
                in_channels=1,              # input height
                out_channels=16,            # n_filters
                kernel_size=5,              # filter size
                stride=1,                   # filter movement/step
                padding=2,                  # if want same width and length of this image after Conv2d, padding=(kernel_size-1)/2 if stride=1
            ),                              # output shape (16, 28, 28)
            nn.ReLU(),                      # activation
            nn.MaxPool2d(kernel_size=2),    # choose max value in 2x2 area, output shape (16, 14, 14)
        )
        self.conv2 = nn.Sequential(         # input shape (16, 14, 14)
            nn.Conv2d(16, 32, 5, 1, 2),     # output shape (32, 14, 14)
            nn.ReLU(),                      # activation
            nn.MaxPool2d(2),                # output shape (32, 7, 7)
        )
        self.out = nn.Linear(32 * 7 * 7, 10)   # fully connected layer, output 10 classes

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)           # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        output = self.out(x)
        return output
net = Net()

__init__ is the definition of the convolutional layer, and super() must be executed first to initialize the parent class nn.Module.forward is the real execution of the data flow. For example, in the above code, the input x passes through the defined conv1 first and then passes through the activation function F.relu().In the beginning, you should import torch.nn.functional as F. F.relu() is an official function. If you define relu as myrelu in __init__, then your first sentence will be x=F.max_pool2d(myrelu(self.conv1(x)),2). After a series of flows, return x to the outside.

Input with DataLoader

PyTorch has an abstract Dataset class. Thistutorial walks through a nice example of creating a custom FacialLandmarkDataset class as a subclass of Dataset.

PyTorch’s TensorDataset is a Dataset wrapping tensors. torch.utils.data.DataLoader is an iterator that provides all these features.

  • Batching the data
  • Shuffling the data
  • Load the data in parallel using multiprocessing workers.
# Mnist digits dataset
train_data = torchvision.datasets.MNIST(
    root='./mnist/',
    train=True,                                     # this is training data
    transform=torchvision.transforms.ToTensor(),    # Converts a PIL.Image or numpy.ndarray to
                                                    # torch.FloatTensor of shape (C x H x W) and normalize in the range [0.0, 1.0]
    download=DOWNLOAD_MNIST,
)

Training

Next, we input train_data into neural network and get output by forward(), and finally calculate the error. The code below omits the part of calculating the accuracy. If you want to take a closer look at the accuracy code, please go to me see all the code on github.

optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)   # optimize all cnn parameters
loss_func = nn.CrossEntropyLoss()   # the target label is not one-hotted

# training and testing
for epoch in range(EPOCH):
    for step, (b_x, b_y) in enumerate(train_loader):   # 分配 batch data, normalize x when iterate train_loader
        output = cnn(b_x)               # cnn output
        loss = loss_func(output, b_y)   # cross entropy loss
        optimizer.zero_grad()           # clear gradients for this training step
        loss.backward()                 # backpropagation, compute gradients
        optimizer.step()                # apply gradients

If you want the output of the neural network to be similar to the ground truth you expect, that is to keep reducing the difference between the two. This difference is defined by you, which is the object function or loss function. If the loss function approaches 0, then the goal is naturally achieved.

How to make ElasticDL work with PyTorch

The master process of ElasticDL uses asynchronous or synchronous SGD methods to coordinate workers for training. When using asynchronous SGD method, the master will start a high-performance parameter server for each worker to use. When using synchronous SGD, ElasticDL uses Kubernetes-native's fault-tolerable AllReduce implementation.

ElasticDL holds master-worker architecture. The master node plays the master role in two aspects.

  1. It's the master of the cluster.
  2. It's the master of the model training/evaluation/prediction process.

1. Simple and Standardized Model Method

In a distributed deep learning system, several workers need to be started and monitored. It is necessary to split the training data to the workers and to update the model by integrating the gradients calculated by each worker, which involves communication and synchronization. Fault-tolerant design is also an issue that must be considered.

A common solution is to provide a distributed programming framework for jobs so that users only need to fill in the business logic like cloze, and distributed computing processes such as communication, synchronization, and fault tolerance are completed by the code of the framework.

After completing the definition of class, the user is required to fill in the forward, loss, optimizer and feedfunctions. forward defines the forward calculation process of deep learning.The back propagation process is automatically derived by PyTorch.The loss function returns the loss function used during model training. The optimizer function returns the optimizer used during model training.feed customizes the conversion process of training data to PyTorch model input.

The programming of all these functions only requires knowledge of the PyTorch API, and no background knowledge of distributed training is required. After writing, users can use small data for debugging and verification on a single machine. If it passes, it can be submitted to the Kubernetes cluster for distributed fault-tolerant large-scale training without any code modification.

The specific model building method can refer to this mnist_subclass.py.

2. Load Data from Task

ElasticDL introduces a master process for each job. By calling the Kubernetes API, the master process understands the cluster situation. The data is distributed by the master.dynamic_data_sharding.md

  1. A worker get a task from the master.
  2. A worker reads real data according to the offset in the task feed customizes the conversion process of training data to PyTorch model input.

TODO: Make DataLoader works with task, more details will be added.

There is a tutorial about feed in TensorFlow.

3. Transmission of Gradient Information

A task received by an ElasticDL worker usually includes multiple minibatches. For each task, the worker opens the corresponding file or table, and then:

  1. Get a mini-batch training data.
  2. Call the user-defined forward function with the local model as a parameter to calculate the cost. If the model is large, some parameters may come from the parameter server.
  3. The worker performs backward calculations to obtain the gradient.
  4. In synchronous SGD, the worker calls AllReduce to implement FTlib to synchronize gradients and update the model. In asynchronous SGD, the worker uploads gradients to the parameter server from time to time, and also obtains global model parameters from the parameter server from time to time.
while (True):
    task = get_task()
    dataset = create_dataset(task)
    for minibatch in dataset:
        pull_parameters()
        forward()
        backward()
        push_gradients()

Gradient information acquisition

The advanced API in PyTorch such as torch.optim is not available,we had to update the value of each parameter by name, and manually zero the gradient of each parameter.

torch.no_grad() context is necessary because we don't want to record these operations in the next gradient calculation. To go further, we can use model.parameters() and model.zero_grad() (defined by PyTorch for nn.Module) to make these steps more concise, and there will be no errors of forgetting some parameters, especially when we build a complex model:

with torch.no_grad():
    for param in model.parameters(): 
        param -= param.grad * lr
    model.zero_grad()

Work with Parameter Server Client

This document describes the design of a distributed parameter server for ElasticDL.

Model Parameter Access from Worker

Each PS pod has a RPC server to provide RPC services. Workers use RPC services to pull model parameters. pull_variable service is to pull all non-embedding parameters. pull_embedding_vector service is to pull embedding vectors specified by an embedding layer name and a list of discrete IDs.

service PServer{
    rpc pull_variable(PullModelRequest) returns (PullModelResponse);
    rpc pull_embedding_vector(PullEmbeddingVectorRequest) returns (Tensor);
}

Model Parameter Update

A worker computes gradients in each training iteration, which contain gradients for non-embedding parameters and some embedding vectors if applicable. The worker partitions these gradients using their corresponding parameter names or discrete IDs for embedding vectors. Then the worker sends gradient partitions to their corresponding PS pods by RPC calls push_gradient.

service PServer{
    rpc push_gradient(PushGradientRequest) returns (PushGradientResponse);
}

When a PS pod receives gradients in push_gradient, it uses a PyTorch optimizer to apply gradients to non-embedding parameters.

import os
import torch
import torch.nn as nn
import torch.utils.data as Data
import torchvision
import matplotlib.pyplot as plt

# torch.manual_seed(1)    # reproducible
EPOCH = 1               # train the training data n times, to save time, we just train 1 epoch
BATCH_SIZE = 50
LR = 0.001              # learning rate
DOWNLOAD_MNIST = False


# Mnist digits dataset
if not(os.path.exists('./mnist/')) or not os.listdir('./mnist/'):
    # not mnist dir or mnist is empyt dir
    DOWNLOAD_MNIST = True

train_data = torchvision.datasets.MNIST(
    root='./mnist/',
    train=True,                                     # this is training data
    transform=torchvision.transforms.ToTensor(),    # Converts a PIL.Image or numpy.ndarray to
                                                    # torch.FloatTensor of shape (C x H x W) and normalize in the range [0.0, 1.0]
    download=DOWNLOAD_MNIST,
)

# plot one example
print(train_data.train_data.size())                 # (60000, 28, 28)
print(train_data.train_labels.size())               # (60000)
plt.imshow(train_data.train_data[0].numpy(), cmap='gray')
plt.title('%i' % train_data.train_labels[0])
plt.show()

# Data Loader for easy mini-batch return in training, the image batch shape will be (50, 1, 28, 28)
train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)

# pick 2000 samples to speed up testing
test_data = torchvision.datasets.MNIST(root='./mnist/', train=False)
test_x = torch.unsqueeze(test_data.test_data, dim=1).type(torch.FloatTensor)[:2000]/255.   # shape from (2000, 28, 28) to (2000, 1, 28, 28), value in range(0,1)
test_y = test_data.test_labels[:2000]


class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(         # input shape (1, 28, 28)
            nn.Conv2d(
                in_channels=1,              # input height
                out_channels=16,            # n_filters
                kernel_size=5,              # filter size
                stride=1,                   # filter movement/step
                padding=2,                  # if want same width and length of this image after Conv2d, padding=(kernel_size-1)/2 if stride=1
            ),                              # output shape (16, 28, 28)
            nn.ReLU(),                      # activation
            nn.MaxPool2d(kernel_size=2),    # choose max value in 2x2 area, output shape (16, 14, 14)
        )
        self.conv2 = nn.Sequential(         # input shape (16, 14, 14)
            nn.Conv2d(16, 32, 5, 1, 2),     # output shape (32, 14, 14)
            nn.ReLU(),                      # activation
            nn.MaxPool2d(2),                # output shape (32, 7, 7)
        )
        self.out = nn.Linear(32 * 7 * 7, 10)   # fully connected layer, output 10 classes

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)           # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        output = self.out(x)
        return output, x    # return x for visualization


cnn = CNN()
print(cnn)  # net architecture

# following function (plot_with_labels) is for visualization, can be ignored if not interested
from matplotlib import cm
try: from sklearn.manifold import TSNE; HAS_SK = True
except: HAS_SK = False; print('Please install sklearn for layer visualization')

def plot_with_labels(lowDWeights, labels):
    plt.cla()
    X, Y = lowDWeights[:, 0], lowDWeights[:, 1]
    for x, y, s in zip(X, Y, labels):
        c = cm.rainbow(int(255 * s / 9)); plt.text(x, y, s, backgroundcolor=c, fontsize=9)
    plt.xlim(X.min(), X.max()); plt.ylim(Y.min(), Y.max()); plt.title('Visualize last layer'); plt.show(); plt.pause(0.01)

plt.ion()


optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)   # optimize all cnn parameters

loss_func = nn.CrossEntropyLoss()                       # the target label is not one-hotted
learning_rate = 1e-4

# training and testing
for epoch in range(EPOCH):
    for step, (b_x, b_y) in enumerate(train_loader):   # gives batch data, normalize x when iterate train_loader

        output = cnn(b_x)[0]               # cnn output
        loss = loss_func(output, b_y)   # cross entropy loss

        # print(epoch, loss.item())
        # optimizer.zero_grad()           # clear gradients for this training step
        # loss.backward()                 # backpropagation, compute gradients
        # optimizer.step()                # apply gradients

        cnn.zero_grad()
        loss.backward()
        with torch.no_grad():
            for param in cnn.parameters():
                param -= learning_rate * param.grad

        if step % 50 == 0:
            test_output, last_layer = (test_x)
            pred_y = torch.max(test_output, 1)[1].data.numpy()
            accuracy = float((pred_y == test_y.data.numpy()).astype(int).sum()) / float(test_y.size(0))
            print('Epoch: ', epoch, '| train loss: %.4f' % loss.data.numpy(), '| test accuracy: %.2f' % accuracy)
            if HAS_SK:
                # Visualization of trained flatten layer (T-SNE)
                tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
                plot_only = 500
                low_dim_embs = tsne.fit_transform(last_layer.data.numpy()[:plot_only, :])
                labels = test_y.numpy()[:plot_only]
                plot_with_labels(low_dim_embs, labels)
plt.ioff()

# print 10 predictions from test data
test_output, _ = cnn(test_x[:10])
pred_y = torch.max(test_output, 1)[1].data.numpy()
print(pred_y, 'prediction number')
print(test_y[:10].numpy(), 'real number')