-
Notifications
You must be signed in to change notification settings - Fork 0
/
prod.py
executable file
·186 lines (153 loc) · 6.14 KB
/
prod.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python3
"""
Knowledge of all tables is in the root of
the parquet distribution. Naively it is the
directory name. A more evolved distribution would
have a machine readable data structure.
There is a directory root for the SQLITE database
and related data are delivered to. Its name is
computed from a small number to support multiple runs.
this support re-running in the case of failure.
We use the SQLITE database for bookkeeping we make
all_tables, table begun, procesing attempt records
and table_finished records.
Cleanup phase:
we first look for table_beguns w/out table_finshed.
For each begun record we delete any artifacts from a
previous run, then delete the table_begun record.
Creation phase
for each table lacking a begun record..
we ingest the table.
"""
import os
import glob
import sqlite3
import pandas as pd
import argparse
import logging
class Manager:
    """Bookkeeper for ingesting a tree of parquet files into one SQLite DB.

    The `tables` table tracks, per source table, whether ingestion has
    begun and whether it finished; a begun-but-not-done row marks a
    crashed run whose partial artifacts must be cleaned up.
    """

    def __init__(self, args):
        # args must carry .parquet_root and .delivery_root (argparse Namespace).
        self.parquet_root = args.parquet_root
        # Fixed run number for now; the delivery dir name encodes it so
        # multiple runs could coexist side by side.
        self.run_number = 0
        self.delivery_root = os.path.join(args.delivery_root, f"_{self.run_number}")
        self.database_file = os.path.join(self.delivery_root, "files.db")

    def initialize(self):
        "Open the bookkeeping/ingest database and keep a cursor."
        self.conn = sqlite3.connect(self.database_file)
        self.cur = self.conn.cursor()

    def initiate(self):
        "make the one-time content"
        # os.makedirs replaces the former `os.system("mkdir -p ...")`:
        # no shell involved, and errors raise instead of being ignored.
        os.makedirs(self.delivery_root, exist_ok=True)
        self.initialize()
        self.q("CREATE TABLE tables (tab TEXT, begun INTEGER, done INTEGER)")
        self.q("CREATE TABLE attempts (tab TEXT)")
        for table in glob.glob(os.path.join(self.parquet_root, "*")):
            table = os.path.basename(table)
            # Parameterized: directory names may contain quotes etc.
            self.q("INSERT INTO tables VALUES (?, 0, 0)", (table,))
    #
    # common functions
    #
    def q(self, sql, params=()):
        "log and execute queries"
        # `params` is new but optional, so all one-argument callers still work;
        # it lets values be bound safely instead of f-string interpolation.
        logging.debug("%s %r", sql, params)
        result = self.cur.execute(sql, params)
        self.conn.commit()
        return result

    def parquet_table_root(self, table):
        "Directory holding the parquet files for one table."
        return os.path.join(self.parquet_root, table)

    def shell(self, cmd):
        "Log and run a shell command."
        logging.debug(cmd)
        os.system(cmd)

    def db_shell(self):
        "Open an interactive sqlite3 session on the bookkeeping DB."
        self.shell(f"sqlite3 {self.database_file}")
    #
    # Track and inform about state of table ingest
    #
    def state_get_crashed_tables(self):
        "Tables that began ingesting but never finished."
        sql = "SELECT tab FROM tables WHERE begun = 1 AND done = 0"
        return [r[0] for r in self.q(sql)]

    def state_reset_crashed_table(self, table):
        "Clear the begun flag so the table is retried."
        return self.q("UPDATE tables SET begun = 0 WHERE tab = ?", (table,))

    def state_begin_table(self, table):
        "Record that ingestion of `table` has started."
        return self.q("UPDATE tables SET begun = 1 WHERE tab = ?", (table,))

    def state_get_table_to_ingest(self):
        "Tables not yet begun (includes crashed ones after reset)."
        sql = "SELECT tab FROM tables WHERE begun = 0"
        return [r[0] for r in self.q(sql)]

    def state_mark_table_ingested(self, table):
        "Record that ingestion of `table` completed."
        return self.q("UPDATE tables SET done = 1 WHERE tab = ?", (table,))
    #
    # clean actions
    #
    def act_clean_crashed_runs(self):
        "undo any actions for tables partially processed."
        for table in self.state_get_crashed_tables():
            self.act_clean_crashed_run(table)
            self.state_reset_crashed_table(table)

    def act_clean_crashed_run(self, table):
        "remove any half-built state"
        # Identifiers cannot be bound as parameters; quote the name so
        # unusual directory names cannot break (or inject into) the SQL.
        self.q(f'DROP TABLE IF EXISTS "{table}"')
    #
    # ingest actions
    #
    def act_ingest_tables(self):
        "ingest all tables"
        for table in self.state_get_table_to_ingest():
            self.state_begin_table(table)
            self.act_ingest_table(table)
            # Assume any error is a fatal exception
            self.state_mark_table_ingested(table)

    def act_ingest_table(self, table):
        "Ingest one set of parquet files into `table`."
        files = glob.glob(os.path.join(self.parquet_table_root(table), "*.parquet"))
        for p_file in files:
            logging.info(f"{p_file} begin ingest")
            df = pd.read_parquet(p_file, engine='pyarrow')
            df.to_sql(name=table, con=self.conn, if_exists='append')
            logging.info(f"{p_file} done ingest")
def ingest(args):
    "ingest all parquet files into the DB"
    # Open the existing bookkeeping DB, recover from any crashed run,
    # then process every table that has not been ingested yet.
    manager = Manager(args)
    manager.initialize()
    manager.act_clean_crashed_runs()
    manager.act_ingest_tables()
def init(args):
    "Initialize a directory structure for the DB release"
    # One-time setup: create the delivery directory, the bookkeeping
    # tables, and one row per parquet table directory.
    Manager(args).initiate()
def shell(args):
    "connect interactively into the DB"
    # BUG FIX: the parameter was named `arg` while the body referenced
    # `args`, which only worked by accidentally hitting the module-level
    # global created under __main__.
    m = Manager(args)
    m.db_shell()
if __name__ == "__main__" :
main_parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
main_parser.add_argument('--loglevel','-l',
help='loglevel NONE, "INFO", DEBUG',
default="INFO")
main_parser.set_defaults(func=None)
subparsers = main_parser.add_subparsers()
parser = subparsers.add_parser('ingest', help=ingest.__doc__)
parser.set_defaults(func=ingest)
parser.add_argument("-p", "--parquet_root", help = "root of parquet files")
parser.add_argument("-d", "--delivery_root", help = "root of the sqlpuls delivery ", default="./")
parser = subparsers.add_parser('init', help=init.__doc__)
parser.set_defaults(func=init)
parser.add_argument("-p", "--parquet_root", help = "root of parquet files")
parser.add_argument("-d", "--delivery_root", help = "root of the sqlpuls delivery ", default="./")
parser = subparsers.add_parser('shell', help=shell.__doc__)
parser.set_defaults(func=shell)
parser.add_argument("-p", "--parquet_root", help = "root of parquet files")
parser.add_argument("-d", "--delivery_root", help = "root of the sqlpuls delivery ", default="./")
args = main_parser.parse_args()
loglevel=logging.__dict__[args.loglevel]
assert type(loglevel) == type(1)
logging.basicConfig(level=logging.__dict__[args.loglevel])
if not args.func: # there are no subfunctions
main_parser.print_help()
exit(1)
args.func(args)