Skip to content

Commit

Permalink
dev
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Dec 5, 2024
1 parent c7ae45f commit 332507e
Show file tree
Hide file tree
Showing 22 changed files with 703 additions and 149 deletions.
58 changes: 45 additions & 13 deletions aiosumma/aiosumma/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,41 @@
from .proto.utils_pb2 import Asc, Desc # noqa


def setup_metadata(session_id, request_id):
def prepare_filters(filters: dict[str, list | tuple | str]) -> list[dict]:
all_extra_filters = []
for field_name, values in filters.items():
subqueries = []
for value in values:
if isinstance(value, (list, tuple)):
subqueries.append(
{
"query": {
"match": {
"value": f"{field_name}:+[{value[0]} TO {value[1]}]"
}
},
"occur": "must",
}
)
else:
subqueries.append(
{
"occur": "should",
"query": {
"match": {"value": f"{field_name}:{value}"},
},
}
)
all_extra_filters.append(
{
"query": {"boolean": {"subqueries": subqueries}},
"occur": "must",
}
)
return all_extra_filters


def setup_metadata(session_id: str, request_id: str):
metadata = []
if session_id:
metadata.append(('session-id', session_id))
Expand All @@ -29,15 +63,15 @@ def setup_metadata(session_id, request_id):
return metadata


def prepare_search_request(search_request):
def prepare_search_request(search_request: dict | query_pb.SearchRequest) -> query_pb.SearchRequest:
if isinstance(search_request, Dict):
dict_search_request = search_request
search_request = query_pb.SearchRequest()
ParseDict(dict_search_request, search_request)
return search_request


def prepare_query(query):
def prepare_query(query: dict | str) -> query_pb.Query:
if isinstance(query, Dict):
dict_query = query
elif isinstance(query, str):
Expand Down Expand Up @@ -473,12 +507,12 @@ async def get_indices_aliases(

@expose
async def documents(
self,
index_name: str,
query_filter: Optional[Union[dict, str]] = None,
fields: Optional[List[str]] = None,
request_id: Optional[str] = None,
session_id: Optional[str] = None,
self,
index_name: str,
query_filter: Optional[Union[dict, str]] = None,
fields: Optional[List[str]] = None,
request_id: Optional[str] = None,
session_id: Optional[str] = None,
) -> AsyncIterator[str]:
"""
Retrieve all documents from the index
Expand All @@ -491,16 +525,14 @@ async def documents(
# asyncfor is buggy: https://github.com/grpc/grpc/issues/32005
if query_filter:
query_filter = prepare_query(query_filter)
streaming_call = self.stubs['index_api'].documents(
async for document in self.stubs['index_api'].documents(
index_service_pb.DocumentsRequest(
index_name=index_name,
query_filter=query_filter,
fields=fields,
),
metadata=setup_metadata(session_id, request_id),
)
while True:
document = await asyncio.create_task(streaming_call.read())
):
if document == grpc.aio.EOF:
break
yield document.document
Expand Down
16 changes: 13 additions & 3 deletions aiosumma/aiosumma/proto/consumer_service_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 77 additions & 16 deletions aiosumma/aiosumma/proto/consumer_service_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

from . import consumer_service_pb2 as consumer__service__pb2

GRPC_GENERATED_VERSION = '1.67.1'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True

if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in consumer_service_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)


class ConsumerApiStub(object):
"""Manage ingestion data from Kafka
Expand All @@ -19,22 +39,22 @@ def __init__(self, channel):
'/summa.proto.ConsumerApi/create_consumer',
request_serializer=consumer__service__pb2.CreateConsumerRequest.SerializeToString,
response_deserializer=consumer__service__pb2.CreateConsumerResponse.FromString,
)
_registered_method=True)
self.get_consumer = channel.unary_unary(
'/summa.proto.ConsumerApi/get_consumer',
request_serializer=consumer__service__pb2.GetConsumerRequest.SerializeToString,
response_deserializer=consumer__service__pb2.GetConsumerResponse.FromString,
)
_registered_method=True)
self.get_consumers = channel.unary_unary(
'/summa.proto.ConsumerApi/get_consumers',
request_serializer=consumer__service__pb2.GetConsumersRequest.SerializeToString,
response_deserializer=consumer__service__pb2.GetConsumersResponse.FromString,
)
_registered_method=True)
self.delete_consumer = channel.unary_unary(
'/summa.proto.ConsumerApi/delete_consumer',
request_serializer=consumer__service__pb2.DeleteConsumerRequest.SerializeToString,
response_deserializer=consumer__service__pb2.DeleteConsumerResponse.FromString,
)
_registered_method=True)


class ConsumerApiServicer(object):
Expand Down Expand Up @@ -96,6 +116,7 @@ def add_ConsumerApiServicer_to_server(servicer, server):
generic_handler = grpc.method_handlers_generic_handler(
'summa.proto.ConsumerApi', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('summa.proto.ConsumerApi', rpc_method_handlers)


# This class is part of an EXPERIMENTAL API.
Expand All @@ -114,11 +135,21 @@ def create_consumer(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/summa.proto.ConsumerApi/create_consumer',
return grpc.experimental.unary_unary(
request,
target,
'/summa.proto.ConsumerApi/create_consumer',
consumer__service__pb2.CreateConsumerRequest.SerializeToString,
consumer__service__pb2.CreateConsumerResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def get_consumer(request,
Expand All @@ -131,11 +162,21 @@ def get_consumer(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/summa.proto.ConsumerApi/get_consumer',
return grpc.experimental.unary_unary(
request,
target,
'/summa.proto.ConsumerApi/get_consumer',
consumer__service__pb2.GetConsumerRequest.SerializeToString,
consumer__service__pb2.GetConsumerResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def get_consumers(request,
Expand All @@ -148,11 +189,21 @@ def get_consumers(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/summa.proto.ConsumerApi/get_consumers',
return grpc.experimental.unary_unary(
request,
target,
'/summa.proto.ConsumerApi/get_consumers',
consumer__service__pb2.GetConsumersRequest.SerializeToString,
consumer__service__pb2.GetConsumersResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def delete_consumer(request,
Expand All @@ -165,8 +216,18 @@ def delete_consumer(request,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/summa.proto.ConsumerApi/delete_consumer',
return grpc.experimental.unary_unary(
request,
target,
'/summa.proto.ConsumerApi/delete_consumer',
consumer__service__pb2.DeleteConsumerRequest.SerializeToString,
consumer__service__pb2.DeleteConsumerResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
16 changes: 13 additions & 3 deletions aiosumma/aiosumma/proto/dag_pb_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions aiosumma/aiosumma/proto/dag_pb_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,24 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings


GRPC_GENERATED_VERSION = '1.67.1'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True

if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in dag_pb_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
20 changes: 15 additions & 5 deletions aiosumma/aiosumma/proto/index_service_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 332507e

Please sign in to comment.