由于度量指标是在 `keras.Model` 的 `train_step` 函数内运行的,因此要在不改变 API 的前提下过滤掉训练期间被禁用的度量指标,就需要对 `keras.Model` 进行子类化。
我们先定义一个简单的度量指标包装器:
class TrainDisabledMetric(Metric):
    """Wrapper that tags a metric as "do not evaluate while training".

    The wrapper adds no behaviour of its own: every operation is delegated
    to the wrapped metric. Its type is what matters -- code that filters
    metrics can recognise it with ``isinstance(m, TrainDisabledMetric)``.
    """

    def __init__(self, metric: Metric):
        """Wrap ``metric``, reusing its name for the wrapper."""
        super().__init__(name=metric.name)
        self._metric = metric

    def update_state(self, *args, **kwargs):
        """Forward all arguments to the wrapped metric's ``update_state``."""
        wrapped = self._metric
        return wrapped.update_state(*args, **kwargs)

    def reset_state(self):
        """Reset the wrapped metric and return whatever it returns."""
        wrapped = self._metric
        return wrapped.reset_state()

    def result(self):
        """Return the wrapped metric's current result."""
        wrapped = self._metric
        return wrapped.result()
然后通过子类化 `keras.Model`,在训练期间过滤掉这些指标:
class CustomModel(keras.Model):
    """``keras.Model`` that skips ``TrainDisabledMetric`` metrics in training.

    ``compile`` builds a second metrics container,
    ``on_train_compiled_metrics``, holding only the entries of ``metrics``
    that are not ``TrainDisabledMetric`` instances. ``train_step`` updates
    and reports that container, while every other call to
    ``compute_metrics`` (``training=False``) uses the full
    ``compiled_metrics`` container.

    Note: ``weighted_metrics`` is not forwarded to the training container
    (it is built with ``weighted_metrics=None``).
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``keras.Model``."""
        super().__init__(*args, **kwargs)

    def compile(self, optimizer='rmsprop', loss=None, metrics=None,
                loss_weights=None, weighted_metrics=None, run_eagerly=None,
                steps_per_execution=None, jit_compile=None, **kwargs):
        """Compile the model and build the training-only metrics container.

        All arguments are passed through to ``keras.Model.compile``.
        Afterwards ``self.on_train_compiled_metrics`` is set to:

        * ``self.compiled_metrics`` when ``metrics`` is ``None``;
        * otherwise a new ``MetricsContainer`` built from ``metrics`` with
          every ``TrainDisabledMetric`` leaf (and every container left
          empty by that removal) pruned away. If nothing survives the
          pruning the container is built with no metrics at all.
        """
        from_serialized = kwargs.get('from_serialized', False)
        super().compile(optimizer, loss, metrics=metrics,
                        loss_weights=loss_weights,
                        weighted_metrics=weighted_metrics,
                        run_eagerly=run_eagerly,
                        steps_per_execution=steps_per_execution,
                        jit_compile=jit_compile, **kwargs)
        self.on_train_compiled_metrics = self.compiled_metrics
        if metrics is not None:
            def get_on_train_traverse_tree(structure):
                # Boolean mirror of ``structure``: True marks a leaf that
                # stays enabled during training.
                flat = tf.nest.flatten(structure)
                on_train = [not isinstance(e, TrainDisabledMetric)
                            for e in flat]
                full_tree = tf.nest.pack_sequence_as(structure, on_train)
                # Collapse sub-structures without any enabled leaf into a
                # single False.
                return get_traverse_shallow_structure(
                    lambda s: any(tf.nest.flatten(s)), full_tree)

            on_train_sub_tree = get_on_train_traverse_tree(metrics)
            flat_on_train = flatten_up_to(on_train_sub_tree, metrics)

            def clean_tree(tree):
                # Recursively drop falsy entries from lists and dicts;
                # any other value is returned unchanged.
                if isinstance(tree, list):
                    _list = []
                    for t in tree:
                        r = clean_tree(t)
                        if r:
                            _list.append(r)
                    return _list
                elif isinstance(tree, dict):
                    _tree = {}
                    for k, v in tree.items():
                        r = clean_tree(v)
                        if r:
                            _tree[k] = r
                    return _tree
                else:
                    return tree

            pruned_on_train_sub_tree = clean_tree(on_train_sub_tree)
            pruned_flat_on_train = [m for keep, m in
                                    zip(tf.nest.flatten(on_train_sub_tree),
                                        flat_on_train) if keep]
            if pruned_flat_on_train:
                on_train_metrics = tf.nest.pack_sequence_as(
                    pruned_on_train_sub_tree, pruned_flat_on_train)
            else:
                # Nothing is enabled for training (``metrics`` is empty or
                # contains only TrainDisabledMetric entries). The pruned
                # structure is then the scalar ``False`` and cannot be
                # packed from an empty sequence, so build the container
                # without metrics instead of raising.
                on_train_metrics = None
            self.on_train_compiled_metrics = compile_utils.MetricsContainer(
                on_train_metrics, weighted_metrics=None,
                output_names=self.output_names,
                from_serialized=from_serialized)

    def train_step(self, data):
        """Run one optimisation step and return the training metrics.

        Unpacks ``data`` into ``(x, y, sample_weight)``, computes the loss
        under a gradient tape, lets the optimizer minimise it, and returns
        ``compute_metrics(..., training=True)``.
        """
        x, y, sample_weight = data_adapter.unpack_x_y_sample_weight(data)
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            loss = self.compute_loss(x, y, y_pred, sample_weight)
        self._validate_target_and_loss(y, loss)
        self.optimizer.minimize(loss, self.trainable_variables, tape=tape)
        return self.compute_metrics(x, y, y_pred, sample_weight,
                                    training=True)

    def compute_metrics(self, x, y, y_pred, sample_weight, training=False):
        """Update the relevant metrics and return their results as a dict.

        With ``training=True`` only ``on_train_compiled_metrics`` is
        updated and ``on_train_metrics`` is reported; otherwise the full
        ``compiled_metrics`` / ``metrics`` are used. ``x`` is unused.

        Returns:
            Dict mapping metric name to result; a metric whose result is
            itself a dict is merged into the returned dict.
        """
        del x
        if training:
            self.on_train_compiled_metrics.update_state(
                y, y_pred, sample_weight)
            metrics = self.on_train_metrics
        else:
            self.compiled_metrics.update_state(y, y_pred, sample_weight)
            metrics = self.metrics
        return_metrics = {}
        for metric in metrics:
            result = metric.result()
            if isinstance(result, dict):
                return_metrics.update(result)
            else:
                return_metrics[metric.name] = result
        return return_metrics

    @property
    def on_train_metrics(self):
        """Metrics reported during training.

        Collects the compiled-loss metrics and the metrics of
        ``on_train_compiled_metrics`` (when the model is compiled),
        followed by the ``_metrics`` of every layer.
        """
        metrics = []
        if self._is_compiled:
            if self.compiled_loss is not None:
                metrics += self.compiled_loss.metrics
            if self.on_train_compiled_metrics is not None:
                metrics += self.on_train_compiled_metrics.metrics
        for l in self._flatten_layers():
            metrics.extend(l._metrics)
        return metrics
