-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
985dcec
commit d192464
Showing
12 changed files
with
462 additions
and
376 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import tensorflow as tf | ||
import os | ||
from tensorflow.python.keras.layers import Layer | ||
from tensorflow.python.keras import backend as K | ||
|
||
|
||
class AttentionLayer(Layer):
    """Bahdanau (additive) attention (https://arxiv.org/pdf/1409.0473.pdf).

    Three trainable weight matrices are introduced:

    * ``W_a`` -- applied to the encoder output sequence,
    * ``U_a`` -- applied to a single decoder output step,
    * ``V_a`` -- projects ``tanh(S.W_a + h_j.U_a)`` down to one energy value
      per encoder time step.

    The layer is called on ``[encoder_output_sequence, decoder_output_sequence]``
    (both indexed as rank-3 tensors below) and returns a pair
    ``(context_vectors, attention_weights)``.
    """

    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create W_a, U_a and V_a from the encoder/decoder input shapes.

        ``input_shape[0]`` is the encoder sequence shape and ``input_shape[1]``
        the decoder sequence shape; index 2 of each is used as the hidden size.
        """
        assert isinstance(input_shape, list)
        en_hidden = input_shape[0][2]
        de_hidden = input_shape[1][2]

        # Create the trainable weight variables for this layer.
        self.W_a = self.add_weight(name='W_a',
                                   shape=tf.TensorShape((en_hidden, en_hidden)),
                                   initializer='uniform',
                                   trainable=True)
        self.U_a = self.add_weight(name='U_a',
                                   shape=tf.TensorShape((de_hidden, en_hidden)),
                                   initializer='uniform',
                                   trainable=True)
        self.V_a = self.add_weight(name='V_a',
                                   shape=tf.TensorShape((en_hidden, 1)),
                                   initializer='uniform',
                                   trainable=True)

        super(AttentionLayer, self).build(input_shape)  # Be sure to call this at the end

    def call(self, inputs, verbose=False):
        """Compute context vectors and attention weights.

        inputs: [encoder_output_sequence, decoder_output_sequence]
        verbose: when True, print the shapes of the intermediate tensors.

        Returns ``(c_outputs, e_outputs)``: the context vectors and the
        softmax-normalised energies produced for every decoder step.
        """
        # isinstance (rather than an exact type comparison) so that tuples and
        # list subclasses are accepted, matching the checks used elsewhere
        # in this class.
        assert isinstance(inputs, (list, tuple))
        encoder_out_seq, decoder_out_seq = inputs
        if verbose:
            print('encoder_out_seq>', encoder_out_seq.shape)
            print('decoder_out_seq>', decoder_out_seq.shape)

        def energy_step(inputs, states):
            """ Step function for computing energy for a single decoder state """

            assert_msg = "States must be a list. However states {} is of type {}".format(states, type(states))
            assert isinstance(states, (list, tuple)), assert_msg

            # Some parameters required for shaping tensors
            en_seq_len, en_hidden = encoder_out_seq.shape[1], encoder_out_seq.shape[2]

            # Computing S.Wa where S=[s0, s1, ..., si]
            # <= batch_size*en_seq_len, latent_dim
            reshaped_enc_outputs = K.reshape(encoder_out_seq, (-1, en_hidden))
            # <= batch_size, en_seq_len, latent_dim
            W_a_dot_s = K.reshape(K.dot(reshaped_enc_outputs, self.W_a), (-1, en_seq_len, en_hidden))
            if verbose:
                print('wa.s>', W_a_dot_s.shape)

            # Computing hj.Ua
            U_a_dot_h = K.expand_dims(K.dot(inputs, self.U_a), 1)  # <= batch_size, 1, latent_dim
            if verbose:
                print('Ua.h>', U_a_dot_h.shape)

            # tanh(S.Wa + hj.Ua); U_a_dot_h broadcasts over the en_seq_len axis
            # <= batch_size*en_seq_len, latent_dim
            reshaped_Ws_plus_Uh = K.tanh(K.reshape(W_a_dot_s + U_a_dot_h, (-1, en_hidden)))
            if verbose:
                print('Ws+Uh>', reshaped_Ws_plus_Uh.shape)

            # softmax(va.tanh(S.Wa + hj.Ua))
            # <= batch_size, en_seq_len
            e_i = K.reshape(K.dot(reshaped_Ws_plus_Uh, self.V_a), (-1, en_seq_len))
            # <= batch_size, en_seq_len
            e_i = K.softmax(e_i)

            if verbose:
                print('ei>', e_i.shape)

            return e_i, [e_i]

        def context_step(inputs, states):
            """ Step function for computing ci using ei """
            # Weighted sum of encoder outputs over the encoder time axis.
            # <= batch_size, hidden_size
            c_i = K.sum(encoder_out_seq * K.expand_dims(inputs, -1), axis=1)
            if verbose:
                print('ci>', c_i.shape)
            return c_i, [c_i]

        def create_initial_state(inputs, hidden_size):
            # We are not using initial states, but need to pass something to the K.rnn function
            fake_state = K.zeros_like(inputs)  # <= (batch_size, enc_seq_len, latent_dim)
            fake_state = K.sum(fake_state, axis=[1, 2])  # <= (batch_size)
            fake_state = K.expand_dims(fake_state)  # <= (batch_size, 1)
            fake_state = K.tile(fake_state, [1, hidden_size])  # <= (batch_size, hidden_size)
            return fake_state

        fake_state_c = create_initial_state(encoder_out_seq, encoder_out_seq.shape[-1])  # <= (batch_size, latent_dim)
        fake_state_e = create_initial_state(encoder_out_seq, encoder_out_seq.shape[1])  # <= (batch_size, enc_seq_len)

        # Computing energy outputs
        # e_outputs => (batch_size, de_seq_len, en_seq_len)
        last_out, e_outputs, _ = K.rnn(
            energy_step, decoder_out_seq, [fake_state_e],
        )

        # Computing context vectors
        last_out, c_outputs, _ = K.rnn(
            context_step, e_outputs, [fake_state_c],
        )

        return c_outputs, e_outputs

    def compute_output_shape(self, input_shape):
        """ Outputs produced by the layer """
        # Context vectors are weighted sums of *encoder* outputs, so their last
        # dimension is the encoder hidden size (input_shape[0][2]); this equals
        # the decoder hidden size only when both sizes are the same.
        return [
            tf.TensorShape((input_shape[1][0], input_shape[1][1], input_shape[0][2])),
            tf.TensorShape((input_shape[1][0], input_shape[1][1], input_shape[0][1]))
        ]
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,37 +1,46 @@ | ||
#building the model keras---an attempt to build a seq2seq model in keras | ||
from data_clean import * | ||
|
||
from keras import backend as K | ||
K.clear_session() | ||
|
||
latent_dim = 300 | ||
embedding_dim=100 | ||
|
||
def define_model(max_text_length,max_summary_length,n_units): | ||
dim_rep=300 | ||
# Encoder | ||
encoder_inputs = Input(shape=(max_text_len,)) | ||
|
||
#embedding layer | ||
enc_emb = Embedding(x_voc, embedding_dim,trainable=True)(encoder_inputs) | ||
|
||
encoder_inputs=Input(shape=(None,max_text_length,dim_rep)) | ||
#encoder lstm 1 | ||
encoder_lstm1 = LSTM(latent_dim,return_sequences=True,return_state=True,dropout=0.4,recurrent_dropout=0.4) | ||
encoder_output1, state_h1, state_c1 = encoder_lstm1(enc_emb) | ||
|
||
encoder=LSTM(n_units,return_state=True) | ||
encoder_outputs, state_h, state_c = encoder(encoder_inputs) | ||
encoder_states = [state_h, state_c] | ||
#encoder lstm 2 | ||
encoder_lstm2 = LSTM(latent_dim,return_sequences=True,return_state=True,dropout=0.4,recurrent_dropout=0.4) | ||
encoder_output2, state_h2, state_c2 = encoder_lstm2(encoder_output1) | ||
|
||
#define training decoder | ||
decoder_inputs = Input(shape=(None, max_summary_length,dim_rep)) | ||
#encoder lstm 3 | ||
encoder_lstm3=LSTM(latent_dim, return_state=True, return_sequences=True,dropout=0.4,recurrent_dropout=0.4) | ||
encoder_outputs, state_h, state_c= encoder_lstm3(encoder_output2) | ||
|
||
decoder_lstm = LSTM(n_units, return_sequences=True, return_state=True) | ||
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states) | ||
decoder_dense = Dense(n_output, activation='softmax') | ||
decoder_outputs = decoder_dense(decoder_outputs) | ||
model = Model([encoder_inputs, decoder_inputs], decoder_outputs) | ||
# Set up the decoder, using `encoder_states` as initial state. | ||
decoder_inputs = Input(shape=(None,)) | ||
|
||
#define inference encoder | ||
encoder_model = Model(encoder_inputs, encoder_states) | ||
#embedding layer | ||
dec_emb_layer = Embedding(y_voc, embedding_dim,trainable=True) | ||
dec_emb = dec_emb_layer(decoder_inputs) | ||
|
||
#define inference decoder | ||
decoder_state_input_h = Input(shape=(n_units,)) | ||
decoder_state_input_c = Input(shape=(n_units,)) | ||
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c] | ||
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs) | ||
decoder_states = [state_h, state_c] | ||
decoder_outputs = decoder_dense(decoder_outputs) | ||
decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states) | ||
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True,dropout=0.4,recurrent_dropout=0.2) | ||
decoder_outputs,decoder_fwd_state, decoder_back_state = decoder_lstm(dec_emb,initial_state=[state_h, state_c]) | ||
|
||
return model, encoder_model, decoder_model | ||
|
||
|
||
#dense layer | ||
decoder_dense = TimeDistributed(Dense(y_voc, activation='softmax')) | ||
decoder_outputs = decoder_dense(decoder_outputs) | ||
|
||
# Define the model | ||
model = Model([encoder_inputs, decoder_inputs], decoder_outputs) | ||
|
||
model.summary() |
Oops, something went wrong.