Skip to content

Commit

Permalink
modular source ingest script
Browse files Browse the repository at this point in the history
  • Loading branch information
kelle committed Dec 13, 2023
1 parent 03b9ffd commit 1b048b0
Showing 1 changed file with 205 additions and 157 deletions.
362 changes: 205 additions & 157 deletions scripts/ingests/ingest_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ def ingest_sources(db, sources, references=None, ras=None, decs=None, comments=N

# SETUP INPUTS
if ras is None and decs is None:
coords = False
coords_provided = False
else:
coords = True
coords_provided = True

if isinstance(sources, str):
n_sources = 1
Expand All @@ -121,165 +121,42 @@ def ingest_sources(db, sources, references=None, ras=None, decs=None, comments=N
logger.info(f"Trying to add {n_sources} sources")

# Loop over each source and decide to ingest, skip, or add alt name
for i, source in enumerate(sources):
# Find out if source is already in database or not
if coords and search_db:
name_matches = find_source_in_db(db, source, ra=ras[i], dec=decs[i])
elif search_db:
name_matches = find_source_in_db(db, source)
elif not search_db:
name_matches = []
else:
name_matches = None
ra = None
dec = None

if len(name_matches) == 1 and search_db: # Source is already in database
n_existing += 1
msg1 = f"{i}: Skipping {source}. Already in database as {name_matches[0]}. \n "
logger.debug(msg1)

# Figure out if ingest name is an alternate name and add
db_matches = db.search_object(source, output_table='Sources', fuzzy_search=False)
if len(db_matches) == 0:
#add other name to names table
ingest_names(db, name_matches[0], source)
n_alt_names += 1
continue
elif len(name_matches) > 1 and search_db: # Multiple source matches in the database
n_multiples += 1
msg1 = f"{i} Skipping {source} "
msg = f"{i} More than one match for {source}\n {name_matches}\n"
logger.warning(msg1 + msg)
if raise_error:
raise SimpleError(msg)
else:
continue
elif len(name_matches) == 0 or not search_db: # No match in the database, INGEST!
if coords: # Coordinates were provided as input
ra = ras[i]
dec = decs[i]
epoch = None if ma.is_masked(epochs[i]) else epochs[i]
equinox = None if ma.is_masked(equinoxes[i]) else equinoxes[i]
else: # Try to get coordinates from SIMBAD
simbad_result_table = Simbad.query_object(source)
if simbad_result_table is None:
n_skipped += 1
ra = None
dec = None
msg = f"{i}: Skipping: {source}. Coordinates are needed and could not be retrieved from SIMBAD. \n"
logger.warning(msg)
if raise_error:
raise SimpleError(msg)
else:
continue
elif len(simbad_result_table) == 1:
simbad_coords = simbad_result_table['RA'][0] + ' ' + simbad_result_table['DEC'][0]
simbad_skycoord = SkyCoord(simbad_coords, unit=(u.hourangle, u.deg))
ra = simbad_skycoord.to_string(style='decimal').split()[0]
dec = simbad_skycoord.to_string(style='decimal').split()[1]
epoch = '2000' # Default coordinates from SIMBAD are epoch 2000.
equinox = 'J2000' # Default frame from SIMBAD is IRCS and J2000.
msg = f"Coordinates retrieved from SIMBAD {ra}, {dec}"
logger.debug(msg)
else:
n_skipped += 1
ra = None
dec = None
msg = f"{i}: Skipping: {source}. Coordinates are needed and could not be retrieved from SIMBAD. \n"
logger.warning(msg)
if raise_error:
raise SimpleError(msg)
else:
continue

logger.debug(f"{i}: Ingesting {source}. Not already in database. ")
for source_counter, source in enumerate(sources):
# ingest_source function starts here
# TODO: figure out counter
logger.debug(f"{source_counter}: Trying to ingest {source}")

reference = references[source_counter]
other_reference = other_references[source_counter]
comment = None if ma.is_masked(comments[source_counter]) else comments[source_counter]

if coords_provided:
ra = ras[source_counter]
dec = decs[source_counter]
epoch = None if ma.is_masked(epochs[source_counter]) else epochs[source_counter]
equinox = None if ma.is_masked(equinoxes[source_counter]) else equinoxes[source_counter]

ingest_source(db, source, reference=reference, ra=ra, dec=dec, epoch=epoch, equinox=equinox,
other_reference=other_reference, comment=comment, raise_error=raise_error, search_db=search_db)
else:
msg = f"{i}: unexpected condition encountered ingesting {source}"
logger.error(msg)
raise SimpleError(msg)
ingest_source(db, source, reference=reference, other_reference=other_reference, comment=comment,
raise_error=raise_error, search_db=search_db)

# Construct data to be added
source_data = [{'source': source,
'ra': ra,
'dec': dec,
'reference': references[i],
'epoch': epoch,
'equinox': equinox,
'other_references': other_references[i],
'comments': None if ma.is_masked(comments[i]) else comments[i]}]
names_data = [{'source': source,
'other_name': source}]

# Try to add the source to the database
try:
with db.engine.connect() as conn:
conn.execute(db.Sources.insert().values(source_data))
conn.commit()
n_added += 1
msg = f"Added {str(source_data)}"
logger.debug(msg)
except sqlalchemy.exc.IntegrityError:
if ma.is_masked(source_data[0]['reference']): # check if reference is blank
msg = f"{i}: Skipping: {source}. Discovery reference is blank. \n"
msg2 = f"\n {str(source_data)}\n"
logger.warning(msg)
logger.debug(msg2)
n_skipped += 1
if raise_error:
raise SimpleError(msg + msg2)
else:
continue
elif db.query(db.Publications).filter(db.Publications.c.publication == references[i]).count() == 0:
# check if reference is in Publications table
msg = f"{i}: Skipping: {source}. Discovery reference {references[i]} is not in Publications table. \n" \
f"(Add it with add_publication function.) \n "
msg2 = f"\n {str(source_data)}\n"
logger.warning(msg)
logger.debug(msg2)
n_skipped += 1
if raise_error:
raise SimpleError(msg + msg2)
else:
continue
else:
msg = f"{i}: Skipping: {source}. Not sure why."
msg2 = f"\n {str(source_data)} "
logger.warning(msg)
logger.debug(msg2)
n_skipped += 1
if raise_error:
raise SimpleError(msg + msg2)
else:
continue
# if n_sources > 1:
# logger.info(f"Sources added to database: {n_added}")
# logger.info(f"Names added to database: {n_names} \n")
# logger.info(f"Sources already in database: {n_existing}")
# logger.info(f"Alt Names added to database: {n_alt_names}")
# logger.info(f"Sources NOT added to database because multiple matches: {n_multiples}")
# logger.info(f"Sources NOT added to database: {n_skipped} \n")

# Try to add the source name to the Names table
try:
ingest_names(db, source, source)
n_names += 1
except sqlalchemy.exc.IntegrityError:
msg = f"{i}: Could not add {names_data} to database"
logger.warning(msg)
if raise_error:
raise SimpleError(msg)
else:
continue
# if n_added != n_names:
# msg = f"Number added should equal names added."
# raise SimpleError(msg)

if n_sources > 1:
logger.info(f"Sources added to database: {n_added}")
logger.info(f"Names added to database: {n_names} \n")
logger.info(f"Sources already in database: {n_existing}")
logger.info(f"Alt Names added to database: {n_alt_names}")
logger.info(f"Sources NOT added to database because multiple matches: {n_multiples}")
logger.info(f"Sources NOT added to database: {n_skipped} \n")

if n_added != n_names:
msg = f"Number added should equal names added."
raise SimpleError(msg)

if n_added + n_existing + n_multiples + n_skipped != n_sources:
msg = f"Number added + Number skipped doesn't add up to total sources"
raise SimpleError(msg)
# if n_added + n_existing + n_multiples + n_skipped != n_sources:
# msg = f"Number added + Number skipped doesn't add up to total sources"
# raise SimpleError(msg)

return

Expand Down Expand Up @@ -1580,3 +1457,174 @@ def ingest_companion_relationships(db, source, companion_name, relationship,
raise SimpleError(msg)


def ingest_source(db, source, reference=None, ra=None, dec=None, epoch=None, equinox=None,
                  raise_error=True, search_db=True, other_reference=None, comment=None):
    """
    Ingest a single source into the Sources table and record its name in the Names table.

    If ``search_db`` is set, the database is searched first (using the coordinates when
    they are provided). A single match means the source already exists: nothing is added
    to Sources, but ``source`` is added to Names as an alternate name when it is not
    itself a Sources entry. Multiple matches abort the ingest. With no match (or with
    ``search_db`` disabled) the source is ingested, querying SIMBAD for coordinates when
    neither ``ra`` nor ``dec`` was supplied.

    Parameters
    ----------
    db
        Database object. Must expose ``engine``, ``Sources``, ``Names`` and
        ``search_object``.
    source
        Name of the source to ingest.
    reference
        Discovery reference for the source. Required: a value of None or a masked
        value aborts the ingest.
    ra, dec
        Coordinates of the source. When both are None, coordinates are requested
        from SIMBAD.
    epoch, equinox
        Stored alongside the coordinates. Overwritten with '2000' and 'J2000' when
        the coordinates come from SIMBAD.
    raise_error : bool
        If True, problems raise SimpleError. If False, they are logged and the
        function returns without ingesting.
    search_db : bool
        If True, look for an existing match in the database before ingesting.
    other_reference
        Stored in the ``other_references`` column.
    comment
        Stored in the ``comments`` column.

    Returns
    -------
    None

    Raises
    ------
    SimpleError
        If ``raise_error`` is True and the source cannot be ingested.
    """
    coords_provided = not (ra is None and dec is None)
    logger.debug(f"coords_provided:{coords_provided}")

    # Find out if source is already in database or not
    if not search_db:
        name_matches = []
    elif coords_provided:
        name_matches = find_source_in_db(db, source, ra=ra, dec=dec)
    else:
        name_matches = find_source_in_db(db, source)

    logger.debug(f"name_matches: {name_matches}")

    # Source is already in database
    if search_db and len(name_matches) == 1:
        msg = f"Not ingesting {source}. Already in database as {name_matches[0]}. \n "
        logger.info(msg)

        # Figure out if source name provided is an alternate name
        db_source_matches = db.search_object(source, output_table='Sources', fuzzy_search=False)

        # Try to add alternate source name to Names table
        if len(db_source_matches) == 0:
            alt_names_data = [{'source': name_matches[0], 'other_name': source}]
            try:
                with db.engine.connect() as conn:
                    conn.execute(db.Names.insert().values(alt_names_data))
                    conn.commit()
                logger.debug(f"   Name added to database: {alt_names_data}\n")
            except sqlalchemy.exc.IntegrityError as e:
                msg = f"   Could not add {alt_names_data} to database"
                logger.warning(msg)
                if raise_error:
                    raise SimpleError(msg + '\n' + str(e)) from e
        return  # Source is already in database, nothing new to ingest

    # Multiple source matches in the database so unable to ingest source
    if search_db and len(name_matches) > 1:
        msg1 = f"   Not ingesting {source}."
        msg = f"   More than one match for {source}\n {name_matches}\n"
        logger.warning(msg1 + msg)
        if raise_error:
            raise SimpleError(msg)
        return

    # No match in the database (or the search was skipped): INGEST!

    # Make sure reference is provided and in Publications table
    if reference is None or ma.is_masked(reference):
        msg = f"Not ingesting {source}. Discovery reference is blank. \n"
        logger.warning(msg)
        if raise_error:
            raise SimpleError(msg)
        return

    ref_check = find_publication(db, name=reference)
    logger.debug(f'ref_check: {ref_check}')

    # NOTE(review): this guard only fires when find_publication returns the literal
    # False. If it returns anything else for "not found" (e.g. a tuple), the check is
    # silently skipped -- confirm against find_publication.
    if ref_check is False:
        msg = f"Skipping: {source}. Discovery reference {reference} is not in Publications table. \n" \
              f"(Add it with ingest_publication function.)"
        logger.warning(msg)
        if raise_error:
            # Only msg is defined at this point; the previous code also appended an
            # undefined msg2 here, which raised NameError instead of SimpleError.
            raise SimpleError(msg)
        return

    # Try to get coordinates from SIMBAD if they were not provided
    if not coords_provided:
        simbad_result_table = Simbad.query_object(source)

        # Exactly one SIMBAD match is required; no result or several matches are
        # both treated as "coordinates could not be retrieved".
        if simbad_result_table is None or len(simbad_result_table) != 1:
            msg = f"Not ingesting {source}. Coordinates are needed and could not be retrieved from SIMBAD. \n"
            logger.warning(msg)
            if raise_error:
                raise SimpleError(msg)
            return

        # One SIMBAD match! Using those coordinates for source.
        simbad_coords = simbad_result_table['RA'][0] + ' ' + simbad_result_table['DEC'][0]
        simbad_skycoord = SkyCoord(simbad_coords, unit=(u.hourangle, u.deg))
        decimal_coords = simbad_skycoord.to_string(style='decimal').split()
        ra = decimal_coords[0]
        dec = decimal_coords[1]
        epoch = '2000'  # Default coordinates from SIMBAD are epoch 2000.
        equinox = 'J2000'  # Default frame from SIMBAD is ICRS and J2000.
        msg = f"Coordinates retrieved from SIMBAD {ra}, {dec}"
        logger.debug(msg)

    logger.debug(f"   Ingesting {source}.")

    # Construct data to be added
    source_data = [{'source': source,
                    'ra': ra,
                    'dec': dec,
                    'reference': reference,
                    'epoch': epoch,
                    'equinox': equinox,
                    'other_references': other_reference,
                    'comments': comment}]
    names_data = [{'source': source,
                   'other_name': source}]

    # Try to add the source to the database
    try:
        with db.engine.connect() as conn:
            conn.execute(db.Sources.insert().values(source_data))
            conn.commit()
        msg = f"Added {str(source_data)}"
        logger.info(f"Added {source}")
        logger.debug(msg)
    except sqlalchemy.exc.IntegrityError:
        msg = f"Not ingesting {source}. Not sure why. \n"
        msg2 = f"   {str(source_data)} "
        logger.warning(msg)
        logger.debug(msg2)
        if raise_error:
            raise SimpleError(msg + msg2)
        return

    # Try to add the source name to the Names table
    try:
        with db.engine.connect() as conn:
            conn.execute(db.Names.insert().values(names_data))
            conn.commit()
        logger.debug(f"    Name added to database: {names_data}\n")
    except sqlalchemy.exc.IntegrityError:
        msg = f"   Could not add {names_data} to database"
        logger.warning(msg)
        if raise_error:
            raise SimpleError(msg)
        return

    return

0 comments on commit 1b048b0

Please sign in to comment.