Skip to content

Commit

Permalink
feat(ingestion): Adds more advanced configurations for runtime debugg…
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro93 authored Oct 21, 2023
1 parent 63599c9 commit 86e0023
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.datahub.graphql.generated.IngestionConfig;
import com.linkedin.datahub.graphql.generated.IngestionSchedule;
import com.linkedin.datahub.graphql.generated.IngestionSource;
import com.linkedin.datahub.graphql.generated.StringMapEntry;
import com.linkedin.datahub.graphql.generated.StructuredReport;
import com.linkedin.datahub.graphql.types.common.mappers.StringMapMapper;
import com.linkedin.entity.EntityResponse;
Expand All @@ -21,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;


Expand Down Expand Up @@ -143,6 +145,14 @@ public static IngestionConfig mapIngestionSourceConfig(final DataHubIngestionSou
result.setVersion(config.getVersion());
result.setExecutorId(config.getExecutorId());
result.setDebugMode(config.isDebugMode());
if (config.getExtraArgs() != null) {
List<StringMapEntry> extraArgs = config.getExtraArgs()
.keySet()
.stream()
.map(key -> new StringMapEntry(key, config.getExtraArgs().get(key)))
.collect(Collectors.toList());
result.setExtraArgs(extraArgs);
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
if (ingestionSourceInfo.getConfig().hasDebugMode()) {
debugMode = ingestionSourceInfo.getConfig().isDebugMode() ? "true" : "false";
}
if (ingestionSourceInfo.getConfig().hasExtraArgs()) {
arguments.putAll(ingestionSourceInfo.getConfig().getExtraArgs());
}
arguments.put(DEBUG_MODE_ARG_NAME, debugMode);
execInput.setArgs(new StringMap(arguments));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.linkedin.datahub.graphql.resolvers.ingest.source;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.StringMapEntryInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput;
Expand All @@ -17,6 +19,8 @@
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

import java.net.URISyntaxException;
Expand Down Expand Up @@ -108,6 +112,12 @@ private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfig
if (input.getDebugMode() != null) {
result.setDebugMode(input.getDebugMode());
}
if (input.getExtraArgs() != null) {
Map<String, String> extraArgs = input.getExtraArgs()
.stream()
.collect(Collectors.toMap(StringMapEntryInput::getKey, StringMapEntryInput::getValue));
result.setExtraArgs(new StringMap(extraArgs));
}
return result;
}

Expand Down
10 changes: 10 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ type IngestionConfig {
Advanced: Whether or not to run ingestion in debug mode
"""
debugMode: Boolean

"""
Advanced: Extra arguments for the ingestion run.
"""
extraArgs: [StringMapEntry!]
}

"""
Expand Down Expand Up @@ -483,6 +488,11 @@ input UpdateIngestionSourceConfigInput {
Whether or not to run ingestion in debug mode
"""
debugMode: Boolean

"""
Extra arguments for the ingestion run.
"""
extraArgs: [StringMapEntryInput!]
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class UpsertIngestionSourceResolverTest {
"Test source",
"mysql", "Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id", false)
new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id", false, null)
);

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Message } from '../../shared/Message';
import TabToolbar from '../../entity/shared/components/styled/TabToolbar';
import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal';
import { addToListIngestionSourcesCache, CLI_EXECUTOR_ID, removeFromListIngestionSourcesCache } from './utils';
import { DEFAULT_EXECUTOR_ID, SourceBuilderState } from './builder/types';
import { DEFAULT_EXECUTOR_ID, SourceBuilderState, StringMapEntryInput } from './builder/types';
import { IngestionSource, UpdateIngestionSourceInput } from '../../../types.generated';
import { SearchBar } from '../../search/SearchBar';
import { useEntityRegistry } from '../../useEntityRegistry';
Expand Down Expand Up @@ -173,6 +173,11 @@ export const IngestionSourceList = () => {
setFocusSourceUrn(undefined);
};

const formatExtraArgs = (extraArgs): StringMapEntryInput[] => {
if (extraArgs === null || extraArgs === undefined) return [];
return extraArgs.map((entry) => ({ key: entry.key, value: entry.value }));
};

const createOrUpdateIngestionSource = (
input: UpdateIngestionSourceInput,
resetState: () => void,
Expand Down Expand Up @@ -294,6 +299,7 @@ export const IngestionSourceList = () => {
(recipeBuilderState.config?.executorId as string)) ||
DEFAULT_EXECUTOR_ID,
debugMode: recipeBuilderState.config?.debugMode || false,
extraArgs: formatExtraArgs(recipeBuilderState.config?.extraArgs || []),
},
schedule: recipeBuilderState.schedule && {
interval: recipeBuilderState.schedule?.interval as string,
Expand Down
123 changes: 122 additions & 1 deletion datahub-web-react/src/app/ingest/source/builder/NameSourceStep.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Button, Checkbox, Collapse, Form, Input, Typography } from 'antd';
import React from 'react';
import styled from 'styled-components';
import { SourceBuilderState, StepProps } from './types';
import { SourceBuilderState, StepProps, StringMapEntryInput } from './types';

const ControlsContainer = styled.div`
display: flex;
Expand All @@ -13,6 +13,10 @@ const SaveButton = styled(Button)`
margin-right: 15px;
`;

const ExtraEnvKey = 'extra_env_vars';
const ExtraReqKey = 'extra_pip_requirements';
const ExtraPluginKey = 'extra_pip_plugins';

export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps) => {
const setName = (stagedName: string) => {
const newState: SourceBuilderState = {
Expand Down Expand Up @@ -55,6 +59,90 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps)
updateState(newState);
};

const retrieveExtraEnvs = () => {
const extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : [];
const index: number = extraArgs.findIndex((entry) => entry.key === ExtraEnvKey) as number;
if (index > -1) {
return extraArgs[index].value;
}
return '';
};

const setExtraEnvs = (envs: string) => {
let extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : [];
const indxOfEnvVars: number = extraArgs.findIndex((entry) => entry.key === ExtraEnvKey) as number;
const value = { key: ExtraEnvKey, value: envs };
if (indxOfEnvVars > -1) {
extraArgs[indxOfEnvVars] = value;
} else {
extraArgs = [...extraArgs, value];
}
const newState: SourceBuilderState = {
...state,
config: {
...state.config,
extraArgs,
},
};
updateState(newState);
};

const retrieveExtraDataHubPlugins = () => {
const extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : [];
const index: number = extraArgs.findIndex((entry) => entry.key === ExtraPluginKey) as number;
if (index > -1) {
return extraArgs[index].value;
}
return '';
};

const setExtraDataHubPlugins = (plugins: string) => {
let extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : [];
const indxOfPlugins: number = extraArgs.findIndex((entry) => entry.key === ExtraPluginKey) as number;
const value = { key: ExtraPluginKey, value: plugins };
if (indxOfPlugins > -1) {
extraArgs[indxOfPlugins] = value;
} else {
extraArgs = [...extraArgs, value];
}
const newState: SourceBuilderState = {
...state,
config: {
...state.config,
extraArgs,
},
};
updateState(newState);
};

const retrieveExtraReqs = () => {
const extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : [];
const index: number = extraArgs.findIndex((entry) => entry.key === ExtraReqKey) as number;
if (index > -1) {
return extraArgs[index].value;
}
return '';
};

const setExtraReqs = (reqs: string) => {
let extraArgs: StringMapEntryInput[] = state.config?.extraArgs ? state.config?.extraArgs : [];
const indxOfReqs: number = extraArgs.findIndex((entry) => entry.key === ExtraReqKey) as number;
const value = { key: ExtraReqKey, value: reqs };
if (indxOfReqs > -1) {
extraArgs[indxOfReqs] = value;
} else {
extraArgs = [...extraArgs, value];
}
const newState: SourceBuilderState = {
...state,
config: {
...state.config,
extraArgs,
},
};
updateState(newState);
};

const onClickCreate = (shouldRun?: boolean) => {
if (state.name !== undefined && state.name.length > 0) {
submit(shouldRun);
Expand Down Expand Up @@ -116,6 +204,39 @@ export const NameSourceStep = ({ state, updateState, prev, submit }: StepProps)
onChange={(event) => setDebugMode(event.target.checked)}
/>
</Form.Item>
<Form.Item label={<Typography.Text strong>Extra Enviroment Variables</Typography.Text>}>
<Typography.Paragraph>
Advanced: Set extra environment variables to an ingestion execution
</Typography.Paragraph>
<Input
data-testid="extra-args-input"
placeholder='{"MY_CUSTOM_ENV": "my_custom_value2"}'
value={retrieveExtraEnvs()}
onChange={(event) => setExtraEnvs(event.target.value)}
/>
</Form.Item>
<Form.Item label={<Typography.Text strong>Extra DataHub plugins</Typography.Text>}>
<Typography.Paragraph>
Advanced: Set extra DataHub plugins for an ingestion execution
</Typography.Paragraph>
<Input
data-testid="extra-pip-plugin-input"
placeholder='["debug"]'
value={retrieveExtraDataHubPlugins()}
onChange={(event) => setExtraDataHubPlugins(event.target.value)}
/>
</Form.Item>
<Form.Item label={<Typography.Text strong>Extra Pip Libraries</Typography.Text>}>
<Typography.Paragraph>
Advanced: Add extra pip libraries for an ingestion execution
</Typography.Paragraph>
<Input
data-testid="extra-pip-reqs-input"
placeholder='["sqlparse==0.4.3"]'
value={retrieveExtraReqs()}
onChange={(event) => setExtraReqs(event.target.value)}
/>
</Form.Item>
</Collapse.Panel>
</Collapse>
</Form>
Expand Down
17 changes: 17 additions & 0 deletions datahub-web-react/src/app/ingest/source/builder/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ export type StepProps = {
ingestionSources: SourceConfig[];
};

export type StringMapEntryInput = {
/**
* The key of the map entry
*/
key: string;

/**
* The value fo the map entry
*/
value: string;
};

/**
* The object represents the state of the Ingestion Source Builder form.
*/
Expand Down Expand Up @@ -91,5 +103,10 @@ export interface SourceBuilderState {
* Advanced: Whether or not to run this ingestion source in debug mode
*/
debugMode?: boolean | null;

/**
* Advanced: Extra arguments for the ingestion run.
*/
extraArgs?: StringMapEntryInput[] | null;
};
}
8 changes: 8 additions & 0 deletions datahub-web-react/src/graphql/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ query listIngestionSources($input: ListIngestionSourcesInput!) {
version
executorId
debugMode
extraArgs {
key
value
}
}
schedule {
interval
Expand Down Expand Up @@ -51,6 +55,10 @@ query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) {
version
executorId
debugMode
extraArgs {
key
value
}
}
schedule {
interval
Expand Down
12 changes: 11 additions & 1 deletion docker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,20 @@ task quickstartDebug(type: Exec, dependsOn: ':metadata-ingestion:install') {
dependsOn(debug_modules.collect { it + ':dockerTagDebug' })
shouldRunAfter ':metadata-ingestion:clean', 'quickstartNuke'

environment "DATAHUB_PRECREATE_TOPICS", "true"
environment "DATAHUB_TELEMETRY_ENABLED", "false"
environment "DOCKER_COMPOSE_BASE", "file://${rootProject.projectDir}"

// Elastic
// environment "DATAHUB_SEARCH_IMAGE", 'elasticsearch'
// environment "DATAHUB_SEARCH_TAG", '7.10.1'

// OpenSearch
environment "DATAHUB_SEARCH_IMAGE", 'opensearchproject/opensearch'
environment "DATAHUB_SEARCH_TAG", '2.9.0'
environment "XPACK_SECURITY_ENABLED", 'plugins.security.disabled=true'
environment "USE_AWS_ELASTICSEARCH", 'true'


def cmd = [
'source ../metadata-ingestion/venv/bin/activate && ',
'datahub docker quickstart',
Expand Down
Loading

0 comments on commit 86e0023

Please sign in to comment.