Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
gfinocchiaro committed Feb 25, 2024

Verified

This commit was signed with the committer’s verified signature.
MHajoha Maximilian Haye
1 parent 06fe600 commit b683d6b
Showing 26 changed files with 1,102 additions and 195 deletions.
5 changes: 5 additions & 0 deletions buildSrc/src/main/groovy/lightstreamer-kafka-connector.gradle
Original file line number Diff line number Diff line change
@@ -69,4 +69,9 @@ java {

tasks.named('test') {
useJUnitPlatform()
}


// Removes the Quick Start deploy directory ($rootDir/$quickstartDeployDirName)
// that the quickStart tasks populate with the Docker build artifacts.
task cleanQuickStart(type: Delete) {
delete "$rootDir/$quickstartDeployDirName"
}
4 changes: 2 additions & 2 deletions examples/quickstart/Dockerfile.kafka-connector
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
FROM lightstreamer

ARG version
COPY deploy/lightstreamer-kafka-connector-*.zip /tmp/lightstreamer-kafka-connector.zip
COPY web /lightstreamer/pages/QuickStart

COPY deploy/lightstreamer-kafka-connector-${version}.zip /tmp/lightstreamer-kafka-connector.zip
USER root
RUN apt-get -y update; \
apt-get install -y unzip; \
8 changes: 4 additions & 4 deletions examples/quickstart/Dockerfile.producer
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM eclipse-temurin:11
ARG VERSION
FROM eclipse-temurin:17

RUN mkdir /opt/producer
COPY deploy/lightstreamer-kafka-connector-samples-all-*.jar /opt/producer/lightstreamer-kafka-connector-samples-all.jar

COPY deploy/lightstreamer-kafka-connector-samples-all-0.1.0.jar /opt/producer/lightstreamer-kafka-connector-samples-all.jar
CMD ["java", "-jar", "/opt/producer/lightstreamer-kafka-connector-samples-all.jar"]
ENTRYPOINT ["java", "-jar", "/opt/producer/lightstreamer-kafka-connector-samples-all.jar"]
Submodule Lightstreamer-example-StockList-client-javascript added at ec8c3e
160 changes: 160 additions & 0 deletions examples/quickstart/README.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title></title>
<style>
/* From extension vscode.github */
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

.vscode-dark img[src$=\#gh-light-mode-only],
.vscode-light img[src$=\#gh-dark-mode-only] {
display: none;
}

</style>

<link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/Microsoft/vscode/extensions/markdown-language-features/media/markdown.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/Microsoft/vscode/extensions/markdown-language-features/media/highlight.css">
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe WPC', 'Segoe UI', system-ui, 'Ubuntu', 'Droid Sans', sans-serif;
font-size: 14px;
line-height: 1.6;
}
</style>
<style>
.task-list-item {
list-style-type: none;
}

.task-list-item-checkbox {
margin-left: -20px;
vertical-align: middle;
pointer-events: none;
}
</style>
<style>
:root {
--color-note: #0969da;
--color-tip: #1a7f37;
--color-warning: #9a6700;
--color-severe: #bc4c00;
--color-caution: #d1242f;
--color-important: #8250df;
}

</style>
<style>
@media (prefers-color-scheme: dark) {
:root {
--color-note: #2f81f7;
--color-tip: #3fb950;
--color-warning: #d29922;
--color-severe: #db6d28;
--color-caution: #f85149;
--color-important: #a371f7;
}
}

</style>
<style>
.markdown-alert {
padding: 0.5rem 1rem;
margin-bottom: 16px;
color: inherit;
border-left: .25em solid #888;
}

.markdown-alert>:first-child {
margin-top: 0
}

.markdown-alert>:last-child {
margin-bottom: 0
}

.markdown-alert .markdown-alert-title {
display: flex;
font-weight: 500;
align-items: center;
line-height: 1
}

.markdown-alert .markdown-alert-title .octicon {
margin-right: 0.5rem;
display: inline-block;
overflow: visible !important;
vertical-align: text-bottom;
fill: currentColor;
}

.markdown-alert.markdown-alert-note {
border-left-color: var(--color-note);
}

.markdown-alert.markdown-alert-note .markdown-alert-title {
color: var(--color-note);
}

.markdown-alert.markdown-alert-important {
border-left-color: var(--color-important);
}

.markdown-alert.markdown-alert-important .markdown-alert-title {
color: var(--color-important);
}

.markdown-alert.markdown-alert-warning {
border-left-color: var(--color-warning);
}

.markdown-alert.markdown-alert-warning .markdown-alert-title {
color: var(--color-warning);
}

.markdown-alert.markdown-alert-tip {
border-left-color: var(--color-tip);
}

.markdown-alert.markdown-alert-tip .markdown-alert-title {
color: var(--color-tip);
}

.markdown-alert.markdown-alert-caution {
border-left-color: var(--color-caution);
}

.markdown-alert.markdown-alert-caution .markdown-alert-title {
color: var(--color-caution);
}

</style>

</head>
<body class="vscode-body vscode-light">
<p>To start the Quick Start example:</p>
<ol>
<li>
<p>cd to the root project and run:</p>
<p><code>./gradlew quickStart</code></p>
<p>to fill the deploy folder with the artifacts required for building the Docker images.</p>
</li>
<li>
<p>From this directory, run:</p>
<p><code>docker compose up --build</code></p>
<p>to start the Docker compose stack.</p>
</li>
<li>
<p>To clean up the deploy dir, from the root project dir:</p>
<p><code>./gradlew cleanQuickStart</code></p>
</li>
</ol>



</body>
</html>
18 changes: 18 additions & 0 deletions examples/quickstart/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
To start the Quick Start example:

1. cd to the root project and run:

`./gradlew quickStart`

to fill the deploy folder with the artifacts required for building the Docker images.

2. From this directory, run:

`docker compose up --build`

to start the Docker compose stack.

3. To clean up the deploy dir, from the root project dir:

`./gradlew cleanQuickStart`

10 changes: 0 additions & 10 deletions examples/quickstart/build.sh

This file was deleted.

71 changes: 36 additions & 35 deletions examples/quickstart/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -2,45 +2,46 @@
version: '2'
services:
lightstreamer:
container_name: lightstreamer
build:
dockerfile: Dockerfile.kafka-connector
args:
version: 0.1.0
# context: ../../
# depends_on:
# - broker
depends_on:
- producer
ports:
- 8080:8080

# producer:
# build:
# dockerfile: producer.Dockerfile
# args:
# VERSION: 0.1.0
producer:
container_name: producer
depends_on:
- broker
build:
dockerfile: Dockerfile.producer
command: ["--bootstrap-servers", "broker:29092", "--topic", "stocks"]


# broker:
# image: confluentinc/confluent-local:latest
# ports:
# - "9092:9092"
# - "8082:8082"
# environment:
# KAFKA_NODE_ID: 1
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
# KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.server.keystore.jks
# KAFKA_SSL_KEYSTORE_PASSWORD: password
# KAFKA_PROCESS_ROLES: 'broker,controller'
# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
# KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# KAFKA_REST_HOST_NAME: rest-proxy
# KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
# KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
# CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
broker:
image: confluentinc/confluent-local:latest
container_name: broker
ports:
- "9092:9092"
- "8082:8082"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: password
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
# CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
125 changes: 125 additions & 0 deletions examples/quickstart/web/css/table.css
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* LIGHTSTREAMER - www.lightstreamer.com
* Basic Stock-List Demo
*
* Copyright (c) Lightstreamer Srl
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
.banner {
width: 100%;
height: 110px;
position: relative;
background-image: url("../images/banner.jpg");
background-size: cover;
background-position: center;
}
.logo {
position: absolute;
left: 50%;
transform: translateX(-50%);
}
.banner p {
width: 100%;
position: absolute;
left: 0;
top: 60px;
font-family: "Amazon Ember", Arial, Helvetica, Sans-serif;
font-size: x-large;
color: #063d27;
font-weight: bold;
text-align: center;
}
.ribbon {
z-index: 1;
background-color: #003D06;
overflow: hidden;
white-space: nowrap;
position: absolute;
right: -50px;
top: 40px;
-webkit-transform: rotate(45deg);
-moz-transform: rotate(45deg);
-ms-transform: rotate(45deg);
-o-transform: rotate(45deg);
transform: rotate(45deg);
-webkit-box-shadow: 0 0 10px #888;
-moz-box-shadow: 0 0 10px #888;
box-shadow: 0 0 10px #888;
}
.ribbon a {
color: #fff;
display: block;
font: bold 81.25% 'Helvetica Neue', Helvetica, Arial, sans-serif;
margin: 1px 0;
padding: 10px 50px;
text-align: center;
text-decoration: none;
text-shadow: 0 0 5px #444;
}
.demoTitle {
font-family: Verdana, Arial, Helvetica, Sans-serif;
color: #000000;
font-weight: bold;
display: inline-block;
}
.tableContainer {
overflow-x:auto;
margin-top: 1em;
}
.tableTitle {
font-family: Verdana, Arial, Helvetica, Sans-serif;
color: #ffffff;
background-color: #9494ce;
font-weight: bold;
text-align: right;
}
.disc {
margin-top: 5px;
font-family: Verdana, Arial, Helvetica, sans-serif;
color: #000000;
background: #ffffff;
font-weight: normal;
text-align: left;
}
.stockNameOdd {
font-family: Verdana, Arial, Helvetica, Sans-serif;
color: #000080;
background-color: #eeeeee;
font-weight: bold;
text-align: left;
}
.stockNameEven {
font-family: Verdana, Arial, Helvetica, Sans-serif;
color: #000080;
background-color: #ddddee;
font-weight: bold;
text-align: left;
}
.coldOdd {
font-family: Verdana, Arial, Helvetica, Sans-serif;
color: #000000;
background-color: #eeeeee;
font-weight: normal;
text-align: right;
}
.coldEven {
font-family: Verdana, Arial, Helvetica, Sans-serif;
color: #000000;
background-color: #ddddee;
font-weight: normal;
text-align: right;
}
#wrap {
margin: 0 auto;
}
Binary file added examples/quickstart/web/images/banner.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/quickstart/web/images/logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
256 changes: 256 additions & 0 deletions examples/quickstart/web/index.html

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions examples/quickstart/web/js/lightstreamer.min.js

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions examples/quickstart/web/js/require.js

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions kafka-connector-samples/build.gradle
Original file line number Diff line number Diff line change
@@ -31,3 +31,8 @@ task distribuite(type: Copy) {
from fatJar
into "$rootDir/$deployDirName"
}

