The Variational Autoencoder (VAE) is an algorithm for inference and learning in a latent variable generative model. In it’s simplest form, it’s an unsupervised learning algorithm and like normalizing flows, the generative model can be used to create new examples similar to the data set. However, unlike normalizing flows, the generative model is not invertible and so it’s not so straightforward to train the model using maximum likelihood.



The VAE uses the principle of variational inference to approximate the posterior distribution by defining a parameterized family of distributions conditioned on a data example, and then maximizing a lower bound on the marginal likelihood. This is the evidence lower bound (ELBO), which we also used when we looked at the Bayes-by-Backprop algorithm.

Standard Autoencoders

A standard autoencoder can be viewed as a compression algorithm, it is not a probabilistic model and it isn’t modeling the underlying data distribution. You can easily implement it using standard Karas tools without any of the modules from Tensorflow Probability.

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Reshape

autoencoder = Sequential([
  Flatten(input_shape=(28, 28)),
  Dense(256, activation='sigmoid'),
  Dense(64, activation='sigmoid'),
  Dense(2, activation='sigmoid'),
  Dense(64, activation='sigmoid'),
  Dense(256, activation='sigmoid'),
  Dense(784, activation='sigmoid'),
  Reshape((28, 28))
])

opt = tf.keras.optimizers.RMSprop(learning_rate=0.0005)
autoencoder.compile(loss='mse', optimizer=opt)
autoencoder.fit(x_train, x_train, epochs=20)

The autoencoder has a bottleneck architecture. The width of each of the dense layers is decreasing at first all the way down to the middle bottleneck layer. Then the network starts to widen out again afterwards, until the final layer is the same size and shape as the input layer

When we train the autoencoder, we feed it the same data examples for the input as for the output. This is an unsupervised learning algorithm, as we are not using any labels. The autoencoder is being trained to reconstruct the data example that it receives as the output of the network.

The bottleneck architecture forces the network to learn a compressed representation which captures the main features of the data so that it’s able to approximately reconstruct the data example from this compressed representation.

In practice, we’re going to find it useful to separate out the construction of the encoder and decoder parts of the network architecture:

  1. The encoder network takes a batch of data examples as input and returns a batch of latent codes Dense(2, ...) that compress those examples.
  2. The decoder network takes a batch of latent codes input_shape=(2,) and decompresses them. The output of the decoder network is 28 by 28, the same size as the inputs to the encoder.
encoder = Sequential([
  Flatten(input_shape=(28, 28)),
  Dense(256, activation='sigmoid'),
  Dense(64, activation='sigmoid'),
  Dense(2, activation='sigmoid')
])

decoder = Sequential([
  Dense(64, activation='sigmoid', input_shape=(2,)),
  Dense(256, activation='sigmoid'),
  Dense(784, activation='sigmoid'),
  Reshape((28, 28))
])

autoencoder = Model(inputs=encoder.input, outputs=decoder(encoder.output))

Remember though that the standard autoencoder isn’t a generative model, it hasn’t been trained to learn the data distribution.



Minimizing Kullback-Leibler Divergence

In variational autoencoder algorithm, the objective function that we want to maximize is the ELBO (evidence lower bound). This objective function can be expressed as below:

ELBO = Σz~qφ log pθ (x | z) - DKL( qφ(z | x) || pθ(z) )

The first term is the expected reconstruction loss, the second term is the KL divergence from the prior distribution pθ to the estimated posterior qφ, which is a measure of the discrepancy between the two distributions.

DKL( q || p ) = Ez~q ( log q(z) - log p(z) )

KL divergence is always non negative, 0 if and only if the two distributions p and q are equal almost everywhere. It isn’t symmetric, if you swap p and q round in this expression, you’ll get a different quantity. KL divergence isn’t a metric on the space of probability distributions.

Let’s see an example, suppose the first distribution p is a full covariance two-dimensional Gaussian distribution, implemented using MultivariateNormalTriL distribution. The scale_tril argument should be the lower triangular matrix that comes from the Cholesky decomposition of the full covariance matrix.

