"Open

# TP02 - VAE for MNIST clustering and generation

In this practical, we will explore Variational Auto-Encoders (VAE) with the MNIST digit recognition dataset.
The purpose of a VAE is to generate new samples "looking like" the training samples.
For this purpose, we will construct a two-block model: encoder and decoder, one takes a sample and maps it to a latent space, the second takes a latent point and maps it to the sample space. If the latent distribution is roughly gaussian, this gives a way to get new samples: take a latent point at random,
and map it to the sample space with the decoder.

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torchvision.utils import save_image

import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
%matplotlib inline

from sklearn.metrics.cluster import normalized_mutual_info_score

In [None]:
!pip install --quiet "git+https://gitlab.com/robindar/dl-scaman_checker.git"
from dl_scaman_checker import TP09
TP09.check_install()

In [None]:
N_CLASSES = 10
IMAGE_SIZE = 28 * 28

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
def show(img):
 plt.imshow(np.transpose(img.numpy(), (1,2,0)), interpolation='nearest')

def plot_reconstruction(model, n=24):
 x,_ = next(iter(data_loader))
 x = x[:n,:,:,:].to(device)
 try:
 out, _, _, log_p = model(x.view(-1, IMAGE_SIZE))
 except:
 out, _, _ = model(x.view(-1, IMAGE_SIZE))
 x_concat = torch.cat([x.view(-1, 1, 28, 28), out.view(-1, 1, 28, 28)], dim=3)
 out_grid = torchvision.utils.make_grid(x_concat).cpu().data
 show(out_grid)

@torch.no_grad()
def plot_generation(model, n=24):
 z = torch.randn(n, z_dim).to(device)
 out = model.decode(z).view(-1, 1, 28, 28)
 out_grid = torchvision.utils.make_grid(out).cpu()
 show(out_grid)

@torch.no_grad()
def plot_conditional_generation(model, n=8, fix_number=None):
 matrix = np.zeros((n, N_CLASSES))
 matrix[:,0] = 1

 if fix_number is None:
 final = matrix[:]
 for i in range(1, N_CLASSES):
 final = np.vstack((final,np.roll(matrix,i)))
 z = torch.randn(n, z_dim)
 z = z.repeat(N_CLASSES,1).to(device)
 y_onehot = torch.tensor(final).type(torch.FloatTensor).to(device)
 out = model.decode(z,y_onehot).view(-1, 1, 28, 28)
 else:
 z = torch.randn(n, z_dim).to(device)
 y_onehot = torch.tensor(np.roll(matrix, fix_number)).type(torch.FloatTensor).to(device)
 out = model.decode(z,y_onehot).view(-1, 1, 28, 28)

 out_grid = torchvision.utils.make_grid(out).cpu()
 show(out_grid)

In [None]:
data_dir = './data'
dataset = torchvision.datasets.MNIST(
 root=data_dir,
 train=True,
 transform=transforms.ToTensor(),
 download=True)

data_loader = torch.utils.data.DataLoader(
 dataset=dataset,
 batch_size=128,
 shuffle=True)

test_loader = torch.utils.data.DataLoader(
 torchvision.datasets.MNIST(data_dir, train=False, download=True, transform=transforms.ToTensor()),
 batch_size=10, shuffle=False)

# Part A - Variational Autoencoders

## A.1 - Autoencoding theory

Consider a latent variable model with a data variable $x\in \mathcal{X}$ and a latent variable $z\in \mathcal{Z}$, $p(z,x) = p(z)p_\theta(x|z)$. Given the data $x_1,\dots, x_n$, we want to train the model by maximizing the marginal log-likelihood:
\begin{eqnarray*}
\mathcal{L} = \mathbf{E}_{p_d(x)}\left[\log p_\theta(x)\right]=\mathbf{E}_{p_d(x)}\left[\log \int_{\mathcal{Z}}p_{\theta}(x|z)p(z)dz\right],
 \end{eqnarray*}
 where $p_d$ denotes the empirical distribution of $X$: $p_d(x) =\frac{1}{n}\sum_{i=1}^n \delta_{x_i}(x)$.

 To avoid the (often) difficult computation of the integral above, the idea behind variational methods is to instead maximize a lower bound to the log-likelihood:
 \begin{eqnarray*}
\mathcal{L} \geq L(p_\theta(x|z),q(z|x)) =\mathbf{E}_{p_d(x)}\left[\mathbf{E}_{q(z|x)}\left[\log p_\theta(x|z)\right]-\mathrm{KL}\left( q(z|x)||p(z)\right)\right].
 \end{eqnarray*}
 Any choice of $q(z|x)$ gives a valid lower bound. Variational autoencoders replace the variational posterior $q(z|x)$ by an inference network $q_{\phi}(z|x)$ that is trained together with $p_{\theta}(x|z)$ to jointly maximize $L(p_\theta,q_\phi)$.
 
