diff --git a/.github/workflows/cd_staging.yaml b/.github/workflows/cd-staging.yaml similarity index 100% rename from .github/workflows/cd_staging.yaml rename to .github/workflows/cd-staging.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 28de55cc7..9963281e4 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -4,21 +4,22 @@ on: pull_request: jobs: - tests: - name: ${{ matrix.os }}${{ matrix.arch }} - Python ${{ matrix.python-version }} - tests + test: + name: Test runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-latest] arch: [x64] + os: [ubuntu-latest] python-version: ["3.9.x"] steps: - - uses: actions/checkout@v2 + - name: Checkout + uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v2 with: - python-version: ${{ matrix.python-version }} architecture: ${{ matrix.arch }} + python-version: ${{ matrix.python-version }} - name: Set up Poetry and upgrade pip run: | pip install -U pip poetry @@ -47,3 +48,34 @@ jobs: - name: Run tests run: | pytest + lint_python: + name: Lint Python + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: "3.9.x" + - name: Set up poetry and upgrade pip + run: | + pip install -U pip poetry + pip install isort + pip install black + pip install autoflake + pip install flake8 + - name: Install this package + run: poetry install + - name: Lint source code + run: poetry run lint + lint_docker: + name: Lint Dockerfile + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Run hadolint + uses: reviewdog/action-hadolint@v1 + with: + reporter: github-pr-review diff --git a/.github/workflows/lint_docker.yaml b/.github/workflows/lint_docker.yaml deleted file mode 100644 index 9db59bfb5..000000000 --- a/.github/workflows/lint_docker.yaml +++ /dev/null @@ -1,15 +0,0 @@ -name: Lint Dockerfile - -on: - pull_request: - -jobs: - docker_lint: - name: Lint Dockerfile - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - name: Run hadolint - uses: reviewdog/action-hadolint@v1 - with: - reporter: github-pr-review diff --git a/.github/workflows/lint_python.yaml b/.github/workflows/lint_python.yaml deleted file mode 100644 index f509661c8..000000000 --- a/.github/workflows/lint_python.yaml +++ /dev/null @@ -1,31 +0,0 @@ -name: Lint Python - -on: - pull_request: - -jobs: - docker_lint: - name: Lint Python - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v2 - - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: "3.9.x" - - - name: Set up poetry and upgrade pip - run: | - pip install -U pip poetry - pip install isort - pip install black - pip install autoflake - pip install flake8 - - - name: Install this package - run: poetry install - - - name: Lint source code - run: poetry run lint diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b6b1e2aa6..2e5254390 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,16 +12,16 @@ repos: rev: 1.6.0 hooks: - id: poetry-check - - repo: https://github.com/pycqa/isort - rev: 5.12.0 - hooks: - - id: isort - args: [--profile, black, --skip, "pipelines/{{cookiecutter.project_name}}"] - repo: https://github.com/psf/black rev: 23.9.1 hooks: - id: black exclude: 'pipelines\/\{\{cookiecutter\.project_name\}\}.*' + - repo: https://github.com/pycqa/isort + rev: 5.12.0 + hooks: + - id: isort + args: [--profile, black, --skip, 
"pipelines/{{cookiecutter.project_name}}"] - repo: https://github.com/PyCQA/autoflake rev: v2.2.1 hooks: diff --git a/.pylintrc b/.pylintrc deleted file mode 100644 index 9c7bf3819..000000000 --- a/.pylintrc +++ /dev/null @@ -1,568 +0,0 @@ -[MASTER] - -# A comma-separated list of package or module names from where C extensions may -# be loaded. Extensions are loading into the active Python interpreter and may -# run arbitrary code. -extension-pkg-allow-list=cx_Oracle,netCDF4 - -# A comma-separated list of package or module names from where C extensions may -# be loaded. Extensions are loading into the active Python interpreter and may -# run arbitrary code. (This is an alternative name to extension-pkg-allow-list -# for backward compatibility.) -extension-pkg-whitelist= - -# Return non-zero exit code if any of these messages/categories are detected, -# even if score is above --fail-under value. Syntax same as enable. Messages -# specified are enabled, while categories only check already-enabled messages. -fail-on= - -# Specify a score threshold to be exceeded before program exits with error. -fail-under=10.0 - -# Files or directories to be skipped. They should be base names, not paths. -ignore={{cookiecutter.project_name}} - -# Add files or directories matching the regex patterns to the ignore-list. The -# regex matches against paths. -ignore-paths= - -# Files or directories matching the regex patterns are skipped. The regex -# matches against base names, not paths. -ignore-patterns= - -# Python code to execute, usually for sys.path manipulation such as -# pygtk.require(). -#init-hook= - -# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the -# number of processors available to use. -jobs=1 - -# Control the amount of potential inferred values when inferring a single -# object. This can help the performance when dealing with large functions or -# complex, nested conditions. -limit-inference-results=100 - -# List of plugins (as comma separated values of python module names) to load, -# usually to register additional checkers. -load-plugins= - -# Pickle collected data for later comparisons. -persistent=yes - -# Min Python version to use for version dependend checks. Will default to the -# version used to run pylint. -py-version=3.9 - -# When enabled, pylint would attempt to guess common misconfiguration and emit -# user-friendly hints instead of false-positive error messages. -suggestion-mode=yes - -# Allow loading of arbitrary C extensions. Extensions are imported into the -# active Python interpreter and may run arbitrary code. -unsafe-load-any-extension=no - - -[MESSAGES CONTROL] - -# Only show warnings with the listed confidence levels. Leave empty to show -# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED. -confidence= - -# Disable the message, report, category or checker with the given id(s). You -# can either give multiple identifiers separated by comma (,) or put this -# option multiple times (only on the command line, not in the configuration -# file where it should appear only once). You can also use "--disable=all" to -# disable everything first and then reenable specific checks. For example, if -# you want to run only the similarities checker, you can use "--disable=all -# --enable=similarities". If you want to run only the classes checker, but have -# no Warning level messages displayed, use "--disable=all --enable=classes -# --disable=W". 
-disable=raw-checker-failed, - bad-inline-option, - locally-disabled, - file-ignored, - suppressed-message, - useless-suppression, - deprecated-pragma, - use-symbolic-message-instead, - line-too-long, - duplicate-code, - consider-using-f-string, - unexpected-keyword-arg, - broad-except, - unsubscriptable-object, - unsupported-assignment-operation, - no-member - -# Enable the message, report, category or checker with the given id(s). You can -# either give multiple identifier separated by comma (,) or put this option -# multiple time (only on the command line, not in the configuration file where -# it should appear only once). See also the "--disable" option for examples. -enable=c-extension-no-member - - -[REPORTS] - -# Python expression which should return a score less than or equal to 10. You -# have access to the variables 'error', 'warning', 'refactor', and 'convention' -# which contain the number of messages in each category, as well as 'statement' -# which is the total number of statements analyzed. This score is used by the -# global evaluation report (RP0004). -evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) - -# Template used to display messages. This is a python new-style format string -# used to format the message information. See doc for all details. -#msg-template= - -# Set the output format. Available formats are text, parseable, colorized, json -# and msvs (visual studio). You can also give a reporter class, e.g. -# mypackage.mymodule.MyReporterClass. -output-format=text - -# Tells whether to display a full report or only the messages. -reports=no - -# Activate the evaluation score. -score=yes - - -[REFACTORING] - -# Maximum number of nested blocks for function / method body -max-nested-blocks=5 - -# Complete name of functions that never returns. When checking for -# inconsistent-return-statements if a never returning function is called then -# it will be considered as an explicit return statement and no message will be -# printed. -never-returning-functions=sys.exit,argparse.parse_error - - -[VARIABLES] - -# List of additional names supposed to be defined in builtins. Remember that -# you should avoid defining new builtins when possible. -additional-builtins= - -# Tells whether unused global variables should be treated as a violation. -allow-global-unused-variables=yes - -# List of names allowed to shadow builtins -allowed-redefined-builtins= - -# List of strings which can identify a callback function by name. A callback -# name must start or end with one of those strings. -callbacks=cb_, - _cb - -# A regular expression matching the name of dummy variables (i.e. expected to -# not be used). -dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ - -# Argument names that match this expression will be ignored. Default to name -# with leading underscore. -ignored-argument-names=_.*|^ignored_|^unused_ - -# Tells whether we should check for unused import in __init__ files. -init-import=no - -# List of qualified module names which can have objects that can redefine -# builtins. -redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io - - -[FORMAT] - -# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. -expected-line-ending-format= - -# Regexp for a line that is allowed to be longer than the limit. -ignore-long-lines=^\s*(# )??$ - -# Number of spaces of indent required inside a hanging or continued line. -indent-after-paren=4 - -# String used as indentation unit. 
This is usually " " (4 spaces) or "\t" (1 -# tab). -indent-string=' ' - -# Maximum number of characters on a single line. -max-line-length=100 - -# Maximum number of lines in a module. -max-module-lines=1000 - -# Allow the body of a class to be on the same line as the declaration if body -# contains single statement. -single-line-class-stmt=no - -# Allow the body of an if to be on the same line as the test if there is no -# else. -single-line-if-stmt=no - - -[MISCELLANEOUS] - -# List of note tags to take in consideration, separated by a comma. -notes=FIXME, - XXX, - TODO - -# Regular expression of note tags to take in consideration. -#notes-rgx= - - -[SIMILARITIES] - -# Comments are removed from the similarity computation -ignore-comments=yes - -# Docstrings are removed from the similarity computation -ignore-docstrings=yes - -# Imports are removed from the similarity computation -ignore-imports=no - -# Signatures are removed from the similarity computation -ignore-signatures=no - -# Minimum lines number of a similarity. -min-similarity-lines=4 - - -[LOGGING] - -# The type of string formatting that logging methods do. `old` means using % -# formatting, `new` is for `{}` formatting. -logging-format-style=old - -# Logging modules to check that the string format arguments are in logging -# function parameter format. -logging-modules=logging - - -[SPELLING] - -# Limits count of emitted suggestions for spelling mistakes. -max-spelling-suggestions=4 - -# Spelling dictionary name. Available dictionaries: none. To make it work, -# install the 'python-enchant' package. -spelling-dict= - -# List of comma separated words that should be considered directives if they -# appear and the beginning of a comment and should not be checked. -spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy: - -# List of comma separated words that should not be checked. -spelling-ignore-words= - -# A path to a file that contains the private dictionary; one word per line. -spelling-private-dict-file= - -# Tells whether to store unknown words to the private dictionary (see the -# --spelling-private-dict-file option) instead of raising a message. -spelling-store-unknown-words=no - - -[STRING] - -# This flag controls whether inconsistent-quotes generates a warning when the -# character used as a quote delimiter is used inconsistently within a module. -check-quote-consistency=no - -# This flag controls whether the implicit-str-concat should generate a warning -# on implicit string concatenation in sequences defined over several lines. -check-str-concat-over-line-jumps=no - - -[TYPECHECK] - -# List of decorators that produce context managers, such as -# contextlib.contextmanager. Add to this list to register other decorators that -# produce valid context managers. -contextmanager-decorators=contextlib.contextmanager - -# List of members which are set dynamically and missed by pylint inference -# system, and so shouldn't trigger E1101 when accessed. Python regular -# expressions are accepted. -generated-members= - -# Tells whether missing members accessed in mixin class should be ignored. A -# mixin class is detected if its name ends with "mixin" (case insensitive). -ignore-mixin-members=yes - -# Tells whether to warn about missing members when the owner of the attribute -# is inferred to be None. -ignore-none=yes - -# This flag controls whether pylint should warn about no-member and similar -# checks whenever an opaque object is returned when inferring. 
The inference -# can return multiple potential results while evaluating a Python object, but -# some branches might not be evaluated, which results in partial inference. In -# that case, it might be useful to still emit no-member and other checks for -# the rest of the inferred objects. -ignore-on-opaque-inference=yes - -# List of class names for which member attributes should not be checked (useful -# for classes with dynamically set attributes). This supports the use of -# qualified names. -ignored-classes=optparse.Values,thread._local,_thread._local - -# List of module names for which member attributes should not be checked -# (useful for modules/projects where namespaces are manipulated during runtime -# and thus existing member attributes cannot be deduced by static analysis). It -# supports qualified module names, as well as Unix pattern matching. -ignored-modules= - -# Show a hint with possible names when a member name was not found. The aspect -# of finding the hint is based on edit distance. -missing-member-hint=yes - -# The minimum edit distance a name should have in order to be considered a -# similar match for a missing member name. -missing-member-hint-distance=1 - -# The total number of similar names that should be taken in consideration when -# showing a hint for a missing member. -missing-member-max-choices=1 - -# List of decorators that change the signature of a decorated function. -signature-mutators= - - -[BASIC] - -# Naming style matching correct argument names. -argument-naming-style=snake_case - -# Regular expression matching correct argument names. Overrides argument- -# naming-style. -#argument-rgx= - -# Naming style matching correct attribute names. -attr-naming-style=snake_case - -# Regular expression matching correct attribute names. Overrides attr-naming- -# style. -#attr-rgx= - -# Bad variable names which should always be refused, separated by a comma. -bad-names=foo, - bar, - baz, - toto, - tutu, - tata - -# Bad variable names regexes, separated by a comma. If names match any regex, -# they will always be refused -bad-names-rgxs= - -# Naming style matching correct class attribute names. -class-attribute-naming-style=any - -# Regular expression matching correct class attribute names. Overrides class- -# attribute-naming-style. -#class-attribute-rgx= - -# Naming style matching correct class constant names. -class-const-naming-style=UPPER_CASE - -# Regular expression matching correct class constant names. Overrides class- -# const-naming-style. -#class-const-rgx= - -# Naming style matching correct class names. -class-naming-style=PascalCase - -# Regular expression matching correct class names. Overrides class-naming- -# style. -#class-rgx= - -# Naming style matching correct constant names. -const-naming-style=UPPER_CASE - -# Regular expression matching correct constant names. Overrides const-naming- -# style. -#const-rgx= - -# Minimum line length for functions/classes that require docstrings, shorter -# ones are exempt. -docstring-min-length=-1 - -# Naming style matching correct function names. -function-naming-style=snake_case - -# Regular expression matching correct function names. Overrides function- -# naming-style. -#function-rgx= - -# Good variable names which should always be accepted, separated by a comma. -good-names=i, - j, - k, - ex, - Run, - _ - -# Good variable names regexes, separated by a comma. If names match any regex, -# they will always be accepted -good-names-rgxs= - -# Include a hint for the correct naming format with invalid-name. 
-include-naming-hint=no - -# Naming style matching correct inline iteration names. -inlinevar-naming-style=any - -# Regular expression matching correct inline iteration names. Overrides -# inlinevar-naming-style. -#inlinevar-rgx= - -# Naming style matching correct method names. -method-naming-style=snake_case - -# Regular expression matching correct method names. Overrides method-naming- -# style. -#method-rgx= - -# Naming style matching correct module names. -module-naming-style=snake_case - -# Regular expression matching correct module names. Overrides module-naming- -# style. -#module-rgx= - -# Colon-delimited sets of names that determine each other's naming style when -# the name regexes allow several styles. -name-group= - -# Regular expression which should only match function or class names that do -# not require a docstring. -no-docstring-rgx=^_ - -# List of decorators that produce properties, such as abc.abstractproperty. Add -# to this list to register other decorators that produce valid properties. -# These decorators are taken in consideration only for invalid-name. -property-classes=abc.abstractproperty - -# Naming style matching correct variable names. -variable-naming-style=snake_case - -# Regular expression matching correct variable names. Overrides variable- -# naming-style. -#variable-rgx= - - -[IMPORTS] - -# List of modules that can be imported at any level, not just the top level -# one. -allow-any-import-level= - -# Allow wildcard imports from modules that define __all__. -allow-wildcard-with-all=no - -# Analyse import fallback blocks. This can be used to support both Python 2 and -# 3 compatible code, which means that the block might have code that exists -# only in one or another interpreter, leading to false positives when analysed. -analyse-fallback-blocks=no - -# Deprecated modules which should not be used, separated by a comma. -deprecated-modules= - -# Output a graph (.gv or any supported image format) of external dependencies -# to the given file (report RP0402 must not be disabled). -ext-import-graph= - -# Output a graph (.gv or any supported image format) of all (i.e. internal and -# external) dependencies to the given file (report RP0402 must not be -# disabled). -import-graph= - -# Output a graph (.gv or any supported image format) of internal dependencies -# to the given file (report RP0402 must not be disabled). -int-import-graph= - -# Force import order to recognize a module as part of the standard -# compatibility libraries. -known-standard-library= - -# Force import order to recognize a module as part of a third party library. -known-third-party=enchant - -# Couples of modules and preferred modules, separated by a comma. -preferred-modules= - - -[CLASSES] - -# Warn about protected attribute access inside special methods -check-protected-access-in-special-methods=no - -# List of method names used to declare (i.e. assign) instance attributes. -defining-attr-methods=__init__, - __new__, - setUp, - __post_init__ - -# List of member names, which should be excluded from the protected access -# warning. -exclude-protected=_asdict, - _fields, - _replace, - _source, - _make - -# List of valid names for the first argument in a class method. -valid-classmethod-first-arg=cls - -# List of valid names for the first argument in a metaclass class method. -valid-metaclass-classmethod-first-arg=cls - - -[DESIGN] - -# List of qualified class names to ignore when counting class parents (see -# R0901) -ignored-parents= - -# Maximum number of arguments for function / method. 
-max-args=5 - -# Maximum number of attributes for a class (see R0902). -max-attributes=7 - -# Maximum number of boolean expressions in an if statement (see R0916). -max-bool-expr=5 - -# Maximum number of branch for function / method body. -max-branches=12 - -# Maximum number of locals for function / method body. -max-locals=15 - -# Maximum number of parents for a class (see R0901). -max-parents=7 - -# Maximum number of public methods for a class (see R0904). -max-public-methods=20 - -# Maximum number of return / yield for function / method body. -max-returns=6 - -# Maximum number of statements in function / method body. -max-statements=50 - -# Minimum number of public methods for a class (see R0903). -min-public-methods=2 - - -[EXCEPTIONS] - -# Exceptions that will emit a warning when being caught. Defaults to -# "BaseException, Exception". -overgeneral-exceptions=BaseException, - Exception diff --git a/pipelines/datasets/__init__.py b/pipelines/datasets/__init__.py index 3866718a8..4e98c3313 100644 --- a/pipelines/datasets/__init__.py +++ b/pipelines/datasets/__init__.py @@ -43,6 +43,7 @@ from pipelines.datasets.br_rf_cafir.flows import * from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import * from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import * +from pipelines.datasets.br_stf_corte_aberta.flows import * from pipelines.datasets.br_tse_eleicoes.flows import * from pipelines.datasets.cross_update.flows import * from pipelines.datasets.delete_flows.flows import * diff --git a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py index c4b85f662..adb9678dc 100644 --- a/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py +++ b/pipelines/datasets/br_anatel_banda_larga_fixa/flows.py @@ -24,11 +24,11 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( @@ -110,44 +110,18 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - - # ! 
tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[0] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[0] + "_atualizado", + table_id[0], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -192,43 +166,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[1] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[1] + "_atualizado", + table_id[1], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -275,43 +224,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! 
tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[2] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[2] + "_atualizado", + table_id[2], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -357,44 +281,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[3] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[3] + "_atualizado", + table_id[3], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, diff --git a/pipelines/datasets/br_anatel_telefonia_movel/flows.py b/pipelines/datasets/br_anatel_telefonia_movel/flows.py index 072503bcc..d021bba7c 100644 --- a/pipelines/datasets/br_anatel_telefonia_movel/flows.py +++ b/pipelines/datasets/br_anatel_telefonia_movel/flows.py @@ -25,11 +25,11 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow(name="br_anatel_telefonia_movel", 
code_owners=["tricktx"]) as br_anatel: @@ -116,43 +116,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[0] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[0] + "_atualizado", + table_id[0], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -198,43 +173,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[1] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[1] + "_atualizado", + table_id[1], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -282,43 +232,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! 
tabela bd pro - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[2] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[2] + "_atualizado", + table_id[2], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, @@ -365,44 +290,18 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # ! tabela bd pro - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id[3] + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): date = get_today_date_atualizado() # task que retorna a data atual update_django_metadata( dataset_id, - table_id[3] + "_atualizado", + table_id[3], metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=True, + time_delta=2, + time_unit="months", api_mode="prod", date_format="yy-mm", _last_date=date, diff --git a/pipelines/datasets/br_anp_precos_combustiveis/flows.py b/pipelines/datasets/br_anp_precos_combustiveis/flows.py index 1fb522cd5..6a003b91d 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/flows.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/flows.py @@ -11,15 +11,11 @@ from prefect.tasks.prefect import create_flow_run, wait_for_flow_run from pipelines.constants import constants -from pipelines.datasets.br_anp_precos_combustiveis.constants import ( - constants as anatel_constants, -) from pipelines.datasets.br_anp_precos_combustiveis.schedules import ( every_week_anp_microdados, ) from pipelines.datasets.br_anp_precos_combustiveis.tasks import ( check_for_updates, - data_max_bd_mais, data_max_bd_pro, download_and_transform, make_partitions, 
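
Throughout the Anatel flows above, the separate `<table>_atualizado` materialization and its dedicated metadata update are replaced by a single update_django_metadata call with is_bd_pro=True, is_free=True, time_delta=2 and time_unit="months". Assuming those arguments mean that the open (free) coverage is published with a two-month lag behind the BD Pro coverage, which is a reading of the parameters in this diff rather than a documented contract, the resulting date arithmetic is roughly:

from datetime import date
from dateutil.relativedelta import relativedelta  # python-dateutil, already a pandas dependency

# Stand-in for _last_date as returned by get_today_date_atualizado(), e.g. the current month.
pro_last_date = date(2023, 10, 1)

# Hypothetical reading of time_delta=2 / time_unit="months": the free tier's
# coverage ends two months before the BD Pro coverage.
free_last_date = pro_last_date - relativedelta(months=2)

print(pro_last_date.strftime("%Y-%m"))   # 2023-10 -> BD Pro coverage
print(free_last_date.strftime("%Y-%m"))  # 2023-08 -> open (free) coverage
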
@@ -28,12 +24,12 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, log_task, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( @@ -70,7 +66,6 @@ with case(dados_desatualizados, True): df = download_and_transform(upstream_tasks=[rename_flow_run]) output_path = make_partitions(df=df, upstream_tasks=[df]) - get_date_max_pro = data_max_bd_pro(df=df) # pylint: disable=C0103 wait_upload_table = create_table_and_upload_to_gcs( @@ -95,6 +90,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], ) wait_for_materialization = wait_for_flow_run( @@ -109,19 +105,25 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - + get_date_max_pro = data_max_bd_pro( + df=df, upstream_tasks=[wait_upload_table] + ) with case(update_metadata, True): update_django_metadata( dataset_id, table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, api_mode="prod", date_format="yy-mm-dd", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="weeks", _last_date=get_date_max_pro, - upstream_tasks=[wait_upload_table], + upstream_tasks=[get_date_max_pro], ) - anp_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value) anp_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) anp_microdados.schedule = every_week_anp_microdados diff --git a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py index 456731b94..60f37a94a 100644 --- a/pipelines/datasets/br_anp_precos_combustiveis/tasks.py +++ b/pipelines/datasets/br_anp_precos_combustiveis/tasks.py @@ -40,10 +40,16 @@ def check_for_updates(dataset_id, table_id): # Obtém a data mais recente do site download_files(anatel_constants.URLS_DATA.value, anatel_constants.PATH_INPUT.value) df = pd.read_csv(anatel_constants.URL_GLP.value, sep=";", encoding="utf-8") - data_obj = df["Data da Coleta"].max() - data_obj = datetime.strptime(data_obj, "%d/%m/%Y").strftime("%Y-%m-%d") - data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() - + data_obj = ( + df["Data da Coleta"].str[6:10] + + "-" + + df["Data da Coleta"].str[3:5] + + "-" + + df["Data da Coleta"].str[0:2] + ) + data_obj = data_obj.apply(lambda x: pd.to_datetime(x).strftime("%Y-%m-%d")) + data_obj = pd.to_datetime(data_obj, format="%Y-%m-%d").dt.date + data_obj = data_obj.max() # Obtém a última data no site BD data_bq_obj = extract_last_date( dataset_id, table_id, "yy-mm-dd", "basedosdados", data="data_coleta" diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index 21068a654..22b2cbdd4 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -16,11 +16,11 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, 
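
The rewritten check_for_updates in br_anp_precos_combustiveis/tasks.py above now converts each "Data da Coleta" value to ISO format before taking the maximum, instead of applying a lexicographic .max() to dd/mm/YYYY strings as the old code did. Assuming the column always holds dd/mm/YYYY strings, the same result can be obtained more directly by letting pandas parse the column with an explicit format; the sample data below is made up for illustration:

import pandas as pd

# Made-up sample standing in for the ANP CSV's "Data da Coleta" column.
df = pd.DataFrame({"Data da Coleta": ["28/09/2023", "02/10/2023", "15/08/2023"]})

# Parse with an explicit day-first format, then take the latest collection date.
data_obj = pd.to_datetime(df["Data da Coleta"], format="%d/%m/%Y").max().date()

print(data_obj)  # 2023-10-02
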
rename_current_flow_run_dataset_table, - update_django_metadata, ) from pipelines.utils.utils import log @@ -94,6 +94,9 @@ table_id, metadata_type="DateTimeRange", bq_last_update=False, + bq_table_last_year_month=False, + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", _last_date=data_max, diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index 67dfa704a..abab85f35 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -13,7 +13,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index c519474f3..bc1490e22 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -27,7 +27,7 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants -from pipelines.utils.metadata.flows import update_django_metadata +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, diff --git a/pipelines/datasets/br_inmet_bdmep/flows.py b/pipelines/datasets/br_inmet_bdmep/flows.py index 8f21d36e8..ff9cee42d 100644 --- a/pipelines/datasets/br_inmet_bdmep/flows.py +++ b/pipelines/datasets/br_inmet_bdmep/flows.py @@ -12,10 +12,11 @@ from pipelines.constants import constants from pipelines.datasets.br_inmet_bdmep.schedules import every_month_inmet -from pipelines.datasets.br_inmet_bdmep.tasks import get_base_inmet +from pipelines.datasets.br_inmet_bdmep.tasks import get_base_inmet, get_today_date from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, @@ -29,7 +30,8 @@ # Parameters dataset_id = Parameter("dataset_id", default="br_inmet_bdmep", required=True) table_id = Parameter("table_id", default="microdados", required=True) - year = Parameter("year", default=2023, required=True) + year = Parameter("year", default=2023, required=False) + update_metadata = Parameter("update_metadata", default=True, required=False) materialization_mode = Parameter( "materialization_mode", default="prod", required=False @@ -80,6 +82,23 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + with case(update_metadata, True): + date = get_today_date() + update = update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=False, + _last_date=date, + api_mode="prod", + date_format="yy-mm", + is_bd_pro=True, + is_free=True, + time_delta=6, + time_unit="months", + upstream_tasks=[wait_for_materialization], + ) 
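
The br_inmet_bdmep flow above, like the other flows touched in this diff, gates the metadata update behind a case(update_metadata, True) block and chains it after materialization via upstream_tasks. A minimal, self-contained Prefect 1.x sketch of that gating-and-ordering pattern, using toy task names rather than the project's real tasks, looks like this:

from datetime import datetime

from prefect import Flow, Parameter, case, task

@task
def materialize():
    return "materialized"

@task
def get_today_date():
    return datetime.today().strftime("%Y-%m")

@task
def update_metadata_task(last_date):
    print(f"coverage updated up to {last_date}")

with Flow("gating_sketch") as flow:
    update_metadata = Parameter("update_metadata", default=True)
    done = materialize()
    with case(update_metadata, True):
        date = get_today_date()
        # upstream_tasks makes the update wait for materialization even though its
        # result is never passed as an argument, mirroring the flows in this diff.
        update_metadata_task(date, upstream_tasks=[done])

flow.run()
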
br_inmet.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_inmet.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_inmet_bdmep/schedules.py b/pipelines/datasets/br_inmet_bdmep/schedules.py index ac5a6c842..4d14b73a2 100644 --- a/pipelines/datasets/br_inmet_bdmep/schedules.py +++ b/pipelines/datasets/br_inmet_bdmep/schedules.py @@ -77,6 +77,7 @@ from pipelines.constants import constants +d = datetime.today() every_month_inmet = Schedule( clocks=[ CronClock( @@ -88,9 +89,11 @@ parameter_defaults={ "dataset_id": "br_inmet_bdmep", "table_id": "microdados", - "materialization_mode": "dev", + "materialization_mode": "prod", "materialize_after_dump": True, "dbt_alias": True, + "year": d.strftime("%Y"), + "update_metadata": True, }, ), ], diff --git a/pipelines/datasets/br_inmet_bdmep/tasks.py b/pipelines/datasets/br_inmet_bdmep/tasks.py index d13785593..11af202a9 100644 --- a/pipelines/datasets/br_inmet_bdmep/tasks.py +++ b/pipelines/datasets/br_inmet_bdmep/tasks.py @@ -4,6 +4,7 @@ """ import glob import os +from datetime import datetime import numpy as np import pandas as pd @@ -29,8 +30,10 @@ def get_base_inmet(year: int) -> str: Retorna: - str: o caminho para o diretório que contém os arquivos CSV de saída. """ + log(f"Baixando os dados para o ano {year}.") download_inmet(year) + log("Dados baixados.") files = glob.glob(os.path.join(f"/tmp/data/input/{year}/", "*.CSV")) @@ -48,3 +51,10 @@ def get_base_inmet(year: int) -> str: base.to_csv(name, index=False) return "/tmp/data/output/microdados/" + + +@task +def get_today_date(): + d = datetime.today() + + return d.strftime("%Y-%m") diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 376310173..359c78499 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -25,11 +25,11 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( @@ -102,36 +102,6 @@ wait_for_materialization.retry_delay = timedelta( seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - - # materialize municipio_exportacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) # coverage updater with case(update_metadata, True): update = update_django_metadata( @@ -140,22 +110,13 @@ 
metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], + time_delta=6, + time_unit="months", ) br_comex_municipio_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -240,63 +201,15 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", + time_delta=6, + time_unit="months", ) - # materialize municipio_importacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): - update = update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], - ) br_comex_municipio_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_comex_municipio_importacao.run_config = KubernetesRun( @@ -362,35 +275,6 @@ run_name=f"Materialize {dataset_id}.{table_id}", ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - # materialize ncm_exportacao - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - wait_for_materialization = 
wait_for_flow_run( materialization_flow, stream_states=True, @@ -410,23 +294,15 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", + time_delta=6, + time_unit="months", ) - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], - ) br_comex_ncm_exportacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) br_comex_ncm_exportacao.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -502,35 +378,6 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - # materialize ncm_importacao_atualizado - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id + "_atualizado", - "mode": materialization_mode, - "dbt_alias": dbt_alias, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - ) - - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) with case(update_metadata, True): update = update_django_metadata( dataset_id, @@ -538,22 +385,13 @@ metadata_type="DateTimeRange", bq_last_update=False, bq_table_last_year_month=True, + is_bd_pro=True, + is_free=True, api_mode="prod", billing_project_id="basedosdados", date_format="yy-mm", - ) - - with case(update_metadata, True): - update_django_metadata( - dataset_id, - table_id + "_atualizado", - metadata_type="DateTimeRange", - bq_last_update=False, - bq_table_last_year_month=True, - api_mode="prod", - billing_project_id="basedosdados", - date_format="yy-mm", - upstream_tasks=[update], + time_delta=6, + time_unit="months", ) br_comex_ncm_importacao.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/datasets/br_me_comex_stat/schedules.py b/pipelines/datasets/br_me_comex_stat/schedules.py index 0f28294fa..6d9b9caf3 100644 --- a/pipelines/datasets/br_me_comex_stat/schedules.py +++ b/pipelines/datasets/br_me_comex_stat/schedules.py @@ -15,7 +15,7 @@ clocks=[ CronClock( cron="@monthly", - start_date=datetime(2023, 8, 8, 0, 0), + start_date=datetime(2023, 10, 4, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, ], @@ -25,6 +25,7 @@ "materialization_mode": "prod", "materialize after dump": True, "dbt_alias": False, + "update_metadata": True, }, ) ], @@ -36,7 +37,7 @@ clocks=[ CronClock( cron="@monthly", - start_date=datetime(2023, 8, 8, 0, 0), + start_date=datetime(2023, 10, 4, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, ], @@ -46,6 +47,7 @@ "materialization_mode": "prod", "materialize after dump": True, "dbt_alias": False, + "update_metadata": True, }, ) ], @@ -57,7 +59,7 @@ clocks=[ CronClock( cron="@monthly", - start_date=datetime(2023, 8, 8, 0, 0), + 
start_date=datetime(2023, 10, 4, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, ], @@ -67,6 +69,7 @@ "materialization_mode": "prod", "materialize after dump": True, "dbt_alias": False, + "update_metadata": True, }, ) ], @@ -78,7 +81,7 @@ clocks=[ CronClock( cron="@monthly", - start_date=datetime(2023, 8, 8, 0, 0), + start_date=datetime(2023, 10, 4, 0, 0), labels=[ constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, ], @@ -88,6 +91,7 @@ "materialization_mode": "prod", "materialize after dump": True, "dbt_alias": False, + "update_metadata": True, }, ) ], diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 2c21e96ff..9a8aea290 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -25,11 +25,11 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( @@ -108,6 +108,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -196,6 +198,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -287,6 +291,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -379,6 +385,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -472,6 +480,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index dc3b3c942..cf1b30eec 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -24,11 +24,11 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) with Flow( @@ -108,6 +108,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -201,6 +203,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", 
date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -294,6 +298,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], @@ -388,6 +394,8 @@ bq_last_update=False, bq_table_last_year_month=True, billing_project_id="basedosdados", + is_bd_pro=True, + is_free=False, api_mode="prod", date_format="yy-mm-dd", upstream_tasks=[wait_for_materialization], diff --git a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py index ecdb83d03..d70322502 100644 --- a/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py +++ b/pipelines/datasets/br_rj_isp_estatisticas_seguranca/flows.py @@ -31,11 +31,11 @@ from pipelines.utils.constants import constants as utils_constants from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata from pipelines.utils.tasks import ( create_table_and_upload_to_gcs, get_current_flow_labels, rename_current_flow_run_dataset_table, - update_django_metadata, ) # ! Evolucao_mensal_cisp @@ -112,17 +112,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_cisp.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -203,17 +206,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) taxa_evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value) taxa_evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -296,17 +302,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + 
bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) taxa_evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -390,17 +399,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) feminicidio_mensal_cisp.storage = GCS(constants.GCS_FLOWS_BUCKET.value) @@ -484,17 +496,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_policial_morto_servico_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_policial_morto_servico_mensal.run_config = KubernetesRun( @@ -579,17 +594,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) armas_apreendidas_mensal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) armas_apreendidas_mensal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -670,17 +688,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_municipio.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_municipio.run_config = 
KubernetesRun(image=constants.DOCKER_IMAGE.value) @@ -761,17 +782,20 @@ seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) - with case(update_metadata, True): - date = get_today_date() # task que retorna a data atual - update_django_metadata( - dataset_id, - table_id, - metadata_type="DateTimeRange", - bq_last_update=False, - api_mode="prod", - date_format="yy-mm", - _last_date=date, - ) + with case(update_metadata, True): + date = get_today_date() # task que retorna a data atual + update_django_metadata( + dataset_id, + table_id, + metadata_type="DateTimeRange", + bq_table_last_year_month=False, + bq_last_update=False, + is_bd_pro=False, + is_free=True, + api_mode="prod", + date_format="yy-mm", + _last_date=date, + ) evolucao_mensal_uf.storage = GCS(constants.GCS_FLOWS_BUCKET.value) evolucao_mensal_uf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/datasets/br_stf_corte_aberta/__init__.py b/pipelines/datasets/br_stf_corte_aberta/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/datasets/br_stf_corte_aberta/constants.py b/pipelines/datasets/br_stf_corte_aberta/constants.py new file mode 100644 index 000000000..ee6711acd --- /dev/null +++ b/pipelines/datasets/br_stf_corte_aberta/constants.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +""" +Constant values for the datasets projects +""" +from enum import Enum + + +class constants(Enum): # pylint: disable=c0103 + STF_INPUT = "/tmp/input/" + STF_OUTPUT = "/tmp/output/" + + STF_LINK = "https://transparencia.stf.jus.br/extensions/decisoes/decisoes.html" + + RENAME = { + "Ano da decisão": "ano", + "Classe": "classe", + "Número": "numero", + "Relator": "relator", + "Link": "link", + "Subgrupo andamento decisão": "subgrupo_andamento", + "Andamento decisão": "andamento", + "Observação do andamento": "observacao_andamento_decisao", + "Indicador virtual": "modalidade_julgamento", + "Indicador Colegiada": "tipo_julgamento", + "Indicador eletrônico": "meio_tramitacao", + "Indicador de tramitação": "indicador_tramitacao", + "Assuntos do processo": "assunto_processo", + "Ramo direito": "ramo_direito", + "Data de autuação": "data_autuacao", + "Data da decisão": "data_decisao", + "Data baixa processo": "data_baixa_processo", + } + + ORDEM = [ + "ano", + "classe", + "numero", + "relator", + "link", + "subgrupo_andamento", + "andamento", + "observacao_andamento_decisao", + "modalidade_julgamento", + "tipo_julgamento", + "meio_tramitacao", + "indicador_tramitacao", + "assunto_processo", + "ramo_direito", + "data_autuacao", + "data_decisao", + "data_baixa_processo", + ] diff --git a/pipelines/datasets/br_stf_corte_aberta/flows.py b/pipelines/datasets/br_stf_corte_aberta/flows.py new file mode 100644 index 000000000..52a86280e --- /dev/null +++ b/pipelines/datasets/br_stf_corte_aberta/flows.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +""" +Flows for br_stf_corte_aberta +""" +from datetime import timedelta + +from prefect import Parameter, case +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefect.tasks.prefect import create_flow_run, wait_for_flow_run + +from pipelines.constants import constants +from pipelines.datasets.br_stf_corte_aberta.schedules import every_day_stf +from pipelines.datasets.br_stf_corte_aberta.tasks import ( + check_for_updates, + download_and_transform, + make_partitions, +) +from pipelines.datasets.br_stf_corte_aberta.utils import check_for_data +from pipelines.utils.constants import constants as 
utils_constants +from pipelines.utils.decorators import Flow +from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants +from pipelines.utils.metadata.tasks import update_django_metadata +from pipelines.utils.tasks import ( + create_table_and_upload_to_gcs, + get_current_flow_labels, + log_task, + rename_current_flow_run_dataset_table, +) + +with Flow(name="br_stf_corte_aberta.decisoes", code_owners=["trick"]) as br_stf: + # Parameters + dataset_id = Parameter("dataset_id", default="br_stf_corte_aberta", required=True) + table_id = Parameter("table_id", default="decisoes", required=True) + materialization_mode = Parameter( + "materialization_mode", default="dev", required=False + ) + materialize_after_dump = Parameter( + "materialize_after_dump", default=True, required=False + ) + dbt_alias = Parameter("dbt_alias", default=True, required=False) + update_metadata = Parameter("update_metadata", default=True, required=False) + rename_flow_run = rename_current_flow_run_dataset_table( + prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id + ) + update_metadata = Parameter("update_metadata", default=True, required=False) + dados_desatualizados = check_for_updates( + dataset_id=dataset_id, table_id=table_id, upstream_tasks=[rename_flow_run] + ) + log_task(f"Checando se os dados estão desatualizados: {dados_desatualizados}") + with case(dados_desatualizados, False): + log_task( + "Dados atualizados, não é necessário fazer o download", + upstream_tasks=[dados_desatualizados, rename_flow_run], + ) + with case(dados_desatualizados, True): + df = download_and_transform(upstream_tasks=[rename_flow_run]) + output_path = make_partitions(df=df, upstream_tasks=[df]) + wait_upload_table = create_table_and_upload_to_gcs( + data_path=output_path, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=output_path, + upstream_tasks=[output_path], + ) + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], + ) + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + get_max_date = check_for_data() + get_max_date_string = str(get_max_date) + with case(update_metadata, True): + update_django_metadata( + dataset_id=dataset_id, + table_id=table_id, + metadata_type="DateTimeRange", + bq_last_update=False, + bq_table_last_year_month=False, + # billing_project_id="basedosdados-dev", + api_mode="prod", + date_format="yy-mm-dd", + is_bd_pro=True, + _last_date=get_max_date_string, + is_free=True, + time_delta=6, + time_unit="weeks", + upstream_tasks=[wait_for_materialization], + ) +br_stf.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_stf.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_stf.schedule = every_day_stf diff --git 
a/pipelines/datasets/br_stf_corte_aberta/schedules.py b/pipelines/datasets/br_stf_corte_aberta/schedules.py new file mode 100644 index 000000000..a14d57105 --- /dev/null +++ b/pipelines/datasets/br_stf_corte_aberta/schedules.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +""" +Schedules for br_stf_corte_aberta +""" + +from datetime import datetime + +from prefect.schedules import Schedule +from prefect.schedules.clocks import CronClock + +from pipelines.constants import constants + +every_day_stf = Schedule( + clocks=[ + CronClock( + cron="0 12 * * *", # Irá rodar todos os dias meio dia + start_date=datetime(2021, 1, 1), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_stf_corte_aberta", + "table_id": "decisoes", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "update_metadata": True, + }, + ), + ] +) diff --git a/pipelines/datasets/br_stf_corte_aberta/tasks.py b/pipelines/datasets/br_stf_corte_aberta/tasks.py new file mode 100644 index 000000000..fba15990a --- /dev/null +++ b/pipelines/datasets/br_stf_corte_aberta/tasks.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +""" +Tasks for br_stf_corte_aberta +""" +from datetime import timedelta + +import pandas as pd +from prefect import task + +from pipelines.constants import constants +from pipelines.datasets.br_stf_corte_aberta.constants import constants as stf_constants +from pipelines.datasets.br_stf_corte_aberta.utils import ( + check_for_data, + column_bool, + extract_last_date, + fix_columns_data, + partition_data, + read_csv, + rename_ordening_columns, + replace_columns, +) +from pipelines.utils.utils import log + + +@task( + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def check_for_updates(dataset_id, table_id): + data_obj = check_for_data() + # Obtém a última data no site BD + data_bq_obj = extract_last_date( + dataset_id=dataset_id, + table_id=table_id, + date_format="yy-mm-dd", + billing_project_id="basedosdados", + data="data_decisao", + ) + # Registra a data mais recente do site + log(f"Última data no site do STF: {data_obj}") + log(f"Última data no site da BD: {data_bq_obj}") + # Compara as datas para verificar se há atualizações + if data_obj > data_bq_obj: + return True # Há atualizações disponíveis + else: + return False # Não há novas atualizações disponíveis + + +@task( + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +# make_partition +def make_partitions(df): + partition_data(df, "data_decisao", stf_constants.STF_OUTPUT.value) + return stf_constants.STF_OUTPUT.value + + +@task( + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def download_and_transform(): + log("Iniciando a leitura do csv") + df = read_csv() + log("Iniciando a correção das colunas de data") + df = fix_columns_data(df) + log("Iniciando a correção da coluna booleana") + df = column_bool(df) + log("Iniciando a renomeação e ordenação das colunas") + df = rename_ordening_columns(df) + log("Iniciando a substituição de variáveis") + df = replace_columns(df) + return df diff --git a/pipelines/datasets/br_stf_corte_aberta/utils.py b/pipelines/datasets/br_stf_corte_aberta/utils.py new file mode 100644 index 000000000..e1cfa4f3f --- /dev/null +++ b/pipelines/datasets/br_stf_corte_aberta/utils.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +""" +General purpose 
functions for the br_stf_corte_aberta project +""" +import os +import time +from datetime import datetime + +import basedosdados as bd +import numpy as np +import pandas as pd +from selenium import webdriver + +from pipelines.datasets.br_stf_corte_aberta.constants import constants as stf_constants +from pipelines.utils.utils import log + + +def web_scrapping(): + if not os.path.exists(stf_constants.STF_INPUT.value): + os.mkdir(stf_constants.STF_INPUT.value) + options = webdriver.ChromeOptions() + # https://github.com/SeleniumHQ/selenium/issues/11637 + prefs = { + "download.default_directory": stf_constants.STF_INPUT.value, + "download.prompt_for_download": False, + "download.directory_upgrade": True, + "safebrowsing.enabled": True, + } + options.add_experimental_option( + "prefs", + prefs, + ) + options.add_argument("--headless=new") + options.add_argument("--no-sandbox") + options.add_argument("--disable-gpu") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--crash-dumps-dir=/tmp") + options.add_argument("--remote-debugging-port=9222") + driver = webdriver.Chrome(options=options) + time.sleep(30) + driver.get(stf_constants.STF_LINK.value) + time.sleep(30) + driver.maximize_window() + time.sleep(30) + driver.find_element("xpath", '//*[@id="EXPORT-BUTTON-2"]/button').click() + time.sleep(30) + + +def read_csv(): + arquivos = os.listdir(stf_constants.STF_INPUT.value) + log("Verificando dados dentro do container") + log(arquivos) + for arquivo in arquivos: + if arquivo.endswith(".csv"): + df = pd.read_csv(stf_constants.STF_INPUT.value + arquivo, dtype=str) + return df + + +def fix_columns_data(df): + lista = ["Data de autuação", "Data da decisão", "Data baixa processo"] + for x in lista: + df[x] = df[x].astype(str).str[0:10] + df[x] = ( + df[x].astype(str).str[6:10] + + "-" + + df[x].astype(str).str[3:5] + + "-" + + df[x].astype(str).str[0:2] + ) + return df + + +def column_bool(df): + df["Indicador de tramitação"] = ( + df["Indicador de tramitação"].replace("Não", "false").replace("Sim", "true") + ) + return df + + +def rename_ordening_columns(df): + df.rename(columns=stf_constants.RENAME.value, inplace=True) + df = df[stf_constants.ORDEM.value] + return df + + +def replace_columns(df): + df["assunto_processo"] = df["assunto_processo"].apply( + lambda x: str(x).replace("\r", " ") + ) + df = df.apply(lambda x: x.replace("-", "").replace(np.nan, "")) + return df + + +def partition_data(df: pd.DataFrame, column_name: list[str], output_directory: str): + """ + Particiona os dados em subconjuntos de acordo com os valores únicos de uma coluna. + Salva cada subconjunto em um arquivo CSV separado. 
+ df: DataFrame a ser particionado + column_name: nome da coluna a ser usada para particionar os dados + output_directory: diretório onde os arquivos CSV serão salvos + """ + unique_values = df[column_name].unique() + for value in unique_values: + value_str = str(value)[:10] + date_value = datetime.strptime(value_str, "%Y-%m-%d").date() + formatted_value = date_value.strftime("%Y-%m-%d") + partition_path = os.path.join( + output_directory, f"{column_name}={formatted_value}" + ) + if not os.path.exists(partition_path): + os.makedirs(partition_path) + df_partition = df[df[column_name] == value].copy() + df_partition.drop([column_name], axis=1, inplace=True) + csv_path = os.path.join(partition_path, "data.csv") + # mode = "a" if os.path.exists(csv_path) else "w" + df_partition.to_csv( + csv_path, + sep=",", + index=False, + encoding="utf-8", + na_rep="", + ) + + + def extract_last_date( + dataset_id, table_id, date_format: str, billing_project_id: str, data: str = "data" + ): + """ + Extracts the last update date of a given dataset table. + Args: + dataset_id (str): The ID of the dataset. + table_id (str): The ID of the table. + date_format (str): Date format ('yy-mm' or 'yy-mm-dd'). + If set to 'yy-mm', the function will look for columns named ano and mes in the table_id + and return a concatenated string in the format yyyy-mm. If set to 'yy-mm-dd', + the function will look for a column named data in the format 'yyyy-mm-dd' and return it. + Returns: + str: The last update date in the format 'yyyy-mm' or 'yyyy-mm-dd'. + Raises: + Exception: If an error occurs while extracting the last update date. + """ + if date_format == "yy-mm": + try: + query_bd = f""" + SELECT + MAX(CONCAT(ano,"-",mes)) as max_date + FROM + `{billing_project_id}.{dataset_id}.{table_id}` + """ + t = bd.read_sql( + query=query_bd, + billing_project_id=billing_project_id, + from_file=True, + ) + input_date_str = t["max_date"][0] + date_obj = datetime.strptime(input_date_str, "%Y-%m") + last_date = date_obj.strftime("%Y-%m") + log(f"Última data YYYY-MM: {last_date}") + return last_date + except Exception as e: + log(f"An error occurred while extracting the last update date: {str(e)}") + raise + else: + try: + query_bd = f""" + SELECT + MAX({data}) as max_date + FROM + `{billing_project_id}.{dataset_id}.{table_id}` + """ + log(f"Query: {query_bd}") + t = bd.read_sql( + query=query_bd, + billing_project_id=billing_project_id, + from_file=True, + ) + # assumes the data column is already in the basedosdados standard format + # yyyy-mm-dd + last_date = t["max_date"][0] + log(f"Última data YYYY-MM-DD: {last_date}") + return last_date + except Exception as e: + log(f"An error occurred while extracting the last update date: {str(e)}") + raise + + + def check_for_data(): + log("Iniciando web scrapping") + web_scrapping() + log("Iniciando o check for data") + arquivos = os.listdir(stf_constants.STF_INPUT.value) + for arquivo in arquivos: + if arquivo.endswith(".csv"): + df = pd.read_csv(stf_constants.STF_INPUT.value + arquivo, dtype=str) + + df["Data da decisão"] = df["Data da decisão"].astype(str).str[0:10] + data_obj = df["Data da decisão"] = ( + df["Data da decisão"].astype(str).str[6:10] + + "-" + + df["Data da decisão"].astype(str).str[3:5] + + "-" + + df["Data da decisão"].astype(str).str[0:2] + ) + data_obj = data_obj.max() + data_obj = datetime.strptime(data_obj, "%Y-%m-%d").date() + + return data_obj diff --git a/pipelines/utils/execute_dbt_model/flows.py b/pipelines/utils/execute_dbt_model/flows.py index 66ae2f99b..a36108ccb
100644 --- a/pipelines/utils/execute_dbt_model/flows.py +++ b/pipelines/utils/execute_dbt_model/flows.py @@ -19,6 +19,7 @@ table_id = Parameter("table_id") mode = Parameter("mode", default="dev", required=False) dbt_alias = Parameter("dbt_alias", default=False, required=False) + dbt_command = Parameter("dbt_command", default="run", required=False) ################# #################### # @@ -39,7 +40,9 @@ table_id=table_id, dbt_alias=dbt_alias, sync=True, + dbt_command=dbt_command, ) + run_dbt_model_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) run_dbt_model_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) diff --git a/pipelines/utils/execute_dbt_model/tasks.py b/pipelines/utils/execute_dbt_model/tasks.py index 5c005e88f..1092232ba 100644 --- a/pipelines/utils/execute_dbt_model/tasks.py +++ b/pipelines/utils/execute_dbt_model/tasks.py @@ -11,6 +11,7 @@ from pipelines.constants import constants from pipelines.utils.execute_dbt_model.utils import get_dbt_client +from pipelines.utils.utils import log @task( @@ -42,15 +43,40 @@ def run_dbt_model( dataset_id: str, table_id: str, dbt_alias: bool, + dbt_command: str, sync: bool = True, ): """ Run a DBT model. """ + if dbt_command not in ["run", "test", "run and test", "run/test"]: + raise ValueError(f"Invalid dbt_command: {dbt_command}") + if dbt_alias: table_id = f"{dataset_id}__{table_id}" - dbt_client.cli( - f"run --models {dataset_id}.{table_id}", - sync=sync, - logs=True, - ) + + if "run" in dbt_command: + logs_dict = dbt_client.cli( + f"run --models {dataset_id}.{table_id}", + sync=sync, + logs=True, + ) + for event in logs_dict["result"]["logs"]: + if event["levelname"] == "INFO": + log(event["message"]) + if event["levelname"] == "DEBUG": + if "On model" in event["message"]: + log(event["message"]) + + if "test" in dbt_command: + logs_dict = dbt_client.cli( + f"test --models {dataset_id}.{table_id}", + sync=sync, + logs=True, + ) + for event in logs_dict["result"]["logs"]: + if event["levelname"] == "INFO": + log(event["message"]) + if event["levelname"] == "DEBUG": + if "On model" in event["message"]: + log(event["message"]) diff --git a/pipelines/utils/metadata/tasks.py b/pipelines/utils/metadata/tasks.py index 6c1969e43..1c529f374 100644 --- a/pipelines/utils/metadata/tasks.py +++ b/pipelines/utils/metadata/tasks.py @@ -45,7 +45,7 @@ def update_django_metadata( time_unit: str = "days", ): """ - Updates Django metadata. + Updates Django metadata. Version 1.2. Args: - `dataset_id (str):` The ID of the dataset. @@ -117,9 +117,6 @@ def update_django_metadata( f"Unidade temporal inválida. Escolha entre {', '.join(unidades_permitidas.keys())}" ) - if not isinstance(time_delta, int) or time_delta <= 0: - raise ValueError("Defasagem deve ser um número inteiro positivo") - if billing_project_id not in accepted_billing_project_id: raise Exception( f"The given billing_project_id: {billing_project_id} is invalid. 
The accepted valuesare {accepted_billing_project_id}" @@ -166,6 +163,8 @@ def update_django_metadata( api_mode=api_mode, ) elif is_bd_pro and is_free: + if not isinstance(time_delta, int) or time_delta <= 0: + raise ValueError("Defasagem deve ser um número inteiro positivo") last_date = extract_last_update( dataset_id, table_id, @@ -215,7 +214,7 @@ def update_django_metadata( ] = resource_to_temporal_coverage_free["endYear"] log( - f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}" + f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}" ) # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") @@ -254,7 +253,7 @@ def update_django_metadata( date_format, billing_project_id=billing_project_id, ) - log(f"Cobertura PRO ->> {_last_date}") + log(f"Cobertura PRO ->> {last_date}") resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro") @@ -346,7 +345,7 @@ def update_django_metadata( ] = resource_to_temporal_coverage_free["endYear"] log( - f"Cobertura PRO ->> {_last_date} || Cobertura Grátis ->> {free_data}" + f"Cobertura PRO ->> {last_date} || Cobertura Grátis ->> {free_data}" ) # resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") @@ -385,7 +384,7 @@ def update_django_metadata( date_format, billing_project_id=billing_project_id, ) - log(f"Cobertura PRO ->> {_last_date}") + log(f"Cobertura PRO ->> {last_date}") resource_to_temporal_coverage = parse_temporal_coverage(f"{last_date}") resource_to_temporal_coverage["coverage"] = ids.get("coverage_id_pro") @@ -402,6 +401,9 @@ def update_django_metadata( api_mode=api_mode, ) else: + if not isinstance(_last_date, str): + raise ValueError("O parâmetro `last_date` deve ser do tipo string") + if is_free and not is_bd_pro: last_date = _last_date resource_to_temporal_coverage = parse_temporal_coverage(f"{_last_date}") diff --git a/pipelines/utils/metadata/utils.py b/pipelines/utils/metadata/utils.py index b2dae8094..bffffbffd 100644 --- a/pipelines/utils/metadata/utils.py +++ b/pipelines/utils/metadata/utils.py @@ -472,21 +472,6 @@ def create_update( def parse_temporal_coverage(temporal_coverage): - padrao_ano = r"\d{4}\(\d{1,2}\)\d{4}" - padrao_mes = r"\d{4}-\d{2}\(\d{1,2}\)\d{4}-\d{2}" - padrao_semana = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - padrao_dia = r"\d{4}-\d{2}-\d{2}\(\d{1,2}\)\d{4}-\d{2}-\d{2}" - - if ( - re.match(padrao_ano, temporal_coverage) - or re.match(padrao_mes, temporal_coverage) - or re.match(padrao_semana, temporal_coverage) - or re.match(padrao_dia, temporal_coverage) - ): - print("A data está no formato correto.") - else: - print("Aviso: A data não está no formato correto.") - # Extrai as informações de data e intervalo da string if "(" in temporal_coverage: start_str, interval_str, end_str = re.split(r"[(|)]", temporal_coverage) diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index d7a2acce1..9dee7e36c 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -774,7 +774,7 @@ def extract_last_date( SELECT MAX({data}) as max_date FROM - `basedosdados.{dataset_id}.{table_id}` + `{billing_project_id}.{dataset_id}.{table_id}` """ log(f"Query: {query_bd}") t = bd.read_sql( diff --git a/pyproject.toml b/pyproject.toml index 32e1161a9..5f75f0af2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,6 +113,19 @@ pytest_cov = "^3.0.0" [tool.poetry.scripts] lint = "scripts.lint:main" +[tool.black] +exclude = 
"pipelines/{{cookiecutter.project_name}}" + +[tool.isort] +profile = "black" +multi_line_output = 3 +use_parentheses = true +include_trailing_comma = true +skip = ["pipelines/{{cookiecutter.project_name}}"] + +[tool.autoflake] +exclude = "pipelines/{{cookiecutter.project_name}}" + [build-system] build-backend = "poetry.core.masonry.api" requires = ["poetry-core>=1.0.0"] diff --git a/scripts/lint.py b/scripts/lint.py index eb60aa416..c601d3f6a 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -11,43 +11,8 @@ def main(): """Lint all python files in the project""" code = 0 code |= run(["poetry", "check"]) - code |= run( - [ - "isort", - "--profile", - "black", - "--skip", - "pipelines/{{cookiecutter.project_name}}", - "--check-only", - ".", - ] - ) - code |= run( - [ - "black", - "--exclude", - "pipelines/{{cookiecutter.project_name}}", - "--check", - ".", - ] - ) - code |= run( - [ - "autoflake", - "--exclude", - "pipelines/{{cookiecutter.project_name}}", - "--check", - "--recursive", - "--quiet", - ".", - ] - ) - code |= run( - [ - "flake8", - "--exclude", - "pipelines/{{cookiecutter.project_name}}", - ".", - ] - ) + code |= run(["black", "--check", "."]) + code |= run(["isort", "--check-only", "."]) + code |= run(["autoflake", "--check", "--recursive", "--quiet", "."]) + code |= run(["flake8", "--exclude", "pipelines/{{cookiecutter.project_name}}", "."]) exit(code)