Suppose the second distribution is an isotropic Gaussian with zero mean. Computing the KL divergence between distributions q and p is easy. The kl_divergence() function from the distribution’s module of TensorFlow Probability will compute the analytic expression for the KL divergence between q and p, assuming an analytic expression exists.

scale_tril = tfb.FillScaleTriL()([-0.5, 1.25, 1.])
# tf.Tensor(
# [[1.3132716 0.]
#  [1.25           0.474087]], shape=(2, 2), dtype=float32)

p = tfb.MultivariateNormalTriL(loc=0, scale_tril=scale_tril)

q = tfb.MultivariateNormalDiag(loc=[0., 0,])

tfd.kl_divergence(q, p)
# tf.Tensor(0.4280361, shape=(), dtype=float32)

The shape of the result reduces out the event_shape of the two distributions. If p and q are batched distributions, then the result would be the KL divergence for each distribution in the batch.

KL divergence is also used as an objective function to train the parameterize distribution qΟ†. The distribution p is the same as before, think of it as being the target density pΞΈ that we’re trying to approximate. However qΟ† is the distribution that we want to learn. The parameters of this distribution Ο† (including the mean ΞΌ and the standard deviation Οƒ) has been set to be learnable TensorFlow variables.

q = tfd.MultivariateNormalDiag(
  loc=tf.Variable(tf.random.normal([2])),  # isotropic Gaussian of dim 2
  scale_diag=tfp.util.TransformedVariable(  # associates a variable with a bijector
    tf.random.uniform([2]), bijector=tfb.Exp()  # ensures positive
  )
)

# training loop
@tf.function
def loss_and_grads(q_dist):
  with tf.GradientTape() as tape:
    loss = tfd.kl_divergence(q_dist, p)
  return loss, tape.gradient(loss, q_dist.trainable_variables)

opt = tf.keras.optimizers.Adam()
for i in range(num_train_steps):
  loss, grads = loss_and_grads(q)
  opt.apply_gradients(zip(grads, q.trainable_variables))

After a training run like this, the distribution q will try to match the distribution p as closely as possible.

You’ll notice that the target density p is a full covariance Gaussian distribution, whereas the approximating distribution q is a diagonal Gaussian. It’s impossible for q to perfectly match p, because the parameterized family of distributions, that q comes from, doesn’t contain p. This is a very common situation when applying the principle of variational inference, where we choose a parameterized family of distributions to approximate the posterior. We’ll see this in the variational autoencoder.

The KL divergence encourages the support of the distribution q to lie within the support of the distribution p. But that doesn’t mean that’s a perfect match, because there can still be regions where p is positive but q is 0. The KL divergence ignores these regions, as the expectation is taken over q. This intuitive explanation hopefully gives you some insight into why variational inference often leads to an underestimate of the variance of the posterior distribution q.



Bijector FillScaleTriL()

The bijector FillScaleTriL() makes the lower triangular matrix for the MultivariateNormalTriL distribution. The length of the vector that you pass in needs to correspond with the number of elements in the lower triangular matrix. Behind the scene, what FillScaleTriL() is doing:

  1. takes the elements of the vector you pass in, stores them in the entries of a lower triangular matrix.
  2. makes sure that the diagonal elements are positive.
tfb.Chain([
  tfb.TransformDiagonal(  # makes diagonal elements are positive
    tfb.Chain([
      tfb.Shift(1e-5),  # ensures all entries are bounded away from 0
      tfb.Softplus()  # maps real line to the positive reals
  ])),
  tfb.FillTriangular()  # stores vector in a lower triangular matrix
])

FillScaleTriL() bijector is a useful way to initialize or randomize a covariance matrix, which needs to be symmetric and positive definite, so we can’t just directly randomize the elements of a general matrix. Instead, we can randomize a vector, and then apply the FillScaleTriL() bijector.

Variational Autoencoder Algorithm

