Skip to content

Commit

Permalink
feat: BigQuery data plane (#178)
Browse files Browse the repository at this point in the history
* feat: BigQuery data plane

- added DataPlaneBigQueryExtension
- pipeline with BigQueryDataSource and BigQueryDataSink
- e2e test with goccy/bigquery-emulator

* chore: DEPENDENCIES

* refactor: fix CodeQL findings in BigQuerySinkServiceImpl

* test: updated BigQuery e2e test with dataplane signaling and 2 failure tests

* chore: DEPENDENCIES

* test: BigQuery sink and source implementation unit tests

* refactor: consolidated BigQuery service classes into source and sink pipeline

* chore: DEPENDENCIES

* refactor: removed gson dependency from BigQuery sink

* chore: DEPENDENCIES

* fix: added error handling for BigQuery sink

* chore: DEPENDENCIES

* test: expanded unit tests for BigQuery source and sink, minor refactor / doc

* chore: DEPENDENCIES

* refactor: restored BigQuery sink exception test at the end of the transfer

* refactor: BigQuery data sink looping through pages, test extended

* test: added test for multi-page result in BigQuery sink

- refactor: use constants, removed unused methods

* refactor: PR review

* refactor: resolve review comments for BigQuery data plane

* refactor: lambda expression in BigQuery data source

* chore: DEPENDENCIES

* test: BigQuery data plane unit and system tests fixed

* fix: checkstyle indentation
  • Loading branch information
man8pr authored Jul 15, 2024
1 parent 8362eb7 commit 0c8a25e
Show file tree
Hide file tree
Showing 30 changed files with 3,337 additions and 59 deletions.
193 changes: 154 additions & 39 deletions DEPENDENCIES

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2024 Google LLC
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Google LLC
*
*/

package org.eclipse.edc.gcp.bigquery;

import org.eclipse.edc.gcp.common.GcpConfiguration;
import org.eclipse.edc.util.configuration.ConfigurationFunctions;

/**
 * Configuration class embedding the common GcpConfiguration and BigQuery-specific parameters.
 *
 * <p>All values are fixed at construction time. The endpoint accessors additionally fall back to
 * a lookup through {@link ConfigurationFunctions#propOrEnv(String, String)} when no endpoint was
 * supplied to the constructor.
 */
public class BigQueryConfiguration {
    /** Key used for the fallback lookup of the REST endpoint. */
    private static final String REST_ENDPOINT_KEY = "edc.gcp.bq.rest";
    /** Key used for the fallback lookup of the RPC endpoint. */
    private static final String RPC_ENDPOINT_KEY = "edc.gcp.bq.rpc";

    private final GcpConfiguration gcpConfiguration;
    private final String restEndpoint;
    private final String rpcEndpoint;
    private final int threadPoolSize;

    /**
     * Creates a configuration with explicit BigQuery parameters.
     *
     * @param gcpConfiguration the common GCP configuration.
     * @param restEndpoint the BigQuery REST API endpoint, or null to defer to the fallback lookup.
     * @param rpcEndpoint the BigQuery Storage RPC API endpoint, or null to defer to the fallback lookup.
     * @param threadPoolSize the thread pool size for the BigQuery source.
     */
    public BigQueryConfiguration(GcpConfiguration gcpConfiguration,
                                 String restEndpoint, String rpcEndpoint, int threadPoolSize) {
        this.gcpConfiguration = gcpConfiguration;
        this.restEndpoint = restEndpoint;
        this.rpcEndpoint = rpcEndpoint;
        this.threadPoolSize = threadPoolSize;
    }

    /**
     * Creates a configuration without explicit endpoints and with a thread pool size of 0.
     *
     * @param gcpConfiguration the common GCP configuration.
     */
    public BigQueryConfiguration(GcpConfiguration gcpConfiguration) {
        this(gcpConfiguration, null, null, 0);
    }

    /**
     * The common GCP configuration.
     *
     * @return the GcpConfiguration object.
     */
    public GcpConfiguration gcpConfiguration() {
        return gcpConfiguration;
    }

    /**
     * The BigQuery REST API endpoint host if defined in the EDC configuration or, if not
     * found in the EDC configuration, as resolved by {@code ConfigurationFunctions.propOrEnv}
     * for the key "edc.gcp.bq.rest" (with null as the default).
     *
     * @return the REST endpoint as http://host:port.
     */
    public String restEndpoint() {
        return restEndpoint != null
                ? restEndpoint
                : ConfigurationFunctions.propOrEnv(REST_ENDPOINT_KEY, null);
    }

    /**
     * The BigQuery Storage RPC API endpoint host if defined in the EDC configuration or, if not
     * found in the EDC configuration, as resolved by {@code ConfigurationFunctions.propOrEnv}
     * for the key "edc.gcp.bq.rpc" (with null as the default).
     *
     * @return the RPC endpoint as http://host:port.
     */
    public String rpcEndpoint() {
        return rpcEndpoint != null
                ? rpcEndpoint
                : ConfigurationFunctions.propOrEnv(RPC_ENDPOINT_KEY, null);
    }

    /**
     * The thread pool size used to prepare the ExecutorService for the BigQuery source.
     *
     * @return the number of threads in the pool.
     */
    public int threadPoolSize() {
        return threadPoolSize;
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public String getTable() {
return getStringProperty(BigQueryServiceSchema.TABLE);
}

/**
* Reads the string property stored under BigQueryServiceSchema.DESTINATION_TABLE.
*
* @return the value returned by getStringProperty for that key.
*/
public String getDestinationTable() {
return getStringProperty(BigQueryServiceSchema.DESTINATION_TABLE);
}

/**
* Reads the string property stored under BigQueryServiceSchema.QUERY.
*
* @return the value returned by getStringProperty for that key.
*/
public String getQuery() {
return getStringProperty(BigQueryServiceSchema.QUERY);
}
Expand Down Expand Up @@ -74,6 +78,11 @@ public Builder table(String table) {
return this;
}

/**
* Stores the given value under BigQueryServiceSchema.DESTINATION_TABLE.
*
* @param destinationTable the destination table name to store.
* @return this builder, to allow call chaining.
*/
public Builder destinationTable(String destinationTable) {
property(BigQueryServiceSchema.DESTINATION_TABLE, destinationTable);
return this;
}

public Builder query(String query) {
property(BigQueryServiceSchema.QUERY, query);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,32 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.TableName;

/**
* Utility class for getting the TableId and the TableName from GCP project and BigQuery dataset
* and table.
*/
public record BigQueryTarget(String project, String dataset, String table) {
/**
* Returns the TableId corresponding to project / dataset / table, used as a parameter to e.g.
* verify the existence of a table in BigQuery API.
*
* @return the TableId built from the project, dataset and table of this target.
*/
public TableId getTableId() {
return TableId.of(project, dataset, table);
}

/**
* Returns the TableId corresponding to a destination table used when executing queries.
* The project and dataset of this target are reused; only the table name is replaced.
*
* @param tableName the name of the table used as destination for queries.
* @return the TableId built from the project and dataset of this target and the given table name.
*/
public TableId getTableId(String tableName) {
return TableId.of(project, dataset, tableName);
}

/**
* Returns the TableName corresponding to project / dataset / table, used as a parameter for the
* BigQuery Storage API (by the sink).
*
* @return the TableName built from the project, dataset and table of this target.
*/
public TableName getTableName() {
return TableName.of(project, dataset, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
package org.eclipse.edc.gcp.bigquery.service;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.NoCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.eclipse.edc.gcp.common.GcpConfiguration;
import org.eclipse.edc.gcp.bigquery.BigQueryConfiguration;
import org.eclipse.edc.gcp.common.GcpServiceAccount;
import org.eclipse.edc.gcp.iam.IamService;

public class BigQueryFactoryImpl implements BigQueryFactory {
private final GcpConfiguration gcpConfiguration;
private final BigQueryConfiguration configuration;
private final IamService iamService;

public BigQueryFactoryImpl(GcpConfiguration gcpConfiguration, IamService iamService) {
this.gcpConfiguration = gcpConfiguration;
public BigQueryFactoryImpl(BigQueryConfiguration configuration, IamService iamService) {
this.configuration = configuration;
this.iamService = iamService;
}

Expand All @@ -38,9 +39,15 @@ public BigQuery createBigQuery(GcpServiceAccount serviceAccount) {
}

// NOTE(review): this span is taken from a rendered diff without +/- markers. The chained
// builder expression that returns directly looks like the removed version and the
// bqBuilder-based statements like the added one; confirm against the actual file.
private BigQuery createBigQuery(GoogleCredentials credentials) {
return BigQueryOptions.newBuilder()
.setProjectId(gcpConfiguration.projectId())
.setCredentials(credentials)
.build().getService();
var bqBuilder = BigQueryOptions.newBuilder();
// A non-null REST endpoint from the configuration is set as the host of the client.
var host = configuration.restEndpoint();
if (host != null) {
bqBuilder.setHost(host);
// NOTE(review): the endpoint host is also passed as the location; confirm this is intended.
bqBuilder.setLocation(host);
// In this branch the supplied credentials are not used; NoCredentials is set instead.
bqBuilder.setCredentials(NoCredentials.getInstance());
} else {
bqBuilder.setCredentials(credentials);
}
return bqBuilder.build().getService();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface BigQueryServiceSchema {
String DATASET = "dataset";
/** Table name for the BigQuery DataAddress. */
String TABLE = "table";
/** Destination table name used by the BigQuery source to store data. */
String DESTINATION_TABLE = "destination_table";
/** Query used to extract data by the BigQuery source. */
String QUERY = "query";
/** Service account used to access BigQuery service, overrides the service account defined in connector config. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.controlplane.transfer.spi.provision.ProvisionManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.provision.ResourceManifestGenerator;
import org.eclipse.edc.gcp.bigquery.BigQueryConfiguration;
import org.eclipse.edc.gcp.bigquery.service.BigQueryFactoryImpl;
import org.eclipse.edc.gcp.common.GcpAccessToken;
import org.eclipse.edc.gcp.common.GcpConfiguration;
Expand Down Expand Up @@ -48,7 +49,8 @@ public String name() {
@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var bqFactory = new BigQueryFactoryImpl(gcpConfiguration, iamService);
var bigQueryConfiguration = new BigQueryConfiguration(gcpConfiguration);
var bqFactory = new BigQueryFactoryImpl(bigQueryConfiguration, iamService);
var provisioner = new BigQueryProvisioner(gcpConfiguration, bqFactory, iamService, monitor);

provisionManager.register(provisioner);
Expand Down
33 changes: 33 additions & 0 deletions extensions/data-plane/data-plane-google-bigquery/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2024 Google LLC
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Google LLC
*
*/

plugins {
    `java-library`
}

dependencies {
    // Declared with the api configuration.
    api(libs.edc.spi.dataplane)
    api(libs.edc.core.dataplane.util)

    // Declared with the implementation configuration.
    implementation(project(":extensions:common:gcp:gcp-core"))
    implementation(libs.edc.spi.validator)

    // GCP dependencies.
    implementation(platform(libs.googlecloud.bom))
    implementation(libs.googlecloud.bigquery)

    // Test-only dependencies.
    testImplementation(libs.edc.core.dataplane)
    testImplementation(libs.edc.junit)
    testImplementation(libs.edc.keys.spi)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2024 Google LLC
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Google LLC
*
*/

package org.eclipse.edc.connector.dataplane.gcp.bigquery;

import org.eclipse.edc.connector.dataplane.gcp.bigquery.params.BigQueryRequestParamsProvider;
import org.eclipse.edc.connector.dataplane.gcp.bigquery.pipeline.BigQueryDataSinkFactory;
import org.eclipse.edc.connector.dataplane.gcp.bigquery.pipeline.BigQueryDataSourceFactory;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.gcp.bigquery.BigQueryConfiguration;
import org.eclipse.edc.gcp.common.GcpConfiguration;
import org.eclipse.edc.gcp.iam.IamService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.util.configuration.ConfigurationFunctions;

import java.util.concurrent.Executors;

/**
 * Registers source and sink factories for BigQuery data transfers.
 *
 * <p>During initialization the extension registers a {@link BigQueryRequestParamsProvider},
 * builds the {@link BigQueryConfiguration} from the extension context and registers the
 * BigQuery source and sink factories with the {@link PipelineService}.
 */
@Provides(BigQueryRequestParamsProvider.class)
@Extension(value = DataPlaneBigQueryExtension.NAME)
public class DataPlaneBigQueryExtension implements ServiceExtension {
    public static final String NAME = "Data Plane BigQuery";
    static final int DEFAULT_THREAD_POOL_SIZE = 5;
    @Setting(value = "BigQuery source thread pool size", required = false, type = "int", defaultValue = "" + DEFAULT_THREAD_POOL_SIZE)
    public static final String BIGQUERY_THREAD_POOL = "edc.gcp.bq.threads";
    @Setting(value = "BigQuery API REST host, for testing purpose", required = false, type = "string")
    public static final String BIGQUERY_REST_ENDPOINT = "edc.gcp.bq.rest";
    @Setting(value = "BigQuery Storage API RPC host, for testing purpose", required = false, type = "string")
    public static final String BIGQUERY_RPC_ENDPOINT = "edc.gcp.bq.rpc";
    @Inject
    private PipelineService pipelineService;
    @Inject
    private GcpConfiguration gcpConfiguration;
    @Inject
    private Vault vault;
    @Inject
    private TypeManager typeManager;
    @Inject
    private DataTransferExecutorServiceContainer executorContainer;
    @Inject
    private ExecutorInstrumentation executorInstrumentation;
    @Inject
    private IamService iamService;
    /** Built in {@link #initialize(ServiceExtensionContext)}; null until then. */
    private BigQueryConfiguration bigQueryConfiguration;

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the request params provider, builds the BigQuery configuration and registers
     * the BigQuery source and sink factories.
     *
     * @param context the extension context used to read settings and register services.
     */
    @Override
    public void initialize(ServiceExtensionContext context) {
        var monitor = context.getMonitor();
        var paramsProvider = new BigQueryRequestParamsProvider();
        context.registerService(BigQueryRequestParamsProvider.class, paramsProvider);
        bigQueryConfiguration = getBigQueryConfiguration(gcpConfiguration, context);

        // The source gets its own fixed-size pool; the sink uses the shared data transfer executor.
        var executorService = executorInstrumentation.instrument(
                Executors.newFixedThreadPool(bigQueryConfiguration.threadPoolSize()), "BigQuery Source");

        var sourceFactory = new BigQueryDataSourceFactory(bigQueryConfiguration, monitor, paramsProvider, typeManager, executorService, iamService);
        pipelineService.registerFactory(sourceFactory);

        var sinkFactory = new BigQueryDataSinkFactory(bigQueryConfiguration, executorContainer.getExecutorService(), monitor, vault, typeManager, paramsProvider, iamService);
        pipelineService.registerFactory(sinkFactory);
    }

    /**
     * Provides the BigQuery configuration built during initialization.
     *
     * @return the configuration, or null if {@link #initialize(ServiceExtensionContext)} has not run yet.
     */
    @Provider
    public BigQueryConfiguration getBigQueryConfiguration() {
        return bigQueryConfiguration;
    }

    /**
     * Builds the BigQuery configuration from the context settings. When an endpoint is not
     * present in the context settings, the value resolved by
     * {@code ConfigurationFunctions.propOrEnv} for the same key is used instead.
     *
     * @param gcpConfiguration the common GCP configuration to embed.
     * @param context the extension context used to read the settings.
     * @return the BigQuery configuration.
     */
    private BigQueryConfiguration getBigQueryConfiguration(GcpConfiguration gcpConfiguration, ServiceExtensionContext context) {
        String restEndpoint = context.getSetting(BIGQUERY_REST_ENDPOINT, null);
        String rpcEndpoint = context.getSetting(BIGQUERY_RPC_ENDPOINT, null);
        var threadPoolSize = context.getSetting(BIGQUERY_THREAD_POOL, DataPlaneBigQueryExtension.DEFAULT_THREAD_POOL_SIZE);

        // The fallback result must be assigned; previously it was computed and discarded.
        if (restEndpoint == null) {
            restEndpoint = ConfigurationFunctions.propOrEnv(BIGQUERY_REST_ENDPOINT, null);
        }

        if (rpcEndpoint == null) {
            rpcEndpoint = ConfigurationFunctions.propOrEnv(BIGQUERY_RPC_ENDPOINT, null);
        }

        return new BigQueryConfiguration(gcpConfiguration, restEndpoint, rpcEndpoint, threadPoolSize);
    }
}
Loading

0 comments on commit 0c8a25e

Please sign in to comment.