Skip to content

Commit

Permalink
-Added device id, configurable number of devices and event types and
Browse files Browse the repository at this point in the history
Random payload
  • Loading branch information
ameyb committed Aug 6, 2019
1 parent 880043b commit 49aafda
Show file tree
Hide file tree
Showing 4 changed files with 881 additions and 814 deletions.
9 changes: 7 additions & 2 deletions src/main/java/com/yugabyte/sample/apps/AppBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,25 @@ protected byte[] getRandomValue(Key key) {
getRandomValue(key, buffer);
return buffer;
}


protected byte[] getRandomValue(Key key, byte[] outBuffer) {
getRandomValue(key, outBuffer.length, outBuffer);
return outBuffer;
}

protected void getRandomValue(Key key, int valueSize, byte[] outBuffer) {
final byte[] keyValueBytes = key.getValueStr().getBytes();
getRandomValue(keyValueBytes, valueSize, outBuffer);
}

protected void getRandomValue(byte[] keyValueBytes, int valueSize, byte[] outBuffer) {
outBuffer[0] = appConfig.restrictValuesToAscii ? ASCII_MARKER : BINARY_MARKER;
final int checksumSize = appConfig.restrictValuesToAscii ? CHECKSUM_ASCII_SIZE : CHECKSUM_SIZE;
final boolean isUseChecksum = isUseChecksum(valueSize, checksumSize);
final int contentSize = valueSize - (isUseChecksum ? checksumSize : 0);
int i = 1;
if (isUsePrefix(valueSize)) {
final byte[] keyValueBytes = key.getValueStr().getBytes();

// Beginning of value is not random, but has format "<MARKER><PREFIX>", where prefix is
// "val: $key" (or part of it in case small value size). This is needed to verify expected
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/yugabyte/sample/apps/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,10 @@ public static enum Type {

// The path to the certificate to be used for the SSL connection.
public String sslCert = null;

public int num_devices = 100;

public int num_event_types = 100;


}
108 changes: 53 additions & 55 deletions src/main/java/com/yugabyte/sample/apps/CassandraEventData.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import com.datastax.driver.core.utils.Bytes;

/**
* A sample IoT event data application with batch processing.
* A sample IoT event data application with batch processing
*
* This app tracks a bunch of metrics which have a series of data points ordered
* by timestamp. The metric's id can be thought of as a compound/concatenated
* This app tracks a bunch of devices which have a series of data points ordered
* by timestamp. The device id can be thought of as a compound/concatenated
* unique id - which could include user id, on a node id and device id (and id
* of the device). For simplicity, we use it as a single metric id to cover all
* of them.
* of the device). Every device can have different types of event codes
* generated.
*/
public class CassandraEventData extends AppBase {
private static final Logger LOG = Logger.getLogger(CassandraEventData.class);
Expand All @@ -63,14 +63,14 @@ public class CassandraEventData extends AppBase {
}

static Random random = new Random();
// The default table name that has the raw metric data.
private final String DEFAULT_TABLE_NAME = "batch_event_data_ts_metrics_raw";
// The structure to hold info per metric.
// The default table name that has the raw device data.
private final String DEFAULT_TABLE_NAME = "event_data_raw";
// The structure to hold info per device.
static List<DataSource> dataSources = new CopyOnWriteArrayList<DataSource>();
// The minimum number of metrics to simulate.
private static int min_metrics_count = 50;
// The maximum number of metrics to simulate.
private static int max_metrics_count = 100;
// The number of devices to simulate.
private static int num_devices = 100;
// The number of event types
private static int num_event_types = 100;
// The shared prepared select statement for fetching the data.
private static volatile PreparedStatement preparedSelect;
// The shared prepared statement for inserting into the table.
Expand All @@ -90,20 +90,12 @@ public void initialize(CmdLineOpts configuration) {

// Read the various params from the command line.
CommandLine commandLine = configuration.getCommandLine();
if (commandLine.hasOption("min_metrics_count")) {
min_metrics_count = Integer.parseInt(commandLine.getOptionValue("min_metrics_count"));
if (commandLine.hasOption("num_devices")) {
num_devices = Integer.parseInt(commandLine.getOptionValue("num_devices"));
}
if (commandLine.hasOption("max_metrics_count")) {
max_metrics_count = Integer.parseInt(commandLine.getOptionValue("max_metrics_count"));
}

// Generate the number of metrics this data source would emit.
int num_metrics = min_metrics_count
+ (max_metrics_count > min_metrics_count ? random.nextInt(max_metrics_count - min_metrics_count)
: 0);

// Create all the metric data sources.
for (int i = 0; i < num_metrics; i++) {
// Create all the device data sources.
for (int i = 0; i < num_devices; i++) {
DataSource dataSource = new DataSource(i);
dataSources.add(dataSource);
}
Expand All @@ -121,13 +113,14 @@ public void dropTable() {

@Override
protected List<String> getCreateTableStatements() {

return Arrays.asList(
String.format("CREATE TABLE IF NOT EXISTS %s %s %s %s;", getTableName()," ( metric_id varchar , ts bigint , event_type varchar , value blob , primary key (metric_id, ts, event_type))"
, " WITH default_time_to_live = ", appConfig.tableTTLSeconds),
String.format("CREATE INDEX IF NOT EXISTS search_by_event_type ON %s %s %s;", getTableName(),
" ( metric_id, event_type, ts ) ",
"WITH transactions = { 'enabled' : false, 'consistency_level' : 'user_enforced' }"));

return Arrays.asList(String.format("CREATE TABLE IF NOT EXISTS %s %s %s %s %s;", getTableName(),
" ( device_id varchar , ts bigint , event_type varchar , value blob , primary key (device_id, ts, event_type))",
" WITH CLUSTERING ORDER BY (ts DESC, event_type ASC)", " AND default_time_to_live = ",
appConfig.tableTTLSeconds),
String.format("CREATE INDEX IF NOT EXISTS search_by_event_type ON %s %s %s %s;", getTableName(),
" ( device_id, event_type, ts ) ", " WITH CLUSTERING ORDER BY (event_type ASC, ts DESC)",
"AND transactions = { 'enabled' : false, 'consistency_level' : 'user_enforced' }"));

}

Expand All @@ -136,8 +129,8 @@ private PreparedStatement getPreparedInsert() {
synchronized (prepareInitLock) {
if (preparedInsert == null) {
// Create the prepared statement object.
String insert_stmt = String.format("INSERT INTO %s (metric_id, ts, event_type, value) VALUES "
+ "(:metric_id, :ts,:event_type, :value);", getTableName());
String insert_stmt = String.format("INSERT INTO %s (device_id, ts, event_type, value) VALUES "
+ "(:device_id, :ts,:event_type, :value);", getTableName());
preparedInsert = getCassandraClient().prepare(insert_stmt);
}
}
Expand All @@ -150,7 +143,7 @@ private PreparedStatement getPreparedSelect() {
synchronized (prepareInitLock) {
if (preparedSelect == null) {
// Create the prepared statement object.
String select_stmt = String.format("SELECT * from %s WHERE metric_id = :metricId AND "
String select_stmt = String.format("SELECT * from %s WHERE device_id = :deviceId AND "
+ "ts > :startTs AND ts < :endTs AND event_type=:event_type ORDER BY ts DESC "
+ "LIMIT :readBatchSize;", getTableName());
preparedSelect = getCassandraClient().prepare(select_stmt);
Expand Down Expand Up @@ -190,17 +183,20 @@ public long doRead() {
long endTs = dataSource.getEndTs();

// Bind the select statement.
BoundStatement select = getPreparedSelect().bind().setString("metricId", dataSource.getMetricId())
BoundStatement select = getPreparedSelect().bind().setString("deviceId", dataSource.getDeviceId())
.setLong("startTs", startTs).setLong("endTs", endTs).setString("event_type", dataSource.getEvent_type())
.setInt("readBatchSize", appConfig.cassandraReadBatchSize);
// Make the query.
ResultSet rs = getCassandraClient().execute(select);
return 1;
}

private ByteBuffer getValue() {
// Initialize byte[] for required size
private ByteBuffer getValue(String device_id) {

byte[] randBytesArr = new byte[appConfig.valueSize];

getRandomValue(device_id.getBytes(), appConfig.valueSize, randBytesArr);

return ByteBuffer.wrap(randBytesArr);
}

Expand All @@ -214,8 +210,9 @@ public long doWrite(int threadIdx) {
// Enter a batch of data points.
long ts = dataSource.getDataEmitTs();
for (int i = 0; i < appConfig.cassandraBatchSize; i++) {
batch.add(getPreparedInsert().bind().setString("metric_id", dataSource.getMetricId()).setLong("ts", ts)
.setString("event_type", dataSource.getEvent_type()).setBytesUnsafe("value", getValue()));
batch.add(getPreparedInsert().bind().setString("device_id", dataSource.getDeviceId()).setLong("ts", ts)
.setString("event_type", dataSource.getEvent_type())
.setBytesUnsafe("value", getValue(dataSource.getDeviceId())));
numKeysWritten++;
ts++;
}
Expand All @@ -227,12 +224,12 @@ public long doWrite(int threadIdx) {
}

/**
* This class represents a single metric data source, which sends back
* timeseries data for that metric. It generates data governed by the emit rate.
* This class represents a single device data source, which sends back
* timeseries data for that device. It generates data governed by the emit rate.
*/
public static class DataSource {
// The list of metrics to emit for this data source.
private String metric_id;
// The list of devices to emit for this data source.
private String device_id;
// The timestamp at which the data emit started.
private long dataEmitStartTs = 1;
// State variable tracking the last time emitted by this source.
Expand All @@ -241,11 +238,11 @@ public static class DataSource {
private String event_type;

public DataSource(int index) {
this.metric_id = String.format("metric-%05d", index);
this.device_id = String.format("device-%05d", index);
}

public String getMetricId() {
return metric_id;
public String getDeviceId() {
return device_id;
}

public boolean getHasEmittedData() {
Expand Down Expand Up @@ -280,30 +277,31 @@ public synchronized void setLastEmittedTs(long ts) {

@Override
public String toString() {
return getMetricId();
return getDeviceId();
}

public String getEvent_type() {
Random rand = new Random();
return Integer.toString(rand.nextInt(99));
return Integer.toString(rand.nextInt(num_event_types));
}

}

@Override
public List<String> getWorkloadDescription() {
return Arrays.asList("Timeseries/IoT app built that simulates metric data emitted by devices periodically. ",
"The data is written into the 'batch_ts_metrics_raw' table, which retains data for one day.",
"Note that the number of metrics written is a lot more than the number of metrics read as ",
"is typical in such workloads, and the payload size for each write is 100 bytes. Every ",
"read query fetches the a limited batch from recently written values for a randome metric.");
return Arrays.asList("Timeseries/IoT app built that simulates device data emitted by devices periodically. ",
"The data is written into the 'event_data_raw' table, which retains data for one day.",
"Note that the number of devices written is a lot more than the number of devices read as ",
"is typical in such workloads, and the payload size for each write can be configurable(in bytes). Every ",
"read query fetches the a limited batch from recently written values for a random device.");
}

@Override
public List<String> getWorkloadOptionalArguments() {
return Arrays.asList("--num_threads_read " + appConfig.numReaderThreads,
"--num_threads_write " + appConfig.numWriterThreads, "--max_metrics_count " + max_metrics_count,
"--table_ttl_seconds " + appConfig.tableTTLSeconds, "--batch_size " + appConfig.cassandraBatchSize,
"--num_threads_write " + appConfig.numWriterThreads, "--num_devices " + num_devices,
"--num_event_types " + num_event_types, "--table_ttl_seconds " + appConfig.tableTTLSeconds,
"--batch_size " + appConfig.cassandraBatchSize,
"--read_batch_size " + appConfig.cassandraReadBatchSize);
}
}
Loading

0 comments on commit 49aafda

Please sign in to comment.