First let us recap the main background points for the variational autoencoder. The modeling assumption is that the data examples are generated according to the following process:

  1. We first sample a latent variable z, which we assume is distributed according to a simple distribution (like a zero mean isotropic Gaussian). This is the prior distribution.
  2. This sample z is then transformed by a decoder, which parametrizes a distribution from which we can sample a data point x.
  3. An encoder (recognition network) is also required. It is usually denoted as q(z | x). This encode network takes a data example x as input, and outputs the distribution over the latent variable z. This distribution is an approximation to the true posterior distribution of our model p(z | x).
  4. The objective that we want to maximize is the ELBO, which is a lower bound on the log evidence of our model. This objective can be written as the sum of two terms:
    • The first is a negative KL divergence from the prior p(z) to the approximate posterior given by the encoder q(z | x).
    • The second term is the expected log likelihood p(x | z) under the approximate posterior distribution of the encoder q(z | x).
1. the prior distribution
z ~ N(0, I) = p(z)

2. the transform of latent variable z
p(x | z) = decoder(z)
x ~ p(x | z)

3. the recognition network
encoder(x) = q(z | x) β‰ˆ p(z | x)

4. the objective
log p(x) β‰₯ Ez~q(z | x) [-log q(z | x) + log p(x, z)]
= -KL( q(z | x) || p(z) ) + Ez~q(z | x) [log p(x | z)] := ELBO

Encoder Network

Overall, you can see how the encoder network is compressing the input progressively through the layers, until it’s encoded with a two dimensional latent variable. What this encoder network is doing is if we passed in a batch of data examples of batch size 16, then the encoder network will return a MultivariateNormalDiag distribution object with batch_shape of 16 and an event_shape of 2.

latent_size = 2
event_shape = (28, 28, 1)

encoder = Sequential([
  Conv2D(8, (5, 5), strides=2, activation="tanh", input_shape=event_shape),
  Conv2D(8, (5, 5), strides=2, activation="tanh"),
  Flatten(),
  Dense(64, activation="tanh"),
  Dense(2 * latent_size),  # used to parameterize the approximate posterior dist
  tfpl.DistributionLamba(lambda t: tfd.MultivariateNormalDiag(
    loc=t[..., :latent_size],  # slices of the input Dense layer
    scale_diag=tf.math.exp(t[..., latent_size:])  # slices of the input Dense layer
  ))
])

encoder(x_train[:16])
# tfp.distributions.MultivariateNormalDiag(
#  "sequential_distribution_lambda_MultivariateNormalDiag",
#  batch_shape=[16], event_shape=[2], dtype=float32)


Decoder Network

Overall the decoder network is expanding its input progressively through the layers. The decoder expects a batch of inputs of length two, which is the size of the latent variable.

decoder = Sequential([
  Dense(64, activation="tanh", input_shape=(latent_size,))
  Dense(128, activation="tanh"),
  Reshape((4, 4, 8)),
  Conv2DTranspose(8, (5, 5), strides=2, output_padding=1, activation='tanh'),
  Conv2DTranspose(8, (5, 5), strides=2, output_padding=1, activation='tanh'),
  Conv2D(1, (3, 3), padding='SAME'),
  Flatten(),
  tfpl.IndependentBernoulli(event_shape)
])

decoder(tf.random.normal([16, latent_size]))
# tfp.distributions.Independent(
#  "...",
#  batch_shape=[16], event_shape=[28, 28, 1], dtype=float32)

These transpose convolution layers Conv2DTranspose() are essentially the reverse of the Conv2D() layers in the encoder. If the argument output_padding is used, the value passed in must be less than the value used for the strides and it will affect the size of the output of the Conv2DTranspose layer. So by this point, we’ve up sample our input back to the spatial dimensions of the data.

Next a regular Conv2D layer is used to reduce the filters down to 1. This layer doesn’t have an activation function, so the activation can again have any real value.

Then a Flatten layer is used before an IndependentBernoulli probabilistic layer with the same event_shape as the data. This distribution will take the input tensor from the Flatten layer as the logits to parameterize these Bernoulli distributions.

Prior Distribution

We’ve now defined our encoder and decoder networks, but we still need to define the prior distribution which we assumed to be a zero mean Gaussian with identity covariance matrix, i.e. z ~ N(0, I) = p(z).

