DeepFM不算什么新技术了,用TensorFlow实现DeepFM也有开源实现,那我为什么要炒这个冷饭,重复造轮子?
用Google搜索“TensorFlow+DeepFM”,一般都能搜索到“ChenglongChen/tensorflow-DeepFM”和“lambdaJi的TensorFlow Estimator of DeepFM”这二位的实现。二位不仅用TensorFlow实现了DeepFM,还在Criteo数据集上,给出了完整的训练、测试的代码,的确给了我很大的启发,在这里要表示感谢。
但是,同样是由于二位的实现都是根据Criteo简单数据集的,使他们的代码,如果移植到实际的推荐系统中,存在一定困难。比如:
稀疏要求。尽管criteo的原始数据集是排零存储的,但是以上的两个实现,都是用稠密矩阵来表示输入,将0又都补了回来。这种做法,在criteo这种只有39列的简单数据集上是可行的,但是实际系统中,特征数量以千、万计,这种稀疏转稠密的方式是不可取的。
一列多值的要求。Criteo数据集有13列numeric特征+26列categorical特征,所有列都只有一个值。但是,在实际系统中,一个field下往往有多个
这个要求固然可以通过,去除field这个“特征单位”,只针对一个个独立的feature来建模。但是,这样一来,既凭空增加了模型的规模,又破坏模型的“层次化”与“模块化”,使代码不易扩展与维护。
权值共享的要求。Criteo数据集经过脱敏感处理,我们无法知道每列的具体含义,自然也就没有列与列之间共享权重的需求,以上提到的两个实现也就只用一整块稠密矩阵来建模embedding矩阵。
但是,以上面提到的“近xxx天活跃app”+“近xxx天新安装app”+“近xxx天卸载app”这三个field为例,这些 field中的feature都来源于同一个”app字典”。如果不做权重共享,
因此,在实际系统中,“共享权重”是必须的,
正因为在目前我能够找到的基于TensorFlow实现的DeepFM中,没有一个能够满足以上“稀疏”、“多值”、“共享权重”这三个要求的,所以,我自己动手实现了一个,代码见我的github。接下来,我简单讲解一下我的代码。
我依然用criteo数据集来做演示之用。为了演示“一列多值”和“稀疏”,我把criteo中的特征分为两个field,所有数值特征I1~I13归为numeric field,所有类别特征C1~C26归为categorical field。
需要特别指出的是:
对criteo中数值特征与类别特征,都是最常规的预处理,不是这次演示的重点
预处理的代码见criteo_data_preproc.py,处理好的数据文件如下所示,图中的亮块是列分隔符。可以看到,每列是由多个tag_index:value“键值对”组成的,而不同行中“键值对”个数互不同,而value绝没有0,实现排零、稀疏存储。
输入数据
为了配合TensorFlow Estimator,我们需要定义input_fn来读取上图所示的数据。
看似简单的任务,实现起来,却很花费了我一番功夫:
最终,解析一行文本的代码如下。
def _decode_tsv(line):
columns = tf.decode_csv(line, record_defaults=DEFAULT_VALUES, field_delim='\t')
y = columns[0]
feat_columns = dict(zip((t[0] for t in COLUMNS_MAX_TOKENS), columns[1:]))
X = {}
for colname, max_tokens in COLUMNS_MAX_TOKENS:
# 调用string_split时,第一个参数必须是一个list,所以要把columns[colname]放在[]中
# 这时每个kv还是'k:v'这样的字符串
kvpairs = tf.string_split([feat_columns[colname]], ',').values[:max_tokens]
# k,v已经拆开, kvpairs是一个SparseTensor,因为每个kvpair格式相同,都是"k:v"
# 既不会出现"k",也不会出现"k:v1:v2:v3:..."
# 所以,这时的kvpairs实际上是一个满阵
kvpairs = tf.string_split(kvpairs, ':')
# kvpairs是一个[n_valid_pairs,2]矩阵
kvpairs = tf.reshape(kvpairs.values, kvpairs.dense_shape)
feat_ids, feat_vals = tf.split(kvpairs, num_or_size_splits=2, axis=1)
feat_ids = tf.string_to_number(feat_ids, out_type=tf.int32)
feat_vals = tf.string_to_number(feat_vals, out_type=tf.float32)
# 不能调用squeeze, squeeze的限制太多, 当原始矩阵有1行或0行时,squeeze都会报错
X[colname + "_ids"] = tf.reshape(feat_ids, shape=[-1])
X[colname + "_values"] = tf.reshape(feat_vals, shape=[-1])
return X, y
然后,将整个文件转化成TensorFlow Dataset的代码如下所示。每一个field“xxx”在dataset中将由两个SparseTensor表示,“xxx_ids”表示sparse ids,“xxx_values”表示sparse values。
def input_fn(data_file, n_repeat, batch_size, batches_per_shuffle):
# ----------- prepare padding
pad_shapes = {}
pad_values = {}
for c, max_tokens in COLUMNS_MAX_TOKENS:
pad_shapes[c + "_ids"] = tf.TensorShape([max_tokens])
pad_shapes[c + "_values"] = tf.TensorShape([max_tokens])
pad_values[c + "_ids"] = -1 # 0 is still valid token-id, -1 for padding
pad_values[c + "_values"] = 0.0
# no need to pad labels
pad_shapes = (pad_shapes, tf.TensorShape([]))
pad_values = (pad_values, 0)
# ----------- define reading ops
dataset = tf.data.TextLineDataset(data_file).skip(1) # skip the header
dataset = dataset.map(_decode_tsv, num_parallel_calls=4)
if batches_per_shuffle > 0:
dataset = dataset.shuffle(batches_per_shuffle * batch_size)
dataset = dataset.repeat(n_repeat)
dataset = dataset.padded_batch(batch_size=batch_size,
padded_shapes=pad_shapes,
padding_values=pad_values)
iterator = dataset.make_one_shot_iterator()
dense_Xs, ys = iterator.get_next()
# ----------- convert dense to sparse
sparse_Xs = {}
for c, _ in COLUMNS_MAX_TOKENS:
for suffix in ["ids", "values"]:
k = "{}_{}".format(c, suffix)
sparse_Xs[k] = tf_utils.to_sparse_input_and_drop_ignore_values(dense_Xs[k])
# ----------- return
return sparse_Xs, ys
其中也不得不调用padded_batch补齐,这一步也将稀疏格式转化成了稠密格式,不过只是在一个batch(batch_size=128已经算很大了)中临时稠密一下,很快就又通过调用to_sparse_input_and_drop_ignore_values这个函数重新转化成稀疏格式了。to_sparse_input_and_drop_ignore_values实际上是从feature_column.py这个module中的_to_sparse_input_and_drop_ignore_values函数拷贝而来,因为原函数不是public的,无法在featurecolumn.py以外调用,所以我将它的代码拷贝到tf_utils.py中。
重申几个概念。比如我们的特征集中包括active_pkgs(app活跃情况)、install_pkgs(app安装情况)、uninstall_pkgs(app卸载情况)。每列包含的内容是一系列feature和其数值,比如qq:0.1, weixin:0.9, taobao:1.1, ……。这些feature都来源于同一份名为package的字典
建立共享权重的代码如下所示:
class EmbeddingTable:
def __init__(self):
self._weights = {}
def add_weights(self, vocab_name, vocab_size, embed_dim):
""" :param vocab_name: 一个field拥有两个权重矩阵,一个用于线性连接,另一个用于非线性(二阶或更高阶交叉)连接 :param vocab_size: 字典总长度 :param embed_dim: 二阶权重矩阵shape=[vocab_size, order2dim],映射成的embedding 既用于接入DNN的第一屋,也是用于FM二阶交互的隐向量 :return: None """
linear_weight = tf.get_variable(name='{}_linear_weight'.format(vocab_name),
shape=[vocab_size, 1],
initializer=tf.glorot_normal_initializer(),
dtype=tf.float32)
# 二阶(FM)与高阶(DNN)的特征交互,共享embedding矩阵
embed_weight = tf.get_variable(name='{}_embed_weight'.format(vocab_name),
shape=[vocab_size, embed_dim],
initializer=tf.glorot_normal_initializer(),
dtype=tf.float32)
self._weights[vocab_name] = (linear_weight, embed_weight)
def get_linear_weights(self, vocab_name): return self._weights[vocab_name][0]
def get_embed_weights(self, vocab_name): return self._weights[vocab_name][1]
def build_embedding_table(params):
embed_dim = params['embed_dim'] # 必须有统一的embedding长度
embedding_table = EmbeddingTable()
for vocab_name, vocab_size in params['vocab_sizes'].items():
embedding_table.add_weights(vocab_name=vocab_name, vocab_size=vocab_size, embed_dim=embed_dim)
return embedding_table
def output_logits_from_linear(features, embedding_table, params):
field2vocab_mapping = params['field_vocab_mapping']
combiner = params.get('multi_embed_combiner', 'sum')
fields_outputs = []
# 当前field下有一系列的
# 将所有tag对应的bias,按照其value进行加权平均,得到这个field对应的bias
for fieldname, vocabname in field2vocab_mapping.items():
sp_ids = features[fieldname + "_ids"]
sp_values = features[fieldname + "_values"]
linear_weights = embedding_table.get_linear_weights(vocab_name=vocabname)
# weights: [vocab_size,1]
# sp_ids: [batch_size, max_tags_per_example]
# sp_weights: [batch_size, max_tags_per_example]
# output: [batch_size, 1]
output = embedding_ops.safe_embedding_lookup_sparse(linear_weights, sp_ids, sp_values,
combiner=combiner,
name='{}_linear_output'.format(fieldname))
fields_outputs.append(output)
# 因为不同field可以共享同一个vocab的linear weight,所以将各个field的output相加,会损失大量的信息
# 因此,所有field对应的output拼接起来,反正每个field的output都是[batch_size,1],拼接起来,并不占多少空间
# whole_linear_output: [batch_size, total_fields]
whole_linear_output = tf.concat(fields_outputs, axis=1)
tf.logging.info("linear output, shape={}".format(whole_linear_output.shape))
# 再映射到final logits(二分类,也是[batch_size,1])
# 这时,就不要用任何activation了,特别是ReLU
return tf.layers.dense(whole_linear_output, units=1, use_bias=True, activation=None)
二阶交互部分与DeepFM论文中稍有不同,而是使用了《Neural Factorization Machines for Sparse Predictive Analytics》中Bi-Interaction的公式。这也是网上实现的通用做法。
而我的实现与上边公式最大的不同,就是不再只有一个embedding矩阵V,而是每个feature根据自己所在的field,再根据超参指定的field与vocabulary的映射关系,找到自己对应的embedding矩阵。某个field对应的embedding矩阵有可能是与另外一个field共享的。
另外,
本文展示了我写的一套基于TensorFlow的DeepFM的实现。重点阐述了“一列多值”、“稀疏”、“权重共享”在实际推荐系统中的重要性,和我是如何在DeepFM中实现以上需求的。欢迎各位看官指正。