diff --git a/cmem_plugin_graphql/workflow/graphql.py b/cmem_plugin_graphql/workflow/graphql.py index 49d45a2..187765c 100644 --- a/cmem_plugin_graphql/workflow/graphql.py +++ b/cmem_plugin_graphql/workflow/graphql.py @@ -15,14 +15,17 @@ MultilineStringParameterType, ) from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin +from cmem_plugin_base.dataintegration.ports import ( + UnknownSchemaPort, +) from cmem_plugin_base.dataintegration.utils import write_to_dataset +from cmem_plugin_base.dataintegration.utils.entity_builder import build_entities_from_data from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport from graphql import GraphQLError, GraphQLSyntaxError from cmem_plugin_graphql.workflow.utils import ( get_dict, - get_entities_from_list, is_jinja_template, ) @@ -128,6 +131,8 @@ def __init__( # nosec if oauth_access_token: self.headers["Authorization"] = f"Bearer {oauth_access_token}" + self._set_ports() + def set_graphql_variable_values(self, variable_values: str) -> None: """Validate and set graphql_variable_values""" try: @@ -213,7 +218,7 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti context=context.user, ) - return get_entities_from_list(payload) + return build_entities_from_data(payload) def process_entities(self, entities: Entities) -> Iterator[dict[str, Any] | None]: """Process entities""" @@ -242,3 +247,10 @@ def process_entities(self, entities: Entities) -> Iterator[dict[str, Any] | None self.log.error(f"Failed entity: {type(ex)}") # noqa: TRY400 yield result + + def _set_ports(self) -> None: + """Define input/output ports based on the configuration""" + if self.graphql_dataset: + self.output_port = None + else: + self.output_port = UnknownSchemaPort()