-
Notifications
You must be signed in to change notification settings - Fork 0
/
prov_health_data_to_minio.py
181 lines (141 loc) · 6.91 KB
/
prov_health_data_to_minio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import json
import logging
import os
import posixpath
import re
import shlex
import stat
import sys
import tempfile
import zipfile

import paramiko

from db_utils import minio_utils
# HTTP proxy the SFTP connection is tunnelled through (passed to ncat's --proxy option)
CITY_PROXY_HOSTNAME = "internet.capetown.gov.za"
CITY_PROXY_PORT = "8080"
# Host and port of the SFTP server that ncat connects to via the proxy
FTP_HOSTNAME = "164.151.8.14"
FTP_PORT = "5022"
# Remote directory that is walked recursively for files to fetch
FTP_SYNC_DIR_NAME = 'WCGH_COCT'
# Remote files are only fetched when their st_size is strictly greater than this value
MIN_FILE_SIZE = 1024
# Number of most recently modified remote files (after the size filter) that are fetched
FETCH_WINDOW_SIZE = 90
# Filename prefix used when uploading the raw fetched files and every extracted zip member
PROV_HEALTH_BACKUP_PREFIX = "data/staging/wcgh_backup/"
# Filename prefix used when uploading the generic "latest" covid sum file
RESTRICTED_PREFIX = "data/private/"
# Bucket and data classification passed to minio_utils.file_to_minio for every upload
BUCKET = 'covid'
BUCKET_CLASSIFICATION = minio_utils.DataClassification.EDGE
# Zip member names matching this pattern are treated as covid sum files
COVID_SUM_FILENAME_REGEX = "^covid_sum.*txt$"
# Fixed name given to the hard-linked copy of the latest covid sum file
COVID_SUM_FILENAME = "covid_sum_latest.txt"
def get_sftp_client(proxy_username, proxy_password, ftp_username, ftp_password):
    """Open an SFTP session to the provincial server, tunnelled through the HTTP proxy.

    The connection is made by spawning ``/usr/bin/ncat`` as a paramiko
    ``ProxyCommand`` that talks to ``FTP_HOSTNAME:FTP_PORT`` via
    ``CITY_PROXY_HOSTNAME:CITY_PROXY_PORT``.

    NOTE: the proxy credentials are passed on the ncat command line.

    :param proxy_username: username for the HTTP proxy
    :param proxy_password: password for the HTTP proxy
    :param ftp_username: username for the SFTP server
    :param ftp_password: password for the SFTP server
    :return: a ``paramiko.SFTPClient`` bound to the authenticated transport
    :raises: whatever ``paramiko.Transport.connect`` raises when the connection
        or authentication fails (the transport is closed before re-raising)
    """
    # paramiko's ProxyCommand tokenises the command line with shell-like rules,
    # so the credentials are quoted to survive whitespace and quote characters.
    proxy_auth = shlex.quote(f'{proxy_username}:{proxy_password}')
    proxy = paramiko.proxy.ProxyCommand(
        (f'/usr/bin/ncat --proxy {CITY_PROXY_HOSTNAME}:{CITY_PROXY_PORT} '
         f'--proxy-type http '
         f'--proxy-auth {proxy_auth} '
         f'{FTP_HOSTNAME} {FTP_PORT}')
    )

    transport = paramiko.Transport(sock=proxy)
    try:
        transport.connect(username=ftp_username, password=ftp_password)
    except Exception:
        # Don't leave a half-open transport behind when auth fails
        transport.close()
        raise

    return paramiko.SFTPClient.from_transport(transport)
