实现 BahdanauAttention,其中 score 的实现方法为 perceptron 形式
class BahdanauAttention(tf.keras.Model):
    """Additive (Bahdanau) attention with a perceptron-style score.

    The score of every encoder position is
    ``V(tanh(W1(features) + W2(hidden)))``; the scores are normalised with a
    softmax along axis 1 and used to weight ``features``.
    """

    def __init__(self, units):
        """Create the three dense projections.

        Args:
            units: width of the ``W1`` / ``W2`` projections.
        """
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Compute the context vector and the attention weights.

        Args:
            features: source encoding matrix produced by the encoder.
            hidden: hidden state of the previous step (i - 1).

        Returns:
            ``(context_vector, attention_weights)``. ``context_vector`` is
            ``features`` weighted by the attention and summed over axis 1;
            ``attention_weights`` keeps the size-1 last axis produced by
            ``self.V``.
        """
        # Insert an axis at position 1 so the state can be added to the
        # projected features.
        query = tf.expand_dims(hidden, 1)
        energy = tf.nn.tanh(self.W1(features) + self.W2(query))

        # self.V is Dense(1), so the last axis of the score has size 1.
        score = self.V(energy)
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weight the features and collapse axis 1 into a single context vector.
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)
        return context_vector, attention_weights
定义GRU单元
def gru(units):
    """Build a GRU layer that returns both the full sequence and the final state.

    Uses ``CuDNNGRU`` when ``tf.test.is_gpu_available()`` reports a GPU and the
    plain ``GRU`` layer (with a sigmoid recurrent activation) otherwise.

    Args:
        units: number of hidden units of the layer.

    Returns:
        The constructed Keras recurrent layer.
    """
    shared_kwargs = dict(
        return_sequences=True,
        return_state=True,
        recurrent_initializer='glorot_uniform',
    )
    if not tf.test.is_gpu_available():
        return tf.keras.layers.GRU(
            units, recurrent_activation='sigmoid', **shared_kwargs)
    return tf.keras.layers.CuDNNGRU(units, **shared_kwargs)
使用CRNN feature 提取层 和 单层GRU生成编码器Encoder
class Encoder(tf.keras.Model):
    """CRNN-style encoder: a convolutional feature extractor followed by one GRU.

    enc_units: number of hidden units of the encoder GRU
    batch_sz: batch size
    """

    def __init__(self, enc_units, batch_sz):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units

        def conv3x3(filters):
            # 3x3 "same" convolution followed by ReLU.
            return tf.keras.layers.Conv2D(
                filters, [3, 3], padding="same", activation='relu')

        def pool(window):
            # Max pooling whose stride equals its window.
            return tf.keras.layers.MaxPool2D(pool_size=window, strides=window)

        feature_layers = [
            conv3x3(64),
            pool([2, 2]),
            conv3x3(128),
            pool([2, 2]),
            conv3x3(256),
            conv3x3(256),
            # From here on only the first spatial axis is reduced.
            pool([2, 1]),
            conv3x3(512),
            tf.keras.layers.BatchNormalization(),
            conv3x3(512),
            tf.keras.layers.BatchNormalization(),
            pool([2, 1]),
            tf.keras.layers.Conv2D(
                512, [2, 2], strides=[2, 1], padding="same", activation='relu'),
            # Fixed target shape: the feature map must hold exactly 25 * 512
            # values per sample, so the input image size is effectively fixed.
            tf.keras.layers.Reshape((25, 512)),
        ]
        self.cnn = tf.keras.Sequential(feature_layers)
        self.gru = gru(self.enc_units)

    def call(self, x):
        """Run the CNN, then the GRU; return ``(output, state)`` of the GRU."""
        features = self.cnn(x)
        output, state = self.gru(features)
        return output, state

    def initialize_hidden_state(self):
        """Return an all-zero tensor of shape ``(batch_sz, enc_units)``."""
        state_shape = (self.batch_sz, self.enc_units)
        return tf.zeros(state_shape)
定义 attention 机制和 GRU 单元的解码器 Decoder
class Decoder(tf.keras.Model):
    """Decoder built from an embedding, Bahdanau attention, one GRU and a
    dense projection onto the vocabulary."""

    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        """Create the decoder layers.

        Args:
            vocab_size: size of the embedding table and of the output layer.
            embedding_dim: width of the token embedding.
            dec_units: hidden units of the GRU and of the attention projections.
            batch_sz: batch size (stored on the instance only).
        """
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = gru(self.dec_units)
        self.fc = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, hidden, enc_output):
        """Run one decoding step.

        Args:
            x: input token ids for this step.
            hidden: previous decoder state, used as the attention query.
            enc_output: encoder output sequence that is attended over.

        Returns:
            ``(logits, state, attention_weights)``.
        """
        context_vector, attention_weights = self.attention(enc_output, hidden)

        # Shape after embedding == (batch_size, 1, embedding_dim)
        embedded = self.embedding(x)

        # Shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        context_step = tf.expand_dims(context_vector, 1)
        gru_input = tf.concat([context_step, embedded], axis=-1)

        # The GRU is called without an initial state; `hidden` only feeds the
        # attention above.
        output, state = self.gru(gru_input)

        # Shape == (batch_size * 1, hidden_size)
        flat_output = tf.reshape(output, (-1, output.shape[2]))

        # Shape == (batch_size * 1, vocab)
        logits = self.fc(flat_output)
        return logits, state, attention_weights
数据集采用 mjsynth.tar.gz。这个数据集有些问题:某些样本大小写未分开标注,某些样本颜色梯度不够。可以先训练一个模型,用它对数据集做筛选,然后再 fine-tune。
定义字典
'] = 0self.word2idx['# 将每个词汇映射为一个数字
class LanguageIndex():def __init__(self):self.word2idx = {}self.idx2word = {}self.vocab = cfg.CHAR_VECTORself.create_index()def create_index(self):self.word2idx['
处理 label 为
root = "../mnt/ramdisk/max/90kDICT32px"def create_dataset_from_file(root, file_path):with open(file_path, "r") as f:readlines = f.readlines()img_paths = []for img_name in tqdm(readlines, desc="read dir:"):img_name = img_name.rstrip().strip()img_path = root + "/" + img_nameif osp.exists(img_path):img_paths.append(img_path)img_paths = img_paths[:1000000]labels = [img_path.split("/")[-1].split("_")[-2] for img_path in tqdm(img_paths, desc="generator label:")]return img_paths, labelsdef preprocess_label(label):label = label.rstrip().strip()w = '
构建数据 dataset
def process_img(img_path):
    """Load an image file and convert it into a network input array.

    The image is read as grayscale, fitted to width 100 / height 32 with
    ``resize_image``, given a trailing channel axis and cast to float32.

    Args:
        img_path: path of the image file.

    Returns:
        A float32 array with a trailing axis of size 1.

    Raises:
        ValueError: if the file cannot be read as an image.
    """
    imread = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if imread is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise ValueError("could not read image: {}".format(img_path))
    imread = resize_image(imread, 100, 32)
    imread = np.expand_dims(imread, axis=-1)
    return np.array(imread, np.float32)


def resize_image(image, out_width, out_height):
    """Resize an image to the "good" input size.

    The image is scaled to ``out_height`` keeping its aspect ratio. If the
    scaled width reaches ``out_width`` the original image is squashed to
    exactly ``(out_width, out_height)``; otherwise the scaled image is placed
    at the left of a canvas filled with 255 (the canvas is 2-D, so this branch
    expects a single-channel image).

    Args:
        image: image array whose first two axes are height and width.
        out_width: target width in pixels.
        out_height: target height in pixels.

    Returns:
        An array of height ``out_height`` and width ``out_width``.

    Raises:
        ValueError: if ``image`` is None.
    """
    if image is None:
        raise ValueError("resize_image received None instead of an image")

    h, w = np.shape(image)[:2]
    ratio = out_height / h
    # Clamp to one pixel: very narrow images would otherwise ask cv2.resize
    # for a zero-width output, which it rejects.
    scaled_width = max(1, int(w * ratio))

    if scaled_width >= out_width:
        # Too wide to pad: squash straight to the target size. Deciding this
        # before resizing avoids an intermediate resize that would be thrown
        # away.
        return cv2.resize(image, (out_width, out_height))

    scaled = cv2.resize(image, (scaled_width, out_height))
    final_arr = np.full((out_height, out_width), 255, dtype=np.uint8)
    final_arr[:, :scaled_width] = scaled
    return final_arr


img_paths_tensor, labels_tensor, labels, label_lang, label_max_len = load_dataset(root)
BATCH_SIZE = cfg.TRAIN_BATCH_SIZE
N_BATCH = len(img_paths_tensor) // BATCH_SIZE
embedding_dim = cfg.EMBEDDING_DIM
units = cfg.UNITS
vocab_size = len(label_lang.word2idx)


def map_func(img_path_tensor, label_tensor, label):
    """Load the image for one dataset element; labels pass through unchanged.

    Called through ``tf.py_func``, so ``img_path_tensor`` arrives as bytes and
    is decoded as UTF-8 before use. Image loading is delegated to
    ``process_img`` so that this pipeline and any other caller of
    ``process_img`` share one preprocessing implementation.

    Args:
        img_path_tensor: image path as UTF-8 encoded bytes.
        label_tensor: encoded label, returned as is.
        label: raw label, returned as is.

    Returns:
        ``(image, label_tensor, label)`` where ``image`` is the float32 array
        produced by ``process_img``.
    """
    image = process_img(img_path_tensor.decode('utf-8'))
    return image, label_tensor, label


dataset = tf.data.Dataset.from_tensor_slices(
    (img_paths_tensor, labels_tensor, labels))
# Images are loaded in Python (OpenCV) via py_func, 8 elements in parallel.
dataset = dataset.map(
    lambda item1, item2, item3: tf.py_func(
        map_func, [item1, item2, item3], [tf.float32, tf.int32, tf.string]),
    num_parallel_calls=8)
dataset = dataset.shuffle(10000, reshuffle_each_iteration=True)
# drop_remainder keeps every batch at exactly BATCH_SIZE elements.
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
定义Encoder、Decoder和Optimizer ,loss函数
encoder = Encoder(units, BATCH_SIZE)
decoder = Decoder(vocab_size, embedding_dim, units, BATCH_SIZE)
optimizer = tf.train.AdamOptimizer(learning_rate=0.0001)


def loss_function(real, pred):
    """Masked sparse softmax cross-entropy.

    Positions whose label id is 0 contribute zero loss (presumably id 0 is the
    padding entry of the vocabulary; confirm against the dictionary).
    The mean is taken over all positions, masked ones included.

    Args:
        real: integer label ids.
        pred: unnormalised logits for the same positions.

    Returns:
        Scalar tensor with the mean masked loss.
    """
    loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=real, logits=pred)
    # Build the mask with TF ops instead of NumPy: np.equal on a tensor only
    # works under eager execution and copies the labels to the host.
    mask = tf.cast(tf.not_equal(real, 0), loss_.dtype)
    return tf.reduce_mean(loss_ * mask)
开启训练
# Directory used for model checkpoints.
checkpoint_dir = './checkpoints'
# Path prefix "<checkpoint_dir>/ckpt" for checkpoint files.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer, encoder=encoder, decoder=decoder)EPOCHS = 100for epoch in range(EPOCHS):start = time.time()total_loss = 0for (batch, (inp, targ, ground_truths)) in enumerate(dataset):loss = 0results = np.zeros((BATCH_SIZE, targ.shape[1] - 1), np.int32)with tf.GradientTape() as tape:enc_output, enc_hidden = encoder(inp)dec_hidden = enc_hiddendec_input = tf.expand_dims([label_lang.word2idx['
测试代码
import os
from config import cfg
from lang_dict.lang import LanguageIndex
from net.net import *
from utils.img_utils import *

# Restrict CUDA to the device with index 1.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

# Vocabulary used to size the decoder.
label_lang = LanguageIndex()
vocab_size = len(label_lang.word2idx)

# Images are processed one at a time in this script.
BATCH_SIZE = 1
embedding_dim = cfg.EMBEDDING_DIM
units = cfg.UNITS

encoder = Encoder(units, BATCH_SIZE)
decoder = Decoder(vocab_size, embedding_dim, units, BATCH_SIZE)

# Directory used for model checkpoints and the "<checkpoint_dir>/ckpt" prefix.
checkpoint_dir = './checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(encoder=encoder, decoder=decoder)checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))def evaluate(encoder, decoder, img_path, label_lang):img = process_img(img_path)enc_output, enc_hidden = encoder(np.expand_dims(img, axis=0))dec_hidden = enc_hiddendec_input = tf.expand_dims([label_lang.word2idx['
添加 attention 后,CRNN 收敛非常迅速,基本一个 epoch 就能收敛。
全部代码