企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 十二、使用 TensorFlow 自定义模型并训练 > 译者:[@SeanCheney](https://www.jianshu.com/u/130f76596b02) 目前为止,我们只是使用了 TensorFlow 的高级 API —— tf.keras,它的功能很强大:搭建了各种神经网络架构,包括回归、分类网络、Wide & Deep 网络、自归一化网络,使用了各种方法,包括批归一化、dropout 和学习率调度。事实上,你在实际案例中 95%碰到的情况只需要 tf.keras 就足够了(和 tf.data,见第 13 章)。现在来深入学习 TensorFlow 的低级 Python API。当你需要实现自定义损失函数、自定义标准、层、模型、初始化器、正则器、权重约束时,就需要低级 API 了。甚至有时需要全面控制训练过程,例如使用特殊变换或对约束梯度时。这一章就会讨论这些问题,还会学习如何使用 TensorFlow 的自动图生成特征提升自定义模型和训练算法。首先,先来快速学习下 TensorFlow。 > 笔记:TensorFlow 2.0(beta)是 2019 年六月发布的,相比前代更易使用。本书第一版使用的是 TF 1,这一版使用的是 TF 2。 ## TensorFlow 速览 TensorFlow 是一个强大的数值计算库,特别适合做和微调大规模机器学习(但也可以用来做其它的重型计算)。TensorFlow 是谷歌大脑团队开发的,支持了谷歌的许多大规模服务,包括谷歌云对话、谷歌图片和谷歌搜索。TensorFlow 是 2015 年 11 月开源的,(按文章引用、公司采用、GitHub 星数)是目前最流行的深度学习库。无数的项目是用 TensorFlow 来做各种机器学习任务,包括图片分类、自然语言处理、推荐系统和时间序列预测。TensorFlow 提供的功能如下: * TensorFlow 的核心与 NumPy 很像,但 TensorFlow 支持 GPU; * TensorFlow 支持(多设备和服务器)分布式计算; * TensorFlow 使用了即时 JIT 编译器对计算速度和内存使用优化。编译器的工作是从 Python 函数提取出计算图,然后对计算图优化(比如剪切无用的节点),最后高效运行(比如自动并行运行独立任务); * 计算图可以导出为迁移形式,因此可以在一个环境中训练一个 TensorFlow 模型(比如使用 Python 或 Linux),然后在另一个环境中运行(比如在安卓设备上用 Java 运行); * TensorFlow 实现了自动微分,并提供了一些高效的优化器,比如 RMSProp 和 NAdam,因此可以容易的最小化各种损失函数。 基于上面这些特点,TensorFlow 还提供了许多其他功能:最重要的是 tf.keras,还有数据加载和预处理操作(tf.data,tf.io 等等),图片处理操作(tf.image),信号处理操作(tf.signal),等等(图 12-1 总结了 TensorFlow 的 Python API) ![](https://img.kancloud.cn/bd/73/bd73df22931c05d41f1eb473949a3830_1440x993.png)图 12-1 TensorFlow 的 Python API > 提示:这一章会介绍 TensorFlow API 的多个包和函数,但来不及介绍全部,所以读者最好自己花点时间好好看看 API。TensorFlow 的 API 十分丰富,且文档详实。 TensorFlow 的低级操作都是用高效的 C++实现的。许多操作有多个实现,称为`核`:每个核对应一个具体的设备型号,比如 CPU、GPU,甚至 TPU(张量处理单元)。GPU 通过将任务分成小块,在多个 GPU 线程中并行运行,可以极大提高提高计算的速度。TPU 更快:TPU 是自定义的 ASIC 芯片,专门用来做深度学习运算的(第 19 章会讨论适合使用 GPU 和 TPU)。 TensorFlow 的架构见图 12-2。大多数时候你的代码使用高级 API 就够了(特别是 tf.keras 和 tf.data),但如果需要更大的灵活性,就需要使用低级 Python API,来直接处理张量。TensorFlow 也支持其它语言的 API。任何情况下,甚至是跨设备和机器的情况下,TensorFlow 的执行引擎都会负责高效运行。 ![](https://img.kancloud.cn/50/0d/500dd977856d7128b10076649ebabb37_1440x785.png)图 12-2 TensorFlow 的架构 TensorFlow 不仅可以运行在 Windows、Linux 和 macOS 上,也可以运行在移动设备上(使用 TensorFlow Lite),包括 iOS 和安卓(见第 19 章)。如果不想使用 Python API,还可以使用 C++、Java、Go 和 Swift 的 API。甚至还有 JavaScript 的实现 TensorFlow.js,它可以直接在浏览器中运行。 TensorFlow 不只有这些库。TensorFlow 处于一套可扩展的生态系统库的核心位置。首先,TensorBoard 可以用来可视化。其次,TensorFlow Extended(TFX),是谷歌推出的用来生产化的库,包括:数据确认、预处理、模型分析和服务(使用 TF Serving,见第 19 章)。谷歌的 TensorFlow Hub 上可以方便下载和复用预训练好的神经网络。你还可以从 TensorFlow 的 model garden([https://github.com/tensorflow/models/](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2Ftensorflow%2Fmodels%2F))获取许多神经网络架构,其中一些是预训练好的。[TensorFlow Resources](https://links.jianshu.com/go?to=https%3A%2F%2Fwww.tensorflow.org%2Fresources) 和 [*https://github.com/jtoy/awesome-tensorflow*](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2Fjtoy%2Fawesome-tensorflow)上有更多的资源。你可以在 GitHub 上找到数百个 TensorFlow 项目,无论干什么都可以方便地找到现成的代码。 > 提示:越来越多的 ML 论文都附带了实现过程,一些甚至带有预训练模型。可以在[*https://paperswithcode.com/*](https://links.jianshu.com/go?to=https%3A%2F%2Fpaperswithcode.com%2F)找到。 最后,TensorFlow 有一支热忱满满的开发者团队,也有庞大的社区。要是想问技术问题,可以去[*http://stackoverflow.com/*](https://links.jianshu.com/go?to=http%3A%2F%2Fstackoverflow.com%2F) ,问题上打上 tensorflow 和 python 标签。还可以在[GitHub](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2Ftensorflow%2Ftensorflow)上提 bug 和新功能。一般的讨论可以去谷歌群组([https://groups.google.com/a/tensorflow.org/forum/](https://links.jianshu.com/go?to=https%3A%2F%2Fgroups.google.com%2Fa%2Ftensorflow.org%2Fforum%2F))。 下面开始写代码! ## 像 NumPy 一样使用 TensorFlow TensorFlow 的 API 是围绕张量(tensor)展开的,从一个操作流动(flow)到另一个操作,所以名字叫做 TensorFlow。张量通常是一个多维数组(就像 NumPy 的`ndarray`),但也可以是标量(即简单值,比如 42)。张量对于自定义的损失函数、标准、层等等非常重要,接下来学习如何创建和操作张量。 ### 张量和运算 使用`tf.constant()`创建张量。例如,下面的张量表示的是两行三列的浮点数矩阵: ```py >>> tf.constant([[1., 2., 3.], [4., 5., 6.]]) # matrix <tf.Tensor: id=0, shape=(2, 3), dtype=float32, numpy= array([[1., 2., 3.], [4., 5., 6.]], dtype=float32)> >>> tf.constant(42) # 标量 <tf.Tensor: id=1, shape=(), dtype=int32, numpy=42> ``` 就像`ndarray`一样,`tf.Tensor`也有形状和数据类型(`dtype`): ```py >>> t = tf.constant([[1., 2., 3.], [4., 5., 6.]]) >>> t.shape TensorShape([2, 3]) >>> t.dtype tf.float32 ``` 索引和 NumPy 中很像: ```py >>> t[:, 1:] <tf.Tensor: id=5, shape=(2, 2), dtype=float32, numpy= array([[2., 3.], [5., 6.]], dtype=float32)> >>> t[..., 1, tf.newaxis] <tf.Tensor: id=15, shape=(2, 1), dtype=float32, numpy= array([[2.], [5.]], dtype=float32)> ``` 最重要的,所有张量运算都可以执行: ```py >>> t + 10 <tf.Tensor: id=18, shape=(2, 3), dtype=float32, numpy= array([[11., 12., 13.], [14., 15., 16.]], dtype=float32)> >>> tf.square(t) <tf.Tensor: id=20, shape=(2, 3), dtype=float32, numpy= array([[ 1., 4., 9.], [16., 25., 36.]], dtype=float32)> >>> t @ tf.transpose(t) <tf.Tensor: id=24, shape=(2, 2), dtype=float32, numpy= array([[14., 32.], [32., 77.]], dtype=float32)> ``` 可以看到,`t + 10`等同于调用`tf.add(t, 10)`,`-`和`*`也支持。`@`运算符是在 Python3.5 中出现的,用于矩阵乘法,等同于调用函数`tf.matmul()`。 可以在 tf 中找到所有基本的数学运算(`tf.add()`、`tf.multiply()`、`tf.square()`、`tf.exp()`、`tf.sqrt()`),以及 NumPy 中的大部分运算(比如`tf.reshape()`、`tf.squeeze()`、`tf.tile()`)。一些 tf 中的函数与 NumPy 中不同,例如,`tf.reduce_mean()`、`tf.reduce_sum()`、`tf.reduce_max()`、`tf.math.log()`等同于`np.mean()`、`np.sum()`、`np.max()`和`np.log()`。当函数名不同时,通常都是有原因的。例如,TensorFlow 中必须使用`tf.transpose(t)`,不能像 NumPy 中那样使用`t.T`。原因是函数`tf.transpose(t)`所做的和 NumPy 的属性`T`并不完全相同:在 TensorFlow 中,是使用转置数据的复制来生成张量的,而在 NumPy 中,`t.T`是数据的转置视图。相似的,`tf.reduce_sum()`操作之所以这么命名,是因为它的 GPU 核(即 GPU 实现)所采用的 reduce 算法不能保证元素相加的顺序,因为 32 位的浮点数精度有限,每次调用的结果可能会有细微的不同。`tf.reduce_mean()`也是这样(`tf.reduce_max()`结果是确定的)。 > 笔记:许多函数和类都有假名。比如,`tf.add()`和`tf.math.add()`是相同的。这可以让 TensorFlow 对于最常用的操作有简洁的名字,同时包可以有序安置。 > Keras 的低级 API > Keras API 有自己的低级 API,位于`keras.backend`,包括:函数`square()`、`exp()`、`sqrt()`。在`tf.keras`中,这些函数通常通常只是调用对应的 TensorFlow 操作。如果你想写一些可以迁移到其它 Keras 实现上,就应该使用这些 Keras 函数。但是这些函数不多,所以这本书里就直接使用 TensorFlow 的运算了。下面是一个简单的使用了`keras.backend`的例子,简记为`k`: > > ```py > >>> from tensorflow import keras > >>> K = keras.backend > >>> K.square(K.transpose(t)) + 10 > <tf.Tensor: id=39, shape=(3, 2), dtype=float32, numpy= > array([[11., 26.], > [14., 35.], > [19., 46.]], dtype=float32)> > ``` ### 张量和 NumPy 张量和 NumPy 融合地非常好:使用 NumPy 数组可以创建张量,张量也可以创建 NumPy 数组。可以在 NumPy 数组上运行 TensorFlow 运算,也可以在张量上运行 NumPy 运算: ```py >>> a = np.array([2., 4., 5.]) >>> tf.constant(a) <tf.Tensor: id=111, shape=(3,), dtype=float64, numpy=array([2., 4., 5.])> >>> t.numpy() # 或 np.array(t) array([[1., 2., 3.], [4., 5., 6.]], dtype=float32) >>> tf.square(a) <tf.Tensor: id=116, shape=(3,), dtype=float64, numpy=array([4., 16., 25.])> >>> np.square(t) array([[ 1., 4., 9.], [16., 25., 36.]], dtype=float32) ``` > 警告:NumPy 默认使用 64 位精度,TensorFlow 默认用 32 位精度。这是因为 32 位精度通常对于神经网络就足够了,另外运行地更快,使用的内存更少。因此当你用 NumPy 数组创建张量时,一定要设置`dtype=tf.float32`。 ### 类型转换 类型转换对性能的影响非常大,并且如果类型转换是自动完成的,不容易被注意到。为了避免这样,TensorFlow 不会自动做任何类型转换:只是如果用不兼容的类型执行了张量运算,TensorFlow 就会报异常。例如,不能用浮点型张量与整数型张量相加,也不能将 32 位张量与 64 位张量相加: ```py >>> tf.constant(2.) + tf.constant(40) Traceback[...]InvalidArgumentError[...]expected to be a float[...] >>> tf.constant(2.) + tf.constant(40., dtype=tf.float64) Traceback[...]InvalidArgumentError[...]expected to be a double[...] ``` 这点可能一开始有点恼人,但是有其存在的理由。如果真的需要转换类型,可以使用`tf.cast()`: ```py >>> t2 = tf.constant(40., dtype=tf.float64) >>> tf.constant(2.0) + tf.cast(t2, tf.float32) <tf.Tensor: id=136, shape=(), dtype=float32, numpy=42.0> ``` ### 变量 到目前为止看到的`tf.Tensor`值都是不能修改的。意味着不能使用常规张量实现神经网络的权重,因为权重必须要能被反向传播调整。另外,其它的参数也需要随着时间调整(比如,动量优化器要跟踪过去的梯度)。此时需要的是`tf.Variable`: ```py >>> v = tf.Variable([[1., 2., 3.], [4., 5., 6.]]) >>> v <tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy= array([[1., 2., 3.], [4., 5., 6.]], dtype=float32)> ``` `tf.Variable`和`tf.Tensor`很像:可以运行同样的运算,可以配合 NumPy 使用,也要注意类型。可以使用`assign()`方法对其就地修改(或`assign_add()`、`assign_sub()`)。使用切片的`assign()`方法可以修改独立的切片(直接赋值行不通),或使用`scatter_update()`、`scatter_nd_update()`方法: ```py v.assign(2 * v) # => [[2., 4., 6.], [8., 10., 12.]] v[0, 1].assign(42) # => [[2., 42., 6.], [8., 10., 12.]] v[:, 2].assign([0., 1.]) # => [[2., 42., 0.], [8., 10., 1.]] v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.]) # => [[100., 42., 0.], [8., 10., 200.]] ``` > 笔记:在实践中,很少需要手动创建变量,因为 Keras 有`add_weight()`方法可以自动来做。另外,模型参数通常会直接通过优化器更新,因此很少需要手动更新。 ### 其它数据结构 TensorFlow 还支持其它几种数据结构,如下(可以参考 notebook 的 Tensors and Operations 部分,或附录的 F): 稀疏张量(`tf.SparseTensor`) 高效表示含有许多 0 的张量。`tf.sparse`包含有对稀疏张量的运算。 张量数组(`tf.TensorArray`) 是张量的列表。有默认固定大小,但也可以做成动态的。列表中的张量必须形状相同,数据类型也相同。 嵌套张量(`tf.RaggedTensor`) 张量列表的静态列表,张量的形状和数据结构相同。`tf.ragged`包里有嵌套张量的运算。 字符串张量 类型是`tf.string`的常规张量,是字节串而不是 Unicode 字符串,因此如果你用 Unicode 字符串(比如,Python3 字符串 café)创建了一个字符串张量,就会自动被转换为 UTF-8(b"caf\xc3\xa9")。另外,也可以用`tf.int32`类型的张量表示 Unicode 字符串,其中每项表示一个 Unicode 码(比如,`[99, 97, 102, 233]`)。`tf.strings`包里有字节串和 Unicode 字符串的运算,以及二者转换的运算。要注意`tf.string`是原子性的,也就是说它的长度不出现在张量的形状中,一旦将其转换成了 Unicode 张量(即,含有 Unicode 码的`tf.int32`张量),长度才出现在形状中。 集合 表示为常规张量(或稀疏张量)。例如`tf.constant([[1, 2], [3, 4]])`表示两个集合{1, 2}和{3, 4}。通常,用张量的最后一个轴的矢量表示集合。集合运算可以用`tf.sets`包。 队列 用来在多个步骤之间保存张量。TensorFlow 提供了多种队列。先进先出(FIFO)队列 FIFOQueue,优先级队列 PriorityQueue,随机队列 RandomShuffleQueue,通过填充的不同形状的批次项队列 PaddingFIFOQueue。这些队列都在`tf.queue`包中。 有了张量、运算、变量和各种数据结构,就可以开始自定义模型和训练算法啦! ## 自定义模型和训练算法 先从简单又常见的任务开始,创建一个自定义的损失函数。 ### 自定义损失函数 假如你想训练一个回归模型,但训练集有噪音。你当然可以通过清除或修正异常值来清理数据集,但是这样还不够:数据集还是有噪音。此时,该用什么损失函数呢?均方差可能对大误差惩罚过重,导致模型不准确。均绝对值误差不会对异常值惩罚过重,但训练可能要比较长的时间才能收敛,训练模型也可能不准确。此时使用 Huber 损失(第 10 章介绍过)就比 MSE 好多了。目前官方 Keras API 中没有 Huber 损失,但 tf.keras 有(使用类`keras.losses.Huber`的实例)。就算 tf.keras 没有,实现也不难!只需创建一个函数,参数是标签和预测值,使用 TensorFlow 运算计算每个实例的损失: ```py def huber_fn(y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < 1 squared_loss = tf.square(error) / 2 linear_loss = tf.abs(error) - 0.5 return tf.where(is_small_error, squared_loss, linear_loss) ``` > 警告:要提高性能,应该像这个例子使用矢量。另外,如果想利用 TensorFlow 的图特性,则只能使用 TensorFlow 运算。 最好返回一个包含实例的张量,其中每个实例都有一个损失,而不是返回平均损失。这么做的话,Keras 可以在需要时,使用类权重或样本权重(见第 10 章)。 现在,编译 Keras 模型时,就可以使用 Huber 损失来训练了: ```py model.compile(loss=huber_fn, optimizer="nadam") model.fit(X_train, y_train, [...]) ``` 仅此而已!对于训练中的每个批次,Keras 会调用函数`huber_fn()`计算损失,用损失来做梯度下降。另外,Keras 会从一开始跟踪总损失,并展示平均损失。 在保存这个模型时,这个自定义损失会发生什么呢? ### 保存并加载包含自定义组件的模型 因为 Keras 可以保存函数名,保存含有自定义损失函数的模型也不成问题。当加载模型时,你需要提供一个字典,这个字典可以将函数名和真正的函数映射起来。一般说来,当加载一个含有自定义对象的模型时,你需要将名字映射到对象上: ```py model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn": huber_fn}) ``` 对于刚刚的代码,在-1 和 1 之间的误差被认为是“小”误差。如果要改变阈值呢?一个解决方法是创建一个函数,它可以产生一个可配置的损失函数: ```py def create_huber(threshold=1.0): def huber_fn(y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < threshold squared_loss = tf.square(error) / 2 linear_loss = threshold * tf.abs(error) - threshold**2 / 2 return tf.where(is_small_error, squared_loss, linear_loss) return huber_fn model.compile(loss=create_huber(2.0), optimizer="nadam") ``` 但在保存模型时,`threshold`不能被保存。这意味在加载模型时(注意,给 Keras 的函数名是“Huber_fn”,不是创造这个函数的函数名),必须要指定`threshold`的值: ```py model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5", custom_objects={"huber_fn": create_huber(2.0)}) ``` 要解决这个问题,可以创建一个`keras.losses.Loss`类的子类,然后实现`get_config()`方法: ```py class HuberLoss(keras.losses.Loss): def __init__(self, threshold=1.0, **kwargs): self.threshold = threshold super().__init__(**kwargs) def call(self, y_true, y_pred): error = y_true - y_pred is_small_error = tf.abs(error) < self.threshold squared_loss = tf.square(error) / 2 linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2 return tf.where(is_small_error, squared_loss, linear_loss) def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} ``` > 警告:Keras API 目前只使用子类来定义层、模型、调回和正则器。如果使用子类创建其它组件(比如损失、指标、初始化器或约束),它们不能迁移到其它 Keras 实现上。可能 Keras API 经过更新,就会支持所有组件了。 逐行看下这段代码: * 构造器接收`**kwargs`,并将其传递给父构造器,父构造器负责处理超参数:损失的`name`,要使用的、用于将单个实例的损失汇总的`reduction`算法。默认情况下是`"sum_over_batch_size"`,意思是损失是各个实例的损失之和,如果有样本权重,则做权重加权,再除以批次大小(不是除以权重之和,所以不是加权平均)。其它可能的值是`"sum"`和`None`。 * `call()`方法接受标签和预测值,计算所有实例的损失,并返回。 * `get_config()`方法返回一个字典,将每个超参数映射到值上。它首先调用父类的`get_config()`方法,然后将新的超参数加入字典(`{**x}语法是 Python 3.5 引入的`)。 当编译模型时,可以使用这个类的实例: ```py model.compile(loss=HuberLoss(2.), optimizer="nadam") ``` 保存模型时,阈值会一起保存;加载模型时,只需将类名映射到具体的类上: ```py model = keras.models.load_model("my_model_with_a_custom_loss_class.h5", custom_objects={"HuberLoss": HuberLoss}) ``` 保存模型时,Keras 调用损失实例的`get_config()`方法,将配置以 JSON 的形式保存在 HDF5 中。当加载模型时,会调用`HuberLoss`类的`from_config()`方法:这个方法是父类`Loss`实现的,创建一个类`Loss`的实例,将`**config`传递给构造器。 ### 自定义激活函数、初始化器、正则器和约束 Keras 的大多数功能,比如损失、正则器、约束、初始化器、指标、激活函数、层,甚至是完整的模型,都可以用相似的方法做自定义。大多数时候,需要写一个简单的函数,带有合适的输入和输出。下面的例子是自定义激活函数(等价于`keras.activations.softplus()`或`tf.nn.softplus()`),自定义 Glorot 初始化器(等价于`keras.initializers.glorot_normal()`),自定义ℓ<sub>1</sub>正则化器(等价于`keras.regularizers.l1(0.01)`),可以保证权重都是正值的自定义约束(等价于`equivalent to keras.constraints.nonneg()`或`tf.nn.relu()`): ```py def my_softplus(z): # return value is just tf.nn.softplus(z) return tf.math.log(tf.exp(z) + 1.0) def my_glorot_initializer(shape, dtype=tf.float32): stddev = tf.sqrt(2\. / (shape[0] + shape[1])) return tf.random.normal(shape, stddev=stddev, dtype=dtype) def my_l1_regularizer(weights): return tf.reduce_sum(tf.abs(0.01 * weights)) def my_positive_weights(weights): # return value is just tf.nn.relu(weights) return tf.where(weights < 0., tf.zeros_like(weights), weights) ``` 可以看到,参数取决于自定义函数的类型。这些自定义函数可以如常使用,例如: ```py layer = keras.layers.Dense(30, activation=my_softplus, kernel_initializer=my_glorot_initializer, kernel_regularizer=my_l1_regularizer, kernel_constraint=my_positive_weights) ``` 激活函数会应用到这个`Dense`层的输出上,结果会传递到下一层。层的权重会使用初始化器的返回值。在每个训练步骤,权重会传递给正则化函数以计算正则损失,这个损失会与主损失相加,得到训练的最终损失。最后,会在每个训练步骤结束后调用约束函数,经过约束的权重会替换层的权重。 如果函数有需要连同模型一起保存的超参数,需要对相应的类做子类,比如`keras.regularizers.Regularizer`,`keras.constraints.Constraint`,`keras.initializers.Initializer`,或 `keras.layers.Layer`(任意层,包括激活函数)。就像前面的自定义损失一样,下面是一个简单的ℓ<sub>1</sub>正则类,可以保存它的超参数`factor`(这次不必调用其父构造器或`get_config()`方法,因为它们不是父类定义的): ```py class MyL1Regularizer(keras.regularizers.Regularizer): def __init__(self, factor): self.factor = factor def __call__(self, weights): return tf.reduce_sum(tf.abs(self.factor * weights)) def get_config(self): return {"factor": self.factor} ``` 注意,你必须要实现损失、层(包括激活函数)和模型的`call()`方法,或正则化器、初始化器和约束的`__call__()`方法。对于指标,处理方法有所不同。 ### 自定义指标 损失和指标的概念是不一样的:梯度下降使用损失(比如交叉熵损失)来训练模型,因此损失必须是可微分的(至少是在评估点可微分),梯度不能在所有地方都是 0。另外,就算损失比较难解释也没有关系。相反的,指标(比如准确率)是用来评估模型的:指标的解释性一定要好,可以是不可微分的,或者可以在任何地方的梯度都是 0。 但是,在多数情况下,定义一个自定义指标函数和定义一个自定义损失函数是完全一样的。事实上,刚才创建的 Huber 损失函数也可以用来当指标(持久化也是同样的,只需要保存函数名“Huber_fn”就成): ```py model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)]) ``` 对于训练中的每个批次,Keras 能计算该指标,并跟踪自周期开始的指标平均值。大多数时候,这样没有问题。但会有例外!比如,考虑一个二元分类器的准确性。第 3 章介绍过,准确率是真正值除以正预测数(包括真正值和假正值)。假设模型在第一个批次做了 5 个正预测,其中 4 个是正确的,准确率就是 80%。再假设模型在第二个批次做了 3 次正预测,但没有一个预测对,则准确率是 0%。如果对这两个准确率做平均,则平均值是 40%。但它不是模型在两个批次上的准确率!事实上,真正值总共有 4 个,正预测有 8 个,整体的准确率是 50%。我们需要的是一个能跟踪真正值和正预测数的对象,用该对象计算准确率。这就是类`keras.metrics.Precision`所做的: ```py >>> precision = keras.metrics.Precision() >>> precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1]) <tf.Tensor: id=581729, shape=(), dtype=float32, numpy=0.8> >>> precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0]) <tf.Tensor: id=581780, shape=(), dtype=float32, numpy=0.5> ``` 在这个例子中,我们创建了一个`Precision`对象,然后将其用作函数,将第一个批次的标签和预测传给它,然后传第二个批次的数据(这里也可以传样本权重)。数据和前面的真正值和正预测一样。第一个批次之后,正确率是 80%;第二个批次之后,正确率是 50%(这是完整过程的准确率,不是第二个批次的准确率)。这叫做流式指标(或者静态指标),因为他是一个批次接一个批次,逐次更新的。 任何时候,可以调用`result()`方法获取指标的当前值。还可以通过`variables`属性,查看指标的变量(跟踪正预测和负预测的数量),还可以用`reset_states()`方法重置变量: ```py >>> p.result() <tf.Tensor: id=581794, shape=(), dtype=float32, numpy=0.5> >>> p.variables [<tf.Variable 'true_positives:0' [...] numpy=array([4.], dtype=float32)>, <tf.Variable 'false_positives:0' [...] numpy=array([4.], dtype=float32)>] >>> p.reset_states() # both variables get reset to 0.0 ``` 如果想创建一个这样的流式指标,可以创建一个`keras.metrics.Metric`类的子类。下面的例子跟踪了完整的 Huber 损失,以及实例的数量。当查询结果时,就能返回比例值,该值就是平均 Huber 损失: ```py class HuberMetric(keras.metrics.Metric): def __init__(self, threshold=1.0, **kwargs): super().__init__(**kwargs) # handles base args (e.g., dtype) self.threshold = threshold self.huber_fn = create_huber(threshold) self.total = self.add_weight("total", initializer="zeros") self.count = self.add_weight("count", initializer="zeros") def update_state(self, y_true, y_pred, sample_weight=None): metric = self.huber_fn(y_true, y_pred) self.total.assign_add(tf.reduce_sum(metric)) self.count.assign_add(tf.cast(tf.size(y_true), tf.float32)) def result(self): return self.total / self.count def get_config(self): base_config = super().get_config() return {**base_config, "threshold": self.threshold} ``` 逐行看下代码: * 构造器使用`add_weight()`方法来创建用来跟踪多个批次的变量 —— 在这个例子中,就是 Huber 损失的和(`total`)和实例的数量(`count`)。如果愿意的话,可以手动创建变量。Keras 会跟中任何被设为属性的`tf.Variable`(更一般的讲,任何“可追踪对象”,比如层和模型)。 * 当将这个类的实例当做函数使用时会调用`update_state()`方法(正如`Precision`对象)。它能用每个批次的标签和预测值(还有样本权重,但这个例子忽略了样本权重)来更新变量。 * `result()`方法计算并返回最终值,在这个例子中,是返回所有实例的平均 Huber 损失。当你将指标用作函数时,`update_state()`方法先被调用,然后调用`result()`方法,最后返回输出。 * 还实现了`get_config()`方法,用以确保`threshold`和模型一起存储。 * `reset_states()`方法默认将所有值重置为 0.0(也可以改为其它值)。 > 笔记:Keras 能无缝处理变量持久化。 当用简单函数定义指标时,Keras 会在每个批次自动调用它,还能跟踪平均值,就和刚才的手工处理一模一样。因此,`HuberMetric`类的唯一好处是`threshold`可以进行保存。当然,一些指标,比如准确率,不能简单的平均化;对于这些例子,只能实现一个流式指标。 创建好了流式指标,再创建自定义层就很简单了。 ### 自定义层 有时候你可能想搭建一个架构,但 TensorFlow 没有提供默认实现。这种情况下,就需要创建自定义层。否则只能搭建出的架构会是简单重复的,包含相同且重复的层块,每个层块实际上就是一个层而已。比如,如果模型的层顺序是 A、B、C、A、B、C、A、B、C,则完全可以创建一个包含 A、B、C 的自定义层 D,模型就可以简化为 D、D、D。 如何创建自定义层呢?首先,一些层没有权重,比如`keras.layers.Flatten`或`keras.layers.ReLU`。如果想创建一个没有任何权重的自定义层,最简单的方法是协议个函数,将其包装进`keras.layers.Lambda`层。比如,下面的层会对输入做指数运算: ```py exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x)) ``` 这个自定义层可以像任何其它层一样使用 Sequential API、Functional API 或 Subclassing API。你还可以将其用作激活函数(或者使用`activation=tf.exp`,`activation=keras.activations.exponential`,或者`activation="exponential"`)。当预测值的数量级不同时,指数层有时用在回归模型的输出层。 你可能猜到了,要创建自定义状态层(即,有权重的层),需要创建`keras.layers.Layer`类的子类。例如,下面的类实现了一个紧密层的简化版本: ```py class MyDense(keras.layers.Layer): def __init__(self, units, activation=None, **kwargs): super().__init__(**kwargs) self.units = units self.activation = keras.activations.get(activation) def build(self, batch_input_shape): self.kernel = self.add_weight( name="kernel", shape=[batch_input_shape[-1], self.units], initializer="glorot_normal") self.bias = self.add_weight( name="bias", shape=[self.units], initializer="zeros") super().build(batch_input_shape) # must be at the end def call(self, X): return self.activation(X @ self.kernel + self.bias) def compute_output_shape(self, batch_input_shape): return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units]) def get_config(self): base_config = super().get_config() return {**base_config, "units": self.units, "activation": keras.activations.serialize(self.activation)} ``` 逐行看下代码: * 构造器将所有超参数作为参数(这个例子中,是`units`和`activation`),更重要的,它还接收一个`**kwargs`参数。接着初始化了父类,传给父类`kwargs`:它负责标准参数,比如`input_shape`、`trainable`和`name`。然后将超参数存为属性,使用`keras.activations.get()`函数(这个函数接收函数、标准字符串,比如“relu”、“selu”、或“None”),将`activation`参数转换为合适的激活函数。 * `build()`方法通过对每个权重调用`add_weight()`方法,创建层的变量。层第一次被使用时,调用`build()`方法。此时,Keras 能知道该层输入的形状,并传入`build()`方法,这对创建权重是必要的。例如,需要知道前一层的神经元数量,来创建连接权重矩阵(即,`"kernel"`):对应的是输入的最后一维的大小。在`build()`方法最后(也只是在最后),必须调用父类的`build()`方法:这步告诉 Keras 这个层建好了(或者设定`self.built=True`)。 * `call()`方法执行预想操作。在这个例子中,计算了输入`X`和层的核的矩阵乘法,加上了偏置矢量,对结果使用了激活函数,得到了该层的输出。 * `compute_output_shape()`方法只是返回了该层输出的形状。在这个例子中,输出和输入的形状相同,除了最后一维被替换成了层的神经元数。在 tf.keras 中,形状是`tf.TensorShape`类的实例,可以用`as_list()`转换为 Python 列表。 * `get_config()`方法和前面的自定义类很像。注意是通过调用`keras.activations.serialize()`,保存了激活函数的完整配置。 现在,就可以像其它层一样,使用`MyDense`层了! > 笔记:一般情况下,可以忽略`compute_output_shape()`方法,因为 tf.keras 能自动推断输出的形状,除非层是动态的(后面会看到动态层)。在其它 Keras 实现中,要么需要`compute_output_shape()`方法,要么默认输出形状和输入形状相同。 要创建一个有多个输入(比如`Concatenate`)的层,`call()`方法的参数应该是包含所有输入的元组。相似的,`compute_output_shape()`方法的参数应该是一个包含每个输入的批次形状的元组。要创建一个有多输出的层,`call()`方法要返回输出的列表,`compute_output_shape()`方法要返回批次输出形状的列表(每个输出一个形状)。例如,下面的层有两个输入和三个输出: ```py class MyMultiLayer(keras.layers.Layer): def call(self, X): X1, X2 = X return [X1 + X2, X1 * X2, X1 / X2] def compute_output_shape(self, batch_input_shape): b1, b2 = batch_input_shape return [b1, b1, b1] # 可能需要处理广播规则 ``` 这个层现在就可以像其它层一样使用了,但只能使用 Functional 和 Subclassing API,Sequential API 不成(只能使用单输入和单输出的层)。 如果你的层需要在训练和测试时有不同的行为(比如,如果使用`Dropout` 或 `BatchNormalization`层),那么必须给`call()`方法加上`training`参数,用这个参数确定该做什么。比如,创建一个在训练中(为了正则)添加高斯造影的层,但不改动训练(Keras 有一个层做了同样的事,`keras.layers.GaussianNoise`): ```py class MyGaussianNoise(keras.layers.Layer): def __init__(self, stddev, **kwargs): super().__init__(**kwargs) self.stddev = stddev def call(self, X, training=None): if training: noise = tf.random.normal(tf.shape(X), stddev=self.stddev) return X + noise else: return X def compute_output_shape(self, batch_input_shape): return batch_input_shape ``` 上面这些就能让你创建自定义层了!接下来看看如何创建自定义模型。 ### 自定义模型 第 10 章在讨论 Subclassing API 时,接触过创建自定义模型的类。说白了:创建`keras.Model`类的子类,创建层和变量,用`call()`方法完成模型想做的任何事。假设你想搭建一个图 12-3 中的模型。 ![](https://img.kancloud.cn/75/57/755797e492c0470e635b3e6b61c346ca_1440x1017.png)图 12-3 自定义模型案例:包含残差块层,残块层含有跳连接 输入先进入一个紧密层,然后进入包含两个紧密层和一个添加操作的残差块(第 14 章会看见,残差块将输入和输出相加),经过 3 次同样的残差块,再通过第二个残差块,最终结果通过一个紧密输出层。这个模型没什么意义,只是一个搭建任意结构(包含循环和跳连接)模型的例子。要实现这个模型,最好先创建`ResidualBlock`层,因为这个层要用好几次: ```py class ResidualBlock(keras.layers.Layer): def __init__(self, n_layers, n_neurons, **kwargs): super().__init__(**kwargs) self.hidden = [keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal") for _ in range(n_layers)] def call(self, inputs): Z = inputs for layer in self.hidden: Z = layer(Z) return inputs + Z ``` 这个层稍微有点特殊,因为它包含了其它层。用 Keras 来实现:自动检测`hidden`属性包含可追踪对象(即,层),内含层的变量可以自动添加到整层的变量列表中。类的其它部分很好懂。接下来,使用 Subclassing API 定义模型: ```py class ResidualRegressor(keras.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal") self.block1 = ResidualBlock(2, 30) self.block2 = ResidualBlock(2, 30) self.out = keras.layers.Dense(output_dim) def call(self, inputs): Z = self.hidden1(inputs) for _ in range(1 + 3): Z = self.block1(Z) Z = self.block2(Z) return self.out(Z) ``` 在构造器中创建层,在`call()`方法中使用。这个模型可以像其它模型那样来使用(编译、拟合、评估、预测)。如果你还想使用`save()`方法保存模型,使用`keras.models.load_model()`方法加载模型,则必须在`ResidualBlock`类和`ResidualRegressor`类中实现`get_config()`方法。另外,可以使用`save_weights()`方法和`load_weights()`方法保存和加载权重。 `Model`类是`Layer`类的子类,因此模型可以像层一样定义和使用。但是模型还有一些其它的功能,包括`compile()`、`fit()`、`evaluate()` 和`predict()`(还有一些变量),还有`get_layers()`方法(它能通过名字或序号返回模型的任意层)、`save()`方法(支持`keras.models.load_model()`和`keras.models.clone_model()`)。 > 提示:如果模型提供的功能比层多,为什么不讲每一个层定义为模型呢?技术上当然可以这么做,但对内部组件和模型(即,层或可重复使用的层块)加以区别,可以更加清晰。前者应该是`Layer`类的子类,后者应该是`Model`类的子类。 掌握了上面的方法,你就可以使用 Sequential API、Functional API、Subclassing API 搭建几乎任何文章上的模型了。为什么是“几乎”?因为还有些内容需要掌握:首先,如何基于模型内部定义损失或指标,第二,如何搭建自定义训练循环。 ### 基于模型内部的损失和指标 前面的自定义损失和指标都是基于标签和预测(或者还有样本权重)。有时,你可能想基于模型的其它部分定义损失,比如隐藏层的权重或激活函数。这么做,可以是处于正则的目的,或监督模型的内部。 要基于模型内部自定义损失,需要先做基于这些组件的计算,然后将结果传递给`add_loss()`方法。例如,自定义一个包含五个隐藏层加一个输出层的回归 MLP 模型。这个自定义模型基于上层的隐藏层,还有一个辅助的输出。和辅助输出关联的损失,被称为重建损失(见第 17 章):它是重建和输入的均方差。通过将重建误差添加到主损失上,可以鼓励模型通过隐藏层保留尽量多的信息,即便是那些对回归任务没有直接帮助的信息。在实际中,重建损失有助于提高泛化能力(它是一个正则损失)。下面是含有自定义重建损失的自定义模型: ```py class ReconstructingRegressor(keras.Model): def __init__(self, output_dim, **kwargs): super().__init__(**kwargs) self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal") for _ in range(5)] self.out = keras.layers.Dense(output_dim) def build(self, batch_input_shape): n_inputs = batch_input_shape[-1] self.reconstruct = keras.layers.Dense(n_inputs) super().build(batch_input_shape) def call(self, inputs): Z = inputs for layer in self.hidden: Z = layer(Z) reconstruction = self.reconstruct(Z) recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs)) self.add_loss(0.05 * recon_loss) return self.out(Z) ``` 逐行看下代码: * 构造器搭建了一个有五个紧密层和一个紧密输出层的 DNN。 * `build()`方法创建了另一个紧密层,可以重建模型的输入。必须要在这里创建`build()`方法的原因,是单元的数量必须等于输入数,而输入数在调用`build()`方法之前是不知道的。 * `call()`方法处理所有五个隐藏层的输入,然后将结果传给重建层,重建层产生重建。 * `call()`方法然后计算重建损失(重建和输入的均方差),然后使用`add_loss()`方法,将其加到模型的损失列表上。注意,这里对重建损失乘以了 0.05(这是个可调节的超参数),做了缩小,以确保重建损失不主导主损失。 * 最后,`call()`方法将隐藏层的输出传递给输出层,然后返回输出。 相似的,可以加上一个基于模型内部的自定义指标。例如,可以在构造器中创建一个`keras.metrics.Mean`对象,然后在`call()`方法中调用它,传递给它`recon_loss`,最后通过`add_metric()`方法,将其添加到模型上。使用这种方式,在训练模型时,Keras 能展示每个周期的平均损失(损失是主损失加上 0,05 乘以重建损失),和平均重建误差。两者都会在训练过程中下降: ```py Epoch 1/5 11610/11610 [=============] [...] loss: 4.3092 - reconstruction_error: 1.7360 Epoch 2/5 11610/11610 [=============] [...] loss: 1.1232 - reconstruction_error: 0.8964 [...] ``` 在超过 99%的情况中,前面所讨论的内容已经足够搭建你想要的模型了,就算是包含复杂架构、损失和指标也行。但是,在某些极端情况,你还需要自定义训练循环。介绍之前,先来看看 TensorFlow 如何自动计算梯度。 ### 使用自动微分计算梯度 要搞懂如何使用自动微分自动计算梯度,来看一个例子: ```py def f(w1, w2): return 3 * w1 ** 2 + 2 * w1 * w2 ``` 如果你会微积分,就能算出这个函数对`w1`的偏导是`6 * w1 + 2 * w2`,还能算出它对`w2`的偏导是`2 * w1`。例如,在点`(w1, w2) = (5, 3)`,这两个偏导数分别是 36 和 10,在这个点的梯度矢量就是(36, 10)。但对于神经网络来说,函数会复杂得多,可能会有上完个参数,用手算偏导几乎是不可能的任务。一个解决方法是计算每个偏导的大概值,通过调节参数,查看输出的变化: ```py >>> w1, w2 = 5, 3 >>> eps = 1e-6 >>> (f(w1 + eps, w2) - f(w1, w2)) / eps 36.000003007075065 >>> (f(w1, w2 + eps) - f(w1, w2)) / eps 10.000000003174137 ``` 这种方法很容易实现,但只是大概。重要的是,需要对每个参数至少要调用一次`f()`(不是至少两次,因为可以只计算一次`f(w1, w2)`)。这样,对于大神经网络,就不怎么可控。所以,应该使用自动微分。TensorFlow 的实现很简单: ```py w1, w2 = tf.Variable(5.), tf.Variable(3.) with tf.GradientTape() as tape: z = f(w1, w2) gradients = tape.gradient(z, [w1, w2]) ``` 先定义了两个变量`w1` 和 `w2`,然后创建了一个`tf.GradientTape`上下文,它能自动记录变脸的每个操作,最后使用它算出结果`z`关于两个变量`[w1, w2]`的梯度。TensorFlow 计算的梯度如下: ```py >>> gradients [<tf.Tensor: id=828234, shape=(), dtype=float32, numpy=36.0>, <tf.Tensor: id=828229, shape=(), dtype=float32, numpy=10.0>] ``` 很好!不仅结果是正确的(准确度只受浮点误差限制),`gradient()`方法只逆向算了一次,无论有多少个变量,效率很高。 > 提示:为了节省内存,只将严格的最小值放在`tf.GradientTape()`中。另外,通过`在 tf.GradientTape()`中创建一个`tape.stop_recording()`来暂停记录。 当调用记录器的`gradient()`方法时,记录器会自动清零,所以调用两次`gradient()`就会报错: ```py with tf.GradientTape() as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) # => tensor 36.0 dz_dw2 = tape.gradient(z, w2) # 运行时错误 ``` 如果需要调用`gradient()`一次以上,比续将记录器持久化,并在每次用完之后删除,释放资源: ```py with tf.GradientTape(persistent=True) as tape: z = f(w1, w2) dz_dw1 = tape.gradient(z, w1) # => tensor 36.0 dz_dw2 = tape.gradient(z, w2) # => tensor 10.0, works fine now! del tape ``` 默认情况下,记录器只会跟踪包含变量的操作,所以如果是计算`z`的梯度,`z`和变量没关系,结果就会是 None: ```py c1, c2 = tf.constant(5.), tf.constant(3.) with tf.GradientTape() as tape: z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # returns [None, None] ``` 但是,你也可以强制记录器监视任何你想监视的张量,将它们当做变量来计算梯度: ```py with tf.GradientTape() as tape: tape.watch(c1) tape.watch(c2) z = f(c1, c2) gradients = tape.gradient(z, [c1, c2]) # returns [tensor 36., tensor 10.] ``` 在某些情况下,这么做会有帮助,比如当输入的波动很小,而激活函数结果波动很大时,要实现一个正则损失,就可以这么做:损失会基于激活函数结果,激活函数结果会基于输入。因为输入不是变量,就需要记录器监视输入。 大多数时候,梯度记录器被用来计算单一值(通常是损失)的梯度。这就是自动微分发挥长度的地方了。因为自动微分只需要一次向前传播一次向后传播,就能计算所有梯度。如果你想计算一个矢量的梯度,比如一个包含多个损失的矢量,TensorFlow 就会计算矢量和的梯度。因此,如果你需要计算单个梯度的话(比如每个损失相对于模型参数的梯度),你必须调用记录器的`jabobian()`方法:它能做反向模式的自动微分,一次计算完矢量中的所有损失(默认是并行的)。甚至还可以计算二级偏导,但在实际中用的不多(见 notebook 中的“自动微分计算梯度部分”)。 某些情况下,你可能想让梯度在部分神经网络停止传播。要这么做的话,必须使用`tf.stop_gradient()`函数。它能在前向传播中(比如`tf.identity()`)返回输入,并能阻止梯度反向传播(就像常量一样): ```py def f(w1, w2): return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2) with tf.GradientTape() as tape: z = f(w1, w2) # same result as without stop_gradient() gradients = tape.gradient(z, [w1, w2]) # => returns [tensor 30., None] ``` 最后,在计算梯度时可能还会碰到数值问题。例如,如果对于很大的输入,计算`my_softplus()`函数的梯度,结果会是 NaN: ```py >>> x = tf.Variable([100.]) >>> with tf.GradientTape() as tape: ... z = my_softplus(x) ... >>> tape.gradient(z, [x]) <tf.Tensor: [...] numpy=array([nan], dtype=float32)> ``` 这是因为使用自动微分计算这个函数的梯度,会有些数值方面的难点:因为浮点数的精度误差,自动微分最后会变成无穷除以无穷(结果是 NaN)。幸好,softplus 函数的导数是`1 / (1 + 1 / exp(x))`,它是数值稳定的。接着,让 TensorFlow 使用这个稳定的函数,通过装饰器`@tf.custom_gradient`计算`my_softplus()`的梯度,既返回正常输出,也返回计算导数的函数(注意:它会接收的输入是反向传播的梯度;根据链式规则,应该乘以函数的梯度): ```py @tf.custom_gradient def my_better_softplus(z): exp = tf.exp(z) def my_softplus_gradients(grad): return grad / (1 + 1 / exp) return tf.math.log(exp + 1), my_softplus_gradients ``` 计算好了`my_better_softplus()`的梯度,就算对于特别大的输入值,也能得到正确的结果(但是,因为指数运算,主输出还是会发生爆炸;绕过的方法是,当输出很大时,使用`tf.where()`返回输入)。 祝贺你!现在你就可以计算任何函数的梯度(只要函数在计算点可微就行),甚至可以阻止反向传播,还能写自己的梯度函数!TensorFlow 的灵活性还能让你编写自定义的训练循环。 ### 自定义训练循环 在某些特殊情况下,`fit()`方法可能不够灵活。例如,第 10 章讨论过的 Wide & Deep 论文使用了两个优化器:一个用于宽路线,一个用于深路线。因为`fit()`方法智能使用一个优化器(编译时设置的优化器),要实现这篇论文就需要写自定义循环。 你可能还想写自定义的训练循环,只是想让训练过程更加可控(也许你对`fit()`方法的细节并不确定)。但是,自定义训练循环会让代码变长、更容易出错、也难以维护。 > 提示:除非真的需要自定义,最好还是使用`fit()`方法,而不是自定义训练循环,特别是当你是在一个团队之中时。 首先,搭建一个简单的模型。不用编译,因为是要手动处理训练循环: ```py l2_reg = keras.regularizers.l2(0.05) model = keras.models.Sequential([ keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg), keras.layers.Dense(1, kernel_regularizer=l2_reg) ]) ``` 接着,创建一个小函数,它能从训练集随机采样一个批次的实例(第 13 章会讨论更便捷的 Data API): ```py def random_batch(X, y, batch_size=32): idx = np.random.randint(len(X), size=batch_size) return X[idx], y[idx] ``` 再定义一个可以展示训练状态的函数,包括步骤数、总步骤数、平均损失(用`Mean`指标计算),和其它指标: ```py def print_status_bar(iteration, total, loss, metrics=None): metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result()) for m in [loss] + (metrics or [])]) end = "" if iteration < total else "\n" print("\r{}/{} - ".format(iteration, total) + metrics, end=end) ``` 这段代码不难,除非你对 Python 字符串的`{:.4f}`不熟:它的作用是保留四位小数。使用`\r`(回车)和`end=""`连用,保证状态条总是打印在一条线上。notebook 中,`print_status_bar()`函数包括进度条,也可以使用`tqdm`库。 有了这些准备,就可以开干了!首先,我们定义超参数、选择优化器、损失函数和指标(这个例子中是 MAE): ```py n_epochs = 5 batch_size = 32 n_steps = len(X_train) // batch_size optimizer = keras.optimizers.Nadam(lr=0.01) loss_fn = keras.losses.mean_squared_error mean_loss = keras.metrics.Mean() metrics = [keras.metrics.MeanAbsoluteError()] ``` 可以搭建自定义循环了: ```py for epoch in range(1, n_epochs + 1): print("Epoch {}/{}".format(epoch, n_epochs)) for step in range(1, n_steps + 1): X_batch, y_batch = random_batch(X_train_scaled, y_train) with tf.GradientTape() as tape: y_pred = model(X_batch, training=True) main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) loss = tf.add_n([main_loss] + model.losses) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) mean_loss(loss) for metric in metrics: metric(y_batch, y_pred) print_status_bar(step * batch_size, len(y_train), mean_loss, metrics) print_status_bar(len(y_train), len(y_train), mean_loss, metrics) for metric in [mean_loss] + metrics: metric.reset_states() ``` 逐行看下代码: * 创建了两个嵌套循环:一个是给周期的,一个是给周期里面的批次的。 * 然后从训练集随机批次采样。 * 在`tf.GradientTape()`内部,对一个批次做了预测(将模型用作函数),计算其损失:损失等于主损失加上其它损失(在这个模型中,每层有一个正则损失)。因为`mean_squared_error()`函数给每个实例返回一个损失,使用`tf.reduce_mean()`计算平均值(如果愿意的话,每个实例可以用不同的权重)。正则损失已经转变为单个的标量,所以只需求和就成(使用`tf.add_n()`,它能将相同形状和数据类型的张量求和)。 * 接着,让记录器计算损失相对于每个可训练变量的梯度(不是所有的变量!),然后用优化器对梯度做梯度下降。 * 然后,更新(当前周期)平均损失和平均指标,显示状态条。 * 在每个周期结束后,再次展示状态条,使其完整,然后换行,重置平均损失和平均指标。 如果设定优化器的`clipnorm`或`clipvalue`超参数,就可以自动重置。如果你想对梯度做任何其它变换,在调用`apply_gradients()`方法之前,做变换就行。 如果你对模型添加了权重约束(例如,添加层时设置`kernel_constraint`或`bias_constraint`),你需要在`apply_gradients()`之后,更新训练循环,以应用这些约束: ```py for variable in model.variables: if variable.constraint is not None: variable.assign(variable.constraint(variable)) ``` 最重要的,这个训练循环没有处理训练和测试过程中,行为不一样的层(例如,`BatchNormalization`或`Dropout`)。要处理的话,需要调用模型,令`training=True`,并传播到需要这么设置的每一层。 可以看到,有这么多步骤都要做对才成,很容易出错。但另一方面,训练的控制权完全在你手里。 现在你知道如何自定义模型中的任何部分了,也知道如何训练算法了,接下来看看如何使用 TensorFlow 的自动图生成特征:它能显著提高自定义代码的速度,并且还是可迁移的(见第 19 章)。 ## TensorFlow 的函数和图 在 TensorFlow 1 中,图是绕不过去的(同时图也很复杂),因为图是 TensorFlow 的 API 的核心。在 TensorFlow 2 中,图还在,但不是核心了,使用也简单多了。为了演示其易用性,从一个三次方函数开始: ```py def cube(x): return x ** 3 ``` 可以用一个值调用这个函数,整数、浮点数都成,或者用张量来调用: ```py >>> cube(2) 8 >>> cube(tf.constant(2.0)) <tf.Tensor: id=18634148, shape=(), dtype=float32, numpy=8.0> ``` 现在,使用`tf.function()`将这个 Python 函数变为 TensorFlow 函数: ```py >>> tf_cube = tf.function(cube) >>> tf_cube <tensorflow.python.eager.def_function.Function at 0x1546fc080> ``` 可以像原生 Python 函数一样使用这个 TF 函数,可以返回同样的结果(张量): ```py >>> tf_cube(2) <tf.Tensor: id=18634201, shape=(), dtype=int32, numpy=8> >>> tf_cube(tf.constant(2.0)) <tf.Tensor: id=18634211, shape=(), dtype=float32, numpy=8.0> ``` `tf.function()`在底层分析了`cube()`函数的计算,然后生成了一个等价的计算图!可以看到,过程十分简单(下面会讲解过程)。另外,也可以使用`tf.function`作为装饰器,更常见一些: ```py @tf.function def tf_cube(x): return x ** 3 ``` 原生的 Python 函数通过 TF 函数的`python_function`属性仍然可用: ```py >>> tf_cube.python_function(2) 8 ``` TensorFlow 优化了计算图,删掉了没用的节点,简化了表达式(比如,1 + 2 会替换为 3),等等。当优化好的计算图准备好之后,TF 函数可以在图中,按合适的顺序高效执行运算(该并行的时候就并行)。作为结果,TF 函数比普通的 Python 函数快的做,特别是在做复杂计算时。大多数时候,根本没必要知道底层到底发生了什么,如果需要对 Python 函数加速,将其转换为 TF 函数就行。 另外,当你写的自定义损失函数、自定义指标、自定义层或任何其它自定义函数,并在 Keras 模型中使用的,Keras 都自动将其转换成了 TF 函数,不用使用`tf.function()`。 > 提示:创建自定义层或模型时,设置`dynamic=True`,可以让 Keras 不转化你的 Python 函数。另外,当调用模型的`compile()`方法时,可以设置`run_eagerly=True`。 默认时,TF 函数对每个独立输入的形状和数据类型的集合,生成了一个新的计算图,并缓存以备后续使用。例如,如果你调用`tf_cube(tf.constant(10))`,就会生成一个 int32 张量、形状是[]的计算图。如果你调用`tf_cube(tf.constant(20))`,会使用相同的计算图。但如果调用`tf_cube(tf.constant([10, 20]))`,就会生成一个 int32、形状是[2]的新计算图。这就是 TF 如何处理多态的(即变化的参数类型和形状)。但是,这只适用于张量参数:如果你将 Python 数值传给 TF,就会为每个独立值创建一个计算图:比如,调用`tf_cube(10)`和`tf_cube(20)`会产生两个计算图。 > 警告:如果用多个不同的 Python 数值调用 TF 函数,就会产生多个计算图,这样会减慢程勋,使用很多的内存(必须删掉 TF 函数才能释放)。Python 的值应该复赋值给尽量重复的参数,比如超参数,每层有多少个神经元。这可以让 TensorFlow 更好的优化模型中的变量。 ### 自动图和跟踪 TensorFlow 是如何生成计算图的呢?它先分析了 Python 函数源码,得出所有的数据流控制语句,比如 for 循环,while 循环,if 条件,还有 break、continue、return。这个第一步被称为自动图(AutoGraph)。TensorFlow 之所以要分析源码,试分析 Python 没有提供任何其它的方式来获取控制流语句:Python 提供了`__add__()`和`__mul__()`这样的魔术方法,但没有`__while__()`或`__if__()`这样的魔术方法。分析完源码之后,自动图中的所有控制流语句都被替换成相应的 TensorFlow 方法,比如`tf.while_loop()`(while 循环)和`tf.cond()`(if 判断)。例如,见图 12-4,自动图分析了 Python 函数`sum_squares()`的源码,然后变为函数`tf__sum_squares()`。在这个函数中,for 循环被替换成了`loop_body()`(包括原生的 for 循环)。然后是函数`for_stmt()`,调用这个函数会形成运算`tf.while_loop()`。 ![](https://img.kancloud.cn/98/3c/983c52120eecd56fabccd1a2431c6953_1440x773.png)图 12-4 TensorFlow 是如何使用自动图和跟踪生成计算图的? 然后,TensorFlow 调用这个“升级”方法,但没有向其传递参数,而是传递一个符号张量(symbolic tensor)——一个没有任何真实值的张量,只有名字、数据类型和形状。例如,如果调用`sum_squares(tf.constant(10))`,然后会调用`tf__sum_squares()`,其符号张量的类型是 int32,形状是[]。函数会以图模式运行,意味着每个 TensorFlow 运算会在图中添加一个表示自身的节点,然后输出`tensor(s)`(与常规模式相对,这被称为动态图执行,或动态模式)。在图模式中,TF 运算不做任何计算。如果你懂 TensorFlow 1,这应该很熟悉,因为图模式是默认模式。在图 12-4 中,可以看到`tf__sum_squares()`函数被调用,参数是符号张量,最后的图是跟踪中生成的。节点表示运算,箭头表示张量(生成的函数和图都简化了)。 > 提示:想看生成出来的函数源码的话,可以调用`tf.autograph.to_code(sum_squares.python_function)`。源码不美观,但可以用来调试。 ### TF 函数规则 大多数时候,将 Python 函数转换为 TF 函数是琐碎的:要用`@tf.function`装饰,或让 Keras 来负责。但是,也有一些规则: * 如果调用任何外部库,包括 NumPy,甚至是标准库,调用只会在跟踪中运行,不会是图的一部分。事实上,TensorFlow 图只能包括 TensorFlow 的构件(张量、运算、变量、数据集,等等)。因此,要确保使用的是`tf.reduce_sum()`而不是`np.sum()`,使用的是`tf.sort()`而不是内置的`sorted()`,等等。还要注意: 1. 如果定义了一个 TF 函数`f(x)`,它只返回`np.random.rand()`,当函数被追踪时,生成的是个随机数,因此`f(tf.constant(2.))`和`f(tf.constant(3.))`会返回同样的随机数,但`f(tf.constant([2., 3.]))`会返回不同的数。如果将`np.random.rand()`替换为`tf.random.uniform([])`,每次调用都会返回新的随机数,因为运算是图的一部分。 2. 如果你的非 TensorFlow 代码有副作用(比如日志,或更新 Python 计数器),则 TF 函数被调用时,副作用不一定发生,因为只有函数被追踪时才有效。 3. 你可以在`tf.py_function()`运算中包装任意的 Python 代码,但这么做的话会使性能下降,因为 TensorFlow 不能做任何图优化。还会破坏移植性,因为图只能在有 Python 的平台上跑起来(且安装上正确的库)。 * 你可以调用其它 Python 函数或 TF 函数,但是它们要遵守相同的规则,因为 TensorFlow 会在计算图中记录它们的运算。注意,其它函数不需要用`@tf.function`装饰。 * 如果函数创建了一个 TensorFlow 变量(或任意其它静态 TensorFlow 对象,比如数据集或队列),它必须在第一次被调用时创建 TF 函数,否则会导致异常。通常,最好在 TF 函数的外部创建变量(比如在自定义层的`build()`方法中)。如果你想将一个新值赋值给变量,要确保调用它的`assign()`方法,而不是使用`=`。 * Python 的源码可以被 TensorFlow 使用。如果源码用不了(比如,如果是在 Python shell 中定义函数,源码就访问不了,或者部署的是编译文件`*.pyc`),图的生成就会失败或者缺失功能。 * TensorFlow 只能捕获迭代张量或数据集的 for 循环。因此要确保使用`for i in tf.range(x)`,而不是`for i in range(x)`,否则循环不能在图中捕获,而是在会在追踪中运行。(如果 for 循环使用创建计算图的,这可能是你想要的,比如创建神经网络中的每一层)。 * 出于性能原因,最好使用矢量化的实现方式,而不是使用循环。 总结一下,这一章一开始介绍了 TensorFlow,然后是 TensorFlow 的低级 API,包括张量、运算、变量和特殊的数据结构。然后使用这些工具自定义了 tf.keras 中的几乎每个组件。最后,学习了 TF 函数如何提升性能,计算图是如何通过自动图和追踪生成的,在写 TF 函数时要遵守什么规则。(附录 G 介绍了生成图的内部黑箱) 下一章会学习如何使用 TensorFlow 高效加载和预处理数据。 # 练习 1. 如何用一句话描述 TensorFlow?它的主要特点是什么?能列举出其它流行的深度学习库吗? 2. TensorFlow 是 NumPy 的简单替换吗?二者有什么区别? 3. `tf.range(10)`和`tf.constant(np.arange(10))`能拿到相同的结果吗? 4. 列举出除了常规张量之外,TensorFlow 的其它六种数据结构? 5. 可以通过函数或创建`keras.losses.Loss`的子类来自定义损失函数。两种方法各在什么时候使用? 6. 相似的,自定义指标可以通过定义函数或创建`keras.metrics.Metric`的子类。两种方法各在什么时候使用? 7. 什么时候应该创建自定义层,而不是自定义模型? 8. 什么时候需要创建自定义的训练循环? 9. 自定义 Keras 组件可以包含任意 Python 代码吗,或者 Python 代码需要转换为 TF 函数吗? 10. 如果想让一个函数可以转换为 TF 函数,要遵守设么规则? 11. 什么时候需要创建一个动态 Keras 模型?怎么做?为什么不让所有模型都是动态的? 12. 实现一个具有层归一化的自定义层(第 15 章会用到): a. `build()`方法要定义两个可训练权重α 和 β,形状都是`input_shape[-1:]`,数据类型是`tf.float32`。α用 1 初始化,β用 0 初始化。 b. `call()`方法要计算每个实例的特征的平均值μ和标准差σ。你可以使用`tf.nn.moments(inputs, axes=-1, keepdims=True)`,它可以返回平均值μ和方差σ<sup>2</sup>(计算其平方根得到标准差)。函数返回`α⊗(X - μ)/(σ + ε) + β`,其中`⊗`表示元素级别惩罚,`ε`是平滑项(避免发生除以 0,而是除以 0.001)。 c. 确保自定义层的输出和`keras.layers.LayerNormalization`层的输出一致(或非常接近)。 13. 训练一个自定义训练循环,来处理 Fashion MNIST 数据集。 a. 展示周期、迭代,每个周期的平均训练损失、平均准确度(每次迭代会更新),还有每个周期结束后的验证集损失和准确度。 b. 深层和浅层使用不同的优化器,不同的学习率。 参考答案见附录 A。