def sftp_flatten(parent_path, sftp):
    """Recursively yield every non-directory entry found below ``parent_path``.

    Directories are descended into depth-first, in the order that
    ``sftp.listdir_attr`` returns them; they are never yielded themselves.

    :param parent_path: remote directory to start walking from
    :param sftp: object exposing ``listdir_attr(path)``, whose results have
        ``filename`` and ``st_mode`` attributes (e.g. a ``paramiko.SFTPClient``)
    :return: generator of ``(file attributes, remote path)`` tuples
    """
    for sftp_fileattr in sftp.listdir_attr(parent_path):
        # Remote SFTP paths use forward slashes regardless of the local platform
        remote_path = posixpath.join(parent_path, sftp_fileattr.filename)
        is_dir = stat.S_ISDIR(sftp_fileattr.st_mode)
        logging.debug(f"remote_path={remote_path}, "
                      f"stat.S_ISDIR(sftp_fileattr.st_mode)={is_dir}")

        if is_dir:
            yield from sftp_flatten(remote_path, sftp)
        else:
            yield sftp_fileattr, remote_path
def get_prov_files(sftp):
    """Download the most recent files from the SFTP server and yield their local paths.

    Every file below ``FTP_SYNC_DIR_NAME`` is listed, ordered by modification
    time, restricted to those strictly larger than ``MIN_FILE_SIZE`` and then
    cut down to the last ``FETCH_WINDOW_SIZE`` entries. All of those are
    downloaded into a temporary directory before anything is yielded.

    The temporary directory only exists while the generator is being consumed,
    so each yielded path must be used before advancing to the next one.

    NOTE: downloads are stored under the remote file's base name, so files that
    share a name in different remote directories end up at the same local path.

    :param sftp: SFTP client used for listing and for ``get``
    :return: generator of ``(local path, probably_latest)`` tuples, where
        ``probably_latest`` is True only for the entry with the newest
        modification time
    """
    # Whole remote tree, oldest modification time first
    remote_entries = sorted(
        sftp_flatten(FTP_SYNC_DIR_NAME, sftp),
        key=lambda entry: entry[0].st_mtime
    )

    # Dropping anything that is not larger than the minimum size
    logging.debug(f"( pre-filter): len(list_of_files)={len(remote_entries)}")
    remote_entries = [
        entry for entry in remote_entries
        if entry[0].st_size > MIN_FILE_SIZE
    ]
    logging.debug(f"( post-filter): len(list_of_files)={len(remote_entries)}")

    # Keeping only the tail of the list, i.e. the most recently modified files
    remote_entries = remote_entries[-FETCH_WINDOW_SIZE:]

    joined_remote_paths = ', '.join(remote_path for _, remote_path in remote_entries)
    logging.debug(f"Got the following list of files from FTP server: '{joined_remote_paths}'")

    with tempfile.TemporaryDirectory() as download_dir:
        # First pass: pull everything down from the server
        for attrs, remote_path in remote_entries:
            logging.debug(f"Getting '{remote_path}'...")
            sftp.get(remote_path, os.path.join(download_dir, attrs.filename))

        # Second pass (download directory still alive): hand the local copies out
        for attrs, _ in remote_entries:
            local_path = os.path.join(download_dir, attrs.filename)

            # The list is ordered by modified time, so the final entry is probably the latest
            probably_latest = attrs is remote_entries[-1][0]
            logging.debug(f"local_path={local_path}, probably_latest={probably_latest}")

            yield local_path, probably_latest
