forked from jsyoon0823/TimeGAN
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
161 lines (125 loc) · 4.65 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Time-series Generative Adversarial Networks (TimeGAN) Codebase.
Reference: Jinsung Yoon, Daniel Jarrett, Mihaela van der Schaar,
"Time-series Generative Adversarial Networks,"
Neural Information Processing Systems (NeurIPS), 2019.
Paper link: https://papers.nips.cc/paper/8789-time-series-generative-adversarial-networks
Last updated Date: April 24th 2020
Code author: Jinsung Yoon ([email protected])
-----------------------------
utils.py
(1) train_test_divide: Divide train and test data for both original and synthetic data.
(2) extract_time: Returns maximum sequence length and each sequence length.
(3) rnn_cell: Basic RNN cell.
(4) random_generator: Random vector generator.
(5) batch_generator: Mini-batch generator.
(6) denormalizer: Rescale normalized generated data back to the original value range.
"""
## Necessary Packages
import numpy as np
import tensorflow as tf
import pandas as pd
import copy
import data_loading as dl
def train_test_divide(data_x, data_x_hat, data_t, data_t_hat, train_rate=0.8):
    """Randomly split original and synthetic data into train and test parts.

    The original set and the synthetic set are shuffled independently (one
    random permutation each, original first). Within a set, the samples and
    their time information are split with the same permutation, so each
    sample stays paired with its own time entry.

    Args:
      - data_x: original data
      - data_x_hat: generated data
      - data_t: original time
      - data_t_hat: generated time
      - train_rate: fraction of each set that goes to the training split

    Returns:
      - train_x, train_x_hat, test_x, test_x_hat,
        train_t, train_t_hat, test_t, test_t_hat: lists holding the
        train/test portions of the corresponding inputs
    """

    def _shuffle_split(samples, times):
        # One permutation drives both the sample and the time split.
        count = len(samples)
        order = np.random.permutation(count)
        cut = int(count * train_rate)
        head, tail = order[:cut], order[cut:]
        return (
            [samples[k] for k in head],
            [samples[k] for k in tail],
            [times[k] for k in head],
            [times[k] for k in tail],
        )

    # Original data first, then synthetic data (keeps the RNG call order).
    train_x, test_x, train_t, test_t = _shuffle_split(data_x, data_t)
    train_x_hat, test_x_hat, train_t_hat, test_t_hat = _shuffle_split(
        data_x_hat, data_t_hat
    )

    return (
        train_x, train_x_hat, test_x, test_x_hat,
        train_t, train_t_hat, test_t, test_t_hat,
    )
def extract_time(data):
    """Collect the length of every sequence and the longest length.

    Args:
      - data: original data; each element is indexed as ``seq[:, 0]``, so it
        must be a 2-D array whose first axis is the time axis

    Returns:
      - time: list with the length of each sequence, in input order
      - max_seq_len: maximum sequence length (0 when data is empty)
    """
    seq_lengths = [len(series[:, 0]) for series in data]
    longest = max(seq_lengths, default=0)
    return seq_lengths, longest
def rnn_cell(module_name, hidden_dim):
    """Build the recurrent cell selected by name.

    Args:
      - module_name: gru, lstm, or lstmLN
      - hidden_dim: number of units of the cell

    Returns:
      - rnn_cell: RNN Cell (GRU, basic LSTM, or layer-normalized basic LSTM),
        always created with tanh activation

    Raises:
      - AssertionError: if module_name is not one of the supported names
    """
    assert module_name in ['gru', 'lstm', 'lstmLN']

    # Every supported cell takes the same two constructor arguments.
    cell_kwargs = dict(num_units=hidden_dim, activation=tf.nn.tanh)

    if module_name == 'gru':
        # GRU
        cell = tf.nn.rnn_cell.GRUCell(**cell_kwargs)
    elif module_name == 'lstm':
        # LSTM
        cell = tf.contrib.rnn.BasicLSTMCell(**cell_kwargs)
    elif module_name == 'lstmLN':
        # LSTM Layer Normalization
        cell = tf.contrib.rnn.LayerNormBasicLSTMCell(**cell_kwargs)
    return cell
def random_generator(batch_size, z_dim, T_mb, max_seq_len):
    """Random vector generation.

    For every sample, draws uniform noise in [0, 1) for its first T_mb[i]
    time steps and leaves the remaining steps up to max_seq_len at zero, so
    all returned arrays share the same shape.

    Args:
      - batch_size: size of the random vector
      - z_dim: dimension of random vector
      - T_mb: time information for the random vector (length of each sample)
      - max_seq_len: maximum sequence length

    Returns:
      - Z_mb: generated random vector; list of batch_size arrays, each of
        shape [max_seq_len, z_dim]
    """
    Z_mb = []
    for i in range(batch_size):
        seq_len = T_mb[i]
        padded = np.zeros([max_seq_len, z_dim])
        padded[:seq_len, :] = np.random.uniform(0., 1, [seq_len, z_dim])
        # Append the zero-padded array, not the raw noise, so that sequences
        # shorter than max_seq_len still yield a rectangular batch.
        Z_mb.append(padded)
    return Z_mb
def batch_generator(data, time, batch_size):
    """Draw one random mini-batch without replacement.

    Args:
      - data: time-series data
      - time: time information
      - batch_size: the number of samples in each batch

    Returns:
      - X_mb: time-series data in each batch
      - T_mb: time information in each batch

    Both lists use the same randomly chosen indices, so X_mb[k] and T_mb[k]
    belong to the same sample. If batch_size exceeds len(data), every sample
    is returned once (in shuffled order).
    """
    shuffled = np.random.permutation(len(data))
    picked = shuffled[:batch_size]

    X_mb = [data[k] for k in picked]
    T_mb = [time[k] for k in picked]

    return X_mb, T_mb
def denormalizer(dat_max, dat_min, generated_data, columns):
    """Rescale min-max normalized sequences back to the original value range.

    Every value is mapped as ``value * (dat_max - dat_min) + dat_min`` with
    the maximum/minimum of its feature, and all sequences are stacked
    row-wise into a single DataFrame.

    Args:
      - dat_max: per-feature maximum values; read by position, feature j
        uses dat_max[j]
      - dat_min: per-feature minimum values; read by position
      - generated_data: array whose first axis indexes the sequences; each
        generated_data[i] is a 2-D block with one column per entry of columns
      - columns: column names of the resulting DataFrame, one per feature

    Returns:
      - denorm_data: DataFrame with the rows of all sequences concatenated in
        order and a fresh 0..n-1 index. An empty DataFrame with the given
        columns is returned when generated_data holds no sequences.
    """
    # Work on plain arrays so features are always matched by position,
    # regardless of any labels dat_max / dat_min may carry.
    n_features = len(columns)
    offset = np.asarray(dat_min)[:n_features]
    scale = np.asarray(dat_max)[:n_features] - offset

    # The arithmetic creates new arrays, so the input is never modified and
    # no defensive deep copy is needed.
    frames = [
        pd.DataFrame(np.asarray(generated_data[i]) * scale + offset,
                     columns=columns)
        for i in range(generated_data.shape[0])
    ]
    if not frames:
        # pd.concat raises on an empty list.
        return pd.DataFrame(columns=columns)

    # A single concat replaces the repeated DataFrame.append calls
    # (DataFrame.append was removed in pandas 2.0 and copied all rows on
    # every iteration).
    return pd.concat(frames, ignore_index=True)