refactor: caged crawler
folhesgabriel committed Dec 16, 2024
1 parent 2b7e061 commit 47064a1
Showing 1 changed file with 163 additions and 28 deletions.
191 changes: 163 additions & 28 deletions models/br_me_caged/code/crawler_caged.py
@@ -8,6 +8,8 @@
 from datetime import timedelta
 from unidecode import unidecode
 import re
+from datetime import datetime, date
+from dateutil.relativedelta import relativedelta

 RENAME_DICT = {
     "uf" : "sigla_uf",
@@ -37,9 +39,12 @@

 }


+# TODO: loop logic
 def download_file(ftp, remote_dir, filename, local_dir):
     """
-    Downloads and extracts a .7z file from an FTP server.
+    Downloads and extracts a .7z file from an FTP server with error handling.

     Parameters:
         ftp (ftplib.FTP): an active FTP connection
@@ -48,34 +53,83 @@ def download_file(ftp, remote_dir, filename, local_dir):
         local_dir (str): the local directory to save and extract the file

     Returns:
-        None
+        bool: True if file downloaded and extracted successfully, False otherwise
     """
+    global CORRUPT_FILES

     os.makedirs(local_dir, exist_ok=True)
     output_path = os.path.join(local_dir, filename)

-    with open(output_path, 'wb') as f:
-        ftp.retrbinary('RETR ' + filename, f.write)
+    try:
+        with open(output_path, 'wb') as f:
+            ftp.retrbinary('RETR ' + filename, f.write)

-    with py7zr.SevenZipFile(output_path, 'r') as archive:
-        archive.extractall(path=local_dir)
+        try:
+            with py7zr.SevenZipFile(output_path, 'r') as archive:
+                archive.extractall(path=local_dir)

+            os.remove(output_path)
+            return True

+        except (py7zr.Bad7zFile, _lzma.LZMAError) as extract_error:
+            print(f"Error extracting file {filename}: {extract_error}")
+            CORRUPT_FILES.append({
+                'filename': filename,
+                'local_path': output_path,
+                'error': str(extract_error)
+            })

+            return False

+    except Exception as download_error:
+        print(f"Error downloading file {filename}: {download_error}")
+        CORRUPT_FILES.append({
+            'filename': filename,
+            'local_path': output_path,
+            'error': str(download_error)
+        })

+        print(f'removing corrupted zip {output_path}')
+        if os.path.exists(output_path):
+            os.remove(output_path)

+        txt_output_path = output_path.replace('.7z', '.txt')
+        print(f'removing corrupted txt {txt_output_path}')
+        if os.path.exists(txt_output_path):
+            os.remove(txt_output_path)
+        return False

-    os.remove(output_path)

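Two reading notes on this hunk: `remote_dir` is accepted but never referenced in the body, so the caller is expected to `cwd` into the right directory first (as `crawler_novo_caged_ftp` does below), and `_lzma` must already be imported above this hunk; if not, the public `lzma` module exposes the same `LZMAError`. A minimal usage sketch under those assumptions (the archive name is hypothetical):

import ftplib

CORRUPT_FILES = []  # module-level list that download_file appends failure records to

ftp = ftplib.FTP("ftp.mtps.gov.br")
ftp.login()  # anonymous login
ftp.cwd("pdet/microdados/NOVO CAGED/2023/202301/")
ok = download_file(ftp, "202301", "CAGEDMOV202301.7z",
                   "/tmp/caged/microdados_movimentacao/input/")
ftp.quit()
print("extracted" if ok else "failure recorded in CORRUPT_FILES")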
-def crawler_novo_caged_ftp(yearmonth: str, ftp_host="ftp.mtps.gov.br") -> None:
+def crawler_novo_caged_ftp(
+    yearmonth: str,
+    ftp_host: str = "ftp.mtps.gov.br",
+    file_types: list = None
+) -> dict:
     """
-    Downloads all .7z files from a specified year and month in the CAGED dataset from ftp://ftp.mtps.gov.br/pdet/microdados/NOVO CAGED/
+    Downloads specified .7z files from a CAGED dataset FTP server.

     Parameters:
         yearmonth (str): the month to download data from (e.g., '202301' for January 2023)
         ftp_host (str): the FTP host to connect to (default: "ftp.mtps.gov.br")
+        file_types (list): list of file types to download.
+            Options: 'MOV' (movement), 'FOR' (out of deadline), 'EXC' (excluded)
+            If None, downloads all files

     Returns:
-        None
+        dict: lists of successful, failed, and corrupt downloads
     """
+    global CORRUPT_FILES
+    CORRUPT_FILES = []

     if len(yearmonth) != 6 or not yearmonth.isdigit():
         raise ValueError("yearmonth must be a string in the format 'YYYYMM'")

+    if file_types:
+        file_types = [ft.upper() for ft in file_types]
+        valid_types = ['MOV', 'FOR', 'EXC']
+        if not all(ft in valid_types for ft in file_types):
+            raise ValueError(f"Invalid file types. Choose from {valid_types}")

     ftp = ftplib.FTP(ftp_host)
     ftp.login()
     ftp.cwd(f'pdet/microdados/NOVO CAGED/{int(yearmonth[0:4])}/')
@@ -88,19 +142,46 @@ def crawler_novo_caged_ftp(yearmonth: str, ftp_host="ftp.mtps.gov.br") -> None:
     print(f'Downloading for month: {yearmonth}')

     filenames = [f for f in ftp.nlst() if f.endswith('.7z')]

+    successful_downloads = []
+    failed_downloads = []

     for file in filenames:
-        if file.startswith('CAGEDMOV'):
+        if 'CAGEDMOV' in file and (not file_types or 'MOV' in file_types):
             print(f'Downloading file: {file}')
-            download_file(ftp, yearmonth, file, "/tmp/caged/microdados_movimentacao/input/")
+            success = download_file(ftp, yearmonth, file, "/tmp/caged/microdados_movimentacao/input/")
+            (successful_downloads if success else failed_downloads).append(file)

-        elif file.startswith('CAGEDFOR'):
+        elif 'CAGEDFOR' in file and (not file_types or 'FOR' in file_types):
             print(f'Downloading file: {file}')
-            download_file(ftp, yearmonth, file, "/tmp/caged/microdados_movimentacao_fora_prazo/input/")
+            success = download_file(ftp, yearmonth, file, "/tmp/caged/microdados_movimentacao_fora_prazo/input/")
+            (successful_downloads if success else failed_downloads).append(file)

-        elif file.startswith('CAGEDEXC'):
+        elif 'CAGEDEXC' in file and (not file_types or 'EXC' in file_types):
             print(f'Downloading file: {file}')
-            download_file(ftp, yearmonth, file, "/tmp/caged/microdados_movimentacao_excluida/input/")
+            success = download_file(ftp, yearmonth, file, "/tmp/caged/microdados_movimentacao_excluida/input/")
+            (successful_downloads if success else failed_downloads).append(file)

     ftp.quit()

+    print("\nDownload Summary:")
+    print(f"Successfully downloaded: {successful_downloads}")
+    print(f"Failed downloads: {failed_downloads}")

+    if CORRUPT_FILES:
+        print("\nCorrupt Files Details:")
+        for corrupt_file in CORRUPT_FILES:
+            print(f"Filename: {corrupt_file['filename']}")
+            print(f"Local Path: {corrupt_file['local_path']}")
+            print(f"Error: {corrupt_file['error']}")
+            print("---")

+    return {
+        'successful': successful_downloads,
+        'failed': failed_downloads,
+        'corrupt_files': CORRUPT_FILES
+    }
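Since failures are now collected and returned instead of raised, a caller can drive retries off the summary dict. An illustrative sketch (the retry budget is an arbitrary choice, not part of the commit):

result = crawler_novo_caged_ftp('202301', file_types=['MOV'])
for _ in range(2):  # retry at most twice while anything is still failing
    if not result['failed'] and not result['corrupt_files']:
        break
    result = crawler_novo_caged_ftp('202301', file_types=['MOV'])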

 def build_partitions(table_id: str, yearmonth: str) -> None:
     """
     Build partitions from the downloaded txt files.
Expand Down Expand Up @@ -137,9 +218,13 @@ def build_partitions(table_id: str, yearmonth: str) -> str:
     }

     for uf in dict_uf.values():
-        os.system(f"mkdir -p /tmp/caged/{table_id}/output/ano={yearmonth[0:4]}/mes={int(yearmonth[4:])}/sigla_uf={uf}/")
+        os.makedirs(
+            f"/tmp/caged/{table_id}/output/ano={yearmonth[0:4]}/mes={int(yearmonth[4:])}/sigla_uf={uf}/",
+            exist_ok=True
+        )

-    input_files = glob(f"/tmp/caged/{table_id}/input/*{yearmonth}*")
+    input_files = glob(f"/tmp/caged/{table_id}/input/*txt")

     for filename in tqdm(input_files):
         df = pd.read_csv(filename, sep=";", dtype={"uf": str})
@@ -151,25 +236,75 @@ def build_partitions(table_id: str, yearmonth: str) -> str:
df["uf"] = df["uf"].map(dict_uf)

df.rename(columns=RENAME_DICT, inplace=True)
df.columns



for state in dict_uf.values():
data = df[df["sigla_uf"] == state]
if table_id != 'microdados_movimentacao_excluida':

if table_id == 'microdados_movimentacao':
data = data.drop(["competenciamov", "sigla_uf", "regiao","competenciadec", "unidadesalariocodigo", "valorsalariofixo"], axis=1)
else:
data = data.drop(["competenciamov", "sigla_uf", "regiao","competenciadec", "unidadesalariocodigo", "valorsalariofixo", "competenciaexc"], axis=1)
elif table_id == 'microdados_movimentacao_fora_prazo':
data = data.drop(["sigla_uf", "regiao","competenciadec", "unidadesalariocodigo", "valorsalariofixo"], axis=1)
elif table_id == 'microdados_movimentacao_excluida':
data = data.drop(["sigla_uf", "regiao","competenciadec", "unidadesalariocodigo", "valorsalariofixo", "competenciaexc"], axis=1)

data.to_csv(
f"/tmp/caged/{table_id}/output/ano={ano}/mes={mes}/sigla_uf={state}/data.csv",
index=False,
)
del data
del df



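For reference, the hive-style layout this writes, assuming `ano` and `mes` match the values used in the makedirs calls above (`mes` carries no leading zero because of `int(yearmonth[4:])`):

# /tmp/caged/microdados_movimentacao/output/ano=2023/mes=1/sigla_uf=SP/data.csv
# ... one data.csv per sigla_uf in dict_uf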
+def generate_yearmonth_range(start_date: str, end_date: str) -> list:
+    """
+    Generate a list of yearmonth strings between start_date and end_date (inclusive).
+
+    Parameters:
+        start_date (str): Start date in format 'YYYYMM'
+        end_date (str): End date in format 'YYYYMM'
+
+    Returns:
+        list: List of yearmonth strings in chronological order
+
+    Raises:
+        ValueError: If date format is incorrect or start_date is after end_date
+    """
+    # Validate input format
+    if not (len(start_date) == 6 and len(end_date) == 6 and
+            start_date.isdigit() and end_date.isdigit()):
+        raise ValueError("Dates must be in 'YYYYMM' format")
+
+    # Convert to datetime objects
+    start = datetime.strptime(start_date, '%Y%m')
+    end = datetime.strptime(end_date, '%Y%m')
+
+    # Validate date order
+    if start > end:
+        raise ValueError("Start date must be before or equal to end date")
+
+    # Generate list of yearmonths
+    yearmonths = []
+    current = start
+    while current <= end:
+        yearmonths.append(current.strftime('%Y%m'))
+        current += relativedelta(months=1)
+
+    return yearmonths
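A quick check of the helper's inclusive bounds and year rollover (follows directly from the loop above):

generate_yearmonth_range('202211', '202302')
# -> ['202211', '202212', '202301', '202302']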


 if __name__ == '__main__':
-    YEARMONTH = '202301'
-    TABLE = 'microdados_movimentacao'
-    crawler_novo_caged_ftp(yearmonth=YEARMONTH)
-    build_partitions(table_id=TABLE, yearmonth=YEARMONTH)
+    # NOTE: adjust the date range and table list as needed
+    date_range = generate_yearmonth_range('202002', '202410')
+    table_ids = ['microdados_movimentacao_fora_prazo', 'microdados_movimentacao_excluida']
+
+    for YEARMONTH in date_range:
+        downloads = crawler_novo_caged_ftp(yearmonth=YEARMONTH, file_types=['EXC', 'FOR'])
+        print(downloads)
+
+    for table in table_ids:
+        for YEARMONTH in date_range:
+            build_partitions(table_id=table, yearmonth=YEARMONTH)
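For a one-off month, a minimal equivalent of the pre-refactor entry point under the new signatures (month and table chosen for illustration):

downloads = crawler_novo_caged_ftp(yearmonth='202301', file_types=['MOV'])
build_partitions(table_id='microdados_movimentacao', yearmonth='202301')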
