"""
Crawler for NOVO CAGED microdata from the Brazilian Ministry of Labour FTP
server (ftp://ftp.mtps.gov.br/pdet/microdados/NOVO CAGED/).

NOTE(review): this module was reconstructed from a unified diff (git patch).
Spans the patch did not show — the full RENAME_DICT contents, a few interior
lines of crawler_novo_caged_ftp / build_partitions, and the original import
header — are marked with TODO(review) and must be confirmed against the
original file before running.
"""
import ftplib
import lzma
import os
import re
from datetime import date, datetime, timedelta
from glob import glob

import pandas as pd

# Maps raw CAGED column names to the project's column names.
# TODO(review): the patch shows only the first entry; restore the remaining
# renames (original lines 12-36) from the source file.
RENAME_DICT = {
    "uf": "sigla_uf",
}

# Details ({'filename', 'local_path', 'error'}) of every file that failed to
# download or extract; reset at the start of each crawler_novo_caged_ftp run.
CORRUPT_FILES = []


def _cleanup_corrupt(output_path):
    """Remove a corrupt .7z archive and any partially extracted .txt twin."""
    print(f'removendo zip corroimpido {output_path}')
    if os.path.exists(output_path):
        os.remove(output_path)

    txt_output_path = output_path.replace('.7z', '.txt')
    print(f'removendo txt corroimpido {txt_output_path}')
    if os.path.exists(txt_output_path):
        os.remove(txt_output_path)


def download_file(ftp, remote_dir, filename, local_dir):
    """
    Download a .7z file from an FTP server and extract it, with error handling.

    Parameters:
        ftp (ftplib.FTP): an active FTP connection, already positioned in the
            remote directory that contains `filename`
        remote_dir (str): the remote directory containing the file
        filename (str): the name of the .7z file to download
        local_dir (str): the local directory to save and extract the file

    Returns:
        bool: True if the file downloaded and extracted successfully, False
        otherwise (failure details are appended to the global CORRUPT_FILES).
    """
    import py7zr  # third-party; lazy so the module imports without it installed

    os.makedirs(local_dir, exist_ok=True)
    output_path = os.path.join(local_dir, filename)

    try:
        with open(output_path, 'wb') as f:
            ftp.retrbinary('RETR ' + filename, f.write)

        try:
            with py7zr.SevenZipFile(output_path, 'r') as archive:
                archive.extractall(path=local_dir)

            os.remove(output_path)  # extracted successfully; drop the archive
            return True

        # fix: the original caught `_lzma.LZMAError` but never imported
        # `_lzma`; the public stdlib alias is `lzma.LZMAError`.
        except (py7zr.Bad7zFile, lzma.LZMAError) as extract_error:
            print(f"Error extracting file (unknown): {extract_error}")
            CORRUPT_FILES.append({
                'filename': filename,
                'local_path': output_path,
                'error': str(extract_error),
            })
            # fix: the corrupt archive used to be left on disk on this path
            _cleanup_corrupt(output_path)
            return False

    except Exception as download_error:
        print(f"Error downloading file (unknown): {download_error}")
        CORRUPT_FILES.append({
            'filename': filename,
            'local_path': output_path,
            'error': str(download_error),
        })
        _cleanup_corrupt(output_path)
        return False


def crawler_novo_caged_ftp(
    yearmonth: str,
    ftp_host: str = "ftp.mtps.gov.br",
    file_types: list = None
) -> dict:
    """
    Download the requested .7z files for one month of the NOVO CAGED dataset.

    Parameters:
        yearmonth (str): the month to download data from (e.g., '202301' for
            January 2023)
        ftp_host (str): the FTP host to connect to (default: "ftp.mtps.gov.br")
        file_types (list): file types to download — 'MOV' (movement),
            'FOR' (out of deadline), 'EXC' (excluded). None downloads all.

    Returns:
        dict: {'successful': [...], 'failed': [...], 'corrupt_files': [...]}
        (the original docstring said "list"; the function returns this dict)

    Raises:
        ValueError: on a malformed `yearmonth` or an unknown file type.
    """
    global CORRUPT_FILES
    CORRUPT_FILES = []

    if len(yearmonth) != 6 or not yearmonth.isdigit():
        raise ValueError("yearmonth must be a string in the format 'YYYYMM'")

    if file_types:
        file_types = [ft.upper() for ft in file_types]
        valid_types = ['MOV', 'FOR', 'EXC']
        if not all(ft in valid_types for ft in file_types):
            raise ValueError(f"Invalid file types. Choose from {valid_types}")

    ftp = ftplib.FTP(ftp_host)
    ftp.login()
    ftp.cwd(f'pdet/microdados/NOVO CAGED/{int(yearmonth[0:4])}/')
    # TODO(review): the patch hides a few lines here; the original appears to
    # change into the month subdirectory before listing — confirm exact logic.
    ftp.cwd(yearmonth)

    print(f'Baixando para o mês: {yearmonth}')

    filenames = [f for f in ftp.nlst() if f.endswith('.7z')]

    successful_downloads = []
    failed_downloads = []

    # marker substring in the remote filename -> (type code, local input dir)
    targets = {
        'CAGEDMOV': ('MOV', "/tmp/caged/microdados_movimentacao/input/"),
        'CAGEDFOR': ('FOR', "/tmp/caged/microdados_movimentacao_fora_prazo/input/"),
        'CAGEDEXC': ('EXC', "/tmp/caged/microdados_movimentacao_excluida/input/"),
    }

    for file in filenames:
        for marker, (ftype, local_dir) in targets.items():
            if marker in file and (not file_types or ftype in file_types):
                print(f'Baixando o arquivo: {file}')
                success = download_file(ftp, yearmonth, file, local_dir)
                (successful_downloads if success else failed_downloads).append(file)
                break  # a file matches at most one type, as in the elif chain

    ftp.quit()

    print("\nDownload Summary:")
    print(f"Successfully downloaded: {successful_downloads}")
    print(f"Failed downloads: {failed_downloads}")

    if CORRUPT_FILES:
        print("\nCorrupt Files Details:")
        for corrupt_file in CORRUPT_FILES:
            print(f"Filename: {corrupt_file['filename']}")
            print(f"Local Path: {corrupt_file['local_path']}")
            print(f"Error: {corrupt_file['error']}")
            print("---")

    return {
        'successful': successful_downloads,
        'failed': failed_downloads,
        'corrupt_files': CORRUPT_FILES,
    }