// Copies the samples fat JAR into the Quick Start deploy directory
// ($rootDir/$quickstartDeployDirName), where the Docker build picks it up.
task quickStart(type: Copy) {
from (fatJar)
into "$rootDir/$quickstartDeployDirName"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@

/*
* Copyright (C) 2024 Lightstreamer Srl
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.lightstreamer.kafka_connector.samples.producer;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

/**
* Simulates an external data feed that supplies quote values for all the stocks needed for the
* demo.
*/
/**
 * Simulates an external data feed that supplies quote values for all the stocks needed for the
 * demo.
 *
 * <p>NOTE(review): the class name carries a typo ("Simluator" instead of "Simulator"); it is kept
 * unchanged because other sources (e.g. the Producer sample) reference it by this exact name.
 */
public class FeedSimluator {

    /** Receiver of the update events generated by the simulator. */
    public interface ExternalFeedListener {

        /**
         * Notifies a newly generated update for the given stock.
         *
         * @param stock the current snapshot of the stock
         * @param b extra flag forwarded to the listener (always {@code false} for simulated
         *     updates — see {@code scheduleGenerator})
         */
        void onEvent(Stock stock, boolean b);
    }

    /** Single (non-daemon) timer thread that schedules and dispatches all stock updates. */
    private static final Timer dispatcher = new Timer();

    /** Shared randomness source for both update timing and price movements. */
    private static final Random random = new Random();

    /**
     * Used to automatically generate the updates for the 30 stocks: mean and standard deviation of
     * the times between consecutive updates for the same stock.
     */
    private static final double[] updateTimeMeans = {
        30000, 500, 3000, 90000,
        7000, 10000, 3000, 7000,
        7000, 7000, 500, 3000,
        20000, 20000, 20000, 30000,
        500, 3000, 90000, 7000,
        10000, 3000, 7000, 7000,
        7000, 500, 3000, 20000,
        20000, 20000,
    };

    private static final double[] updateTimeStdDevs = {
        6000, 300, 1000, 1000,
        100, 5000, 1000, 3000,
        1000, 6000, 300, 1000,
        1000, 4000, 1000, 6000,
        300, 1000, 1000, 100,
        5000, 1000, 3000, 1000,
        6000, 300, 1000, 1000,
        4000, 1000,
    };

    /** Used to generate the initial field values for the 30 stocks. */
    private static final double[] refprices = {
        3.04, 16.09, 7.19, 3.63, 7.61, 2.30, 15.39, 5.31, 4.86, 7.61, 10.41, 3.94, 6.79, 26.87,
        2.27, 13.04, 6.09, 17.19, 13.63, 17.61, 11.30, 5.39, 15.31, 14.86, 17.61, 5.41, 13.94,
        16.79, 6.87, 11.27,
    };

    private static final double[] openprices = {
        3.10, 16.20, 7.25, 3.62, 7.65, 2.30, 15.85, 5.31, 4.97, 7.70, 10.50, 3.95, 6.84, 27.05,
        2.29, 13.20, 6.20, 17.25, 13.62, 17.65, 11.30, 5.55, 15.31, 14.97, 17.70, 5.42, 13.95,
        16.84, 7.05, 11.29,
    };
    private static final double[] minprices = {
        3.09, 15.78, 7.15, 3.62, 7.53, 2.28, 15.60, 5.23, 4.89, 7.70, 10.36, 3.90, 6.81, 26.74,
        2.29, 13.09, 5.78, 17.15, 13.62, 17.53, 11.28, 5.60, 15.23, 14.89, 17.70, 5.36, 13.90,
        16.81, 6.74, 11.29,
    };
    private static final double[] maxprices = {
        3.19, 16.20, 7.26, 3.71, 7.65, 2.30, 15.89, 5.31, 4.97, 7.86, 10.50, 3.95, 6.87, 27.05,
        2.31, 13.19, 6.20, 17.26, 13.71, 17.65, 11.30, 5.89, 15.31, 14.97, 17.86, 5.50, 13.95,
        16.87, 7.05, 11.31,
    };
    private static final String[] stockNames = {
        "Anduct", "Ations Europe",
        "Bagies Consulting", "BAY Corporation",
        "CON Consulting", "Corcor PLC",
        "CVS Asia", "Datio PLC",
        "Dentems", "ELE Manufacturing",
        "Exacktum Systems", "KLA Systems Inc",
        "Lted Europe", "Magasconall Capital",
        "MED", "Mice Investments",
        "Micropline PLC", "Nologicroup Devices",
        "Phing Technology", "Pres Partners",
        "Quips Devices", "Ress Devices",
        "Sacle Research", "Seaging Devices",
        "Sems Systems, Inc", "Softwora Consulting",
        "Systeria Develop", "Thewlec Asia",
        "Virtutis", "Yahl"
    };

    /** Used to keep the contexts of the 30 stocks. */
    private final ArrayList<StockProducer> stockGenerators = new ArrayList<>();

    // Written by setFeedListener (caller thread) and read by the dispatcher's timer tasks:
    // volatile guarantees the timer thread sees the most recent assignment.
    private volatile ExternalFeedListener listener;

    /**
     * Starts generating update events for the stocks. Simulates attaching and reading from an
     * external broadcast feed.
     */
    public void start() {
        for (int i = 0; i < stockNames.length; i++) {
            StockProducer stock = new StockProducer(i);
            stockGenerators.add(stock);
            long waitTime = stock.computeNextWaitTime();
            scheduleGenerator(stock, waitTime);
        }
    }

    /**
     * Sets an internal listener for the update events. Until a listener is set, the generated
     * events are silently discarded.
     */
    public void setFeedListener(ExternalFeedListener listener) {
        this.listener = listener;
    }

    /**
     * Generates new values and sends a new update event at the time the producer declared to do it.
     */
    private void scheduleGenerator(final StockProducer stock, long waitTime) {
        dispatcher.schedule(
                new TimerTask() {
                    public void run() {
                        long nextWaitTime;
                        synchronized (stock) {
                            stock.computeNewValues();
                            if (listener != null) {
                                listener.onEvent(stock.getCurrentValues(false), false);
                            }
                            nextWaitTime = stock.computeNextWaitTime();
                        }
                        // Re-arm the timer for the next update of this same stock.
                        scheduleGenerator(stock, nextWaitTime);
                    }
                },
                waitTime);
    }

    /** Manages the current state and generates update events for a single stock. */
    private static class StockProducer {
        private final int open, ref;
        private final double mean, stddev;
        private final String name;
        private int min, max, last, other;

        /** Initializes stock data based on the already prepared values. */
        public StockProducer(int itemPos) {
            // All prices are converted in integer form (hundredths of the currency
            // unit) to simplify the management; they will be converted back before
            // being sent in the update events
            open = (int) Math.round(openprices[itemPos] * 100);
            ref = (int) Math.round(refprices[itemPos] * 100);
            min = (int) Math.ceil(minprices[itemPos] * 100);
            max = (int) Math.floor(maxprices[itemPos] * 100);
            name = stockNames[itemPos];
            last = open;
            mean = updateTimeMeans[itemPos];
            stddev = updateTimeStdDevs[itemPos];
        }

        /**
         * Decides, for ease of simulation, the time at which the next update for the stock will
         * happen.
         *
         * @return a strictly positive wait time in milliseconds
         */
        public long computeNextWaitTime() {
            long millis;
            do {
                millis = (long) gaussian(mean, stddev);
            } while (millis <= 0);
            return millis;
        }

        /** Changes the current data for the stock. */
        public void computeNewValues() {
            // this stuff is to ensure that new prices follow a random
            // but nondivergent path, centered around the reference price
            double limit = ref / 4.0;
            int jump = ref / 100;
            double relDist = (last - ref) / limit;
            int direction = 1;
            if (relDist < 0) {
                direction = -1;
                relDist = -relDist;
            }
            if (relDist > 1) {
                relDist = 1;
            }
            // The further the price has drifted from the reference, the more
            // likely it is pushed back towards it.
            double weight = (relDist * relDist * relDist);
            double prob = (1 - weight) / 2;
            boolean goFarther = random.nextDouble() < prob;
            if (!goFarther) {
                direction *= -1;
            }
            int difference = uniform(0, jump) * direction;
            int gap = ref / 250;
            int delta;
            if (gap > 0) {
                do {
                    delta = uniform(-gap, gap);
                } while (delta == 0);
            } else {
                delta = 1;
            }
            last += difference;
            other = last + delta;
            if (last < min) {
                min = last;
            }
            if (last > max) {
                max = last;
            }
        }

        /**
         * Picks the stock field values and stores them in a new {@link Stock} snapshot. If
         * fullData is false, then only the fields whose value has just changed are considered
         * (though this check is not strict).
         */
        public Stock getCurrentValues(boolean fullData) {
            Stock stock = new Stock();
            stock.name = name;

            Date now = new Date();
            stock.time = new SimpleDateFormat("HH:mm:ss").format(now);
            stock.timestamp = Long.toString(now.getTime());
            stock.last_price = toDecimalString(last);
            // "other" acts as the counterpart quote: the higher of the two
            // values is always the ask, the lower one the bid.
            if (other > last) {
                stock.ask = toDecimalString(other);
                stock.bid = toDecimalString(last);
            } else {
                stock.ask = toDecimalString(last);
                stock.bid = toDecimalString(other);
            }
            stock.bid_quantity = Integer.toString(uniform(1, 200) * 500);
            stock.ask_quantity = Integer.toString(uniform(1, 200) * 500);

            double var = (last - ref) / (double) ref * 100;
            stock.pct_change = toDecimalString((int) (var * 100));
            if ((last == min) || fullData) {
                stock.min = toDecimalString(min);
            }
            if ((last == max) || fullData) {
                stock.max = toDecimalString(max);
            }
            if (fullData) {
                stock.ref_price = toDecimalString(ref);
                stock.open_price = toDecimalString(open);
                // since it's a simulator the item is always active
                stock.item_status = "active";
            }
            return stock;
        }

        /** Converts a price expressed in hundredths back to its decimal string form. */
        private String toDecimalString(int val100) {
            return Double.toString((((double) val100) / 100));
        }

        /** Returns a sample of a Gaussian distribution with the given mean and std deviation. */
        private long gaussian(double mean, double stddev) {
            double base = random.nextGaussian();
            return (long) (base * stddev + mean);
        }

        /** Returns a uniformly distributed integer in the closed range [min, max]. */
        private int uniform(int min, int max) {
            int base = random.nextInt(max + 1 - min);
            return base + min;
        }
    }

    /** Plain data holder for a single stock snapshot, serialized to JSON by the Kafka producer. */
    public static class Stock {
        @JsonProperty public String name;
        @JsonProperty public String time;
        @JsonProperty public String timestamp;
        @JsonProperty public String last_price;
        @JsonProperty public String ask;
        @JsonProperty public String bid;
        @JsonProperty public String bid_quantity;
        @JsonProperty public String ask_quantity;
        @JsonProperty public String pct_change;
        @JsonProperty public String min;
        @JsonProperty public String max;
        @JsonProperty public String ref_price;
        @JsonProperty public String open_price;
        @JsonProperty public String item_status;
    }
}
Original file line number Diff line number Diff line change
@@ -17,25 +17,24 @@

package com.lightstreamer.kafka_connector.samples.producer;

import com.lightstreamer.kafka_connector.samples.producer.FeedSimluator.ExternalFeedListener;
import com.lightstreamer.kafka_connector.samples.producer.FeedSimluator.Stock;

import io.confluent.kafka.serializers.KafkaJsonSerializer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import picocli.CommandLine;
import picocli.CommandLine.Option;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class Producer implements Runnable {
public class Producer implements Runnable, ExternalFeedListener {

@Option(
names = "--bootstrap-servers",
@@ -46,68 +45,44 @@ public class Producer implements Runnable {
@Option(names = "--topic", description = "The target topic", required = true)
private String topic;

@Option(
names = "--config-path",
description = "The configuration file path",
required = false,
defaultValue = "src/clients/producer/simple-config.properties")
private String configPath;

@Option(
names = "--period",
description = "The interval in ms between two successive executions",
required = false,
defaultValue = "250")
private int periodMs;
private KafkaProducer<String, Stock> producer;

public void run() {
// BasicConfigurator.configure();
// Create producer configs
Properties properties = new Properties();
System.out.println(Paths.get(".").toAbsolutePath());
try (InputStream is = Files.newInputStream(Paths.get(this.configPath))) {
properties.load(is);
} catch (IOException e) {
throw new RuntimeException(e);
}

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Create and start the producer.

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ) {
int key = 0;
while (true) {
String message =
new SecureRandom()
.ints(20, 48, 122)
.mapToObj(Character::toString)
.collect(Collectors.joining());

String keyString = String.valueOf(key++);
ProducerRecord<String, String> record =
new ProducerRecord<String, String>(this.topic, keyString, message);
producer.send(
record,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
System.err.println("Send failed");
return;
}
System.out.printf(
"Sent record [key=%s,value=%s]%n to topic [%s]]%n",
record.key(),
record.value(),
record.topic(),
record.partition());
}
});
TimeUnit.MILLISECONDS.sleep(this.periodMs);
}
} catch (Exception e) {
e.printStackTrace();
}
properties.setProperty(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class.getName());

// Create the producer
producer = new KafkaProducer<String, Stock>(properties);

// Create and start the feed simulator.
FeedSimluator simulator = new FeedSimluator();
simulator.setFeedListener(this);
simulator.start();
}

@Override
public void onEvent(Stock stock, boolean b) {
ProducerRecord<String, Stock> record =
new ProducerRecord<>(this.topic, stock.name.replace(' ', '-'), stock);
producer.send(
record,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
System.err.println("Send failed");
return;
}
System.out.printf(
"Sent record [key=%s,value=%s]%n to topic [%s]]%n",
record.key(), record.value(), record.topic(), record.partition());
}
});
}

