Skip to content

Commit

Permalink
ajout script pote sas to parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
clallemand committed Jul 31, 2024
1 parent 570e68e commit c1514a9
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import shutil
from pathlib import Path

import pandas as pd
from tqdm import tqdm
import json
import gc
import glob
import os
import pyarrow.parquet as pq
import pyarrow.compute as pc

# Convert the POTE SAS extract for a given year into (1) row-chunk parquet
# files and (2) one parquet file per column, plus descriptive stats.
year = "2022"
SAS_FILE = (
    r"\\casd.fr\casdfs\Projets\LEXIMPA\Data\POTE_POTE_"
    + year
    + r"\pote_diff_"  # raw string: the original relied on "\p" not being a valid escape
    + year
    + ".sas7bdat"
)
CHUNKS_OUT_PATH = r"C:\Users\Public\Documents\donnees_brutes\POTE/chunks/" + year
ARROW_OUT_PATH = r"C:\Users\Public\Documents\donnees_brutes\POTE/parquet_columns/" + year
taille_chunk = 100_000  # rows per SAS chunk

# Lazy iterator over the SAS file; iso8859-15 matches the source encoding.
dfi = pd.read_sas(
    SAS_FILE, chunksize=taille_chunk, encoding="iso8859-15", iterator=True
)

# 1) Split the full SAS table into parquet chunks.
# BUG FIX: `i` was initialised to 0 but never incremented, so every chunk
# overwrote pote_0.parquet; enumerate gives each chunk its own index.
for i, chunk in enumerate(tqdm(dfi)):
    chunk.columns = [c.lower() for c in chunk.columns]
    chunk.drop(["fip18_c"], axis=1, inplace=True)
    chunk.to_parquet(f"{CHUNKS_OUT_PATH}/pote_{i}.parquet")
    del chunk  # release the chunk before reading the next one to keep peak memory low
    gc.collect()

# 2) Read all chunks back as one dataset and write one parquet table per column.
parquet_files = glob.glob(os.path.join(CHUNKS_OUT_PATH, "*.parquet"))

dfv = pq.ParquetDataset(parquet_files)

column_names = [field.name for field in dfv.schema]

# Per-column descriptive statistics collected along the way.
stats = dict()

for col in column_names:
    print(col)
    datas = dfv.read([col])
    stats[col] = {
        'nombre_na': datas.column(col).null_count,
        'dtype': str(datas.column(col).type),
    }
    if datas.column(col).type in ("double", "integer"):
        stats[col]['somme'] = pc.sum(datas.column(col)).as_py()
    pq.write_table(datas, f"{ARROW_OUT_PATH}/pote_{col}.parquet")

# BUG FIX: the original dumped `stats_corr`, a name that was never defined
# (NameError); the collected dict is `stats`.
with open(f"{ARROW_OUT_PATH}/columns_stats_desc.json", "w") as outfile:
    json.dump(stats, outfile)

pd.DataFrame(stats).transpose().to_excel(f"{ARROW_OUT_PATH}/columns_stats_desc.xlsx")
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
from openfisca_france_data.utils import build_cerfa_fields_by_variable
import pandas as pd
import logging
import glob

def create_pote_openfisca_variables_list(year, errors_path, raw_data_directory):
    """Match Pote parquet columns against OpenFisca cerfa variables.

    Args:
        year: tax year forwarded to ``build_cerfa_fields_by_variable``.
        errors_path: directory prefix where diagnostic CSV files are written.
        raw_data_directory: directory prefix holding one parquet file per
            Pote column, named like ``pote_<column>.parquet``.

    Returns:
        Tuple ``(variables_individu, variables_foyer_fiscal)``: OpenFisca
        variables mapped to their cerfa field list (several fields) or to
        their single cerfa field, respectively.

    Raises:
        AssertionError: if the same cerfa field appears under several
            OpenFisca variables.
    """
    logging.warning("Récupération des colonnes en commun entre Pote et Openfisca")
    dict_variables_cerfa_field = build_cerfa_fields_by_variable(year = year)

    # Flatten the per-variable cerfa field lists into one list.
    variables_cerfa_field = list()
    for var_list in list(dict_variables_cerfa_field.values()):
        variables_cerfa_field += var_list
    pd.DataFrame({'liste_var': variables_cerfa_field}).to_csv(f"{errors_path}cerfa_openfisca.csv")
    # Detect cerfa fields claimed by more than one OpenFisca variable.
    doublons = dict([(n,variables_cerfa_field.count(n)) for n in set(variables_cerfa_field)])
    doublons = list({k:v for (k,v) in doublons.items() if v>1}.keys())
    assert len(doublons) == 0, f"Il y a des doublons dans les cases cerfa d'openfisca france : {doublons}"

    del doublons

    colonnes_pote = glob.glob(f"{raw_data_directory}*.parquet")
    # BUG FIX: the original split on "\\" only, which silently returns the
    # whole path on POSIX systems; normalise separators before taking the
    # file name, then extract <column> from "pote_<column>.parquet".
    colonnes_pote = [
        col.replace("\\", "/").split("/")[-1].split("_")[1].split(".")[0]
        for col in colonnes_pote
    ]
    # Keep only cerfa-like columns (zXXX) and rename them to OpenFisca's fXXX.
    colonnes_pote = ["f" + str.lower(c[1:]) for c in colonnes_pote if str.lower(c).startswith('z')]

    var_to_keep = list(set(colonnes_pote) & set(variables_cerfa_field))
    logging.warning(f"Parmi les {len(colonnes_pote)} variables de pote, {len(var_to_keep)} ont été trouvées dans openfisca")
    var_not_in_openfisca = [c for c in colonnes_pote if c not in variables_cerfa_field]
    pd.DataFrame({'liste_var': var_not_in_openfisca}).to_csv(f"{errors_path}cerfa_manquants_openfisca.csv")

    # A single cerfa field means the variable lives at foyer fiscal level;
    # several fields (one per declarant) means individu level.
    variables_foyer_fiscal = dict()
    variables_individu = dict()
    for openfisca_var, cerfa in dict_variables_cerfa_field.items():
        if len(cerfa) == 1:
            variables_foyer_fiscal[openfisca_var] = cerfa[0]
        else:
            variables_individu[openfisca_var] = cerfa
    return variables_individu, variables_foyer_fiscal
