DDPM - 论文阅读
1. 扩散模型定义
2. 相关定义
3. 前向过程
3. 损失函数
4. 反向过程
5. 缩放数据和$L_0$
6. 简化损失函数
7. 抽样
1. 实现要点
2. $\beta_t$的选择
3. 需要计算的常用固定值
4. 通过代码梳理思路
1. 位置编码
2. 调度计算(这里使用了linear和cosine)
4. 训练代码(这里只展示大致逻辑)


什么是扩散模型?扩散模型是根据非平衡热力学(nonequilibrium thermodynamics) 所构思的一种思路。基本原理是通过给一张图片一步步(timesteps) 的添加高斯噪音(Gaussian noise),最后得到一张符合高斯噪音分布的纯噪音图片,这是前向过程(forward process),这一步可以通过直接计算得到。之后再对这张噪音图像进行一步步的降噪(denoising),最后得到原来的图像,这是反向过程(reverse process)。主要训练的目标就是通过一个神经网络在降噪过程中学习噪音分布。




1. 扩散模型定义

扩散概率模型(diffusion probabilistic model),简称扩散模型(diffusion model)是一种使用变分推理训练的参数化马尔可夫链,其在有限时间后产生与数据匹配的样本。

a parameterized Markov chain trained using variational inference to produce samples matching the data after finite time.

2. 相关定义

定义原始数据x0q(x0)x_0 \backsim q(x_0),这通过扩散模型可以得到相同维度的x1,x2,...,xTx_1, x_2, ..., x_T数据(其中TT是扩散的总步骤),则扩散模型是pθ(x0):=pθ(x0:T)dx1:Tp_{\theta}(x_0) := \int p_{\theta}(x_{0:T})dx_{1:T}形式的潜在变量模型。

模型中的pθ(x0:T)p_{\theta}(x_{0:T})被叫做反向过程(reverse process),它被定义为从p(xT)=N(xT;0,I)p(x_T)=\mathcal N(x_T; 0, I)开始学习高斯转换的马尔可夫链:

pθ(x0:T):=p(xT)t=1Tpθ(xt1xt)pθ(xt1xt):=N(xt1;μθ(xt,t),Eθ(xt,t))\begin{align} p_{\theta}(x_{0:T}) &:= p(x_T)\prod_{t=1}^T p_{\theta}(x_{t-1} | x_t) \\ p_{\theta}(x_{t-1} | x_t) &:= \mathcal N(x_{t-1}; \mu_{\theta}(x_t, t), \mathcal E_{\theta}(x_t, t)) \end{align}

这里x1,...,xt1x_1, ..., x_{t-1}都是通过xtx_ttt作为参数,从模型中获取对应的噪音来计算出xt1x_{t-1}

3. 前向过程

扩散模型和其他潜在变量模型的区分就是:近似后验(approximate posterior)。扩散模型使用近似后验(approximate posterior) 完成前向扩散过程(forward process, diffusion process)。这个过程是通过一个固定或者可学习的方差调度(variance schedule)即:β1,...,βT\beta_1, ..., \beta_T 的马尔可夫链,逐步的的添加高斯噪音到数据中,得到一个新的数据,最后得到符合高斯分布的纯噪音数据。(这个有论文证明,当TT \to \infty,最终得到的就是符合高斯分布的数据):

q(x1:Tx0):=t=1Tq(xtxt1),q(xtxt1):=N(xt;1βtxt1,βtI)\begin{align} q(x_{1:T} | x_0) &:= \prod_{t=1}^T q(x_t | x_{t-1}), \\ q(x_t | x_{t-1}) &:= \mathcal N(x_t; \sqrt{1-\beta_t}x_{t-1}, \beta_tI) \end{align}
  1. 表示每一个xtx_t都是可以通过调度参数定义的高斯分布获取的噪音和xt1x_{t-1}得到。
  2. βt\beta_t可以通过通过重新参数化(reparameterization) 学习得到,也可以作为一个超参数设置成固定的值。
  3. 反向过程的表达性部分需要通过在pθ(xt1xt)p_\theta(x_{t−1}| x_t)中选择的高斯条件来确保, 只有当βt\beta_t是很小时,两个过程具有相同的函数形式。


αt:=1βt,αˉt:=s=1tαsq(xtx0)=N(xt;αˉtx0,(1αˉt)I)\begin{align} \alpha_t &:= 1 - \beta_t, \\ \bar \alpha_t &:= \prod_{s=1}^t \alpha_s \\ q(x_t | x_0) &= \mathcal N(x_t; \sqrt{\bar \alpha_t}x_0, (1-\bar \alpha_t)I) \end{align}

其中 (7式) 通过重新参数化为:

