Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make KStream health result serializable (#1026) #1088

Merged
merged 1 commit into from
Oct 24, 2024

Conversation

tmellanxt
Copy link
Contributor

@tmellanxt tmellanxt commented Oct 16, 2024

Summary

This pull request addresses issue #1026 by implementing a custom Serde<TaskId> to kafka-streams module. This avoid a serialization error when calling the /health endpoint, which throws in turn a:

No serializable introspection present for type TaskId. Consider adding Serdeable. Serializable annotate to type TaskId. Alternatively if you are not in control of the project's source code, you can use @SerdeImport(TaskId.class) to enable serialization of this type.

Changes Made

  • Added serde processor as annotationProcessor, and serde api as compileOnly to load in the annotation.
  • Created a class dedicated to adding annotation for external classes that need serialising as part of the health result

Related Issues

* Additional Serdes for Kafka streams health types returned as part of the {@link io.micronaut.management.health.indicator.HealthResult}.
*/
@SerdeImport(TaskId.class)
public class KafkaStreamHealthSerdes {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks can you make this non-public and add @Internal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, a test would be good if possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had written one, but wasn't sure if it was too specific a test to use a JsonMapper to check if a TaskId is serializable. If you think its appropriate here I'll add it.

The kafka-streams instances injected into the tests have 0 active or standby tasks which is why this was never caught.

Lmk your thoughts on this. If you want the concrete test I'll add it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a check that the type is serialisable is fine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While testing I've realised that the TaskId isn't serializable out of the box with Jackson, and if it were it would yield:

"taskId": {
  "topicGroupId": 0,
  "partition": 0
}

but I'm not sure this is helpful as part of the health report however.

The TaskId object has a custom .toString() which formats it in a readable manner. I think its more appropriate here.

"taskId": "0_0"

After all, the task id is meant to be an identifier, showing its component internal fields in this manner doesn't seem helpful.

This would be the change, adding the toString(). I'd make it null safe of course

details.put("taskId", taskMetadata.taskId().toString());

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, for that case you probably need to implement a Serializer and remove the @SerdeImport

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I was just going to call .toString() on the task object that gets put into the Map<String, Object>.

Or would you rather a custom Serializer to allow users to override the serializer if they want?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with the custom Serde

kafka-streams/build.gradle Outdated Show resolved Hide resolved
@graemerocher graemerocher added the type: enhancement New feature or request label Oct 18, 2024
@graemerocher
Copy link
Contributor

sorry would you mind rebasing and then I will merge

@tmellanxt
Copy link
Contributor Author

sorry would you mind rebasing and then I will merge

Rebased @graemerocher

@graemerocher graemerocher merged commit 1028563 into micronaut-projects:5.7.x Oct 24, 2024
6 checks passed
@graemerocher
Copy link
Contributor

Thanks for the contribution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: enhancement New feature or request
Projects
No open projects
Status: Done
Development

Successfully merging this pull request may close these issues.

Kafka Streams and Jackson Serialization results in CodecException when calling Health endpoint
2 participants