Skip to content

Commit

Permalink
BugFix for PANW Issue
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamVarCS committed Jan 22, 2024
1 parent 61a20a3 commit b1eb578
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/bigquery-cdcTarget.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ that the BigQuery job will run in. `BigQuery Job User` role on this project must
account to run the job. If a temporary bucket needs to be created, the bucket will also be created in this project and
'GCE Storage Bucket Admin' role on this project must be granted to the specified service account to create buckets.

**Dataset Project ID**: Project the destination dataset belongs to. This is only required if the dataset is not
**Dataset Project ID**: Project ID of the project the destination dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given, it will default to the
configured Project ID. `BigQuery Data Editor` role on this project must be granted to the specified service account to
write BigQuery data to this project.
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void initialize(DeltaTargetContext context) throws Exception {

Credentials credentials = conf.getCredentials();

String project = conf.getDatasetProject();
String project = conf.getProject();

String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();
Expand Down Expand Up @@ -138,17 +138,15 @@ public void initialize(DeltaTargetContext context) throws Exception {
});
try {
long maximumExistingSequenceNumber = Failsafe.with(retryPolicy).get(() ->
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(),
bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), conf.getDatasetProject(),
conf.getDatasetName(), bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber);
context.initializeSequenceNumber(maximumExistingSequenceNumber);
} catch (Exception e) {
throw new RuntimeException("Failed to compute the maximum sequence number among all the target tables " +
"selected for replication. Please make sure that if target tables exists, " +
"they should have '_sequence_num' column in them.", e);
}


}

@Override
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.EncryptionConfiguration;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
Expand Down Expand Up @@ -115,8 +116,9 @@ static long getMaximumExistingSequenceNumberPerBatch(Set<SourceTable> allTables,
SourceTable table0 = allTables.stream().findFirst().get();
Set<TableId> existingTableIDs = new HashSet<>();
String dataset = getNormalizedDatasetName(datasetName, table0.getDatabase());
if (bigQuery.getDataset(dataset) != null) {
for (Table table : bigQuery.listTables(dataset).iterateAll()) {
DatasetId datasetId = DatasetId.of(project, dataset);
if (bigQuery.getDataset(datasetId) != null) {
for (Table table : bigQuery.listTables(datasetId).iterateAll()) {
existingTableIDs.add(table.getTableId());
}
}
Expand Down

0 comments on commit b1eb578

Please sign in to comment.