xt(x0,ϵ)=αˉtx0+1αˉtϵ,其中 ϵN(0,I)\begin{align} x_t(x_0, \epsilon) &= \sqrt{\bar \alpha_t}x_0 + \sqrt{1 - \bar\alpha_t}\epsilon, \qquad 其中 \ \epsilon \in \mathcal N(0, I) \end{align}

这也就是说,每一个xt,t[1...T]x_t, t \in [1...T]都是对x0x_0的均值缩放和一个噪音的和。 加了噪声之后的图片所属于的分布的均值,是基于上一张图片的均值轻微偏移后得到的

3. 损失函数

L:=E[log pθ(x0)]Eq[log pθ(x0:T)q(x1:Tx0)]=Eq[log p(xT)t1log pθ(xt1xt)q(xtxt1)]=Eq[log p(xT)t>1log pθ(xt1xt)q(xtxt1)log pθ(x0x1)q(x1x0)]=Eq[log p(xT)t>1log pθ(xt1xt)q(xt1xt,x0)q(xt1x0)q(x1x0)log pθ(x0x1)q(x1x0)]=Eq[log p(xT)q(xTx0)t>1log pθ(xt1xt)q(xt1xt,x0)log pθ(x0x1)]\begin{align} \mathcal L &:= \mathbb E [-log\ p_\theta(x_0)] \\ & \leq \mathbb E_q[-log\ \frac{p_\theta(x_{0:T})}{q(x_{1:T} | x_0)}] \\ & = \mathbb E_q[-log\ p(x_T) - \sum_{t \geq 1}log\ \frac{p_\theta(x_{t-1} | x_t)}{q(x_t | x_{t-1})}] \\ & = \mathbb E_q[-log\ p(x_T) - \sum_{t \gt 1}log\ \frac{p_\theta(x_{t-1} | x_t)}{q(x_t | x_{t-1})} - log\ \frac{p_{\theta}(x_0|x_1)}{q(x_1|x_0)}] \\ & = \mathbb E_q[-log\ p(x_T) - \sum_{t \gt 1}log\ \frac{p_\theta(x_{t-1} | x_t)}{q(x_{t-1} | x_t, x_0)} * \frac{q(x_{t-1} | x_0)}{q(x_1 | x_0)} - log\ \frac{p_{\theta}(x_0|x_1)}{q(x_1|x_0)}] \\ & = \mathbb E_q[-log\ \frac{p(x_T)}{q(x_T|x_0)} - \sum_{t \gt 1}log\ \frac{p_\theta(x_{t-1} | x_t)}{q(x_{t-1} | x_t, x_0)} - log\ p_{\theta}(x_0|x_1)] \\ \end{align}


Eq[DKL(q(xTx0)  p(xT))LT + t>1DKL(q(xt1xt,x0)  pθ(xt1xt))Lt1  log pθ(x0x1)L0]\begin{align} \mathbb E_q[\underbrace{D_{KL}(q(x_T | x_0) \ || \ p(x_T))}_{L_T} \ + \ \sum_{t \gt 1} \underbrace{D_{KL}(q(x_{t-1} | x_t, x_0) \ || \ p_{\theta}({x_{t-1} | x_t}))}_{L_{t-1}} \ - \ \underbrace{log\ p_{\theta}(x_0 | x_1)}_{L_0}] \\ \end{align}


q(xt1xt,x0)=N(xt1;μ~t(xt,x0),β~tI)μ~t(xt,x0):=αˉt1βt1αˉtx0 + αt(1αˉt1)1αˉtxtβ~t:=1αˉt11αˉtβt\begin{align} q(x_{t-1} | x_t, x_0) &= \mathcal N(x_{t-1}; \tilde \mu_t(x_t, x_0),\tilde \beta_t I) \\ \tilde \mu_t(x_t, x_0) &:= \frac{\sqrt{\bar \alpha_{t-1}}\beta_t}{1- \bar\alpha_t} x_0 \ + \ \frac{\sqrt\alpha_t (1 - \bar \alpha_{t-1})}{1 - \bar\alpha_t}x_t \\ \tilde \beta_t &:= \frac{1 - \bar \alpha_{t-1}}{1 - \bar \alpha_t}\beta_t \end{align}


4. 反向过程

反向过程(reverse process) 就是通过对xTx_T逐渐降燥(denosing),得到原始图像x0x_0的过程。在这个过程中通过使用神经网络拟合数据分布的过程:

