from tensorflow.keras import losses
import tensorflow.keras.backend as K
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model
from tensorflow.keras import optimizers
from tensorflow.keras.layers import Input, Conv1D, Reshape, Lambda, Flatten
from tensorflow.keras.layers import Layer, Dense, LSTM, Activation, Multiply, Add
class CalculateScoreMatrix(Layer):
    """Trainable linear projection used to form the attention score matrix.

    Multiplies the input by a learned ``(input_dim, output_dim)`` kernel so
    the permuted LSTM hidden states can later be scored (via ``batch_dot``)
    against the final hidden state.
    """

    def __init__(self, output_dim=None, **kwargs):
        # output_dim: number of columns of the learned projection kernel.
        self.output_dim = output_dim
        super(CalculateScoreMatrix, self).__init__(**kwargs)

    def get_config(self):
        # BUG FIX: the original assigned to a misspelled name ("cOnfig"),
        # so this method raised NameError whenever the layer was serialized
        # (e.g. model.save / get_config round-trips).
        config = super().get_config().copy()
        config.update({'output_dim': self.output_dim})
        return config

    def build(self, input_shape):
        # Single weight matrix mapping the last input axis to output_dim.
        self.kernel = self.add_weight(name='kernel',
                                      shape=(input_shape[-1], self.output_dim),
                                      initializer='uniform',
                                      trainable=True)
        super(CalculateScoreMatrix, self).build(input_shape)

    def call(self, x):
        # Plain matrix product: (..., input_dim) @ (input_dim, output_dim).
        res = K.dot(x, self.kernel)
        return res
class TPALSTM:
    """Temporal Pattern Attention LSTM (TPA-LSTM) model builder.

    Attributes
    ----------
    epochs      : maximum number of training iterations
    feat_dim    : number of input variables (features)
    input_dim   : number of time steps
    output_dim  : prediction dimension
    lr          : learning rate
    batch_size  : mini-batch size
    units       : number of LSTM hidden units
    model_name  : model identifier
    filter_size : convolution kernel size
    loss_fn     : training loss function
    _estimators : dict caching trained models
    """
    def __init__(self, hidden_unit=32, filter_size=3, batch_size=32, epochs=50, learning_rate=1.2e-3):
        self.epochs = epochs
        self.feat_dim = None
        self.input_dim = None
        self.output_dim = None
        self.lr = learning_rate
        self.filters = hidden_unit
        self.batch_size = batch_size
        self.units = hidden_unit * 2
        self.model_name = "TPA-LSTM"
        self.filter_size = filter_size
        self.loss_fn = losses.mean_squared_error
        self._estimators = {}

    def build_model(self):
        """Assemble and compile the TPA-LSTM Keras functional model."""
        series_in = Input(shape=(self.input_dim, self.feat_dim))

        # Causal 1-D convolution over the raw input series.
        conv_out = Conv1D(filters=self.filters, kernel_size=self.filter_size,
                          strides=1, padding="causal")(series_in)

        # Recurrent encoding of the convolved features (full sequence kept).
        lstm_out = LSTM(units=self.units,
                        kernel_initializer="glorot_uniform",
                        bias_initializer="zeros",
                        kernel_regularizer=regularizers.l2(0.001),
                        return_sequences=True)(conv_out)

        # Split hidden states: steps 1..t-1 vs. the final step t.
        history = Lambda(lambda t: t[:, :-1, :])(lstm_out)
        last_state = Lambda(lambda t: t[:, -1, :])(lstm_out)
        last_state = Reshape((self.units, 1))(last_state)

        # Transpose history so each row tracks one hidden unit over time.
        history_t = Lambda(lambda t: K.permute_dimensions(t, [0, 2, 1]))(history)

        # Attention scores: project the history, dot with the last state,
        # then squash through a sigmoid.
        scores = CalculateScoreMatrix(self.units)(history_t)
        scores = Lambda(lambda t: K.batch_dot(t[0], t[1]))([scores, last_state])
        scores = Activation("sigmoid")(scores)

        # Attention-weighted context vector (sum over the time axis).
        weighted = Multiply()([history_t, scores])
        context = Lambda(lambda t: K.sum(t, axis=-1))(weighted)

        # Combine linear projections of context and last hidden state.
        proj_context = Dense(units=self.filters * 4, activation=None)(context)
        proj_last = Dense(units=self.filters * 4, activation=None)(Flatten()(last_state))
        combined = Add()([proj_last, proj_context])

        # Prediction head.
        prediction = Dense(self.output_dim, activation="sigmoid")(combined)

        # Compile with Nadam at the configured learning rate.
        net = Model(inputs=series_in, outputs=prediction)
        net.compile(loss=self.loss_fn, optimizer=optimizers.Nadam(learning_rate=self.lr))
        return net