Skip to content

Commit

Permalink
separate in 2 rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
chamini2 committed Oct 22, 2024
1 parent 16ec00b commit a2fc4ec
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 16 deletions.
4 changes: 3 additions & 1 deletion src/isolate/server/definitions/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import "google/protobuf/struct.proto";
service Isolate {
// Run the given function on the specified environment. Streams logs
// and the result originating from that function.
rpc Run (RunRequest) returns (stream PartialRunResult) {}
rpc Run (BoundFunction) returns (stream PartialRunResult) {}

rpc RunFunction (RunRequest) returns (stream PartialRunResult) {}

// Submit a function to be run without waiting for results.
rpc Submit (SubmitRequest) returns (SubmitResponse) {}
Expand Down
4 changes: 2 additions & 2 deletions src/isolate/server/definitions/server_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions src/isolate/server/definitions/server_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ def __init__(self, channel):
"""
self.Run = channel.unary_stream(
'/Isolate/Run',
request_serializer=server__pb2.BoundFunction.SerializeToString,
response_deserializer=common__pb2.PartialRunResult.FromString,
_registered_method=True)
self.RunFunction = channel.unary_stream(
'/Isolate/RunFunction',
request_serializer=server__pb2.RunRequest.SerializeToString,
response_deserializer=common__pb2.PartialRunResult.FromString,
_registered_method=True)
Expand Down Expand Up @@ -78,6 +83,12 @@ def Run(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def RunFunction(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Submit(self, request, context):
"""Submit a function to be run without waiting for results.
"""
Expand Down Expand Up @@ -111,6 +122,11 @@ def add_IsolateServicer_to_server(servicer, server):
rpc_method_handlers = {
'Run': grpc.unary_stream_rpc_method_handler(
servicer.Run,
request_deserializer=server__pb2.BoundFunction.FromString,
response_serializer=common__pb2.PartialRunResult.SerializeToString,
),
'RunFunction': grpc.unary_stream_rpc_method_handler(
servicer.RunFunction,
request_deserializer=server__pb2.RunRequest.FromString,
response_serializer=common__pb2.PartialRunResult.SerializeToString,
),
Expand Down Expand Up @@ -160,6 +176,33 @@ def Run(request,
request,
target,
'/Isolate/Run',
server__pb2.BoundFunction.SerializeToString,
common__pb2.PartialRunResult.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def RunFunction(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/Isolate/RunFunction',
server__pb2.RunRequest.SerializeToString,
common__pb2.PartialRunResult.FromString,
options,
Expand Down
37 changes: 25 additions & 12 deletions src/isolate/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,24 +363,37 @@ def set_metadata(self, task: RunTask, metadata: definitions.TaskMetadata) -> Non
# Stream_logs defaults to False if not set
task.stream_logs = metadata.stream_logs

def Run(
def RunFunction(
self,
request: definitions.RunRequest,
context: ServicerContext,
) -> Iterator[definitions.PartialRunResult]:
try:
if isinstance(request, definitions.RunRequest):
task = RunTask(request=request.function)
self.set_metadata(task, request.metadata)
elif isinstance(request, definitions.BoundFunction):
task = RunTask(request=request)
else:
raise GRPCException(
"Invalid request type.",
StatusCode.INVALID_ARGUMENT,
)
task = RunTask(request=request.function)
self.set_metadata(task, request.metadata)

# HACK: we can support only one task at a time
# TODO: move away from this when we use submit for env-aware tasks
self.background_tasks["RUN"] = task
yield from self._run_task(task)
except GRPCException as exc:
return self.abort_with_msg(
exc.message,
context,
code=exc.code,
)
finally:
self.background_tasks.pop("RUN", None)

def Run(
self,
request: definitions.BoundFunction,
context: ServicerContext,
) -> Iterator[definitions.PartialRunResult]:
try:
task = RunTask(request=request)

# HACK: we can support only one task at a time for Run
# HACK: we can support only one task at a time
# TODO: move away from this when we use submit for env-aware tasks
self.background_tasks["RUN"] = task
yield from self._run_task(task)
Expand Down
9 changes: 8 additions & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,14 @@ def run_request(
}

return_value = _NOT_SET
for result in stub.Run(request):
if isinstance(request, definitions.BoundFunction):
func = stub.Run
elif isinstance(request, definitions.RunRequest):
func = stub.RunFunction
else:
raise ValueError(f"Unknown request type: {type(request)}")

for result in func(request):
for _log in result.logs:
log = from_grpc(_log)
log_store[log.source].append(log)
Expand Down

0 comments on commit a2fc4ec

Please sign in to comment.