现在,给定一个 Keras 模型,我们可以将其封装并编译,从而在训练期间禁用指定的度量指标:
# Wrap an existing functional model so that the custom compile/train_step
# logic applies to it. ``model`` is a placeholder for the user's own model.
model: keras.Model = ...
custom_model = CustomModel(inputs=model.input, outputs=model.output)

# Metrics evaluated in both training and validation.
train_enabled_metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]
# Metrics wrapped in TrainDisabledMetric are skipped while training.
# NOTE(review): this metric uses the default ``from_logits`` setting while
# the loss below uses ``from_logits=True``; that may be why the reported
# crossentropy differs from ``val_loss`` -- confirm before relying on it.
train_disabled_metrics = [
    TrainDisabledMetric(tf.keras.metrics.SparseCategoricalCrossentropy()),
]
metrics = train_enabled_metrics + train_disabled_metrics

optimizer = tf.keras.optimizers.Adam(0.001)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
custom_model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
custom_model.fit(ds_train, epochs=6, validation_data=ds_test)
度量指标 `SparseCategoricalCrossentropy` 仅在验证期间计算:
Epoch 1/6
469/469 [==============================] - 2s 2ms/step - loss: 0.3522 - sparse_categorical_accuracy: 0.8366 - val_loss: 0.1978 - val_sparse_categorical_accuracy: 0.9086 - val_sparse_categorical_crossentropy: 1.3197
Epoch 2/6
469/469 [==============================] - 1s 1ms/step - loss: 0.1631 - sparse_categorical_accuracy: 0.9526 - val_loss: 0.1429 - val_sparse_categorical_accuracy: 0.9587 - val_sparse_categorical_crossentropy: 1.1910
Epoch 3/6
469/469 [==============================] - 1s 1ms/step - loss: 0.1178 - sparse_categorical_accuracy: 0.9654 - val_loss: 0.1139 - val_sparse_categorical_accuracy: 0.9661 - val_sparse_categorical_crossentropy: 1.1369
Epoch 4/6
469/469 [==============================] - 1s 1ms/step - loss: 0.0909 - sparse_categorical_accuracy: 0.9735 - val_loss: 0.0981 - val_sparse_categorical_accuracy: 0.9715 - val_sparse_categorical_crossentropy: 1.0434
Epoch 5/6
469/469 [==============================] - 1s 1ms/step - loss: 0.0735 - sparse_categorical_accuracy: 0.9784 - val_loss: 0.0913 - val_sparse_categorical_accuracy: 0.9721 - val_sparse_categorical_crossentropy: 0.9862
Epoch 6/6
469/469 [==============================] - 1s 1ms/step - loss: 0.0606 - sparse_categorical_accuracy: 0.9823 - val_loss: 0.0824 - val_sparse_categorical_accuracy: 0.9761 - val_sparse_categorical_crossentropy: 1.0024
# Alternative approach: presumably the final expression of a custom metric
# function (``K`` looks like the Keras backend module and ``actual_metric``
# the real metric tensor -- neither is defined in this snippet; confirm).
# Yields a constant float64 0.0 when ``K.learning_phase() == 1`` (the
# training phase per the Keras backend docs) and ``actual_metric`` otherwise.
K.switch(tf.convert_to_tensor(K.learning_phase() == 1, dtype=bool), tf.convert_to_tensor(0.0, dtype=tf.float64), actual_metric)
请注意,我使用了 `tf.float64`,因为在我的情况下,`actual_metric` 的返回类型是 `tf.float64`。 - BenHeid