diff --git a/cl_hubeau/watercourses_flow/__init__.py b/cl_hubeau/watercourses_flow/__init__.py new file mode 100755 index 0000000..499c1df --- /dev/null +++ b/cl_hubeau/watercourses_flow/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- + +from .watercourses_flow_scraper import WatercoursesFlowSession +from .utils import get_all_stations, get_all_observations + + +__all__ = [ + "get_all_stations", + "get_all_observations", + "WatercoursesFlowSession", +] diff --git a/cl_hubeau/watercourses_flow/utils.py b/cl_hubeau/watercourses_flow/utils.py new file mode 100644 index 0000000..f13f3d4 --- /dev/null +++ b/cl_hubeau/watercourses_flow/utils.py @@ -0,0 +1,189 @@ +import geopandas as gpd +import pandas as pd +from tqdm import tqdm +from datetime import date, datetime +from itertools import product + +from cl_hubeau.watercourses_flow.watercourses_flow_scraper import ( + WatercoursesFlowSession, +) +from cl_hubeau import _config +from cl_hubeau.utils import get_departements, prepare_kwargs_loops + + +def get_all_stations(**kwargs) -> gpd.GeoDataFrame: + """ + Retrieve all stations from France. + + Parameters + ---------- + **kwargs : + kwargs passed to WatercoursesFlowSession.get_stations (hence mostly intended + for hub'eau API's arguments). Do not use `format` or `code_departement` + as they are set by the current function. + + Returns + ------- + results : gpd.GeoDataFrame + GeoDataFrame of stations + + """ + + with WatercoursesFlowSession() as session: + + deps = get_departements() + results = [ + session.get_stations(code_departement=dep, format="geojson", **kwargs) + for dep in tqdm( + deps, + desc="querying dep/dep", + leave=_config["TQDM_LEAVE"], + position=tqdm._get_free_pos(), + ) + ] + results = [x.dropna(axis=1, how="all") for x in results if not x.empty] + results = gpd.pd.concat(results, ignore_index=True) + try: + results["code_station"] + results = results.drop_duplicates("code_station") + except KeyError: + pass + return results + + +def get_all_observations(**kwargs) -> gpd.GeoDataFrame: + """ + Retrieve all observsations from France. + + Parameters + ---------- + **kwargs : + kwargs passed to WatercoursesFlowSession.get_observations (hence mostly intended + for hub'eau API's arguments). Do not use `format` or `code_departement` + as they are set by the current function. + + Returns + ------- + results : gpd.GeoDataFrame + GeoDataFrame of observations + """ + + deps = get_departements() + + # Set a loop for yearly querying as dataset are big + start_auto_determination = False + if "date_observation_min" not in kwargs: + start_auto_determination = True + kwargs["date_observation_min"] = "1960-01-01" + if "date_observation_max" not in kwargs: + kwargs["date_observation_max"] = date.today().strftime("%Y-%m-%d") + + # ranges = pd.date_range( + # start=datetime.strptime(kwargs.pop("date_observation_min"), "%Y-%m-%d").date(), + # end=datetime.strptime(kwargs.pop("date_observation_max"), "%Y-%m-%d").date(), + # ) + # dates = pd.Series(ranges).to_frame("date") + # dates["year"] = dates["date"].dt.year + # dates = dates.groupby("year")["date"].agg(["min", "max"]) + # for d in "min", "max": + # dates[d] = dates[d].dt.strftime("%Y-%m-%d") + # if start_auto_determination: + # dates = pd.concat( + # [ + # dates, + # pd.DataFrame([{"min": "1900-01-01", "max": "2015-12-31"}]), + # ], + # ignore_index=False, + # ).sort_index() + + # args = list(product(deps, dates.values.tolist())) + + # with WatercoursesFlowSession() as session: + + # results = [ + # session.get_observations( + # format="geojson", + # date_observation_min=date_min, + # date_observation_max=date_max, + # **{"code_departement": chunk}, + # **kwargs, + # ) + # for chunk, (date_min, date_max) in tqdm( + # args, + # desc="querying station/station and year/year", + # leave=_config["TQDM_LEAVE"], + # position=tqdm._get_free_pos(), + # ) + # ] + + desc = "querying year/year" + (" & dep/dep" if "code_departement" in kwargs else "") + + kwargs_loop = prepare_kwargs_loops( + "date_observation_min", + "date_observation_max", + kwargs, + start_auto_determination, + ) + + with WatercoursesFlowSession() as session: + + results = [ + session.get_observations( + format="geojson", + **kwargs, + **kw_loop, + ) + for kw_loop in tqdm( + kwargs_loop, + desc=desc, + leave=_config["TQDM_LEAVE"], + position=tqdm._get_free_pos(), + ) + ] + + results = [x.dropna(axis=1, how="all") for x in results if not x.empty] + results = pd.concat(results, ignore_index=True) + return results + + +def get_all_campagnes(**kwargs) -> gpd.GeoDataFrame: + """ + Retrieve all campagnes from France. + + Parameters + ---------- + **kwargs : + kwargs passed to WatercoursesFlowSession.get_campagnes (hence mostly intended + for hub'eau API's arguments). Do not use `code_departement` + as they are set by the current function. + + Returns + ------- + results : gpd.GeoDataFrame + GeoDataFrame of campagnes + """ + + with WatercoursesFlowSession() as session: + try: + results = session.get_campagnes(**kwargs) + except ValueError: + # If request is too big + deps = get_departements() + results = [ + session.get_campagnes(code_departement=dep, **kwargs) + for dep in tqdm( + deps, + desc="querying dep/dep", + leave=_config["TQDM_LEAVE"], + position=tqdm._get_free_pos(), + ) + ] + results = [x.dropna(axis=1, how="all") for x in results if not x.empty] + results = gpd.pd.concat(results, ignore_index=True) + return results + + +# if __name__ == "__main__": +# # print(get_all_stations()) +# # print(get_all_observations()) +# print(get_all_campagnes()) diff --git a/cl_hubeau/watercourses_flow/watercourses_flow_scraper.py b/cl_hubeau/watercourses_flow/watercourses_flow_scraper.py new file mode 100755 index 0000000..de483d4 --- /dev/null +++ b/cl_hubeau/watercourses_flow/watercourses_flow_scraper.py @@ -0,0 +1,324 @@ +# -*- coding: utf-8 -*- +""" +Created on Fri Sep 13 10:57:00 2024 + +low level class to collect data from the watercourses-flow API from hub'eau +""" + +import pandas as pd + +from cl_hubeau.session import BaseHubeauSession + + +class WatercoursesFlowSession(BaseHubeauSession): + """ + Base session class to handle the watercourses-flow API + """ + + def __init__(self, *args, **kwargs): + + super().__init__(version="1.0.0", *args, **kwargs) + + # TODO où trouve-t-on cette taille dans la doc ? + # Set default size for API queries, based on hub'eau piezo's doc + self.size = 1000 + + def get_stations(self, **kwargs): + """ + Lister les stations + Endpoint /v1/ecoulement/stations + + Doc: https://hubeau.eaufrance.fr/page/api-ecoulement + """ + + params = {} + + try: + variable = kwargs.pop("format") + if variable not in ("json", "geojson"): + raise ValueError( + "format must be among ('json', 'geojson'), " + f"found format='{variable}' instead" + ) + params["format"] = variable + except KeyError: + pass + + try: + params["bbox"] = self.list_to_str_param(kwargs.pop("bbox"), None, 4) + except KeyError: + pass + + for arg in ( + "code_station", + "libelle_station", + "code_departement", + "libelle_departement", + "code_commune", + "libelle_commune", + "code_region", + "libelle_region", + "code_cours_eau", + "libelle_cours_eau", + ): + try: + variable = kwargs.pop(arg) + params[arg] = self.list_to_str_param(variable, 200) + except KeyError: + continue + + for arg in ( + "code_bassin", + "libelle_bassin", + ): + try: + variable = kwargs.pop(arg) + params[arg] = self.list_to_str_param(variable, 15) + except KeyError: + continue + + try: + fields = kwargs.pop("fields") + params["fields"] = self.list_to_str_param(fields) + except KeyError: + pass + + for arg in ("distance", "latitude", "longitude"): + try: + params[arg] = kwargs.pop(arg) + except KeyError: + continue + + try: + variable = kwargs.pop("sort") + if variable not in ("asc", "desc"): + raise ValueError( + "format must be among ('asc', 'sort'), " + f"found sort='{variable}' instead" + ) + params["sort"] = variable + except KeyError: + pass + + if kwargs: + raise ValueError( + f"found unexpected arguments {kwargs}, " + "please have a look at the documentation on " + "https://hubeau.eaufrance.fr/page/api-ecoulement" + ) + + method = "GET" + url = self.BASE_URL + "/v1/ecoulement/stations" + df = self.get_result(method, url, params=params) + + return df + + def get_observations(self, **kwargs): + """ + Lister les observations + Endpoint /v1/ecoulement/observations + + Doc: https://hubeau.eaufrance.fr/page/api-ecoulement + """ + + params = {} + + try: + variable = kwargs.pop("format") + if variable not in ("json", "geojson"): + raise ValueError( + "format must be among ('json', 'geojson'), " + f"found format='{variable}' instead" + ) + params["format"] = variable + except KeyError: + pass + + try: + params["bbox"] = self.list_to_str_param(kwargs.pop("bbox"), None, 4) + except KeyError: + pass + + for arg in "date_observation_min", "date_observation_max": + try: + variable = kwargs.pop(arg) + self.ensure_date_format_is_ok(variable) + params[arg] = variable + except KeyError: + continue + + for arg in ( + "code_station", + "libelle_station", + "code_departement", + "libelle_departement", + "code_commune", + "libelle_commune", + "code_region", + "libelle_region", + "code_cours_eau", + "libelle_cours_eau", + "code_campagne", + "code_reseau", + "libelle_reseau", + ): + try: + variable = kwargs.pop(arg) + params[arg] = self.list_to_str_param(variable, 200) + except KeyError: + continue + + for arg in ( + "code_bassin", + "libelle_bassin", + ): + try: + variable = kwargs.pop(arg) + params[arg] = self.list_to_str_param(variable, 15) + except KeyError: + continue + + for arg in ( + "code_ecoulement", + "libelle_ecoulement", + ): + try: + variable = kwargs.pop(arg) + params[arg] = self.list_to_str_param(variable, 5) + except KeyError: + continue + + try: + fields = kwargs.pop("fields") + params["fields"] = self.list_to_str_param(fields) + except KeyError: + pass + + for arg in ("distance", "latitude", "longitude"): + try: + params[arg] = kwargs.pop(arg) + except KeyError: + continue + + try: + variable = kwargs.pop("sort") + if variable not in ("asc", "desc"): + raise ValueError( + "format must be among ('asc', 'sort'), " + f"found sort='{variable}' instead" + ) + params["sort"] = variable + except KeyError: + pass + + if kwargs: + raise ValueError( + f"found unexpected arguments {kwargs}, " + "please have a look at the documentation on " + "https://hubeau.eaufrance.fr/page/api-ecoulement" + ) + + method = "GET" + url = self.BASE_URL + "/v1/ecoulement/observations" + df = self.get_result(method, url, params=params) + + return df + + def get_campagnes(self, **kwargs): + """ + Lister les campagnes + Endpoint /v1/ecoulement/campagnes + + Doc: https://hubeau.eaufrance.fr/page/api-ecoulement + """ + + params = {} + + for arg in "date_campagne_min", "date_campagne_max": + try: + variable = kwargs.pop(arg) + self.ensure_date_format_is_ok(variable) + params[arg] = variable + except KeyError: + continue + + try: + code_campagne = kwargs.pop("code_campagne") + params["code_campagne"] = self.list_to_str_param(code_campagne, 20) + except KeyError: + pass + + for arg in ( + "code_reseau", + "libelle_reseau", + "code_departement", + "libelle_departement", + ): + try: + variable = kwargs.pop(arg) + params[arg] = self.list_to_str_param(variable, 200) + except KeyError: + continue + + try: + variable = kwargs.pop("code_campagne") + if str(code_campagne) in ["1", "2"]: + params["code_campagne"] = variable + else: + raise ValueError( + "code_campagne must be among ('1', '2'), " + f"found sort='{variable}' instead" + ) + except KeyError: + pass + + try: + variable = kwargs.pop("libelle_type_campagne") + if variable.capitalize() in ["Usuelle", "Complémentaire"]: + params["libelle_type_campagne"] = variable.capitalize() + else: + raise ValueError( + "libelle_type_campagne must be among ('Usuelle', 'Complémentaire'), " + f"found sort='{variable}' instead" + ) + except KeyError: + pass + + try: + fields = kwargs.pop("fields") + params["fields"] = self.list_to_str_param(fields) + except KeyError: + pass + + try: + variable = kwargs.pop("sort") + if variable not in ("asc", "desc"): + raise ValueError( + "format must be among ('asc', 'sort'), " + f"found sort='{variable}' instead" + ) + params["sort"] = variable + except KeyError: + pass + + if kwargs: + raise ValueError( + f"found unexpected arguments {kwargs}, " + "please have a look at the documentation on " + "https://hubeau.eaufrance.fr/page/api-ecoulement" + ) + + method = "GET" + url = self.BASE_URL + "/v1/ecoulement/campagnes" + df = self.get_result(method, url, params=params) + + return df + + +# if __name__ == "__main__": +# with WatercoursesFlowSession() as session: +# # df = session.get_stations(code_departement="59", format="geojson") +# # df = session.get_campagnes(code_campagne=[12]) +# df = session.get_observations(code_station="F6640008") + +# print(df) diff --git a/docs/watercourses_flow.md b/docs/watercourses_flow.md new file mode 100644 index 0000000..ceb7c7d --- /dev/null +++ b/docs/watercourses_flow.md @@ -0,0 +1,101 @@ +--- +layout: default +title: API Ecoulement des cours d'eau +language: fr +handle: /watercourses_flow +nav_order: 8 + +--- +# API Ecoulement des cours d'eau + +[https://hubeau.eaufrance.fr/page/api-ecoulement](https://hubeau.eaufrance.fr/page/api-ecoulement) + +`cl-hubeau` définit : + +* des fonctions de haut niveau implémentant des boucles basiques ; +* des fonctions de bas niveau qui implémentent directement les différents points d'entrée de l'API. + +{: .warning } +Lors de l'utilisation des fonctions de bas niveau, l'utilisateur est responsable +de la consommation de l'API. En particulier, il s'agit d'être vigilant quant au seuil +de 20 000 résultats récupérables d'une seule requête. +Par ailleurs, la gestion du cache par les fonctions de bas niveau est de la responsabilité +de l'utilisateur, notamment pour l'accès aux données de temps réel (expiration par défaut +fixée à 30 jours). + +Dans les deux cas, les fonctions implémentées sont conçues pour boucler sur les résultats de la +requête : les arguments optionnels `size` et `page` ou `cursor` ne doivent pas être fournis +au client python. + +## Fonctions de haut niveau + +### Récupération de la totalité des stations + +Cette fonction permet de récupérer les stations de la France entière. + +```python +from cl_hubeau import watercourses_flow +gdf = watercourses_flow.get_all_stations() +``` + +Il est également possible de spécifier des arguments à la fonction, parmi ceux supportés +par le point de sortie "stations" de l'API, à l'exception de : +* `format` (fixé par défaut au format geojson pour retourner un geodataframe) +* `code_departement` (utilisé pour boucler sur les données nationales) + +Par exemple : +```python +from cl_hubeau import watercourses_flow +gdf = watercourses_flow.get_all_stations(code_cours_eau="D0110600") +``` + +### Récupération de la totalité des observations + +Cette fonction permet de récupérer les observations de la France entière. + +```python +from cl_hubeau import watercourses_flow +gdf = watercourses_flow.get_all_observations() +``` + +Il est également possible de spécifier des arguments à la fonction, parmi ceux supportés +par le point de sortie "stations" de l'API, à l'exception de : +* `format` (fixé par défaut au format geojson pour retourner un geodataframe) +* `code_departement` (utilisé pour boucler sur les données nationales) + +Par exemple : +```python +from cl_hubeau import watercourses_flow +gdf = watercourses_flow.get_all_observations(code_cours_eau="D0110600") +``` + +## Fonctions de bas niveau + +Un objet session est défini pour consommer l'API à l'aide de méthodes de bas niveau. +Ces méthodes correspondent strictement aux fonctions disponibles via l'API : l'utilisateur +est invité à se reporter à la documentation de l'API concernant le détail des arguments +disponibles. + +### Lister les stations + +```python +from cl_hubeau import watercourses_flow +with watercourses_flow.WatercoursesFlowSession() as session: + df = session.get_stations(code_departement=['02', '59', '60', '62', '80'], format="geojson") +``` + +### Lister les observations + +```python +from cl_hubeau import watercourses_flow +with watercourses_flow.WatercoursesFlowSession() as session: + df = session.get_observations(code_station="F6640008") +``` + +### Lister les campagnes + +```python +from cl_hubeau import watercourses_flow +with watercourses_flow.WatercoursesFlowSession() as session: + df = session.get_campagnes(code_departement="59") +``` \ No newline at end of file diff --git a/tests/test_watercourses_flow.py b/tests/test_watercourses_flow.py new file mode 100644 index 0000000..17eb31a --- /dev/null +++ b/tests/test_watercourses_flow.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Fri Sep 13 16:54:09 2024 + +Test mostly high level functions +""" + +import geopandas as gpd +import pandas as pd +import pytest + +from requests_cache import CacheMixin + +from cl_hubeau import watercourses_flow +from cl_hubeau.watercourses_flow import WatercoursesFlowSession + + +class MockResponse: + def __init__(self, json_data): + self.json_data = json_data + self.ok = True + + def json(self): + return self.json_data + + +@pytest.fixture +def mock_get_data(monkeypatch): + + def mock_request(*args, **kwargs): + self, method, url, *args = args + + if "stations" in url or "observations" in url: + data = { + "count": 1, + "first": "blah_page", + "data": [ + { + "code_station": "dummy", + "libelle_station": "Dummy", + "uri_station": "blah_dummy", + "geometry": { + "type": "Point", + "crs": { + "type": "name", + "properties": {"name": "urn:ogc:def:crs:OGC:1.3:CRS84"}, + }, + "coordinates": [0, 0], + }, + } + ], + "features": [ + { + "type": "Feature", + "properties": { + "code_station": "dummy_code", + "libelle_station": "dummy", + }, + "geometry": {"type": "Point", "coordinates": [0, 0]}, + } + ], + } + elif "campagnes" in url: + data = { + "count": 1, + "first": "blah_campagne", + "next": None, + "data": [ + { + "code_campagne": "dummy", + "date_campagne": "2011-10-20", + } + ], + } + + return MockResponse(data) + + # init = CachedSession.request + monkeypatch.setattr(CacheMixin, "request", mock_request) + + +def test_get_one_station_live(): + with WatercoursesFlowSession() as session: + data = session.get_stations(code_station=["D0110001"], format="geojson") + assert isinstance(data, gpd.GeoDataFrame) + assert len(data) == 1 + + +def test_get_one_campagne_live(): + with WatercoursesFlowSession() as session: + data = session.get_campagnes(code_campagne=[12]) + assert isinstance(data, pd.DataFrame) + assert len(data) == 1 + + +def test_get_all_stations_mocked(mock_get_data): + data = watercourses_flow.get_all_stations() + assert isinstance(data, gpd.GeoDataFrame) + assert len(data) == 1 + + +def test_get_all_observations_mocked(mock_get_data): + data = watercourses_flow.get_all_observations() + assert isinstance(data, gpd.GeoDataFrame) + assert len(data) == 1