脚本专栏 发布日期:2025/1/10 浏览次数:1
# Build a small fully connected MNIST classifier with the functional API.
inputs = keras.Input(shape=(784,), name='mnist_input')
h1 = inputs
for _ in range(2):
    # Two stacked 64-unit ReLU hidden layers.
    h1 = layers.Dense(64, activation='relu')(h1)
outputs = layers.Dense(10, activation='softmax')(h1)
model = keras.Model(inputs, outputs)
# Optional: keras.utils.plot_model(model, 'net001.png', show_shapes=True)
model.compile(
    optimizer=keras.optimizers.RMSprop(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Load MNIST, flatten every image to 784 values and divide by 255.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255
x_test = x_test.reshape(10000, 784).astype('float32') / 255

# Hold out the last 10000 training samples as the validation set.
x_val, y_val = x_train[-10000:], y_train[-10000:]
x_train, y_train = x_train[:-10000], y_train[:-10000]

# Train, then report the per-epoch history.
history = model.fit(
    x_train,
    y_train,
    batch_size=64,
    epochs=3,
    validation_data=(x_val, y_val),
)
print('history:')
print(history.history)

# Evaluate on the test split.
result = model.evaluate(x_test, y_test, batch_size=128)
print('evaluate:')
print(result)

# Predict the first two test samples.
pred = model.predict(x_test[:2])
print('predict:')
print(pred)
自定义指标只需继承Metric类,并重写以下函数:
__init__(self),初始化,在其中创建度量所需的状态变量。
update_state(self,y_true,y_pred,sample_weight = None),它使用目标y_true和模型预测y_pred来更新状态变量。
result(self),它使用状态变量来计算最终结果。
reset_states(self),重新初始化度量的状态。
# Simple example of a custom metric: CatgoricalTruePostives counts how many
# samples were classified correctly (predicted class index == true label).
class CatgoricalTruePostives(keras.metrics.Metric):
    """Streaming count of correctly classified samples.

    The count is kept in a single scalar weight and accumulated over every
    call to `update_state` until the metric is reset.
    """

    def __init__(self, name='binary_true_postives', **kwargs):
        super().__init__(name=name, **kwargs)
        # Running total of (optionally weighted) correct predictions.
        self.true_postives = self.add_weight(name='tp', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Add the number of correct predictions in this batch to the total.

        Args:
            y_true: integer class labels, one per sample.
            y_pred: per-class scores, classes on the last axis.
            sample_weight: optional per-sample weights applied to the matches.
        """
        # Reduce over the class axis (the last one). The default axis=0 would
        # reduce over the batch dimension and yield meaningless indices.
        y_pred = tf.reshape(tf.argmax(y_pred, axis=-1), [-1])
        # Flatten the labels so (batch,) and (batch, 1) inputs both compare
        # element-wise instead of broadcasting to a (batch, batch) matrix.
        y_true = tf.reshape(y_true, [-1])
        matches = tf.equal(tf.cast(y_pred, tf.int32), tf.cast(y_true, tf.int32))
        matches = tf.cast(matches, tf.float32)
        if sample_weight is not None:
            sample_weight = tf.reshape(tf.cast(sample_weight, tf.float32), [-1])
            matches = tf.multiply(sample_weight, matches)
        return self.true_postives.assign_add(tf.reduce_sum(matches))

    def result(self):
        """Return the accumulated count."""
        return tf.identity(self.true_postives)

    def reset_state(self):
        """Reset the accumulated count to zero."""
        self.true_postives.assign(0.)

    def reset_states(self):
        """Legacy spelling of `reset_state`, kept for older Keras versions."""
        self.reset_state()


model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[CatgoricalTruePostives()],
)
model.fit(x_train, y_train, batch_size=64, epochs=3)
# An extra loss term can be contributed from inside a custom layer.
class ActivityRegularizationLayer(layers.Layer):
    """Pass-through layer that registers a penalty on its input activations."""

    def call(self, inputs):
        # Penalty is 0.1 times the sum of all incoming activations.
        penalty = tf.reduce_sum(inputs) * 0.1
        self.add_loss(penalty)
        return inputs


inputs = keras.Input(shape=(784,), name='mnist_input')
h1 = layers.Dense(64, activation='relu')(inputs)
# The regularization layer sits right after the first hidden layer.
h1 = ActivityRegularizationLayer()(h1)
h1 = layers.Dense(64, activation='relu')(h1)
outputs = layers.Dense(10, activation='softmax')(h1)
model = keras.Model(inputs, outputs)
# Optional: keras.utils.plot_model(model, 'net001.png', show_shapes=True)
model.compile(
    optimizer=keras.optimizers.RMSprop(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
model.fit(x_train, y_train, batch_size=32, epochs=1)
# A metric to track can likewise be registered from inside a custom layer.
class MetricLoggingLayer(layers.Layer):
    """Pass-through layer that logs the standard deviation of its input."""

    def call(self, inputs):
        activation_std = keras.backend.std(inputs)
        self.add_metric(
            activation_std,
            name='std_of_activation',
            aggregation='mean',
        )
        return inputs


inputs = keras.Input(shape=(784,), name='mnist_input')
h1 = layers.Dense(64, activation='relu')(inputs)
# The logging layer observes the first hidden layer's output.
h1 = MetricLoggingLayer()(h1)
h1 = layers.Dense(64, activation='relu')(h1)
outputs = layers.Dense(10, activation='softmax')(h1)
model = keras.Model(inputs, outputs)
# Optional: keras.utils.plot_model(model, 'net001.png', show_shapes=True)
model.compile(
    optimizer=keras.optimizers.RMSprop(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
model.fit(x_train, y_train, batch_size=32, epochs=1)
# Metrics and losses can also be attached directly to the model.
class MetricLoggingLayer(layers.Layer):
    """Pass-through layer that logs the standard deviation of its input.

    Not used by the model below; kept so the name stays defined for callers.
    """

    def call(self, inputs):
        self.add_metric(keras.backend.std(inputs),
                        name='std_of_activation',
                        aggregation='mean')
        return inputs


inputs = keras.Input(shape=(784,), name='mnist_input')
h1 = layers.Dense(64, activation='relu')(inputs)
h2 = layers.Dense(64, activation='relu')(h1)
outputs = layers.Dense(10, activation='softmax')(h2)
model = keras.Model(inputs, outputs)

# Track the std of the first hidden activation. The original measured the raw
# `inputs`, which did not match the metric name 'std_of_activation'.
model.add_metric(keras.backend.std(h1),
                 name='std_of_activation',
                 aggregation='mean')
# Activity regularization on the same hidden activation.
model.add_loss(tf.reduce_sum(h1) * 0.1)

# Optional: keras.utils.plot_model(model, 'net001.png', show_shapes=True)
model.compile(optimizer=keras.optimizers.RMSprop(),
              loss=keras.losses.SparseCategoricalCrossentropy(),
              metrics=[keras.metrics.SparseCategoricalAccuracy()])
model.fit(x_train, y_train, batch_size=32, epochs=1)
除了使用validation_data传入验证数据,还可以使用validation_split从训练数据中划分出一部分作为验证数据
ps:validation_split只能在用numpy数据训练的情况下使用
# Hold out 20% of the training arrays for validation via validation_split.
model.fit(
    x_train,
    y_train,
    batch_size=32,
    epochs=1,
    validation_split=0.2,
)
def get_compiled_model():
    """Build and compile a fresh two-hidden-layer MNIST classifier.

    Returns:
        A compiled `keras.Model` using RMSprop, sparse categorical
        cross-entropy and sparse categorical accuracy.
    """
    digits = keras.Input(shape=(784,), name='mnist_input')
    hidden = layers.Dense(64, activation='relu')(digits)
    hidden = layers.Dense(64, activation='relu')(hidden)
    probs = layers.Dense(10, activation='softmax')(hidden)
    net = keras.Model(digits, probs)
    net.compile(
        optimizer=keras.optimizers.RMSprop(),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return net


model = get_compiled_model()

# Wrap the numpy arrays in tf.data pipelines; only the training set is shuffled.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(buffer_size=1024)
    .batch(64)
)
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(64)

# Simplest form: model.fit(train_dataset, epochs=3)
# steps_per_epoch: number of training batches to run in each epoch.
# validation_steps: number of validation batches to run at each validation.
model.fit(
    train_dataset,
    epochs=3,
    steps_per_epoch=100,
    validation_data=val_dataset,
    validation_steps=3,
)
“样本权重”数组是一个数字数组,用于指定批处理中每个样本在计算总损失时应具有多少权重。 它通常用于不平衡的分类问题(这个想法是为了给予很少见的类更多的权重)。 当使用的权重是1和0时,该数组可以用作损失函数的掩码(完全丢弃某些样本对总损失的贡献)。
“类权重”dict是同一概念的更具体的实例:它将类索引映射到应该用于属于该类的样本的样本权重。 例如,如果类“0”比数据中的类“1”少两倍,则可以使用class_weight = {0:1.,1:0.5}。
# Give class 5 a larger weight, first via class weights, then sample weights.
import numpy as np

# Class weights: map each class index to the weight of its samples.
model = get_compiled_model()
class_weight = dict.fromkeys(range(10), 1.0)
class_weight[5] = 2.0
print(class_weight)
model.fit(
    x_train,
    y_train,
    class_weight=class_weight,
    batch_size=64,
    epochs=4,
)

# Sample weights: one weight per training sample (2.0 where the label is 5).
model = get_compiled_model()
sample_weight = np.where(y_train == 5, 2.0, 1.0)
model.fit(
    x_train,
    y_train,
    sample_weight=sample_weight,
    batch_size=64,
    epochs=4,
)
# Sample weights with tf.data: pass them as the third element of each tuple.
model = get_compiled_model()

# Weight 2.0 for samples labelled 5, weight 1.0 for everything else.
sample_weight = np.where(y_train == 5, 2.0, 1.0)

train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train, sample_weight))
    .shuffle(buffer_size=1024)
    .batch(64)
)
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(64)

model.fit(train_dataset, epochs=3)
# Two-input / two-output model: an image branch and a time-series branch are
# pooled, concatenated and fed into a score head and a 5-class head.
image_input = keras.Input(shape=(32, 32, 3), name='img_input')
timeseries_input = keras.Input(shape=(None, 10), name='ts_input')

# Image branch: 2D convolution followed by global max pooling.
x1 = layers.GlobalMaxPooling2D()(layers.Conv2D(3, 3)(image_input))

# Time-series branch: 1D convolution followed by global max pooling.
x2 = layers.GlobalMaxPooling1D()(layers.Conv1D(3, 3)(timeseries_input))

x = layers.concatenate([x1, x2])

score_output = layers.Dense(1, name='score_output')(x)
class_output = layers.Dense(5, activation='softmax', name='class_output')(x)

model = keras.Model(
    inputs=[image_input, timeseries_input],
    outputs=[score_output, class_output],
)
keras.utils.plot_model(
    model,
    'multi_input_output_model.png',
    show_shapes=True,
)
# Each output can be given its own loss (list form, in output order).
model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=[keras.losses.MeanSquaredError(),
          keras.losses.CategoricalCrossentropy()])

# Dict form keyed by output-layer name, with per-output metrics and loss
# weights. The keyword is `loss_weights` (plural); the original `loss_weight`
# is not a `compile` argument.
model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss={'score_output': keras.losses.MeanSquaredError(),
          'class_output': keras.losses.CategoricalCrossentropy()},
    metrics={'score_output': [keras.metrics.MeanAbsolutePercentageError(),
                              keras.metrics.MeanAbsoluteError()],
             'class_output': [keras.metrics.CategoricalAccuracy()]},
    loss_weights={'score_output': 2., 'class_output': 1.})

# An output that should not contribute to training gets no loss (None).
model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss=[None, keras.losses.CategoricalCrossentropy()])

# Or the dict version: list only the outputs that have a loss.
model.compile(
    optimizer=keras.optimizers.RMSprop(1e-3),
    loss={'class_output': keras.losses.CategoricalCrossentropy()})
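Keras中的回调是在训练期间(在epoch开始时,batch结束时,epoch结束时等)在不同点调用的对象,可用于实现以下行为:
- 在训练期间的不同时间点进行验证(超出内置的每个epoch一次的验证);
- 定期或在模型超过某个精度阈值时保存检查点;
- 在训练似乎停滞时调整学习率;
- 在训练结束或超过某个性能阈值时发送通知。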
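可使用的内置回调有:ModelCheckpoint(定期保存模型)、EarlyStopping(验证指标不再改善时停止训练)、TensorBoard(写入可在TensorBoard中可视化的日志)、CSVLogger(将损失和指标写入CSV文件)等。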
# Early stopping: halt training once the validation loss stops improving.
model = get_compiled_model()

early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',  # quantity watched for improvement
    min_delta=1e-2,      # smaller decreases do not count as improvement
    patience=2,          # epochs without improvement before stopping
    verbose=1,
)
callbacks = [early_stopping]

model.fit(
    x_train,
    y_train,
    epochs=20,
    batch_size=64,
    callbacks=callbacks,
    validation_split=0.2,
)
# Checkpoint callback: save the model whenever `val_loss` improves.
model = get_compiled_model()

check_callback = keras.callbacks.ModelCheckpoint(
    filepath='mymodel_{epoch}.h5',  # the epoch number is filled into the name
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)

model.fit(
    x_train,
    y_train,
    epochs=3,
    batch_size=64,
    callbacks=[check_callback],
    validation_split=0.2,
)
# Adjust the learning rate dynamically with an exponential-decay schedule.
initial_learning_rate = 0.1

lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=10000,
    decay_rate=0.96,
    staircase=True,
)

# The schedule object is passed where a constant learning rate would go.
optimizer = keras.optimizers.RMSprop(learning_rate=lr_schedule)
# Log training to TensorBoard through the built-in callback.
tensorboard_cbk = keras.callbacks.TensorBoard(
    log_dir='./full_path_to_your_logs',
)

model.fit(
    x_train,
    y_train,
    epochs=5,
    batch_size=64,
    callbacks=[tensorboard_cbk],
    validation_split=0.2,
)
class LossHistory(keras.callbacks.Callback):
    """Custom callback that records and prints the loss after every epoch.

    Attributes are documented inline; `losses` is rebuilt at the start of
    each training run.
    """

    def on_train_begin(self, logs=None):
        # Start every training run with an empty history.
        self.losses = []

    def on_epoch_end(self, epoch, logs=None):
        # The first argument is the epoch index (the original misnamed it
        # `batch`). `logs` may be None, so fall back to an empty dict.
        logs = logs or {}
        self.losses.append(logs.get('loss'))
        print('\nloss:', self.losses[-1])


model = get_compiled_model()
callbacks = [
    LossHistory()
]
model.fit(x_train, y_train,
          epochs=3,
          batch_size=64,
          callbacks=callbacks,
          validation_split=0.2)
# Get the model.
inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy()

# Prepare the training dataset.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Hand-written training loop.
for epoch in range(3):
    print('epoch: ', epoch)
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        # Open a gradient tape so the forward pass is recorded.
        with tf.GradientTape() as tape:
            # Run the forward pass explicitly in training mode. The model ends
            # in a softmax, so `logits` actually holds class probabilities.
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if step % 200 == 0:
            print('Training loss (for one batch) at step %s: %s'
                  % (step, float(loss_value)))
            # Use `batch_size` so the counter follows the dataset batching.
            print('Seen so far: %s samples' % ((step + 1) * batch_size))
# Train and validate with a hand-written loop.
# Get model
inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy()

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

# Prepare the training dataset.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset (same batch size as training).
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)

# Iterate over epochs.
for epoch in range(3):
    print('Start of epoch %d' % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            # Forward pass explicitly in training mode.
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print('Training loss (for one batch) at step %s: %s'
                  % (step, float(loss_value)))
            # Use `batch_size` so the counter follows the dataset batching.
            print('Seen so far: %s samples' % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print('Training acc over epoch: %s' % (float(train_acc),))
    # Reset training metrics at the end of each epoch.
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        # Forward pass explicitly in inference mode.
        val_logits = model(x_batch_val, training=False)
        # Update val metrics.
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print('Validation acc: %s' % (float(val_acc),))
# Custom losses added by layers: `model.losses` is printed after one forward
# pass and again after several, to show it only reflects the latest pass.
class ActivityRegularizationLayer(layers.Layer):
    """Pass-through layer that registers a penalty on its input activations."""

    def call(self, inputs):
        penalty = 1e-2 * tf.reduce_sum(inputs)
        self.add_loss(penalty)
        return inputs


inputs = keras.Input(shape=(784,), name='digits')
x = layers.Dense(64, activation='relu', name='dense_1')(inputs)
# Insert activity regularization as a layer.
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation='relu', name='dense_2')(x)
outputs = layers.Dense(10, activation='softmax', name='predictions')(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# One forward pass, then inspect the collected losses.
logits = model(x_train[:64])
print(model.losses)

# Three more forward passes over consecutive 64-sample slices.
for start in (0, 64, 128):
    logits = model(x_train[start:start + 64])
print(model.losses)
# Include the layer-added losses in the value that is differentiated.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)

for epoch in range(3):
    print('Start of epoch %d' % (epoch,))

    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            # Forward pass explicitly in training mode.
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)

            # Add extra losses created during this forward pass.
            loss_value += sum(model.losses)

        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        # Log every 200 batches.
        if step % 200 == 0:
            print('Training loss (for one batch) at step %s: %s'
                  % (step, float(loss_value)))
            # `batch_size` is the module-level value used to batch
            # `train_dataset` earlier in this file.
            print('Seen so far: %s samples' % ((step + 1) * batch_size))