diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 000000000..e69de29bb diff --git a/404.html b/404.html new file mode 100644 index 000000000..4d32e2349 --- /dev/null +++ b/404.html @@ -0,0 +1,685 @@ + + + +
+ + + + + + + + + + + +O BigQuery é o um serviço de banco de dados em nuvem da +Google. Você faz consultas ao banco em SQL direto do navegador com:
+Rapidez: Mesmo queries muito longas demoram apenas minutos para serem processadas.
+Escala: O BigQuery escala magicamente para hexabytes se necessário.
+Economia: Todo usuário possui 1 TB gratuito por mês para consulta + aos dados.
+Pronto(a) para começar? Nesta página você encontra:
+ +Para criar um projeto no Google Cloud basta ter um email cadastrado no +Google. É necessário ter um projeto seu, mesmo que vazio, para você +fazer queries em nosso datalake público.
+Create Project/Criar Projeto
. Escolha um nome bacana para o projeto.Create/Criar
A Google fornece 1 TB gratuito por mês de uso do BigQuery para cada +projeto que você possui. Um projeto é necessário para ativar os +serviços do Google Cloud, incluindo a permissão de uso do BigQuery. +Pense no projeto como a "conta" na qual a Google vai contabilizar o +quanto de processamento você já utilizou. Não é necessário adicionar +nenhum cartão ou forma de pagamento - O BigQuery inicia automaticamente no modo Sandbox, que permite você utilizar seus recursos sem adicionar um modo de pagamento. Leia mais aqui.
+basedosdados
O botão abaixo via te direcionar ao nosso projeto no Google BigQuery:
+ +Agora você precisa fixar o projeto da BD no seu BigQuery, é bem simples, veja:
+!!! Warning A opção Fixar um projeto pode aparecer também como Marcar projeto com estrela por nome
+ +Dentro do projeto existem dois níveis de organização dos dados, +datasets (conjuntos de dados) e +tables (tabelas), nos quais:
+br_ibge_populacao
contém uma tabela municipio
com a série
+ histórica de população a
+ nível municipal)municipio
em br_ibge_populacao
é diferente de municipio
em br_bd_diretorios
)Caso não apareçam as tabelas na 1ª vez que você acessar, atualize a página.
+Que tal fazer uma consulta simples? Vamos usar o Editor de Consultas do +BigQuery para ver as informações sobre municípios direto na nossa base de diretórios brasileiros. Para isso, +copiar e colar o código abaixo:
+SELECT * FROM `basedosdados.br_bd_diretorios_brasil.municipio`
+
Só clicar em Executar e pronto!
+ +Dica
+Clicando no botão 🔍 Consultar tabela/Query View
, o BigQuery cria
+automaticamente a estrutura básica da sua query em Query Editor/Editor
+de consultas
- basta você completar com os campos e filtros que
+achar necessários.
Está seção é dedicada a apresentar dicas de como reduzir custos de processamento para aproveitar ao máximo os dados da BD!
+Para usuários que acessam os dados em projetos públicos como o da Base dos Dados o único tipo de custo associado se refere ao custo de processamento das consultas. A notícia boa, como mencionado acima, é que todo usuário possui 1 TB gratuito por mês para consultar livremente os dados do maior data lake público do Brasil. Se você ainda não possui um projeto no BQ consulte a sessão acima para criá-lo.
+Nesta seção, apresentamos algumas dicas simples para reduzir os custos das consultas no Big Query e aproveitar ao máximo os dados da BD! Antes de partir para os exemplos, apresentaremos o mecanismo básico de previsão de custos de processamento de consultas no Big Query (BQ).
+Estimativas de custos
+No canto superior direito da interface do BQ é informado um aviso com estimativa do custo de processamento que será cobrado do seu projeto apos a execução da consulta.
+ +Este é o mecanismo básico e prontamente acessível de previsibilidade dos custos de processamento. Infelizmente, não funciona para todas as tabelas. Por motivos de limitação interna do próprio Big Query, consultas à tabelas específicas não exibem estimativas de custos. É o caso das tabelas que possuem Row Access Policy. Isto é, tabelas onde o número de linhas acessíveis é limitada a depender do usuário. Este é o caso das tabelas que fazem parte do serviço BDpro
+Exemplo da tabela agencia
do conjunto br_bcb_estban
.
{ width=100% }
+A arquitetura do Big Query utiliza o armazenamento orientado a colunas, isto é, cada coluna é armazenada separadamente. Esta característica tem uma implicação clara quanto aos custos de processamento: quanto mais colunas forem selecionadas, maior será o custo.
+Evite: Selecionar colunas em excesso
+ SELECT *
+
SELECT coluna1, coluna2
+
microdados
do conjunto br_ms_sim
.
+SELECT sequencial_obito, tipo_obito, data_obito FROM `basedosdados.br_ms_sim.microdados`
+
As partições são divisões feitas em uma tabela para facilitar o gerenciamento e a consulta dos dados. No momento de execução da consulta, o Big Query ignora linhas que possuem um valor da partição diferente do utilizado no filtro. Isto normalmente reduz significativamente a quantidade de linhas lidas e, o que nos interessa, reduz o custo de processamento.
+Clusters são agrupamentos organizados em uma tabela com base nos valores de uma ou mais colunas especificadas. Durante a execução de uma consulta, o BigQuery otimiza a leitura dos dados, acessando apenas os segmentos que contêm os valores relevantes das colunas de cluster. Isso significa que, ao invés de escanear toda a tabela, apenas as partes necessárias são lidas, o que geralmente reduz a quantidade de dados processados e, consequentemente, reduz o custo de processamento.
+Como saber qual coluna foi utilizada para particionar e clusterizar uma tabela específica?
+Pelos metadados na página de tabela no site da BD
+Note que o campo Partições no Big Query elenca tanto as partições quanto os clusters.
+Pelos metadados na página de 'Detalhes' no Big Query
+Note que são elencadas ambas informações: partições e clusters. Neste caso, a coluna ano foi definida como partição e a coluna sigla_uf como cluster.
+Prática recomendada: sempre que possível, utilize colunas particionadas e clusterizadas para filtrar/agregar os dados.
+Exemplo
+SELECT sequencial_obito, tipo_obito, data_obito FROM `basedosdados.br_ms_sim.microdados` where ano = 2015
+
Certifique-se de que o join é realmente necessário para a análise que você está realizando. Às vezes, operações alternativas como subconsultas ou agregações podem ser mais eficientes.
+Entenda a Lógica do JOIN
+Para entender a fundo boas práticas e problemas recorrentes com joins sugerimos os guias SQL Joins na prática e Maximizando a Eficiência com JOIN em Consultas SQL para Combinar Tabelas
+Utilize as dicas anteriores
+Para entender mais sobre a interface do BigQuery e como explorar os +dados, preparamos um texto completo no blog com um exemplo de busca dos +dados da RAIS - Ministério da Economia.
+Cansado(a) da leitura? Temos também um vídeo completo no nosso Youtube.
+O BigQuery possui um mecanismo de busca que permite buscar por nomes +de datasets (conjuntos), tables (tabelas) ou labels (grupos). +Construímos regras de nomeação simples e práticas para facilitar sua +busca - veja mais.
+O Power BI é uma das tecnologias mais populares para o desenvolvimento +de dashboards com dados relacionais. Por isso, preparamos um tutorial +para você descobrir como usar os dados do datalake no desenvolvimento dos seus dashboards.
+Está começando a aprender sobre SQL para fazer suas consultas? Abaixo +colocamos algumas recomendações usadas pela nossa equipe tanto no +aprendizado quanto no dia-a-dia:
+ + + + + + + + + +Os pacotes da Base dos Dados permitem o acesso ao datalake público +direto do seu computador ou ambiente de desenvolvimento. Atualmente disponíveis em:
+Pronto(a) para começar? Nesta página você encontra:
+ +Para criar um projeto no Google Cloud basta ter um email cadastrado no +Google. É necessário ter um projeto seu, mesmo que vazio, para você +fazer queries em nosso datalake público.
+Create Project/Criar Projeto
. Escolha um nome bacana para o projeto.Create/Criar
A Google fornece 1 TB gratuito por mês de uso do BigQuery para cada +projeto que você possui. Um projeto é necessário para ativar os +serviços do Google Cloud, incluindo a permissão de uso do BigQuery. +Pense no projeto como a "conta" na qual a Google vai contabilizar o +quanto de processamento você já utilizou. Não é necessário adicionar +nenhum cartão ou forma de pagamento - O BigQuery inicia automaticamente no modo Sandbox, que permite você utilizar seus recursos sem adicionar um modo de pagamento. Leia mais aqui.
+Para instalação do pacote em Python e linha de comando, você pode usar o
+pip
direto do seu terminal. Em R, basta instalar diretamente no
+RStudio ou editor de sua preferência.
pip install basedosdados
+
install.packages("basedosdados")
+
Requerimentos:
+Com os requerimentos satisfeitos, rodar os comandos abaixo: +
net install basedosdados, from("https://raw.githubusercontent.com/basedosdados/mais/master/stata-package")
+
Uma vez com seu projeto, você precisa configurar o pacote para usar o ID
+desse projeto nas consultas ao datalake. Para isso, você deve usar o
+project_id
que a Google fornece para você assim que o
+projeto é criado.
Não é necessário configurar o projeto de antemão. Assim que você +roda a 1ª consulta, o pacote irá indicar os passos para configuração.
+Uma vez com o project_id
, você deve passar essa
+informação para o pacote usando a função set_billing_id
.
+
set_billing_id("<YOUR_PROJECT_ID>")
+
É necessário especificar o project_id
a cada vez que usar o pacote.
Um exemplo simples para começar a explorar o datalake é puxar informações cadastrais de
+municípios direto na nossa base de Diretórios Brasileiros (tabela municipio
). Para isso, vamos usar a
+função download
, baixando os dados direto para nossa máquina.
import basedosdados as bd
+bd.download(savepath="<PATH>",
+dataset_id="br-bd-diretorios-brasil", table_id="municipio")
+
Para entender mais sobre a função download
, leia o manual de referência.
library("basedosdados")
+query <- "SELECT * FROM `basedosdados.br_bd_diretorios_brasil.municipio`"
+dir <- tempdir()
+data <- download(query, "<PATH>")
+
Para entender mais sobre a função download
, leia o manual de referência.
bd_read_sql, ///
+ path("<PATH>") ///
+ query("SELECT * FROM `basedosdados.br_bd_diretorios_brasil.municipio`") ///
+ billing_project_id("<PROJECT_ID>")
+
basedosdados download "where/to/save/file" \
+--billing_project_id <YOUR_PROJECT_ID> \
+--query 'SELECT * FROM
+`basedosdados.br_bd_diretorios_brasil.municipio`'
+
download
, leia o manual de referência.
+Preparamos tutoriais apresentando as principais funções de cada pacote +para você começar a usá-los.
+ +Esta API é composta por funções com 2 tipos de funcionalidade:
+Módulos para requisição de dados: para aquele(as) que desejam + somente consultar os dados e metadados do nosso projeto.
+Classes para gerenciamento de dados no Google Cloud: para + aqueles(as) que desejam subir dados no nosso projeto (ou qualquer outro + projeto no Google Cloud, seguindo a nossa metodologia e infraestrutura).
+Toda documentação do código abaixo está em inglês
+Functions to get metadata from BD's API
+ + + +check_input(f)
+
+
+Checks if the number of inputs is valid
+ +basedosdados/download/metadata.py
def check_input(f):
+ """Checks if the number of inputs is valid"""
+
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ if sum([a is not None for a in args]) > 1:
+ raise ValueError("At most one of the inputs must be non null")
+ return f(*args, **kwargs)
+
+ return wrapper
+
get_columns(table_id=None, column_id=None, columns_name=None, page=1, page_size=10, backend=None)
+
+
+Get a list of available columns,
+either by table_id
, column_id
or column_name
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
table_id(str) |
+ + | table slug in google big query (gbq). |
+ required | +
column_id(str) |
+ + | column slug in google big query (gbq). |
+ required | +
column_name(str) |
+ + | table name in base dos dados metadata. |
+ required | +
page(int) |
+ + | page for pagination. |
+ required | +
page_size(int) |
+ + | page size for pagination. |
+ required | +
backend(Backend) |
+ + | backend instance, injected automatically. |
+ required | +
Returns:
+Type | +Description | +
---|---|
dict |
+ List of tables. |
+
basedosdados/download/metadata.py
@check_input
+@inject_backend
+def get_columns(
+ table_id: str = None,
+ column_id: str = None,
+ columns_name: str = None,
+ page: int = 1,
+ page_size: int = 10,
+ backend: Backend = None,
+) -> list[dict]:
+ """
+ Get a list of available columns,
+ either by `table_id`, `column_id` or `column_name`
+
+ Args:
+ table_id(str): table slug in google big query (gbq).
+ column_id(str): column slug in google big query (gbq).
+ column_name(str): table name in base dos dados metadata.
+
+ page(int): page for pagination.
+ page_size(int): page size for pagination.
+ backend(Backend): backend instance, injected automatically.
+
+ Returns:
+ dict: List of tables.
+ """
+
+ result = backend.get_columns(table_id, column_id, columns_name, page, page_size)
+ for item in result.get("items", []) or []:
+ item["bigquery_type"] = item.pop("bigqueryType", {}).get("name")
+ return result
+
get_datasets(dataset_id=None, dataset_name=None, page=1, page_size=10, backend=None)
+
+
+Get a list of available datasets,
+either by dataset_id
or dataset_name
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
dataset_id(str) |
+ + | dataset slug in google big query (gbq). |
+ required | +
dataset_name(str) |
+ + | dataset name in base dos dados metadata. |
+ required | +
page(int) |
+ + | page for pagination. |
+ required | +
page_size(int) |
+ + | page size for pagination. |
+ required | +
backend(Backend) |
+ + | backend instance, injected automatically. |
+ required | +
Returns:
+Type | +Description | +
---|---|
dict |
+ List of datasets. |
+
basedosdados/download/metadata.py
@check_input
+@inject_backend
+def get_datasets(
+ dataset_id: str = None,
+ dataset_name: str = None,
+ page: int = 1,
+ page_size: int = 10,
+ backend: Backend = None,
+) -> list[dict]:
+ """
+ Get a list of available datasets,
+ either by `dataset_id` or `dataset_name`
+
+ Args:
+ dataset_id(str): dataset slug in google big query (gbq).
+ dataset_name(str): dataset name in base dos dados metadata.
+
+ page(int): page for pagination.
+ page_size(int): page size for pagination.
+ backend(Backend): backend instance, injected automatically.
+
+ Returns:
+ dict: List of datasets.
+ """
+ result = backend.get_datasets(dataset_id, dataset_name, page, page_size)
+ for item in result.get("items", []) or []:
+ item["organization"] = item.get("organization", {}).get("name")
+ item["tags"] = [i.get("name") for i in item.get("tags", {}).get("items")]
+ item["themes"] = [i.get("name") for i in item.get("themes", {}).get("items")]
+ return result
+
get_tables(dataset_id=None, table_id=None, table_name=None, page=1, page_size=10, backend=None)
+
+
+Get a list of available tables,
+either by dataset_id
, table_id
or table_name
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
dataset_id(str) |
+ + | dataset slug in google big query (gbq). |
+ required | +
table_id(str) |
+ + | table slug in google big query (gbq). |
+ required | +
table_name(str) |
+ + | table name in base dos dados metadata. |
+ required | +
page(int) |
+ + | page for pagination. |
+ required | +
page_size(int) |
+ + | page size for pagination. |
+ required | +
backend(Backend) |
+ + | backend instance, injected automatically. |
+ required | +
Returns:
+Type | +Description | +
---|---|
dict |
+ List of tables. |
+
basedosdados/download/metadata.py
@check_input
+@inject_backend
+def get_tables(
+ dataset_id: str = None,
+ table_id: str = None,
+ table_name: str = None,
+ page: int = 1,
+ page_size: int = 10,
+ backend: Backend = None,
+) -> list[dict]:
+ """
+ Get a list of available tables,
+ either by `dataset_id`, `table_id` or `table_name`
+
+ Args:
+ dataset_id(str): dataset slug in google big query (gbq).
+ table_id(str): table slug in google big query (gbq).
+ table_name(str): table name in base dos dados metadata.
+
+ page(int): page for pagination.
+ page_size(int): page size for pagination.
+ backend(Backend): backend instance, injected automatically.
+
+ Returns:
+ dict: List of tables.
+ """
+
+ return backend.get_tables(dataset_id, table_id, table_name, page, page_size)
+
inject_backend(f)
+
+
+Inject backend instance if doesn't exists
+ +basedosdados/download/metadata.py
def inject_backend(f):
+ """Inject backend instance if doesn't exists"""
+
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ if "backend" not in kwargs:
+ kwargs["backend"] = Backend()
+ return f(*args, **kwargs)
+
+ return wrapper
+
search(q=None, page=1, page_size=10, backend=None)
+
+
+Search for datasets, querying all available metadata for the term q
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
q(str) |
+ + | search term. |
+ required | +
page(int) |
+ + | page for pagination. |
+ required | +
page_size(int) |
+ + | page size for pagination. |
+ required | +
backend(Backend) |
+ + | backend instance, injected automatically. |
+ required | +
Returns:
+Type | +Description | +
---|---|
dict |
+ List of datasets and metadata. |
+
basedosdados/download/metadata.py
@check_input
+@inject_backend
+def search(
+ q: str = None,
+ page: int = 1,
+ page_size: int = 10,
+ backend: Backend = None,
+) -> list[dict]:
+ """
+ Search for datasets, querying all available metadata for the term `q`
+
+ Args:
+ q(str): search term.
+
+ page(int): page for pagination.
+ page_size(int): page size for pagination.
+ backend(Backend): backend instance, injected automatically.
+
+ Returns:
+ dict: List of datasets and metadata.
+ """
+ items = []
+ for item in backend.search(q, page, page_size).get("results", []):
+ items.append(
+ {
+ "slug": item.get("slug"),
+ "name": item.get("name"),
+ "description": item.get("description"),
+ "n_tables": item.get("n_tables"),
+ "n_raw_data_sources": item.get("n_raw_data_sources"),
+ "n_information_requests": item.get("n_information_requests"),
+ "organization": {
+ "slug": item.get("organizations", [{}])[0].get("slug"),
+ "name": item.get("organizations", [{}])[0].get("name"),
+ },
+ }
+ )
+ return items
+
Functions for managing downloads
+ + + +download(savepath, query=None, dataset_id=None, table_id=None, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, compression='GZIP')
+
+
+Download table or query result from basedosdados BigQuery (or other).
+Using a query:
+download('select * from
basedosdados.br_suporte.diretorio_municipioslimit 10')
Using dataset_id & table_id:
+download(dataset_id='br_suporte', table_id='diretorio_municipios')
You can also add arguments to modify save parameters:
+download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
savepath |
+ str, pathlib.PosixPath |
+ savepath must be a file path. Only supports |
+ required | +
query |
+ str |
+ Optional. +Valid SQL Standard Query to basedosdados. If query is available, +dataset_id and table_id are not required. |
+ None |
+
dataset_id |
+ str |
+ Optional. +Dataset id available in basedosdados. It should always come with table_id. |
+ None |
+
table_id |
+ str |
+ Optional. +Table id available in basedosdados.dataset_id. +It should always come with dataset_id. |
+ None |
+
billing_project_id |
+ str |
+ Optional. +Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard |
+ None |
+
query_project_id |
+ str |
+ Optional. +Which project the table lives. You can change this you want to query different projects. |
+ 'basedosdados' |
+
limit |
+ int |
+ Optional +Number of rows. |
+ None |
+
from_file |
+ boolean |
+ Optional. +Uses the credentials from file, located in `~/.basedosdados/credentials/ |
+ False |
+
reauth |
+ boolean |
+ Optional. +Re-authorize Google Cloud Project in case you need to change user or reset configurations. |
+ False |
+
compression |
+ str |
+ Optional.
+Compression type. Only |
+ 'GZIP' |
+
Exceptions:
+Type | +Description | +
---|---|
Exception |
+ If either table_id, dataset_id or query are empty. |
+
basedosdados/download/download.py
def download(
+ savepath,
+ query=None,
+ dataset_id=None,
+ table_id=None,
+ billing_project_id=None,
+ query_project_id="basedosdados",
+ limit=None,
+ from_file=False,
+ reauth=False,
+ compression="GZIP",
+):
+ """Download table or query result from basedosdados BigQuery (or other).
+
+ * Using a **query**:
+
+ `download('select * from `basedosdados.br_suporte.diretorio_municipios` limit 10')`
+
+ * Using **dataset_id & table_id**:
+
+ `download(dataset_id='br_suporte', table_id='diretorio_municipios')`
+
+ You can also add arguments to modify save parameters:
+
+ `download(dataset_id='br_suporte', table_id='diretorio_municipios', index=False, sep='|')`
+
+
+ Args:
+ savepath (str, pathlib.PosixPath):
+ savepath must be a file path. Only supports `.csv`.
+ query (str): Optional.
+ Valid SQL Standard Query to basedosdados. If query is available,
+ dataset_id and table_id are not required.
+ dataset_id (str): Optional.
+ Dataset id available in basedosdados. It should always come with table_id.
+ table_id (str): Optional.
+ Table id available in basedosdados.dataset_id.
+ It should always come with dataset_id.
+ billing_project_id (str): Optional.
+ Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
+ query_project_id (str): Optional.
+ Which project the table lives. You can change this you want to query different projects.
+ limit (int): Optional
+ Number of rows.
+ from_file (boolean): Optional.
+ Uses the credentials from file, located in `~/.basedosdados/credentials/
+ reauth (boolean): Optional.
+ Re-authorize Google Cloud Project in case you need to change user or reset configurations.
+ compression (str): Optional.
+ Compression type. Only `GZIP` is available for now.
+ Raises:
+ Exception: If either table_id, dataset_id or query are empty.
+ """
+
+ billing_project_id, from_file = _set_config_variables(
+ billing_project_id=billing_project_id, from_file=from_file
+ )
+
+ if (query is None) and ((table_id is None) or (dataset_id is None)):
+ raise BaseDosDadosException(
+ "Either table_id, dataset_id or query should be filled."
+ )
+
+ client = _google_client(billing_project_id, from_file, reauth)
+
+ # makes sure that savepath is a filepath and not a folder
+ savepath = _sets_savepath(savepath)
+
+ # if query is not defined (so it won't be overwritten) and if
+ # table is a view or external or if limit is specified,
+ # convert it to a query.
+ if not query and (
+ not _is_table(client, dataset_id, table_id, query_project_id) or limit
+ ):
+ query = f"""
+ SELECT *
+ FROM {query_project_id}.{dataset_id}.{table_id}
+ """
+
+ if limit is not None:
+ query += f" limit {limit}"
+
+ if query:
+ # sql queries produces anonymous tables, whose names
+ # can be found within `job._properties`
+ job = client["bigquery"].query(query)
+
+ # views may take longer: wait for job to finish.
+ _wait_for(job)
+
+ dest_table = job._properties["configuration"]["query"]["destinationTable"]
+
+ project_id = dest_table["projectId"]
+ dataset_id = dest_table["datasetId"]
+ table_id = dest_table["tableId"]
+
+ _direct_download(client, dataset_id, table_id, savepath, project_id, compression)
+
read_sql(query, billing_project_id=None, from_file=False, reauth=False, use_bqstorage_api=False)
+
+
+Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
query |
+ sql |
+ Valid SQL Standard Query to basedosdados |
+ required | +
billing_project_id |
+ str |
+ Optional. +Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard |
+ None |
+
from_file |
+ boolean |
+ Optional. +Uses the credentials from file, located in `~/.basedosdados/credentials/ |
+ False |
+
reauth |
+ boolean |
+ Optional. +Re-authorize Google Cloud Project in case you need to change user or reset configurations. |
+ False |
+
use_bqstorage_api |
+ boolean |
+ Optional. +Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/). +To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). +You must also have the bigquery.readsessions.create permission on the project you are billing queries to. |
+ False |
+
Returns:
+Type | +Description | +
---|---|
pd.DataFrame |
+ Query result |
+
basedosdados/download/download.py
def read_sql(
+ query,
+ billing_project_id=None,
+ from_file=False,
+ reauth=False,
+ use_bqstorage_api=False,
+):
+ """Load data from BigQuery using a query. Just a wrapper around pandas.read_gbq
+
+ Args:
+ query (sql):
+ Valid SQL Standard Query to basedosdados
+ billing_project_id (str): Optional.
+ Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
+ from_file (boolean): Optional.
+ Uses the credentials from file, located in `~/.basedosdados/credentials/
+ reauth (boolean): Optional.
+ Re-authorize Google Cloud Project in case you need to change user or reset configurations.
+ use_bqstorage_api (boolean): Optional.
+ Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/).
+ To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
+ You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
+
+ Returns:
+ pd.DataFrame:
+ Query result
+ """
+
+ billing_project_id, from_file = _set_config_variables(
+ billing_project_id=billing_project_id, from_file=from_file
+ )
+
+ try:
+ # Set a two hours timeout
+ bigquery_storage_v1.client.BigQueryReadClient.read_rows = partialmethod(
+ bigquery_storage_v1.client.BigQueryReadClient.read_rows,
+ timeout=3600 * 2,
+ )
+
+ return read_gbq(
+ query,
+ project_id=config.billing_project_id,
+ use_bqstorage_api=use_bqstorage_api,
+ credentials=_credentials(from_file=config.from_file, reauth=reauth),
+ )
+ except GenericGBQException as e:
+ if "Reason: 403" in str(e):
+ raise BaseDosDadosAccessDeniedException from e
+
+ if re.match("Reason: 400 POST .* [Pp]roject[ ]*I[Dd]", str(e)):
+ raise BaseDosDadosInvalidProjectIDException from e
+
+ raise
+
+ except PyDataCredentialsError as e:
+ raise BaseDosDadosAuthorizationException from e
+
+ except (OSError, ValueError) as e:
+ no_billing_id = "Could not determine project ID" in str(e)
+ no_billing_id |= "reading from stdin while output is captured" in str(e)
+ if no_billing_id:
+ raise BaseDosDadosNoBillingProjectIDException from e
+ raise
+
read_table(dataset_id, table_id, billing_project_id=None, query_project_id='basedosdados', limit=None, from_file=False, reauth=False, use_bqstorage_api=False)
+
+
+Load data from BigQuery using dataset_id and table_id.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
dataset_id |
+ str |
+ Optional. +Dataset id available in basedosdados. It should always come with table_id. |
+ required | +
table_id |
+ str |
+ Optional. +Table id available in basedosdados.dataset_id. +It should always come with dataset_id. |
+ required | +
billing_project_id |
+ str |
+ Optional. +Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard |
+ None |
+
query_project_id |
+ str |
+ Optional. +Which project the table lives. You can change this you want to query different projects. |
+ 'basedosdados' |
+
limit |
+ int |
+ Optional. +Number of rows to read from table. |
+ None |
+
from_file |
+ boolean |
+ Optional. +Uses the credentials from file, located in `~/.basedosdados/credentials/ |
+ False |
+
reauth |
+ boolean |
+ Optional. +Re-authorize Google Cloud Project in case you need to change user or reset configurations. |
+ False |
+
use_bqstorage_api |
+ boolean |
+ Optional. +Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/). +To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com). +You must also have the bigquery.readsessions.create permission on the project you are billing queries to. |
+ False |
+
Returns:
+Type | +Description | +
---|---|
pd.DataFrame |
+ Query result |
+
basedosdados/download/download.py
def read_table(
+ dataset_id,
+ table_id,
+ billing_project_id=None,
+ query_project_id="basedosdados",
+ limit=None,
+ from_file=False,
+ reauth=False,
+ use_bqstorage_api=False,
+):
+ """Load data from BigQuery using dataset_id and table_id.
+
+ Args:
+ dataset_id (str): Optional.
+ Dataset id available in basedosdados. It should always come with table_id.
+ table_id (str): Optional.
+ Table id available in basedosdados.dataset_id.
+ It should always come with dataset_id.
+ billing_project_id (str): Optional.
+ Project that will be billed. Find your Project ID here https://console.cloud.google.com/projectselector2/home/dashboard
+ query_project_id (str): Optional.
+ Which project the table lives. You can change this you want to query different projects.
+ limit (int): Optional.
+ Number of rows to read from table.
+ from_file (boolean): Optional.
+ Uses the credentials from file, located in `~/.basedosdados/credentials/
+ reauth (boolean): Optional.
+ Re-authorize Google Cloud Project in case you need to change user or reset configurations.
+ use_bqstorage_api (boolean): Optional.
+ Use the BigQuery Storage API to download query results quickly, but at an increased cost(https://cloud.google.com/bigquery/docs/reference/storage/).
+ To use this API, first enable it in the Cloud Console(https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com).
+ You must also have the bigquery.readsessions.create permission on the project you are billing queries to.
+
+
+ Returns:
+ pd.DataFrame:
+ Query result
+ """
+
+ billing_project_id, from_file = _set_config_variables(
+ billing_project_id=billing_project_id, from_file=from_file
+ )
+
+ if (dataset_id is not None) and (table_id is not None):
+ query = f"""
+ SELECT *
+ FROM `{query_project_id}.{dataset_id}.{table_id}`"""
+
+ if limit is not None:
+ query += f" LIMIT {limit}"
+ else:
+ raise BaseDosDadosException("Both table_id and dataset_id should be filled.")
+
+ return read_sql(
+ query,
+ billing_project_id=billing_project_id,
+ from_file=from_file,
+ reauth=reauth,
+ use_bqstorage_api=use_bqstorage_api,
+ )
+
Class for managing the files in cloud storage.
+ + + +
+Storage (Base)
+
+
+
+
+Manage files on Google Cloud Storage.
+ +basedosdados/upload/storage.py
class Storage(Base):
+ """
+ Manage files on Google Cloud Storage.
+ """
+
+ def __init__(self, dataset_id, table_id, **kwargs):
+ super().__init__(**kwargs)
+
+ self.bucket = self.client["storage_staging"].bucket(self.bucket_name)
+ self.dataset_id = dataset_id.replace("-", "_")
+ self.table_id = table_id.replace("-", "_")
+
+ @staticmethod
+ def _resolve_partitions(partitions):
+ if isinstance(partitions, dict):
+ return "/".join(f"{k}={v}" for k, v in partitions.items()) + "/"
+
+ if isinstance(partitions, str):
+ if partitions.endswith("/"):
+ partitions = partitions[:-1]
+
+ # If there is no partition
+ if len(partitions) == 0:
+ return ""
+
+ # It should fail if there is folder which is not a partition
+ try:
+ # check if it fits rule
+ {b.split("=")[0]: b.split("=")[1] for b in partitions.split("/")}
+ except IndexError as e:
+ raise Exception(
+ f"The path {partitions} is not a valid partition"
+ ) from e
+
+ return partitions + "/"
+
+ raise Exception(f"Partitions format or type not accepted: {partitions}")
+
+ def _build_blob_name(self, filename, mode, partitions=None):
+ """
+ Builds the blob name.
+ """
+
+ # table folder
+ blob_name = f"{mode}/{self.dataset_id}/{self.table_id}/"
+
+ # add partition folder
+ if partitions is not None:
+ blob_name += self._resolve_partitions(partitions)
+
+ # add file name
+ blob_name += filename
+
+ return blob_name
+
+ def init(self, replace=False, very_sure=False):
+ """Initializes bucket and folders.
+
+ Folder should be:
+
+ * `raw` : that contains really raw data
+ * `staging` : preprocessed data ready to upload to BigQuery
+
+ Args:
+ replace (bool): Optional.
+ Whether to replace if bucket already exists
+ very_sure (bool): Optional.
+ Are you aware that everything is going to be erased if you
+ replace the bucket?
+
+ Raises:
+ Warning: very_sure argument is still False.
+ """
+
+ if replace:
+ if not very_sure:
+ raise Warning(
+ "\n********************************************************"
+ "\nYou are trying to replace all the data that you have "
+ f"in bucket {self.bucket_name}.\nAre you sure?\n"
+ "If yes, add the flag --very_sure\n"
+ "********************************************************"
+ )
+ self.bucket.delete(force=True)
+
+ self.client["storage_staging"].create_bucket(self.bucket)
+
+ for folder in ["staging/", "raw/"]:
+ self.bucket.blob(folder).upload_from_string("")
+
+ def upload(
+ self,
+ path,
+ mode="all",
+ partitions=None,
+ if_exists="raise",
+ chunk_size=None,
+ **upload_args,
+ ):
+ """Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
+
+ * Add a single **file** setting `path = <file_path>`.
+
+ * Add a **folder** with multiple files setting `path =
+ <folder_path>`. *The folder should just contain the files and
+ no folders.*
+
+ * Add **partitioned files** setting `path = <folder_path>`.
+ This folder must follow the hive partitioning scheme i.e.
+ `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
+ (ex: `mytable/country=brasil/year=2020/mypart.csv`).
+
+ *Remember all files must follow a single schema.* Otherwise, things
+ might fail in the future.
+
+ There are 6 modes:
+
+ * `raw` : should contain raw files from datasource
+ * `staging` : should contain pre-treated files ready to upload to BiqQuery
+ * `header`: should contain the header of the tables
+ * `auxiliary_files`: should contain auxiliary files from eache table
+ * `architecture`: should contain the architecture sheet of the tables
+ * `all`: if no treatment is needed, use `all`.
+
+ Args:
+ path (str or pathlib.PosixPath): Where to find the file or
+ folder that you want to upload to storage
+
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
+
+ partitions (str, pathlib.PosixPath, or dict): Optional.
+ *If adding a single file*, use this to add it to a specific partition.
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+
+ if_exists (str): Optional.
+ What to do if data exists
+
+ * 'raise' : Raises Conflict exception
+ * 'replace' : Replace table
+ * 'pass' : Do nothing
+ chunk_size (int): Optional
+ The size of a chunk of data whenever iterating (in bytes).
+ This must be a multiple of 256 KB per the API specification.
+ If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
+
+ upload_args ():
+ Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
+ """
+
+ if (self.dataset_id is None) or (self.table_id is None):
+ raise Exception("You need to pass dataset_id and table_id")
+
+ path = Path(path)
+
+ if path.is_dir():
+ paths = [
+ f
+ for f in path.glob("**/*")
+ if f.is_file() and f.suffix in [".csv", ".parquet", "parquet.gzip"]
+ ]
+
+ parts = [
+ (
+ filepath.as_posix()
+ .replace(path.as_posix() + "/", "")
+ .replace(str(filepath.name), "")
+ )
+ for filepath in paths
+ ]
+
+ else:
+ paths = [path]
+ parts = [partitions or None]
+
+ self._check_mode(mode)
+
+ mode = (
+ ["raw", "staging", "header", "auxiliary_files", "architecture"]
+ if mode == "all"
+ else [mode]
+ )
+ for m in mode:
+ for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):
+ blob_name = self._build_blob_name(filepath.name, m, part)
+
+ blob = self.bucket.blob(blob_name, chunk_size=chunk_size)
+
+ if not blob.exists() or if_exists == "replace":
+ upload_args["timeout"] = upload_args.get("timeout", None)
+
+ blob.upload_from_filename(str(filepath), **upload_args)
+
+ elif if_exists == "pass":
+ pass
+
+ else:
+ raise BaseDosDadosException(
+ f"Data already exists at {self.bucket_name}/{blob_name}. "
+ "If you are using Storage.upload then set if_exists to "
+ "'replace' to overwrite data \n"
+ "If you are using Table.create then set if_storage_data_exists "
+ "to 'replace' to overwrite data."
+ )
+
+ logger.success(
+ " {object} {filename}_{mode} was {action}!",
+ filename=filepath.name,
+ mode=m,
+ object="File",
+ action="uploaded",
+ )
+
+ def download(
+ self,
+ filename="*",
+ savepath=".",
+ partitions=None,
+ mode="staging",
+ if_not_exists="raise",
+ ):
+ """Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
+ on save,
+
+ There are 5 modes:
+ * `raw` : should contain raw files from datasource
+ * `staging` : should contain pre-treated files ready to upload to BiqQuery
+ * `header`: should contain the header of the tables
+ * `auxiliary_files`: should contain auxiliary files from eache table
+ * `architecture`: should contain the architecture sheet of the tables
+
+ You can also use the `partitions` argument to choose files from a partition
+
+ Args:
+ filename (str): Optional
+ Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".
+
+ savepath (str):
+ Where you want to save the data on your computer. Must be a path to a directory.
+
+ partitions (str, dict): Optional
+ If downloading a single file, use this to specify the partition path from which to download.
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+
+
+ mode (str): Optional
+ Folder of which dataset to update.[raw|staging|header|auxiliary_files|architecture]
+
+ if_not_exists (str): Optional.
+ What to do if data not found.
+
+ * 'raise' : Raises FileNotFoundError.
+ * 'pass' : Do nothing and exit the function
+
+ Raises:
+ FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
+ """
+
+ # Prefix to locate files within the bucket
+ prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
+
+ # Add specific partition to search prefix
+ if partitions:
+ prefix += self._resolve_partitions(partitions)
+
+ # if no filename is passed, list all blobs within a given table
+ if filename != "*":
+ prefix += filename
+
+ blob_list = list(self.bucket.list_blobs(prefix=prefix))
+
+ # if there are no blobs matching the search raise FileNotFoundError or return
+ if not blob_list:
+ if if_not_exists == "raise":
+ raise FileNotFoundError(f"Could not locate files at {prefix}")
+ return
+
+ # download all blobs matching the search to given savepath
+ for blob in tqdm(blob_list, desc="Download Blob"):
+ # parse blob.name and get the csv file name
+ csv_name = blob.name.split("/")[-1]
+
+ # build folder path replicating storage hierarchy
+ blob_folder = blob.name.replace(csv_name, "")
+
+ # replicate folder hierarchy
+ savepath = Path(savepath)
+ (savepath / blob_folder).mkdir(parents=True, exist_ok=True)
+
+ # download blob to savepath
+ save_file_path = savepath / blob.name
+
+ blob.download_to_filename(filename=save_file_path)
+
+ logger.success(
+ " {object} {object_id}_{mode} was {action} at: {path}!",
+ object_id=self.dataset_id,
+ mode=mode,
+ object="File",
+ action="downloaded",
+ path={str(savepath)},
+ )
+
+ def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
+ """Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
+
+ Args:
+ filename (str): Name of the file to be deleted
+
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
+
+ partitions (str, pathlib.PosixPath, or dict): Optional.
+ Hive structured partition as a string or dict
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+
+ not_found_ok (bool): Optional.
+ What to do if file not found
+ """
+
+ self._check_mode(mode)
+
+ mode = (
+ ["raw", "staging", "header", "auxiliary_files", "architecture"]
+ if mode == "all"
+ else [mode]
+ )
+
+ for m in mode:
+ blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))
+
+ if blob.exists() or not blob.exists() and not not_found_ok:
+ blob.delete()
+ else:
+ return
+
+ logger.success(
+ " {object} {filename}_{mode} was {action}!",
+ filename=filename,
+ mode=mode,
+ object="File",
+ action="deleted",
+ )
+
+ def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
+ """Deletes a table from storage, sends request in batches.
+
+ Args:
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
+ Folder of which dataset to update. Defaults to "staging".
+
+ bucket_name (str):
+ The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
+ (You can check it with the Storage().bucket property)
+
+ not_found_ok (bool): Optional.
+ What to do if table not found
+
+ """
+
+ prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
+
+ if bucket_name is not None:
+ table_blobs = list(
+ self.client["storage_staging"]
+ .bucket(f"{bucket_name}")
+ .list_blobs(prefix=prefix)
+ )
+
+ else:
+ table_blobs = list(self.bucket.list_blobs(prefix=prefix))
+
+ if not table_blobs:
+ if not_found_ok:
+ return
+ raise FileNotFoundError(
+ f"Could not find the requested table {self.dataset_id}.{self.table_id}"
+ )
+ # Divides table_blobs list for maximum batch request size
+ table_blobs_chunks = [
+ table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999) # noqa
+ ]
+
+ for i, source_table in enumerate(
+ tqdm(table_blobs_chunks, desc="Delete Table Chunk")
+ ):
+ counter = 0
+ while counter < 10:
+ try:
+ with self.client["storage_staging"].batch():
+ for blob in source_table:
+ blob.delete()
+ break
+ except Exception:
+ print(
+ f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
+ )
+ time.sleep(5)
+ counter += 1
+ traceback.print_exc(file=sys.stderr)
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.table_id,
+ mode=mode,
+ object="Table",
+ action="deleted",
+ )
+
+ def copy_table(
+ self,
+ source_bucket_name="basedosdados",
+ destination_bucket_name=None,
+ mode="staging",
+ new_table_id=None,
+ ):
+ """Copies table from a source bucket to your bucket, sends request in batches.
+
+ Args:
+ source_bucket_name (str):
+ The bucket name from which to copy data. You can change it
+ to copy from other external bucket.
+
+ destination_bucket_name (str): Optional
+ The bucket name where data will be copied to.
+ If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
+ Storage().bucket property)
+
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
+ Folder of which dataset to update. Defaults to "staging".
+ new_table_id (str): Optional.
+ New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object.
+ """
+
+ source_table_ref = list(
+ self.client["storage_staging"]
+ .bucket(source_bucket_name)
+ .list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
+ )
+
+ if not source_table_ref:
+ raise FileNotFoundError(
+ f"Could not find the requested table {self.dataset_id}.{self.table_id}"
+ )
+
+ if destination_bucket_name is None:
+ destination_bucket = self.bucket
+
+ else:
+ destination_bucket = self.client["storage_staging"].bucket(
+ destination_bucket_name
+ )
+
+ # Divides source_table_ref list for maximum batch request size
+ source_table_ref_chunks = [
+ source_table_ref[i : i + 999] # noqa
+ for i in range(0, len(source_table_ref), 999) # noqa
+ ]
+
+ for i, source_table in enumerate(
+ tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
+ ):
+ counter = 0
+ while counter < 10:
+ try:
+ with self.client["storage_staging"].batch():
+ for blob in source_table:
+ new_name = None
+ if new_table_id:
+ new_name = blob.name.replace(
+ self.table_id, new_table_id
+ )
+ self.bucket.copy_blob(
+ blob,
+ destination_bucket=destination_bucket,
+ new_name=new_name,
+ )
+ break
+ except Exception:
+ print(
+ f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
+ )
+ counter += 1
+ time.sleep(5)
+ traceback.print_exc(file=sys.stderr)
+ logger.success(
+ " {object} {object_id}_{mode} was {action} to {new_object_id}_{mode}!",
+ object_id=self.table_id,
+ new_object_id=new_table_id if new_table_id else self.table_id,
+ mode=mode,
+ object="Table",
+ action="copied",
+ )
+
copy_table(self, source_bucket_name='basedosdados', destination_bucket_name=None, mode='staging', new_table_id=None)
+
+
+Copies table from a source bucket to your bucket, sends request in batches.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
source_bucket_name |
+ str |
+ The bucket name from which to copy data. You can change it +to copy from other external bucket. |
+ 'basedosdados' |
+
destination_bucket_name |
+ str |
+ Optional +The bucket name where data will be copied to. +If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the +Storage().bucket property) |
+ None |
+
mode |
+ str |
+ Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture] +Folder of which dataset to update. Defaults to "staging". |
+ 'staging' |
+
new_table_id |
+ str |
+ Optional. +New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object. |
+ None |
+
basedosdados/upload/storage.py
def copy_table(
+ self,
+ source_bucket_name="basedosdados",
+ destination_bucket_name=None,
+ mode="staging",
+ new_table_id=None,
+):
+ """Copies table from a source bucket to your bucket, sends request in batches.
+
+ Args:
+ source_bucket_name (str):
+ The bucket name from which to copy data. You can change it
+ to copy from other external bucket.
+
+ destination_bucket_name (str): Optional
+ The bucket name where data will be copied to.
+ If None, defaults to the bucket initialized when instantiating the Storage object (You can check it with the
+ Storage().bucket property)
+
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
+ Folder of which dataset to update. Defaults to "staging".
+ new_table_id (str): Optional.
+ New table id to be copied to. If None, defaults to the table id initialized when instantiating the Storage object.
+ """
+
+ source_table_ref = list(
+ self.client["storage_staging"]
+ .bucket(source_bucket_name)
+ .list_blobs(prefix=f"{mode}/{self.dataset_id}/{self.table_id}/")
+ )
+
+ if not source_table_ref:
+ raise FileNotFoundError(
+ f"Could not find the requested table {self.dataset_id}.{self.table_id}"
+ )
+
+ if destination_bucket_name is None:
+ destination_bucket = self.bucket
+
+ else:
+ destination_bucket = self.client["storage_staging"].bucket(
+ destination_bucket_name
+ )
+
+ # Divides source_table_ref list for maximum batch request size
+ source_table_ref_chunks = [
+ source_table_ref[i : i + 999] # noqa
+ for i in range(0, len(source_table_ref), 999) # noqa
+ ]
+
+ for i, source_table in enumerate(
+ tqdm(source_table_ref_chunks, desc="Copy Table Chunk")
+ ):
+ counter = 0
+ while counter < 10:
+ try:
+ with self.client["storage_staging"].batch():
+ for blob in source_table:
+ new_name = None
+ if new_table_id:
+ new_name = blob.name.replace(
+ self.table_id, new_table_id
+ )
+ self.bucket.copy_blob(
+ blob,
+ destination_bucket=destination_bucket,
+ new_name=new_name,
+ )
+ break
+ except Exception:
+ print(
+ f"Copy Table Chunk {i} | Attempt {counter}: copy operation starts again in 5 seconds...",
+ )
+ counter += 1
+ time.sleep(5)
+ traceback.print_exc(file=sys.stderr)
+ logger.success(
+ " {object} {object_id}_{mode} was {action} to {new_object_id}_{mode}!",
+ object_id=self.table_id,
+ new_object_id=new_table_id if new_table_id else self.table_id,
+ mode=mode,
+ object="Table",
+ action="copied",
+ )
+
delete_file(self, filename, mode, partitions=None, not_found_ok=False)
+
+
+Deletes file from path <bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>
.
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
filename |
+ str |
+ Name of the file to be deleted |
+ required | +
mode |
+ str |
+ Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all] |
+ required | +
partitions |
+ str, pathlib.PosixPath, or dict |
+ Optional. +Hive structured partition as a string or dict +
|
+ None |
+
not_found_ok |
+ bool |
+ Optional. +What to do if file not found |
+ False |
+
basedosdados/upload/storage.py
def delete_file(self, filename, mode, partitions=None, not_found_ok=False):
+ """Deletes file from path `<bucket_name>/<mode>/<dataset_id>/<table_id>/<partitions>/<filename>`.
+
+ Args:
+ filename (str): Name of the file to be deleted
+
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
+
+ partitions (str, pathlib.PosixPath, or dict): Optional.
+ Hive structured partition as a string or dict
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+
+ not_found_ok (bool): Optional.
+ What to do if file not found
+ """
+
+ self._check_mode(mode)
+
+ mode = (
+ ["raw", "staging", "header", "auxiliary_files", "architecture"]
+ if mode == "all"
+ else [mode]
+ )
+
+ for m in mode:
+ blob = self.bucket.blob(self._build_blob_name(filename, m, partitions))
+
+ if blob.exists() or not blob.exists() and not not_found_ok:
+ blob.delete()
+ else:
+ return
+
+ logger.success(
+ " {object} {filename}_{mode} was {action}!",
+ filename=filename,
+ mode=mode,
+ object="File",
+ action="deleted",
+ )
+
delete_table(self, mode='staging', bucket_name=None, not_found_ok=False)
+
+
+Deletes a table from storage, sends request in batches.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ str |
+ Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture] +Folder of which dataset to update. Defaults to "staging". |
+ 'staging' |
+
bucket_name |
+ str |
+ The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object. +(You can check it with the Storage().bucket property) |
+ None |
+
not_found_ok |
+ bool |
+ Optional. +What to do if table not found |
+ False |
+
basedosdados/upload/storage.py
def delete_table(self, mode="staging", bucket_name=None, not_found_ok=False):
+ """Deletes a table from storage, sends request in batches.
+
+ Args:
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture]
+ Folder of which dataset to update. Defaults to "staging".
+
+ bucket_name (str):
+ The bucket name from which to delete the table. If None, defaults to the bucket initialized when instantiating the Storage object.
+ (You can check it with the Storage().bucket property)
+
+ not_found_ok (bool): Optional.
+ What to do if table not found
+
+ """
+
+ prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
+
+ if bucket_name is not None:
+ table_blobs = list(
+ self.client["storage_staging"]
+ .bucket(f"{bucket_name}")
+ .list_blobs(prefix=prefix)
+ )
+
+ else:
+ table_blobs = list(self.bucket.list_blobs(prefix=prefix))
+
+ if not table_blobs:
+ if not_found_ok:
+ return
+ raise FileNotFoundError(
+ f"Could not find the requested table {self.dataset_id}.{self.table_id}"
+ )
+ # Divides table_blobs list for maximum batch request size
+ table_blobs_chunks = [
+ table_blobs[i : i + 999] for i in range(0, len(table_blobs), 999) # noqa
+ ]
+
+ for i, source_table in enumerate(
+ tqdm(table_blobs_chunks, desc="Delete Table Chunk")
+ ):
+ counter = 0
+ while counter < 10:
+ try:
+ with self.client["storage_staging"].batch():
+ for blob in source_table:
+ blob.delete()
+ break
+ except Exception:
+ print(
+ f"Delete Table Chunk {i} | Attempt {counter}: delete operation starts again in 5 seconds...",
+ )
+ time.sleep(5)
+ counter += 1
+ traceback.print_exc(file=sys.stderr)
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.table_id,
+ mode=mode,
+ object="Table",
+ action="deleted",
+ )
+
download(self, filename='*', savepath='.', partitions=None, mode='staging', if_not_exists='raise')
+
+
+Download files from Google Storage from path mode
/dataset_id
/table_id
/partitions
/filename
and replicate folder hierarchy
+on save,
There are 5 modes:
+* raw
: should contain raw files from datasource
+* staging
: should contain pre-treated files ready to upload to BiqQuery
+* header
: should contain the header of the tables
+* auxiliary_files
: should contain auxiliary files from eache table
+* architecture
: should contain the architecture sheet of the tables
You can also use the partitions
argument to choose files from a partition
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
filename |
+ str |
+ Optional +Specify which file to download. If "" , downloads all files within the bucket folder. Defaults to "". |
+ '*' |
+
savepath |
+ str |
+ Where you want to save the data on your computer. Must be a path to a directory. |
+ '.' |
+
partitions |
+ str, dict |
+ Optional +If downloading a single file, use this to specify the partition path from which to download. +
|
+ None |
+
mode |
+ str |
+ Optional +Folder of which dataset to update.[raw|staging|header|auxiliary_files|architecture] |
+ 'staging' |
+
if_not_exists |
+ str |
+ Optional. +What to do if data not found. +
|
+ 'raise' |
+
Exceptions:
+Type | +Description | +
---|---|
FileNotFoundError |
+ If the given path |
+
basedosdados/upload/storage.py
def download(
+ self,
+ filename="*",
+ savepath=".",
+ partitions=None,
+ mode="staging",
+ if_not_exists="raise",
+):
+ """Download files from Google Storage from path `mode`/`dataset_id`/`table_id`/`partitions`/`filename` and replicate folder hierarchy
+ on save,
+
+ There are 5 modes:
+ * `raw` : should contain raw files from datasource
+ * `staging` : should contain pre-treated files ready to upload to BiqQuery
+ * `header`: should contain the header of the tables
+ * `auxiliary_files`: should contain auxiliary files from eache table
+ * `architecture`: should contain the architecture sheet of the tables
+
+ You can also use the `partitions` argument to choose files from a partition
+
+ Args:
+ filename (str): Optional
+ Specify which file to download. If "*" , downloads all files within the bucket folder. Defaults to "*".
+
+ savepath (str):
+ Where you want to save the data on your computer. Must be a path to a directory.
+
+ partitions (str, dict): Optional
+ If downloading a single file, use this to specify the partition path from which to download.
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+
+
+ mode (str): Optional
+ Folder of which dataset to update.[raw|staging|header|auxiliary_files|architecture]
+
+ if_not_exists (str): Optional.
+ What to do if data not found.
+
+ * 'raise' : Raises FileNotFoundError.
+ * 'pass' : Do nothing and exit the function
+
+ Raises:
+ FileNotFoundError: If the given path `<mode>/<dataset_id>/<table_id>/<partitions>/<filename>` could not be found or there are no files to download.
+ """
+
+ # Prefix to locate files within the bucket
+ prefix = f"{mode}/{self.dataset_id}/{self.table_id}/"
+
+ # Add specific partition to search prefix
+ if partitions:
+ prefix += self._resolve_partitions(partitions)
+
+ # if no filename is passed, list all blobs within a given table
+ if filename != "*":
+ prefix += filename
+
+ blob_list = list(self.bucket.list_blobs(prefix=prefix))
+
+ # if there are no blobs matching the search raise FileNotFoundError or return
+ if not blob_list:
+ if if_not_exists == "raise":
+ raise FileNotFoundError(f"Could not locate files at {prefix}")
+ return
+
+ # download all blobs matching the search to given savepath
+ for blob in tqdm(blob_list, desc="Download Blob"):
+ # parse blob.name and get the csv file name
+ csv_name = blob.name.split("/")[-1]
+
+ # build folder path replicating storage hierarchy
+ blob_folder = blob.name.replace(csv_name, "")
+
+ # replicate folder hierarchy
+ savepath = Path(savepath)
+ (savepath / blob_folder).mkdir(parents=True, exist_ok=True)
+
+ # download blob to savepath
+ save_file_path = savepath / blob.name
+
+ blob.download_to_filename(filename=save_file_path)
+
+ logger.success(
+ " {object} {object_id}_{mode} was {action} at: {path}!",
+ object_id=self.dataset_id,
+ mode=mode,
+ object="File",
+ action="downloaded",
+ path={str(savepath)},
+ )
+
init(self, replace=False, very_sure=False)
+
+
+Initializes bucket and folders.
+Folder should be:
+raw
: that contains really raw datastaging
: preprocessed data ready to upload to BigQueryParameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
replace |
+ bool |
+ Optional. +Whether to replace if bucket already exists |
+ False |
+
very_sure |
+ bool |
+ Optional. +Are you aware that everything is going to be erased if you +replace the bucket? |
+ False |
+
Exceptions:
+Type | +Description | +
---|---|
Warning |
+ very_sure argument is still False. |
+
basedosdados/upload/storage.py
def init(self, replace=False, very_sure=False):
+ """Initializes bucket and folders.
+
+ Folder should be:
+
+ * `raw` : that contains really raw data
+ * `staging` : preprocessed data ready to upload to BigQuery
+
+ Args:
+ replace (bool): Optional.
+ Whether to replace if bucket already exists
+ very_sure (bool): Optional.
+ Are you aware that everything is going to be erased if you
+ replace the bucket?
+
+ Raises:
+ Warning: very_sure argument is still False.
+ """
+
+ if replace:
+ if not very_sure:
+ raise Warning(
+ "\n********************************************************"
+ "\nYou are trying to replace all the data that you have "
+ f"in bucket {self.bucket_name}.\nAre you sure?\n"
+ "If yes, add the flag --very_sure\n"
+ "********************************************************"
+ )
+ self.bucket.delete(force=True)
+
+ self.client["storage_staging"].create_bucket(self.bucket)
+
+ for folder in ["staging/", "raw/"]:
+ self.bucket.blob(folder).upload_from_string("")
+
upload(self, path, mode='all', partitions=None, if_exists='raise', chunk_size=None, **upload_args)
+
+
+Upload to storage at <bucket_name>/<mode>/<dataset_id>/<table_id>
. You can:
Add a single file setting path = <file_path>
.
Add a folder with multiple files setting path =
+ <folder_path>
. The folder should just contain the files and
+ no folders.
Add partitioned files setting path = <folder_path>
.
+ This folder must follow the hive partitioning scheme i.e.
+ <table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv
+ (ex: mytable/country=brasil/year=2020/mypart.csv
).
Remember all files must follow a single schema. Otherwise, things +might fail in the future.
+There are 6 modes:
+raw
: should contain raw files from datasourcestaging
: should contain pre-treated files ready to upload to BiqQueryheader
: should contain the header of the tablesauxiliary_files
: should contain auxiliary files from eache tablearchitecture
: should contain the architecture sheet of the tablesall
: if no treatment is needed, use all
.Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
path |
+ str or pathlib.PosixPath |
+ Where to find the file or +folder that you want to upload to storage |
+ required | +
mode |
+ str |
+ Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all] |
+ 'all' |
+
partitions |
+ str, pathlib.PosixPath, or dict |
+ Optional. +If adding a single file, use this to add it to a specific partition. +
|
+ None |
+
if_exists |
+ str |
+ Optional. +What to do if data exists +
|
+ 'raise' |
+
chunk_size |
+ int |
+ Optional +The size of a chunk of data whenever iterating (in bytes). +This must be a multiple of 256 KB per the API specification. +If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used. |
+ None |
+
upload_args |
+ + | Extra arguments accepted by |
+ {} |
+
basedosdados/upload/storage.py
def upload(
+ self,
+ path,
+ mode="all",
+ partitions=None,
+ if_exists="raise",
+ chunk_size=None,
+ **upload_args,
+):
+ """Upload to storage at `<bucket_name>/<mode>/<dataset_id>/<table_id>`. You can:
+
+ * Add a single **file** setting `path = <file_path>`.
+
+ * Add a **folder** with multiple files setting `path =
+ <folder_path>`. *The folder should just contain the files and
+ no folders.*
+
+ * Add **partitioned files** setting `path = <folder_path>`.
+ This folder must follow the hive partitioning scheme i.e.
+ `<table_id>/<key>=<value>/<key2>=<value2>/<partition>.csv`
+ (ex: `mytable/country=brasil/year=2020/mypart.csv`).
+
+ *Remember all files must follow a single schema.* Otherwise, things
+ might fail in the future.
+
+ There are 6 modes:
+
+ * `raw` : should contain raw files from datasource
+ * `staging` : should contain pre-treated files ready to upload to BiqQuery
+ * `header`: should contain the header of the tables
+ * `auxiliary_files`: should contain auxiliary files from eache table
+ * `architecture`: should contain the architecture sheet of the tables
+ * `all`: if no treatment is needed, use `all`.
+
+ Args:
+ path (str or pathlib.PosixPath): Where to find the file or
+ folder that you want to upload to storage
+
+ mode (str): Folder of which dataset to update [raw|staging|header|auxiliary_files|architecture|all]
+
+ partitions (str, pathlib.PosixPath, or dict): Optional.
+ *If adding a single file*, use this to add it to a specific partition.
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+
+ if_exists (str): Optional.
+ What to do if data exists
+
+ * 'raise' : Raises Conflict exception
+ * 'replace' : Replace table
+ * 'pass' : Do nothing
+ chunk_size (int): Optional
+ The size of a chunk of data whenever iterating (in bytes).
+ This must be a multiple of 256 KB per the API specification.
+ If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
+
+ upload_args ():
+ Extra arguments accepted by [`google.cloud.storage.blob.Blob.upload_from_file`](https://googleapis.dev/python/storage/latest/blobs.html?highlight=upload_from_filename#google.cloud.storage.blob.Blob.upload_from_filename)
+ """
+
+ if (self.dataset_id is None) or (self.table_id is None):
+ raise Exception("You need to pass dataset_id and table_id")
+
+ path = Path(path)
+
+ if path.is_dir():
+ paths = [
+ f
+ for f in path.glob("**/*")
+ if f.is_file() and f.suffix in [".csv", ".parquet", "parquet.gzip"]
+ ]
+
+ parts = [
+ (
+ filepath.as_posix()
+ .replace(path.as_posix() + "/", "")
+ .replace(str(filepath.name), "")
+ )
+ for filepath in paths
+ ]
+
+ else:
+ paths = [path]
+ parts = [partitions or None]
+
+ self._check_mode(mode)
+
+ mode = (
+ ["raw", "staging", "header", "auxiliary_files", "architecture"]
+ if mode == "all"
+ else [mode]
+ )
+ for m in mode:
+ for filepath, part in tqdm(list(zip(paths, parts)), desc="Uploading files"):
+ blob_name = self._build_blob_name(filepath.name, m, part)
+
+ blob = self.bucket.blob(blob_name, chunk_size=chunk_size)
+
+ if not blob.exists() or if_exists == "replace":
+ upload_args["timeout"] = upload_args.get("timeout", None)
+
+ blob.upload_from_filename(str(filepath), **upload_args)
+
+ elif if_exists == "pass":
+ pass
+
+ else:
+ raise BaseDosDadosException(
+ f"Data already exists at {self.bucket_name}/{blob_name}. "
+ "If you are using Storage.upload then set if_exists to "
+ "'replace' to overwrite data \n"
+ "If you are using Table.create then set if_storage_data_exists "
+ "to 'replace' to overwrite data."
+ )
+
+ logger.success(
+ " {object} {filename}_{mode} was {action}!",
+ filename=filepath.name,
+ mode=m,
+ object="File",
+ action="uploaded",
+ )
+
Module for manage dataset to the server.
+ + + +
+Dataset (Base)
+
+
+
+
+Manage datasets in BigQuery.
+ +basedosdados/upload/dataset.py
class Dataset(Base):
+ """
+ Manage datasets in BigQuery.
+ """
+
+ def __init__(self, dataset_id, **kwargs):
+ super().__init__(**kwargs)
+ self.dataset_id = dataset_id.replace("-", "_")
+
+ @property
+ @lru_cache
+ def dataset_config(self):
+ """
+ Dataset config file.
+ """
+ return self.backend.get_dataset_config(self.dataset_id)
+
+ def _loop_modes(self, mode="all"):
+ """
+ Loop modes.
+ """
+
+ def dataset_tag(m):
+ return f"_{m}" if m == "staging" else ""
+
+ mode = ["prod", "staging"] if mode == "all" else [mode]
+ return (
+ {
+ "client": self.client[f"bigquery_{m}"],
+ "id": f"{self.client[f'bigquery_{m}'].project}.{self.dataset_id}{dataset_tag(m)}",
+ "mode": m,
+ }
+ for m in mode
+ )
+
+ def _setup_dataset_object(self, dataset_id, location=None, mode="staging"):
+ """
+ Setup dataset object.
+ """
+
+ dataset = bigquery.Dataset(dataset_id)
+ if mode == "staging":
+ dataset_path = dataset_id.replace("_staging", "")
+ description = f"staging dataset for `{dataset_path}`"
+ labels = {"staging": True}
+ else:
+ try:
+ description = self.dataset_config.get("descriptionPt", "")
+ labels = {
+ tag.get("namePt"): True for tag in self.dataset_config.get("tags")
+ }
+ except BaseException:
+ logger.warning(
+ f"dataset {dataset_id} does not have a description in the API."
+ )
+ description = "description not available in the API."
+ labels = {}
+
+ dataset.description = description
+ dataset.labels = labels
+ dataset.location = location
+ return dataset
+
+ def publicize(self, mode="all", dataset_is_public=True):
+ """Changes IAM configuration to turn BigQuery dataset public.
+
+ Args:
+ mode (bool): Which dataset to create [prod|staging|all].
+ dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
+ """
+
+ for m in self._loop_modes(mode):
+ dataset = m["client"].get_dataset(m["id"])
+ entries = dataset.access_entries
+ # TODO https://github.com/basedosdados/mais/pull/1020
+ # TODO if staging dataset is private, the prod view can't acess it: if dataset_is_public and "staging" not in dataset.dataset_id:
+ if dataset_is_public:
+ if "staging" not in dataset.dataset_id:
+ entries.extend(
+ [
+ bigquery.AccessEntry(
+ role="roles/bigquery.dataViewer",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ bigquery.AccessEntry(
+ role="roles/bigquery.metadataViewer",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ bigquery.AccessEntry(
+ role="roles/bigquery.user",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ ]
+ )
+ else:
+ entries.extend(
+ [
+ bigquery.AccessEntry(
+ role="roles/bigquery.dataViewer",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ ]
+ )
+ dataset.access_entries = entries
+ m["client"].update_dataset(dataset, ["access_entries"])
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="publicized",
+ )
+
+ def exists(self, mode="staging"):
+ """
+ Check if dataset exists.
+ """
+ ref_dataset_id = (
+ self.dataset_id if mode == "prod" else self.dataset_id + "_staging"
+ )
+ try:
+ ref = self.client[f"bigquery_{mode}"].get_dataset(ref_dataset_id)
+ except Exception:
+ ref = None
+ return bool(ref)
+
+ def create(
+ self, mode="all", if_exists="raise", dataset_is_public=True, location=None
+ ):
+ """Creates BigQuery datasets given `dataset_id`.
+
+ It can create two datasets:
+
+ * `<dataset_id>` (mode = 'prod')
+ * `<dataset_id>_staging` (mode = 'staging')
+
+ If `mode` is all, it creates both.
+
+ Args:
+ mode (str): Optional. Which dataset to create [prod|staging|all].
+ if_exists (str): Optional. What to do if dataset exists
+
+ * raise : Raises Conflict exception
+ * replace : Drop all tables and replace dataset
+ * update : Update dataset description
+ * pass : Do nothing
+
+ dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
+
+ location (str): Optional. Location of dataset data.
+ List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
+
+
+ Raises:
+ Warning: Dataset already exists and if_exists is set to `raise`
+ """
+
+ # Set dataset_id to the ID of the dataset to create.
+ for m in self._loop_modes(mode):
+ if if_exists == "replace":
+ self.delete(mode=m["mode"])
+ elif if_exists == "update":
+ self.update(mode=m["mode"])
+ continue
+
+ # Send the dataset to the API for creation, with an explicit timeout.
+ # Raises google.api_core.exceptions.Conflict if the Dataset already
+ # exists within the project.
+ try:
+ if not self.exists(mode=m["mode"]):
+ # Construct a full Dataset object to send to the API.
+ dataset_obj = self._setup_dataset_object(
+ dataset_id=m["id"], location=location, mode=m["mode"]
+ )
+ m["client"].create_dataset(dataset_obj) # Make an API request.
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="created",
+ )
+ # Make prod dataset public
+ self.publicize(dataset_is_public=dataset_is_public, mode=m["mode"])
+ except Conflict as e:
+ if if_exists == "pass":
+ continue
+ raise Conflict(f"Dataset {self.dataset_id} already exists") from e
+
+ def delete(self, mode="all"):
+ """Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
+
+ Args:
+ mode (str): Optional. Which dataset to delete [prod|staging|all]
+ """
+
+ for m in self._loop_modes(mode):
+ m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
+ logger.info(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="deleted",
+ )
+
+ def update(self, mode="all", location=None):
+ """Update dataset description. Toogle mode to choose which dataset to update.
+
+ Args:
+ mode (str): Optional. Which dataset to update [prod|staging|all]
+ location (str): Optional. Location of dataset data.
+ List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
+
+ """
+
+ for m in self._loop_modes(mode):
+ # Send the dataset to the API to update, with an explicit timeout.
+ # Raises google.api_core.exceptions.Conflict if the Dataset already
+ # exists within the project.
+ m["client"].update_dataset(
+ self._setup_dataset_object(m["id"], location=location, mode=m["mode"]),
+ fields=["description"],
+ ) # Make an API request.
+
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="updated",
+ )
+
dataset_config
+
+
+ property
+ readonly
+
+
+Dataset config file.
+create(self, mode='all', if_exists='raise', dataset_is_public=True, location=None)
+
+
+Creates BigQuery datasets given dataset_id
.
It can create two datasets:
+<dataset_id>
(mode = 'prod')<dataset_id>_staging
(mode = 'staging')If mode
is all, it creates both.
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ str |
+ Optional. Which dataset to create [prod|staging|all]. |
+ 'all' |
+
if_exists |
+ str |
+ Optional. What to do if dataset exists +
|
+ 'raise' |
+
dataset_is_public |
+ bool |
+ Control if prod dataset is public or not. By default staging datasets like |
+ True |
+
location |
+ str |
+ Optional. Location of dataset data. +List of possible region names locations: https://cloud.google.com/bigquery/docs/locations |
+ None |
+
Exceptions:
+Type | +Description | +
---|---|
Warning |
+ Dataset already exists and if_exists is set to |
+
basedosdados/upload/dataset.py
def create(
+ self, mode="all", if_exists="raise", dataset_is_public=True, location=None
+):
+ """Creates BigQuery datasets given `dataset_id`.
+
+ It can create two datasets:
+
+ * `<dataset_id>` (mode = 'prod')
+ * `<dataset_id>_staging` (mode = 'staging')
+
+ If `mode` is all, it creates both.
+
+ Args:
+ mode (str): Optional. Which dataset to create [prod|staging|all].
+ if_exists (str): Optional. What to do if dataset exists
+
+ * raise : Raises Conflict exception
+ * replace : Drop all tables and replace dataset
+ * update : Update dataset description
+ * pass : Do nothing
+
+ dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
+
+ location (str): Optional. Location of dataset data.
+ List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
+
+
+ Raises:
+ Warning: Dataset already exists and if_exists is set to `raise`
+ """
+
+ # Set dataset_id to the ID of the dataset to create.
+ for m in self._loop_modes(mode):
+ if if_exists == "replace":
+ self.delete(mode=m["mode"])
+ elif if_exists == "update":
+ self.update(mode=m["mode"])
+ continue
+
+ # Send the dataset to the API for creation, with an explicit timeout.
+ # Raises google.api_core.exceptions.Conflict if the Dataset already
+ # exists within the project.
+ try:
+ if not self.exists(mode=m["mode"]):
+ # Construct a full Dataset object to send to the API.
+ dataset_obj = self._setup_dataset_object(
+ dataset_id=m["id"], location=location, mode=m["mode"]
+ )
+ m["client"].create_dataset(dataset_obj) # Make an API request.
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="created",
+ )
+ # Make prod dataset public
+ self.publicize(dataset_is_public=dataset_is_public, mode=m["mode"])
+ except Conflict as e:
+ if if_exists == "pass":
+ continue
+ raise Conflict(f"Dataset {self.dataset_id} already exists") from e
+
delete(self, mode='all')
+
+
+Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ str |
+ Optional. Which dataset to delete [prod|staging|all] |
+ 'all' |
+
basedosdados/upload/dataset.py
def delete(self, mode="all"):
+ """Deletes dataset in BigQuery. Toogle mode to choose which dataset to delete.
+
+ Args:
+ mode (str): Optional. Which dataset to delete [prod|staging|all]
+ """
+
+ for m in self._loop_modes(mode):
+ m["client"].delete_dataset(m["id"], delete_contents=True, not_found_ok=True)
+ logger.info(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="deleted",
+ )
+
exists(self, mode='staging')
+
+
+Check if dataset exists.
+ +basedosdados/upload/dataset.py
def exists(self, mode="staging"):
+ """
+ Check if dataset exists.
+ """
+ ref_dataset_id = (
+ self.dataset_id if mode == "prod" else self.dataset_id + "_staging"
+ )
+ try:
+ ref = self.client[f"bigquery_{mode}"].get_dataset(ref_dataset_id)
+ except Exception:
+ ref = None
+ return bool(ref)
+
publicize(self, mode='all', dataset_is_public=True)
+
+
+Changes IAM configuration to turn BigQuery dataset public.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ bool |
+ Which dataset to create [prod|staging|all]. |
+ 'all' |
+
dataset_is_public |
+ bool |
+ Control if prod dataset is public or not. By default staging datasets like |
+ True |
+
basedosdados/upload/dataset.py
def publicize(self, mode="all", dataset_is_public=True):
+ """Changes IAM configuration to turn BigQuery dataset public.
+
+ Args:
+ mode (bool): Which dataset to create [prod|staging|all].
+ dataset_is_public (bool): Control if prod dataset is public or not. By default staging datasets like `dataset_id_staging` are not public.
+ """
+
+ for m in self._loop_modes(mode):
+ dataset = m["client"].get_dataset(m["id"])
+ entries = dataset.access_entries
+ # TODO https://github.com/basedosdados/mais/pull/1020
+ # TODO if staging dataset is private, the prod view can't acess it: if dataset_is_public and "staging" not in dataset.dataset_id:
+ if dataset_is_public:
+ if "staging" not in dataset.dataset_id:
+ entries.extend(
+ [
+ bigquery.AccessEntry(
+ role="roles/bigquery.dataViewer",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ bigquery.AccessEntry(
+ role="roles/bigquery.metadataViewer",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ bigquery.AccessEntry(
+ role="roles/bigquery.user",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ ]
+ )
+ else:
+ entries.extend(
+ [
+ bigquery.AccessEntry(
+ role="roles/bigquery.dataViewer",
+ entity_type="iamMember",
+ entity_id="allUsers",
+ ),
+ ]
+ )
+ dataset.access_entries = entries
+ m["client"].update_dataset(dataset, ["access_entries"])
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="publicized",
+ )
+
update(self, mode='all', location=None)
+
+
+Update dataset description. Toogle mode to choose which dataset to update.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ str |
+ Optional. Which dataset to update [prod|staging|all] |
+ 'all' |
+
location |
+ str |
+ Optional. Location of dataset data. +List of possible region names locations: https://cloud.google.com/bigquery/docs/locations |
+ None |
+
basedosdados/upload/dataset.py
def update(self, mode="all", location=None):
+ """Update dataset description. Toogle mode to choose which dataset to update.
+
+ Args:
+ mode (str): Optional. Which dataset to update [prod|staging|all]
+ location (str): Optional. Location of dataset data.
+ List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
+
+ """
+
+ for m in self._loop_modes(mode):
+ # Send the dataset to the API to update, with an explicit timeout.
+ # Raises google.api_core.exceptions.Conflict if the Dataset already
+ # exists within the project.
+ m["client"].update_dataset(
+ self._setup_dataset_object(m["id"], location=location, mode=m["mode"]),
+ fields=["description"],
+ ) # Make an API request.
+
+ logger.success(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.dataset_id,
+ mode=m["mode"],
+ object="Dataset",
+ action="updated",
+ )
+
Class for manage tables in Storage and Big Query
+ + + +
+Table (Base)
+
+
+
+
+Manage tables in Google Cloud Storage and BigQuery.
+ +basedosdados/upload/table.py
class Table(Base):
+ """
+ Manage tables in Google Cloud Storage and BigQuery.
+ """
+
+ def __init__(self, dataset_id, table_id, **kwargs):
+ super().__init__(**kwargs)
+
+ self.table_id = table_id.replace("-", "_")
+ self.dataset_id = dataset_id.replace("-", "_")
+ self.table_full_name = dict(
+ prod=f"{self.client['bigquery_prod'].project}.{self.dataset_id}.{self.table_id}",
+ staging=f"{self.client['bigquery_staging'].project}.{self.dataset_id}_staging.{self.table_id}",
+ )
+ self.table_full_name.update(dict(all=deepcopy(self.table_full_name)))
+
+ @property
+ @lru_cache(256)
+ def table_config(self):
+ """
+ Load table config
+ """
+ # return self._load_yaml(self.table_folder / "table_config.yaml")
+ return self.backend.get_table_config(self.dataset_id, self.table_id)
+
+ def _get_table_obj(self, mode):
+ """
+ Get table object from BigQuery
+ """
+
+ return self.client[f"bigquery_{mode}"].get_table(self.table_full_name[mode])
+
+ def _is_partitioned(
+ self, data_sample_path=None, source_format=None, csv_delimiter=None
+ ):
+ if data_sample_path is not None:
+ table_columns = self._get_columns_from_data(
+ data_sample_path=data_sample_path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ mode="staging",
+ )
+ else:
+ table_columns = self._get_columns_metadata_from_api()
+
+ return bool(table_columns.get("partition_columns", []))
+
+ def _load_schema_from_json(
+ self,
+ columns=None,
+ ):
+ schema = []
+
+ for col in columns:
+ # ref: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.schema.SchemaField
+ if col.get("name") is None:
+ msg = "Columns must have a name! Check your data files for columns without name"
+ raise BaseDosDadosException(msg)
+
+ schema.append(
+ SchemaField(
+ name=col.get("name"),
+ field_type=col.get("type"),
+ description=col.get("description", None),
+ )
+ )
+ return schema
+
+ def _load_staging_schema_from_data(
+ self, data_sample_path=None, source_format="csv", csv_delimiter=","
+ ):
+ """
+ Generate schema from columns metadata in data sample
+ """
+
+ if self.table_exists(mode="staging"):
+ logger.warning(
+ " {object} {object_id} allready exists, replacing schema!",
+ object_id=self.table_id,
+ object="Table",
+ )
+
+ table_columns = self._get_columns_from_data(
+ data_sample_path=data_sample_path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ mode="staging",
+ )
+
+ return self._load_schema_from_json(columns=table_columns.get("columns"))
+
+ def _load_schema_from_bq(self, mode="staging"):
+ """Load schema from table config
+
+ Args:
+ mode (bool): Which dataset to create [prod|staging].
+
+ """
+ table_columns = self._get_columns_from_bq()
+ columns = table_columns.get("partition_columns") + table_columns.get("columns")
+ return self._load_schema_from_json(columns=columns)
+
+ def _load_schema_from_api(self, mode="staging"):
+ """Load schema from table config
+
+ Args:
+ mode (bool): Which dataset to create [prod|staging].
+
+ """
+ if self.table_exists(mode=mode):
+ logger.warning(
+ " {object} {object_id} allready exists, replacing schema!",
+ object_id=self.table_id,
+ object="Table",
+ )
+
+ table_columns = self._get_columns_metadata_from_api()
+ columns = table_columns.get("partition_columns") + table_columns.get("columns")
+
+ return self._load_schema_from_json(columns=columns)
+
+ def _get_columns_from_data(
+ self,
+ data_sample_path=None,
+ source_format="csv",
+ csv_delimiter=",",
+ mode="staging",
+ ): # sourcery skip: low-code-quality
+ """
+ Get the partition columns from the structure of data_sample_path.
+
+ Args:
+ data_sample_path (str, pathlib.PosixPath): Optional.
+ Data sample path to auto complete columns names
+ It supports Comma Delimited CSV, Apache Avro and
+ Apache Parquet.
+ source_format (str): Optional
+ Data source format. Only 'csv', 'avro' and 'parquet'
+ are supported. Defaults to 'csv'.
+ """
+
+ partition_columns = []
+ if isinstance(
+ data_sample_path,
+ (
+ str,
+ Path,
+ ),
+ ):
+ # Check if partitioned and get data sample and partition columns
+ data_sample_path = Path(data_sample_path)
+
+ if data_sample_path.is_dir():
+ data_sample_path = [
+ f
+ for f in data_sample_path.glob("**/*")
+ if f.is_file() and f.suffix == f".{source_format}"
+ ][0]
+
+ partition_columns = [
+ k.split("=")[0]
+ for k in data_sample_path.as_posix().split("/")
+ if "=" in k
+ ]
+ columns = Datatype(source_format=source_format).header(
+ data_sample_path=data_sample_path, csv_delimiter=csv_delimiter
+ )
+
+ return {
+ "columns": [{"name": col, "type": "STRING"} for col in columns],
+ "partition_columns": [
+ {"name": col, "type": "STRING"} for col in partition_columns
+ ],
+ }
+
+ def _get_columns_metadata_from_api(
+ self,
+ ):
+ """
+ Get columns and partition columns from API.
+ """
+ table_columns = self.table_config.get("columns", {})
+ columns = [col for col in table_columns if col.get("isPartition", {}) is False]
+
+ partition_columns = [
+ col for col in table_columns if col.get("isPartition", {}) is True
+ ]
+
+ return {
+ "columns": [
+ {
+ "name": col.get("name"),
+ "type": col.get("bigqueryType").get("name"),
+ "description": col.get("descriptionPt"),
+ }
+ for col in columns
+ ],
+ "partition_columns": [
+ {
+ "name": col.get("name"),
+ "type": col.get("bigqueryType").get("name"),
+ "description": col.get("descriptionPt"),
+ }
+ for col in partition_columns
+ ],
+ }
+
+ def _parser_blobs_to_partition_dict(self) -> dict:
+ """
+ Extracts the partition information from the blobs.
+ """
+
+ if not self.table_exists(mode="staging"):
+ return
+ blobs = (
+ self.client["storage_staging"]
+ .bucket(self.bucket_name)
+ .list_blobs(prefix=f"staging/{self.dataset_id}/{self.table_id}/")
+ )
+ partitions_dict = {}
+ # only needs the first bloob
+ for blob in blobs:
+ for folder in blob.name.split("/"):
+ if "=" in folder:
+ key = folder.split("=")[0]
+ value = folder.split("=")
+ try:
+ partitions_dict[key].append(value)
+ except KeyError:
+ partitions_dict[key] = [value]
+ return partitions_dict
+
+ def _get_columns_from_bq(self, mode="staging"):
+ if not self.table_exists(mode=mode):
+ msg = f"Table {self.dataset_id}.{self.table_id} does not exist in {mode}, please create first!"
+ raise logger.error(msg)
+ else:
+ schema = self._get_table_obj(mode=mode).schema
+
+ partition_dict = self._parser_blobs_to_partition_dict()
+
+ if partition_dict:
+ partition_columns = list(partition_dict.keys())
+ else:
+ partition_columns = []
+
+ return {
+ "columns": [
+ {
+ "name": col.name,
+ "type": col.field_type,
+ "description": col.description,
+ }
+ for col in schema
+ if col.name not in partition_columns
+ ],
+ "partition_columns": [
+ {
+ "name": col.name,
+ "type": col.field_type,
+ "description": col.description,
+ }
+ for col in schema
+ if col.name in partition_columns
+ ],
+ }
+
+ def _get_cross_columns_from_bq_api(self):
+ bq = self._get_columns_from_bq(mode="staging")
+ bq_columns = bq.get("partition_columns") + bq.get("columns")
+
+ api = self._get_columns_metadata_from_api()
+ api_columns = api.get("partition_columns") + api.get("columns")
+
+ if api_columns != []:
+ for bq_col in bq_columns:
+ for api_col in api_columns:
+ if bq_col.get("name") == api_col.get("name"):
+ bq_col["type"] = api_col.get("type")
+ bq_col["description"] = api_col.get("description")
+
+ return bq_columns
+
+ def _make_publish_sql(self):
+ """Create publish.sql with columns and bigquery_type"""
+
+ # publish.sql header and instructions
+ publish_txt = """
+ /*
+ Query para publicar a tabela.
+
+ Esse é o lugar para:
+ - modificar nomes, ordem e tipos de colunas
+ - dar join com outras tabelas
+ - criar colunas extras (e.g. logs, proporções, etc.)
+
+ Qualquer coluna definida aqui deve também existir em `table_config.yaml`.
+
+ # Além disso, sinta-se à vontade para alterar alguns nomes obscuros
+ # para algo um pouco mais explícito.
+
+ TIPOS:
+ - Para modificar tipos de colunas, basta substituir STRING por outro tipo válido.
+ - Exemplo: `SAFE_CAST(column_name AS NUMERIC) column_name`
+ - Mais detalhes: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+ */
+ """
+
+ # table_columns = self._get_columns_from_api(mode="staging")
+
+ columns = self._get_cross_columns_from_bq_api()
+
+ # remove triple quotes extra space
+ publish_txt = inspect.cleandoc(publish_txt)
+ publish_txt = textwrap.dedent(publish_txt)
+
+ # add create table statement
+ project_id_prod = self.client["bigquery_prod"].project
+ publish_txt += f"\n\nCREATE OR REPLACE VIEW {project_id_prod}.{self.dataset_id}.{self.table_id} AS\nSELECT \n"
+
+ # sort columns by is_partition, partitions_columns come first
+
+ # add columns in publish.sql
+ for col in columns:
+ name = col.get("name")
+ bigquery_type = (
+ "STRING" if col.get("type") is None else col.get("type").upper()
+ )
+
+ publish_txt += f"SAFE_CAST({name} AS {bigquery_type}) {name},\n"
+ # remove last comma
+ publish_txt = publish_txt[:-2] + "\n"
+
+ # add from statement
+ project_id_staging = self.client["bigquery_staging"].project
+ publish_txt += (
+ f"FROM {project_id_staging}.{self.dataset_id}_staging.{self.table_id} AS t"
+ )
+
+ return publish_txt
+
+ def table_exists(self, mode):
+ """Check if table exists in BigQuery.
+
+ Args:
+ mode (str): Which dataset to check [prod|staging].
+ """
+
+ try:
+ ref = self._get_table_obj(mode=mode)
+ except google.api_core.exceptions.NotFound:
+ ref = None
+
+ return bool(ref)
+
+ def _get_biglake_connection(
+ self, set_biglake_connection_permissions=True, location=None, mode="staging"
+ ):
+ connection = Connection(name="biglake", location=location, mode="staging")
+ if not connection.exists:
+ try:
+ logger.info("Creating BigLake connection...")
+ connection.create()
+ logger.success("BigLake connection created!")
+ except google.api_core.exceptions.Forbidden as exc:
+ logger.error(
+ "You don't have permission to create a BigLake connection. "
+ "Please contact an admin to create one for you."
+ )
+ raise BaseDosDadosException(
+ "You don't have permission to create a BigLake connection. "
+ "Please contact an admin to create one for you."
+ ) from exc
+ except Exception as exc:
+ logger.error(
+ "Something went wrong while creating the BigLake connection. "
+ "Please contact an admin to create one for you."
+ )
+ raise BaseDosDadosException(
+ "Something went wrong while creating the BigLake connection. "
+ "Please contact an admin to create one for you."
+ ) from exc
+ if set_biglake_connection_permissions:
+ try:
+ logger.info("Setting permissions for BigLake service account...")
+ connection.set_biglake_permissions()
+ logger.success("Permissions set successfully!")
+ except google.api_core.exceptions.Forbidden as exc:
+ logger.error(
+ "Could not set permissions for BigLake service account. "
+ "Please make sure you have permissions to grant roles/storage.objectViewer"
+ f" to the BigLake service account. ({connection.service_account})."
+ " If you don't, please ask an admin to do it for you or set "
+ "set_biglake_connection_permissions=False."
+ )
+ raise BaseDosDadosException(
+ "Could not set permissions for BigLake service account. "
+ "Please make sure you have permissions to grant roles/storage.objectViewer"
+ f" to the BigLake service account. ({connection.service_account})."
+ " If you don't, please ask an admin to do it for you or set "
+ "set_biglake_connection_permissions=False."
+ ) from exc
+ except Exception as exc:
+ logger.error(
+ "Something went wrong while setting permissions for BigLake service account. "
+ "Please make sure you have permissions to grant roles/storage.objectViewer"
+ f" to the BigLake service account. ({connection.service_account})."
+ " If you don't, please ask an admin to do it for you or set "
+ "set_biglake_connection_permissions=False."
+ )
+ raise BaseDosDadosException(
+ "Something went wrong while setting permissions for BigLake service account. "
+ "Please make sure you have permissions to grant roles/storage.objectViewer"
+ f" to the BigLake service account. ({connection.service_account})."
+ " If you don't, please ask an admin to do it for you or set "
+ "set_biglake_connection_permissions=False."
+ ) from exc
+
+ return connection
+
+ def _get_table_description(self, mode="staging"):
+ """Adds table description to BigQuery table.
+
+ Args:
+ table_obj (google.cloud.bigquery.table.Table): Table object.
+ mode (str): Which dataset to check [prod|staging].
+ """
+ table_path = self.table_full_name["prod"]
+ if mode == "staging":
+ description = f"staging table for `{table_path}`"
+ else:
+ try:
+ description = self.table_config.get("descriptionPt", "")
+ except BaseException:
+ logger.warning(
+ f"table {self.table_id} does not have a description in the API."
+ )
+ description = "description not available in the API."
+
+ return description
+
+ def create(
+ self,
+ path=None,
+ source_format="csv",
+ csv_delimiter=",",
+ csv_skip_leading_rows=1,
+ csv_allow_jagged_rows=False,
+ if_table_exists="raise",
+ if_storage_data_exists="raise",
+ if_dataset_exists="pass",
+ dataset_is_public=True,
+ location=None,
+ chunk_size=None,
+ biglake_table=False,
+ set_biglake_connection_permissions=True,
+ ):
+ """Creates a BigQuery table in the staging dataset.
+
+ If a path is provided, data is automatically saved in storage,
+ and a datasets folder and BigQuery location are created, in addition to creating
+ the table and its configuration files.
+
+ The new table is located at `<dataset_id>_staging.<table_id>` in BigQuery.
+
+ Data can be found in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
+ and is used to build the table.
+
+ The following data types are supported:
+
+ - Comma-Delimited CSV
+ - Apache Avro
+ - Apache Parquet
+
+ Data can also be partitioned following the Hive partitioning scheme
+ `<key1>=<value1>/<key2>=<value2>`; for example,
+ `year=2012/country=BR`. The partition is automatically detected by searching for `partitions`
+ in the `table_config.yaml` file.
+
+ Args:
+ path (str or pathlib.PosixPath): The path to the file to be uploaded to create the table.
+ source_format (str): Optional. The format of the data source. Only 'csv', 'avro', and 'parquet'
+ are supported. Defaults to 'csv'.
+ csv_delimiter (str): Optional.
+ The separator for fields in a CSV file. The separator can be any ISO-8859-1
+ single-byte character. Defaults to ','.
+ csv_skip_leading_rows(int): Optional.
+ The number of rows at the top of a CSV file that BigQuery will skip when loading the data.
+ Defaults to 1.
+ csv_allow_jagged_rows (bool): Optional.
+ Indicates if BigQuery should allow extra values that are not represented in the table schema.
+ Defaults to False.
+ if_table_exists (str): Optional. Determines what to do if the table already exists:
+
+ * 'raise' : Raises a Conflict exception
+ * 'replace' : Replaces the table
+ * 'pass' : Does nothing
+ if_storage_data_exists (str): Optional. Determines what to do if the data already exists on your bucket:
+
+ * 'raise' : Raises a Conflict exception
+ * 'replace' : Replaces the table
+ * 'pass' : Does nothing
+ if_dataset_exists (str): Optional. Determines what to do if the dataset already exists:
+
+ * 'raise' : Raises a Conflict exception
+ * 'replace' : Replaces the dataset
+ * 'pass' : Does nothing
+ dataset_is_public (bool): Optional. Controls if the prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public.
+ location (str): Optional. The location of the dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
+ chunk_size (int): Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification.
+ If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
+ biglake_table (bool): Optional. Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if
+ they don't have access to the source data. IAM is managed like any other BigQuery native table. See https://cloud.google.com/bigquery/docs/biglake-intro for more on BigLake.
+ set_biglake_connection_permissions (bool): Optional. If set to `True`, attempts to grant the BigLake connection service account access to the table's data in GCS.
+
+ """
+
+ if path is None:
+ # Look if table data already exists at Storage
+ data = self.client["storage_staging"].list_blobs(
+ self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
+ )
+
+ # Raise: Cannot create table without external data
+ if not data:
+ raise BaseDosDadosException(
+ "You must provide a path for uploading data"
+ )
+
+ # Add data to storage
+ if isinstance(
+ path,
+ (
+ str,
+ Path,
+ ),
+ ):
+ Storage(
+ dataset_id=self.dataset_id,
+ table_id=self.table_id,
+ config_path=self.config_path,
+ bucket_name=self.bucket_name,
+ ).upload(
+ path=path,
+ mode="staging",
+ if_exists=if_storage_data_exists,
+ chunk_size=chunk_size,
+ )
+
+ # Create Dataset if it doesn't exist
+
+ dataset_obj = Dataset(
+ self.dataset_id,
+ )
+
+ dataset_obj.create(
+ if_exists=if_dataset_exists,
+ mode="all",
+ location=location,
+ dataset_is_public=dataset_is_public,
+ )
+
+ if biglake_table:
+ biglake_connection = self._get_biglake_connection(
+ set_biglake_connection_permissions=set_biglake_connection_permissions,
+ location=location,
+ mode="staging",
+ )
+ biglake_connection_id = biglake_connection.connection_id
+
+ table = bigquery.Table(self.table_full_name["staging"])
+
+ table.description = self._get_table_description(mode="staging")
+
+ table.external_data_configuration = Datatype(
+ dataset_id=self.dataset_id,
+ table_id=self.table_id,
+ schema=self._load_staging_schema_from_data(
+ data_sample_path=path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ ),
+ source_format=source_format,
+ csv_skip_leading_rows=csv_skip_leading_rows,
+ csv_delimiter=csv_delimiter,
+ csv_allow_jagged_rows=csv_allow_jagged_rows,
+ mode="staging",
+ bucket_name=self.bucket_name,
+ partitioned=self._is_partitioned(
+ data_sample_path=path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ ),
+ biglake_connection_id=biglake_connection_id if biglake_table else None,
+ ).external_config
+
+ # When using BigLake tables, schema must be provided to the `Table` object
+ if biglake_table:
+ table.schema = self._load_staging_schema_from_data(
+ data_sample_path=path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ )
+ logger.info(f"Using BigLake connection {biglake_connection_id}")
+
+ # Lookup if table alreay exists
+ table_ref = None
+ with contextlib.suppress(google.api_core.exceptions.NotFound):
+ table_ref = self.client["bigquery_staging"].get_table(
+ self.table_full_name["staging"]
+ )
+
+ if isinstance(table_ref, google.cloud.bigquery.table.Table):
+ if if_table_exists == "pass":
+ return None
+
+ if if_table_exists == "raise":
+ raise FileExistsError(
+ "Table already exists, choose replace if you want to overwrite it"
+ )
+
+ if if_table_exists == "replace" and self.table_exists(mode="staging"):
+ self.delete(mode="staging")
+
+ try:
+ self.client["bigquery_staging"].create_table(table)
+ except google.api_core.exceptions.Forbidden as exc:
+ if biglake_table:
+ raise BaseDosDadosException(
+ "Permission denied. The service account used to create the BigLake connection"
+ " does not have permission to read data from the source bucket. Please grant"
+ f" the service account {biglake_connection.service_account} the Storage Object Viewer"
+ " (roles/storage.objectViewer) role on the source bucket (or on the project)."
+ " Or, you can try running this again with set_biglake_connection_permissions=True."
+ ) from exc
+ raise BaseDosDadosException(
+ "Something went wrong when creating the table. Please check the logs for more information."
+ ) from exc
+ except Exception as exc:
+ raise BaseDosDadosException(
+ "Something went wrong when creating the table. Please check the logs for more information."
+ ) from exc
+
+ logger.success(
+ "{object} {object_id} was {action} in {mode}!",
+ object_id=self.table_id,
+ mode="staging",
+ object="Table",
+ action="created",
+ )
+ # return None
+
+ def update(self, mode="prod", custom_schema=None):
+ """Updates BigQuery schema and description.
+ Args:
+ mode (str): Optional.
+ Table of which table to update [prod]
+ not_found_ok (bool): Optional.
+ What to do if table is not found
+ """
+
+ self._check_mode(mode)
+
+ table = self._get_table_obj(mode)
+
+ table.description = self._get_table_description()
+
+ # when mode is staging the table schema already exists
+ if mode == "prod" and custom_schema is None:
+ table.schema = self._load_schema_from_json(
+ columns=self._get_cross_columns_from_bq_api()
+ )
+ if mode == "prod" and custom_schema is not None:
+ table.schema = self._load_schema_from_json(custom_schema)
+
+ fields = ["description", "schema"]
+
+ self.client["bigquery_prod"].update_table(table, fields=fields)
+
+ logger.success(
+ " {object} {object_id} was {action} in {mode}!",
+ object_id=self.table_id,
+ mode=mode,
+ object="Table",
+ action="updated",
+ )
+
+ def publish(self, if_exists="raise", custon_publish_sql=None, custom_schema=None):
+ """Creates BigQuery table at production dataset.
+
+ Table should be located at `<dataset_id>.<table_id>`.
+
+ It creates a view that uses the query from
+ `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
+
+ Make sure that all columns from the query also exists at
+ `<metadata_path>/<dataset_id>/<table_id>/table_config.sql`, including
+ the partitions.
+
+ Args:
+ if_exists (str): Optional.
+ What to do if table exists.
+
+ * 'raise' : Raises Conflict exception
+ * 'replace' : Replace table
+ * 'pass' : Do nothing
+
+ Todo:
+
+ * Check if all required fields are filled
+ """
+ # TODO: review this method
+
+ if if_exists == "replace" and self.table_exists(mode="prod"):
+ self.delete(mode="prod")
+
+ publish_sql = self._make_publish_sql()
+
+ # create view using API metadata
+ if custon_publish_sql is None:
+ self.client["bigquery_prod"].query(publish_sql).result()
+ self.update(mode="prod")
+
+ # create view using custon query
+ if custon_publish_sql is not None:
+ self.client["bigquery_prod"].query(custon_publish_sql).result()
+ # update schema using a custom schema
+ if custom_schema is not None:
+ self.update(custom_schema=custom_schema)
+
+ logger.success(
+ " {object} {object_id} was {action}!",
+ object_id=self.table_id,
+ object="Table",
+ action="published",
+ )
+
+ def delete(self, mode="all"):
+ """Deletes table in BigQuery.
+
+ Args:
+ mode (str): Table of which table to delete [prod|staging]
+ """
+
+ self._check_mode(mode)
+
+ if mode == "all":
+ for m, n in self.table_full_name[mode].items():
+ self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
+ logger.info(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.table_id,
+ mode=m,
+ object="Table",
+ action="deleted",
+ )
+ else:
+ self.client[f"bigquery_{mode}"].delete_table(
+ self.table_full_name[mode], not_found_ok=True
+ )
+ logger.info(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.table_id,
+ mode=mode,
+ object="Table",
+ action="deleted",
+ )
+
+ def append(
+ self,
+ filepath,
+ partitions=None,
+ if_exists="replace",
+ chunk_size=None,
+ **upload_args,
+ ):
+ """Appends new data to existing BigQuery table.
+
+ As long as the data has the same schema. It appends the data in the
+ filepath to the existing table.
+
+ Args:
+ filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
+ partitions (str, pathlib.PosixPath, dict): Optional.
+ Hive structured partition as a string or dict
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+ if_exists (str): 0ptional.
+ What to do if data with same name exists in storage
+
+ * 'raise' : Raises Conflict exception
+ * 'replace' : Replace table
+ * 'pass' : Do nothing
+ chunk_size (int): Optional
+ The size of a chunk of data whenever iterating (in bytes).
+ This must be a multiple of 256 KB per the API specification.
+ If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
+ """
+ if not self.table_exists("staging"):
+ raise BaseDosDadosException(
+ "You cannot append to a table that does not exist"
+ )
+ Storage(
+ self.dataset_id,
+ self.table_id,
+ ).upload(
+ filepath,
+ mode="staging",
+ partitions=partitions,
+ if_exists=if_exists,
+ chunk_size=chunk_size,
+ **upload_args,
+ )
+ logger.success(
+ " {object} {object_id} was {action}!",
+ object_id=self.table_id,
+ object="Table",
+ action="appended",
+ )
+
table_config
+
+
+ property
+ readonly
+
+
+Load table config
+append(self, filepath, partitions=None, if_exists='replace', chunk_size=None, **upload_args)
+
+
+Appends new data to existing BigQuery table.
+As long as the data has the same schema. It appends the data in the +filepath to the existing table.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
filepath |
+ str or pathlib.PosixPath |
+ Where to find the file that you want to upload to create a table with |
+ required | +
partitions |
+ str, pathlib.PosixPath, dict |
+ Optional. +Hive structured partition as a string or dict +
|
+ None |
+
if_exists |
+ str |
+ 0ptional. +What to do if data with same name exists in storage +
|
+ 'replace' |
+
chunk_size |
+ int |
+ Optional +The size of a chunk of data whenever iterating (in bytes). +This must be a multiple of 256 KB per the API specification. +If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used. |
+ None |
+
basedosdados/upload/table.py
def append(
+ self,
+ filepath,
+ partitions=None,
+ if_exists="replace",
+ chunk_size=None,
+ **upload_args,
+):
+ """Appends new data to existing BigQuery table.
+
+ As long as the data has the same schema. It appends the data in the
+ filepath to the existing table.
+
+ Args:
+ filepath (str or pathlib.PosixPath): Where to find the file that you want to upload to create a table with
+ partitions (str, pathlib.PosixPath, dict): Optional.
+ Hive structured partition as a string or dict
+
+ * str : `<key>=<value>/<key2>=<value2>`
+ * dict: `dict(key=value, key2=value2)`
+ if_exists (str): 0ptional.
+ What to do if data with same name exists in storage
+
+ * 'raise' : Raises Conflict exception
+ * 'replace' : Replace table
+ * 'pass' : Do nothing
+ chunk_size (int): Optional
+ The size of a chunk of data whenever iterating (in bytes).
+ This must be a multiple of 256 KB per the API specification.
+ If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
+ """
+ if not self.table_exists("staging"):
+ raise BaseDosDadosException(
+ "You cannot append to a table that does not exist"
+ )
+ Storage(
+ self.dataset_id,
+ self.table_id,
+ ).upload(
+ filepath,
+ mode="staging",
+ partitions=partitions,
+ if_exists=if_exists,
+ chunk_size=chunk_size,
+ **upload_args,
+ )
+ logger.success(
+ " {object} {object_id} was {action}!",
+ object_id=self.table_id,
+ object="Table",
+ action="appended",
+ )
+
create(self, path=None, source_format='csv', csv_delimiter=',', csv_skip_leading_rows=1, csv_allow_jagged_rows=False, if_table_exists='raise', if_storage_data_exists='raise', if_dataset_exists='pass', dataset_is_public=True, location=None, chunk_size=None, biglake_table=False, set_biglake_connection_permissions=True)
+
+
+Creates a BigQuery table in the staging dataset.
+If a path is provided, data is automatically saved in storage, +and a datasets folder and BigQuery location are created, in addition to creating +the table and its configuration files.
+The new table is located at <dataset_id>_staging.<table_id>
in BigQuery.
Data can be found in Storage at <bucket_name>/staging/<dataset_id>/<table_id>/*
+and is used to build the table.
The following data types are supported:
+Data can also be partitioned following the Hive partitioning scheme
+<key1>=<value1>/<key2>=<value2>
; for example,
+year=2012/country=BR
. The partition is automatically detected by searching for partitions
+in the table_config.yaml
file.
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
path |
+ str or pathlib.PosixPath |
+ The path to the file to be uploaded to create the table. |
+ None |
+
source_format |
+ str |
+ Optional. The format of the data source. Only 'csv', 'avro', and 'parquet' +are supported. Defaults to 'csv'. |
+ 'csv' |
+
csv_delimiter |
+ str |
+ Optional. +The separator for fields in a CSV file. The separator can be any ISO-8859-1 +single-byte character. Defaults to ','. |
+ ',' |
+
csv_skip_leading_rows(int) |
+ + | Optional. +The number of rows at the top of a CSV file that BigQuery will skip when loading the data. +Defaults to 1. |
+ required | +
csv_allow_jagged_rows |
+ bool |
+ Optional. +Indicates if BigQuery should allow extra values that are not represented in the table schema. +Defaults to False. |
+ False |
+
if_table_exists |
+ str |
+ Optional. Determines what to do if the table already exists: +
|
+ 'raise' |
+
if_storage_data_exists |
+ str |
+ Optional. Determines what to do if the data already exists on your bucket: +
|
+ 'raise' |
+
if_dataset_exists |
+ str |
+ Optional. Determines what to do if the dataset already exists: +
|
+ 'pass' |
+
dataset_is_public |
+ bool |
+ Optional. Controls if the prod dataset is public or not. By default, staging datasets like |
+ True |
+
location |
+ str |
+ Optional. The location of the dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations |
+ None |
+
chunk_size |
+ int |
+ Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. +If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used. |
+ None |
+
biglake_table |
+ bool |
+ Optional. Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if +they don't have access to the source data. IAM is managed like any other BigQuery native table. See https://cloud.google.com/bigquery/docs/biglake-intro for more on BigLake. |
+ False |
+
set_biglake_connection_permissions |
+ bool |
+ Optional. If set to |
+ True |
+
basedosdados/upload/table.py
def create(
+ self,
+ path=None,
+ source_format="csv",
+ csv_delimiter=",",
+ csv_skip_leading_rows=1,
+ csv_allow_jagged_rows=False,
+ if_table_exists="raise",
+ if_storage_data_exists="raise",
+ if_dataset_exists="pass",
+ dataset_is_public=True,
+ location=None,
+ chunk_size=None,
+ biglake_table=False,
+ set_biglake_connection_permissions=True,
+):
+ """Creates a BigQuery table in the staging dataset.
+
+ If a path is provided, data is automatically saved in storage,
+ and a datasets folder and BigQuery location are created, in addition to creating
+ the table and its configuration files.
+
+ The new table is located at `<dataset_id>_staging.<table_id>` in BigQuery.
+
+ Data can be found in Storage at `<bucket_name>/staging/<dataset_id>/<table_id>/*`
+ and is used to build the table.
+
+ The following data types are supported:
+
+ - Comma-Delimited CSV
+ - Apache Avro
+ - Apache Parquet
+
+ Data can also be partitioned following the Hive partitioning scheme
+ `<key1>=<value1>/<key2>=<value2>`; for example,
+ `year=2012/country=BR`. The partition is automatically detected by searching for `partitions`
+ in the `table_config.yaml` file.
+
+ Args:
+ path (str or pathlib.PosixPath): The path to the file to be uploaded to create the table.
+ source_format (str): Optional. The format of the data source. Only 'csv', 'avro', and 'parquet'
+ are supported. Defaults to 'csv'.
+ csv_delimiter (str): Optional.
+ The separator for fields in a CSV file. The separator can be any ISO-8859-1
+ single-byte character. Defaults to ','.
+ csv_skip_leading_rows(int): Optional.
+ The number of rows at the top of a CSV file that BigQuery will skip when loading the data.
+ Defaults to 1.
+ csv_allow_jagged_rows (bool): Optional.
+ Indicates if BigQuery should allow extra values that are not represented in the table schema.
+ Defaults to False.
+ if_table_exists (str): Optional. Determines what to do if the table already exists:
+
+ * 'raise' : Raises a Conflict exception
+ * 'replace' : Replaces the table
+ * 'pass' : Does nothing
+ if_storage_data_exists (str): Optional. Determines what to do if the data already exists on your bucket:
+
+ * 'raise' : Raises a Conflict exception
+ * 'replace' : Replaces the table
+ * 'pass' : Does nothing
+ if_dataset_exists (str): Optional. Determines what to do if the dataset already exists:
+
+ * 'raise' : Raises a Conflict exception
+ * 'replace' : Replaces the dataset
+ * 'pass' : Does nothing
+ dataset_is_public (bool): Optional. Controls if the prod dataset is public or not. By default, staging datasets like `dataset_id_staging` are not public.
+ location (str): Optional. The location of the dataset data. List of possible region names locations: https://cloud.google.com/bigquery/docs/locations
+ chunk_size (int): Optional. The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification.
+ If not specified, the chunk_size of the blob itself is used. If that is not specified, a default value of 40 MB is used.
+ biglake_table (bool): Optional. Sets this as a BigLake table. BigLake tables allow end-users to query from external data (such as GCS) even if
+ they don't have access to the source data. IAM is managed like any other BigQuery native table. See https://cloud.google.com/bigquery/docs/biglake-intro for more on BigLake.
+ set_biglake_connection_permissions (bool): Optional. If set to `True`, attempts to grant the BigLake connection service account access to the table's data in GCS.
+
+ """
+
+ if path is None:
+ # Look if table data already exists at Storage
+ data = self.client["storage_staging"].list_blobs(
+ self.bucket_name, prefix=f"staging/{self.dataset_id}/{self.table_id}"
+ )
+
+ # Raise: Cannot create table without external data
+ if not data:
+ raise BaseDosDadosException(
+ "You must provide a path for uploading data"
+ )
+
+ # Add data to storage
+ if isinstance(
+ path,
+ (
+ str,
+ Path,
+ ),
+ ):
+ Storage(
+ dataset_id=self.dataset_id,
+ table_id=self.table_id,
+ config_path=self.config_path,
+ bucket_name=self.bucket_name,
+ ).upload(
+ path=path,
+ mode="staging",
+ if_exists=if_storage_data_exists,
+ chunk_size=chunk_size,
+ )
+
+ # Create Dataset if it doesn't exist
+
+ dataset_obj = Dataset(
+ self.dataset_id,
+ )
+
+ dataset_obj.create(
+ if_exists=if_dataset_exists,
+ mode="all",
+ location=location,
+ dataset_is_public=dataset_is_public,
+ )
+
+ if biglake_table:
+ biglake_connection = self._get_biglake_connection(
+ set_biglake_connection_permissions=set_biglake_connection_permissions,
+ location=location,
+ mode="staging",
+ )
+ biglake_connection_id = biglake_connection.connection_id
+
+ table = bigquery.Table(self.table_full_name["staging"])
+
+ table.description = self._get_table_description(mode="staging")
+
+ table.external_data_configuration = Datatype(
+ dataset_id=self.dataset_id,
+ table_id=self.table_id,
+ schema=self._load_staging_schema_from_data(
+ data_sample_path=path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ ),
+ source_format=source_format,
+ csv_skip_leading_rows=csv_skip_leading_rows,
+ csv_delimiter=csv_delimiter,
+ csv_allow_jagged_rows=csv_allow_jagged_rows,
+ mode="staging",
+ bucket_name=self.bucket_name,
+ partitioned=self._is_partitioned(
+ data_sample_path=path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ ),
+ biglake_connection_id=biglake_connection_id if biglake_table else None,
+ ).external_config
+
+ # When using BigLake tables, schema must be provided to the `Table` object
+ if biglake_table:
+ table.schema = self._load_staging_schema_from_data(
+ data_sample_path=path,
+ source_format=source_format,
+ csv_delimiter=csv_delimiter,
+ )
+ logger.info(f"Using BigLake connection {biglake_connection_id}")
+
+ # Lookup if table alreay exists
+ table_ref = None
+ with contextlib.suppress(google.api_core.exceptions.NotFound):
+ table_ref = self.client["bigquery_staging"].get_table(
+ self.table_full_name["staging"]
+ )
+
+ if isinstance(table_ref, google.cloud.bigquery.table.Table):
+ if if_table_exists == "pass":
+ return None
+
+ if if_table_exists == "raise":
+ raise FileExistsError(
+ "Table already exists, choose replace if you want to overwrite it"
+ )
+
+ if if_table_exists == "replace" and self.table_exists(mode="staging"):
+ self.delete(mode="staging")
+
+ try:
+ self.client["bigquery_staging"].create_table(table)
+ except google.api_core.exceptions.Forbidden as exc:
+ if biglake_table:
+ raise BaseDosDadosException(
+ "Permission denied. The service account used to create the BigLake connection"
+ " does not have permission to read data from the source bucket. Please grant"
+ f" the service account {biglake_connection.service_account} the Storage Object Viewer"
+ " (roles/storage.objectViewer) role on the source bucket (or on the project)."
+ " Or, you can try running this again with set_biglake_connection_permissions=True."
+ ) from exc
+ raise BaseDosDadosException(
+ "Something went wrong when creating the table. Please check the logs for more information."
+ ) from exc
+ except Exception as exc:
+ raise BaseDosDadosException(
+ "Something went wrong when creating the table. Please check the logs for more information."
+ ) from exc
+
+ logger.success(
+ "{object} {object_id} was {action} in {mode}!",
+ object_id=self.table_id,
+ mode="staging",
+ object="Table",
+ action="created",
+ )
+ # return None
+
delete(self, mode='all')
+
+
+Deletes table in BigQuery.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ str |
+ Table of which table to delete [prod|staging] |
+ 'all' |
+
basedosdados/upload/table.py
def delete(self, mode="all"):
+ """Deletes table in BigQuery.
+
+ Args:
+ mode (str): Table of which table to delete [prod|staging]
+ """
+
+ self._check_mode(mode)
+
+ if mode == "all":
+ for m, n in self.table_full_name[mode].items():
+ self.client[f"bigquery_{m}"].delete_table(n, not_found_ok=True)
+ logger.info(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.table_id,
+ mode=m,
+ object="Table",
+ action="deleted",
+ )
+ else:
+ self.client[f"bigquery_{mode}"].delete_table(
+ self.table_full_name[mode], not_found_ok=True
+ )
+ logger.info(
+ " {object} {object_id}_{mode} was {action}!",
+ object_id=self.table_id,
+ mode=mode,
+ object="Table",
+ action="deleted",
+ )
+
publish(self, if_exists='raise', custon_publish_sql=None, custom_schema=None)
+
+
+Creates BigQuery table at production dataset.
+Table should be located at <dataset_id>.<table_id>
.
It creates a view that uses the query from
+<metadata_path>/<dataset_id>/<table_id>/publish.sql
.
Make sure that all columns from the query also exists at
+<metadata_path>/<dataset_id>/<table_id>/table_config.sql
, including
+the partitions.
Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
if_exists |
+ str |
+ Optional. +What to do if table exists. +
|
+ 'raise' |
+
Todo:
+* Check if all required fields are filled
+
+
+ basedosdados/upload/table.py
def publish(self, if_exists="raise", custon_publish_sql=None, custom_schema=None):
+ """Creates BigQuery table at production dataset.
+
+ Table should be located at `<dataset_id>.<table_id>`.
+
+ It creates a view that uses the query from
+ `<metadata_path>/<dataset_id>/<table_id>/publish.sql`.
+
+ Make sure that all columns from the query also exists at
+ `<metadata_path>/<dataset_id>/<table_id>/table_config.sql`, including
+ the partitions.
+
+ Args:
+ if_exists (str): Optional.
+ What to do if table exists.
+
+ * 'raise' : Raises Conflict exception
+ * 'replace' : Replace table
+ * 'pass' : Do nothing
+
+ Todo:
+
+ * Check if all required fields are filled
+ """
+ # TODO: review this method
+
+ if if_exists == "replace" and self.table_exists(mode="prod"):
+ self.delete(mode="prod")
+
+ publish_sql = self._make_publish_sql()
+
+ # create view using API metadata
+ if custon_publish_sql is None:
+ self.client["bigquery_prod"].query(publish_sql).result()
+ self.update(mode="prod")
+
+ # create view using custon query
+ if custon_publish_sql is not None:
+ self.client["bigquery_prod"].query(custon_publish_sql).result()
+ # update schema using a custom schema
+ if custom_schema is not None:
+ self.update(custom_schema=custom_schema)
+
+ logger.success(
+ " {object} {object_id} was {action}!",
+ object_id=self.table_id,
+ object="Table",
+ action="published",
+ )
+
table_exists(self, mode)
+
+
+Check if table exists in BigQuery.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ str |
+ Which dataset to check [prod|staging]. |
+ required | +
basedosdados/upload/table.py
def table_exists(self, mode):
+ """Check if table exists in BigQuery.
+
+ Args:
+ mode (str): Which dataset to check [prod|staging].
+ """
+
+ try:
+ ref = self._get_table_obj(mode=mode)
+ except google.api_core.exceptions.NotFound:
+ ref = None
+
+ return bool(ref)
+
update(self, mode='prod', custom_schema=None)
+
+
+Updates BigQuery schema and description.
+ +Parameters:
+Name | +Type | +Description | +Default | +
---|---|---|---|
mode |
+ str |
+ Optional. +Table of which table to update [prod] |
+ 'prod' |
+
not_found_ok |
+ bool |
+ Optional. +What to do if table is not found |
+ required | +
basedosdados/upload/table.py
def update(self, mode="prod", custom_schema=None):
+ """Updates BigQuery schema and description.
+ Args:
+ mode (str): Optional.
+ Table of which table to update [prod]
+ not_found_ok (bool): Optional.
+ What to do if table is not found
+ """
+
+ self._check_mode(mode)
+
+ table = self._get_table_obj(mode)
+
+ table.description = self._get_table_description()
+
+ # when mode is staging the table schema already exists
+ if mode == "prod" and custom_schema is None:
+ table.schema = self._load_schema_from_json(
+ columns=self._get_cross_columns_from_bq_api()
+ )
+ if mode == "prod" and custom_schema is not None:
+ table.schema = self._load_schema_from_json(custom_schema)
+
+ fields = ["description", "schema"]
+
+ self.client["bigquery_prod"].update_table(table, fields=fields)
+
+ logger.success(
+ " {object} {object_id} was {action} in {mode}!",
+ object_id=self.table_id,
+ mode=mode,
+ object="Table",
+ action="updated",
+ )
+
Esta API é composta somente de módulos para requisição de dados, ou +seja, download e/ou carregamento de dados do projeto no seu ambiente de +análise). +Para fazer gerenciamento de dados no Google Cloud, busque as funções +na API de linha de comando ou em Python.
+A documentação completa encontra-se na página do CRAN do projeto, e +segue baixo.
+Toda documentação do código abaixo está em inglês
+Os principais erros encontrados do pacote da Base dos Dados no Rstudio são derivados de dois fatores:
+* Autenticação
+
+* Versão do pacote `dbplyr`
+
+Portanto, se algum erro aparecer para você, por favor, tente primeiro checar se ele está relacionado a esses dois fatores.
+A maioria dos erros do nosso pacote estão relacionados a problemas de autenticação. O pacote basedosdados
requer que o usuário forneça todas as autenticações solicitadas pela função basedosdados::set_billing_id
, inclusive aquelas que aparecem como optativas. Por isso, é necessário estar atento se você marcou todas as caixinhas de seleção quando o Rstudio disponibiliza essa tela no navegador:
Note que é preciso marcar inclusive as duas últimas "caixinhas", que aparecem como opcionais. Caso você tenha esquecido de marcá-las, todas as outras funções do pacote não irão funcionar posteriormente.
+Caso você já tenha autenticado com autorização incompleta, é preciso repetir o processo de autenticação. Você pode fazer isso rodando gargle::gargle_oauth_sitrep()
. Você deverá checar a pasta em que estão salvas as autenticações do seu R, entrar nesta pasta e deletar aquela referente ao Google Cloud/Bigquery. Feito isso, ao rodar basedosdados::set_billing_id
você poderá autenticar novamente.
Veja como é simples:
+ +Realizados todos esses procedimentos, é bem provável que os erros anteriores não ocorram mais.
+dbplyr
Outro erro comum está relacionado ao uso da função basedosdados::bdplyr
. Nosso pacote em R foi construído utilizando outros pacotes disponíveis na comunidade. Isso significa que atualizações destes pacotes podem alterar o funcionamento destes e gerar efeitos em cascata a outros pacotes desenvolvidos em cima deles. Neste contexto, o nosso pacote funciona apenas com a versão 2.1.1 do pacote dbplyr
, e não funciona com versões posteriores.
Você pode checar a sua versão do dbplyr
rodando utils::packageVersion("dbplyr")
no seu R. Caso ela seja superior à versão 2.1.1, você precisa dar um downgrade para a versão correta. Para isso, você pode rodar devtools::install_version("dbplyr", version = "2.1.1", repos = "http://cran.us.r-project.org")
.
Caso os erros persistam, você pode abrir uma issue no nosso Github clicando aqui. Você também visitar as issues que já foram resolvidas e estão atribuídas com o a etiqueta R
em nosso Github aqui.
Esta API é composta por módulos para requisição de dados: para aquele(as) que desejam + somente consultar os dados e metadados do nosso projeto (ou qualquer outro + projeto no Google Cloud).
+Toda documentação do código abaixo está em inglês
+Se é a sua primeira vez utilizando o pacote, digite db basedosdados
e confirme novamente se as etapas acima foram concluídas com sucesso.
O pacote contém 7 comandos, conforme suas funcionalidades descritas abaixo:
+Comando | +Descrição | +
---|---|
bd_download |
+baixa dados da Base dos Dados (BD). | +
bd_read_sql |
+baixa tabelas da BD usando consultas específicas. | +
bd_read_table |
+baixa tabelas da BD usando dataset_id e table_id . |
+
bd_list_datasets |
+lista o dataset_id dos conjuntos de dados disponíveis em query_project_id . |
+
bd_list_dataset_tables |
+lista table_id para tabelas disponíveis no dataset_id especificado. |
+
bd_get_table_description |
+mostra a descrição completa da tabela BD. | +
bd_get_table_columns |
+mostra os nomes, tipos e descrições das colunas na tabela especificada. | +
Cada comando tem um help file de apoio, bastando abrir o help e seguir as instruções:
+help [comando]
+
\n {translation(\"search.result.term.missing\")}: {...missing}\n
\n }\n