Skip to content

Commit

Permalink
updated processing files
Browse files Browse the repository at this point in the history
  • Loading branch information
Girish3320 committed Dec 23, 2024
1 parent 6231940 commit e11384f
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 633,828 deletions.
92,737 changes: 0 additions & 92,737 deletions scripts/us_bls/jolts/BLSJolts.csv

This file was deleted.

3,304 changes: 0 additions & 3,304 deletions scripts/us_bls/jolts/BLSJolts_StatisticalVariables.mcf

This file was deleted.

26 changes: 22 additions & 4 deletions scripts/us_bls/jolts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,26 @@ Additional information about each dataframe.
"""

IMPORTANT:
If any new statistical variable comes in future , it is to be added to the dictionaries :
1. _dcid_map in map_config.py
2. _mcf_map in mcf_config.py

If any new statistical variable is introduced in the future, it must be added to the `_dcid_map` dictionary in `map_config.py`.

### Import Procedure

The script below downloads the data and generates the CSV and MCF files.

`/usr/bin/python3 scripts/us_bls/jolts/bls_jolts.py`

Execute the 'bls_jolts.py' script by using the following commands:

- if you want to perform "download and process", run the below command:

`python3 bls_jolts.py`
- if you want to perform "only download", run the below command:

`python3 bls_jolts.py --mode=download`

- if you want to perform "only process", run the below command:

`python3 bls_jolts.py --mode=process`

301 changes: 166 additions & 135 deletions scripts/us_bls/jolts/bls_jolts.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@
import os
import textwrap
from absl import app
from absl import logging
from absl import flags
import time
import pandas as pd
import requests
from map_config import _dcid_map
import fileinput

_FLAGS = flags.FLAGS
flags.DEFINE_string('mode', '', 'Options: download or process')

# JOLTS dataset contains both NAICS industry codes and BLS jolts aggregations.
# Existing NAICS Codes are mapped directly while
# custom JOLTS codes include a colon distinguishing their new name.
Expand Down Expand Up @@ -68,92 +74,107 @@


def generate_cleaned_dataframe():
"""Fetches and combines BLS Jolts data sources.
Downloads detailed series information from the entire JOLTS dataset.
Each of the files is read, combined into a single dataframe, and processed.
Returns:
jolts_df: The 6 job data categories by industry, year, and adjustment,
as a data frame.
schema_mapping: List of tuples that contains information for each dataset.
"""
# Series descriptions are used for adjustment status and industry code.
exp_series_columns = [
'series_id', 'seasonal', 'industry_code', 'state_code', 'area_code',
'sizeclass_code', 'dataelement_code', 'ratelevel_code',
'footnote_codes', 'begin_year', 'begin_period', 'end_year', 'end_period'
]

"""Fetches and combines BLS Jolts data sources, with retry logic for file downloads.
Downloads detailed series information from the entire JOLTS dataset.
Each of the files is read, combined into a single dataframe, and processed.
Returns:
jolts_df: The 6 job data categories by industry, year, and adjustment, as a dataframe.
schema_mapping: List of tuples that contains information for each dataset.
"""

header = {'User-Agent': '[email protected]'}

series_desc = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.series",
storage_options=header,
converters={'industry_code': str},
sep="\\t")
series_desc.columns = exp_series_columns
series_desc["series_id"] = series_desc["series_id"].apply(
lambda x: x.strip())

series_desc.to_csv("jolts_input_jt_series.csv")
assert len(series_desc.columns) == len(exp_series_columns)
assert (series_desc.columns == exp_series_columns).all()
series_desc = series_desc.set_index("series_id")

# Download various series datapoints
#job_openings = pd.read_csv(

job_openings = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.2.JobOpenings",
storage_options=header,
sep="\\s+")
job_openings.to_csv("jolts_input_jt_job_openings.csv")

job_hires = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.3.Hires",
storage_options=header,
sep="\\s+")
job_hires.to_csv("jolts_input_jt_job_hires.csv")

total_seps = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.4.TotalSeparations", # pylint: disable=line-too-long
storage_options=header,
sep="\\s+")
total_seps.to_csv("jolts_input_jt_totlal_separations.csv")

total_quits = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.5.Quits",
storage_options=header,
sep="\\s+")
total_quits.to_csv("jolts_input_jt_total_quits.csv")

total_layoffs = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.6.LayoffsDischarges", # pylint: disable=line-too-long
storage_options=header,
sep="\\s+")
total_layoffs.to_csv("jolts_input_jt_total_layoffs.csv")

total_other_seps = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.data.7.OtherSeparations", # pylint: disable=line-too-long
storage_options=header,
sep="\\s+")
total_other_seps.to_csv("jolts_input_jt_total_other_separations.csv")

max_retry = 5

def download_with_retries(url, retries=max_retry):
"""Download file with retry logic."""
retry_count = 0
while retry_count <= retries:
try:
data = pd.read_csv(url, storage_options=header, sep="\\s+")
return data
except Exception as e:
logging.error(f"Error downloading {url}, retrying... ({retry_count + 1}/{retries})")
time.sleep(5)
retry_count += 1
if retry_count > retries:
logging.fatal(f"Error downloading {url} after {retries} retries.")
raise e

# Download and process series description
try:
series_desc = pd.read_csv(
"https://download.bls.gov/pub/time.series/jt/jt.series",
storage_options=header,
converters={'industry_code': str},
sep="\\t"
)
series_desc.columns = [
'series_id', 'seasonal', 'industry_code', 'state_code', 'area_code',
'sizeclass_code', 'dataelement_code', 'ratelevel_code',
'footnote_codes', 'begin_year', 'begin_period', 'end_year', 'end_period'
]
series_desc["series_id"] = series_desc["series_id"].apply(lambda x: x.strip())
series_desc.to_csv("jolts_input_jt_series.csv")
series_desc = series_desc.set_index("series_id")
except Exception as e:
logging.fatal(f"Failed to download series description: {e}")
return

# Download and process other datasets with retries
try:
job_openings = download_with_retries("https://download.bls.gov/pub/time.series/jt/jt.data.2.JobOpenings")
job_openings.to_csv("jolts_input_jt_job_openings.csv")
except Exception as e:
logging.fatal(f"Failed to download job openings data: {e}")
return

try:
job_hires = download_with_retries("https://download.bls.gov/pub/time.series/jt/jt.data.3.Hires")
job_hires.to_csv("jolts_input_jt_job_hires.csv")
except Exception as e:
logging.fatal(f"Failed to download job hires data: {e}")
return

try:
total_seps = download_with_retries("https://download.bls.gov/pub/time.series/jt/jt.data.4.TotalSeparations")
total_seps.to_csv("jolts_input_jt_totlal_separations.csv")
except Exception as e:
logging.fatal(f"Failed to download total separations data: {e}")
return

try:
total_quits = download_with_retries("https://download.bls.gov/pub/time.series/jt/jt.data.5.Quits")
total_quits.to_csv("jolts_input_jt_total_quits.csv")
except Exception as e:
logging.fatal(f"Failed to download total quits data: {e}")
return

try:
total_layoffs = download_with_retries("https://download.bls.gov/pub/time.series/jt/jt.data.6.LayoffsDischarges")
total_layoffs.to_csv("jolts_input_jt_total_layoffs.csv")
except Exception as e:
logging.fatal(f"Failed to download total layoffs data: {e}")
return

try:
total_other_seps = download_with_retries("https://download.bls.gov/pub/time.series/jt/jt.data.7.OtherSeparations")
total_other_seps.to_csv("jolts_input_jt_total_other_separations.csv")
except Exception as e:
logging.fatal(f"Failed to download other separations data: {e}")
return

# Additional information about each dataframe.
# Tuple Format: Statistical Variable name, Stat Var population,
# Stat Var Job Change Type If Relevant, Dataframe for Stat Var.
schema_mapping = [
("Count_JobPosting", "schema:JobPosting", "", job_openings),
("Count_Worker_Hire", "dcs:BLSWorker", "Hire", job_hires),
("Count_Worker_Separation", "dcs:BLSWorker", "Separation", total_seps),
("Count_Worker_VoluntarySeparation", "dcs:BLSWorker",
"VoluntarySeparation", total_quits),
("Count_Worker_InvoluntarySeparation", "dcs:BLSWorker",
"InvoluntarySeparation", total_layoffs),
("Count_Worker_OtherSeparation", "dcs:BLSWorker", "OtherSeparation",
total_other_seps),
("Count_Worker_VoluntarySeparation", "dcs:BLSWorker", "VoluntarySeparation", total_quits),
("Count_Worker_InvoluntarySeparation", "dcs:BLSWorker", "InvoluntarySeparation", total_layoffs),
("Count_Worker_OtherSeparation", "dcs:BLSWorker", "OtherSeparation", total_other_seps),
]

# Combine datasets into a single dataframe including origin of data.
jolts_df = pd.DataFrame()
job_columns = ['series_id', 'year', 'period', 'value', 'footnote_codes']
Expand Down Expand Up @@ -191,6 +212,7 @@ def period_year_to_iso_8601(row):
left_on=["series_id"],
right_index=True)
jolts_df.to_csv("before_query.csv", index=False)

# Drop rate data, preliminary data, and non-national data.
jolts_df = jolts_df.query("ratelevel_code == 'L'")
jolts_df = jolts_df.query("footnote_codes != 'P'")
Expand Down Expand Up @@ -237,7 +259,7 @@ def row_to_stat_var(row):
return jolts_df, schema_mapping


def create_statistical_variables(jolts_df, schema_mapping):
def process(jolts_df, schema_mapping):
"""Creates Statistical Variable nodes.
A new statistical industry is needed for each of the 6 job variables
Expand All @@ -250,69 +272,78 @@ def create_statistical_variables(jolts_df, schema_mapping):
Args:
jolts_df: The df of BLS Jolts data created by generate_cleaned_dataframe.
schema_mapping: The schema mapping created by generate_cleaned_dataframe.
"""
"""
template_stat_var = """
Node: dcid:{STAT_CLASS}_NAICS{INDUSTRY}_{ADJUSTMENT}
typeOf: dcs:StatisticalVariable
populationType: {POPULATION}
jobChangeEvent: dcs:{JOB_CHANGE_EVENT}
statType: dcs:measuredValue
measuredProperty: dcs:count
measurementQualifier: {BLS_ADJUSTMENT}
naics: dcid:NAICS/{INDUSTRY}
"""
Node: dcid:{STAT_CLASS}_NAICS{INDUSTRY}_{ADJUSTMENT}
typeOf: dcs:StatisticalVariable
populationType: {POPULATION}
jobChangeEvent: dcs:{JOB_CHANGE_EVENT}
statType: dcs:measuredValue
measuredProperty: dcs:count
measurementQualifier: {BLS_ADJUSTMENT}
naics: dcid:NAICS/{INDUSTRY}
"""

# Map industry and seasonal adjustment to statistical variable name.
adjustment_types = [("Adjusted", "dcs:BLSSeasonallyAdjusted"),
("Unadjusted", "dcs:BLSSeasonallyUnadjusted")]
# adjustment_types = [("Adjusted", "dcs:Adjusted"),
# ("Unadjusted", "dcs:Unadjusted")]

# Output the schema mapping to a new file.
with open("BLSJolts_StatisticalVariables.mcf", "w+", newline="") as f_out:
for schema_name, pop_type, job_change_event, _ in schema_mapping:
for industry_code in list(jolts_df['industry_code'].unique()):
for adjusted_dcid_map, adjusted_schema in adjustment_types:
if adjusted_schema == "dcs:BLSSeasonallyAdjusted":
adjusted_schema = "dcs:Adjusted"
else:
adjusted_schema = "dcs:Unadjusted"

# Create new schema object.
stat_var_schema = textwrap.dedent(template_stat_var)

# Remove separation type entry if not includes.
if job_change_event == "":
stat_var_schema = (stat_var_schema.replace(
"jobChangeEvent: dcs:{JOB_CHANGE_EVENT}\n", ""))

# Replace all other fields.
f_out.write(
stat_var_schema.replace(
"{STAT_CLASS}", schema_name).replace(
"{INDUSTRY}", industry_code).replace(

try:
# Output the schema mapping to a new file.
with open("BLSJolts_StatisticalVariables.mcf", "w+", newline="") as f_out:
logging.info("Started writing statistical variable schemas to 'BLSJolts_StatisticalVariables.mcf'.")
for schema_name, pop_type, job_change_event, _ in schema_mapping:
for industry_code in list(jolts_df['industry_code'].unique()):
for adjusted_dcid_map, adjusted_schema in adjustment_types:
if adjusted_schema == "dcs:BLSSeasonallyAdjusted":
adjusted_schema = "dcs:Adjusted"
else:
adjusted_schema = "dcs:Unadjusted"

# Create new schema object.
stat_var_schema = textwrap.dedent(template_stat_var)

# Remove jobChangeEvent if not provided.
if job_change_event == "":
stat_var_schema = (stat_var_schema.replace(
"jobChangeEvent: dcs:{JOB_CHANGE_EVENT}\n", ""))

# Replace all placeholders with the actual values.
try:
f_out.write(
stat_var_schema.replace(
"{STAT_CLASS}", schema_name).replace(
"{INDUSTRY}", industry_code).replace(
"{ADJUSTMENT}", adjusted_dcid_map).replace(
"{BLS_ADJUSTMENT}",
adjusted_schema).replace(
"{POPULATION}", pop_type).replace(
"{JOB_CHANGE_EVENT}",
job_change_event))
"{BLS_ADJUSTMENT}", adjusted_schema).replace(
"{POPULATION}", pop_type).replace(
"{JOB_CHANGE_EVENT}", job_change_event)
)
except Exception as e:
logging.error(f"Error writing schema for {schema_name}, {industry_code}, {adjusted_dcid_map}: {e}")
continue # Skip to the next item if writing fails

logging.info("Finished writing all statistical variable schemas.")
except Exception as e:
logging.error(f"Error processing BLS Jolts data or writing to file: {e}")


def main(_):
""" Executes the downloading, preprocessing, and outputting of
required MCF and CSV for JOLTS data.
"""
# Download and clean data.
jolts_df, schema_mapping = generate_cleaned_dataframe()

# Output final cleaned CSV.
final_columns = ['Date', 'StatisticalVariable', 'Value']
jolts_df.loc[:, final_columns].to_csv("BLSJolts.csv",
index=False,
encoding="utf-8")

# Create and output Statistical Variables.
create_statistical_variables(jolts_df, schema_mapping)
mode = _FLAGS.mode

if mode == "download" or mode == "":
logging.info("Downloading files...")
generate_cleaned_dataframe() # This function handles file download
logging.info("Download completed!")

if mode == "process" or mode == "":
logging.info("Processing data...")
jolts_df, schema_mapping = generate_cleaned_dataframe()
# Process and output final cleaned CSV and MCF
final_columns = ['Date', 'StatisticalVariable', 'Value']
jolts_df.loc[:, final_columns].to_csv("BLSJolts.csv", index=False, encoding="utf-8")
process(jolts_df, schema_mapping)
logging.info("Process completed!")


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit e11384f

Please sign in to comment.