from openfisca_france_data.utils import build_cerfa_fields_by_variable
import pandas as pd
import logging
import glob

def create_pote_openfisca_variables_list(year, errors_path, raw_data_directory):
    """Match Pote parquet columns against OpenFisca cerfa variables.

    Args:
        year: tax year forwarded to ``build_cerfa_fields_by_variable``.
        errors_path: directory prefix where diagnostic CSV files are written.
        raw_data_directory: directory prefix holding one parquet file per
            Pote column, named like ``pote_<column>.parquet``.

    Returns:
        Tuple ``(variables_individu, variables_foyer_fiscal)``: OpenFisca
        variables mapped to their cerfa field list (several fields) or to
        their single cerfa field, respectively.

    Raises:
        AssertionError: if the same cerfa field appears under several
            OpenFisca variables.
    """
    logging.warning("Récupération des colonnes en commun entre Pote et Openfisca")
    dict_variables_cerfa_field = build_cerfa_fields_by_variable(year = year)

    # Flatten the per-variable cerfa field lists into one list.
    variables_cerfa_field = list()
    for var_list in list(dict_variables_cerfa_field.values()):
        variables_cerfa_field += var_list
    pd.DataFrame({'liste_var': variables_cerfa_field}).to_csv(f"{errors_path}cerfa_openfisca.csv")
    # Detect cerfa fields claimed by more than one OpenFisca variable.
    doublons = dict([(n,variables_cerfa_field.count(n)) for n in set(variables_cerfa_field)])
    doublons = list({k:v for (k,v) in doublons.items() if v>1}.keys())
    assert len(doublons) == 0, f"Il y a des doublons dans les cases cerfa d'openfisca france : {doublons}"

    del doublons

    colonnes_pote = glob.glob(f"{raw_data_directory}*.parquet")
    # BUG FIX: the original split on "\\" only, which silently returns the
    # whole path on POSIX systems; normalise separators before taking the
    # file name, then extract <column> from "pote_<column>.parquet".
    colonnes_pote = [
        col.replace("\\", "/").split("/")[-1].split("_")[1].split(".")[0]
        for col in colonnes_pote
    ]
    # Keep only cerfa-like columns (zXXX) and rename them to OpenFisca's fXXX.
    colonnes_pote = ["f" + str.lower(c[1:]) for c in colonnes_pote if str.lower(c).startswith('z')]

    var_to_keep = list(set(colonnes_pote) & set(variables_cerfa_field))
    logging.warning(f"Parmi les {len(colonnes_pote)} variables de pote, {len(var_to_keep)} ont été trouvées dans openfisca")
    var_not_in_openfisca = [c for c in colonnes_pote if c not in variables_cerfa_field]
    pd.DataFrame({'liste_var': var_not_in_openfisca}).to_csv(f"{errors_path}cerfa_manquants_openfisca.csv")

    # A single cerfa field means the variable lives at foyer fiscal level;
    # several fields (one per declarant) means individu level.
    variables_foyer_fiscal = dict()
    variables_individu = dict()
    for openfisca_var, cerfa in dict_variables_cerfa_field.items():
        if len(cerfa) == 1:
            variables_foyer_fiscal[openfisca_var] = cerfa[0]
        else:
            variables_individu[openfisca_var] = cerfa
    return variables_individu, variables_foyer_fiscal

0 comments on commit c1514a9

Please sign in to comment.