public static void main(String[] args) {
37 changes: 5 additions & 32 deletions kafka-connector/build.gradle
Original file line number Diff line number Diff line change
@@ -12,38 +12,6 @@ dependencies {
implementation group: 'io.confluent', name: 'kafka-json-schema-serializer', version:'7.5.3'
}

task deployAdapters(type: Copy) {
dependsOn 'cleanDeploy'

into "deploy"

from (jar) {
into "connector/lib"
}

from (configurations.runtimeClasspath) {
into "connector/lib"
exclude "ls-adapter-inprocess*"
exclude "jsr305*"
}

from ("examples/quickstart") {
into "connector"
}
}

task deploy(type: Copy) {
dependsOn 'deployAdapters'
into "deploy/conf"
from ("examples/conf")
}

task cleanDeploy(type: Delete) {
delete "deploy"
}

sourceSets.main.java.srcDirs += ['src/clients/java']

javadoc {
include '**/pub/KafkaConnectorMetadataAdapter.java'
exclude '**/pub/*ConnectionInfo*'
@@ -75,3 +43,8 @@ task distribuite(type: Copy) {
from (connectorDistZip)
into "$rootDir/$deployDirName"
}

// Copies the connector distribution zip into the Quick Start deploy directory
// ($rootDir/$quickstartDeployDirName), where the Docker build picks it up.
task quickStart(type: Copy) {
from (connectorDistZip)
into "$rootDir/$quickstartDeployDirName"
}
51 changes: 29 additions & 22 deletions kafka-connector/src/connector/dist/adapters.xml
Original file line number Diff line number Diff line change
@@ -120,47 +120,54 @@
<adapter_class>com.lightstreamer.kafka_connector.adapters.ConnectorDataAdapter</adapter_class>

<!-- The Kafka cluster address -->
<!--<param name="bootstrap.servers">pkc-z9doz.eu-west-1.aws.confluent.cloud:9092</param>-->
<param name="bootstrap.servers">broker:29092</param>

<param name="group.id">test</param>

<param name="group.id">quick_start</param>
<param name="enable">true</param>

<!-- TOPIC MAPPING SECTION -->

<!-- Define a "sample" item-template, which is simply made of the "sample" item name to be used by the Lighstreamer Client subscription. -->
<!-- <param name="item-template.sample1">sample-#{partition=PARTITION}</param>
<param name="item-template.sample2">sample</param> -->
<param name="item-template.sample1">sample-#{key=KEY.name}</param>
<param name="item-template.sample2">sample-#{value=VALUE}</param>
<param name="item-template.stock">stock-#{name=KEY}</param>

<!-- Map the Kafka topic "sample-topic" to the previous defined "sample" item template. -->
<param name="map.avro-topic-1.to">item-template.sample1</param>
<param name="map.stocks.to">item-template.stock</param>

<param name="key.evaluator.type">AVRO</param>
<!-- <param name="key.evaluator.schema.registry.enable">true</param> -->
<param name="key.evaluator.schema.path">user-key.avsc</param>
<param name="value.evaluator.type">AVRO</param>
<!-- <param name="value.evaluator.schema.registry.enable">true</param> -->
<param name="value.evaluator.schema.path">user-value.avsc</param>
<param name="key.evaluator.type">STRING</param>
<param name="value.evaluator.type">JSON</param>

<!-- FIELDS MAPPING SECTION -->

<!-- Extraction of the record key mapped to the field "key". -->
<param name="field.key">#{KEY.name}</param>

<!-- Extraction of the record value mapped to the field "value". -->
<param name="field.value">#{VALUE.name}</param>
<param name="field.stock_name">#{VALUE.name}</param>

<!-- Extraction of the record timestamp to the field "ts". -->
<param name="field.ts">#{TIMESTAMP}</param>
<param name="field.time">#{TIMESTAMP}</param>

<!-- Extraction of the record partition mapped to the field "partition". -->
<param name="field.partition">#{PARTITION}</param>
<param name="field.timestamp">#{PARTITION}</param>

<!-- Extraction of the record offset mapped to the field "offset". -->
<param name="field.offset">#{OFFSET}</param>
<param name="field.last_price">#{OFFSET}</param>

<param name="field.ask">#{VALUE.ask}</param>

<param name="field.ask_quantity">#{VALUE.ask_quantity}</param>

<param name="field.bid">#{VALUE.bid}</param>

<param name="field.bid_quantity">#{VALUE.bid_quantity}</param>

<param name="field.pct_change">#{VALUE.pct_change}</param>

<param name="field.min">#{VALUE.min}</param>

<param name="field.max">#{VALUE.max}</param>

<param name="field.ref_price">#{VALUE.ref_price}</param>

<param name="field.open_price">#{VALUE.open_price}</param>

<param name="field.item_status">#{VALUE.item_status}</param>

</data_provider>

7 changes: 0 additions & 7 deletions kafka-connector/src/connector/dist/log4j.properties
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@
log4j.rootLogger=DEBUG, stdout
log4j.logger.org.apache.kafka=WARN, stdout
log4j.logger.QuickStart=TRACE, QuickStartFile
log4j.logger.JsonStart=TRACE, JsonStartFile

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
@@ -33,9 +32,3 @@ log4j.appender.QuickStartFile=org.apache.log4j.RollingFileAppender
log4j.appender.QuickStartFile.layout=org.apache.log4j.PatternLayout
log4j.appender.QuickStartFile.layout.ConversionPattern=[%d] [%-10c{1}] %-5p %m%n
log4j.appender.QuickStartFile.File=quickstart.log

# JsonStartFile logger appender
log4j.appender.JsonStartFile=org.apache.log4j.RollingFileAppender
log4j.appender.JsonStartFile.layout=org.apache.log4j.PatternLayout
log4j.appender.JsonStartFile.layout.ConversionPattern=[%d] [%-10c{1}] %-5p %m%n
log4j.appender.JsonStartFile.File=jsonstart.log
Original file line number Diff line number Diff line change
@@ -108,13 +108,6 @@ static ConfigsSpec spec() {

static Properties addSchemaRegistry(ConnectorConfig cfg) {
NonNullKeyProperties props = new NonNullKeyProperties();
if (!cfg.isSchemaRegistryEnabled()) {
return props.properties();
}

props.setProperty(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cfg.schemaRegistryUrl());

if (cfg.getKeyEvaluator().equals(EvaluatorType.JSON)) {
props.setProperty(
KafkaJsonSchemaDeserializerConfig.JSON_KEY_TYPE, JsonNode.class.getName());
@@ -124,6 +117,13 @@ static Properties addSchemaRegistry(ConnectorConfig cfg) {
KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE, JsonNode.class.getName());
}

if (!cfg.isSchemaRegistryEnabled()) {
return props.properties();
}

props.setProperty(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cfg.schemaRegistryUrl());

if (cfg.isSchemaRegistryEncryptionEnabled()) {
props.setProperty(
ns(SslConfigs.SSL_PROTOCOL_CONFIG), cfg.schemaRegistrySslProtocol().toString());
Original file line number Diff line number Diff line change
@@ -74,6 +74,11 @@ public GenericRecord deserialize(String topic, byte[] data) {
GenericRecord deserialize = (GenericRecord) deserializer.deserialize(topic, data);
return deserialize;
}

// Releases the resources held by the wrapped delegate Kafka deserializer.
// Safe to call once the consumer no longer needs this instance.
@Override
public void close() {
deserializer.close();
}
}

class GenericRecordLocalSchemaDeserializer extends AbstractLocalSchemaDeserializer<GenericRecord> {
Original file line number Diff line number Diff line change
@@ -75,6 +75,11 @@ public String deserializerClassName() {
/**
 * Deserializes the raw bytes received from the given topic into a
 * {@code JsonNode}, delegating the actual decoding to the configured
 * underlying Kafka deserializer.
 *
 * @param topic the topic the record was received from
 * @param data the serialized record payload
 * @return the deserialized JSON tree
 */
public JsonNode deserialize(String topic, byte[] data) {
    JsonNode decoded = deserializer.deserialize(topic, data);
    return decoded;
}

// Releases the resources held by the wrapped delegate Kafka deserializer.
// Safe to call once the consumer no longer needs this instance.
@Override
public void close() {
deserializer.close();
}
}

class JsonLocalSchemaDeserializer extends AbstractLocalSchemaDeserializer<JsonNode> {
Original file line number Diff line number Diff line change
@@ -181,7 +181,7 @@ private void notifyDataAdapter(String connectionName, boolean enabled) {
}

/**
* Only used for unit testing
* Only used for unit testing.
*
* @hidden
*/
Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ public void shouldDeserializeWithSchemaRegistry() {
}

@Test
public void shouldDeserializeKeyWithSchemaRegistryValueWithLocalSchema() {
public void shouldGetKeyDeserializeWithSchemaRegistryValueDeserializerWithLocalSchema() {
Map<String, String> otherConfigs =
Map.of(
ConnectorConfig.KEY_EVALUATOR_SCHEMA_REGISTRY_ENABLE,
@@ -104,7 +104,7 @@ public void shouldDeserializeKeyWithLocalSchemaValueWithSchemaRegistry() {
}

@Test
public void shouldDeserializeWithLocalSchema() throws IOException {
public void shouldGetKeyAndValueDeserializerWithLocalSchema() throws IOException {
Map<String, String> otherConfigs =
Map.of(
ConnectorConfig.KEY_EVALUATOR_SCHEMA_PATH,
Original file line number Diff line number Diff line change
@@ -37,7 +37,24 @@ public class JsonNodeDeserializerTest {

@Test
public void shouldDeserializeWithNoSchema() {
ConnectorConfig config = ConnectorConfigProvider.minimal();
String s = "{\"stock_name\":\"Ations Europe\"}";
ConnectorConfig config =
ConnectorConfigProvider.minimalWith(
Map.of(ConnectorConfig.VALUE_EVALUATOR_TYPE, "JSON"));
try (JsonNodeDeserializer deser = new JsonNodeDeserializer(config, false)) {
deser.deserialize("topic", s.getBytes());
}
}

@Test
public void shouldGetKeyAndValueDeserializerWithNoSchema() {
ConnectorConfig config =
ConnectorConfigProvider.minimalWith(
Map.of(
ConnectorConfig.KEY_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.VALUE_EVALUATOR_TYPE,
"JSON"));
try (JsonNodeDeserializer deser = new JsonNodeDeserializer(config, true)) {
assertThat(deser.deserializerClassName())
.isEqualTo(KafkaJsonDeserializer.class.getName());
@@ -50,9 +67,11 @@ public void shouldDeserializeWithNoSchema() {
}

@Test
public void shouldDeserializeKeyWithSchemaRegisty() {
public void shouldGeKeyDeserializerWithSchemaRegistry() {
Map<String, String> otherConfigs =
Map.of(
ConnectorConfig.KEY_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.KEY_EVALUATOR_SCHEMA_REGISTRY_ENABLE,
"true",
SchemaRegistryConfigs.URL,
@@ -73,9 +92,11 @@ public void shouldDeserializeKeyWithSchemaRegisty() {
}

@Test
public void shouldDeserializeValueWithSchemaRegsitry() {
public void shouldGetValueDeserializerWithSchemaRegsitry() {
Map<String, String> otherConfigs =
Map.of(
ConnectorConfig.VALUE_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.VALUE_EVALUATOR_SCHEMA_REGISTRY_ENABLE,
"true",
SchemaRegistryConfigs.URL,
@@ -96,9 +117,13 @@ public void shouldDeserializeValueWithSchemaRegsitry() {
}

@Test
public void shouldDeserializeKeyAndValueWithSchemaRegisstry() {
public void shouldGetKeyAndVaueDeserializeWithSchemaRegisstry() {
Map<String, String> otherConfigs =
Map.of(
ConnectorConfig.KEY_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.VALUE_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.VALUE_EVALUATOR_SCHEMA_REGISTRY_ENABLE,
"true",
ConnectorConfig.KEY_EVALUATOR_SCHEMA_REGISTRY_ENABLE,
@@ -120,11 +145,15 @@ public void shouldDeserializeKeyAndValueWithSchemaRegisstry() {
}

@Test
public void shouldDeserializeKeyWithLocalSchema() throws IOException {
public void shouldGetKeyDeserializerWithLocalSchema() throws IOException {
Path adapterDir = Files.createTempDirectory("adapter_dir");
Path keySchemaFile = Files.createTempFile(adapterDir, "key_schema_", "json");
Map<String, String> otherConfigs =
Map.of(ConnectorConfig.KEY_EVALUATOR_SCHEMA_PATH, keySchemaFile.toFile().getName());
Map.of(
ConnectorConfig.KEY_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.KEY_EVALUATOR_SCHEMA_PATH,
keySchemaFile.toFile().getName());
ConnectorConfig config =
ConnectorConfigProvider.minimalWith(adapterDir.toString(), otherConfigs);

@@ -142,11 +171,13 @@ public void shouldDeserializeKeyWithLocalSchema() throws IOException {
}

@Test
public void shouldDeserializeValueWithLocalSchema() throws IOException {
public void shouldGetValueDeserializerWithLocalSchema() throws IOException {
Path adapterDir = Files.createTempDirectory("adapter_dir");
Path valueSchemaFile = Files.createTempFile(adapterDir, "value_schema_", "json");
Map<String, String> otherConfigs =
Map.of(
ConnectorConfig.VALUE_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.VALUE_EVALUATOR_SCHEMA_PATH,
valueSchemaFile.toFile().getName());
ConnectorConfig config =
@@ -166,14 +197,18 @@ public void shouldDeserializeValueWithLocalSchema() throws IOException {
}

@Test
public void shouldDeserializeKeyAndValueWithLocalSchema() throws IOException {
public void shouldGetKeyAndValueDeserializerWithLocalSchema() throws IOException {
Path adapterDir = Files.createTempDirectory("adapter_dir");
Path keySchemaFile = Files.createTempFile(adapterDir, "key_schema_", "json");
Path valueSchemaFile = Files.createTempFile(adapterDir, "value_schema_", "json");
Map<String, String> otherConfigs =
Map.of(
ConnectorConfig.KEY_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.KEY_EVALUATOR_SCHEMA_PATH,
keySchemaFile.toFile().getName(),
ConnectorConfig.VALUE_EVALUATOR_TYPE,
"JSON",
ConnectorConfig.VALUE_EVALUATOR_SCHEMA_PATH,
valueSchemaFile.toFile().getName());
ConnectorConfig config =

0 comments on commit b683d6b

Please sign in to comment.