latent_size = 2
prior = tfd.MultivariateNormalDiag(loc=tf.zeros(latent_size))
# tfp.distributions.MultivariateNormalDiag(
#  "...", batch_shape=[], event_shape=[2], dtype=float32)

Loss Function

We want to maximize the ELBO objective, let’s define our loss function as the negative of the ELBO objective, so we’ll want to minimize our loss function.

def loss_fn(x_true, approx_posterior, x_pred, prior_dist):
  return tf.reduce_mean(
    tfd.kl_divergence(approx_posterior, prior_dist) - x_pred.log_prob(x_true)
  )
  • x_true will be a batch of data examples.
  • approx_posterior is the output of the encoder.
  • x_pred is the output of the decoder.
  • prior_dist is the prior distribution.

We could also write our loss function to compute the KL divergence using Monte Carlo samples instead of analytically with the kl_divergence function. Depending on your choice of prior and posterior, you might need to use Monte Carlo samples for the KL divergence term.

def loss_fn(x_true, approx_posterior, x_pred, prior_dist):
  reconstruction_loss = - x_pred.log_prob(x_true)

  approx_posterior_sample = approx_posterior.sample()  # or use multiple samples
  kl_approx = (approx_posterior.log_prob(approx_posterior_sample)
    - prior_dist.log_prob(approx_posterior_sample))

  return tf.reduce_mean(kl_approx + reconstruction_loss)

Training Loop

@tf.function
def get_loss_and_grads(x):
  with tf.GradientTape() as tape:
    approx_posterior = encoder(x)
    approx_posterior_sample = approx_posterior.sample()
    x_pred = decoder(approx_posterior_sample)
    current_loss = loss_fn(x, approx_posterior, x_pred, prior)

  grads = tape.gradient(current_loss,
    encoder.trainable_variables + decoder.trainable_variables)

  return current_loss, grads

opt = tf.keras.optimizers.Adam()
for epoch in range(num_epochs):
  for train_batch in train_data:
    loss, grads =  get_loss_and_grads(train_batch)
    opt.apply_gradients(zip(grads,
      encoder.trainable_variables + decoder.trainable_variables))


How to Use Trained Networks

Remember our starting point – the generative model where a latent variable z is sampled from a prior distribution and pass through the decoder. To get a distribution p(x | z) that we can sample from to produce a data example x.

To implement the generative model, we just sample z from the prior, pass it through the decoder and sample to get a data point x.

z = prior.sample(1)  # input 1 as batch size, output shape is (1, 2)
x = decoder(z).sample()  # (1, 28, 28, 1)

Also, we can use the encoder network to encode a batch of sample data points into the latent space. The output of the encoder is a distribution object, which defines a batch distribution over the latent variable.

x_encoded = encoder(x_sample)

If x_sample here had a batch size of one, then the output distribution object will have a batch_shape of 1 and an event_shape of 2.

We can also define the end to end bottleneck architecture:

def vae(inputs):
  approx_posterior = encoder(inputs)
  decoded = decoder(approx_posterior.sample())
  return decoded.sample()

reconstruction = vae(x_sample)  # (1, 28, 28, 1)

KL Divergence Layers

KLDiverganceAddLoss Layer

KL divergence loss can be added within the layers of a model. With this approach, the KL divergence loss is included as an activity regularizer in a similar way to how we have previously included weights regularization.

latent_size = 4

# four dimensional diagonal Gaussian distribution with identity covariance matrix
prior = tfd.MultivariateNormalDiag(loc=tf.zeros(latent_size))

encoder = Sequential([
  Dense(64, activation='relu', input_shape=(12,)),
  Dense(tfpl.MultivariateNormalTriL.params_size(latent_size)),  # output params
  tfpl.MultivariateNormalTriL(latent_size), # posterior dist approx
  tfpl.KLDivergenceAddLoss(prior)
])

The KLDivergenceAddLoss layer comes from the layers model of tensor flow probability. This layer doesn’t change its input in any way, but it requires the previous layer in the model to output a distribution. What it does do is add the KL divergence loss to the model, so that it automatically becomes part of the loss function to be optimized later on, when we train the model using the compile and fit methods.

