Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gfinocchiaro committed Feb 27, 2024
1 parent b683d6b commit 3531904
Show file tree
Hide file tree
Showing 18 changed files with 337 additions and 285 deletions.
Submodule Lightstreamer-example-StockList-client-javascript deleted from ec8c3e
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
version = 0.1.0
deployDirName = deploy
quickstartDeployDirName = examples/quickstart/deploy
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import com.fasterxml.jackson.annotation.JsonProperty;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
Expand All @@ -35,7 +37,7 @@ public class FeedSimluator {

public interface ExternalFeedListener {

void onEvent(Stock stock, boolean b);
void onEvent(Stock stock);
}

private static final Timer dispatcher = new Timer();
Expand Down Expand Up @@ -108,7 +110,7 @@ public interface ExternalFeedListener {
};

/** Used to keep the contexts of the 30 stocks. */
private final ArrayList<StockProducer> stockGenerators = new ArrayList<>();
private final List<StockProducer> stockGenerators = new ArrayList<>();

private ExternalFeedListener listener;

Expand All @@ -117,11 +119,11 @@ public interface ExternalFeedListener {
* external broadcast feed.
*/
public void start() {
for (int i = 0; i < 30; i++) {
for (int i = 0; i < 10; i++) {
StockProducer stock = new StockProducer(i);
stockGenerators.add(stock);
long waitTime = stock.computeNextWaitTime();
scheduleGenerator(stock, waitTime);
// long waitTime = stock.computeNextWaitTime();
scheduleGenerator(stock, 0);
}
}

Expand All @@ -135,15 +137,15 @@ public void setFeedListener(ExternalFeedListener listener) {
/**
* Generates new values and sends a new update event at the time the producer declared to do it.
*/
private void scheduleGenerator(final StockProducer stock, long waitTime) {
private void scheduleGenerator(StockProducer stock, long waitTime) {
dispatcher.schedule(
new TimerTask() {
public void run() {
long nextWaitTime;
synchronized (stock) {
stock.computeNewValues();
if (listener != null) {
listener.onEvent(stock.getCurrentValues(false), false);
listener.onEvent(stock.getCurrentValues(true));
}
nextWaitTime = stock.computeNextWaitTime();
}
Expand Down Expand Up @@ -234,61 +236,56 @@ public void computeNewValues() {
* is not strict).
*/
public Stock getCurrentValues(boolean fullData) {
Stock stock = new Stock();
stock.name = name;
final HashMap<String, String> event = new HashMap<String, String>();
HashMap<String, String> event = new HashMap<String, String>();

String format = "HH:mm:ss";
SimpleDateFormat formatter = new SimpleDateFormat(format);
Date now = new Date();
event.put("time", formatter.format(now));
event.put("timestamp", Long.toString(now.getTime()));
stock.time = formatter.format(now);
stock.timestamp = Long.toString(now.getTime());
LocalDateTime now = LocalDateTime.now();
event.put("time", now.format(DateTimeFormatter.ofPattern("HH:mm:ss")));
event.put("timestamp", String.valueOf(now.toInstant(ZoneOffset.UTC).toEpochMilli()));
addDecField("last_price", last, event);
stock.last_price = addDecField(last);
if (other > last) {
addDecField("ask", other, event);
stock.ask = addDecField(other);
addDecField("bid", last, event);
stock.bid = addDecField(last);
} else {
addDecField("ask", last, event);
stock.ask = addDecField(last);
addDecField("bid", other, event);
stock.bid = addDecField(other);
}
int quantity;
quantity = uniform(1, 200) * 500;
event.put("bid_quantity", Integer.toString(quantity));
stock.bid_quantity = Integer.toString(quantity);

quantity = uniform(1, 200) * 500;
event.put("ask_quantity", Integer.toString(quantity));
stock.ask_quantity = Integer.toString(quantity);

double var = (last - ref) / (double) ref * 100;
addDecField("pct_change", (int) (var * 100), event);
stock.pct_change = addDecField((int) (var * 100));
if ((last == min) || fullData) {
addDecField("min", min, event);
stock.min = addDecField(min);
}
if ((last == max) || fullData) {
addDecField("max", max, event);
stock.max = addDecField(max);
}
if (fullData) {
event.put("stock_name", name);
addDecField("ref_price", ref, event);
stock.ref_price = addDecField(ref);
addDecField("open_price", open, event);
stock.open_price = addDecField(open);
// since it's a simulator the item is always active
event.put("item_status", "active");
stock.item_status = "active";
}
return stock;
return new Stock(
name,
event.get("time"),
event.get("timestamp"),
event.get("last_price"),
event.get("ask"),
event.get("bid"),
event.get("bid_quantity"),
event.get("ask_quantity"),
event.get("pct_change"),
event.get("min"),
event.get("max"),
event.get("ref_price"),
event.get("open_price"),
event.get("item_status"));
}

private void addDecField(String fld, int val100, HashMap<String, String> target) {
Expand All @@ -297,10 +294,6 @@ private void addDecField(String fld, int val100, HashMap<String, String> target)
target.put(fld, buf);
}

private String addDecField(int val100) {
return Double.toString((((double) val100) / 100));
}

private long gaussian(double mean, double stddev) {
double base = random.nextGaussian();
return (long) (base * stddev + mean);
Expand All @@ -312,20 +305,19 @@ private int uniform(int min, int max) {
}
}

public static class Stock {
@JsonProperty public String name;
@JsonProperty public String time;
@JsonProperty public String timestamp;
@JsonProperty public String last_price;
@JsonProperty public String ask;
@JsonProperty public String bid;
@JsonProperty public String bid_quantity;
@JsonProperty public String ask_quantity;
@JsonProperty public String pct_change;
@JsonProperty public String min;
@JsonProperty public String max;
@JsonProperty public String ref_price;
@JsonProperty public String open_price;
@JsonProperty public String item_status;
}
public static record Stock(
@JsonProperty String name,
@JsonProperty String time,
@JsonProperty String timestamp,
@JsonProperty String last_price,
@JsonProperty String ask,
@JsonProperty String bid,
@JsonProperty String bid_quantity,
@JsonProperty String ask_quantity,
@JsonProperty String pct_change,
@JsonProperty String min,
@JsonProperty String max,
@JsonProperty String ref_price,
@JsonProperty String open_price,
@JsonProperty String item_status) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public void run() {
}

@Override
public void onEvent(Stock stock, boolean b) {
public void onEvent(Stock stock) {
ProducerRecord<String, Stock> record =
new ProducerRecord<>(this.topic, stock.name.replace(' ', '-'), stock);
new ProducerRecord<>(this.topic, stock.name().replace(' ', '-'), stock);
producer.send(
record,
new Callback() {
Expand Down
6 changes: 3 additions & 3 deletions kafka-connector/src/connector/dist/adapters.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@
<param name="field.stock_name">#{VALUE.name}</param>

<!-- Extraction of the record timestamp to the field "ts". -->
<param name="field.time">#{TIMESTAMP}</param>
<param name="field.time">#{VALUE.time}</param>

<!-- Extraction of the record partition mapped to the field "partition". -->
<param name="field.timestamp">#{PARTITION}</param>
<param name="field.timestamp">#{VALUE.timestamp}</param>

<!-- Extraction of the record offset mapped to the field "offset". -->
<param name="field.last_price">#{OFFSET}</param>
<param name="field.last_price">#{VALUE.last_price}</param>

<param name="field.ask">#{VALUE.ask}</param>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ enum ItemEvaluator implements ItemExpressionEvaluator {
Pattern.compile("(([a-zA-Z\\._]\\w*)=([a-zA-Z0-9\\.\\[\\]\\*]+)),?")),

SUBSCRIBED(
Pattern.compile("([a-zA-Z0-9_-]+)(-<(.*)>)?"),
Pattern.compile("([a-zA-Z0-9_-]+)(-\\[(.*)\\])?"),
Pattern.compile("(([a-zA-Z\\._]\\w*)=([^,]+)),?"));

private final Pattern gobal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -120,11 +121,12 @@ public int mappedValuesSize() {

@Override
public Map<String, String> filter(Selectors<?, ?> selectors) {
return valuesContainers.stream()
Map<String, String> eventsMap = new HashMap<>();
valuesContainers.stream()
.filter(container -> container.selectors().equals(selectors))
.flatMap(container -> container.values().stream())
.filter(v -> v.text() != null)
.collect(Collectors.toMap(Value::name, Value::text));
.forEach(value -> eventsMap.put(value.name(), value.text()));
return eventsMap;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ protected Value eval(GenericRecord record) {
LinkedNode<NodeEvaluator<GenericRecord, Object>> currentNode = linkedNode;
while (currentNode != null) {
if (value == null) {
ValueException.throwNullObject(currentNode.previous().value().name());
ValueException.throwFieldNotFound(currentNode.value().name());
continue;
}
if (value instanceof GenericRecord genericRecord) {
Expand All @@ -185,7 +185,7 @@ protected Value eval(GenericRecord record) {
ValueException.throwNonComplexObjectRequired(expression());
}

String text = value != null ? value.toString() : "NULL";
String text = value != null ? value.toString() : null;
return Value.of(name(), text);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Value eval(JsonNode node) {
ValueException.throwNonComplexObjectRequired(expression());
}

String text = !node.isNull() ? node.asText() : "NULL";
String text = !node.isNull() ? node.asText() : null;
return Value.of(name(), text);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ public void shouldOneToMany() {
ItemTemplates<String, JsonNode> templates = Items.templatesFrom(topicsConfig, selected);
assertThat(templates.topics()).containsExactly("topic");

Item subcribingItem1 = Items.itemFrom("template-family-<topic=aSpecificTopic>", "");
Item subcribingItem1 = Items.itemFrom("template-family-[topic=aSpecificTopic]", "");
assertThat(templates.matches(subcribingItem1)).isTrue();

Item subcribingItem2 =
Items.itemFrom("template-relatives-<topic=anotherSpecificTopic>", "");
Items.itemFrom("template-relatives-[topic=anotherSpecificTopic]", "");
assertThat(templates.matches(subcribingItem2)).isTrue();

RecordMapper<String, JsonNode> mapper =
Expand Down Expand Up @@ -163,7 +163,7 @@ public void shouldManyToOne() {
ItemTemplates<String, JsonNode> templates = Items.templatesFrom(topicsConfig, suppliers);
assertThat(templates.topics()).containsExactly("new_orders", "past_orders");

Item subcribingItem = Items.itemFrom("template-orders-<topic=aSpecifgicTopic>", "");
Item subcribingItem = Items.itemFrom("template-orders-[topic=aSpecifgicTopic]", "");
assertThat(templates.matches(subcribingItem)).isTrue();

RecordMapper<String, JsonNode> mapper =
Expand Down Expand Up @@ -255,31 +255,31 @@ static Stream<Arguments> templateArgs() {
List.of("item-#{key=KEY,value=VALUE}"),
List.of(
Items.itemFrom("item", new Object()),
Items.itemFrom("item-<key=key>", new Object()),
Items.itemFrom("item-<key=key,value=value>", new Object()),
Items.itemFrom("item-<value=value>", new Object())),
Items.itemFrom("item-[key=key]", new Object()),
Items.itemFrom("item-[key=key,value=value]", new Object()),
Items.itemFrom("item-[value=value]", new Object())),
List.of(
Items.itemFrom("nonRoutable", new Object()),
Items.itemFrom("item-<key=anotherKey>", new Object()),
Items.itemFrom("item-<value=anotherValue>", new Object()))),
Items.itemFrom("item-[key=anotherKey]", new Object()),
Items.itemFrom("item-[value=anotherValue]", new Object()))),
arguments(
List.of(
"item-#{key=KEY,value=VALUE}",
"item-#{topic=TOPIC}",
"myItem-#{topic=TOPIC}"),
List.of(
Items.itemFrom("item", new Object()),
Items.itemFrom("item-<key=key>", new Object()),
Items.itemFrom("item-<key=key,value=value>", new Object()),
Items.itemFrom("item-<value=value>", new Object()),
Items.itemFrom("item-<topic=topic>", new Object()),
Items.itemFrom("myItem-<topic=topic>", new Object())),
Items.itemFrom("item-[key=key]", new Object()),
Items.itemFrom("item-[key=key,value=value]", new Object()),
Items.itemFrom("item-[value=value]", new Object()),
Items.itemFrom("item-[topic=topic]", new Object()),
Items.itemFrom("myItem-[topic=topic]", new Object())),
List.of(
Items.itemFrom("nonRoutable", new Object()),
Items.itemFrom("item-<key=anotherKey>", new Object()),
Items.itemFrom("item-<value=anotherValue>", new Object()),
Items.itemFrom("item-<topic=anotherTopic>", new Object()),
Items.itemFrom("myItem-<topic=anotherTopic>", new Object()))));
Items.itemFrom("item-[key=anotherKey]", new Object()),
Items.itemFrom("item-[value=anotherValue]", new Object()),
Items.itemFrom("item-[topic=anotherTopic]", new Object()),
Items.itemFrom("myItem-[topic=anotherTopic]", new Object()))));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void shouldNotMatcDueToDifferentPrefix() {
item-first | item-first
item_123_ | item_123_
item- | item-
prefix-<> | prefix
prefix-[] | prefix
""")
public void shouldMakeWithEmptySchemaKeys(String input, String expectedPrefix) {
Object handle = new Object();
Expand All @@ -124,14 +124,14 @@ public void shouldMakeWithEmptySchemaKeys(String input, String expectedPrefix) {
textBlock =
"""
INPUT | EXPECTED_PREFIX | EXPECTED_NAME | EXPECTED_VALUE
item-<name=field1> | item | name | field1
item-first-<height=12.34> | item-first | height | 12.34
item_123_-<test=\\> | item_123_ | test | \\
item-<test=""> | item | test | ""
prefix-<test=>> | prefix | test | >
item-<test=value,> | item | test | value
item-[name=field1] | item | name | field1
item-first-[height=12.34] | item-first | height | 12.34
item_123_-[test=\\] | item_123_ | test | \\
item-[test=""] | item | test | ""
prefix-[test=]] | prefix | test | ]
item-[test=value,] | item | test | value
item- | item- | |
item-<> | item | |
item-[] | item | |
""")
public void shouldMakeWithValue(
String input, String expectedPrefix, String expectedName, String expectedValue) {
Expand All @@ -155,7 +155,7 @@ public void shouldMakeWithValue(
textBlock =
"""
INPUT | EXPECTED_NAME1 | EXPECTED_VALUE1 | EXPECTED_NAME2 | EXPECTED_VALUE2
item-<name1=field1,name2=field2> | name1 | field1 | name2 | field2
item-[name1=field1,name2=field2] | name1 | field1 | name2 | field2
""")
public void shouldMakeWithMoreValues(
String input, String name1, String val1, String name2, String value2) {
Expand Down
Loading

0 comments on commit 3531904

Please sign in to comment.