This repository has been archived by the owner on Feb 28, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmanage.py
234 lines (187 loc) · 5.7 KB
/
manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import asyncio
import importlib
import json
import sys
from functools import wraps
from traceback import format_exc
from typing import Any, Dict, Iterable, List, Optional
import click
from alembic import command
from alembic.config import Config
from algoliasearch.search_client import SearchClient # type: ignore
from dotenv import load_dotenv
from sqlalchemy.dialects.postgresql import Insert, insert
from common import nats
# Load variables from a local environment file (python-dotenv) at import time.
# NOTE(review): presumably this is what lets the `envvar=`-backed click options
# further down be satisfied from a `.env` file - confirm against deployment.
load_dotenv()
def coroutine(f):
    """
    Decorator that lets an ``async def`` function be called synchronously.

    The returned wrapper forwards all positional and keyword arguments to
    ``f``, drives the resulting awaitable to completion with ``asyncio.run``
    and returns its result.
    """

    @wraps(f)
    def run_sync(*positional, **named):
        pending = f(*positional, **named)
        return asyncio.run(pending)

    return run_sync
# Root command group: the `migrations` and `seed` groups and the `run` and
# `publish_event` commands in this file are all registered on it.
@click.group()
def cli():
    return None
@cli.group()
@click.option(
    "--database",
    "-d",
    type=str,
    metavar="URL",
    envvar="DATABASE_URL",
    required=True,
)
@click.pass_context
def migrations(ctx: click.Context, database: str):
    """
    Manage the database migrations
    """
    # Build the Alembic configuration first, then expose it to the
    # sub-commands through the click context (they receive it via `pass_obj`).
    alembic_config = Config("./alembic.ini")
    alembic_config.attributes["sqlalchemy.url"] = database
    ctx.obj = alembic_config
@migrations.command()
@click.pass_obj
def current(obj: Config):
    """
    Get the current migration status
    """
    # `obj` is the Alembic Config stored on the context by the parent group
    command.current(config=obj)
@migrations.command()
@click.option("--no-autogenerate", is_flag=True, help="Create an empty migration")
@click.argument("message", type=str, metavar="MSG", required=True)
@click.pass_obj
def generate(obj: Config, message: str, no_autogenerate: bool):
    """
    Generate a new database migration
    """
    # The CLI flag is phrased negatively; Alembic expects the positive form
    autogenerate = not no_autogenerate
    command.revision(config=obj, message=message, autogenerate=autogenerate)
@migrations.command(name="run")
@click.option("--revision", "-r", type=str, metavar="REVISION", default="head")
@click.pass_obj
def migrations_run(obj: Config, revision: str):
    """
    Apply all pending migrations
    """
    # Upgrades up to `revision`, which defaults to "head"
    command.upgrade(config=obj, revision=revision)
@migrations.command()
@click.option("--revision", "-r", type=str, metavar="REVISION", default="base")
@click.pass_obj
def reset(obj: Config, revision: str):
    """
    Rollback migrations to a given point
    """
    # Downgrades to `revision`, which defaults to "base"
    command.downgrade(config=obj, revision=revision)
@cli.command()
@click.option("-a", "--api", "app", flag_value="api")
@click.option("-s", "--statistics", "app", flag_value="statistics")
@click.option("-t", "--tasks", "app", flag_value="tasks")
def run(app: Optional[str]):
    """
    Run an API development server
    """
    # `app` is the flag_value of whichever of -a/-s/-t was passed, or None
    # when no flag was given. It doubles as the name of the module to import.
    if app is None:
        click.echo("An API server must be selected")
        sys.exit(1)

    # Only the two lookup steps are guarded. Exceptions raised while the
    # server itself is running must surface unchanged instead of being
    # reported as a missing module or a missing `run` function.
    try:
        # Load the module
        module = importlib.import_module(app)
    except ModuleNotFoundError:
        click.echo(f"{app} is not implemented yet")
        click.echo(f"Reason:\n{format_exc()}")
        sys.exit(1)

    try:
        # Ensure the runner function exists
        run_fn = getattr(module, "run")
    except AttributeError:
        click.echo(f"{app} is misconfigured, could not find 'run' function")
        click.echo(f"Reason:\n{format_exc()}")
        sys.exit(1)

    run_fn()
