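What is a diffusion model? Diffusion models are an idea inspired by nonequilibrium thermodynamics. The basic principle is to add Gaussian noise to an image step by step (over timesteps) until it becomes a pure-noise image that follows a Gaussian distribution. This is the forward process, and it can be computed directly in closed form. The noisy image is then denoised step by step until the original image is recovered. This is the reverse process. The main training objective is for a neural network to learn the noise distribution during denoising.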
A diffusion probabilistic model (diffusion model for short) is, in the words of the DDPM paper, "a parameterized Markov chain trained using variational inference to produce samples matching the data after finite time."
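Let the original data be $x_0 \sim q(x_0)$. The diffusion model produces latents $x_1, \dots, x_T$ of the same dimensionality as the data (where $T$ is the total number of diffusion steps), so a diffusion model is a latent variable model of the form $p_\theta(x_0) := \int p_\theta(x_{0:T})\,dx_{1:T}$.
The joint distribution $p_\theta(x_{0:T})$ is called the reverse process. It is defined as a Markov chain with learned Gaussian transitions starting at $p(x_T) = \mathcal{N}(x_T; 0, I)$:
$$p_\theta(x_{0:T}) := p(x_T)\prod_{t=1}^{T} p_\theta(x_{t-1} \mid x_t), \qquad p_\theta(x_{t-1} \mid x_t) := \mathcal{N}\big(x_{t-1};\, \mu_\theta(x_t, t),\, \Sigma_\theta(x_t, t)\big)$$
Here $\mu_\theta$ and $\Sigma_\theta$ both take $x_t$ and $t$ as inputs; in practice they are computed from the noise that the model predicts.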
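What distinguishes diffusion models from other latent variable models is the approximate posterior $q(x_{1:T} \mid x_0)$, which diffusion models use as the forward process (diffusion process). It is a Markov chain that gradually adds Gaussian noise to the data according to a fixed or learnable variance schedule $\beta_1, \dots, \beta_T$, producing a new sample at each step and ending with pure noise that follows a Gaussian distribution. (It has been shown that as $T \to \infty$ the final $x_T$ follows a Gaussian distribution.)
$$q(x_{1:T} \mid x_0) := \prod_{t=1}^{T} q(x_t \mid x_{t-1}), \qquad q(x_t \mid x_{t-1}) := \mathcal{N}\big(x_t;\, \sqrt{1-\beta_t}\,x_{t-1},\, \beta_t I\big)$$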
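- $q(x_t \mid x_{t-1})$ says that every $x_t$ is obtained from $x_{t-1}$ plus noise drawn from a Gaussian distribution defined by the schedule parameter $\beta_t$.
- $\beta_t$ can be learned through reparameterization, or held fixed as a hyperparameter.
- The expressiveness of the reverse process is ensured in part by choosing Gaussian conditionals in $p_\theta(x_{t-1} \mid x_t)$; the two processes have the same functional form only when $\beta_t$ is small.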
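The forward chain described above can be sketched in a few lines of NumPy. This is only an illustration under assumptions: the helper name `forward_diffusion_steps`, the linear schedule from $10^{-4}$ to $0.02$ over 1000 steps, and the constant toy "image" are all made up for the example.

```python
import numpy as np


def forward_diffusion_steps(x0, betas, rng):
    """Apply q(x_t | x_{t-1}) once per beta and return the final x_T."""
    x = np.asarray(x0, dtype=np.float64)
    for beta in betas:
        noise = rng.standard_normal(x.shape)
        # x_t = sqrt(1 - beta_t) * x_{t-1} + sqrt(beta_t) * eps
        x = np.sqrt(1.0 - beta) * x + np.sqrt(beta) * noise
    return x


rng = np.random.default_rng(0)
betas = np.linspace(1e-4, 0.02, 1000)  # assumed linear schedule
x0 = np.full(10_000, 3.0)              # toy data: a constant signal
x_T = forward_diffusion_steps(x0, betas, rng)
print(x_T.mean(), x_T.std())           # should be close to 0 and 1
```

Even though the toy signal starts far from zero, after all steps the samples should look like standard Gaussian noise.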
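Notably, $x_t$ at any timestep $t$ can be obtained directly from $x_0$. With $\alpha_t := 1 - \beta_t$ and $\bar\alpha_t := \prod_{s=1}^{t} \alpha_s$:
$$q(x_t \mid x_0) = \mathcal{N}\big(x_t;\, \sqrt{\bar\alpha_t}\,x_0,\, (1-\bar\alpha_t) I\big) \tag{7}$$
Eq. (7) can be reparameterized as:
$$x_t = \sqrt{\bar\alpha_t}\,x_0 + \sqrt{1-\bar\alpha_t}\,\epsilon, \qquad \epsilon \sim \mathcal{N}(0, I)$$
In other words, every $x_t$ is $x_0$ scaled by $\sqrt{\bar\alpha_t}$ plus noise. At each single step, the mean of the distribution of the noised image is the previous image shifted slightly (scaled by $\sqrt{1-\beta_t}$).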
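As a sanity check on the closed form, the sketch below tracks the mean coefficient and the variance of $x_t$ given $x_0$ one step at a time and compares them with $\sqrt{\bar\alpha_t}$ and $1-\bar\alpha_t$. The linear schedule is an assumption chosen for the example.

```python
import numpy as np

betas = np.linspace(1e-4, 0.02, 1000)  # assumed linear schedule
alphas = 1.0 - betas
alphas_cumprod = np.cumprod(alphas)

# Propagate the mean coefficient and the variance through the chain:
# mean_t = sqrt(1 - beta_t) * mean_{t-1}
# var_t  = (1 - beta_t) * var_{t-1} + beta_t
mean_coeff, var = 1.0, 0.0
mean_coeffs, variances = [], []
for beta in betas:
    mean_coeff *= np.sqrt(1.0 - beta)
    var = (1.0 - beta) * var + beta
    mean_coeffs.append(mean_coeff)
    variances.append(var)
mean_coeffs = np.array(mean_coeffs)
variances = np.array(variances)

print(np.allclose(mean_coeffs, np.sqrt(alphas_cumprod)))
print(np.allclose(variances, 1.0 - alphas_cumprod))
```

Both comparisons should agree, which is why the forward process never needs to be simulated step by step during training.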
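Training optimizes the variational bound on the negative log-likelihood, $L := \mathbb{E}_q\big[-\log \frac{p_\theta(x_{0:T})}{q(x_{1:T} \mid x_0)}\big] \ge \mathbb{E}\big[-\log p_\theta(x_0)\big]$, by optimizing random terms of $L$ with stochastic gradient descent. The bound can be rewritten as:
$$L = \mathbb{E}_q\Big[\underbrace{D_{KL}\big(q(x_T \mid x_0)\,\|\,p(x_T)\big)}_{L_T} + \sum_{t>1} \underbrace{D_{KL}\big(q(x_{t-1} \mid x_t, x_0)\,\|\,p_\theta(x_{t-1} \mid x_t)\big)}_{L_{t-1}} \underbrace{-\log p_\theta(x_0 \mid x_1)}_{L_0}\Big]$$
In the $L_{t-1}$ terms, the forward-process posterior is tractable when conditioned on $x_0$:
$$q(x_{t-1} \mid x_t, x_0) = \mathcal{N}\big(x_{t-1};\, \tilde\mu_t(x_t, x_0),\, \tilde\beta_t I\big)$$
$$\tilde\mu_t(x_t, x_0) := \frac{\sqrt{\bar\alpha_{t-1}}\,\beta_t}{1-\bar\alpha_t}\,x_0 + \frac{\sqrt{\alpha_t}\,(1-\bar\alpha_{t-1})}{1-\bar\alpha_t}\,x_t, \qquad \tilde\beta_t := \frac{1-\bar\alpha_{t-1}}{1-\bar\alpha_t}\,\beta_t$$
Note: because the DDPM paper fixes $\beta_t$ to constants, $L_T$ is constant during training and can be ignored.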
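The posterior coefficients can be checked against a direct application of Bayes' rule for scalar Gaussians: the prior is $q(x_{t-1} \mid x_0)$ and the likelihood is $q(x_t \mid x_{t-1})$. The sketch below assumes a linear schedule and zero-based indexing, and skips index 0, where the prior variance is exactly zero.

```python
import numpy as np

betas = np.linspace(1e-4, 0.02, 1000)  # assumed linear schedule
alphas = 1.0 - betas
alphas_cumprod = np.cumprod(alphas)
alphas_cumprod_prev = np.append(1.0, alphas_cumprod[:-1])

# Closed-form posterior coefficients and variance
coef_x0 = betas * np.sqrt(alphas_cumprod_prev) / (1.0 - alphas_cumprod)
coef_xt = np.sqrt(alphas) * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)
post_var = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)

# Bayes' rule: precisions of prior and likelihood add up
prior_var = 1.0 - alphas_cumprod_prev[1:]
precision = 1.0 / prior_var + alphas[1:] / betas[1:]
bayes_var = 1.0 / precision
bayes_coef_x0 = bayes_var * np.sqrt(alphas_cumprod_prev[1:]) / prior_var
bayes_coef_xt = bayes_var * np.sqrt(alphas[1:]) / betas[1:]

print(np.allclose(post_var[1:], bayes_var))
print(np.allclose(coef_x0[1:], bayes_coef_x0))
print(np.allclose(coef_xt[1:], bayes_coef_xt))
```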
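The reverse process gradually denoises $x_T$ until the original image $x_0$ is recovered. A neural network is used to fit the distribution at each step:
$$p_\theta(x_{t-1} \mid x_t) = \mathcal{N}\big(x_{t-1};\, \mu_\theta(x_t, t),\, \Sigma_\theta(x_t, t)\big)$$
DDPM uses a fixed variance, $\Sigma_\theta(x_t, t) = \sigma_t^2 I$, where $\sigma_t^2 = \beta_t$ and $\sigma_t^2 = \tilde\beta_t$ give similar results. This means the model $\mu_\theta$ has to fit $\tilde\mu_t$ (the posterior mean of the forward process):
$$L_{t-1} = \mathbb{E}_q\Big[\frac{1}{2\sigma_t^2}\,\big\|\tilde\mu_t(x_t, x_0) - \mu_\theta(x_t, t)\big\|^2\Big] + C$$
where $C$ does not depend on $\theta$. Substituting $x_t(x_0, \epsilon) = \sqrt{\bar\alpha_t}\,x_0 + \sqrt{1-\bar\alpha_t}\,\epsilon$ gives:
$$L_{t-1} - C = \mathbb{E}_{x_0, \epsilon}\Big[\frac{1}{2\sigma_t^2}\,\Big\|\frac{1}{\sqrt{\alpha_t}}\Big(x_t(x_0, \epsilon) - \frac{\beta_t}{\sqrt{1-\bar\alpha_t}}\,\epsilon\Big) - \mu_\theta\big(x_t(x_0, \epsilon), t\big)\Big\|^2\Big]$$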
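Assume the image data consist of integers in $\{0, 1, \dots, 255\}$, scaled linearly to $[-1, 1]$. This ensures that the neural network reverse process operates on consistently scaled inputs, starting from the standard normal prior $p(x_T)$.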
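A minimal sketch of this scaling and its inverse, assuming 8-bit images stored as NumPy arrays (the function names are illustrative, not from the original code):

```python
import numpy as np


def to_model_range(img_uint8):
    """Map integer pixels in {0, ..., 255} linearly to [-1, 1]."""
    return img_uint8.astype(np.float32) / 127.5 - 1.0


def to_uint8(x):
    """Map values in [-1, 1] back to integer pixels, clipping outliers."""
    x = np.clip(x, -1.0, 1.0)
    return np.round((x + 1.0) * 127.5).astype(np.uint8)


pixels = np.arange(256, dtype=np.uint8)
scaled = to_model_range(pixels)
restored = to_uint8(scaled)
print(scaled.min(), scaled.max())
```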
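To obtain discrete log likelihoods, the DDPM paper sets the last term of the reverse process to an independent discrete decoder derived from the Gaussian $\mathcal{N}(x_0;\, \mu_\theta(x_1, 1),\, \sigma_1^2 I)$:
$$p_\theta(x_0 \mid x_1) = \prod_{i=1}^{D} \int_{\delta_-(x_0^i)}^{\delta_+(x_0^i)} \mathcal{N}\big(x;\, \mu_\theta^i(x_1, 1),\, \sigma_1^2\big)\,dx$$
$$\delta_+(x) = \begin{cases} \infty & \text{if } x = 1 \\ x + \frac{1}{255} & \text{if } x < 1 \end{cases} \qquad \delta_-(x) = \begin{cases} -\infty & \text{if } x = -1 \\ x - \frac{1}{255} & \text{if } x > -1 \end{cases}$$
where $D$ is the dimensionality of the data and the superscript $i$ indexes one coordinate.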
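The decoder assigns each of the 256 pixel values the Gaussian probability mass of its bin. The sketch below uses only the standard library; the function names and the example mean and standard deviation are assumptions made for illustration.

```python
import math


def normal_cdf(x, mean, std):
    return 0.5 * (1.0 + math.erf((x - mean) / (std * math.sqrt(2.0))))


def discrete_bin_prob(x0, mean, std):
    """Probability mass of the bin around a scaled pixel value x0 in [-1, 1]."""
    # The outermost bins extend to +infinity and -infinity
    upper = 1.0 if x0 >= 1.0 - 1e-9 else normal_cdf(x0 + 1.0 / 255.0, mean, std)
    lower = 0.0 if x0 <= -1.0 + 1e-9 else normal_cdf(x0 - 1.0 / 255.0, mean, std)
    return upper - lower


def decoder_log_likelihood(x0_pixels, means, std):
    """Sum of per-coordinate log probabilities (coordinates are independent)."""
    return sum(
        math.log(max(discrete_bin_prob(x, m, std), 1e-12))
        for x, m in zip(x0_pixels, means)
    )


# All 256 scaled pixel values; their bin probabilities should sum to 1
values = [k / 127.5 - 1.0 for k in range(256)]
total = sum(discrete_bin_prob(v, mean=0.2, std=0.1) for v in values)
print(total)
```

Because neighbouring bins share their boundaries, the masses form a proper distribution over the 256 values.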
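From Eq. (22), $\mu_\theta$ has to predict $\frac{1}{\sqrt{\alpha_t}}\big(x_t - \frac{\beta_t}{\sqrt{1-\bar\alpha_t}}\,\epsilon\big)$ given $x_t$, so it can be parameterized as:
$$\mu_\theta(x_t, t) = \frac{1}{\sqrt{\alpha_t}}\Big(x_t - \frac{\beta_t}{\sqrt{1-\bar\alpha_t}}\,\epsilon_\theta(x_t, t)\Big)$$
This step changes the model from predicting the data distribution to predicting the noise distribution.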
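There are two routes from a noise estimate to the reverse-process mean: plug it into the formula for $\mu_\theta$ directly, or first recover an estimate of $x_0$ and then take the posterior mean $\tilde\mu_t$. The sketch below checks numerically that they agree. The schedule, the timestep and the random vector standing in for the model output are assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)
betas = np.linspace(1e-4, 0.02, 1000)  # assumed linear schedule
alphas = 1.0 - betas
ac = np.cumprod(alphas)                # alpha_bar_t
ac_prev = np.append(1.0, ac[:-1])      # alpha_bar_{t-1}

t = 500                                # zero-based timestep
x0 = rng.standard_normal(4)
eps = rng.standard_normal(4)           # stands in for eps_theta(x_t, t)
x_t = np.sqrt(ac[t]) * x0 + np.sqrt(1.0 - ac[t]) * eps

# Route 1: mean computed directly from the noise estimate
mean_from_eps = (x_t - betas[t] / np.sqrt(1.0 - ac[t]) * eps) / np.sqrt(alphas[t])

# Route 2: estimate x_0 from the noise, then take the posterior mean
x0_hat = (x_t - np.sqrt(1.0 - ac[t]) * eps) / np.sqrt(ac[t])
coef_x0 = betas[t] * np.sqrt(ac_prev[t]) / (1.0 - ac[t])
coef_xt = np.sqrt(alphas[t]) * (1.0 - ac_prev[t]) / (1.0 - ac[t])
mean_from_x0 = coef_x0 * x0_hat + coef_xt * x_t

print(np.allclose(mean_from_eps, mean_from_x0))
```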
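Using Eqs. (22) and (25), the loss simplifies to:
$$\mathbb{E}_{x_0, \epsilon}\Big[\frac{\beta_t^2}{2\sigma_t^2\,\alpha_t\,(1-\bar\alpha_t)}\,\big\|\epsilon - \epsilon_\theta\big(\sqrt{\bar\alpha_t}\,x_0 + \sqrt{1-\bar\alpha_t}\,\epsilon,\, t\big)\big\|^2\Big]$$
However, the DDPM paper found that training on the following variant of the variational bound is beneficial to sample quality (and simpler to implement):
$$L_{\text{simple}}(\theta) := \mathbb{E}_{t, x_0, \epsilon}\Big[\big\|\epsilon - \epsilon_\theta\big(\sqrt{\bar\alpha_t}\,x_0 + \sqrt{1-\bar\alpha_t}\,\epsilon,\, t\big)\big\|^2\Big]$$
where $t$ is uniformly distributed between $1$ and $T$.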
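A minimal NumPy sketch of this simplified objective follows. The helper `simple_loss`, the stand-in models and the batch shape are hypothetical; a real setup would use a neural network and automatic differentiation.

```python
import numpy as np


def simple_loss(eps_model, x0, t, noise, alphas_cumprod):
    """Mean squared error between the true noise and the predicted noise."""
    # Reshape alpha_bar_t to (batch, 1, ..., 1) so it broadcasts over x0
    a = alphas_cumprod[t].reshape(-1, *([1] * (x0.ndim - 1)))
    x_t = np.sqrt(a) * x0 + np.sqrt(1.0 - a) * noise
    return np.mean((noise - eps_model(x_t, t)) ** 2)


rng = np.random.default_rng(0)
betas = np.linspace(1e-4, 0.02, 1000)          # assumed linear schedule
alphas_cumprod = np.cumprod(1.0 - betas)

x0 = rng.uniform(-1.0, 1.0, size=(8, 3, 4, 4))  # toy batch in [-1, 1]
t = rng.integers(0, len(betas), size=8)         # zero-based, uniform
noise = rng.standard_normal(x0.shape)

zero_model = lambda x_t, t: np.zeros_like(x_t)  # predicts no noise
oracle_model = lambda x_t, t: noise             # cheats: returns the true noise
loss_zero = simple_loss(zero_model, x0, t, noise, alphas_cumprod)
loss_oracle = simple_loss(oracle_model, x0, t, noise, alphas_cumprod)
print(loss_zero, loss_oracle)
```

A model that always predicts zero should score roughly 1 (the variance of the noise), while a perfect predictor scores 0.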
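Sampling starts from noise $x_T$ drawn from a standard normal distribution. The model then takes $x_t$ and $t$ and predicts the corresponding noise step by step until $x_0$ is reached:
$$x_{t-1} = \frac{1}{\sqrt{\alpha_t}}\Big(x_t - \frac{\beta_t}{\sqrt{1-\bar\alpha_t}}\,\epsilon_\theta(x_t, t)\Big) + \sigma_t z, \qquad z \sim \mathcal{N}(0, I) \text{ if } t > 1, \text{ else } z = 0$$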
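The sampling procedure can be sketched as a plain loop. This is an illustration under assumptions: `eps_model` is a stand-in callable rather than a trained network, the variance is fixed to $\sigma_t^2 = \beta_t$, and the schedule is shortened to 50 steps to keep the example fast.

```python
import numpy as np


def ddpm_sample(eps_model, shape, betas, rng):
    """Ancestral sampling from x_T down to x_0 with sigma_t^2 = beta_t."""
    alphas = 1.0 - betas
    alphas_cumprod = np.cumprod(alphas)
    x = rng.standard_normal(shape)              # x_T ~ N(0, I)
    for t in reversed(range(len(betas))):       # zero-based: T-1, ..., 0
        eps = eps_model(x, t)
        mean = (x - betas[t] / np.sqrt(1.0 - alphas_cumprod[t]) * eps) / np.sqrt(alphas[t])
        # No noise is added on the final step
        z = rng.standard_normal(shape) if t > 0 else np.zeros(shape)
        x = mean + np.sqrt(betas[t]) * z
    return x


rng = np.random.default_rng(0)
betas = np.linspace(1e-4, 0.02, 50)             # short assumed schedule
dummy_model = lambda x, t: np.zeros_like(x)     # placeholder for eps_theta
sample = ddpm_sample(dummy_model, (2, 3, 8, 8), betas, rng)
print(sample.shape)
```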
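The timestep $t$ is encoded with a positional embedding. Note that $\beta_t$ is increasing: $\beta_1 < \beta_2 < \dots < \beta_T$.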
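Commonly used variance schedules include:
- linear
- cosine
- quadratic/sqrt
- sigmoid
- warmup
- jsd
They are mostly based on the range used in the DDPM paper, $\beta_1 = 10^{-4}$ to $\beta_T = 0.02$.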
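Two of the listed schedules can be sketched as follows. The exact formulas are assumptions based on common open-source implementations, not taken from this post: "quadratic" interpolates linearly in $\sqrt{\beta}$, and "sigmoid" squashes a linear ramp on $[-6, 6]$ into the DDPM range.

```python
import numpy as np


def quadratic_beta_schedule(num_steps, beta_start=1e-4, beta_end=0.02):
    """Linear interpolation in sqrt(beta), then squared."""
    return np.linspace(beta_start ** 0.5, beta_end ** 0.5, num_steps) ** 2


def sigmoid_beta_schedule(num_steps, beta_start=1e-4, beta_end=0.02):
    """Sigmoid ramp rescaled into [beta_start, beta_end]."""
    s = np.linspace(-6.0, 6.0, num_steps)
    return 1.0 / (1.0 + np.exp(-s)) * (beta_end - beta_start) + beta_start


quad = quadratic_beta_schedule(1000)
sigm = sigmoid_beta_schedule(1000)
print(quad[0], quad[-1], sigm[0], sigm[-1])
```

Both schedules are increasing, which matches the requirement that later steps add more noise.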
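- $\alpha_t$: `alphas`, i.e. `1 - betas`
- $\bar\alpha_t$: `alphas_cumprod`, i.e. `np.cumprod(alphas, axis=0)`
- $\sqrt{\bar\alpha_t}$: `sqrt_alphas_cumprod`, i.e. `np.sqrt(alphas_cumprod)`
- $\sqrt{1-\bar\alpha_t}$: `sqrt_one_minus_alphas_cumprod`, i.e. `np.sqrt(1.0 - alphas_cumprod)`; together with `sqrt_alphas_cumprod` it is used to obtain $x_t$ from $x_0$
- $\bar\alpha_{t-1}$: `alphas_cumprod_prev`, i.e. `np.append(1., alphas_cumprod[:-1])`, so that later computations can use the same index `t` everywhere
- coefficient of $x_0$ in $\tilde\mu_t$: `posterior_mean_coeff1`, i.e. `betas * np.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod)`; the value prepended to `alphas_cumprod_prev` is the scalar `1`, which has no effect in a product
- coefficient of $x_t$ in $\tilde\mu_t$: `posterior_mean_coeff2`, i.e. `(1. - alphas_cumprod_prev) * np.sqrt(alphas) / (1. - alphas_cumprod)`
- $\tilde\beta_t$: `posterior_variance`, i.e. `betas * (1 - alphas_cumprod_prev) / (1 - alphas_cumprod)`; this computes $\tilde\beta_t$ as given by Eq. (18). Note that `posterior_variance` is not necessarily the variance $\sigma_t^2$ used in the sampling formula: `p_mean_variance` in the code uses `betas` for every step except the first
- $\sqrt{1/\bar\alpha_t}$: `sqrt_recip_alphas_cumprod`, i.e. `np.sqrt(1.0 / alphas_cumprod)`
- $\sqrt{1/\bar\alpha_t - 1}$: `sqrt_recip_m1_alphas_cumprod`, i.e. `np.sqrt(1 / alphas_cumprod - 1)`; together with `sqrt_recip_alphas_cumprod` it is used to recover $x_0$ from $x_t$ and the predicted noise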
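The sketch below builds a few of these tables in NumPy and checks two properties: the two "recip" coefficients invert the forward closed form, and the first posterior variance is exactly zero (so its logarithm has to be clipped before use). The linear schedule and the test values are assumptions.

```python
import numpy as np

betas = np.linspace(1e-4, 0.02, 1000)  # assumed linear schedule
alphas = 1.0 - betas
alphas_cumprod = np.cumprod(alphas, axis=0)
alphas_cumprod_prev = np.append(1.0, alphas_cumprod[:-1])
posterior_variance = betas * (1.0 - alphas_cumprod_prev) / (1.0 - alphas_cumprod)
sqrt_recip_alphas_cumprod = np.sqrt(1.0 / alphas_cumprod)
sqrt_recip_m1_alphas_cumprod = np.sqrt(1.0 / alphas_cumprod - 1.0)

rng = np.random.default_rng(0)
t = 300                                # zero-based timestep
x0 = rng.standard_normal(5)
eps = rng.standard_normal(5)
x_t = np.sqrt(alphas_cumprod[t]) * x0 + np.sqrt(1.0 - alphas_cumprod[t]) * eps

# x_0 = sqrt(1 / alpha_bar_t) * x_t - sqrt(1 / alpha_bar_t - 1) * eps
x0_rec = sqrt_recip_alphas_cumprod[t] * x_t - sqrt_recip_m1_alphas_cumprod[t] * eps
print(np.allclose(x0_rec, x0), posterior_variance[0])
```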
```python
import math

import torch
import torch.nn.functional as F
from torch import nn


class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, time):
        device = time.device
        half_dim = self.dim // 2
        # Frequencies decay geometrically from 1 to 1/10000
        embeddings = math.log(10000) / (half_dim - 1)
        embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings)
        # (batch, 1) * (1, half_dim) -> (batch, half_dim)
        embeddings = time[:, None] * embeddings[None, :]
        embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
        if self.dim % 2 == 1:
            # Pad one zero column when dim is odd
            embeddings = F.pad(embeddings, (0, 1), value=0.0)
        return embeddings
```
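To see what the embedding looks like without PyTorch, here is a NumPy sketch of the same computation for even `dim`. The function name is made up for the example.

```python
import math

import numpy as np


def sinusoidal_embedding(timesteps, dim):
    """NumPy version of the sinusoidal timestep embedding (even dim only)."""
    half_dim = dim // 2
    freqs = np.exp(-math.log(10000) * np.arange(half_dim) / (half_dim - 1))
    args = np.asarray(timesteps, dtype=np.float64)[:, None] * freqs[None, :]
    # First half holds the sines, second half the cosines
    return np.concatenate([np.sin(args), np.cos(args)], axis=-1)


emb = sinusoidal_embedding([0, 1, 500], dim=8)
print(emb.shape)
print(emb[0])  # t = 0: sines are 0, cosines are 1
```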
Beta schedules (`linear` and `cosine`):
```python
import math

import numpy as np


def get_named_beta_schedule(schedule_name, num_diffusion_timesteps):
    """
    Get a pre-defined beta schedule for the given name.
    The beta schedule library consists of beta schedules which remain similar
    in the limit of num_diffusion_timesteps.
    Beta schedules may be added, but should not be removed or changed once
    they are committed to maintain backwards compatibility.
    """
    if schedule_name == "linear":
        # Linear schedule from Ho et al, extended to work for any number of
        # diffusion steps.
        scale = 1000 / num_diffusion_timesteps
        beta_start = scale * 0.0001
        beta_end = scale * 0.02
        return np.linspace(
            beta_start, beta_end, num_diffusion_timesteps, dtype=np.float64
        )
    elif schedule_name == "cosine":
        return betas_for_alpha_bar(
            num_diffusion_timesteps,
            lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2,
        )
    else:
        raise NotImplementedError(f"unknown beta schedule: {schedule_name}")


def betas_for_alpha_bar(num_diffusion_timesteps, alpha_bar, max_beta=0.999):
    """
    Create a beta schedule that discretizes the given alpha_t_bar function,
    which defines the cumulative product of (1-beta) over time from t = [0,1].
    :param num_diffusion_timesteps: the number of betas to produce.
    :param alpha_bar: a lambda that takes an argument t from 0 to 1 and
                      produces the cumulative product of (1-beta) up to that
                      part of the diffusion process.
    :param max_beta: the maximum beta to use; use values lower than 1 to
                     prevent singularities.
    """
    betas = []
    for i in range(num_diffusion_timesteps):
        t1 = i / num_diffusion_timesteps
        t2 = (i + 1) / num_diffusion_timesteps
        betas.append(min(1 - alpha_bar(t2) / alpha_bar(t1), max_beta))
    return np.array(betas)
```
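A vectorized sketch of the cosine schedule makes its behaviour easier to inspect. The helper names are hypothetical; the constants `0.008`, `1.008` and `0.999` are the ones used in the code above.

```python
import math

import numpy as np


def cosine_alpha_bar(t):
    return np.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2


def cosine_betas(num_steps, max_beta=0.999):
    """beta_i = 1 - alpha_bar(t_{i+1}) / alpha_bar(t_i), clipped at max_beta."""
    t = np.arange(num_steps + 1) / num_steps
    alpha_bar = cosine_alpha_bar(t)
    return np.minimum(1.0 - alpha_bar[1:] / alpha_bar[:-1], max_beta)


betas = cosine_betas(1000)
alphas_cumprod = np.cumprod(1.0 - betas)
# Before clipping kicks in, the cumulative product follows the cosine curve
target = cosine_alpha_bar(np.arange(1, 1001) / 1000) / cosine_alpha_bar(0.0)
print(betas[0], betas[-1])
```

Only the final step hits the `max_beta` clip here, because the cosine curve reaches zero at the end of the schedule.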
```python
import numpy as np
import torch
import torch.nn.functional as F
from tqdm import tqdm


class GaussianDiffusion:
    def __init__(self, betas, predict_xstart=False, rescale_timesteps=False, model_mean_type='eps'):
        assert len(betas.shape) == 1, "betas must be 1-D"
        assert (betas > 0).all() and (betas <= 1).all()
        # Basic attributes
        self.num_timesteps = betas.shape[0]
        self.predict_xstart = predict_xstart
        self.rescale_timesteps = rescale_timesteps
        self.model_mean_type = model_mean_type
        # Coefficients for mapping clean data to noisy data, i.e. x_0 -> x_t
        self.betas = betas
        self.alphas = 1 - betas
        self.alphas_cumprod = np.cumprod(self.alphas, axis=0)
        self.sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = np.sqrt(1 - self.alphas_cumprod)
        # Coefficients for predicting x_{t-1} from x_t and x_0
        self.alphas_cumprod_prev = np.append(1., self.alphas_cumprod[:-1])
        self.posterior_mean_coeff1 = self.betas * np.sqrt(self.alphas_cumprod_prev) / (1. - self.alphas_cumprod)
        self.posterior_mean_coeff2 = np.sqrt(self.alphas) * (1. - self.alphas_cumprod_prev) / (
            1. - self.alphas_cumprod)
        # Coefficients for predicting x_{t-1} from the model's noise output, x_t and fresh noise
        self.sqrt_recip_alphas_cumprod = np.sqrt(1. / self.alphas_cumprod)
        self.sqrt_recip_m1_alphas_cumprod = np.sqrt(1 / self.alphas_cumprod - 1)
        self.posterior_variance = self.betas * (1. - self.alphas_cumprod_prev) / (1. - self.alphas_cumprod)
        self.model_noise_coeff = self.betas / self.sqrt_one_minus_alphas_cumprod
        # Helper quantities for the loss
        self.log_one_minus_alphas_cumprod = np.log(1.0 - self.alphas_cumprod)
        # posterior_variance[0] is 0, so its log is replaced by the log of the next entry
        self.posterior_log_variance_clipped = np.log(
            np.append(self.posterior_variance[1], self.posterior_variance[1:]))

    def q_sample(self, x_start, timesteps, noise=None):
        """
        Diffuse the data x_start to the given timesteps, i.e. sample from q(x_t | x_0).
        :param x_start: the original data.
        :param timesteps: which step to diffuse to; zero-based (t - 1), so 0 is the first step.
        :param noise: if given, this noise is added to the data; otherwise random noise is drawn.
        :return: the noised data.
        """
        if noise is None:
            noise = torch.randn_like(x_start)
        assert noise.shape == x_start.shape
        return (
            self._extract_into_tensor(self.sqrt_alphas_cumprod, timesteps, x_start.shape) * x_start
            + self._extract_into_tensor(self.sqrt_one_minus_alphas_cumprod, timesteps, x_start.shape)
            * noise
        )

    def q_mean_variance(self, x_start, timesteps):
        """
        Utility: a reduced version of `q_sample` that only returns the mean and
        variance of q(x_t | x_0) for the given timesteps.
        :param x_start: the original data; its shape is also used to broadcast the coefficients.
        :param timesteps: which timestep to query; zero-based.
        :return: the mean, variance and log-variance.
        """
        mean = (
            self._extract_into_tensor(self.sqrt_alphas_cumprod, timesteps, x_start.shape) * x_start
        )
        variance = self._extract_into_tensor(1.0 - self.alphas_cumprod, timesteps, x_start.shape)
        log_variance = self._extract_into_tensor(
            self.log_one_minus_alphas_cumprod, timesteps, x_start.shape
        )
        return mean, variance, log_variance

    def q_posterior_mean_variance(self, x_start, x_t, timesteps):
        """
        Compute the posterior mean and variance at step `t` from x_start and x_t,
        i.e. q(x_{t-1} | x_t, x_0).
        :param x_start: the original data.
        :param x_t: the data after `t` steps of noise.
        :param timesteps: which timestep to query; zero-based.
        :return: the posterior mean, variance and clipped log-variance.
        """
        assert x_start.shape == x_t.shape
        posterior_mean = (
            self._extract_into_tensor(self.posterior_mean_coeff1, timesteps, x_t.shape) * x_start
            + self._extract_into_tensor(self.posterior_mean_coeff2, timesteps, x_t.shape) * x_t
        )
        posterior_variance = self._extract_into_tensor(self.posterior_variance, timesteps, x_t.shape)
        posterior_log_variance_clipped = self._extract_into_tensor(
            self.posterior_log_variance_clipped, timesteps, x_t.shape
        )
        assert (
            posterior_mean.shape[0]
            == posterior_variance.shape[0]
            == posterior_log_variance_clipped.shape[0]
            == x_start.shape[0]
        )
        return posterior_mean, posterior_variance, posterior_log_variance_clipped

    def p_mean_variance(
        self, model, x, timesteps, clip_denoised=True, denoised_fn=None, model_kwargs=None
    ):
        """
        Apply the model to get p(x_{t-1} | x_t), as well as a prediction of x_0.
        With `predict_xstart=True` the model output is treated as x_0 directly.
        :param model: the noise-prediction model; takes a batch of data and a
                      `(batch_size,)` tensor of timesteps.
        :param x: the [N x C x ...] tensor at time t.
        :param timesteps: a 1-D Tensor of timesteps.
        :param clip_denoised: if True, clip the predicted x_0 to [-1, 1]. The paper notes
                              that this keeps the reverse process operating on consistently
                              scaled inputs starting from the standard normal prior p(x_T).
        :param denoised_fn: if not None, a function applied to the x_start prediction
                            before it is used to sample; it is used instead of clipping.
        :param model_kwargs: extra keyword arguments for the model, `None` by default.
        :return: a dict with the following keys:
                 - 'mean': the model mean output.
                 - 'variance': the model variance output.
                 - 'log_variance': the log of 'variance'.
                 - 'pred_xstart': the prediction for x_0.
        """
        if model_kwargs is None:
            model_kwargs = {}
        B, C = x.shape[:2]
        assert timesteps.shape == (B,)
        # The model predicts eps (or x_0 when predict_xstart is set)
        model_output = model(x, self._scale_timesteps(timesteps), **model_kwargs)
        # for fixed large, we set the initial (log-)variance like so
        # to get a better decoder log likelihood.
        model_variance = np.append(self.posterior_variance[1], self.betas[1:])
        model_log_variance = np.log(model_variance)
        # Pick the entries for the requested timesteps
        model_variance = self._extract_into_tensor(model_variance, timesteps, x.shape)
        model_log_variance = self._extract_into_tensor(model_log_variance, timesteps, x.shape)

        def process_xstart(xx):
            if denoised_fn is not None:
                return denoised_fn(xx, timesteps)
            if clip_denoised:
                return xx.clamp(-1, 1)
            return xx

        if self.predict_xstart:
            pred_xstart = process_xstart(model_output)
        else:
            # model is used to predict eps
            pred_xstart = process_xstart(
                self._predict_xstart_from_eps(x_t=x, timesteps=timesteps, eps=model_output)
            )
        # Posterior mean of q(x_{t-1} | x_t, x_0) with pred_xstart in place of x_0
        model_mean, _, _ = self.q_posterior_mean_variance(
            x_start=pred_xstart, x_t=x, timesteps=timesteps
        )
        assert model_mean.shape == model_log_variance.shape == pred_xstart.shape == x.shape
        return {
            "mean": model_mean,
            "variance": model_variance,
            "log_variance": model_log_variance,
            "pred_xstart": pred_xstart,
        }

    def _predict_xstart_from_eps(self, x_t, timesteps, eps):
        """
        Predict `x_0` from the noise `eps`.
        :param x_t: the data after `t` steps of noise.
        :param timesteps: which timestep to query; zero-based.
        :param eps: the noise, usually the model output.
        :return: the `x_0` computed from `eps`.
        """
        assert x_t.shape == eps.shape
        return (
            self._extract_into_tensor(self.sqrt_recip_alphas_cumprod, timesteps, x_t.shape) * x_t
            - self._extract_into_tensor(self.sqrt_recip_m1_alphas_cumprod, timesteps, x_t.shape) * eps
        )

    @torch.no_grad()
    def p_sample(
        self, model, x, timesteps, clip_denoised=True, denoised_fn=None, model_kwargs=None,
        noise_scale=1.0):
        """
        Sample x_{t-1} from the model at the given timesteps.
        :param model: the model used to predict the noise.
        :param x: the current tensor x_t.
        :param timesteps: which timestep to sample at; zero-based.
        :param clip_denoised: if True, clip the predicted x_0 to [-1, 1].
        :param denoised_fn: if not None, a function applied to the x_start prediction
                            before it is used to sample; it is used instead of clipping.
        :param model_kwargs: extra keyword arguments for the model, `None` by default.
        :param noise_scale: scale factor for the added noise, 1.0 by default.
        :return: a dict containing the following keys:
                 - 'sample': a random sample from the model.
                 - 'pred_xstart': a prediction of x_0.
        """
        out = self.p_mean_variance(
            model,
            x,
            timesteps,
            clip_denoised=clip_denoised,
            denoised_fn=denoised_fn,
            model_kwargs=model_kwargs,
        )
        noise = torch.randn_like(x)
        nonzero_mask = (
            (timesteps != 0).float().view(-1, *([1] * (len(x.shape) - 1)))
        )  # no noise when t == 0
        # The standard deviation is exp(0.5 * log_variance)
        sample = out["mean"] + nonzero_mask * torch.exp(0.5 * out["log_variance"]) * noise * noise_scale
        return {
            "sample": sample,
            "pred_xstart": out["pred_xstart"]
        }

    @torch.no_grad()
    def p_sample_loop(
        self,
        model,
        shape,
        noise=None,
        clip_denoised=True,
        denoised_fn=None,
        model_kwargs=None,
        device=None
    ):
        """
        Generate samples from the model.
        :param model: the model module.
        :param shape: the shape of the samples, (N, C, H, W).
        :param noise: if specified, the noise from the encoder to sample.
                      Should be of the same shape as `shape`.
        :param clip_denoised: if True, clip x_start predictions to [-1, 1].
        :param denoised_fn: if not None, a function which applies to the
                            x_start prediction before it is used to sample.
        :param model_kwargs: if not None, a dict of extra keyword arguments to
                             pass to the model. This can be used for conditioning.
        :param device: if specified, the device to create the samples on.
                       If not specified, use a model parameter's device.
        :return: a non-differentiable batch of samples.
        """
        final = None
        for sample in self.p_sample_loop_progressive(
            model,
            shape,
            noise=noise,
            clip_denoised=clip_denoised,
            denoised_fn=denoised_fn,
            model_kwargs=model_kwargs,
            device=device,
        ):
            final = sample
        return final["sample"]

    def p_sample_loop_progressive(
        self,
        model,
        shape,
        noise=None,
        clip_denoised=True,
        denoised_fn=None,
        model_kwargs=None,
        device=None,
    ):
        """
        Generate samples from the model and yield intermediate samples from
        each timestep of diffusion.
        Arguments are the same as p_sample_loop().
        Returns a generator over dicts, where each dict is the return value of
        p_sample().
        """
        if device is None:
            device = next(model.parameters()).device
        assert isinstance(shape, (tuple, list))
        # Start from Gaussian noise
        if noise is not None:
            img = noise
        else:
            img = torch.randn(*shape, device=device)
        # Steps T, ..., 1, i.e. zero-based indices [T-1, ..., 0]
        indices = list(range(self.num_timesteps))[::-1]
        for i in tqdm(iterable=indices, desc='sampling'):
            # One copy of timestep i per batch element, e.g.
            # [999, 999, 999, 999, 999, 999, 999, ...]
            # [998, 998, 998, 998, 998, 998, 998, ...]
            t = torch.tensor([i] * shape[0], device=device)
            with torch.no_grad():
                out = self.p_sample(
                    model,
                    img,
                    t,
                    clip_denoised=clip_denoised,
                    denoised_fn=denoised_fn,
                    model_kwargs=model_kwargs,
                )
                yield out
                img = out["sample"]

    def training_losses(self, model, x_start, timesteps, noise=None):
        """
        Training loss calculation
        """
        # Add noise to data
        assert timesteps.shape[0] == x_start.shape[0]
        if noise is None:
            noise = torch.randn_like(x_start, dtype=x_start.dtype)
        assert noise.shape == x_start.shape and noise.dtype == x_start.dtype
        # Sample x_t from q(x_t | x_0)
        x_t = self.q_sample(x_start=x_start, timesteps=timesteps, noise=noise)
        # MSE loss; the target depends on model_mean_type
        target = {
            'xprev': self.q_posterior_mean_variance(x_start=x_start, x_t=x_t, timesteps=timesteps)[0],
            'xstart': x_start,
            'eps': noise
        }[self.model_mean_type]
        # Scale the timesteps the same way as in p_mean_variance
        model_output = model(x_t, self._scale_timesteps(timesteps))
        assert model_output.shape == target.shape == x_start.shape
        loss = F.mse_loss(target, model_output)
        return loss

    def _scale_timesteps(self, timestep):
        """
        Rescale `timestep` according to the `rescale_timesteps` flag.
        :param timestep: the timesteps.
        :return: the processed timesteps.
        """
        if self.rescale_timesteps:
            return timestep.float() * (1000.0 / self.num_timesteps)
        return timestep

    @staticmethod
    def _extract_into_tensor(arr, timesteps, broadcast_shape):
        """
        Look up values of `arr` at `timesteps` and broadcast them to `broadcast_shape`.
        :param arr: a 1-D array holding one coefficient per timestep.
        :param timesteps: a 1-D tensor of indices into `arr`, of shape `(batch_size,)`.
        :param broadcast_shape: the shape of the returned tensor.
        :return: a tensor of shape `broadcast_shape`: the values are first reshaped to
                 `[batch_size, 1, ..., 1]` (one timestep per sample) and then expanded.
        """
        res = torch.from_numpy(arr).to(device=timesteps.device)[timesteps].float()
        while len(res.shape) < len(broadcast_shape):
            # Append trailing singleton dimensions
            res = res[..., None]
        return res.expand(broadcast_shape)
```
```python
import pathlib

import torch
from torch.optim import Adam
from torchvision.utils import save_image

# LoadLiveDataset, UNetModel and device() are defined elsewhere in the project.


def train(generate_dir, num_epochs, batch_size, img_size, input_channel,
          shape, lr, timesteps, num_units, save_image_to, schedule_name):
    # Set up the schedule, data loader, model and optimizer
    betas = get_named_beta_schedule(schedule_name, timesteps)
    loader = LoadLiveDataset(batch_size=batch_size, resize=img_size)
    model = UNetModel(image_size=img_size,
                      in_channels=input_channel,
                      model_channels=num_units,
                      out_channels=3,
                      num_res_blocks=2,
                      attention_resolutions=(16, 8)).to(device())
    optimizer = Adam(model.parameters(), lr=lr)
    gd = GaussianDiffusion(betas)
    # num_epochs + 1 so that the sampling below can run once every 100 epochs
    for epoch in range(num_epochs + 1):
        print(f'epoch: {epoch}')
        for step, (features, _) in enumerate(loader.data_loader):
            optimizer.zero_grad()
            features = features.to(device())
            # Random timesteps, one per sample (the last batch may be smaller)
            random_batch_t = torch.randint(0, timesteps, (features.shape[0],), device=device()).long()
            loss = gd.training_losses(model=model, x_start=features, timesteps=random_batch_t)
            print(f"\t step: {step}, Loss: {loss.item()}")
            loss.backward()
            # Clip gradients for stability
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.)
            optimizer.step()
        with torch.no_grad():
            # save generated images
            if (epoch + 1) % 100 == 0:
                # Output file name
                file_name = f'sample-{epoch}-{step}.png'
                # Sample from the model step by step, from t = T down to x_0
                all_images = gd.p_sample_loop(model, shape=shape, device=device())
                # Map the data from [-1, 1] to [0, 1]
                all_images = (all_images + 1) * 0.5
                if save_image_to == 'local':
                    # Save to the local file system
                    out_dir = pathlib.Path(generate_dir)
                    out_dir.mkdir(parents=True, exist_ok=True)
                    # save_image maps [0, 1] -> [0, 255], adds 0.5 before truncating
                    # (i.e. rounds), converts CHW -> HWC and to a NumPy array
                    save_image(all_images, str(out_dir / file_name), nrow=max(1, batch_size // 8))
```
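The `UNetModel` code is omitted here; refer to the official code.
I only have a rough understanding of the formulas in the paper, and my grasp of the details is still lacking. The paper uses several loss formulations; here I used the MSE loss and did not say much about the KL divergence.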
Author: 小屁孩
Copyright notice: unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please credit the source when reposting!