def get_zipfile_contents(zfilename, zfile_password, latest):
    """Extract every member of a zip file and yield where each should be uploaded.

    Members are extracted one at a time into a temporary directory that only
    exists while the generator is being consumed, so each yielded path must be
    used before advancing to the next one.

    :param zfilename: path of the zip file to read
    :param zfile_password: password (str) used to decrypt the members
    :param latest: whether this zip file is considered the most recent one
    :return: generator of ``(filename prefix, local path)`` tuples. Every member
        is yielded with ``PROV_HEALTH_BACKUP_PREFIX``. When ``latest`` is true,
        a member whose name matches ``COVID_SUM_FILENAME_REGEX`` is additionally
        yielded as ``COVID_SUM_FILENAME`` with ``RESTRICTED_PREFIX``.
    """
    with tempfile.TemporaryDirectory() as tempdir, zipfile.ZipFile(zfilename) as zfile:
        zfile_contents = zfile.namelist()
        logging.debug(f"Found the following files listed in '{zfilename}': {', '.join(zfile_contents)}")

        latest_file_local_path = os.path.join(tempdir, COVID_SUM_FILENAME)

        for zcontent_filename in zfile_contents:
            logging.debug(f"Extracting '{zcontent_filename}'")
            # extract() returns the path it actually wrote to, which is safer than
            # rebuilding it from the (possibly sanitised) member name
            local_path = zfile.extract(zcontent_filename, path=tempdir, pwd=zfile_password.encode())

            yield PROV_HEALTH_BACKUP_PREFIX, local_path

            # Additionally, if this looks like a covid sum file, and it's the latest,
            # then make a generically named hard link to it
            if latest and re.match(COVID_SUM_FILENAME_REGEX, zcontent_filename):
                # COVID_SUM_FILENAME itself matches the regex, so the member may already
                # *be* the generic file - linking it onto itself would raise
                if os.path.abspath(local_path) != os.path.abspath(latest_file_local_path):
                    # An earlier matching member may have created the link already;
                    # the most recently extracted match wins
                    if os.path.lexists(latest_file_local_path):
                        os.remove(latest_file_local_path)
                    os.link(local_path, latest_file_local_path)

                yield RESTRICTED_PREFIX, latest_file_local_path
if __name__ == "__main__":
    # Script entry point: back up the recent files from the provincial SFTP server
    # to MinIO, unpack any zip files among them, and upload their contents too.
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s-%(module)s.%(funcName)s [%(levelname)s]: %(message)s')

    # Loading secrets
    SECRETS_PATH_VAR = "SECRETS_PATH"

    if SECRETS_PATH_VAR not in os.environ:
        logging.error(f"'{SECRETS_PATH_VAR}' env var missing!")
        sys.exit(-1)

    secrets_path = os.environ[SECRETS_PATH_VAR]
    # Context manager so the secrets file handle is closed straight away
    with open(secrets_path) as secrets_file:
        secrets = json.load(secrets_file)

    # Keyword arguments shared by every upload below
    minio_upload_kwargs = dict(
        minio_bucket=BUCKET,
        minio_key=secrets["minio"]["edge"]["access"],
        minio_secret=secrets["minio"]["edge"]["secret"],
        data_classification=BUCKET_CLASSIFICATION,
    )

    # Getting SFTP client
    logging.info("Auth[ing] with FTP server")
    sftp_client = get_sftp_client(
        secrets["proxy"]["username"], secrets["proxy"]["password"],
        secrets["ftp"]["wcgh"]["username"], secrets["ftp"]["wcgh"]["password"],
    )
    logging.info("Auth[ed] with FTP server")

    try:
        # Getting files from provincial server
        logging.info("Get[ing] files from FTP server...")
        seen_the_latest_covid_sum_file = False
        for ftp_file_path, probably_latest_file in get_prov_files(sftp_client):
            logging.debug(f"Backing up {ftp_file_path}...")
            minio_utils.file_to_minio(
                filename=ftp_file_path,
                filename_prefix_override=PROV_HEALTH_BACKUP_PREFIX,
                **minio_upload_kwargs
            )

            if not zipfile.is_zipfile(ftp_file_path):
                continue

            logging.debug(f"{ftp_file_path} appears to be a zip file, attempting to decompress...")
            # The zip files are decrypted with the same password as the SFTP account
            for file_path_prefix, zcontent_file_path in get_zipfile_contents(ftp_file_path,
                                                                             secrets["ftp"]["wcgh"]["password"],
                                                                             probably_latest_file):
                if COVID_SUM_FILENAME in zcontent_file_path:
                    seen_the_latest_covid_sum_file = True

                logging.debug(f"...extracted {zcontent_file_path}")
                minio_utils.file_to_minio(
                    filename=zcontent_file_path,
                    filename_prefix_override=file_path_prefix,
                    **minio_upload_kwargs
                )

        # Explicit raise rather than `assert`, so the check survives `python -O`
        if not seen_the_latest_covid_sum_file:
            raise RuntimeError("Did *not* copy a latest file")
    finally:
        # Always release the SFTP session, even when an upload or the check above fails
        sftp_client.close()

    logging.info("G[ot] files from FTP server")