@cli.group()
@click.pass_context
def seed(ctx: click.Context):
    """
    Seed different data sources
    """
    # Context key -> JSON file providing the records for that key
    sources = {
        "schools": "./common/database/migrations/schools.json",
        "majors": "./common/database/migrations/majors.json",
    }

    loaded: Dict[str, Any] = {}
    for name, path in sources.items():
        # Open through a context manager so each handle is closed as soon as
        # it has been parsed, and decode as UTF-8 regardless of the platform
        # default encoding.
        with open(path, "r", encoding="utf-8") as handle:
            loaded[name] = json.load(handle)

    # Sub-commands receive this mapping via `click.pass_obj`
    ctx.obj = loaded
@seed.command(name="algolia")
@click.option(
    "-i",
    "--app-id",
    "app_id",
    type=str,
    envvar="REGISTRATION_ALGOLIA_APP_ID",
)
@click.option(
    "-k",
    "--api-key",
    "api_key",
    type=str,
    envvar="REGISTRATION_ALGOLIA_API_KEY",
)
@click.pass_obj
def seed_algolia(obj: Dict[str, List[Dict[str, Any]]], app_id: str, api_key: str):
    """
    Seed the Algolia search indexes
    """
    client = SearchClient.create(app_id, api_key)

    def id_to_object_id(entry: Dict[str, Any]) -> Dict[str, Any]:
        # Return a copy with "id" renamed to "objectID". The records in `obj`
        # are shared through the click context, so they are left unmodified.
        record = {key: value for key, value in entry.items() if key != "id"}
        record["objectID"] = entry["id"]
        return record

    # Each key of the seed data is pushed to the index of the same name
    for index_name in ("schools", "majors"):
        index = client.init_index(index_name)
        index.save_objects([id_to_object_id(entry) for entry in obj[index_name]])
@seed.command(name="database")
@click.pass_obj
@coroutine
async def seed_database(obj: Dict[str, List[Dict[str, Any]]]):
    """
    Seed the database with all the schools
    """
    # Import here, so we don't need a database connection for every command
    from common.database import School, db_context

    rows = obj["schools"]

    async with db_context() as session:
        upsert: Insert = insert(School).values(rows)

        # When a row with the same id already exists, overwrite these columns
        # with the values from the incoming row.
        refreshed = {
            column: upsert.excluded[column]
            for column in ("name", "abbreviations", "alternatives")
        }

        await session.execute(
            upsert.on_conflict_do_update(
                index_elements=[School.id],  # type: ignore
                set_=refreshed,
            )
        )
        await session.commit()
@cli.command()
@click.argument("namespace")
@click.argument("event")
@click.argument("data", nargs=-1)
@click.option(
    "--manual",
    is_flag=True,
    help="Send a manually triggered event, defaults to automated events",
)
@coroutine
async def publish_event(
    namespace: str,
    event: str,
    data: Iterable[str],
    # NOTE: click always supplies `manual` (False unless --manual is given),
    # so this signature default is not what the CLI uses.
    manual: bool = True,
):
    """
    Publish an event to the NATS message bus
    """
    await nats.healthcheck()

    # Build the payload from the key=value arguments
    payload: Dict[str, Any] = {}
    for pair in data:
        # Split on the first "=" only, so a value may itself contain "="
        key, separator, value = pair.partition("=")
        if not separator:
            click.echo("data must be formatted as key=value pairs")
            sys.exit(1)

        # Purely numeric values are sent as integers, everything else as text
        payload[key] = int(value) if value.isdecimal() else value

    # Subject layout: <namespace>.<manual|automated>.<event>
    event_type = "manual" if manual else "automated"
    await nats.publish(f"{namespace}.{event_type}.{event}", payload)
# Invoke the root click group only when this file is executed as a script
if __name__ == "__main__":
    cli()