Skip to content

Commit

Permalink
feat: Add public API prototype, add grpc-web
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Oct 23, 2023
1 parent 3631321 commit e3cc49a
Show file tree
Hide file tree
Showing 38 changed files with 348 additions and 116 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ serde_cbor = "0.11"
serde_json = { version = "1.0" }
serde_yaml = { version = "0.8" }
strfmt = "0.2"
summa-proto = { version = "0.34.0", path = "./summa-proto", default_features = false }
summa-proto = { version = "0.35.0", path = "./summa-proto", default_features = false }
take_mut = "0.2"
tantivy = { package = "izihawa-tantivy", version = "0.21.1", default_features = false, features = ["quickwit", "zstd-compression"] }
tantivy-common = { package = "izihawa-tantivy-common", version = "0.6.0" }
Expand Down
8 changes: 6 additions & 2 deletions summa-core/src/components/fruit_extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub fn build_fruit_extractor(
) -> SummaResult<Box<dyn FruitExtractor>> {
match collector_proto.collector {
Some(proto::collector::Collector::TopDocs(top_docs_collector_proto)) => {
let query_fields = validators::parse_fields(searcher.schema(), &top_docs_collector_proto.fields)?;
let query_fields = validators::parse_fields(searcher.schema(), &top_docs_collector_proto.fields, &top_docs_collector_proto.removed_fields)?;
let query_fields = (!query_fields.is_empty()).then(|| HashSet::from_iter(query_fields.into_iter().map(|x| x.0)));
Ok(match top_docs_collector_proto.scorer {
None | Some(proto::Scorer { scorer: None }) => Box::new(
Expand Down Expand Up @@ -164,7 +164,11 @@ pub fn build_fruit_extractor(
})
}
Some(proto::collector::Collector::ReservoirSampling(reservoir_sampling_collector_proto)) => {
let query_fields = validators::parse_fields(searcher.schema(), &reservoir_sampling_collector_proto.fields)?;
let query_fields = validators::parse_fields(
searcher.schema(),
&reservoir_sampling_collector_proto.fields,
&reservoir_sampling_collector_proto.removed_fields,
)?;
let query_fields = (!query_fields.is_empty()).then(|| HashSet::from_iter(query_fields.into_iter().map(|x| x.0)));
let reservoir_sampling_collector = collectors::ReservoirSampling::with_limit(reservoir_sampling_collector_proto.limit as usize);
Ok(Box::new(
Expand Down
2 changes: 1 addition & 1 deletion summa-core/src/components/query_parser/summa_ql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl QueryParser {
morphology_manager: &MorphologyManager,
tokenizer_manager: &TokenizerManager,
) -> SummaResult<QueryParser> {
validators::parse_fields(&schema, &query_parser_config.0.default_fields)?;
validators::parse_fields(&schema, &query_parser_config.0.default_fields, &[])?;
Ok(QueryParser {
term_field_mappers_manager: TermFieldMappersManager::new(&schema, tokenizer_manager),
morphology_manager: morphology_manager.clone(),
Expand Down
26 changes: 21 additions & 5 deletions summa-core/src/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,25 @@ pub fn parse_schema(schema: &str) -> SummaResult<Schema> {
serde_yaml::from_str(schema).map_err(|_| Error::Validation(Box::new(ValidationError::InvalidSchema(schema.to_owned()))))
}

pub fn parse_fields<'a>(schema: &'a Schema, fields: &'a [String]) -> SummaResult<Vec<(Field, &'a str)>> {
Ok(fields
.iter()
.map(|f| schema.find_field(f).ok_or_else(|| ValidationError::MissingField(f.to_string())))
.collect::<Result<_, _>>()?)
pub fn parse_fields<'a>(schema: &'a Schema, fields: &'a [String], removed_fields: &'a [String]) -> SummaResult<Vec<(Field, &'a str)>> {
if removed_fields.is_empty() {
Ok(fields
.iter()
.map(|f| schema.find_field(f).ok_or_else(|| ValidationError::MissingField(f.to_string())))
.collect::<Result<_, _>>()?)
} else if fields.is_empty() {
Ok(schema
.fields()
.map(|(_, field_entry)| {
schema
.find_field(field_entry.name())
.ok_or_else(|| ValidationError::MissingField(field_entry.name().to_string()))
})
.collect::<Result<_, _>>()?)
} else {
Ok(fields
.iter()
.map(|f| schema.find_field(f).ok_or_else(|| ValidationError::MissingField(f.to_string())))
.collect::<Result<_, _>>()?)
}
}
2 changes: 1 addition & 1 deletion summa-proto/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "summa-proto"
version = "0.34.0"
version = "0.35.0"
authors = ["Pasha Podolsky <[email protected]>"]
edition = "2021"
license-file = "LICENSE"
Expand Down
1 change: 1 addition & 0 deletions summa-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/dag_pb.proto",
"proto/index_service.proto",
"proto/query.proto",
"proto/public_service.proto",
"proto/reflection_service.proto",
"proto/search_service.proto",
"proto/unixfs.proto",
Expand Down
12 changes: 12 additions & 0 deletions summa-proto/proto/public_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";
package summa.proto;

import "search_service.proto";
import "query.proto";


// Searches documents in the stored indices
service PublicApi {
// Make search in Summa
rpc search (SearchRequest) returns (SearchResponse) {}
}
2 changes: 2 additions & 0 deletions summa-proto/proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ message FacetCollectorOutput {
message ReservoirSamplingCollector {
uint32 limit = 1;
repeated string fields = 2;
repeated string removed_fields = 3;
}

message RandomDocument {
Expand All @@ -230,6 +231,7 @@ message TopDocsCollector {
map<string, uint32> snippet_configs = 4;
bool explain = 5;
repeated string fields = 6;
repeated string removed_fields = 7;
}

message DocumentsCollectorOutput {
Expand Down
2 changes: 2 additions & 0 deletions summa-proto/src/proto_traits/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub mod shortcuts {
snippet_configs: HashMap::new(),
explain: false,
fields: Vec::new(),
removed_fields: Vec::new(),
})),
}
}
Expand All @@ -57,6 +58,7 @@ pub mod shortcuts {
snippet_configs: HashMap::new(),
explain: false,
fields: Vec::new(),
removed_fields: Vec::new(),
})),
}
}
Expand Down
2 changes: 1 addition & 1 deletion summa-server/src/apis/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl proto::index_api_server::IndexApi for IndexApiImpl {
let schema = index_holder.schema().clone();
let multi_fields = index_holder.multi_fields().clone();

let query_fields = validators::parse_fields(searcher.schema(), &proto_request.fields).map_err(crate::errors::Error::from)?;
let query_fields = validators::parse_fields(searcher.schema(), &proto_request.fields, &[]).map_err(crate::errors::Error::from)?;
let query_fields = (!query_fields.is_empty()).then(|| HashSet::from_iter(query_fields.into_iter().map(|x| x.0)));

let documents_receiver = index_holder
Expand Down
1 change: 1 addition & 0 deletions summa-server/src/apis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
pub mod consumer;
pub mod index;
pub mod public;
pub mod reflection;
pub mod search;
43 changes: 43 additions & 0 deletions summa-server/src/apis/public.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! Public GRPC API
//!
//! Public GRPC API is using for querying indices but queries are restricted for making it safe to open endpoint in public
use std::time::Instant;

use summa_proto::proto;
use tonic::{Request, Response, Status};
use tracing::{info_span, Instrument};

use crate::errors::SummaServerResult;
use crate::services::Index;

pub struct PublicApiImpl {
index_service: Index,
}

impl PublicApiImpl {
pub fn new(index_service: &Index) -> SummaServerResult<PublicApiImpl> {
Ok(PublicApiImpl {
index_service: index_service.clone(),
})
}
}

#[tonic::async_trait]
impl proto::public_api_server::PublicApi for PublicApiImpl {
async fn search(&self, proto_request: Request<proto::SearchRequest>) -> Result<Response<proto::SearchResponse>, Status> {
let proto_request = proto_request.into_inner();
let now = Instant::now();
let collector_outputs = self
.index_service
.constrained_search(proto_request)
.instrument(info_span!("search"))
.await
.map_err(crate::errors::Error::from)?;
let elapsed_secs = now.elapsed().as_secs_f64();
Ok(Response::new(proto::SearchResponse {
collector_outputs,
elapsed_secs,
}))
}
}
10 changes: 9 additions & 1 deletion summa-server/src/services/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures_util::future::try_join_all;
use hyper::header::{HeaderName, HeaderValue};
use proto::consumer_api_server::ConsumerApiServer;
use proto::index_api_server::IndexApiServer;
use proto::public_api_server::PublicApiServer;
use proto::reflection_api_server::ReflectionApiServer;
use proto::search_api_server::SearchApiServer;
use summa_core::configs::ConfigProxy;
Expand All @@ -23,6 +24,7 @@ use tracing::{info, info_span, instrument, warn, Instrument, Span};

use crate::apis::consumer::ConsumerApiImpl;
use crate::apis::index::IndexApiImpl;
use crate::apis::public::PublicApiImpl;
use crate::apis::reflection::ReflectionApiImpl;
use crate::apis::search::SearchApiImpl;
use crate::errors::SummaServerResult;
Expand Down Expand Up @@ -77,6 +79,8 @@ impl Api {
let index_api = IndexApiImpl::new(&self.server_config_holder, &index_service)?;
let reflection_api = ReflectionApiImpl::new(&index_service)?;
let search_api = SearchApiImpl::new(&index_service)?;
let public_api = PublicApiImpl::new(&index_service)?;

let grpc_reflection_service = tonic_reflection::server::Builder::configure()
.include_reflection_service(false)
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
Expand Down Expand Up @@ -121,6 +125,10 @@ impl Api {
.max_encoding_message_size(max_from_size_bytes as usize);
}

let public_service = PublicApiServer::new(public_api)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip);

let grpc_router = Server::builder()
.layer(layer)
.max_frame_size(api_config.max_frame_size_bytes.map(|x| x / 256))
Expand All @@ -145,7 +153,7 @@ impl Api {
}));

if let Some(http_endpoint) = api_config.http_endpoint {
let http_router = Server::builder().accept_http1(true).add_service(tonic_web::enable(search_service));
let http_router = Server::builder().accept_http1(true).add_service(tonic_web::enable(public_service));
let http_listener = Api::set_listener(&http_endpoint)?;
let mut http_terminator = terminator.clone();
futures.push(Box::new(async move {
Expand Down
43 changes: 40 additions & 3 deletions summa-server/src/services/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ impl Index {
let mut index_attributes = create_index_request.index_attributes.unwrap_or_default();
let query_parser_config = create_index_request.query_parser_config;
let default_fields = query_parser_config.as_ref().map(|q| q.default_fields.clone()).unwrap_or_default();
validators::parse_fields(&schema, &default_fields)?;
validators::parse_fields(&schema, &index_attributes.multi_fields)?;
validators::parse_fields(&schema, &index_attributes.unique_fields)?;
validators::parse_fields(&schema, &default_fields, &[])?;
validators::parse_fields(&schema, &index_attributes.multi_fields, &[])?;
validators::parse_fields(&schema, &index_attributes.unique_fields, &[])?;

index_attributes.created_at = SystemTime::now().duration_since(UNIX_EPOCH).expect("cannot retrieve time").as_secs();

Expand Down Expand Up @@ -647,6 +647,43 @@ impl Index {
Ok(self.index_registry.finalize_extraction(collector_outputs).await?)
}

/// Search documents
pub async fn constrained_search(&self, mut search_request: proto::SearchRequest) -> SummaServerResult<Vec<proto::CollectorOutput>> {
let index_holder = self.index_registry.get_index_holder(&search_request.index_alias).await?;
let query = search_request
.query
.and_then(|query| query.query)
.unwrap_or_else(|| proto::query::Query::All(proto::AllQuery {}));

for collector in &mut search_request.collectors {
match &mut collector.collector {
Some(proto::collector::Collector::TopDocs(top_docs)) => {
top_docs.limit = std::cmp::min(top_docs.limit, 10);
top_docs.offset = std::cmp::min(top_docs.offset, 100);
top_docs.removed_fields = vec!["content".to_string()];
}
Some(proto::collector::Collector::ReservoirSampling(reservoir_sampling)) => {
reservoir_sampling.limit = std::cmp::min(reservoir_sampling.limit, 10);
reservoir_sampling.removed_fields = vec!["content".to_string()];
}
Some(proto::collector::Collector::Count(_)) => {}
_ => panic!("Not allowed"),
}
}

let collector_outputs = index_holder
.custom_search(
&search_request.index_alias,
query,
search_request.collectors,
search_request.is_fieldnorms_scoring_enabled,
search_request.load_cache,
search_request.store_cache,
)
.await?;
Ok(self.index_registry.finalize_extraction(collector_outputs).await?)
}

/// Merge several segments into a single one
#[instrument(skip(self, merge_segments_request), fields(index_name = merge_segments_request.index_name))]
pub async fn merge_segments(&self, merge_segments_request: proto::MergeSegmentsRequest) -> SummaServerResult<Option<SegmentId>> {
Expand Down
2 changes: 1 addition & 1 deletion summa-wasm/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "summa-wasm"
version = "0.132.2"
version = "0.133.0"
authors = ["Pasha Podolsky <[email protected]>"]
edition = "2021"
license-file = "LICENSE"
Expand Down
3 changes: 1 addition & 2 deletions summa-wasm/build.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#!/usr/bin/env bash

npx protoc \
--ts_out src/grpc-web/ \
--ts_opt long_type_string \
--ts_out src/grpc-web \
--ts_opt use_proto_field_name \
--proto_path ../summa-proto/proto \
../summa-proto/proto/*.proto
Expand Down
10 changes: 10 additions & 0 deletions summa-wasm/crate/web_index_registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

use js_sys::Uint8Array;
use prost::Message;
use serde::Serialize;
use serde_wasm_bindgen::Serializer;
use summa_core::components::{IndexHolder, IndexRegistry, SummaDocument};
Expand Down Expand Up @@ -48,6 +50,14 @@ impl WrappedIndexRegistry {
Ok(self.search_internal(search_request).await.map_err(Error::from)?.serialize(&serializer)?)
}

/// Do pooled search
#[wasm_bindgen]
pub async fn search_by_binary_proto(&self, search_request: Uint8Array) -> Result<JsValue, JsValue> {
let search_request: proto::SearchRequest = proto::SearchRequest::decode(search_request.to_vec().as_slice()).expect("cannot decode proto");
let serializer = Serializer::new().serialize_maps_as_objects(true).serialize_large_number_types_as_bigints(true);
Ok(self.search_internal(search_request).await.map_err(Error::from)?.serialize(&serializer)?)
}

async fn search_internal(&self, search_request: proto::SearchRequest) -> SummaResult<Vec<proto::CollectorOutput>> {
info!(action = "search", search_request = ?search_request);
let index_holder = self.index_registry.get_index_holder(&search_request.index_alias).await?;
Expand Down
2 changes: 1 addition & 1 deletion summa-wasm/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "summa-wasm",
"description": "WASM-bindings for Summa",
"version": "0.132.2",
"version": "0.133.0",
"keywords": [
"search",
"database",
Expand Down
2 changes: 1 addition & 1 deletion summa-wasm/src/grpc-web/consumer_service.client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.9.1 with parameter long_type_string,use_proto_field_name
// @generated by protobuf-ts 2.9.1 with parameter use_proto_field_name
// @generated from protobuf file "consumer_service.proto" (package "summa.proto", syntax proto3)
// tslint:disable
import type { RpcTransport } from "@protobuf-ts/runtime-rpc";
Expand Down
2 changes: 1 addition & 1 deletion summa-wasm/src/grpc-web/consumer_service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.9.1 with parameter long_type_string,use_proto_field_name
// @generated by protobuf-ts 2.9.1 with parameter use_proto_field_name
// @generated from protobuf file "consumer_service.proto" (package "summa.proto", syntax proto3)
// tslint:disable
import { ServiceType } from "@protobuf-ts/runtime-rpc";
Expand Down
8 changes: 4 additions & 4 deletions summa-wasm/src/grpc-web/dag_pb.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.9.1 with parameter long_type_string,use_proto_field_name
// @generated by protobuf-ts 2.9.1 with parameter use_proto_field_name
// @generated from protobuf file "dag_pb.proto" (package "dag_pb", syntax proto3)
// tslint:disable
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
Expand Down Expand Up @@ -32,7 +32,7 @@ export interface PBLink {
*
* @generated from protobuf field: optional uint64 t_size = 3;
*/
t_size?: string;
t_size?: bigint;
}
/**
* @generated from protobuf message dag_pb.PBNode
Expand All @@ -57,7 +57,7 @@ class PBLink$Type extends MessageType<PBLink> {
super("dag_pb.PBLink", [
{ no: 1, name: "hash", kind: "scalar", opt: true, T: 12 /*ScalarType.BYTES*/ },
{ no: 2, name: "name", kind: "scalar", opt: true, T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "t_size", kind: "scalar", localName: "t_size", opt: true, T: 4 /*ScalarType.UINT64*/ }
{ no: 3, name: "t_size", kind: "scalar", localName: "t_size", opt: true, T: 4 /*ScalarType.UINT64*/, L: 0 /*LongType.BIGINT*/ }
]);
}
create(value?: PartialMessage<PBLink>): PBLink {
Expand All @@ -79,7 +79,7 @@ class PBLink$Type extends MessageType<PBLink> {
message.name = reader.string();
break;
case /* optional uint64 t_size */ 3:
message.t_size = reader.uint64().toString();
message.t_size = reader.uint64().toBigInt();
break;
default:
let u = options.readUnknownField;
Expand Down
4 changes: 4 additions & 0 deletions summa-wasm/src/grpc-web/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * as index_service from "./index_service"
export * as query from "./query"
export * as search_service from "./search_service"
export * as search_service_client from "./search_service.client"
Loading

0 comments on commit e3cc49a

Please sign in to comment.