
Merge pull request #1 from anilkulkarni87/feature/publishtoKafka
Feature : Publish to Kafka
anilkulkarni87 authored Sep 27, 2023
2 parents 8b906f1 + 3f7130c commit 09c1652
Showing 18 changed files with 590 additions and 23 deletions.
35 changes: 23 additions & 12 deletions README.md
@@ -1,13 +1,13 @@

## Badges
# Apache Avro - Schemas and Custom LogicalTypes

[![MIT License](https://img.shields.io/badge/License-MIT-green.svg)](https://choosealicense.com/licenses/mit/)
[![Java CI with Gradle](https://github.com/anilkulkarni87/AvroJnaana/actions/workflows/gradle.yml/badge.svg)](https://github.com/anilkulkarni87/AvroJnaana/actions/workflows/gradle.yml)
![GitHub last commit (branch)](https://img.shields.io/github/last-commit/anilkulkarni87/AvroJnaana/main)
![GitHub Repo stars](https://img.shields.io/github/stars/anilkulkarni87/AvroJnaana?style=social)



# Apache Avro - Schemas and Custom LogicalTypes

### AvroJnaana:
This name combines "Avro" and "Jnaana" (meaning "knowledge" or "wisdom" in Sanskrit)
to suggest a project focused on understanding and mastering the use of logical types in Apache Avro.
@@ -16,12 +16,14 @@ This is a companion code repo for the Apache Avro series of articles. The articl

- Custom Logical Types
- Apache Avro Schemas
- QueryRecord
- CustomerObjectModel
- Building Avrodoc via gradle
- Writing to Avro using Java Faker

Here is the link to the Avrodoc of the schemas that the repo currently has:

### [Avrodoc](avrodoc.html)
### [Avrodoc](avrodoc.html){:target="_blank"}



@@ -40,6 +42,9 @@ Some of the questions it will help answer right away:
- How to define a Custom Logical type and package it?
- How do we maintain our schemas?
- Better way of sharing schemas with other team members?
- How can we write complex schemas easily?
- How can schemas be made reusable?

## Schemas Used

- QueryRecord schema
@@ -48,7 +53,8 @@ Some of the questions it will help answer right away:
- CustomerObjectModel

My take on a generic Customer Model of a retail store.
## Usage/Examples

## Build project

- Clone the repo
- Build project
@@ -66,6 +72,8 @@ Some of the questions it will help answer right away:

- Run `QueryRecordOutput.java` and verify the logs.

## Testing with Kafka
Read more at [Test with Kafka](./kafka.md)


## Directory Tree
@@ -129,13 +137,16 @@ Some of the questions it will help answer right away:

## Roadmap

- Add Github workflow
- Add unit tests for Conversions
- Publish To Kafka topic
- Add spotless or checkstyle plugins
- Fix for fields which are union logicaltype and null
- Schema evolution
- Keep the Readme.md updated
- [ ] Add Github workflow
- [ ] Add unit tests for Conversions
- [x] Publish To Kafka topic
- [ ] Add spotless or checkstyle plugins
- [ ] Fix for fields which are union logicaltype and null
- [ ] Schema evolution
- [ ] Keep the README.md updated


![Complete flow](./docs/ecommerce.png)

## Tech Stack

15 changes: 13 additions & 2 deletions build.gradle
@@ -12,15 +12,26 @@ repositories {
maven {
url "https://clojars.org/repo/"
}
maven {
url "https://packages.confluent.io/maven"
}
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.0'
// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
implementation 'org.slf4j:slf4j-api:2.0.9'
// https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
implementation 'org.slf4j:slf4j-simple:2.0.9'
implementation "org.apache.avro:avro:1.11.1"
implementation "com.github.javafaker:javafaker:1.0.2"
implementation(project(":schemas"))
implementation(project(":custom-conversions"))
implementation 'org.apache.kafka:kafka-clients:3.5.1'
// https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer
implementation 'io.confluent:kafka-avro-serializer:5.3.0'


}

2 changes: 1 addition & 1 deletion avrodoc.html → docs/avrodoc/avrodoc.html

Large diffs are not rendered by default.

104 changes: 104 additions & 0 deletions docs/ecommerce.drawio
@@ -0,0 +1,104 @@
<mxfile host="65bd71144e">
<diagram id="wj3ed6IGQTsFisLiYsuj" name="Page-1">
<mxGraphModel dx="1177" dy="1538" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="850" pageHeight="1100" background="#818190" math="0" shadow="0">
<root>
<mxCell id="0"/>
<mxCell id="1" parent="0"/>
<mxCell id="7" value="" style="rounded=1;whiteSpace=wrap;html=1;fillColor=none;" parent="1" vertex="1">
<mxGeometry x="-10" y="50" width="480" height="200" as="geometry"/>
</mxCell>
<mxCell id="9" style="edgeStyle=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="3" target="4" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="3" value="Avro Schema for Data records defined via Avro IDL and class files generated." style="whiteSpace=wrap;html=1;shape=mxgraph.basic.document" parent="1" vertex="1">
<mxGeometry x="20" y="90" width="100" height="100" as="geometry"/>
</mxCell>
<mxCell id="10" style="edgeStyle=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="4" target="5" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="4" value="Build Schema files and generate java class files" style="rounded=0;whiteSpace=wrap;html=1;" parent="1" vertex="1">
<mxGeometry x="160" y="110" width="120" height="60" as="geometry"/>
</mxCell>
<mxCell id="11" style="edgeStyle=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;entryPerimeter=0;" parent="1" source="5" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="500" y="140" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="5" value="Create fake&amp;nbsp;&lt;br&gt;records and&lt;br&gt;write producer to&lt;br&gt;KAFKA" style="shape=step;perimeter=stepPerimeter;whiteSpace=wrap;html=1;fixedSize=1;" parent="1" vertex="1">
<mxGeometry x="305" y="100" width="145" height="80" as="geometry"/>
</mxCell>
<mxCell id="8" value="" style="dashed=0;outlineConnect=0;html=1;align=center;labelPosition=center;verticalLabelPosition=bottom;verticalAlign=top;shape=mxgraph.weblogos.java;fillColor=none;" parent="1" vertex="1">
<mxGeometry y="200" width="20" height="35" as="geometry"/>
</mxCell>
<mxCell id="14" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;" parent="1" source="12" edge="1">
<mxGeometry relative="1" as="geometry">
<mxPoint x="700" y="140" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="12" value="KAFKA" style="shape=process;whiteSpace=wrap;html=1;backgroundOutline=1;fillColor=default;" parent="1" vertex="1">
<mxGeometry x="500" y="110" width="120" height="60" as="geometry"/>
</mxCell>
<mxCell id="13" value="" style="endArrow=none;html=1;" parent="1" edge="1">
<mxGeometry width="50" height="50" relative="1" as="geometry">
<mxPoint x="700" y="280" as="sourcePoint"/>
<mxPoint x="700" y="5" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="32" style="edgeStyle=none;html=1;entryX=0.5;entryY=0;entryDx=0;entryDy=0;entryPerimeter=0;" parent="1" source="15" target="18" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="15" value="" style="outlineConnect=0;dashed=0;verticalLabelPosition=bottom;verticalAlign=top;align=center;html=1;shape=mxgraph.aws3.s3;fillColor=#E05243;gradientColor=none;" parent="1" vertex="1">
<mxGeometry x="880" y="-30" width="60" height="70" as="geometry"/>
</mxCell>
<mxCell id="16" value="" style="endArrow=classic;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;entryPerimeter=0;" parent="1" target="15" edge="1">
<mxGeometry width="50" height="50" relative="1" as="geometry">
<mxPoint x="700" y="7" as="sourcePoint"/>
<mxPoint x="930" y="160" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="17" value="Kafka S3 connect" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="16" vertex="1" connectable="0">
<mxGeometry x="0.067" y="1" relative="1" as="geometry">
<mxPoint x="-1" as="offset"/>
</mxGeometry>
</mxCell>
<mxCell id="31" style="edgeStyle=none;html=1;entryX=0;entryY=0.5;entryDx=0;entryDy=0;" parent="1" source="18" target="29" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="18" value="" style="shape=mxgraph.signs.nature.snowflake;html=1;pointerEvents=1;fillColor=#1ba1e2;strokeColor=#006EAF;verticalLabelPosition=bottom;verticalAlign=top;align=center;fontColor=#ffffff;" parent="1" vertex="1">
<mxGeometry x="1080" y="110" width="60" height="78" as="geometry"/>
</mxCell>
<mxCell id="19" value="" style="endArrow=classic;html=1;entryX=0.031;entryY=0.412;entryDx=0;entryDy=0;entryPerimeter=0;" parent="1" target="18" edge="1">
<mxGeometry width="50" height="50" relative="1" as="geometry">
<mxPoint x="700" y="140" as="sourcePoint"/>
<mxPoint x="930" y="160" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="20" value="Kafka Snowflake connect" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="19" vertex="1" connectable="0">
<mxGeometry x="0.0151" y="-3" relative="1" as="geometry">
<mxPoint x="-1" as="offset"/>
</mxGeometry>
</mxCell>
<mxCell id="33" style="edgeStyle=none;html=1;entryX=0.5;entryY=1;entryDx=0;entryDy=0;entryPerimeter=0;" parent="1" source="21" target="18" edge="1">
<mxGeometry relative="1" as="geometry"/>
</mxCell>
<mxCell id="21" value="" style="aspect=fixed;html=1;points=[];align=center;image;fontSize=12;image=img/lib/azure2/general/Code.svg;fillColor=default;" parent="1" vertex="1">
<mxGeometry x="887" y="250" width="64" height="52" as="geometry"/>
</mxCell>
<mxCell id="22" value="" style="endArrow=classic;html=1;" parent="1" target="21" edge="1">
<mxGeometry width="50" height="50" relative="1" as="geometry">
<mxPoint x="700" y="280" as="sourcePoint"/>
<mxPoint x="930" y="160" as="targetPoint"/>
</mxGeometry>
</mxCell>
<mxCell id="23" value="Spark Streaming" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];" parent="22" vertex="1" connectable="0">
<mxGeometry x="-0.2945" y="-4" relative="1" as="geometry">
<mxPoint x="-1" as="offset"/>
</mxGeometry>
</mxCell>
<mxCell id="29" value="Superset" style="image;html=1;image=img/lib/clip_art/finance/Graph_128x128.png;fillColor=default;" parent="1" vertex="1">
<mxGeometry x="1250" y="110" width="80" height="80" as="geometry"/>
</mxCell>
</root>
</mxGraphModel>
</diagram>
</mxfile>
Binary file added docs/ecommerce.png
26 changes: 26 additions & 0 deletions kafka.md
@@ -0,0 +1,26 @@
## Project setup
Install Confluent Platform on your machine. This is the easiest way to get started, as it comes packaged with all the required services, such as Schema Registry and Kafka Connect.

[Installation instructions](https://docs.confluent.io/platform/current/installation/installing_cp/zip-tar.html){:target="_blank"}

```
confluent local services start
```

- Run the `ProducerDemo.java`

- Run the `ConsumerDemo.java`

Please take a look at the output.

```
confluent local services stop
```

## Things to Observe

- The producer uses the Java classes generated from the AVDL files.
- The consumer decrypts the data it reads from the topic (see the sketch after this list).
- If you use the Kafka console consumer CLI, you will see encrypted data.
- The custom logical types, together with the conversions defined for them, make this happen. The only way to decrypt the data is to read it with the generated Java classes.
- If the message on the topic is intercepted, it cannot be decrypted.
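
To make the last two points concrete, here is a minimal consumer sketch in the spirit of `ConsumerDemo.java` (not the actual file from the repo). The bootstrap servers, Schema Registry URL, group id, and topic name are illustrative assumptions; `CustomerRecord` is the class generated from `SimpleCustomer.avdl`. Because the generated class registers the custom conversions, the `encrypted` fields print as plain text here, while the console consumer would show ciphertext.

```java
// Minimal consumer sketch (assumed names; see note above).
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.dataanada.customer.CustomerRecord;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumption: local Confluent stack
        props.put("schema.registry.url", "http://localhost:8081"); // assumption: default registry port
        props.put("group.id", "customer-demo");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Deserialize into the generated specific class instead of a GenericRecord.
        props.put("specific.avro.reader", "true");

        try (KafkaConsumer<String, CustomerRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("customers")); // assumption: topic name
            ConsumerRecords<String, CustomerRecord> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, CustomerRecord> record : records) {
                // Fields marked @logicalType("encrypted") in SimpleCustomer.avdl are
                // decrypted by the custom Conversion registered in the generated class.
                System.out.println(record.value().getFirstName() + " " + record.value().getEmail());
            }
        }
    }
}
```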
Binary file added query.avro
Binary file not shown.
61 changes: 53 additions & 8 deletions schemas/build.gradle
@@ -7,8 +7,12 @@ plugins {
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
id "com.github.node-gradle.node" version "3.5.1"
id 'scala'
id 'com.zlad.gradle.avrohugger' version '0.7.0'
}

apply plugin: "com.github.davidmc24.gradle.plugin.avro-base"

group 'com.lavro'
version '1.0-SNAPSHOT'

@@ -26,6 +30,7 @@ dependencies {
customConversions(project(":custom-conversions"))
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
implementation 'org.scala-lang:scala-library:2.13.10'
implementation "org.apache.avro:avro:1.11.1"
implementation "com.github.javafaker:javafaker:1.0.2"
implementation(project(":custom-conversions"))
@@ -37,34 +42,74 @@ test {
useJUnitPlatform()
}

task generateProtocol(type: GenerateAvroProtocolTask) {
shouldRunAfter "build"
//task generateProtocol(type: GenerateAvroProtocolTask) {
// shouldRunAfter "build"
// source file("src/main/avro")
// include("**/*.avdl")
// outputDir = file("build/generated-main-avro-avpr")
//}

def generateProtocol = tasks.register("generateProtocol", GenerateAvroProtocolTask) {
shouldRunAfter("build")
source file("src/main/avro")
include("**/*.avdl")
outputDir = file("build/generated-main-avro-avpr")
}

task generateSchema(type: GenerateAvroSchemaTask){
shouldRunAfter "build"
shouldRunAfter "generateProtocol"
def generateSchema=tasks.register("generateSchema", GenerateAvroSchemaTask) {
// shouldRunAfter("GenerateAvroProtocolTask")
dependsOn generateProtocol
source file("src/main/avro")
source file("build/generated-main-avro-avpr")
include("**/*.avpr")
outputDir = file("build/generated-main-avro-avsc")
}

//task generateSchema(type: GenerateAvroSchemaTask){
// shouldRunAfter "build"
// shouldRunAfter "generateProtocol"
// source file("build/generated-main-avro-avpr")
// include("**/*.avpr")
// outputDir = file("build/generated-main-avro-avsc")
//}

avro {
enableDecimalLogicalType = false
stringType = "CharSequence"
createSetters = true
conversionsAndTypeFactoriesClasspath.from(configurations.customConversions)
logicalTypeFactoryClassNames.put("reversed", "com.lavro.ReversedLogicalTypeFactory")
customConversionClassNames.add("com.lavro.ReversedConversion")
logicalTypeFactoryClassNames.put("encrypted", "com.lavro.EncryptedLogicalTypeFactory")
customConversionClassNames.add("com.lavro.EncryptedConversion")
}

task generateAvrodoc(type: NpxTask) {
jar {
duplicatesStrategy(DuplicatesStrategy.EXCLUDE)
}

avrohugger {
sourceDirectories = files('src/main/avro')
destinationDirectory = file('src/main/scala')
typeMapping {
protocolType = ScalaADT
}
sourceFormat = Standard
}

tasks.register("generateAvrodoc", NpxTask) {
shouldRunAfter("generateSchema")
dependsOn(generateSchema)
command = '@mikaello/avrodoc-plus'
args = ['-i','build/generated-main-avro-avsc', '-o','../avrodoc.html']
args = ['-i','build/generated-main-avro-avsc', '-o','../docs/avrodoc/avrodoc.html']
mustRunAfter tasks.withType(GenerateAvroJavaTask)
}
}

//task generateAvrodoc(type: NpxTask) {
// dependsOn(generateSchema)
// command = '@mikaello/avrodoc-plus'
// args = ['-i','build/generated-main-avro-avsc', '-o','../docs/avrodoc/avrodoc.html']
// mustRunAfter tasks.withType(GenerateAvroJavaTask)
//}

//tasks.named("generateProtocol") { finalizedBy("generateSchema") }
17 changes: 17 additions & 0 deletions schemas/src/main/avro/SimpleCustomer.avdl
@@ -0,0 +1,17 @@
@namespace("com.dataanada.customer")
protocol Customer{
record CustomerRecord {
int customerId;
@logicalType("encrypted")
string firstName;
@logicalType("encrypted")
string lastName;
@logicalType("encrypted")
string email;
string addressLine1;
string city;
string state;
int zip;
string country;
}
}
10 changes: 10 additions & 0 deletions schemas/src/main/avro/SimpleOrder.avdl
@@ -0,0 +1,10 @@
@namespace("com.dataanada.order")
protocol Order{
record OrderRecord {
int orderId;
union {null, int} storeId = null;
int customerId;
decimal(5,2) price;
string orderType;
}
}
4 changes: 4 additions & 0 deletions schemas/src/main/scala/com/dataanada/customer/CustomerRecord.scala
@@ -0,0 +1,4 @@
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.dataanada.customer

final case class CustomerRecord(customerId: Int, firstName: String, lastName: String, email: String, addressLine1: String, city: String, state: String, zip: Int, country: String)
4 changes: 4 additions & 0 deletions schemas/src/main/scala/com/dataanada/order/OrderRecord.scala
@@ -0,0 +1,4 @@
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package com.dataanada.order

final case class OrderRecord(orderId: Int, storeId: Option[Int] = None, customerId: Int, price: BigDecimal, orderType: String)
