Skip to content

Commit

Permalink
Merge pull request #259 from data-integrations/bugfix/315193699-big-q…
Browse files Browse the repository at this point in the history
…uery-delta-replication-fix-panw

BigQuery Delta Replication Plugin DataSet Project ID Fix
  • Loading branch information
shivamVarCS authored Jan 25, 2024
2 parents 61a20a3 + d55f585 commit a97972b
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 12 deletions.
8 changes: 3 additions & 5 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void initialize(DeltaTargetContext context) throws Exception {

Credentials credentials = conf.getCredentials();

String project = conf.getDatasetProject();
String project = conf.getProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();
Expand Down Expand Up @@ -138,17 +138,15 @@ public void initialize(DeltaTargetContext context) throws Exception {
});
try {
long maximumExistingSequenceNumber = Failsafe.with(retryPolicy).get(() ->
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(),
bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), conf.getDatasetProject(),
conf.getDatasetName(), bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber);
context.initializeSequenceNumber(maximumExistingSequenceNumber);
} catch (Exception e) {
throw new RuntimeException("Failed to compute the maximum sequence number among all the target tables " +
"selected for replication. Please make sure that if target tables exists, " +
"they should have '_sequence_num' column in them.", e);
}


}

@Override
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.EncryptionConfiguration;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
Expand Down Expand Up @@ -115,8 +116,9 @@ static long getMaximumExistingSequenceNumberPerBatch(Set<SourceTable> allTables,
SourceTable table0 = allTables.stream().findFirst().get();
Set<TableId> existingTableIDs = new HashSet<>();
String dataset = getNormalizedDatasetName(datasetName, table0.getDatabase());
if (bigQuery.getDataset(dataset) != null) {
for (Table table : bigQuery.listTables(dataset).iterateAll()) {
DatasetId datasetId = DatasetId.of(project, dataset);
if (bigQuery.getDataset(datasetId) != null) {
for (Table table : bigQuery.listTables(datasetId).iterateAll()) {
existingTableIDs.add(table.getTableId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testGetMaximumExistingSequenceNumberForRetryableFailures() throws Ex
bqTarget.initialize(deltaTargetContext);
} finally {
//verify at least 1 retry happens
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(2));
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(1));
BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(),
Mockito.nullable(String.class), Mockito.any(BigQuery.class),
Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt());
Expand Down
10 changes: 6 additions & 4 deletions src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;

/**
Expand All @@ -85,6 +86,7 @@ public void init() throws Exception {
bigQueryMock = Mockito.mock(BigQuery.class);
Table tableMock = Mockito.mock(Table.class);
Dataset datasetMock = Mockito.mock(Dataset.class);
Mockito.when(bigQueryMock.getDataset(any(DatasetId.class))).thenReturn(datasetMock);
Mockito.when(bigQueryMock.getTable(ArgumentMatchers.any())).thenReturn(tableMock);
Mockito.when(bigQueryMock.getDataset("demodataset")).thenReturn(datasetMock);
PowerMockito.spy(BigQueryUtils.class);
Expand Down Expand Up @@ -281,7 +283,7 @@ public void testGetMaximumExistingSequenceNumberSingleInvocations() throws Excep

// Subtest : One Table
Set<SourceTable> allTables = generateSourceTableSet(1);
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1));
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1));
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
null, bigQueryMock, null, 1000);
assertEquals(1L, tableResult);
Expand Down Expand Up @@ -316,7 +318,7 @@ public void testGetMaximumExistingSequenceNumberDoubleInvocations() throws Excep

//Subtest1 : 1001 Tables : Should call bigquery 2 times. 1000+1
Set<SourceTable> allTables = generateSourceTableSet(1001);
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1001));
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1001));
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
null, bigQueryMock, null, 1000);
assertEquals(2L, tableResult);
Expand All @@ -341,7 +343,7 @@ public void testGetMaximumExistingSequenceNumberTripleInvocations() throws Excep

//Subtest1 : 2500 Tables : Should call bigquery 3 times. 1000+1000+500
Set<SourceTable> allTables = generateSourceTableSet(2500);
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(2500));
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(2500));
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
null, bigQueryMock, null, 1000);
assertEquals(3L, tableResult);
Expand All @@ -354,7 +356,7 @@ public void testGetMaximumExistingSequenceNumberTripleInvocations() throws Excep
@Test
public void testGetMaximumExistingSequenceNumberEmptyDatasetName() throws Exception {
Set<SourceTable> allTables = generateSourceTableSet(1);
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1));
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1));
long tableResult0 = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
"", bigQueryMock, null, 1000);
assertEquals(1, tableResult0);
Expand Down

0 comments on commit a97972b

Please sign in to comment.