fix: br_cnj_improbidade_administrativa #728

Draft · wants to merge 13 commits into main
pipelines/datasets/br_cnj_improbidade_administrativa/constants.py
@@ -9,8 +9,12 @@ class constants(Enum): # pylint: disable=c0103

HOME_PAGE_TEMPLATE = "https://www.cnj.jus.br/improbidade_adm/consultar_requerido.php?validar=form&rs=pesquisarRequeridoGetTabela&rst=&rsrnd=0&rsargs[]=&rsargs[]=&rsargs[]=&rsargs[]=&rsargs[]=&rsargs[]=&rsargs[]=I&rsargs[]=0&rsargs[]=POSICAO_INICIAL_PAGINACAO_PHP{index}&rsargs[]=QUANTIDADE_REGISTROS_PAGINACAO15"

CONDENACAO_URL_TEMPLATE = "https://www.cnj.jus.br/improbidade_adm/visualizar_condenacao.php?seq_condenacao={id}"
CONDENACAO_URL_TEMPLATE = (
"https://www.cnj.jus.br/improbidade_adm/visualizar_condenacao.php?seq_condenacao={id}"
)

PROCESS_URL_TEMPLATE = "https://www.cnj.jus.br/improbidade_adm/visualizar_processo.php?seq_processo={id}"
PROCESS_URL_TEMPLATE = (
"https://www.cnj.jus.br/improbidade_adm/visualizar_processo.php?seq_processo={id}"
)

PEOPLE_INFO_URL_TEMPLATE = "https://www.cnj.jus.br/improbidade_adm/visualizar_condenacao.php?seq_condenacao={sentence_id}&rs=getDadosParte&rst=&rsrnd=0&rsargs[]={people_id}"
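
For orientation, a minimal sketch of how these templates are presumably filled by the URL helpers referenced in tasks.py below (build_home_url_page, build_people_info_url). The helpers themselves are not shown in this diff, so the import path and the exact value substituted for {index} are assumptions:

# Sketch only: the real helpers appear to live in the dataset's utils module.
from pipelines.datasets.br_cnj_improbidade_administrativa.constants import constants


def build_home_url_page(index: int) -> str:
    # Fill the pagination placeholder for one page of the "requerido" listing.
    return constants.HOME_PAGE_TEMPLATE.value.format(index=index)


def build_people_info_url(sentence_id: str, people_id: str) -> str:
    # Fill both ids consumed by the getDadosParte endpoint.
    return constants.PEOPLE_INFO_URL_TEMPLATE.value.format(
        sentence_id=sentence_id, people_id=people_id
    )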
16 changes: 9 additions & 7 deletions pipelines/datasets/br_cnj_improbidade_administrativa/flows.py
@@ -9,8 +9,8 @@
from pipelines.constants import constants
from pipelines.datasets.br_cnj_improbidade_administrativa.schedules import every_month
from pipelines.datasets.br_cnj_improbidade_administrativa.tasks import (
get_max_date,
is_up_to_date,
get_cookies,
main_task,
write_csv_file,
)
@@ -35,7 +35,7 @@
table_id = Parameter("table_id", default="condenacao", required=True)
update_metadata = Parameter("update_metadata", default=True, required=False)
materialization_mode = Parameter("materialization_mode", default="prod", required=False)
materialize_after_dump = Parameter("materialize after dump", default=True, required=False)
materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=True, required=False)

rename_flow_run = rename_current_flow_run_dataset_table(
@@ -50,15 +50,17 @@
with case(is_updated, False):
log_task("Data is outdated")

df = main_task(upstream_tasks=[is_updated])
cookies_driver = get_cookies(upstream_tasks=[is_updated])

log_task(df)
csv_paths = main_task(cookies_driver, upstream_tasks=[cookies_driver])

max_date = get_max_date(df, upstream_tasks=[df])
log_task(csv_paths)

log_task(f"Max date: {max_date}")
# max_date = get_max_date(csv_paths, upstream_tasks=[csv_paths])

output_filepath = write_csv_file(df, upstream_tasks=[max_date])
# log_task(f"Max date: {max_date}")

output_filepath = write_csv_file(csv_paths, upstream_tasks=[csv_paths])

wait_upload_table = create_table_and_upload_to_gcs(
data_path=output_filepath,
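Stripped of the Prefect plumbing, the reworked chain is a cookie handoff followed by a merge of four intermediate CSVs. A minimal sketch of that contract, assuming Prefect 1.x semantics where calling Task.run() invokes the wrapped function directly:

from pipelines.datasets.br_cnj_improbidade_administrativa.tasks import (
    get_cookies,
    main_task,
    write_csv_file,
)

# (WebDriver, {cookie_name: value}) captured by headless Chrome
cookies_driver = get_cookies.run()

# Crawl with those cookies; returns (peoples, peoples_info, sentences, processes) CSV paths
csv_paths = main_task.run(cookies_driver)

# Join the four CSVs on condenacao_id / processo_id into a single /tmp/data.csv
output_filepath = write_csv_file.run(csv_paths)
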
206 changes: 166 additions & 40 deletions pipelines/datasets/br_cnj_improbidade_administrativa/tasks.py
@@ -4,12 +4,14 @@
import itertools
import time

from selenium import webdriver
import basedosdados as bd
import httpx
import numpy as np
import pandas as pd
from lxml import html
from prefect import task
from selenium.webdriver.chrome.webdriver import WebDriver

from pipelines.datasets.br_cnj_improbidade_administrativa.utils import (
PeopleInfoResponse,
@@ -42,31 +44,29 @@ def get_number_pages() -> int:
return int(nodes[0].text)


async def crawler_home_page(total_pages: int) -> list[httpx.Response]:

async def crawler_home_page(total_pages: int, cookies) -> list[httpx.Response]:
pages_urls = [build_home_url_page(i) for i in range(0, total_pages)]

max_connections = 3
timeout = httpx.Timeout(30, pool=3.0)
max_connections = 20
timeout = httpx.Timeout(100, pool=10.0, connect=60.0)
limits = httpx.Limits(max_connections=max_connections)
semaphore = asyncio.Semaphore(max_connections)

async def wrapper(co):
async with semaphore:
return await co

async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
return await asyncio.gather(
*[wrapper(get_async(client, url)) for url in pages_urls]
)
async with httpx.AsyncClient(limits=limits, timeout=timeout, cookies=cookies) as client:
log(client.cookies)
return await asyncio.gather(*[wrapper(get_async(client, url)) for url in pages_urls])


async def crawler_sentences(
peoples_info: list[PeopleLine],
) -> list[SentenceResponse]:
condenacao_ids: list[str] = np.unique([i["condenacao_id"] for i in peoples_info])

max_connections = 3
max_connections = 5
timeout = httpx.Timeout(30, pool=3.0)
limits = httpx.Limits(max_connections=max_connections)
semaphore = asyncio.Semaphore(max_connections)
@@ -77,7 +77,9 @@ async def wrapper(client, sentence_id):
return {"condenacao_id": sentence_id, "response": response}

async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
return await asyncio.gather(*[wrapper(client, sentence_id) for sentence_id in condenacao_ids]) # type: ignore
return await asyncio.gather(
*[wrapper(client, sentence_id) for sentence_id in condenacao_ids]
) # type: ignore


async def get_peoples_info(ids: list[tuple[str, str]]) -> list[PeopleInfoResponse]:
@@ -88,21 +90,18 @@ async def get_peoples_info(ids: list[tuple[str, str]]) -> list[PeopleInfoResponse]:

async def wrapper(client, sentence_id, people_id):
async with semaphore:
response = await get_async(
client, build_people_info_url(sentence_id, people_id)
)
response = await get_async(client, build_people_info_url(sentence_id, people_id))
return {"condenacao_id": sentence_id, "response": response}

async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
async with httpx.AsyncClient(
timeout=timeout, limits=limits, headers={"Connection": "close"}
) as client:
return await asyncio.gather(
*[
wrapper(client, sentence_id, people_id)
for (sentence_id, people_id) in ids
] # type: ignore
*[wrapper(client, sentence_id, people_id) for (sentence_id, people_id) in ids] # type: ignore
)


async def crawler_process(peoples: list[PeopleLine]) -> list[ProcessInfoResponse]:
async def crawler_processes(peoples: list[PeopleLine]) -> list[ProcessInfoResponse]:
max_connections = 5
timeout = httpx.Timeout(30, pool=3.0, read=None)
semaphore = asyncio.Semaphore(max_connections)
@@ -119,17 +118,103 @@ async def wrapper(client: httpx.AsyncClient, people: PeopleLine):
)


async def run_async(total_pages: int) -> pd.DataFrame:
async def get_peoples_main_page(total_pages: int, cookies) -> list[PeopleLine]:
requests_home_page = await crawler_home_page(total_pages, cookies)

return list(itertools.chain(*[parse_peoples(i) for i in requests_home_page]))


async def get_all_sentences(peoples: list[PeopleLine]) -> list[dict]:
sentence_responses = await crawler_sentences(peoples)

return [parse_sentence(i) for i in sentence_responses]

# df_senteces = pd.DataFrame(parsed_sentences)
# df_senteces["condenacao_id"] = df_senteces["condenacao_id"].astype("string")

# path = "tmp/sentences.csv"
# df_senteces.to_csv(path, index=False)

# return df_senteces


async def get_all_processes(peoples: list[PeopleLine]):
process_responses = await crawler_processes(peoples)
parsed_process = [parse_process(i) for i in process_responses]

df_process = pd.DataFrame(parsed_process)
df_process["processo_id"] = df_process["processo_id"].astype("string")
path = "/tmp/processes.csv"

df_process.to_csv(path, index=False)

return path


async def get_all_peoples_info(peoples_sentence_id: list[tuple[str, str]]) -> str:
peoples_info_responses = await get_peoples_info(peoples_sentence_id)
parsed_peoples_info = [parse_people_data(i) for i in peoples_info_responses]
df_parsed_peoples_info = pd.DataFrame(parsed_peoples_info)
df_parsed_peoples_info["condenacao_id"] = df_parsed_peoples_info["condenacao_id"].astype(
"string"
)

path = "/tmp/peoples_info.csv"
df_parsed_peoples_info.to_csv(path, index=False)
return path


async def main_crawler(total_pages, cookies):
peoples = await get_peoples_main_page(total_pages, cookies)
log("Get peoples main page finished")
log("Starting get_all_sentences")
sentences = await get_all_sentences(peoples)
log("Starting get_all_processes")
process_csv_path = await get_all_processes(peoples)

# sentences, process_csv_path = await asyncio.gather(
# *[get_all_sentences(peoples), get_all_processes(peoples)] # type: ignore
# )

log("Sentences and processes finished")

valid_info_peoples = [
(sentence["condenacao_id"], sentence["pessoa_id"]) # type: ignore
for sentence in sentences
if "pessoa_id" in sentence # type: ignore
]

log("Starting get peoples infos")
peoples_info_csv = await get_all_peoples_info(valid_info_peoples)
log("Get all peoples info finished")

# Save peoples
peoples_path = "/tmp/peoples.csv"
pd.DataFrame(peoples).to_csv(peoples_path, index=False)

# Save sentences
sentences_path = "/tmp/sentences.csv"
df_sentences = pd.DataFrame(sentences)
df_sentences["condenacao_id"] = df_sentences["condenacao_id"].astype("string")
df_sentences.rename(
columns={
name: normalize_string(name.replace(":", "").replace("?", ""))
for name in df_sentences.columns
},
errors="raise",
).to_csv(sentences_path, index=False)

return (peoples_path, peoples_info_csv, sentences_path, process_csv_path)


async def run_async(total_pages: int) -> pd.DataFrame:
time_start_home_page = time.time()
requests_home_page = await crawler_home_page(total_pages)
time_end_home_page = time.time()

log("Crawler home page finished")

parsed_main_list = list(
itertools.chain(*[parse_peoples(i) for i in requests_home_page])
)
parsed_main_list = list(itertools.chain(*[parse_peoples(i) for i in requests_home_page]))

log("Starting crawler sentences")
time_start_get_sentences = time.time()
@@ -157,27 +242,23 @@ async def run_async(total_pages: int) -> pd.DataFrame:
parsed_peoples_info = [parse_people_data(i) for i in peoples_info_responses]

time_start_crawler_process = time.time()
process_responses = await crawler_process(parsed_main_list)
process_responses = await crawler_processes(parsed_main_list)
time_end_crawler_process = time.time()

parsed_process = [parse_process(i) for i in process_responses]

log(f"Crawler home page. Time {time_end_home_page - time_start_home_page}")
log(f"Crawler sentences. Time {time_end_get_sentences - time_start_get_sentences}")
log(
f"Crawler peoples infos. Time {time_end_get_info_peples - time_start_get_info_peoples}"
)
log(
f"Crawler process. Time {time_end_crawler_process - time_start_crawler_process}"
)
log(f"Crawler peoples infos. Time {time_end_get_info_peples - time_start_get_info_peoples}")
log(f"Crawler process. Time {time_end_crawler_process - time_start_crawler_process}")

df_main_list = pd.DataFrame(parsed_main_list)
df_main_list["condenacao_id"] = df_main_list["condenacao_id"].astype("string")

df_parsed_peoples_info = pd.DataFrame(parsed_peoples_info)
df_parsed_peoples_info["condenacao_id"] = df_parsed_peoples_info[
"condenacao_id"
].astype("string")
df_parsed_peoples_info["condenacao_id"] = df_parsed_peoples_info["condenacao_id"].astype(
"string"
)

df_process = pd.DataFrame(parsed_process)
df_process["processo_id"] = df_process["processo_id"].astype("string")
@@ -204,17 +285,37 @@


@task
def main_task() -> pd.DataFrame:
def main_task(cookies_driver: tuple[WebDriver, dict[str, str]]):
driver, cookies = cookies_driver
pages = get_number_pages()
return asyncio.run(run_async(pages))
return asyncio.run(main_crawler(pages, cookies))


@task
def write_csv_file(csv_paths: tuple[str, str, str, str]) -> str:
peoples, peoples_info, sentences, processes = csv_paths

peoples_df = pd.read_csv(peoples)
peoples_info_df = pd.read_csv(peoples_info)
sentences_df = pd.read_csv(sentences)
processes_df = pd.read_csv(processes)

path = "/tmp/data.csv"

peoples_df.merge(peoples_info_df, left_on="condenacao_id", right_on="condenacao_id").merge(
sentences_df, left_on="condenacao_id", right_on="condenacao_id"
).merge(processes_df, left_on="processo_id", right_on="processo_id").to_csv(path, index=False)

log("csv file saved")

return path


@task
def is_up_to_date() -> bool:
number_lines_bq = bd.read_sql(
query="select count(*) as n from `basedosdados.br_cnj_improbidade_administrativa.condenacao`",
from_file=True,
billing_project_id="basedosdados-dev",
)
value_from_bq = number_lines_bq["n"][0] # type: ignore

@@ -245,7 +346,32 @@ def get_max_date(df: pd.DataFrame) -> datetime.date:


@task
def write_csv_file(df: pd.DataFrame) -> str:
path = "/tmp/data.csv"
df.to_csv(path, index=False)
return path
def get_cookies() -> tuple[WebDriver, dict[str, str]]:
options = webdriver.ChromeOptions()

# https://github.com/SeleniumHQ/selenium/issues/11637
prefs = {
"download.directory_upgrade": True,
"safebrowsing.enabled": True,
}
options.add_experimental_option(
"prefs",
prefs,
)

options.add_argument("--no-sandbox")
options.add_argument("--disable-gpu")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--crash-dumps-dir=/tmp")
options.add_argument("--remote-debugging-port=9222")
# NOTE: A resolucao afeta a renderizacao dos elementos
options.add_argument("--window-size=1920,1080")
options.add_argument("--headless=new")

driver = webdriver.Chrome(options=options)

driver.get(build_home_url_page(0))

cookies = {cookie["name"]: cookie["value"] for cookie in driver.get_cookies()}

return (driver, cookies)
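
The heart of the fix is reusing a real browser session's cookies, presumably so the CNJ endpoints accept the scripted requests, before fanning the paginated requests out under a semaphore. A condensed sketch of that pattern with simplified options; the URL list and helper names here are placeholders, not the PR's real functions:

# Condensed sketch, not part of the diff: capture cookies with headless Chrome,
# reuse them in httpx, and bound the request fan-out with a semaphore.
import asyncio

import httpx
from selenium import webdriver


def browser_cookies(url: str) -> dict[str, str]:
    options = webdriver.ChromeOptions()
    options.add_argument("--headless=new")
    options.add_argument("--no-sandbox")
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        return {cookie["name"]: cookie["value"] for cookie in driver.get_cookies()}
    finally:
        # Unlike get_cookies above, the sketch discards the driver once the
        # cookies are captured; the PR keeps it and returns it to main_task.
        driver.quit()


async def fetch_all(urls: list[str], cookies: dict[str, str]) -> list[httpx.Response]:
    semaphore = asyncio.Semaphore(20)  # same cap as crawler_home_page
    timeout = httpx.Timeout(100, pool=10.0, connect=60.0)

    async def fetch(client: httpx.AsyncClient, url: str) -> httpx.Response:
        async with semaphore:
            return await client.get(url)

    async with httpx.AsyncClient(cookies=cookies, timeout=timeout) as client:
        return await asyncio.gather(*[fetch(client, url) for url in urls])

Usage would be roughly cookies = browser_cookies(build_home_url_page(0)) followed by asyncio.run(fetch_all(urls, cookies)), mirroring how main_task hands the cookie dict down to crawler_home_page.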