Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make KStream health result serializable (#1026) #1088

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ micronaut {
dependencies {
api projects.micronautKafka
api libs.managed.kafka.streams
compileOnly mnSerde.micronaut.serde.api
compileOnly mnMicrometer.micronaut.micrometer.core
testImplementation mn.micronaut.http.client
testImplementation mnSerde.micronaut.serde.jackson
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams.health.serde;

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import io.micronaut.serde.Decoder;
import io.micronaut.serde.Encoder;
import io.micronaut.serde.Serde;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.processor.TaskId;

import java.io.IOException;


/**
* A custom {@link Serde} implementation for serializing and deserializing
* {@link TaskId}.
*
* @see TaskId
* @see Serde
*/
@Singleton
public final class TaskIdSerde implements Serde<TaskId> {

/**
* Deserializes a {@link TaskId} from its string representation.
*
* @param decoder The {@link Decoder} used to read the input.
* @param context The deserialization context.
* @param type The {@link TaskId} type argument.
* @return The deserialized {@link TaskId}.
* @throws IOException If an I/O error occurs during deserialization.
*/
@Override
public @Nullable TaskId deserialize(@NonNull Decoder decoder, DecoderContext context, @NonNull Argument<? super TaskId> type) throws IOException {
final String taskIdStr = decoder.decodeString();
return TaskId.parse(taskIdStr);
}

/**
* Serializes a {@link TaskId} into its string representation.
*
* @param encoder The {@link Encoder} used to write the serialized output.
* @param context The serialization context.
* @param type The {@link TaskId} type argument.
* @param value The {@link TaskId} instance to be serialized.
* @throws IOException If an I/O error occurs during serialization.
*/
@Override
public void serialize(@NonNull Encoder encoder, EncoderContext context, @NonNull Argument<? extends TaskId> type, @NonNull TaskId value) throws IOException {
encoder.encodeString(value.toString());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.micronaut.configuration.kafka.streams.health.serde

import io.micronaut.configuration.kafka.streams.AbstractTestContainersSpec
import io.micronaut.json.JsonMapper
import org.apache.kafka.streams.processor.TaskId

class TaskIdSerdeSpec extends AbstractTestContainersSpec {

void "should serialize TaskId"() {
given:
def jsonMapper = context.getBean(JsonMapper)
def taskId = new TaskId(1, 5, "my-topology")
def expectedTaskStr = taskId.toString()

when:
String json = jsonMapper.writeValueAsString(taskId)

then:
json != null
json == "\"${expectedTaskStr}\""
}

void "should deserialize TaskId"() {
given:
def jsonMapper = context.getBean(JsonMapper)
def expectedTaskStr = "my-topology__1_5"
String serializedString = "\"${expectedTaskStr}\""

when:
TaskId deserializedTaskId = jsonMapper.readValue(serializedString, TaskId)

then:
deserializedTaskId != null
deserializedTaskId.toString() == expectedTaskStr
}
}
Loading