Skip to content

Commit

Permalink
add support for local external collections (#154)
Browse files Browse the repository at this point in the history
* add support for local external collections

Co-authored-by: Carmen Tawalika <[email protected]>
  • Loading branch information
metzm and mmacata authored Oct 29, 2022
1 parent e6e93e1 commit 9c3b7f5
Show file tree
Hide file tree
Showing 7 changed files with 551 additions and 331 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ per-file-ignores =
./src/openeo_grass_gis_driver/models/job_schemas.py: W605, E501
./src/openeo_grass_gis_driver/models/service_schemas.py: W605
./src/openeo_grass_gis_driver/collection_information.py: E501
./src/openeo_grass_gis_driver/local_collections.py: E501
./src/openeo_grass_gis_driver/jobs_job_id.py: E501
./src/openeo_grass_gis_driver/jobs_job_id_estimate.py: E501
./src/openeo_grass_gis_driver/jobs_job_id_logs.py: E501
Expand Down
201 changes: 127 additions & 74 deletions src/openeo_grass_gis_driver/actinia_processing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

# This is the process dictionary that is used to store all processes of
# the Actinia wrapper
from openeo_grass_gis_driver.actinia_processing.actinia_interface import \
ActiniaInterface
from openeo_grass_gis_driver.actinia_processing.actinia_interface import (
ActiniaInterface,
)

from openeo_grass_gis_driver.models.process_graph_schemas import ProcessGraph

Expand Down Expand Up @@ -62,7 +63,14 @@ class GrassDataType(Enum):
RASTER = "raster"
VECTOR = "vector"
STRDS = "strds"
STAC = "rastercube"
EXTERN = "rastercube"


class DataSource(Enum):

GRASS = "grass"
STAC = "stac"
LOCAL = "gdallocal"


class DataObject:
Expand All @@ -71,14 +79,17 @@ class DataObject:
"""

def __init__(
self,
name: str,
datatype: GrassDataType,
mapset: str = None,
location: str = None):
self,
name: str,
datatype: GrassDataType,
datasource: DataSource = DataSource.GRASS,
mapset: str = None,
location: str = None,
):

self.name = name
self.datatype = datatype
self.datasource = datasource
self.mapset = mapset
self.location = location

Expand All @@ -90,38 +101,67 @@ def __str__(self):
def from_string(name: str):

AI = ActiniaInterface
location, mapset, datatype, layer_name = AI.layer_def_to_components(
name)
location, mapset, datatype, layer_name = AI.layer_def_to_components(name)

if datatype is None:
raise Exception(f"Invalid collection id <{name}>")

if GrassDataType.RASTER.value == datatype:
datasource = DataSource.GRASS
if location == "stac":
datasource = DataSource.STAC
elif location == "local":
datasource = DataSource.LOCAL

if DataSource.GRASS == datasource and GrassDataType.RASTER.value == datatype:
return DataObject(
name=layer_name,
datatype=GrassDataType.RASTER,
datasource=datasource,
mapset=mapset,
location=location)
elif GrassDataType.VECTOR.value == datatype:
location=location,
)
elif DataSource.GRASS == datasource and GrassDataType.VECTOR.value == datatype:
return DataObject(
name=layer_name,
datatype=GrassDataType.VECTOR,
datasource=datasource,
mapset=mapset,
location=location)
elif GrassDataType.STRDS.value == datatype:
location=location,
)
elif DataSource.GRASS == datasource and GrassDataType.STRDS.value == datatype:
return DataObject(
name=layer_name,
datatype=GrassDataType.STRDS,
datasource=datasource,
mapset=mapset,
location=location)
elif GrassDataType.STAC.value == datatype:
location=location,
)
elif DataSource.STAC == datasource and GrassDataType.EXTERN.value == datatype:
# location = "stac"
del AI.PROCESS_LOCATION[location]
AI.PROCESS_LOCATION["latlong_wgs84"] = "latlong_wgs84"
return DataObject(
name=layer_name,
datatype=GrassDataType.STAC,
datatype=GrassDataType.EXTERN,
datasource=datasource,
mapset=mapset,
location="latlong_wgs84")
location="latlong_wgs84",
)
elif DataSource.LOCAL == datasource and GrassDataType.EXTERN.value == datatype:
# location = "local"
del AI.PROCESS_LOCATION[location]
AI.PROCESS_LOCATION["latlong_wgs84"] = "latlong_wgs84"
return DataObject(
name=layer_name,
datatype=GrassDataType.EXTERN,
datasource=datasource,
mapset=mapset,
location="latlong_wgs84",
)

raise Exception(f"Unsupported object type <{datatype}>")
raise Exception(
f"Unsupported object type <{datatype}> and data source <{datasource.value}>"
)

def grass_name(self):

Expand All @@ -134,24 +174,40 @@ def full_name(self):

def is_strds(self):

return self.datatype == GrassDataType.STRDS
return (
self.datasource == DataSource.GRASS and self.datatype == GrassDataType.STRDS
)

def is_raster(self):

return self.datatype == GrassDataType.RASTER
return (
self.datasource == DataSource.GRASS
and self.datatype == GrassDataType.RASTER
)

def is_vector(self):

return self.datatype == GrassDataType.VECTOR
return (
self.datasource == DataSource.GRASS
and self.datatype == GrassDataType.VECTOR
)

def is_stac(self):

return self.datatype == GrassDataType.STAC
return (
self.datasource == DataSource.STAC and self.datatype == GrassDataType.EXTERN
)

def is_local(self):

return (
self.datasource == DataSource.LOCAL
and self.datatype == GrassDataType.EXTERN
)


class Node:
"""A single node in the process graph
"""
"""A single node in the process graph"""

def __init__(self, id, process_description: dict):

Expand All @@ -171,7 +227,7 @@ def __init__(self, id, process_description: dict):
def add_output(self, output_object: DataObject):
self.output_objects.add(output_object)

def get_parent_by_name(self, parent_name: str) -> Optional['Node']:
def get_parent_by_name(self, parent_name: str) -> Optional["Node"]:
if parent_name in self.parents_dict.keys():
return self.parents_dict[parent_name]
return None
Expand All @@ -198,8 +254,10 @@ def __str__(self):
if self.parents_dict:
parent_names = list(self.parents_dict.keys())

return (f"Node: {self.id} parent names: {parent_names} parent "
f"ids: {parent_ids} child ids: {child_ids}")
return (
f"Node: {self.id} parent names: {parent_names} parent "
f"ids: {parent_ids} child ids: {child_ids}"
)

def get_parents_dict(self) -> dict:

Expand All @@ -220,9 +278,7 @@ def as_dict(self) -> dict:


class Graph:
"""This class represents a process graph
"""
"""This class represents a process graph"""

def __init__(self, graph_description: Union[Dict, ProcessGraph]):
"""The constructor checks the validity of the provided dictionary
Expand All @@ -237,7 +293,8 @@ def __init__(self, graph_description: Union[Dict, ProcessGraph]):
self.title: str = graph_description.title
self.description: str = graph_description.description
self.build_process_graph_from_description(
process_graph=graph_description.process_graph)
process_graph=graph_description.process_graph
)
else:
if "title" not in graph_description:
# raise Exception("Title is required in the process graph")
Expand All @@ -250,13 +307,16 @@ def __init__(self, graph_description: Union[Dict, ProcessGraph]):