pθ(xt1xt)=N(xt1;μθ(xt,t),Eθ(xt,t)),for1<tT\begin{align} p_{\theta}(x_{t-1}|x_t) = \mathcal N(x_{t-1}; \mu_{\theta}(x_t, t), \mathcal E_{\theta}(x_t, t)), \qquad for \quad 1 \lt t \leq T \end{align}
  1. 对于模型(上式)中的Eθ(xt,t))\mathcal E_{\theta}(x_t, t)): 在论文DDPM中,使用的是固定的方差Eθ(xt,t))=σt2I\mathcal E_{\theta}(x_t, t)) = \sigma_t^2I,其中σt2=βt\sigma_t^2 = \beta_tσt2=β~t=1αˉt11αˉtβt\sigma_t^2 = \tilde \beta_t = \frac{1 - \bar \alpha_{t-1}}{1 - \bar \alpha_t}\beta_t有相似的结果。
  2. 对于模型(上式)中的μθ(xt,t)\mu_{\theta}(x_t, t),因为Eθ(xt,t))=σt2I\mathcal E_{\theta}(x_t, t)) = \sigma_t^2I,所以原分布函数则为pθ(xt1xt)=N(xt1;μθ(xt,t),σt2I)p_{\theta}(x_{t-1}| x_t) = \mathcal N(x_{t-1}; \mu_{\theta}(x_t, t), \sigma_t^2I),所以损失函数为:
    Lt1=Eq[12σt2μ~t(xt,x0)μθ(xt,t)2]+C,其中C是常数\begin{align} \mathcal L_{t-1} &= \mathbb E_q[\frac{1}{2\sigma^2_t}||\tilde \mu_t(x_t, x_0) - \mu_{\theta}(x_t, t)||^2] + C, \qquad 其中 C是常数 \end{align}

    这也就是说,我们需要使用模型μθ\mu_{\theta}拟合前向过程中的μ~t\tilde \mu_t(即,前向过程的后验均值(posterior mean)):

    Lt1C=Ex0,ϵ[12σt2μ~t(xt(x0,ϵ),1αˉt(xt(x0,ϵ)1αˉtϵ))μθ(xt(x0,ϵ),t)2]=Ex0,ϵ[12σt21αt(xt(x0,ϵ)βt1αˉtϵ)μθ(xt(x0,ϵ),t)2]\begin{align} \mathcal L_{t-1} - C &= \mathbb E_{x_0, \epsilon}[\frac{1}{2\sigma_t^2} \lVert \tilde \mu_t(x_t(x_0, \epsilon), \frac{1}{\sqrt{\bar\alpha_t}}(x_t(x_0, \epsilon) - \sqrt{1-\bar\alpha_t}\epsilon)) - \mu_{\theta}(x_t(x_0, \epsilon), t) \rVert ^2] \\ &= \mathbb E_{x_0, \epsilon}[\frac{1}{2\sigma_t^2} \lVert \frac{1}{\sqrt{\alpha_t}}(x_t(x_0, \epsilon) - \frac{\beta_t}{\sqrt{1- \bar\alpha_t}}\epsilon) - \mu_{\theta}(x_t(x_0, \epsilon), t) \rVert ^2] \end{align}

5. 缩放数据和L0L_0

假设图像数据是由0,1,...,255{0, 1, ..., 255}组成,这线性映射到[1,1][-1, 1],这将有利于神经网络的逆向过程和从高斯分布获得的xTx_T数据相一致。

为了获取离散对数似然(discrete log likelihoods)DDPM论文中设置反向过程最后一项是从高斯分布为N(x0;μθ(x1,1),σ12I)\mathcal N(x_0; \mu_{\theta}(x_1, 1), \sigma_1^2I)导出的独立离散解码器:

