Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[source-mongodb-v2] Force state update once a while #48715

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ class DebeziumMessageProducer<T>(
private val offsetManager: AirbyteFileOffsetBackingStore?
private val targetPosition: CdcTargetPosition<T>
private val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>

private var shouldEmitStateMessage = false


private val eventConverter: DebeziumEventConverter

init {
Expand All @@ -68,7 +66,6 @@ class DebeziumMessageProducer<T>(
previousCheckpointOffset.clear()
previousCheckpointOffset.putAll(checkpointOffsetToSend)
checkpointOffsetToSend.clear()
shouldEmitStateMessage = false
return stateMessage
}

Expand All @@ -94,12 +91,6 @@ class DebeziumMessageProducer<T>(
}
}

if (checkpointOffsetToSend.size == 1 && !message.isSnapshotEvent) {
if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) {
shouldEmitStateMessage = true
}
}

return eventConverter.toAirbyteMessage(message)
}

Expand All @@ -119,7 +110,7 @@ class DebeziumMessageProducer<T>(
}

override fun shouldEmitStateMessage(stream: ConfiguredAirbyteStream?): Boolean {
return shouldEmitStateMessage
return true
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,26 @@ open class SourceStateIterator<T>(
private var lastCheckpoint: Instant = Instant.now()

override fun computeNext(): AirbyteMessage? {
var iteratorHasNextValue: Boolean
var shouldForceUpdateState = shouldEmitStateMessage() &&
sourceStateMessageProducer.shouldEmitStateMessage(stream);
// check shouldForceUpdateState in case the sync is stuck; therefore we should send out
// state messages even if there is no records update. This could be useful to advance DBZ
// pointer on a less updated table.
if (shouldForceUpdateState) {
val stateMessage =
sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream)
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
LOGGER.info { "sending state message, with count per stream: $streamRecordCount " }

recordCount = 0L
streamRecordCount.clear()

lastCheckpoint = Instant.now()
return AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(stateMessage)
}
val iteratorHasNextValue: Boolean
try {
iteratorHasNextValue = messageIterator.hasNext()
} catch (ex: Exception) {
Expand All @@ -38,23 +57,6 @@ open class SourceStateIterator<T>(
throw FailedRecordIteratorException(ex)
}
if (iteratorHasNextValue) {
if (
shouldEmitStateMessage() &&
sourceStateMessageProducer.shouldEmitStateMessage(stream)
) {
val stateMessage =
sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream)
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
LOGGER.info { "sending state message, with count per stream: $streamRecordCount " }

recordCount = 0L
streamRecordCount.clear()

lastCheckpoint = Instant.now()
return AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(stateMessage)
}
// Use try-catch to catch Exception that could occur when connection to the database
// fails
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.45.1'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ java {
airbyteJavaConnector {
cdkVersionRequired = '0.48.2'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Loading