This is similar to what happens when you add weight decay to a tensor flow model using the kernel_regularizer keyword argument of Dense or convolutional layers. By doing this, we now only need to take care of the reconstruction loss.

decoder = Sequential([
  Dense(64, activation='relu', input_shape=(latent_size,)),
  Dense(tfpl.IndependentNormal.params_size(12)),
  tfpl.IndependentNormal(12)
])

vae = Model(inputs=encoder.input, outputs=decoder(encoder.output))
vae.compile(loss=lambda x, pred: -pred.log_prob(x))
vae.fit(train_data, epochs=20)

So now, because the KL divergence loss term is being automatically tracked by the encoder network. We can compile the VAE model object passing in the reconstruction loss for the loss argument.



The reconstruction loss is just the negative log probability of the data example x. After it has been passed through the VAE model object, we just call the fit method to train the model. The data set train_data should generate a tuple of inputs and outputs when we iterate over it. In this case, the inputs and outputs are the same thing. The data example x.

This data example is passed as the inputs to the encoder, which returns the posterior approximation. We then sample from this distribution and pass the sample through the decoder, which then returns the model prediction which is the independent normal distribution.

KLDivergenceAddLoss layer has some options that you should know about, which give you more control over how this loss term is computed and combined with the reconstruction loss.

tfpl.KLDivergenceAddLoss(prior, weight=10, use_exact_kl=False,
  test_points_fn=lambda q: q.sample(10),
  test_points_reduce_axis=0) # other possible value: None, or empty tuple ()

weight

A factor to multiply the KL lost term before adding it to the reconstruction loss.
It’s also possible to pass in a tensor or array to have different weighting for each element of the batch.

use_exact_kl

Default value is false, the KL divergence is computed using Monte Carlo
samples from the posterior. This Monte Carlo estimate is computed by evaluating the log probability of a sample from the approximate posterior and the prior.

kl_approx = (approx_posterior.log_prob(approx_posterior_sample) - prior_dist.log_prob(approx_posterior_sample))

KLDivergenceAddLoss function extracts a tensor from the previous layer, according to the convert_to_tensor_fn function assigned to that layer. The default for this argument is the tfp.distributions.Distribution.sample method.

test_points_fn

If this is provided, then this is the function that will be used to compute the estimate.
Only used for the KL divergence estimate. It doesn’t affect the converter tensor function that is used in the output of the encoder.

test_points_reduce_axis

Specifies which axis of the KL divergence estimate tensor should be reduced out with a mean operation and which should be reduced by summing.
If you set the test_points_fn function argument to draw some samples for the estimate, then the tensor you get will have the number of samples in the first dimension and the batch size in the second.
If you set test_points_reduce_axis to be 0, then this tensor will be reduced to a scalar by averaging over the sample axis and summing over the batch axis. Other possible settings could be:

  1. None. All axes are averaged.
  2. Empty tuple (). All axes are summed over.

KLDivergenceRegularizer Object

An alternative way of implementing what we have described with the KLDivergenceAddLoss layer. We can instead pass the KLDivergenceRegularizer object to the activity_regularizer keyword arguments of the previous distribution layer itself.

encoder = Sequential([
  Dense(64, activation='relu', input_shape=(12,)),
  Dense(tfpl.MultivariateNormalTriL.params_size(latent_size)),  # output params
  tfpl.MultivariateNormalTriL(latent_size,
    activity_regularizer=tfpl.KLDivergenceRegularizer(
      prior, weight=10, use_exact_kl=False,
      test_points_fn=lambda q: q.sample(10),
      test_points_reduce_axis=0))
])

In fact, the KLDivergenceAddLoss layer is just using the KLDivergenceRegularizer class in the background. The keyword arguments for the KLDivergenceRegularizer are the same as the ones we’ve just looked at for the KLDivergenceAddLoss layer.



My Certificate

For more on Variational Autoencoders, please refer to the wonderful course here https://www.coursera.org/learn/probabilistic-deep-learning-with-tensorflow2


Related Quick Recap


I am Kesler Zhu, thank you for visiting my website. Check out more course reviews at https://KZHU.ai

Leave a Reply

Your email address will not be published. Required fields are marked *