def build_partitions(table_id: str, yearmonth: str) -> str:
    """
    Build hive-style partitions (ano=/mes=/sigla_uf=) from the extracted
    CAGED .txt files for one table and one month.

    Parameters:
        table_id (str): one of 'microdados_movimentacao',
            'microdados_movimentacao_fora_prazo',
            'microdados_movimentacao_excluida'
        yearmonth (str): the month being partitioned, e.g. '202301'
    """
    from tqdm import tqdm  # third-party; lazy import

    ano = yearmonth[0:4]
    mes = int(yearmonth[4:])

    # IBGE state code (as it appears in the raw 'uf' column) -> state acronym.
    # TODO(review): hidden by the patch; reconstructed from the standard IBGE
    # table — confirm against the original file.
    dict_uf = {
        "11": "RO", "12": "AC", "13": "AM", "14": "RR", "15": "PA",
        "16": "AP", "17": "TO", "21": "MA", "22": "PI", "23": "CE",
        "24": "RN", "25": "PB", "26": "PE", "27": "AL", "28": "SE",
        "29": "BA", "31": "MG", "32": "ES", "33": "RJ", "35": "SP",
        "41": "PR", "42": "SC", "43": "RS", "50": "MS", "51": "MT",
        "52": "GO", "53": "DF",
    }

    for uf in dict_uf.values():
        os.makedirs(
            f"/tmp/caged/{table_id}/output/ano={yearmonth[0:4]}/mes={int(yearmonth[4:])}/sigla_uf={uf}/",
            exist_ok=True,
        )

    input_files = glob(f"/tmp/caged/{table_id}/input/*{yearmonth}*")

    for filename in tqdm(input_files):
        df = pd.read_csv(filename, sep=";", dtype={"uf": str})
        # TODO(review): the patch hides a few pre-processing lines here
        # (presumably column-name normalization via unidecode/lowercasing,
        # given the file's imports) — confirm against the original.

        df["uf"] = df["uf"].map(dict_uf)
        df.rename(columns=RENAME_DICT, inplace=True)

        for state in dict_uf.values():
            data = df[df["sigla_uf"] == state]

            # Each table drops its own set of redundant/partition columns.
            if table_id == 'microdados_movimentacao':
                data = data.drop(["competenciamov", "sigla_uf", "regiao", "competenciadec", "unidadesalariocodigo", "valorsalariofixo"], axis=1)
            elif table_id == 'microdados_movimentacao_fora_prazo':
                data = data.drop(["sigla_uf", "regiao", "competenciadec", "unidadesalariocodigo", "valorsalariofixo"], axis=1)
            elif table_id == 'microdados_movimentacao_excluida':
                data = data.drop(["sigla_uf", "regiao", "competenciadec", "unidadesalariocodigo", "valorsalariofixo", "competenciaexc"], axis=1)

            data.to_csv(
                f"/tmp/caged/{table_id}/output/ano={ano}/mes={mes}/sigla_uf={state}/data.csv",
                index=False,
            )  # NOTE(review): any trailing to_csv kwargs were hidden by the patch

        del data
        del df


def generate_yearmonth_range(start_date: str, end_date: str) -> list:
    """
    Generate the list of 'YYYYMM' strings between start_date and end_date,
    inclusive, in chronological order.

    Parameters:
        start_date (str): start month in 'YYYYMM' format
        end_date (str): end month in 'YYYYMM' format

    Returns:
        list: yearmonth strings from start_date through end_date

    Raises:
        ValueError: if a date is malformed or start_date is after end_date
    """
    if not (len(start_date) == 6 and len(end_date) == 6 and
            start_date.isdigit() and end_date.isdigit()):
        raise ValueError("Dates must be in 'YYYYMM' format")

    # strptime also rejects impossible months such as '202413'.
    start = datetime.strptime(start_date, '%Y%m')
    end = datetime.strptime(end_date, '%Y%m')

    if start > end:
        raise ValueError("Start date must be before or equal to end date")

    # Month arithmetic in pure stdlib (replaces the dateutil.relativedelta
    # dependency): count months since year 0 and step one at a time.
    first = start.year * 12 + (start.month - 1)
    last = end.year * 12 + (end.month - 1)
    return [f"{m // 12:04d}{m % 12 + 1:02d}" for m in range(first, last + 1)]


if __name__ == '__main__':
    # NOTE: adjust the date range and table list as needed.
    date_range = generate_yearmonth_range('202002', '202410')
    table_ids = ['microdados_movimentacao_fora_prazo', 'microdados_movimentacao_excluida']

    for yearmonth in date_range:
        downloads = crawler_novo_caged_ftp(yearmonth=yearmonth, file_types=['EXC', 'FOR'])
        print(downloads)

    for table in table_ids:
        for yearmonth in date_range:
            build_partitions(table_id=table, yearmonth=yearmonth)