The variational posterior $q_{\phi}(z|x)$ is also called the **encoder** and the generative model $p_{\theta}(x|z)$, the **decoder** or generator.

The first term $\mathbf{E}_{q(z|x)}\left[\log p_\theta(x|z)\right]$ is the negative reconstruction error. Indeed under a gaussian assumption i.e. $p_{\theta}(x|z) = \mathcal{N}(\mu_{\theta}(z), I)$ the term $\log p_\theta(x|z)$ reduces to $\propto \|x-\mu_\theta(z)\|^2$, which is often used in practice. The term $\mathrm{KL}\left( q(z|x)||p(z)\right)$ can be seen as a regularization term, where the variational posterior $q_\phi(z|x)$ should be matched to the prior $p(z)= \mathcal{N}(0, I)$.

Variational Autoencoders were introduced by [Kingma and Welling (2013)](https://arxiv.org/abs/1312.6114), see also [(Doersch, 2016)](https://arxiv.org/abs/1606.05908) for a tutorial.

## A.2 - A simple autoencoder for MNIST

There are various examples of VAE in PyTorch available [here](https://github.com/pytorch/examples/tree/master/vae) or [here](https://github.com/yunjey/pytorch-tutorial/blob/master/tutorials/03-advanced/variational_autoencoder/main.py#L38-L65). The code below is taken from this last source.

![A variational autoencoder.](https://github.com/dataflowr/notebooks/blob/master/HW3/vae.png?raw=true)

In [None]:
# VAE model
class VAE(nn.Module):
 def __init__(self, h_dim, z_dim):
 super(VAE, self).__init__()
 self.fc1 = nn.Linear(IMAGE_SIZE, h_dim)
 self.fc2 = nn.Linear(h_dim, z_dim)
 self.fc3 = nn.Linear(h_dim, z_dim)
 self.fc4 = nn.Linear(z_dim, h_dim)
 self.fc5 = nn.Linear(h_dim, IMAGE_SIZE)

 def encode(self, x):
 h = F.relu(self.fc1(x))
 return self.fc2(h), self.fc3(h)

 def reparameterize(self, mu, log_var):
 std = torch.exp(log_var/2)
 eps = torch.randn_like(std)
 return mu + eps * std

 def decode(self, z):
 h = F.relu(self.fc4(z))
 return torch.sigmoid(self.fc5(h))

 def forward(self, x):
 mu, log_var = self.encode(x)
 z = self.reparameterize(mu, log_var)
 x_reconst = self.decode(z)
 return x_reconst, mu, log_var

# Hyper-parameters
h_dim = 400
z_dim = 20
num_epochs = 15
learning_rate = 1e-3

model = VAE(h_dim=h_dim, z_dim=z_dim).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

Here for the loss, instead of MSE for the reconstruction loss, we take Binary Cross-Entropy. The code below is still from the PyTorch tutorial (with minor modifications to avoid warnings!).

In [None]:
@torch.no_grad()
def live_plot(data_dict, x_key=None, figsize=(7,5), title=''):
 clear_output(wait=True)
 plt.figure(figsize=figsize)
 for label, data in data_dict.items():
 if label == x_key or len(data) == 0:
 continue
 x = data_dict[x_key] if x_key is not None else np.arange(len(data))
 plt.plot(x, data, label=label, linewidth=1)
 plt.title(title)
 plt.grid(alpha=.5, which='both')
 plt.xlabel('epoch' if x_key is None else x_key)
 plt.legend(loc='center left') # the plot evolves to the right
 plt.show();

In [None]:
verbose = False
data_dict = { "epoch": [], "total loss": [], "reconstruction": [], "kl_div": [] }

for epoch in range(num_epochs):
 for i, (x, _) in enumerate(data_loader):
 # Forward pass
 x = x.to(device).view(-1, IMAGE_SIZE)
 x_reconst, mu, log_var = model(x)

 # Compute reconstruction loss and kl divergence
 # For KL divergence between Gaussians, see Appendix B in VAE paper or (Doersch, 2016):
 # https://arxiv.org/abs/1606.05908
 reconst_loss = F.binary_cross_entropy(x_reconst, x, reduction='sum')
 kl_div = - 0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())

 # Backprop and optimize
 loss = reconst_loss + kl_div
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()

 # Bookkeeping
 data_dict["total loss"].append(loss.item() / len(x))
 data_dict["reconstruction"].append(reconst_loss.item() / len(x))
 data_dict["kl_div"].append(kl_div.item() / len(x))
 data_dict["epoch"].append(epoch + float(1+i) / len(data_loader))


 if (i+1) % 100 == 0:
 if verbose:
 print ("Epoch[{}/{}], Step [{}/{}], Reconst Loss: {:.4f}, KL Div: {:.4f}"
 .format(epoch+1, num_epochs, i+1, len(data_loader),
 reconst_loss.item()/len(x), kl_div.item()/len(x)))
 else:
 live_plot(data_dict, x_key="epoch", title="VAE")

live_plot(data_dict, x_key="epoch", title="VAE")

## A.3 - Evaluating results

Let us see how our network reconstructs our last batch. We display pairs of original digits and reconstructed version side by side.

Observe how most reconstructed digits are essentially identical to the original version.
This means that the identity mapping has been learned well.
You should still see some blurry digits very different from the original (resample a couple times if needed).

In [None]:
plot_reconstruction(model)

Let's see now how our network generates new samples.

In [None]:
plot_generation(model)

Not great, but we did not train our network for long... That being said, we have no control of the generated digits. In the rest of this notebook, we explore ways to generates zeroes, ones, twos and so on.


As a by-product, we show how our VAE will allow us to do clustering thanks to the Gumbel VAE described below. But before that, we start by cheating a little bit...

# Part B - Cheating with the "conditional" VAE

We would like to generate samples of a given number. We have so far a model for images $X \in [0,1]^{P}$ with $P = 784$ pixels, generated as $X = f_\theta(Z)$ where $Z \sim \mathcal{N}(0,I)$ is a latent with normal distribution. In other to sample a specific digit, we will use a model
$g_\theta : \mathbb{R}^z \times \{0,\ldots,9\} \to [0,1]^P$ and sample images according to $X = g_\theta(Z, Y)$
where $Y \in \{0, \ldots, 9\}$ is a class label.

In the context of variational autoencoding, this is considered cheating,
because it uses external information (the class label) instead of learning the modes from the data.

To build the function $g$, we will simply concatenate the latent representation with
a one-hot encoding of the class. This way, we can use the above architecture with very little modification.

First, write a function transforming a label into its onehot encoding. This function will be used in the training loop (not in the architecture of the neural network!).

In [None]:
def l_2_onehot(labels, nb_digits=N_CLASSES):
 # take labels (from the dataloader) and return labels onehot-encoded
 ### your code here ###

You can test it on a batch.

In [None]:
(x,labels) = next(iter(data_loader))

In [None]:
labels

In [None]:
assert l_2_onehot(labels).shape == (*labels.shape, N_CLASSES)
l_2_onehot(labels)

Now modify the architecture of the VAE where the decoder takes as input the random code concatenated with the onehot encoding of the label, you can use `torch.cat`.

In [None]:
class VAE_Cond(nn.Module):
 def __init__(self, h_dim, z_dim):
 super(VAE_Cond, self).__init__()
 self.fc1 = nn.Linear(IMAGE_SIZE, h_dim)
 self.fc2 = nn.Linear(h_dim, z_dim)
 self.fc3 = nn.Linear(h_dim, z_dim)
 ### your code here ###

 def encode(self, x):
 h = F.relu(self.fc1(x))
 return self.fc2(h), self.fc3(h)

 def reparameterize(self, mu, log_var):
 std = torch.exp(log_var/2)
 eps = torch.randn_like(std)
 return mu + eps * std

 def decode(self, z, l_onehot):
 ### your code here ###

 def forward(self, x, l_onehot):
 ### your code here ###

Test your new model on a batch:

In [None]:
model_C = VAE_Cond(h_dim=h_dim, z_dim=z_dim).to(device)
x = x.to(device).view(-1, IMAGE_SIZE)
l_onehot = l_2_onehot(labels).to(device)
x_reconst, mu, log_var = model_C(x, l_onehot)
assert x_reconst.shape == x.shape
assert mu.shape == log_var.shape and mu.shape == (x.shape[0], z_dim)
x_reconst.shape, mu.shape, log_var.shape

Now you can modify the training loop of your network. The parameter $\beta$ will allow you to scale the KL term in your loss as explained in the [$\beta$-VAE paper](https://openreview.net/forum?id=Sy2fzU9gl) see formula (4) in the paper.

In [None]:
def train_C(model, data_loader=data_loader,num_epochs=num_epochs, beta=10., verbose=True):
 nmi_scores = []
 model.train(True)
 data_dict = { "epoch": [], "total loss": [], "reconstruction": [], "kl_div": [] }
 title = "Conditional VAE"

 for epoch in range(num_epochs):
 for i, (x, labels) in enumerate(data_loader):
 x = x.to(device).view(-1, IMAGE_SIZE)

 ### your forward code here ###

 reconst_loss = F.binary_cross_entropy(x_reconst, x, reduction='sum')
 kl_div = - 0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())

 # Backprop and optimize
 loss = reconst_loss + beta * kl_div
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()

 # Bookkeeping
 data_dict["total loss"].append(loss.item() / len(x))
 data_dict["reconstruction"].append(reconst_loss.item() / len(x))
 data_dict["kl_div"].append(kl_div.item() / len(x))
 data_dict["epoch"].append(epoch + float(1+i) / len(data_loader))


 if (i+1) % 100 == 0:
 if verbose:
 print("Epoch[{}/{}], Step [{}/{}], Reconst Loss: {:.4f}, KL Div: {:.4f}"
 .format(epoch+1, num_epochs, i+1, len(data_loader),
 reconst_loss.item()/len(x),
 kl_div.item()/len(x)))
 else:
 live_plot(data_dict, x_key="epoch", title=title)

 live_plot(data_dict, x_key="epoch", title=title)

In [None]:
model_C = VAE_Cond(h_dim=h_dim, z_dim=z_dim).to(device)
optimizer = torch.optim.Adam(model_C.parameters(), lr=learning_rate)

In [None]:
train_C(model_C, num_epochs=15, verbose=False)

In [None]:
plot_conditional_generation(model_C)

Here you should get nice results. Now we will avoid the use of the labels...

# Part C - No cheating with Gumbel VAE

Implement a VAE where you add a categorical variable $c\in \{0,\dots 9\}$ so that your latent variable model is $p(c,z,x) = p(c)p(z)p_{\theta}(x|c,z)$ and your variational posterior is $q_{\phi}(c|x)q_{\phi}(z|x)$ as described in this NeurIPS paper: [(Dupont, 2018)](https://arxiv.org/abs/1804.00104). Try to make only minimal modifications to previous architecture.

The idea is to incorporate a categorical variable in your latent space, without cheating by forcing it to take the value of the image's label. You hope that this categorical variable will encode the class of the digit, so that your network can use it for a better reconstruction, but can't force it. Moreover, if things work as planned, you will then be able to generate digits conditionally to the class, i.e. you can choose the class thanks to the latent categorical variable $c$ and then generate digits from this class.

## C.1 - Gumbel trick for discrete latent variables

As noticed above, in order to sample random variables while still being able to use backpropagation, we need to use the reparameterization trick which is easy for Gaussian random variables. For categorical random variables, the reparameterization trick is explained in [(Jang et al., 2016)](https://arxiv.org/abs/1611.01144). This is implemented in PyTorch thanks to [F.gumbel_softmax](https://pytorch.org/docs/stable/generated/torch.nn.functional.gumbel_softmax.html).

Note: there is an instability in the PyTorch `F.gumbel_softmax` implementation. We provide instead `TP09.gumbel_softmax` which takes the same arguments and ensures that the output is not NaN. If you encounter nans in your training or errors in the binary cross-entropy, make sure you are using the stabilized version.

In [None]:
class VAE_Gumbel(nn.Module):
 def __init__(self, h_dim, z_dim):
 super(VAE_Gumbel, self).__init__()
 ### your code here ###

 def encode(self, x):
 ### your code here ###

 def reparameterize(self, mu, log_var):
 std = torch.exp(log_var/2)
 eps = torch.randn_like(std)
 return mu + eps * std

 def decode(self, z, y_onehot):
 ### your code here ###

 def forward(self, x):
 ### your code here ###

In [None]:
model_G = VAE_Gumbel(h_dim=h_dim, z_dim=z_dim).to(device)
optimizer = torch.optim.Adam(model_G.parameters(), lr=learning_rate)

You need to modify the loss to take into account the categorical random variable with an uniform prior on $\{0,\dots 9\}$, see Appendix A.2 in [(Dupont, 2018)](https://arxiv.org/abs/1804.00104)

In [None]:
def train_G(model, data_loader=data_loader,num_epochs=num_epochs, beta = 1., verbose=True):
 nmi_scores = []
 model.train(True)
 data_dict = { "epoch": [], "total loss": [], "reconstruction": [], "kl_div": [], "entropy": [] }
 title = "Gumbel VAE"

 for epoch in range(num_epochs):
 all_labels = []
 all_labels_est = []
 for i, (x, labels) in enumerate(data_loader):
 x = x.to(device).view(-1, IMAGE_SIZE)
 ### your forward code here ###

 # Backprop and optimize
 loss = None ### your loss code here ###
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()

 # Bookkeeping
 data_dict["total loss"].append(loss.item() / len(x))
 ### your bookkeeping code here ###
 data_dict["epoch"].append(epoch + float(1+i) / len(data_loader))

 if (i+1) % 100 == 0:
 if verbose:
 print ("Epoch[{}/{}], Step [{}/{}], Reconst Loss: {:.4f}, KL Div: {:.4f}, Entropy: {:.4f}"
 .format(epoch+1, num_epochs, i+1, len(data_loader),
 reconst_loss.item()/len(x),
 kl_div.item()/len(x),
 H_cat.item()/len(x)))
 else:
 live_plot(data_dict, x_key="epoch", title=title)

 live_plot(data_dict, x_key="epoch", title=title)

In [None]:
train_G(model_G, num_epochs=20, verbose=False)

In [None]:
plot_reconstruction(model_G)

The reconstruction is good, but we care more about the generation. For each category, we generate 8 samples thanks to the `plot_conditional_generation()` function.
Consistently with the previous use, each row is supposed to consist of the same digit sampled 8 times, and each row should correspond to a distinct digit.

In [None]:
plot_conditional_generation(model_G)

It does not look like our original idea is working...

What is happening is that our network is not using the categorical variable at all (all "digits" look the same, the variation comes from the re-sampling of Z). We can track the [normalized mutual information](https://en.wikipedia.org/wiki/Mutual_information#Normalized_variants) (see [this method in scikit-learn](http://scikit-learn.org/stable/modules/generated/sklearn.metrics.normalized_mutual_info_score.html)) between the true labels and the labels predicted by our network (just by taking the category with maximal probability).

Change your training loop to return the normalized mutual information (NMI) for each epoch. Plot the curve to check that the NMI is actually decreasing.

## C.2 - Robust disentangling with controlled capacity increase

This problem is explained in [(Burgess et al., 2018)](https://arxiv.org/abs/1804.03599) and a solution is proposed in Section 5.

In order to force our network to use the categorical variable, we will change the loss according to [(Dupont, 2018)](https://arxiv.org/abs/1804.00104), Section 3 Equation (7).

Implement this change in the training loop and plot the new NMI curve. Increase the $C_z$ and $C_c$ constants by a constant value every epoch until it reaches the value passed in argument.
For $\beta = 20, C_z=100, C_c=100$, you should see that NMI increases.

In [None]:
model_G = VAE_Gumbel(h_dim=h_dim, z_dim=z_dim).to(device)
optimizer = torch.optim.Adam(model_G.parameters(), lr=learning_rate)

In [None]:
def train_G_modified_loss(model, data_loader=data_loader,num_epochs=num_epochs, beta=1. , C_z_fin=0, C_c_fin=0, verbose=True):
 ### your code here ###
 return nmi_scores

In [None]:
# Hyper-parameters
num_epochs = 40
learning_rate = 1e-3
beta = 20
C_z_fin = 100
C_c_fin = 100

model_G = VAE_Gumbel(h_dim=h_dim, z_dim = z_dim).to(device)
optimizer = torch.optim.Adam(model_G.parameters(), lr=learning_rate)

nmi = train_G_modified_loss(
 model_G, data_loader,
 num_epochs=num_epochs,
 beta=beta,
 C_z_fin=C_z_fin,
 C_c_fin=C_c_fin,
 verbose=False)

In [None]:
plt.plot(nmi, '-o')
plt.xlabel("epoch")
plt.ylabel("Normalized Mutual Information")
plt.grid(alpha=.5, which='both')

In [None]:
plot_reconstruction(model_G)

In [None]:
plot_conditional_generation(model_G, fix_number=None)

In [None]:
plot_conditional_generation(model_G, fix_number=2)

In [None]:
plot_conditional_generation(model_G, fix_number=8)

## C.3 - Interpretation of learned results

Compare the generated digits from the Conditional VAE model and the modified Gumbel-VAE.
The conditional VAE produces all digits in order, which makes for a nice picture; on the other hand, the Gumbel VAE has no way of knowing the order of digits, it must just guess modes of pictures that look the same, so the order is meaningless.
Note however the variations of straight/inclined ones, are they present on the same row ?
What about shapes from the same row morphing from one digit to another, does it make sense that this was a considered a single mode by the model ? Are there distinct modes corresponding to the same digit, and if so, is it a reasonable distinction ?

In [None]:
plot_conditional_generation(model_C, fix_number=None)

In [None]:
plot_conditional_generation(model_G, fix_number=None)

---