Skip to content

Commit

Permalink
[Manual backport 2.x] Add support for async ppl to discover (opensear…
Browse files Browse the repository at this point in the history
…ch-project#8706)

Backport PR:
opensearch-project#8706

From original PR:
* add support for async ppl to discover

Signed-off-by: Shenoy Pratik <[email protected]>

* Changeset file for PR opensearch-project#8706 created/updated

* update s3_type test to add PPL as supported lang

Signed-off-by: Shenoy Pratik <[email protected]>

* fix lint error

Signed-off-by: Shenoy Pratik <[email protected]>

---------

Signed-off-by: Shenoy Pratik <[email protected]>
Co-authored-by: opensearch-changeset-bot[bot] <154024398+opensearch-changeset-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and ananzh committed Oct 30, 2024
1 parent 25c3ac3 commit 5a12276
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 18 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/8706.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
feat:
- Add support for async ppl to discover ([#8706](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/8706))
2 changes: 2 additions & 0 deletions src/plugins/query_enhancements/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ export const SEARCH_STRATEGY = {
PPL_RAW: 'pplraw',
SQL: 'sql',
SQL_ASYNC: 'sqlasync',
PPL_ASYNC: 'pplasync',
};

export const API = {
SEARCH: `${BASE_API}/search`,
PPL_SEARCH: `${BASE_API}/search/${SEARCH_STRATEGY.PPL}`,
SQL_SEARCH: `${BASE_API}/search/${SEARCH_STRATEGY.SQL}`,
SQL_ASYNC_SEARCH: `${BASE_API}/search/${SEARCH_STRATEGY.SQL_ASYNC}`,
PPL_ASYNC_SEARCH: `${BASE_API}/search/${SEARCH_STRATEGY.PPL_ASYNC}`,
QUERY_ASSIST: {
LANGUAGES: `${BASE_API}/assist/languages`,
GENERATE: `${BASE_API}/assist/generate`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ describe('s3TypeConfig', () => {
expect(result[3].type).toBe('number');
});

test('supportedLanguages returns SQL', () => {
test('supportedLanguages returns SQL, PPL', () => {
const mockDataset: Dataset = { id: 'table1', title: 'Table 1', type: 'S3' };
expect(s3TypeConfig.supportedLanguages(mockDataset)).toEqual(['SQL']);
expect(s3TypeConfig.supportedLanguages(mockDataset)).toEqual(['SQL', 'PPL']);
});

describe('castS3FieldTypeToOSDFieldType()', () => {
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/query_enhancements/public/datasets/s3_type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export const s3TypeConfig: DatasetTypeConfig = {
},

supportedLanguages: (dataset: Dataset): string[] => {
return ['SQL'];
return ['SQL', 'PPL'];
},

getSampleQueries: (dataset: Dataset, language: string) => {
Expand All @@ -129,7 +129,7 @@ export const s3TypeConfig: DatasetTypeConfig = {
title: i18n.translate('queryEnhancements.s3Type.sampleQuery.basicPPLQuery', {
defaultMessage: 'Sample query for PPL',
}),
query: `source = ${dataset.title}`,
query: `source = ${dataset.title} | head 10`,
},
];
case 'SQL':
Expand Down
19 changes: 12 additions & 7 deletions src/plugins/query_enhancements/public/plugin.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@
*/
import { i18n } from '@osd/i18n';
import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '../../../core/public';
import { DataStorage } from '../../data/common';
import {
createEditor,
DefaultInput,
LanguageConfig,
Query,
SingleLineInput,
} from '../../data/public';
import { ConfigSchema } from '../common/config';
import { setData, setStorage } from './services';
import { s3TypeConfig } from './datasets';
import { createQueryAssistExtension } from './query_assist';
import { pplLanguageReference, sqlLanguageReference } from './query_editor_extensions';
import { PPLSearchInterceptor, SQLSearchInterceptor } from './search';
import { setData, setStorage } from './services';
import {
QueryEnhancementsPluginSetup,
QueryEnhancementsPluginSetupDependencies,
QueryEnhancementsPluginStart,
QueryEnhancementsPluginStartDependencies,
} from './types';
import { LanguageConfig, Query } from '../../data/public';
import { s3TypeConfig } from './datasets';
import { createEditor, DefaultInput, SingleLineInput } from '../../data/public';
import { DataStorage } from '../../data/common';
import { pplLanguageReference, sqlLanguageReference } from './query_editor_extensions';

export class QueryEnhancementsPlugin
implements
Expand Down Expand Up @@ -57,7 +62,7 @@ export class QueryEnhancementsPlugin
startServices: core.getStartServices(),
usageCollector: data.search.usageCollector,
}),
getQueryString: (currentQuery: Query) => `source = ${currentQuery.dataset?.title}`,
getQueryString: (currentQuery: Query) => `source = ${currentQuery.dataset?.title} | head 10`,
fields: { filterable: false, visualizable: false },
docLink: {
title: i18n.translate('queryEnhancements.pplLanguage.docLink', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from '../../../data/public';
import {
API,
DATASET,
EnhancedFetchContext,
fetch,
formatDate,
Expand Down Expand Up @@ -60,7 +61,7 @@ export class PPLSearchInterceptor extends SearchInterceptor {
public search(request: IOpenSearchDashboardsSearchRequest, options: ISearchOptions) {
const dataset = this.queryService.queryString.getQuery().dataset;
const datasetType = dataset?.type;
let strategy = SEARCH_STRATEGY.PPL;
let strategy = datasetType === DATASET.S3 ? SEARCH_STRATEGY.PPL_ASYNC : SEARCH_STRATEGY.PPL;

if (datasetType) {
const datasetTypeConfig = this.queryService.queryString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
*/

import { trimEnd } from 'lodash';
import { CoreStart } from 'opensearch-dashboards/public';
import { Observable, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { CoreStart } from 'opensearch-dashboards/public';
import {
DataPublicPluginStart,
IOpenSearchDashboardsSearchRequest,
Expand Down
12 changes: 10 additions & 2 deletions src/plugins/query_enhancements/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import { SEARCH_STRATEGY } from '../common';
import { ConfigSchema } from '../common/config';
import { defineRoutes, defineSearchStrategyRouteProvider } from './routes';
import {
pplSearchStrategyProvider,
pplAsyncSearchStrategyProvider,
pplRawSearchStrategyProvider,
sqlSearchStrategyProvider,
pplSearchStrategyProvider,
sqlAsyncSearchStrategyProvider,
sqlSearchStrategyProvider,
} from './search';
import {
QueryEnhancementsPluginSetup,
Expand Down Expand Up @@ -58,11 +59,17 @@ export class QueryEnhancementsPlugin
this.logger,
client
);
const pplAsyncSearchStrategy = pplAsyncSearchStrategyProvider(
this.config$,
this.logger,
client
);

data.search.registerSearchStrategy(SEARCH_STRATEGY.PPL, pplSearchStrategy);
data.search.registerSearchStrategy(SEARCH_STRATEGY.PPL_RAW, pplRawSearchStrategy);
data.search.registerSearchStrategy(SEARCH_STRATEGY.SQL, sqlSearchStrategy);
data.search.registerSearchStrategy(SEARCH_STRATEGY.SQL_ASYNC, sqlAsyncSearchStrategy);
data.search.registerSearchStrategy(SEARCH_STRATEGY.PPL_ASYNC, pplAsyncSearchStrategy);

core.http.registerRouteHandlerContext('query_assist', () => ({
logger: this.logger,
Expand All @@ -86,6 +93,7 @@ export class QueryEnhancementsPlugin
ppl: pplSearchStrategy,
sql: sqlSearchStrategy,
sqlasync: sqlAsyncSearchStrategy,
pplasync: pplAsyncSearchStrategy,
});

this.logger.info('queryEnhancements: Setup complete');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import { schema } from '@osd/config-schema';
import { IRouter, ILegacyClusterClient } from 'opensearch-dashboards/server';
import { ILegacyClusterClient, IRouter } from 'opensearch-dashboards/server';
import { API } from '../../../common';

export function registerDataSourceConnectionsRoutes(
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/query_enhancements/server/search/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

export { pplSearchStrategyProvider } from './ppl_search_strategy';
export { pplAsyncSearchStrategyProvider } from './ppl_async_search_strategy';
export { pplRawSearchStrategyProvider } from './ppl_raw_search_strategy';
export { sqlSearchStrategyProvider } from './sql_search_strategy';
export { pplSearchStrategyProvider } from './ppl_search_strategy';
export { sqlAsyncSearchStrategyProvider } from './sql_async_search_strategy';
export { sqlSearchStrategyProvider } from './sql_search_strategy';
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { ILegacyClusterClient, Logger, SharedGlobalConfig } from 'opensearch-dashboards/server';
import { Observable } from 'rxjs';
import {
createDataFrame,
DATA_FRAME_TYPES,
IDataFrameResponse,
IOpenSearchDashboardsSearchRequest,
Query,
} from '../../../data/common';
import { ISearchStrategy, SearchUsage } from '../../../data/server';
import { buildQueryStatusConfig, getFields, handleFacetError, SEARCH_STRATEGY } from '../../common';
import { Facet } from '../utils';

export const pplAsyncSearchStrategyProvider = (
config$: Observable<SharedGlobalConfig>,
logger: Logger,
client: ILegacyClusterClient,
usage?: SearchUsage
): ISearchStrategy<IOpenSearchDashboardsSearchRequest, IDataFrameResponse> => {
const pplAsyncFacet = new Facet({
client,
logger,
endpoint: 'enhancements.runDirectQuery',
});
const pplAsyncJobsFacet = new Facet({
client,
logger,
endpoint: 'enhancements.getJobStatus',
useJobs: true,
});

return {
search: async (context, request: any, options) => {
try {
const query: Query = request.body.query;
const pollQueryResultsParams = request.body.pollQueryResultsParams;
const inProgressQueryId = pollQueryResultsParams?.queryId;

if (!inProgressQueryId) {
request.body = { ...request.body, lang: SEARCH_STRATEGY.PPL };
const rawResponse: any = await pplAsyncFacet.describeQuery(context, request);

if (!rawResponse.success) handleFacetError(rawResponse);

const statusConfig = buildQueryStatusConfig(rawResponse);

return {
type: DATA_FRAME_TYPES.POLLING,
status: 'started',
body: {
queryStatusConfig: statusConfig,
},
} as IDataFrameResponse;
} else {
request.params = { queryId: inProgressQueryId };
const queryStatusResponse: any = await pplAsyncJobsFacet.describeQuery(context, request);
const queryStatus = queryStatusResponse?.data?.status;
logger.info(`pplAsyncSearchStrategy: JOB: ${inProgressQueryId} - STATUS: ${queryStatus}`);

if (queryStatus?.toUpperCase() === 'SUCCESS') {
const dataFrame = createDataFrame({
name: query.dataset?.id,
schema: queryStatusResponse.data.schema,
meta: { ...pollQueryResultsParams },
fields: getFields(queryStatusResponse),
});

dataFrame.size = queryStatusResponse.data.datarows.length;

return {
type: DATA_FRAME_TYPES.POLLING,
status: 'success',
body: dataFrame,
} as IDataFrameResponse;
} else if (queryStatus?.toUpperCase() === 'FAILED') {
return {
type: DATA_FRAME_TYPES.POLLING,
status: 'failed',
body: {
error: `JOB: ${inProgressQueryId} failed: ${queryStatusResponse.data.error}`,
},
} as IDataFrameResponse;
}

return {
type: DATA_FRAME_TYPES.POLLING,
status: queryStatus,
} as IDataFrameResponse;
}
} catch (e: any) {
logger.error(`pplAsyncSearchStrategy: ${e.message}`);
if (usage) usage.trackError();
throw e;
}
},
};
};

0 comments on commit 5a12276

Please sign in to comment.