Skip to content

Commit

Permalink
#1088 add new divisions, fix null geometry issue, rename
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Nov 20, 2024
1 parent d2003b0 commit 3470497
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from psycopg2 import sql, Error
from psycopg2.extras import execute_values
from io import BytesIO
from airflow.providers.postgres.hooks.postgres import PostgresHook

#fpath = '/data/home/gwolofs/bdit_data-sources/events/rodars/rodars_functions.py'
#SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(fpath)), 'sql')
SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sql')
fpath = '/data/home/gwolofs/bdit_data-sources/events/rodars/rodars_functions.py'
SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(fpath)), 'sql')
#SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sql')

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
Expand All @@ -29,7 +30,11 @@ def geometry_from_bytes(geo_bytes):
coordinates_list = []
with BytesIO(geo_bytes) as ms:
# Read the first 4 bytes = length
len_val = struct.unpack('i', ms.read(4))[0]
try:
len_val = struct.unpack('i', ms.read(4))[0]
except struct.error:
#struct.error: unpack requires a buffer of 4 bytes
return None
# Iterate and unpack each pair of doubles as coordinates
for _ in range(len_val):
coordinates = coordinates_from_binary(ms)
Expand All @@ -41,7 +46,7 @@ def fetch_and_insert_data(
insert_conn = PostgresHook('vds_bot')
):
#generic function to pull and insert data using different connections and queries.
select_fpath = os.path.join(SQL_DIR, 'select-rodars_new.sql')
select_fpath = os.path.join(SQL_DIR, 'select-itsc_issues.sql')
with open(select_fpath, 'r', encoding="utf-8") as file:
select_query = sql.SQL(file.read())
#.format(
Expand All @@ -58,28 +63,35 @@ def fetch_and_insert_data(
LOGGER.critical(f"Error fetching RODARS data.")
LOGGER.critical(exc)
raise Exception()


#older rodars data doesn't have this value?
df['locationindex'] = df['locationindex'].replace({nan: 0})

geom_data = df['geometry'].map(geometry_from_bytes)
geoms_df = df[['issueid', 'divisionid']]
geoms_df.insert(2, 'geom_text', geom_data.map(coordinates_to_geomfromtext))
valid_geoms = [not(x is None) for x in geom_data]

geoms_df = df[['issueid', 'divisionid']][valid_geoms]
geoms_df.insert(2, 'geom_text', geom_data[valid_geoms].map(coordinates_to_geomfromtext))
geoms_df = geoms_df.replace({nan: None})
geoms_df = [tuple(x) for x in geoms_df.to_numpy()]

#transform values for inserting
df_no_geom = df.drop('geometry', axis = 1)
df_no_geom = df_no_geom.replace({pd.NaT: None, nan: None})
df_no_geom = [tuple(x) for x in df_no_geom.to_numpy()]

insert_fpath = os.path.join(SQL_DIR, 'insert-rodars_new.sql')
insert_fpath = os.path.join(SQL_DIR, 'insert-itsc_issues.sql')
with open(insert_fpath, 'r', encoding="utf-8") as file:
insert_query = sql.SQL(file.read())

with insert_conn.get_conn() as con, con.cursor() as cur:
execute_values(cur, insert_query, df_no_geom)

geom_update_fpath = os.path.join(SQL_DIR, 'update-rodars_geometry.sql')
with open(geom_update_fpath, 'r', encoding="utf-8") as file:
geom_update_query = sql.SQL(file.read())

with insert_conn.get_conn() as con, con.cursor() as cur:
execute_values(cur, geom_update_query, geoms_df)
execute_values(cur, geom_update_query, geoms_df)

#fetch_and_insert_data()
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
-- Table: gwolofs.rodars_new
-- Table: gwolofs.itsc_issues

-- DROP TABLE IF EXISTS gwolofs.rodars_new;
-- DROP TABLE IF EXISTS gwolofs.itsc_issues;

CREATE TABLE IF NOT EXISTS gwolofs.rodars_new
CREATE TABLE IF NOT EXISTS gwolofs.itsc_issues
(
divisionid smallint NOT NULL,
divisionname text,
issueid integer NOT NULL,
timestamputc timestamp without time zone,
issuetype smallint,
Expand Down Expand Up @@ -39,19 +40,19 @@ CREATE TABLE IF NOT EXISTS gwolofs.rodars_new
startissueonplannedstarttime boolean,
startstatus integer,
updateremindernoticeseconds integer,
CONSTRAINT rodars_new_pkey PRIMARY KEY (divisionid, issueid, locationindex)
CONSTRAINT itsc_issues_pkey PRIMARY KEY (divisionid, issueid, locationindex)
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS gwolofs.rodars_new OWNER TO dbadmin;
ALTER TABLE IF EXISTS gwolofs.itsc_issues OWNER TO dbadmin;

REVOKE ALL ON TABLE gwolofs.rodars_new FROM bdit_humans;
REVOKE ALL ON TABLE gwolofs.itsc_issues FROM bdit_humans;

GRANT SELECT ON TABLE gwolofs.rodars_new TO bdit_humans;
GRANT SELECT ON TABLE gwolofs.itsc_issues TO bdit_humans;

GRANT ALL ON TABLE gwolofs.rodars_new TO dbadmin;
GRANT ALL ON TABLE gwolofs.itsc_issues TO dbadmin;

GRANT ALL ON TABLE gwolofs.rodars_new TO rds_superuser WITH GRANT OPTION;
GRANT ALL ON TABLE gwolofs.itsc_issues TO rds_superuser WITH GRANT OPTION;

GRANT ALL ON TABLE gwolofs.rodars_new TO vds_bot;
GRANT ALL ON TABLE gwolofs.itsc_issues TO vds_bot;
22 changes: 22 additions & 0 deletions events/construction/sql/division_explore.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
SELECT DISTINCT ON (divisionid)
divisionid,
datadivision.shortname,
issueid,
timestamputc,
issuetype,
issuedata.description,
priority,
proposedstarttimestamputc,
proposedendtimestamputc,
earlyendtimestamputc,
status,
timeoption
FROM public.issuedata
JOIN public.datadivision USING (divisionid)
/*WHERE
divisionid IN (
8048, --rodars new
8014, --rodars (old)
8023 --TMMS TM3 Planned Work
)*/
ORDER BY divisionid, timestamputc, issueid DESC
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
INSERT INTO gwolofs.rodars_new (
divisionid, issueid, timestamputc, issuetype, description, priority,
INSERT INTO gwolofs.itsc_issues (
divisionid, divisionname, issueid, timestamputc, issuetype, description, priority,
proposedstarttimestamputc, proposedendtimestamputc, earlyendtimestamputc, status, timeoption,
locationindex, mainroadname, fromroadname, toroadname, direction, lanesaffected,
streetnumber, locationtype, groupid, groupdescription, sourceid, starttimestamputc,
Expand All @@ -11,6 +11,7 @@ INSERT INTO gwolofs.rodars_new (
VALUES %s
ON CONFLICT (divisionid, issueid, locationindex)
DO UPDATE SET
divisionname = excluded.divisionname,
timestamputc = excluded.timestamputc,
issuetype = excluded.issuetype,
description = excluded.description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@ WITH issues AS (
status,
timeoption
FROM public.issuedata
WHERE divisionid = 8048 --rodars new
WHERE
divisionid IN (
8048, --rodars new
8014, --rodars (old)
8023 --TMMS TM3 Planned Work
)
ORDER BY divisionid, issueid, timestamputc DESC
)

SELECT
issues.divisionid,
datadivision.shortname AS divisionname,
issues.issueid,
issues.timestamputc,
issues.issuetype,
Expand Down Expand Up @@ -56,4 +62,5 @@ SELECT
FROM issues
LEFT JOIN public.issuelocationnew USING (divisionid, issueid, timestamputc)
LEFT JOIN public.issueconfig USING (divisionid, issueid)
LEFT JOIN public.datadivision USING (divisionid)
ORDER BY issueid DESC
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ WITH new_geoms (issueid, divisionid, geom_text) AS (
VALUES %s
)

UPDATE gwolofs.rodars_new
UPDATE gwolofs.itsc_issues
SET geometry = ST_GeomFromText(geom_text, 4326)
FROM new_geoms
WHERE
rodars_new.issueid = new_geoms.issueid
AND rodars_new.divisionid = new_geoms.divisionid;
itsc_issues.issueid = new_geoms.issueid
AND itsc_issues.divisionid = new_geoms.divisionid;

0 comments on commit 3470497

Please sign in to comment.