pθ(x0x1)=i=1Dδ(x0i)δ+(x0i)N(x;μθi(x1,1),σ12)dx\begin{align} p_{\theta}(x_0 | x_1) = \prod_{i=1}^{D} \int_{\delta_-(x_0^i)}^{\delta_+(x_0^i)} \mathcal N(x; \mu_{\theta}^i(x_1, 1), \sigma^2_1)dx \end{align}
δ+(x)={,if x=1x+1255if x<1δ(x)={,if x=1x1255if x>1\begin{align} \delta_{+}(x) = \begin{cases} \infty, &\text{if} \ x = 1 \\ x + \frac{1}{255} &\text{if} \ x < 1 \\ \end{cases} \nonumber \qquad \qquad \delta_{-}(x) = \begin{cases} -\infty, &\text{if} \ x = -1 \\ x - \frac{1}{255} &\text{if}\ x > -1 \\ \end{cases} \nonumber \end{align}


6. 简化损失函数

通过 (22式) 可以知道,μθ\mu_{\theta}就是通过给出的xtx_t来预测1αt(xtβt1αˉtϵ)\frac{1}{\sqrt{\alpha_t}}(x_t - \frac{\beta_t}{\sqrt{1 - \bar \alpha_t}}\epsilon),所以,可以参数化μθ\mu_{\theta}:

μθ(xt,t)=μ~t(xt,1αˉt(xt1αˉtϵθ(xt)))=1αt(xtβt1αˉtϵθ(xt,t))\begin{align} \mu_{\theta}(x_t, t) &= \tilde \mu_t(x_t, \frac{1}{\bar\alpha_t}(x_t - \sqrt{1 - \bar \alpha_t}\epsilon_{\theta}(x_t))) \\ &= \frac{1}{\sqrt{\alpha_t}}(x_t - \frac{\beta_t}{\sqrt{1 - \bar\alpha_t}}\epsilon_{\theta}(x_t, t)) \end{align}


通过 (22式和25式), 这可以简化损失函数:

Ex0,ϵ[βt22σt2αt(1αˉt)ϵϵθ(αˉtx0+1αˉtϵ,t)2]\begin{align} \mathbb E_{x_0, \epsilon}[\frac{\beta_t^2}{2\sigma_t^2\alpha_t(1 - \bar \alpha_t)} \lVert \epsilon - \epsilon_{\theta}(\sqrt{\bar\alpha_t} x_0 + \sqrt{1 - \bar \alpha_t}\epsilon, t) \rVert^2] \end{align}


Lsimple(θ):=Et,x0,ϵ[ϵϵθ(αˉtx0+1αˉtϵ, t)2]\begin{align} \mathcal L_{simple}(\theta) := \mathbb E_{t, x_0, \epsilon}[\lVert \epsilon - \epsilon_{\theta}(\sqrt{\bar \alpha_t} x_0 + \sqrt{1 - \bar\alpha_t}\epsilon,\ t)\rVert^2] \end{align}


7. 抽样


xt1=1αˉt(xtβt1αˉtϵθ(xt,t))+σtz,zN(0,I)x0x^0=xt1αˉtϵθ(xt)αˉt=1αˉtxt1αˉtαˉtϵθ(xt)=1αˉtxt1αˉt1ϵθ(xt)\begin{align} x_{t-1} &= \frac{1}{\sqrt{\bar \alpha_t}}(x_t - \frac{\beta_t}{\sqrt{1-\bar\alpha_t}}\epsilon_{\theta}(x_t, t)) + \sigma_tz, \qquad z \sim \mathcal N(0, I) \\ x_0 \approx \hat x_0 &= \frac{x_t - \sqrt{1- \bar \alpha_t}\epsilon_{\theta}(x_t)}{\sqrt{\bar \alpha_t}} \\ &= \frac{1}{\sqrt{\bar \alpha_t}}x_t - \frac{\sqrt{1 - \bar \alpha_t}}{\sqrt{\bar \alpha_t}} \epsilon_{\theta}(x_t) \\ &= \frac{1}{\sqrt{\bar \alpha_t}}x_t - \sqrt{\frac{1}{\bar\alpha_t} - 1} \epsilon_{\theta}(x_t) \end{align}



1. 实现要点

  1. 前向过程中的方差调度βt\beta_t
  2. 通过方差调度计算各种需要的αˉ\bar \alpha
  3. 对时间步t做位置编码
  4. 设计模型预测从xTx_T和时间步tt中学习噪音分布
  5. 使用模型获取从T,...,0T,..., 0获取对应的噪音分布,一步步使xTx_T降噪到x0x_0

注意,β\beta是递增的:0<β1<β2<...<βT<10 < \beta_1 < \beta_2 < ... < \beta_T < 1

2. βt\beta_t的选择


  • linear
  • cosine
  • quadratic/sqrt
  • sigmoid
  • warmup
  • jsd

基本都是根据DDPM论文中的范围[104,0.02][10^{-4}, 0.02]

3. 需要计算的常用固定值

  1. 基础定义的参数:
    • βt\beta_t
    • tt
  2. x0x_0获取噪音数据xtx_t所需的系数:
    • alphas, 即 α1:T \ \alpha_{1:T}\ : 1β1:T1 - \beta_{1:T}
    • alphas_cumprod, 即 αˉ \ \bar \alpha \ : np.cumprod(alphas, axis=0)
    • sqrt_alphas_cumprod, 即 αˉ \ \sqrt{\bar \alpha} \ : np.sqrt(alphas_cumprod)
    • sqrt_one_minus_alphas_cumprod, 即 1αˉ \ \sqrt{1-\bar\alpha} \ : np.sqrt(1.0 - alphas_cumprod) 获取噪音数据公式
  3. q(xt1  xt,x0)q(x_{t-1}\ | \ x_t, x_0)分布中获取数据所需参数
    • alphas_cumprod_prev: np.append(1., alphas_cumprod[:-1]),方便后面计算,使用统一的t参数。
    • posterior_mean_coeff1, 即βtαˉt11αˉ\frac{\beta_t \sqrt{\bar \alpha_{t-1}}}{1 - \bar \alpha}: betas * np.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod),这里αˉ\bar \alpha使用的是一个标量,所以1在连乘中没有影响。
    • posterior_mean_coeff2, 即(1αˉt1)αt1αˉ\frac{(1 - \bar \alpha_{t-1}) \sqrt{\alpha_t}}{1 - \bar \alpha}: (1. - self.alphas_cumprod_prev) * np.sqrt(alphas) / (1. - alphas_cumprod)
    • posterior_variance, 即1αˉt11αˉtβt\frac{1 - \bar\alpha_{t-1}}{1 - \bar\alpha_t}\beta_t:betas * (1 - alphas_cumprod_prev) / (1 - alphas_cumprod),这里是为了计算σt2\sigma_t^2,计算方式是 (18式) L_{t-1}损失


  4. 使用模型输出的噪音获取样本
    • sqrt_recip_alphas_cumprod, 即1αˉt\sqrt{\frac{1}{\bar\alpha_t}}: np.sqrt(1.0 / alphas_cumprod)
    • 后一项直接有对应的tsqrt_one_minus_alphas_cumprodβt\beta_t获取
    • posterior_variance, 即β(1αˉt1)1αˉt\frac{\beta (1 - \bar\alpha_{t-1})}{1 - \bar\alpha_t}:betas * (1 - alphas_cumprod_prev) / (1 - alphas_cumprod),这里是为了计算σt2\sigma_t^2,计算方式是 (18式) 后验计算
  5. 使用epseps计算x0x_0
    • sqrt_recip_alphas_cumprod, 即1αˉt\sqrt{\frac{1}{\bar\alpha_t}}: np.sqrt(1.0 / alphas_cumprod)
    • sqrt_recipm1_alphas_cumprod, 即1αˉt1=1αˉtαˉt\sqrt{\frac{1}{\bar\alpha_t} - 1} = \frac{\sqrt{1-\bar \alpha_t}}{\sqrt{\bar \alpha_t}}: np.sqrt(1 / alphas_cumprod - 1) eps

4. 通过代码梳理思路

1. 位置编码

class SinusoidalPositionEmbeddings(nn.Module): def __init__(self, dim): super().__init__() self.dim = dim def forward(self, time): device = time.device half_dim = self.dim // 2 embeddings = math.log(10000) / (half_dim - 1) embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings) embeddings = time[:, None] * embeddings[None, :] embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1) if self.dim % 2 == 1: embeddings = F.pad(embeddings, (0, 1), value=0.0) return embeddings

2. 调度计算(这里使用了linearcosine

def get_named_beta_schedule(schedule_name, num_diffusion_timesteps): """ Get a pre-defined beta schedule for the given name. The beta schedule library consists of beta schedules which remain similar in the limit of num_diffusion_timesteps. Beta schedules may be added, but should not be removed or changed once they are committed to maintain backwards compatibility. """ if schedule_name == "linear": # Linear schedule from Ho et al, extended to work for any number of # diffusion steps. scale = 1000 / num_diffusion_timesteps beta_start = scale * 0.0001 beta_end = scale * 0.02 return np.linspace( beta_start, beta_end, num_diffusion_timesteps, dtype=np.float64 ) elif schedule_name == "cosine": return betas_for_alpha_bar( num_diffusion_timesteps, lambda t: math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2, ) else: raise NotImplementedError(f"unknown beta schedule: {schedule_name}") def betas_for_alpha_bar(num_diffusion_timesteps, alpha_bar, max_beta=0.999): """ Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of (1-beta) over time from t = [0,1]. :param num_diffusion_timesteps: the number of betas to produce. :param alpha_bar: a lambda that takes an argument t from 0 to 1 and produces the cumulative product of (1-beta) up to that part of the diffusion process. :param max_beta: the maximum beta to use; use values lower than 1 to prevent singularities. """ betas = [] for i in range(num_diffusion_timesteps): t1 = i / num_diffusion_timesteps t2 = (i + 1) / num_diffusion_timesteps betas.append(min(1 - alpha_bar(t2) / alpha_bar(t1), max_beta)) return np.array(betas)


class GaussianDiffusion: def __init__(self, betas, predict_xstart=False, rescale_timesteps=False, model_mean_type='eps'): assert len(betas.shape) == 1, "betas must be 1-D" assert (betas > 0).all() and (betas <= 1).all() # 基础的属性 self.num_timesteps = betas.shape[0] self.predict_xstart = predict_xstart self.rescale_timesteps = rescale_timesteps self.model_mean_type = model_mean_type # 从原始数据到噪音数据映射相关的系数,即x_0 -> x_t相关的系数 self.betas = betas self.alphas = 1 - betas self.alphas_cumprod = np.cumprod(self.alphas, axis=0) self.sqrt_alphas_cumprod = np.sqrt(self.alphas_cumprod) self.sqrt_one_minus_alphas_cumprod = np.sqrt(1 - self.alphas_cumprod) # 使用x_t和x_0来预测x_{t-1}相关的系数 self.alphas_cumprod_prev = np.append(1., self.alphas_cumprod[:-1]) self.posterior_mean_coeff1 = self.betas * np.sqrt(self.alphas_cumprod_prev) / (1. - self.alphas_cumprod) self.posterior_mean_coeff2 = np.sqrt(self.alphas) * (1. - self.alphas_cumprod_prev) / ( 1. - self.alphas_cumprod) # 使用模型预测的噪音noise数据,以及x_t和新的噪音预测x_{t-1}相关的系数 self.sqrt_recip_alphas_cumprod = np.sqrt(1. / self.alphas_cumprod) self.sqrt_recip_m1_alphas_cumprod = np.sqrt(1 / self.alphas_cumprod - 1) self.posterior_variance = self.betas * (1. - self.alphas_cumprod_prev) / (1. - self.alphas_cumprod) self.model_noise_coeff = self.betas / self.sqrt_one_minus_alphas_cumprod # 方便计算损失函数需要的变量 self.log_one_minus_alphas_cumprod = np.log(1.0 - self.alphas_cumprod) self.posterior_log_variance_clipped = np.log( np.append(self.posterior_variance[1], self.posterior_variance[1:])) def q_sample(self, x_start, timesteps, noise=None): """ 通过使用原始数据x_start来预测第timesteps步的噪音数据,即:sample from q(x_t | x_0). :param x_start: 原始数据. :param timesteps: 像要获取第几步的噪音数据,这个timesteps是-1之后得到的,即0表示第一个步骤 :param noise: 噪音数据,如果设置了,就使用这个数据给数据加噪音,如果没有设置,则使用随机的噪音 :return: 返回一个加噪音后的数据。 """ if noise is None: noise = torch.randn_like(x_start) assert noise.shape == x_start.shape return ( self._extract_into_tensor(self.sqrt_alphas_cumprod, timesteps, x_start.shape) * x_start + self._extract_into_tensor(self.sqrt_one_minus_alphas_cumprod, timesteps, x_start.shape) * noise ) def q_mean_variance(self, x_start, timesteps): """ 这是一个工具类,是对`q_sample`的一个缩小版,这个函数只返回对应timesteps的均值和方差 :param x_start: 原始数据,这里只是需要它的shape,用来对各种参数进行`扩维`。 :param timesteps: 需要获取哪个时间步的噪音,这里的timesteps也是从0开始。 :return: 返回对应的均值,方差和log方差 """ mean = ( self._extract_into_tensor(self.sqrt_alphas_cumprod, timesteps, x_start.shape) * x_start ) variance = self._extract_into_tensor(1.0 - self.alphas_cumprod, timesteps, x_start.shape) log_variance = self._extract_into_tensor( self.log_one_minus_alphas_cumprod, timesteps, x_start.shape ) return mean, variance, log_variance def q_posterior_mean_variance(self, x_start, x_t, timesteps): """ 通过使用x_start和x_t来获取中间第`t`步的后验均值和方差,即q(x_{t-1} | x_t, x_0) :param x_start: 原始数据 :param x_t: 加`t`步噪音后的数据 :param timesteps: 需要获取哪个时间步的噪音,这里的timesteps也是从0开始。 :return: 返回对应的均值和方差 """ assert x_start.shape == x_t.shape posterior_mean = ( self._extract_into_tensor(self.posterior_mean_coeff1, timesteps, x_t.shape) * x_start + self._extract_into_tensor(self.posterior_mean_coeff2, timesteps, x_t.shape) * x_t ) posterior_variance = self._extract_into_tensor(self.posterior_variance, timesteps, x_t.shape) posterior_log_variance_clipped = self._extract_into_tensor( self.posterior_log_variance_clipped, timesteps, x_t.shape ) assert ( posterior_mean.shape[0] == posterior_variance.shape[0] == posterior_log_variance_clipped.shape[0] == x_start.shape[0] ) return posterior_mean, posterior_variance, posterior_log_variance_clipped def p_mean_variance( self, model, x, timesteps, clip_denoised=True, denoised_fn=None, model_kwargs=None ): """ 使用模型基于x_t对x_{t-1}的预测,即p(x_{t-1} | x_t);也能通过参数`predict_xstart`指定输出预测的`x_0` :param model: 预测噪音的模型,其接受一个`batch_size`的数据,和一个`(batch_size,)`的`timesteps` :param x: the [N x C x ...] tensor at time t. :param timesteps: a 1-D Tensor of timesteps. :param clip_denoised: 如果为`True`,则把数据缩放到`[-1, 1]`之间。 论文里说这确保了神经网络反向过程在从标准正态先验p(x_T)开始的一致缩放操作。 :param denoised_fn: 如果不为`None`,这个函数在用于采样之前应用于`x_start`预测,但在`clip_denoised`之后。 :param model_kwargs: 传给模型的参数,默认是`None` :return: a dict with the following keys: - 'mean': the model mean output. - 'variance': the model variance output. - 'pred_xstart': the prediction for x_0. """ if model_kwargs is None: model_kwargs = {} B, C = x.shape[:2] assert timesteps.shape == (B,) # 模型输出预测的eps model_output = model(x, self._scale_timesteps(timesteps), **model_kwargs) # for fixed large, we set the initial (log-)variance like so # to get a better decoder log likelihood. model_variance = np.append(self.posterior_variance[1], self.betas[1:]) model_log_variance = np.log(model_variance) # 抽取值 model_variance = self._extract_into_tensor(model_variance, timesteps, x.shape) model_log_variance = self._extract_into_tensor(model_log_variance, timesteps, x.shape) def process_xstart(xx): if denoised_fn is not None: return denoised_fn(xx, timesteps) if clip_denoised: return xx.clamp(-1, 1) if self.predict_xstart: pred_xstart = process_xstart(model_output) else: # model is used to predict eps pred_xstart = process_xstart( self._predict_xstart_from_eps(x_t=x, timesteps=timesteps, eps=model_output) ) # 获取pred_xstart和纯噪音的x_t之间的均值和方差 model_mean, _, _ = self.q_posterior_mean_variance( x_start=pred_xstart, x_t=x, timesteps=timesteps ) assert model_mean.shape == model_log_variance.shape == pred_xstart.shape == x.shape return { "mean": model_mean, "variance": model_variance, "log_variance": model_log_variance, "pred_xstart": pred_xstart, } def _predict_xstart_from_eps(self, x_t, timesteps, eps): """ 使用`eps`噪音数据来预测`x_0` :param x_t: 加`t`步噪音后的数据 :param timesteps: 需要获取哪个时间步的噪音,这里的timesteps也是从0开始。 :param eps: 噪音数据,通常是模型返回的噪音数据 :return: 返回使用`eps`计算出来的`x_0` """ assert x_t.shape == eps.shape return ( self._extract_into_tensor(self.sqrt_recip_alphas_cumprod, timesteps, x_t.shape) * x_t - self._extract_into_tensor(self.sqrt_recip_m1_alphas_cumprod, timesteps, x_t.shape) * eps ) @torch.no_grad() def p_sample( self, model, x, timesteps, clip_denoised=True, denoised_fn=None, model_kwargs=None, noise_scale=1.0): """ 在给定的timesteps下,从模型中采样x_{t-1}。 :param model: 用来获取噪音的模型 :param x: the current tensor at x_{t-1}. :param timesteps: 需要获取哪个时间步的样本,这里的timesteps也是从0开始。 :param clip_denoised: 如果为`True`,则把数据缩放到`[-1, 1]`之间。 :param denoised_fn: 如果不为`None`,这个函数在用于采样之前应用于`x_start`预测,但在`clip_denoised`之后。 :param model_kwargs: 传给模型的参数,默认是`None` :param noise_scale: 对噪音数据的缩放值,默认是1.0 :return: a dict containing the following keys: - 'sample': a random sample from the model. - 'pred_xstart': a prediction of x_0. """ out = self.p_mean_variance( model, x, timesteps, clip_denoised=clip_denoised, denoised_fn=denoised_fn, model_kwargs=model_kwargs, ) noise = torch.randn_like(x) nonzero_mask = ( (timesteps != 0).float().view(-1, *([1] * (len(x.shape) - 1))) ) # no noise when t == 0 sample = out["mean"] + nonzero_mask * torch.exp(0.5 * out["variance"]) * noise * noise_scale return { "sample": sample, "pred_xstart": out["pred_xstart"] } @torch.no_grad() def p_sample_loop( self, model, shape, noise=None, clip_denoised=True, denoised_fn=None, model_kwargs=None, device=None ): """ Generate samples from the model. :param model: the model module. :param shape: the shape of the samples, (N, C, H, W). :param noise: if specified, the noise from the encoder to sample. Should be of the same shape as `shape`. :param clip_denoised: if True, clip x_start predictions to [-1, 1]. :param denoised_fn: if not None, a function which applies to the x_start prediction before it is used to sample. :param model_kwargs: if not None, a dict of extra keyword arguments to pass to the model. This can be used for conditioning. :param device: if specified, the device to create the samples on. If not specified, use a model parameter's device. :return: a non-differentiable batch of samples. """ final = None for sample in self.p_sample_loop_progressive( model, shape, noise=noise, clip_denoised=clip_denoised, denoised_fn=denoised_fn, model_kwargs=model_kwargs, device=device, ): final = sample return final["sample"] def p_sample_loop_progressive( self, model, shape, noise=None, clip_denoised=True, denoised_fn=None, model_kwargs=None, device=None, ): """ Generate samples from the model and yield intermediate samples from each timestep of diffusion. Arguments are the same as p_sample_loop(). Returns a generator over dicts, where each dict is the return value of p_sample(). """ if device is None: device = next(model.parameters()).device assert isinstance(shape, (tuple, list)) # 高斯噪音数据 if noise is not None: img = noise else: img = torch.randn(*shape, device=device) # 从T....1,即[T-1, ..., 0]时间步 indices = list(range(self.num_timesteps))[::-1] for i in tqdm(iterable=indices, desc='sampling'): # 获取batch_size个第i个timestep,即 # [999, 999, 999, 999, 999, 999, 999, ...] # [998, 998, 998, 998, 998, 998, 998, ...] t = torch.tensor([i] * shape[0], device=device) with torch.no_grad(): out = self.p_sample( model, img, t, clip_denoised=clip_denoised, denoised_fn=denoised_fn, model_kwargs=model_kwargs, ) yield out img = out["sample"] def training_losses(self, model, x_start, timesteps, noise=None): """ Training loss calculation """ # Add noise to data assert timesteps.shape[0] == x_start.shape[0] if noise is None: noise = torch.randn_like(x_start, dtype=x_start.dtype) assert noise.shape == x_start.shape and noise.dtype == x_start.dtype # 从样本中抽取x_t x_t = self.q_sample(x_start=x_start, timesteps=timesteps, noise=noise) # 计算mes loss,可以根据model_mean_type类型计算 target = { 'xprev': self.q_posterior_mean_variance(x_start=x_start, x_t=x_t, timesteps=timesteps)[0], 'xstart': x_start, 'eps': noise }[self.model_mean_type] model_output = model(x_t, timesteps) assert model_output.shape == target.shape == x_start.shape loss = F.mse_loss(target, model_output) return loss def _scale_timesteps(self, timestep): """ 根据`rescale_timesteps`参数对`timestep`进行缩放 :param timestep: 时间步 :return: 处理后的timestep """ if self.rescale_timesteps: return timestep.float() * (1000.0 / self.num_timesteps) return timestep @staticmethod def _extract_into_tensor(arr, timesteps, broadcast_shape): """ 从`arr`数组中根据`timesteps`获取数据;并且对其进行扩维到`broadcast_shape` :param arr: 一个一维数组,对应的就是各种参数的数组 :param timesteps: 要提取的数组中的索引的张量,这个参数是一个一维的,`(batch_size,)` :param broadcast_shape: 需要返回数据的`shape` :return: 返回一个`[batch_size, 1, ..., 1]`形状的张量。最后一维为什么是`1`?因为每一个数据都是一个时间步 """ res = torch.from_numpy(arr).to(device=timesteps.device)[timesteps].float() while len(res.shape) < len(broadcast_shape): # 这是对res扩维 res = res[..., None] return res.expand(broadcast_shape)

4. 训练代码(这里只展示大致逻辑)

def train(generate_dir, num_epochs, batch_size, img_size, input_channel, shape, lr, timesteps, num_units, save_image_to, schedule_name): # 运行相关模块 betas = get_named_beta_schedule(schedule_name, timesteps) loader = LoadLiveDataset(batch_size=batch_size, resize=img_size) model = UNetModel(image_size=img_size, in_channels=input_channel, model_channels=num_units, out_channels=3, num_res_blocks=2, attention_resolutions=(16, 8)).to(device()) optimizer = Adam(model.parameters(), lr=lr) gd = GaussianDiffusion(betas) # 这里num_epochs + 1是保证后面抽样可以每100步抽样一次 for epoch in range(num_epochs + 1): print(f'epoch: {epoch}') for step, (features, _) in enumerate(loader.data_loader): optimizer.zero_grad() features = features.to(device()) # 获取随机的timestep random_batch_t = torch.randint(0, timesteps, (batch_size,), device=device()).long() loss = gd.training_losses(model=model, x_start=features, timesteps=random_batch_t) print(f"\t step: {step}, Loss: {loss.item()}") loss.backward() # 梯度clip,保持稳定性 torch.nn.utils.clip_grad_norm_(model.parameters(), 1.) optimizer.step() with torch.no_grad(): # save generated images if (epoch + 1) % 100 == 0: # 保存的文件名 file_name = f'sample-{epoch}-{step}.png' # 从模型随机抽样,一步步从1000开始抽样,直到最后一张x_0 all_images = gd.p_sample_loop(model, shape=shape, device=device()) # 这里是把数据从[-1, 1] -> [0, 1] all_images = (all_images + 1) * 0.5 if save_image_to == 'local': # 保存数据到本地 if not pathlib.Path.exists(generate_dir): pathlib.Path.mkdir(generate_dir) # save_image相关操作: [0, 1] -> [0, 255] + 0.5再取整(这里加0.5是保证四舍五入),同时 CWH ->WHC,和转成numpy格式 save_image(all_images, str(generate_dir + file_name), nrow=batch_size // 8)





