Skip to content

Commit

Permalink
Support all the options for CDC creation
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanrudn0303 committed Feb 5, 2024
1 parent 255679b commit 58a1c87
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@
*/
Format format() default Format.JSON;

/**
* Virtual timestamps
*/
boolean virtualTimestampsEnabled() default false;

/**
* Retention period for data in feed
*/
String retentionPeriod() default "PT24H";

/**
* Initial table scan
*/
boolean initialScanEnabled() default false;

enum Mode {
/**
* Only the key component of the modified row
Expand Down
23 changes: 22 additions & 1 deletion databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private List<Changefeed> prepareChangefeeds(List<tech.ydb.yoj.databind.schema.Ch
}
}
return changefeeds.stream()
.map(cf -> new Changefeed(cf.name(), cf.mode(), cf.format()))
.map(this::changefeedFromAnnotation)
.toList();
}

Expand Down Expand Up @@ -227,6 +227,20 @@ private JavaField newRootJavaField(@NonNull JavaField javaField) {
return new JavaField(javaField, null);
}

private Changefeed changefeedFromAnnotation(@NonNull tech.ydb.yoj.databind.schema.Changefeed changefeed) {
var retentionPeriod = Duration.parse(changefeed.retentionPeriod());
Preconditions.checkArgument(!(retentionPeriod.isNegative() || retentionPeriod.isZero()),
"RetentionPeriod value defined for %s must be positive", getType());
return new Changefeed(
changefeed.name(),
changefeed.mode(),
changefeed.format(),
changefeed.virtualTimestampsEnabled(),
retentionPeriod,
changefeed.initialScanEnabled()
);
}

/**
* @param field {@link FieldValueType#isComposite() composite} field
* @return {@code true} if the composite field can be flattened to a single field; {@code false otherwise}
Expand Down Expand Up @@ -693,6 +707,13 @@ public static class Changefeed {

@NonNull
tech.ydb.yoj.databind.schema.Changefeed.Format format;

boolean virtualTimestampsEnabled;

@NonNull
Duration retentionPeriod;

boolean initialScanEnabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -21,6 +23,10 @@ public void testChangefeedDefaultsEntity() {
assertThat(entitySchema.getChangefeeds()).hasSize(1);
Assertions.assertThat(entitySchema.getChangefeeds().get(0).getMode()).isEqualTo(Changefeed.Mode.NEW_IMAGE);
Assertions.assertThat(entitySchema.getChangefeeds().get(0).getFormat()).isEqualTo(Changefeed.Format.JSON);
Assertions.assertThat(entitySchema.getChangefeeds().get(0).getRetentionPeriod())
.isEqualTo(Duration.ofHours(24));
Assertions.assertThat(entitySchema.getChangefeeds().get(0).isVirtualTimestampsEnabled()).isFalse();
Assertions.assertThat(entitySchema.getChangefeeds().get(0).isInitialScanEnabled()).isFalse();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
import static tech.ydb.yoj.databind.schema.Changefeed.Mode.KEYS_ONLY;

@Value
@Changefeed(name = "test-changefeed1", mode = KEYS_ONLY, format = JSON)
@Changefeed(
name = "test-changefeed1",
mode = KEYS_ONLY,
format = JSON,
virtualTimestampsEnabled = true,
retentionPeriod = "PT1H",
initialScanEnabled = true
)
@Changefeed(name = "test-changefeed2")
public class ChangefeedEntity implements Entity<ChangefeedEntity> {
Id id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
alterTableSettings.addChangefeed(new Changefeed(
changefeed.getName(),
Changefeed.Mode.valueOf(changefeed.getMode().name()),
Changefeed.Format.valueOf(changefeed.getFormat().name())
Changefeed.Format.valueOf(changefeed.getFormat().name()),
changefeed.isVirtualTimestampsEnabled(),
changefeed.getRetentionPeriod(),
changefeed.isInitialScanEnabled()
));
status = session.alterTable(tablespace + name, alterTableSettings).join();
if (status.getCode() != com.yandex.ydb.core.StatusCode.SUCCESS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
Changefeed newChangefeed = Changefeed.newBuilder(changefeed.getName())
.withMode(Changefeed.Mode.valueOf(changefeed.getMode().name()))
.withFormat(Changefeed.Format.valueOf(changefeed.getFormat().name()))
.withVirtualTimestamps(changefeed.isVirtualTimestampsEnabled())
.withRetentionPeriod(changefeed.getRetentionPeriod())
.withInitialScan(changefeed.isInitialScanEnabled())
.build();

alterTableSettings.addChangefeed(newChangefeed);
Expand Down

0 comments on commit 58a1c87

Please sign in to comment.