##### Aug 22, 2016 · whitepaper

# Under the Hood of the Variational Autoencoder (in Prose and Code)

##### The Variational Autoencoder (VAE) neatly synthesizes unsupervised deep learning and variational Bayesian methods into one sleek package. In Part I of this series, we introduced the theory and intuition behind the VAE, an exciting development in machine learning for combined generative modeling and inference—“machines that imagine and reason.”

```python
import functools
from datetime import datetime

from functional import compose, partial
import numpy as np
import tensorflow as tf
```

<p>One perk of these models is their modularity: VAEs are naturally amenable to swapping in whatever encoder/decoder architecture is most fitting for the task at hand, be it <a href="https://arxiv.org/abs/1502.04623">recurrent</a> <a href="https://arxiv.org/abs/1511.06349">neural</a> <a href="https://arxiv.org/abs/1412.6581">networks</a>, <a href="https://arxiv.org/abs/1411.5928">convolutional</a> and <a href="https://arxiv.org/abs/1503.03167">deconvolutional</a> networks, etc.</p>
<p>For our purposes, we will model the relatively simple <a href="http://yann.lecun.com/exdb/mnist/">MNIST</a> dataset using densely-connected layers, wired symmetrically around the hidden code.</p>

```python
class Dense():
    """Fully-connected layer"""
    def __init__(self, scope="dense_layer", size=None, dropout=1.,
                 nonlinearity=tf.identity):
        # (str, int, (float | tf.Tensor), tf.op)
        assert size, "Must specify layer size (num nodes)"
        self.scope = scope
        self.size = size
        self.dropout = dropout # keep_prob
        self.nonlinearity = nonlinearity

    def __call__(self, x):
        """Dense layer currying, to apply layer to any input tensor `x`"""
        # tf.Tensor -> tf.Tensor
        with tf.name_scope(self.scope):
            while True:
                try: # reuse weights if already initialized
                    return self.nonlinearity(tf.matmul(x, self.w) + self.b)
                except AttributeError: # first call: create weights, then retry
                    self.w, self.b = self.wbVars(x.get_shape()[1].value, self.size)
                    self.w = tf.nn.dropout(self.w, self.dropout)
    ...
```
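
The `try`/`except AttributeError` loop in `__call__` is a lazy-initialization idiom: the weights are created the first time the layer sees an input (when the input width becomes known) and reused on every later call. Here is a minimal pure-Python sketch of that pattern. The `LazyLayer` class is a hypothetical stand-in for `Dense`, with nested lists in place of tensors and an arbitrary unit-variance initialization chosen only for illustration.

```python
import random


class LazyLayer:
    """Toy linear layer that creates its weights on first call, then reuses them.

    Hypothetical stand-in for `Dense`; not part of the original source.
    """

    def __init__(self, size):
        self.size = size  # number of output units

    def __call__(self, x):
        while True:
            try:  # reuse weights if already initialized
                return [sum(xi * wi for xi, wi in zip(x, row)) for row in self.w]
            except AttributeError:
                # first call: the input width is only known now
                fan_in = len(x)
                self.w = [[random.gauss(0, 1) for _ in range(fan_in)]
                          for _ in range(self.size)]


layer = LazyLayer(size=3)
first = layer([1.0, 2.0])   # triggers weight creation
second = layer([1.0, 2.0])  # reuses the same weights
```

Because the weights live on the layer object, applying one layer instance to two different inputs shares parameters between them, which is what lets the decoder be reused later on.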

```python
def composeAll(*args):
    """Compose multiple functions into one

    i.e. composed = composeAll([f, g, h])
         composed(x) # == f(g(h(x)))
    """
    # adapted from https://docs.python.org/3.1/howto/functional.html
    return partial(functools.reduce, compose)(*args)
```
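
To make the ordering concrete, here is a small standard-library sketch of the same idea. It assumes a two-function helper, `compose2`, in place of the `compose` imported from `functional`; the helper and the three toy functions are illustrative names only.

```python
import functools


def compose2(f, g):
    """Compose two single-argument functions: compose2(f, g)(x) == f(g(x))."""
    return lambda x: f(g(x))


def compose_all(funcs):
    """Compose a list of functions, outermost first."""
    return functools.reduce(compose2, funcs)


def add_one(x):
    return x + 1


def double(x):
    return x * 2


def square(x):
    return x ** 2


composed = compose_all([add_one, double, square])
result = composed(3)  # add_one(double(square(3)))
```

The last function in the list is applied first, which is why the layer lists later in the model are written outermost-first.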

<p>Now that we’ve defined our model primitives, we can tackle the VAE itself.</p>
<p>Keep in mind: the TensorFlow computational graph is cleanly divorced from the numerical computations themselves. In other words, a <code>tf.Graph</code> wireframes the underlying skeleton of the model, upon which we may hang values only within the context of a <code>tf.Session</code>.</p>
<p>Below, we initialize class <code>VAE</code> and activate a session for future convenience (so we can initialize and evaluate tensors within a single session, e.g. to persist weights and biases across rounds of training).</p>
<p>Here are some relevant snippets, cobbled together from the <a href="https://github.com/fastforwardlabs/vae-tf/blob/master/vae.py">full source code</a>:</p>

```python
class VAE():
    """Variational Autoencoder

    see: Kingma & Welling - Auto-Encoding Variational Bayes
    (https://arxiv.org/abs/1312.6114)
    """
    DEFAULTS = {
        "batch_size": 128,
        "learning_rate": 1E-3,
        "dropout": 1., # keep_prob
        "lambda_l2_reg": 0.,
        "nonlinearity": tf.nn.elu,
        "squashing": tf.nn.sigmoid
    }
    RESTORE_KEY = "to_restore"

    def __init__(self, architecture, d_hyperparams={}, meta_graph=None,
                 save_graph_def=True, log_dir="./log"):
        """(Re)build a symmetric VAE model with given:

            * architecture (list of nodes per encoder layer); e.g.
               [1000, 500, 250, 10] specifies a VAE with 1000-D inputs, 10-D latents,
               & end-to-end architecture [1000, 500, 250, 10, 250, 500, 1000]

            * hyperparameters (optional dictionary of updates to `DEFAULTS`)
        """
        self.architecture = architecture
        self.__dict__.update(VAE.DEFAULTS, **d_hyperparams)
        self.sesh = tf.Session()

        if not meta_graph: # new model
            handles = self._buildGraph()
            ...
            self.sesh.run(tf.initialize_all_variables())
```
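
Two conventions in this constructor are easy to check in isolation: user-supplied hyperparameters override `DEFAULTS`, and the encoder layer sizes are mirrored around the latent layer to give the end-to-end architecture. The sketch below illustrates both in plain Python; `merge_hyperparams` and `end_to_end` are hypothetical helper names, not functions from the original source.

```python
DEFAULTS = {"batch_size": 128, "learning_rate": 1e-3, "dropout": 1.0}


def merge_hyperparams(defaults, overrides):
    """Return a copy of `defaults` updated with `overrides`."""
    merged = dict(defaults)  # copy, so the shared defaults stay untouched
    merged.update(overrides)
    return merged


def end_to_end(architecture):
    """Mirror the encoder layer sizes around the final (latent) layer."""
    return architecture + architecture[-2::-1]


hyperparams = merge_hyperparams(DEFAULTS, {"learning_rate": 5e-4})
layers = end_to_end([1000, 500, 250, 10])
```

Here `layers` reproduces the example from the docstring: 1000-D inputs, a 10-D latent layer, and a decoder that retraces the encoder sizes in reverse.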

```python
        # encoding / "recognition": q(z|x)
        encoding = [Dense("encoding", hidden_size, dropout, self.nonlinearity)
                    # hidden layers reversed for function composition: outer -> inner
                    for hidden_size in reversed(self.architecture[1:-1])]
        h_encoded = composeAll(encoding)(x_in)

        # latent distribution parameterized by hidden encoding
        # z ~ N(z_mean, np.exp(z_log_sigma)**2)
        z_mean = Dense("z_mean", self.architecture[-1], dropout)(h_encoded)
        z_log_sigma = Dense("z_log_sigma", self.architecture[-1], dropout)(h_encoded)
```

<p>Here, we build a pipe from <code>x_in</code> (an empty placeholder for input data <span class="math inline">\(x\)</span>), through the sequential hidden encoding, to the corresponding distribution over latent space: the variational approximate posterior, or hidden representation, <span class="math inline">\(z \sim q_\phi(z|x)\)</span>.</p>
<p>As the <code>z_mean</code> and <code>z_log_sigma</code> lines above show, latent <span class="math inline">\(z\)</span> is distributed as a multivariate <a href="http://www.ncbi.nlm.nih.gov/pmc/articles/PMC2465539/figure/fig1/">normal</a> with mean <span class="math inline">\(\mu\)</span> and diagonal covariance values <span class="math inline">\(\sigma^2\)</span> (the square of the “sigma” in <code>z_log_sigma</code>) directly parameterized by the encoder: <span class="math inline">\(\mathcal{N}(\mu, \sigma^2I)\)</span>. In other words, we set out to “explain” highly complex observations as the consequence of an unobserved collection of simplified latent variables, i.e. independent Gaussians. (This is dictated by our choice of a conjugate spherical Gaussian prior over <span class="math inline">\(z\)</span>; see <a href="http://blog.fastforwardlabs.com/2016/08/12/introducing-variational-autoencoders-in-prose-and.html">Part I</a>.)</p>
<p>Next, we sample from this latent distribution (in practice, <a href="https://arxiv.org/abs/1312.6114">one draw is enough</a> given sufficient minibatch size, i.e. &gt;100). This method involves a trick (can you figure out why?) that we will explore in more detail later.</p>

```python
z = self.sampleGaussian(z_mean, z_log_sigma)
```

```python
        # decoding / "generative": p(x|z)
        decoding = [Dense("decoding", hidden_size, dropout, self.nonlinearity)
                    for hidden_size in self.architecture[1:-1]] # assumes symmetry
        # final reconstruction: restore original dims, squash outputs [0, 1]
        decoding.insert(0, Dense( # prepend as outermost function
            "reconstruction", self.architecture[0], dropout, self.squashing))
        x_reconstructed = tf.identity(composeAll(decoding)(z), name="x_reconstructed")
```

```python
        # ops to directly explore latent space
        # defaults to prior z ~ N(0, I)
        z_ = tf.placeholder_with_default(tf.random_normal([1, self.architecture[-1]]),
                                         shape=[None, self.architecture[-1]],
                                         name="latent_in")
        x_reconstructed_ = composeAll(decoding)(z_)
```

```python
    def sampleGaussian(self, mu, log_sigma):
        """Draw sample from Gaussian with given shape, subject to random noise epsilon"""
        with tf.name_scope("sample_gaussian"):
            # reparameterization trick
            epsilon = tf.random_normal(tf.shape(log_sigma), name="epsilon")
            return mu + epsilon * tf.exp(log_sigma) # N(mu, sigma**2)
```
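
A scalar, standard-library sketch may help show what `sampleGaussian` buys us. All randomness is pushed into `epsilon`, so for any fixed `epsilon` the sample is an ordinary differentiable function of `mu` and `log_sigma`. The function name `sample_gaussian` and the numbers below are illustrative assumptions; the finite-difference check stands in for the gradients that TensorFlow would compute automatically.

```python
import math
import random


def sample_gaussian(mu, log_sigma, epsilon):
    """Reparameterized draw: z = mu + sigma * epsilon, with epsilon ~ N(0, 1)."""
    return mu + epsilon * math.exp(log_sigma)


# 1) The draws follow N(mu, sigma**2).
random.seed(0)
mu, log_sigma = 2.0, math.log(0.5)
draws = [sample_gaussian(mu, log_sigma, random.gauss(0, 1)) for _ in range(20000)]
sample_mean = sum(draws) / len(draws)
sample_std = math.sqrt(sum((z - sample_mean) ** 2 for z in draws) / len(draws))

# 2) With epsilon held fixed, z has well-defined derivatives:
#    dz/dmu = 1 and dz/dlog_sigma = epsilon * sigma.
eps, h = 0.7, 1e-6
dz_dmu = (sample_gaussian(mu + h, log_sigma, eps)
          - sample_gaussian(mu - h, log_sigma, eps)) / (2 * h)
dz_dlog_sigma = (sample_gaussian(mu, log_sigma + h, eps)
                 - sample_gaussian(mu, log_sigma - h, eps)) / (2 * h)
```

Sampling directly from a distribution parameterized by the encoder would leave no such path for gradients to flow back to `mu` and `log_sigma`.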

```python
    @staticmethod
    def crossEntropy(obs, actual, offset=1e-7):
        """Binary cross-entropy, per training example"""
        # (tf.Tensor, tf.Tensor, float) -> tf.Tensor
        with tf.name_scope("cross_entropy"):
            # bound by clipping to avoid nan
            obs_ = tf.clip_by_value(obs, offset, 1 - offset)
            return -tf.reduce_sum(actual * tf.log(obs_) +
                                  (1 - actual) * tf.log(1 - obs_), 1)
```
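
The same loss can be written out for a single example in plain Python, which makes the role of the clipping offset easier to see. This is a sketch under the assumption that `obs` and `actual` are equal-length lists of per-pixel values in [0, 1]; `cross_entropy` is an illustrative name.

```python
import math


def cross_entropy(obs, actual, offset=1e-7):
    """Binary cross-entropy summed over the pixels of one example."""
    total = 0.0
    for o, a in zip(obs, actual):
        o = min(max(o, offset), 1 - offset)  # clip to avoid log(0)
        total += a * math.log(o) + (1 - a) * math.log(1 - o)
    return -total


good = cross_entropy([0.9, 0.1], [1.0, 0.0])       # confident and right
bad = cross_entropy([0.1, 0.9], [1.0, 0.0])        # confident and wrong
saturated = cross_entropy([1.0, 0.0], [0.0, 1.0])  # finite thanks to clipping
```

Without the clip, the saturated case would evaluate `log(0)`; with it, the loss is large but finite, so a single badly wrong pixel cannot poison the whole minibatch with `nan`.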

```python
    @staticmethod
    def kullbackLeibler(mu, log_sigma):
        """(Gaussian) Kullback-Leibler divergence KL(q||p), per training example"""
        # (tf.Tensor, tf.Tensor) -> tf.Tensor
        with tf.name_scope("KL_divergence"):
            # = -0.5 * (1 + log(sigma**2) - mu**2 - sigma**2)
            return -0.5 * tf.reduce_sum(1 + 2 * log_sigma - mu**2 -
                                        tf.exp(2 * log_sigma), 1)
```
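
As a sanity check on the closed form, the following standard-library sketch evaluates the same expression for one example, with lists in place of tensors. The function name `kl_to_standard_normal` and the test points are illustrative assumptions.

```python
import math


def kl_to_standard_normal(mu, log_sigma):
    """KL[N(mu, diag(sigma**2)) || N(0, I)], summed over latent dimensions."""
    return -0.5 * sum(1 + 2 * ls - m ** 2 - math.exp(2 * ls)
                      for m, ls in zip(mu, log_sigma))


kl_at_prior = kl_to_standard_normal([0.0, 0.0], [0.0, 0.0])  # q equals the prior
kl_shifted = kl_to_standard_normal([1.0, 0.0], [0.0, 0.0])   # mean moved by 1
kl_narrow = kl_to_standard_normal([0.0], [math.log(0.5)])    # sigma shrunk to 0.5
```

The divergence vanishes only when the approximate posterior matches the prior exactly, and it grows when the mean drifts from zero or the variance departs from one.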

```python
        # reconstruction loss: mismatch b/w x & x_reconstructed
        # binary cross-entropy -- assumes p(x) & p(x|z) are iid Bernoullis
        rec_loss = VAE.crossEntropy(x_reconstructed, x_in)

        # Kullback-Leibler divergence: mismatch b/w approximate posterior & imposed prior
        # KL[q(z|x) || p(z)]
        kl_loss = VAE.kullbackLeibler(z_mean, z_log_sigma)

        # average over minibatch
        cost = tf.reduce_mean(rec_loss + kl_loss, name="cost")
```

```python
        # optimization
        global_step = tf.Variable(0, trainable=False)
        with tf.name_scope("Adam_optimizer"):
            optimizer = tf.train.AdamOptimizer(self.learning_rate)
            tvars = tf.trainable_variables()
            grads_and_vars = optimizer.compute_gradients(cost, tvars)
            clipped = [(tf.clip_by_value(grad, -5, 5), tvar) # gradient clipping
                       for grad, tvar in grads_and_vars]
            train_op = optimizer.apply_gradients(clipped, global_step=global_step,
                                                 name="minimize_cost") # back-prop
```
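
The `tf.clip_by_value(grad, -5, 5)` call clamps every gradient component independently to the interval [-5, 5]. A plain-Python sketch of that elementwise operation, using an illustrative list of gradient values:

```python
def clip_by_value(values, lo, hi):
    """Clamp each element of `values` to the interval [lo, hi]."""
    return [min(max(v, lo), hi) for v in values]


grads = [-12.0, -0.3, 0.0, 4.9, 27.5]
clipped = clip_by_value(grads, -5, 5)
```

Only the two outlying components change. Note that elementwise clipping can alter the direction of the gradient vector, unlike clipping by norm, which rescales the whole vector.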

```python
        return (x_in, dropout, z_mean, z_log_sigma, x_reconstructed,
                z_, x_reconstructed_, cost, global_step, train_op)
```

```python
    def train(self, X, max_iter=np.inf, max_epochs=np.inf, cross_validate=True,
              verbose=True, save=False, outdir="./out", plots_outdir="./png"):
        try:
            err_train = 0
            now = datetime.now().isoformat()[11:]
            print("------- Training begin: {} -------\n".format(now))

            while True:
                x, _ = X.train.next_batch(self.batch_size)
                feed_dict = {self.x_in: x, self.dropout_: self.dropout}
                fetches = [self.x_reconstructed, self.cost, self.global_step, self.train_op]
                x_reconstructed, cost, i, _ = self.sesh.run(fetches, feed_dict)

                err_train += cost

                if i%1000 == 0 and verbose:
                    print("round {} --> avg cost: ".format(i), err_train / i)

                if i >= max_iter or X.train.epochs_completed >= max_epochs:
                    print("final avg cost (@ step {} = epoch {}): {}".format(
                        i, X.train.epochs_completed, err_train / i))
                    now = datetime.now().isoformat()[11:]
                    print("------- Training end: {} -------\n".format(now))
                    break

        except KeyboardInterrupt: # assumed handler type; not shown in this excerpt
            ...
```
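
Stripped of TensorFlow, the control flow of `train` reduces to a loop that accumulates cost, tracks a running average, and stops on whichever limit is reached first. The sketch below imitates that flow; `run_training` and `make_toy_step` are hypothetical helpers, and the toy step function simply replays a fixed list of costs in place of `sesh.run`.

```python
import math


def run_training(step_fn, max_iter=math.inf, max_epochs=math.inf):
    """Call `step_fn` until a stopping criterion is met.

    `step_fn` returns (cost, global_step, epochs_completed).
    Returns the final step count and the average cost per step.
    """
    err_train = 0.0
    while True:
        cost, i, epochs_completed = step_fn()
        err_train += cost
        if i >= max_iter or epochs_completed >= max_epochs:
            return i, err_train / i


def make_toy_step(costs, steps_per_epoch):
    """Build a step function that replays `costs` one minibatch at a time."""
    state = {"i": 0}

    def step():
        cost = costs[state["i"]]
        state["i"] += 1
        return cost, state["i"], state["i"] // steps_per_epoch

    return step


costs = [4.0, 3.0, 2.0, 1.0, 0.5, 0.5]
steps, avg_cost = run_training(make_toy_step(costs, steps_per_epoch=2), max_iter=4)
epoch_steps, epoch_avg = run_training(make_toy_step(costs, steps_per_epoch=2), max_epochs=1)
```

As in the original method, both limits default to infinity, so at least one of them needs a finite value for the loop to end on its own.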