# graph_description can be a process with process_graph or
# only a process_graph
if "process" not in graph_description and \
"process_graph" not in graph_description:
raise Exception(
"process_graph is required in the process graph")

if "process" in graph_description and \
"process_graph" not in graph_description["process"]:
if (
"process" not in graph_description
and "process_graph" not in graph_description
):
raise Exception("process_graph is required in the process graph")

if (
"process" in graph_description
and "process_graph" not in graph_description["process"]
):
raise Exception("process_graph is required in the process")

if "process" in graph_description:
Expand All @@ -266,8 +326,7 @@ def __init__(self, graph_description: Union[Dict, ProcessGraph]):
self.title: str = graph_description["title"]
self.description: str = graph_description["description"]

self.build_process_graph_from_description(
process_graph=process_graph)
self.build_process_graph_from_description(process_graph=process_graph)

def build_process_graph_from_description(self, process_graph: dict):
"""Build the directed process graph from the graph description
Expand Down Expand Up @@ -425,31 +484,30 @@ def openeo_to_actinia(node: Node) -> Tuple[list, list]:
# warning, error, exception?
continue
# check if it is an input object
if isinstance(node.arguments[key], dict) and \
"from_node" in node.arguments[key]:
if isinstance(node.arguments[key], dict) and "from_node" in node.arguments[key]:
# input option comes from another node in the process graph
# which output object in the set of output_objects?
value = list(
node.get_parent_by_name(
parent_name=key).output_objects)[0]
value = list(node.get_parent_by_name(parent_name=key).output_objects)[0]
data_object = value
# check schema subtype of parameter and compare with
# datatype of data_object
if ao["schema"]["subtype"] == "cell" and \
data_object.datatype != GrassDataType.RASTER:
raise Exception(
"Wrong input data type, expecting 'cell'")
elif ao["schema"]["subtype"] == "strds" and \
data_object.datatype != GrassDataType.STRDS:
raise Exception(
"Wrong input data type, expecting 'strds'")
elif ao["schema"]["subtype"] == "vector" and \
data_object.datatype != GrassDataType.VECTOR:
raise Exception(
"Wrong input data type, expecting 'vector'")

param = {"param": key,
"value": data_object.grass_name()}
if (
ao["schema"]["subtype"] == "cell"
and data_object.datatype != GrassDataType.RASTER
):
raise Exception("Wrong input data type, expecting 'cell'")
elif (
ao["schema"]["subtype"] == "strds"
and data_object.datatype != GrassDataType.STRDS
):
raise Exception("Wrong input data type, expecting 'strds'")
elif (
ao["schema"]["subtype"] == "vector"
and data_object.datatype != GrassDataType.VECTOR
):
raise Exception("Wrong input data type, expecting 'vector'")

param = {"param": key, "value": data_object.grass_name()}
pc["inputs"].append(param)
elif ao["schema"]["type"] == "boolean":
# flag
Expand All @@ -461,18 +519,15 @@ def openeo_to_actinia(node: Node) -> Tuple[list, list]:
else:
# option answer, treat as string
value = node.arguments[key]
param = {"param": key,
"value": str(value)}
param = {"param": key, "value": str(value)}
pc["inputs"].append(param)

if pc["flags"] is None:
del pc["flags"]

# TODO: support modules that do not have input maps
if data_object is None:
raise Exception(
"No input data object for actinia process '%s'" %
module_name)
raise Exception("No input data object for actinia process '%s'" % module_name)

# output parameters
if "returns" in module and openeo_returns is not None:
Expand All @@ -499,16 +554,14 @@ def openeo_to_actinia(node: Node) -> Tuple[list, list]:
# in order to distinguish between different outputs
# of the same module
output_object = DataObject(
name=create_output_name(data_object.name, node),
datatype=datatype)
param = {"param": key,
"value": output_object.grass_name()}
name=create_output_name(data_object.name, node), datatype=datatype
)
param = {"param": key, "value": output_object.grass_name()}
pc["inputs"].append(param)
output_objects.append(output_object)
node.add_output(output_object=output_object)
if module_name in T_BASENAME_MODULES_LIST:
param = {"param": "basename",
"value": output_object.name}
param = {"param": "basename", "value": output_object.name}
pc["inputs"].append(param)

process_list.append(pc)
Expand All @@ -532,7 +585,7 @@ def create_output_name(input: str, node: Node):
new_uuid = uuid.uuid4().hex

# shorter version: only uuid + node id
node_id = node.id.lower().replace(' ', '_')
node_id = node.id.lower().replace(" ", "_")
output = f"uuid{new_uuid}_{node_id}"

return output
Expand Down
2 changes: 2 additions & 0 deletions src/openeo_grass_gis_driver/actinia_processing/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ACTINIA:
# The database file that stores the actinia jobs
ACTINIA_JOB_DB = "%s/.actinia_job_db_file.sqlite" % os.environ["HOME"]
SECRET_KEY = "jaNguzeef4seiv5shahchimoo8teiLah"
# path to json files with collection definitions
LOCAL_COLLECTIONS = "%s/.openeo_local_collections" % os.environ["HOME"]


class Configfile:
Expand Down
Loading

0 comments on commit 9c3b7f5

Please sign in to comment.