diff --git a/src/main/java/com/yugabyte/sample/apps/AppBase.java b/src/main/java/com/yugabyte/sample/apps/AppBase.java index 6219d9a..1519776 100644 --- a/src/main/java/com/yugabyte/sample/apps/AppBase.java +++ b/src/main/java/com/yugabyte/sample/apps/AppBase.java @@ -365,20 +365,25 @@ protected byte[] getRandomValue(Key key) { getRandomValue(key, buffer); return buffer; } + protected byte[] getRandomValue(Key key, byte[] outBuffer) { getRandomValue(key, outBuffer.length, outBuffer); return outBuffer; } - + protected void getRandomValue(Key key, int valueSize, byte[] outBuffer) { + final byte[] keyValueBytes = key.getValueStr().getBytes(); + getRandomValue(keyValueBytes, valueSize, outBuffer); + } + + protected void getRandomValue(byte[] keyValueBytes, int valueSize, byte[] outBuffer) { outBuffer[0] = appConfig.restrictValuesToAscii ? ASCII_MARKER : BINARY_MARKER; final int checksumSize = appConfig.restrictValuesToAscii ? CHECKSUM_ASCII_SIZE : CHECKSUM_SIZE; final boolean isUseChecksum = isUseChecksum(valueSize, checksumSize); final int contentSize = valueSize - (isUseChecksum ? checksumSize : 0); int i = 1; if (isUsePrefix(valueSize)) { - final byte[] keyValueBytes = key.getValueStr().getBytes(); // Beginning of value is not random, but has format "", where prefix is // "val: $key" (or part of it in case small value size). This is needed to verify expected diff --git a/src/main/java/com/yugabyte/sample/apps/AppConfig.java b/src/main/java/com/yugabyte/sample/apps/AppConfig.java index 4068865..35a6495 100644 --- a/src/main/java/com/yugabyte/sample/apps/AppConfig.java +++ b/src/main/java/com/yugabyte/sample/apps/AppConfig.java @@ -165,4 +165,10 @@ public static enum Type { // The path to the certificate to be used for the SSL connection. public String sslCert = null; + + public int num_devices = 100; + + public int num_event_types = 100; + + } diff --git a/src/main/java/com/yugabyte/sample/apps/CassandraEventData.java b/src/main/java/com/yugabyte/sample/apps/CassandraEventData.java index 8cc1aeb..6976458 100644 --- a/src/main/java/com/yugabyte/sample/apps/CassandraEventData.java +++ b/src/main/java/com/yugabyte/sample/apps/CassandraEventData.java @@ -32,13 +32,13 @@ import com.datastax.driver.core.utils.Bytes; /** - * A sample IoT event data application with batch processing. + * A sample IoT event data application with batch processing * - * This app tracks a bunch of metrics which have a series of data points ordered - * by timestamp. The metric's id can be thought of as a compound/concatenated + * This app tracks a bunch of devices which have a series of data points ordered + * by timestamp. The device id can be thought of as a compound/concatenated * unique id - which could include user id, on a node id and device id (and id - * of the device). For simplicity, we use it as a single metric id to cover all - * of them. + * of the device). Every device can have different types of event codes + * generated. */ public class CassandraEventData extends AppBase { private static final Logger LOG = Logger.getLogger(CassandraEventData.class); @@ -63,14 +63,14 @@ public class CassandraEventData extends AppBase { } static Random random = new Random(); - // The default table name that has the raw metric data. - private final String DEFAULT_TABLE_NAME = "batch_event_data_ts_metrics_raw"; - // The structure to hold info per metric. + // The default table name that has the raw device data. + private final String DEFAULT_TABLE_NAME = "event_data_raw"; + // The structure to hold info per device. static List dataSources = new CopyOnWriteArrayList(); - // The minimum number of metrics to simulate. - private static int min_metrics_count = 50; - // The maximum number of metrics to simulate. - private static int max_metrics_count = 100; + // The number of devices to simulate. + private static int num_devices = 100; + // The number of event types + private static int num_event_types = 100; // The shared prepared select statement for fetching the data. private static volatile PreparedStatement preparedSelect; // The shared prepared statement for inserting into the table. @@ -90,20 +90,12 @@ public void initialize(CmdLineOpts configuration) { // Read the various params from the command line. CommandLine commandLine = configuration.getCommandLine(); - if (commandLine.hasOption("min_metrics_count")) { - min_metrics_count = Integer.parseInt(commandLine.getOptionValue("min_metrics_count")); + if (commandLine.hasOption("num_devices")) { + num_devices = Integer.parseInt(commandLine.getOptionValue("num_devices")); } - if (commandLine.hasOption("max_metrics_count")) { - max_metrics_count = Integer.parseInt(commandLine.getOptionValue("max_metrics_count")); - } - - // Generate the number of metrics this data source would emit. - int num_metrics = min_metrics_count - + (max_metrics_count > min_metrics_count ? random.nextInt(max_metrics_count - min_metrics_count) - : 0); - // Create all the metric data sources. - for (int i = 0; i < num_metrics; i++) { + // Create all the device data sources. + for (int i = 0; i < num_devices; i++) { DataSource dataSource = new DataSource(i); dataSources.add(dataSource); } @@ -121,13 +113,14 @@ public void dropTable() { @Override protected List getCreateTableStatements() { - - return Arrays.asList( - String.format("CREATE TABLE IF NOT EXISTS %s %s %s %s;", getTableName()," ( metric_id varchar , ts bigint , event_type varchar , value blob , primary key (metric_id, ts, event_type))" - , " WITH default_time_to_live = ", appConfig.tableTTLSeconds), - String.format("CREATE INDEX IF NOT EXISTS search_by_event_type ON %s %s %s;", getTableName(), - " ( metric_id, event_type, ts ) ", - "WITH transactions = { 'enabled' : false, 'consistency_level' : 'user_enforced' }")); + + return Arrays.asList(String.format("CREATE TABLE IF NOT EXISTS %s %s %s %s %s;", getTableName(), + " ( device_id varchar , ts bigint , event_type varchar , value blob , primary key (device_id, ts, event_type))", + " WITH CLUSTERING ORDER BY (ts DESC, event_type ASC)", " AND default_time_to_live = ", + appConfig.tableTTLSeconds), + String.format("CREATE INDEX IF NOT EXISTS search_by_event_type ON %s %s %s %s;", getTableName(), + " ( device_id, event_type, ts ) ", " WITH CLUSTERING ORDER BY (event_type ASC, ts DESC)", + "AND transactions = { 'enabled' : false, 'consistency_level' : 'user_enforced' }")); } @@ -136,8 +129,8 @@ private PreparedStatement getPreparedInsert() { synchronized (prepareInitLock) { if (preparedInsert == null) { // Create the prepared statement object. - String insert_stmt = String.format("INSERT INTO %s (metric_id, ts, event_type, value) VALUES " - + "(:metric_id, :ts,:event_type, :value);", getTableName()); + String insert_stmt = String.format("INSERT INTO %s (device_id, ts, event_type, value) VALUES " + + "(:device_id, :ts,:event_type, :value);", getTableName()); preparedInsert = getCassandraClient().prepare(insert_stmt); } } @@ -150,7 +143,7 @@ private PreparedStatement getPreparedSelect() { synchronized (prepareInitLock) { if (preparedSelect == null) { // Create the prepared statement object. - String select_stmt = String.format("SELECT * from %s WHERE metric_id = :metricId AND " + String select_stmt = String.format("SELECT * from %s WHERE device_id = :deviceId AND " + "ts > :startTs AND ts < :endTs AND event_type=:event_type ORDER BY ts DESC " + "LIMIT :readBatchSize;", getTableName()); preparedSelect = getCassandraClient().prepare(select_stmt); @@ -190,7 +183,7 @@ public long doRead() { long endTs = dataSource.getEndTs(); // Bind the select statement. - BoundStatement select = getPreparedSelect().bind().setString("metricId", dataSource.getMetricId()) + BoundStatement select = getPreparedSelect().bind().setString("deviceId", dataSource.getDeviceId()) .setLong("startTs", startTs).setLong("endTs", endTs).setString("event_type", dataSource.getEvent_type()) .setInt("readBatchSize", appConfig.cassandraReadBatchSize); // Make the query. @@ -198,9 +191,12 @@ public long doRead() { return 1; } - private ByteBuffer getValue() { - // Initialize byte[] for required size + private ByteBuffer getValue(String device_id) { + byte[] randBytesArr = new byte[appConfig.valueSize]; + + getRandomValue(device_id.getBytes(), appConfig.valueSize, randBytesArr); + return ByteBuffer.wrap(randBytesArr); } @@ -214,8 +210,9 @@ public long doWrite(int threadIdx) { // Enter a batch of data points. long ts = dataSource.getDataEmitTs(); for (int i = 0; i < appConfig.cassandraBatchSize; i++) { - batch.add(getPreparedInsert().bind().setString("metric_id", dataSource.getMetricId()).setLong("ts", ts) - .setString("event_type", dataSource.getEvent_type()).setBytesUnsafe("value", getValue())); + batch.add(getPreparedInsert().bind().setString("device_id", dataSource.getDeviceId()).setLong("ts", ts) + .setString("event_type", dataSource.getEvent_type()) + .setBytesUnsafe("value", getValue(dataSource.getDeviceId()))); numKeysWritten++; ts++; } @@ -227,12 +224,12 @@ public long doWrite(int threadIdx) { } /** - * This class represents a single metric data source, which sends back - * timeseries data for that metric. It generates data governed by the emit rate. + * This class represents a single device data source, which sends back + * timeseries data for that device. It generates data governed by the emit rate. */ public static class DataSource { - // The list of metrics to emit for this data source. - private String metric_id; + // The list of devices to emit for this data source. + private String device_id; // The timestamp at which the data emit started. private long dataEmitStartTs = 1; // State variable tracking the last time emitted by this source. @@ -241,11 +238,11 @@ public static class DataSource { private String event_type; public DataSource(int index) { - this.metric_id = String.format("metric-%05d", index); + this.device_id = String.format("device-%05d", index); } - public String getMetricId() { - return metric_id; + public String getDeviceId() { + return device_id; } public boolean getHasEmittedData() { @@ -280,30 +277,31 @@ public synchronized void setLastEmittedTs(long ts) { @Override public String toString() { - return getMetricId(); + return getDeviceId(); } public String getEvent_type() { Random rand = new Random(); - return Integer.toString(rand.nextInt(99)); + return Integer.toString(rand.nextInt(num_event_types)); } } @Override public List getWorkloadDescription() { - return Arrays.asList("Timeseries/IoT app built that simulates metric data emitted by devices periodically. ", - "The data is written into the 'batch_ts_metrics_raw' table, which retains data for one day.", - "Note that the number of metrics written is a lot more than the number of metrics read as ", - "is typical in such workloads, and the payload size for each write is 100 bytes. Every ", - "read query fetches the a limited batch from recently written values for a randome metric."); + return Arrays.asList("Timeseries/IoT app built that simulates device data emitted by devices periodically. ", + "The data is written into the 'event_data_raw' table, which retains data for one day.", + "Note that the number of devices written is a lot more than the number of devices read as ", + "is typical in such workloads, and the payload size for each write can be configurable(in bytes). Every ", + "read query fetches the a limited batch from recently written values for a random device."); } @Override public List getWorkloadOptionalArguments() { return Arrays.asList("--num_threads_read " + appConfig.numReaderThreads, - "--num_threads_write " + appConfig.numWriterThreads, "--max_metrics_count " + max_metrics_count, - "--table_ttl_seconds " + appConfig.tableTTLSeconds, "--batch_size " + appConfig.cassandraBatchSize, + "--num_threads_write " + appConfig.numWriterThreads, "--num_devices " + num_devices, + "--num_event_types " + num_event_types, "--table_ttl_seconds " + appConfig.tableTTLSeconds, + "--batch_size " + appConfig.cassandraBatchSize, "--read_batch_size " + appConfig.cassandraReadBatchSize); } } diff --git a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java index edf7169..96f72ee 100644 --- a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java +++ b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java @@ -34,765 +34,823 @@ import com.yugabyte.sample.apps.*; /** - * This is a helper class to parse the user specified command-line options if - * they were specified, print help messages when running the app, etc. + * This is a helper class to parse the user specified command-line options if they were specified, + * print help messages when running the app, etc. */ public class CmdLineOpts { - private static final Logger LOG = Logger.getLogger(CmdLineOpts.class); - - // This is a unique UUID that is created by each instance of the application. - // This UUID is used in - // various apps to make the keys unique. This allows us to run multiple - // instances of the app - // safely. - public static UUID loadTesterUUID; - - // The various apps present in this sample. - private final static List HELP_WORKLOADS = ImmutableList.of(CassandraHelloWorld.class, - CassandraKeyValue.class, CassandraRangeKeyValue.class, CassandraBatchKeyValue.class, - CassandraBatchTimeseries.class, CassandraEventData.class, CassandraTransactionalKeyValue.class, - CassandraTransactionalRestartRead.class, CassandraStockTicker.class, CassandraTimeseries.class, - CassandraUserId.class, CassandraPersonalization.class, CassandraSecondaryIndex.class, - CassandraUniqueSecondaryIndex.class, RedisKeyValue.class, RedisPipelinedKeyValue.class, - RedisHashPipelined.class, RedisYBClientKeyValue.class, SqlInserts.class, SqlUpdates.class, - SqlSecondaryIndex.class, SqlSnapshotTxns.class); - - // The class type of the app needed to spawn new objects. - private Class appClass; - // List of database contact points. One contact point could be resolved to - // multiple nodes IPs. - public List contactPoints = new ArrayList<>(); - // The number of reader threads to spawn for OLTP apps. - int numReaderThreads; - // The number of writer threads to spawn for OLTP apps. - int numWriterThreads; - boolean readOnly = false; - boolean localReads = false; - // Random number generator. - Random random = new Random(); - // Command line opts parser. - CommandLine commandLine; - - public void initialize(CommandLine commandLine) throws ClassNotFoundException { - this.commandLine = commandLine; - if (commandLine.hasOption("uuid")) { - loadTesterUUID = UUID.fromString(commandLine.getOptionValue("uuid")); - LOG.info("Using given UUID : " + loadTesterUUID); - } else if (commandLine.hasOption("nouuid")) { - loadTesterUUID = null; - LOG.info("Using NO UUID"); - } else { - loadTesterUUID = UUID.randomUUID(); - LOG.info("Using a randomly generated UUID : " + loadTesterUUID); - } - - // Get the workload. - String appName = commandLine.getOptionValue("workload"); - appClass = getAppClass(appName); - LOG.info("App: " + appClass.getSimpleName()); - - AppBase.appConfig.appName = appClass.getSimpleName(); - - if (commandLine.hasOption("skip_workload")) { - AppBase.appConfig.skipWorkload = true; - // For now, drop table is the only op that is permitted without workload. - if (!commandLine.hasOption("drop_table_name")) { - LOG.error("Table name to be dropped is expected when skipping workload run."); - System.exit(1); - } - } - - if (commandLine.hasOption("run_time")) { - AppBase.appConfig.runTimeSeconds = Integer.parseInt(commandLine.getOptionValue("run_time")); - } - LOG.info("Run time (seconds): " + AppBase.appConfig.runTimeSeconds); - - // Get the proxy contact points. - List hostPortList = Arrays.asList(commandLine.getOptionValue("nodes").split(",")); - for (String hostPort : hostPortList) { - LOG.info("Adding node: " + hostPort); - this.contactPoints.add(ContactPoint.fromHostPort(hostPort)); - } - - // This check needs to be done before initializeThreadCount is called. - if (commandLine.hasOption("read_only")) { - AppBase.appConfig.readOnly = true; - readOnly = true; - if (!commandLine.hasOption("uuid") && !commandLine.hasOption("nouuid")) { - LOG.error("uuid (or nouuid) needs to be provided when using --read-only"); - System.exit(1); - } - } - - // Set the number of threads. - initializeThreadCount(commandLine); - // Initialize num keys. - initializeNumKeys(commandLine); - // Initialize table properties. - initializeTableProperties(commandLine); - if (commandLine.hasOption("local_reads")) { - AppBase.appConfig.localReads = true; - localReads = true; - } - LOG.info("Local reads: " + localReads); - LOG.info("Read only load: " + readOnly); - if (appName.equals(CassandraBatchTimeseries.class.getSimpleName())) { - if (commandLine.hasOption("read_batch_size")) { - AppBase.appConfig.cassandraReadBatchSize = Integer - .parseInt(commandLine.getOptionValue("read_batch_size")); - LOG.info("CassandraBatchTimeseries batch size: " + AppBase.appConfig.cassandraReadBatchSize); - } - if (commandLine.hasOption("read_back_delta_from_now")) { - AppBase.appConfig.readBackDeltaTimeFromNow = Integer - .parseInt(commandLine.getOptionValue("read_back_delta_from_now")); - LOG.info("CassandraBatchTimeseries delta read: " + AppBase.appConfig.readBackDeltaTimeFromNow); - } - } - if (appName.equals(CassandraEventData.class.getSimpleName())) { - if (commandLine.hasOption("read_batch_size")) { - AppBase.appConfig.cassandraReadBatchSize = Integer - .parseInt(commandLine.getOptionValue("read_batch_size")); - LOG.info("CassandraEventData batch size: " + AppBase.appConfig.cassandraReadBatchSize); - } - if (commandLine.hasOption("read_back_delta_from_now")) { - AppBase.appConfig.readBackDeltaTimeFromNow = Integer - .parseInt(commandLine.getOptionValue("read_back_delta_from_now")); - LOG.info("CassandraEventData delta read: " + AppBase.appConfig.readBackDeltaTimeFromNow); - } - } - if (commandLine.hasOption("batch_size")) { - AppBase.appConfig.cassandraBatchSize = Integer.parseInt(commandLine.getOptionValue("batch_size")); - if (AppBase.appConfig.cassandraBatchSize > AppBase.appConfig.numUniqueKeysToWrite) { - LOG.fatal("The batch size cannot be more than the number of unique keys"); - System.exit(-1); - } - LOG.info("Batch size : " + AppBase.appConfig.cassandraBatchSize); - } - if (appName.equals(CassandraPersonalization.class.getSimpleName())) { - if (commandLine.hasOption("num_stores")) { - AppBase.appConfig.numStores = Integer.parseInt(commandLine.getOptionValue("num_stores")); - } - LOG.info("CassandraPersonalization number of stores : " + AppBase.appConfig.numStores); - if (commandLine.hasOption("num_new_coupons_per_customer")) { - AppBase.appConfig.numNewCouponsPerCustomer = Integer - .parseInt(commandLine.getOptionValue("num_new_coupons_per_customer")); - } - LOG.info("CassandraPersonalization number of new coupons per costomer : " - + AppBase.appConfig.numNewCouponsPerCustomer); - if (commandLine.hasOption("max_coupons_per_customer")) { - AppBase.appConfig.maxCouponsPerCustomer = Integer - .parseInt(commandLine.getOptionValue("max_coupons_per_customer")); - } - if (AppBase.appConfig.numNewCouponsPerCustomer > AppBase.appConfig.maxCouponsPerCustomer) { - LOG.fatal("The number of new coupons cannot exceed the maximum number of coupons per customer"); - System.exit(-1); - } - LOG.info("CassandraPersonalization maximum number of coupons per costomer : " - + AppBase.appConfig.maxCouponsPerCustomer); - } - if (appName.equals(CassandraSecondaryIndex.class.getSimpleName())) { - if (commandLine.hasOption("non_transactional_index")) { - AppBase.appConfig.nonTransactionalIndex = true; - } - LOG.info("CassandraSecondaryIndex non-transactional index"); - if (commandLine.hasOption("batch_write")) { - AppBase.appConfig.batchWrite = true; - } - LOG.info("CassandraSecondaryIndex batch write"); - } - if (appName.equals(RedisPipelinedKeyValue.class.getSimpleName()) - || appName.equals(RedisHashPipelined.class.getSimpleName())) { - if (commandLine.hasOption("pipeline_length")) { - AppBase.appConfig.redisPipelineLength = Integer.parseInt(commandLine.getOptionValue("pipeline_length")); - if (AppBase.appConfig.redisPipelineLength > AppBase.appConfig.numUniqueKeysToWrite) { - LOG.fatal("The pipeline length cannot be more than the number of unique keys"); - System.exit(-1); - } - } - LOG.info("RedisPipelinedKeyValue pipeline length : " + AppBase.appConfig.redisPipelineLength); - } - if (appName.equals(RedisHashPipelined.class.getSimpleName())) { - if (commandLine.hasOption("num_subkeys_per_key")) { - AppBase.appConfig.numSubkeysPerKey = Integer - .parseInt(commandLine.getOptionValue("num_subkeys_per_key")); - if (AppBase.appConfig.redisPipelineLength > AppBase.appConfig.numUniqueKeysToWrite) { - LOG.fatal("The pipeline length cannot be more than the number of unique keys"); - System.exit(-1); - } - } - if (commandLine.hasOption("key_freq_zipf_exponent")) { - AppBase.appConfig.keyUpdateFreqZipfExponent = Double - .parseDouble(commandLine.getOptionValue("key_freq_zipf_exponent")); - } - if (commandLine.hasOption("subkey_freq_zipf_exponent")) { - AppBase.appConfig.subkeyUpdateFreqZipfExponent = Double - .parseDouble(commandLine.getOptionValue("subkey_freq_zipf_exponent")); - } - if (commandLine.hasOption("subkey_value_size_zipf_exponent")) { - AppBase.appConfig.valueSizeZipfExponent = Double - .parseDouble(commandLine.getOptionValue("subkey_value_size_zipf_exponent")); - } - if (commandLine.hasOption("subkey_value_max_size")) { - AppBase.appConfig.maxValueSize = Integer.parseInt(commandLine.getOptionValue("subkey_value_max_size")); - } - if (commandLine.hasOption("num_subkeys_per_write")) { - AppBase.appConfig.numSubkeysPerWrite = Integer - .parseInt(commandLine.getOptionValue("num_subkeys_per_write")); - if (AppBase.appConfig.numSubkeysPerWrite > AppBase.appConfig.numSubkeysPerKey) { - LOG.fatal("Writing more subkeys than the number of subkeys per key."); - System.exit(-1); - } - } - if (commandLine.hasOption("num_subkeys_per_read")) { - AppBase.appConfig.numSubkeysPerRead = Integer - .parseInt(commandLine.getOptionValue("num_subkeys_per_read")); - if (AppBase.appConfig.numSubkeysPerRead > AppBase.appConfig.numSubkeysPerKey) { - LOG.fatal("Writing more subkeys than the number of subkeys per key."); - System.exit(-1); - } - } - } - if (commandLine.hasOption("with_local_dc")) { - if (AppBase.appConfig.disableYBLoadBalancingPolicy == true) { - LOG.error("--disable_yb_load_balancing_policy cannot be used with --with_local_dc"); - System.exit(1); - } - AppBase.appConfig.localDc = commandLine.getOptionValue("with_local_dc"); - } - if (commandLine.hasOption("use_redis_cluster")) { - AppBase.appConfig.useRedisCluster = true; - } - if (commandLine.hasOption("yql_username")) { - if (!commandLine.hasOption("yql_password")) { - LOG.error("--yql_username requires --yql_password to be set"); - System.exit(1); - } - AppBase.appConfig.cassandraUsername = commandLine.getOptionValue("yql_username"); - } - if (commandLine.hasOption("yql_password")) { - if (!commandLine.hasOption("yql_username")) { - LOG.error("--yql_password requires --yql_username to be set"); - System.exit(1); - } - AppBase.appConfig.cassandraPassword = commandLine.getOptionValue("yql_password"); - } - if (commandLine.hasOption("concurrent_clients")) { - AppBase.appConfig.concurrentClients = Integer.parseInt(commandLine.getOptionValue("concurrent_clients")); - } - if (commandLine.hasOption("ssl_cert")) { - AppBase.appConfig.sslCert = commandLine.getOptionValue("ssl_cert"); - } - } - - /** - * Creates new instance of the app. - * - * @return the app instance. - */ - public AppBase createAppInstance() { - return createAppInstance(true /* enableMetrics */); - } - - /** - * Creates new instance of the app. - * - * @param enableMetrics Should metrics tracker be enabled. - * @return the app instance. - */ - public AppBase createAppInstance(boolean enableMetrics) { - AppBase workload = null; - try { - // Create a new workload object. - workload = appClass.newInstance(); - // Initialize the workload. - workload.workloadInit(this, enableMetrics); - } catch (Exception e) { - LOG.error("Could not create instance of " + appClass.getName(), e); - } - return workload; - } - - public CommandLine getCommandLine() { - return commandLine; - } - - public List getContactPoints() { - return contactPoints; - } - - public ContactPoint getRandomContactPoint() { - int contactPointId = random.nextInt(contactPoints.size()); - LOG.debug("Returning random contact point id " + contactPointId); - return contactPoints.get(contactPointId); - } - - public int getNumReaderThreads() { - return numReaderThreads; - } - - public int getNumWriterThreads() { - return numWriterThreads; - } - - public boolean getReadOnly() { - return readOnly; - } - - public boolean doErrorChecking() { - return AppBase.appConfig.sanityCheckAtEnd; - } - - public boolean shouldDropTable() { - return AppBase.appConfig.shouldDropTable; - } - - public boolean skipWorkload() { - return AppBase.appConfig.skipWorkload; - } - - public String appName() { - return AppBase.appConfig.appName; - } - - private static Class getAppClass(String workloadType) throws ClassNotFoundException { - // Get the workload class. - return Class.forName("com.yugabyte.sample.apps." + workloadType).asSubclass(AppBase.class); - } - - private void initializeThreadCount(CommandLine cmd) { - // Check if there are a fixed number of threads or variable. - String numThreadsStr = cmd.getOptionValue("num_threads"); - if (readOnly) { - numReaderThreads = AppBase.appConfig.numReaderThreads; - numWriterThreads = 0; - } else if (AppBase.appConfig.readIOPSPercentage == -1) { - numReaderThreads = AppBase.appConfig.numReaderThreads; - numWriterThreads = AppBase.appConfig.numWriterThreads; - } else { - int numThreads = 0; - if (numThreadsStr != null) { - numThreads = Integer.parseInt(numThreadsStr); - } else { - // Default to 8 * num-cores - numThreads = 8 * Runtime.getRuntime().availableProcessors(); - } - numReaderThreads = (int) Math.round(1.0 * numThreads * AppBase.appConfig.readIOPSPercentage / 100); - numWriterThreads = numThreads - numReaderThreads; - } - - // If number of read and write threads are specified on the command line, that - // overrides all - // the other values. - if (cmd.hasOption("num_threads_read")) { - numReaderThreads = Integer.parseInt(cmd.getOptionValue("num_threads_read")); - } - if (cmd.hasOption("num_threads_write")) { - if (readOnly) { - LOG.warn("Ignoring num_threads_write option. It shouldn't be used with read_only."); - } else { - numWriterThreads = Integer.parseInt(cmd.getOptionValue("num_threads_write")); - } - } - LOG.info("Num reader threads: " + numReaderThreads + ", num writer threads: " + numWriterThreads); - } - - private void initializeNumKeys(CommandLine cmd) { - if (cmd.hasOption("num_writes")) { - AppBase.appConfig.numKeysToWrite = Long.parseLong(cmd.getOptionValue("num_writes")); - } - if (cmd.hasOption("num_reads")) { - AppBase.appConfig.numKeysToRead = Long.parseLong(cmd.getOptionValue("num_reads")); - } - if (cmd.hasOption("num_unique_keys")) { - AppBase.appConfig.numUniqueKeysToWrite = Long.parseLong(cmd.getOptionValue("num_unique_keys")); - } - AppBase.appConfig.maxWrittenKey = Long - .parseLong(cmd.getOptionValue("max_written_key", String.valueOf(AppBase.appConfig.maxWrittenKey))); - if (cmd.hasOption("value_size")) { - AppBase.appConfig.valueSize = Integer.parseInt(cmd.getOptionValue("value_size")); - } - if (cmd.hasOption("sleep_time")) { - AppBase.appConfig.sleepTime = Integer.parseInt(cmd.getOptionValue("sleep_time")); - } - if (cmd.hasOption("socket_timeout")) { - AppBase.appConfig.jedisSocketTimeout = Integer.parseInt(cmd.getOptionValue("socket_timeout")); - } - if (cmd.hasOption("use_ascii_values")) { - AppBase.appConfig.restrictValuesToAscii = true; - } - if (cmd.hasOption("sanity_check_at_end")) { - AppBase.appConfig.sanityCheckAtEnd = true; - } - if (cmd.hasOption("disable_yb_load_balancing_policy")) { - AppBase.appConfig.disableYBLoadBalancingPolicy = true; - } - if (cmd.hasOption("print_all_exceptions")) { - AppBase.appConfig.printAllExceptions = true; - } - if (cmd.hasOption("create_table_name") && cmd.hasOption("drop_table_name")) { - LOG.error("Both create and drop table options cannot be provided together."); - System.exit(1); - } - if (cmd.hasOption("create_table_name")) { - AppBase.appConfig.tableName = cmd.getOptionValue("create_table_name"); - LOG.info("Create table name: " + AppBase.appConfig.tableName); - } - if (cmd.hasOption("default_postgres_database")) { - AppBase.appConfig.defaultPostgresDatabase = cmd.getOptionValue("default_postgres_database"); - LOG.info("Default postgres database: " + AppBase.appConfig.defaultPostgresDatabase); - } - if (cmd.hasOption("drop_table_name")) { - AppBase.appConfig.tableName = cmd.getOptionValue("drop_table_name"); - LOG.info("Drop table name: " + AppBase.appConfig.tableName); - AppBase.appConfig.shouldDropTable = true; - } - LOG.info("Num unique keys to insert: " + AppBase.appConfig.numUniqueKeysToWrite); - LOG.info("Num keys to update: " + (AppBase.appConfig.numKeysToWrite - AppBase.appConfig.numUniqueKeysToWrite)); - LOG.info("Num keys to read: " + AppBase.appConfig.numKeysToRead); - LOG.info("Value size: " + AppBase.appConfig.valueSize); - LOG.info("Restrict values to ASCII strings: " + AppBase.appConfig.restrictValuesToAscii); - LOG.info("Perform sanity check at end of app run: " + AppBase.appConfig.sanityCheckAtEnd); - } - - private void initializeTableProperties(CommandLine cmd) { - // Initialize the TTL. - if (cmd.hasOption("table_ttl_seconds")) { - AppBase.appConfig.tableTTLSeconds = Long.parseLong(cmd.getOptionValue("table_ttl_seconds")); - } - - LOG.info("Table TTL (secs): " + AppBase.appConfig.tableTTLSeconds); - } - - /** - * Creates a command line opts object from the arguments specified on the - * command line. - * - * @param args command line args. - * @return a CmdLineOpts object. - * @throws java.lang.Exception exceptions during parsing and preparing of - * options. - */ - public static CmdLineOpts createFromArgs(String[] args) throws Exception { - Options options = new Options(); - - Option appType = OptionBuilder.create("workload"); - appType.setDescription("The workload to run."); - appType.setRequired(true); - appType.setLongOpt("workload"); - appType.setArgs(1); - options.addOption(appType); - - Option proxyAddrs = OptionBuilder.create("nodes"); - proxyAddrs.setDescription("Comma separated proxies, host1:port1,....,hostN:portN"); - proxyAddrs.setRequired(true); - proxyAddrs.setLongOpt("nodes"); - proxyAddrs.setArgs(1); - options.addOption(proxyAddrs); - - options.addOption("help", false, "Show help message."); - options.addOption("verbose", false, "Enable debug level logging."); - - options.addOption("uuid", true, "The UUID to use for this loadtester."); - options.addOption("nouuid", false, "Do not use a UUID. Keys will be key:1, key:2, key:3, " - + "instead of :1, :2, :3 etc."); - options.addOption("create_table_name", true, "The name of the CQL table to create."); - options.addOption("default_postgres_database", true, "The name of the default postgres db."); - options.addOption("drop_table_name", true, "The name of the CQL table to drop."); - options.addOption("read_only", false, - "Read-only workload. " + "Values must have been written previously and uuid must be provided. " - + "num_threads_write will be ignored."); - options.addOption("local_reads", false, "Use consistency ONE for reads."); - options.addOption("num_threads", true, "The total number of threads."); - options.addOption("num_threads_read", true, "The number of threads that perform reads."); - options.addOption("num_threads_write", true, "The number of threads that perform writes."); - options.addOption("num_writes", true, "The total number of writes to perform."); - options.addOption("num_reads", true, "The total number of reads to perform."); - options.addOption("sleep_time", true, "How long (in ms) to sleep between multiple pipeline batches."); - options.addOption("socket_timeout", true, "How long (in ms) to wait for a response from jedis."); - options.addOption("value_size", true, - "Size in bytes of the value. " - + "The bytes are random. Value size should be more than 5 (9) bytes for binary (ascii) " - + "values in order to have checksum for read verification. First byte is used as a " - + "binary/ascii marker. If value size is more than 16 bytes, random content is prepended " - + "with \"val: $key\" string, so we can check if value matches the key during read."); - options.addOption("use_ascii_values", false, - "[RedisKeyValue] If " + "specified, values are restricted to ASCII strings."); - options.addOption("table_ttl_seconds", true, "The TTL in seconds to create the table with."); - options.addOption("sanity_check_at_end", false, - "Add FATAL logs to ensure no failures before terminating the app."); - options.addOption("disable_yb_load_balancing_policy", false, "Disable Yugabyte load-balancing policy."); - options.addOption("print_all_exceptions", false, - "Print all exceptions encountered on the client, instead of sampling."); - options.addOption("skip_workload", false, "Skip running workload."); - options.addOption("run_time", true, "Run time for workload. Negative value means forever (default)."); - options.addOption("use_redis_cluster", false, "Use redis cluster client."); - options.addOption("yql_username", true, "Use authentication with the YQL client using the provided username. " - + "If this option is set, yql_password option should be used too."); - options.addOption("yql_password", true, "Use authentication with the YQL client using the provided password. " - + "If this option is set, yql_username option should be used too."); - options.addOption("concurrent_clients", true, - "The number of client connections to establish to each host in the YugaByte DB cluster."); - options.addOption("ssl_cert", true, "Use an SSL connection while connecting to YugaByte."); - - // Options for CassandraTimeseries workload. - options.addOption("num_users", true, "[CassandraTimeseries] The total number of users."); - options.addOption("min_nodes_per_user", true, "[CassandraTimeseries] The min number of nodes per user."); - options.addOption("min_nodes_per_user", true, "[CassandraTimeseries] The min number of nodes per user."); - options.addOption("max_nodes_per_user", true, "[CassandraTimeseries] The max number of nodes per user."); - options.addOption("min_metrics_count", true, - "[CassandraTimeseries] The min number of metrics for all nodes of user."); - options.addOption("max_metrics_count", true, - "[CassandraTimeseries] The max number of metrics for all nodes of user."); - options.addOption("data_emit_rate_millis", true, - "[CassandraTimeseries] The rate at which each node emits all metrics."); - - // Options for CassandraStockTicker workload. - options.addOption("num_ticker_symbols", true, - "[CassandraStockTicker] The total number of stock ticker symbols."); - - // Options for the key-value workloads. - options.addOption("num_unique_keys", true, "[KV workloads only] Number of unique keys to write into the DB."); - options.addOption("max_written_key", true, - "[KV workloads only, reusing existing table] Max written key number."); - - // Options for CassandraBatchKeyValue app. - options.addOption("batch_size", true, "[CassandraBatchKeyValue] Number of keys to write in a batch."); - - // Options for CassandraBatchTimeseries app. - options.addOption("read_batch_size", true, "[CassandraBatchTimeseries] Number of keys to read in a batch."); - options.addOption("read_back_delta_from_now", true, - "[CassandraBatchTimeseries] Time unit delta back from current time unit."); - - options.addOption("with_local_dc", true, "Local DC name."); - - // Options for CassandraEventData app. - options.addOption("read_batch_size", true, "[CassandraEventData] Number of keys to read in a batch."); - options.addOption("read_back_delta_from_now", true, - "[CassandraEventData] Time unit delta back from current time unit."); - - options.addOption("with_local_dc", true, "Local DC name."); - - // Options for CassandraPersonalization app. - options.addOption("num_stores", true, "[CassandraPersonalization] Number of stores."); - options.addOption("num_new_coupons_per_customer", true, - "[CassandraPersonalization] Number of new coupons per customer."); - options.addOption("max_coupons_per_customer", true, - "[CassandraPersonalization] Maximum number of coupons per customer."); - - // Options for CassandraSecondaryIndex app. - options.addOption("non_transactional_index", false, - "[CassandraSecondaryIndex] Create secondary index without transactions " + "enabled."); - options.addOption("batch_write", false, "[CassandraSecondaryIndex] Enable batch write of key values."); - - // Options for Redis Pipelined Key Value - options.addOption("pipeline_length", true, - "[RedisPipelinedKeyValue/RedisHashPipelined] Number of commands to be sent out" - + " in a redis pipelined sync."); - - options.addOption("num_subkeys_per_key", true, - "[RedisHashPipelined] Number of subkeys in each key for the RedisHashPipelined workload."); - options.addOption("num_subkeys_per_write", true, "[RedisHashPipelined] Number of subkeys updated in each HMSet " - + "for the RedisHashPipelined workload."); - options.addOption("num_subkeys_per_read", true, - "[RedisHashPipelined] Number of subkeys read in each HMGet " + "for the RedisHashPipelined workload."); - - options.addOption("key_freq_zipf_exponent", true, - "[RedisHashPipelined] The zipf distribution exponent, if keys " - + "should be picked using a Zipf distribution. If <= 0, we use " + "a uniform distribution"); - options.addOption("subkey_freq_zipf_exponent", true, - "[RedisHashPipelined] The zipf distribution exponent, if subkeys " - + "should be picked using a Zipf distribution. If <= 0, we use " + "a uniform distribution"); - options.addOption("subkey_value_size_zipf_exponent", true, - "[RedisHashPipelined] The zipf distribution exponent, if the value " - + "sizes should be picked using a Zipf distribution. Value sizes are " - + "chosen such that the expected mean is the value specified by --value_size. " - + "If <= 0, all subkeys will have the value specified by --value_size"); - options.addOption("subkey_value_max_size", true, - "[RedisHashPipelined] If using zipf distribution to choose value sizes, " - + "specifies an upper bound on the value sizes."); - - // First check if a "--help" argument is passed with a simple parser. Note that - // if we add - // required args, then the help string would not work. See: - // https://stackoverflow.com/questions/36720946/apache-cli-required-options-contradicts-with-help-option - // The first function check if help was called with an app name to print - // details. The second - // function just check if help was called without any args to print the - // overview. - parseHelpDetailed(args, options); - parseHelpOverview(args, options); - - // Do the actual arg parsing. - CommandLineParser parser = new BasicParser(); - CommandLine commandLine = null; - - try { - commandLine = parser.parse(options, args); - } catch (ParseException e) { - LOG.error("Error in args, use the --help option to see usage. Exception:", e); - System.exit(0); - } - - CmdLineOpts configuration = new CmdLineOpts(); - configuration.initialize(commandLine); - return configuration; - } - - private static void parseHelpOverview(String[] args, Options options) throws Exception { - Options helpOptions = new Options(); - helpOptions.addOption("help", false, "Print help."); - CommandLineParser helpParser = new BasicParser(); - CommandLine helpCommandLine = null; - try { - helpCommandLine = helpParser.parse(helpOptions, args); - } catch (ParseException e) { - return; - } - if (helpCommandLine.hasOption("help")) { - printUsage(options, "Usage:"); - System.exit(0); - } - } - - private static void parseHelpDetailed(String[] args, Options options) throws Exception { - Options helpOptions = new Options(); - helpOptions.addOption("help", true, "Print help."); - CommandLineParser helpParser = new BasicParser(); - CommandLine helpCommandLine = null; - try { - helpCommandLine = helpParser.parse(helpOptions, args); - } catch (org.apache.commons.cli.MissingArgumentException e1) { - // This is ok since there was no help argument passed. - return; - } catch (ParseException e) { - return; - } - if (helpCommandLine.hasOption("help")) { - printUsageDetails(options, "Usage:", helpCommandLine.getOptionValue("help")); - System.exit(0); - } - } - - private static int getAppPort(String appName) { - if (appName.startsWith("Cassandra")) - return 9042; - else if (appName.startsWith("Redis")) - return 6379; - else if (appName.startsWith("Sql")) - return 5433; - return 0; - } - - private static void printUsage(Options options, String header) throws Exception { - StringBuilder footer = new StringBuilder(); - - footer.append("****************************************************************************\n"); - footer.append("* *\n"); - footer.append("* YugaByte DB Sample Apps *\n"); - footer.append("* *\n"); - footer.append("****************************************************************************\n"); - footer.append("\n"); - footer.append("Use this sample app to try out a variety of workloads against YugaByte DB.\n"); - footer.append(" Use the --help option to get more details on how to run it.\n"); - String optsPrefix = "\t\t\t"; - String optsSuffix = " \\\n"; - for (Class cls : HELP_WORKLOADS) { - String workloadType = cls.getSimpleName(); - AppBase workload = getAppClass(workloadType).newInstance(); - String formattedName = String.format("%-35s: ", workloadType); - footer.append("\n * " + formattedName); - List description = workload.getWorkloadDescription(); - if (!description.isEmpty()) { - footer.append(description.get(0)); - } - } - footer.append("\n"); - System.out.println(footer.toString()); - System.exit(0); - } - - private static void printUsageDetails(Options options, String header, String appName) throws Exception { - StringBuilder footer = new StringBuilder(); - - footer.append("Usage and options for workload " + appName + " in YugaByte DB Sample Apps.\n"); - String optsPrefix = "\t\t\t"; - String optsSuffix = " \\\n"; - int port = getAppPort(appName); - AppBase workload = getAppClass(appName).newInstance(); - - footer.append("\n - " + appName + " :\n"); - footer.append(" "); - for (int idx = 0; idx < appName.length(); idx++) { - footer.append("-"); - } - footer.append("\n"); - - List description = workload.getWorkloadDescription(); - if (!description.isEmpty()) { - for (String line : description) { - footer.append("\t\t"); - footer.append(line); - footer.append("\n"); - } - footer.append("\n"); - } - footer.append("\t\tUsage:\n"); - footer.append(optsPrefix); - footer.append("java -jar yb-sample-apps.jar"); - footer.append(optsSuffix); - footer.append(optsPrefix + "--workload " + appName + optsSuffix); - footer.append(optsPrefix + "--nodes 127.0.0.1:" + port); - - List requiredArgs = workload.getWorkloadRequiredArguments(); - for (String line : requiredArgs) { - footer.append(optsSuffix).append(optsPrefix).append(line); - } - - List optionalArgs = workload.getWorkloadOptionalArguments(); - if (!optionalArgs.isEmpty()) { - footer.append("\n\n\t\tOther options (with default values):\n"); - for (String line : optionalArgs) { - footer.append(optsPrefix + "[ "); - footer.append(line); - footer.append(" ]\n"); - } - } - footer.append("\n"); - System.out.println(footer.toString()); - System.exit(0); - } - - /** - * One contact point could be resolved to multiple nodes IPs. For example for - * Kubernetes yb-tservers.default.svc.cluster.local contact point is resolved to - * all tservers IPs. - */ - public static class ContactPoint { - String host; - int port; - - public ContactPoint(String host, int port) { - this.host = host; - this.port = port; - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public static ContactPoint fromHostPort(String hostPort) { - String[] parts = hostPort.split(":"); - String host = parts[0]; - int port = Integer.parseInt(parts[1]); - return new ContactPoint(host, port); - } - - public String ToString() { - return host + ":" + port; + private static final Logger LOG = Logger.getLogger(CmdLineOpts.class); + + // This is a unique UUID that is created by each instance of the application. This UUID is used in + // various apps to make the keys unique. This allows us to run multiple instances of the app + // safely. + public static UUID loadTesterUUID; + + // The various apps present in this sample. + private final static List HELP_WORKLOADS = ImmutableList.of( + CassandraHelloWorld.class, + CassandraKeyValue.class, + CassandraRangeKeyValue.class, + CassandraBatchKeyValue.class, + CassandraBatchTimeseries.class, + CassandraEventData.class, + CassandraTransactionalKeyValue.class, + CassandraTransactionalRestartRead.class, + CassandraStockTicker.class, + CassandraTimeseries.class, + CassandraUserId.class, + CassandraPersonalization.class, + CassandraSecondaryIndex.class, + CassandraUniqueSecondaryIndex.class, + RedisKeyValue.class, + RedisPipelinedKeyValue.class, + RedisHashPipelined.class, + RedisYBClientKeyValue.class, + SqlInserts.class, + SqlUpdates.class, + SqlSecondaryIndex.class, + SqlSnapshotTxns.class + ); + + // The class type of the app needed to spawn new objects. + private Class appClass; + // List of database contact points. One contact point could be resolved to multiple nodes IPs. + public List contactPoints = new ArrayList<>(); + // The number of reader threads to spawn for OLTP apps. + int numReaderThreads; + // The number of writer threads to spawn for OLTP apps. + int numWriterThreads; + boolean readOnly = false; + boolean localReads = false; + // Random number generator. + Random random = new Random(); + // Command line opts parser. + CommandLine commandLine; + + public void initialize(CommandLine commandLine) throws ClassNotFoundException { + this.commandLine = commandLine; + if (commandLine.hasOption("uuid")) { + loadTesterUUID = UUID.fromString(commandLine.getOptionValue("uuid")); + LOG.info("Using given UUID : " + loadTesterUUID); + } else if (commandLine.hasOption("nouuid")) { + loadTesterUUID = null; + LOG.info("Using NO UUID"); + } else { + loadTesterUUID = UUID.randomUUID(); + LOG.info("Using a randomly generated UUID : " + loadTesterUUID); + } + + // Get the workload. + String appName = commandLine.getOptionValue("workload"); + appClass = getAppClass(appName); + LOG.info("App: " + appClass.getSimpleName()); + + AppBase.appConfig.appName = appClass.getSimpleName(); + + if (commandLine.hasOption("skip_workload")) { + AppBase.appConfig.skipWorkload = true; + // For now, drop table is the only op that is permitted without workload. + if (!commandLine.hasOption("drop_table_name")) { + LOG.error("Table name to be dropped is expected when skipping workload run."); + System.exit(1); + } + } + + if (commandLine.hasOption("run_time")) { + AppBase.appConfig.runTimeSeconds = + Integer.parseInt(commandLine.getOptionValue("run_time")); + } + LOG.info("Run time (seconds): " + AppBase.appConfig.runTimeSeconds); + + // Get the proxy contact points. + List hostPortList = Arrays.asList(commandLine.getOptionValue("nodes").split(",")); + for (String hostPort : hostPortList) { + LOG.info("Adding node: " + hostPort); + this.contactPoints.add(ContactPoint.fromHostPort(hostPort)); + } + + // This check needs to be done before initializeThreadCount is called. + if (commandLine.hasOption("read_only")) { + AppBase.appConfig.readOnly = true; + readOnly = true; + if (!commandLine.hasOption("uuid") && !commandLine.hasOption("nouuid")) { + LOG.error("uuid (or nouuid) needs to be provided when using --read-only"); + System.exit(1); + } + } + + // Set the number of threads. + initializeThreadCount(commandLine); + // Initialize num keys. + initializeNumKeys(commandLine); + // Initialize table properties. + initializeTableProperties(commandLine); + if (commandLine.hasOption("local_reads")) { + AppBase.appConfig.localReads = true; + localReads = true; + } + LOG.info("Local reads: " + localReads); + LOG.info("Read only load: " + readOnly); + if (appName.equals(CassandraBatchTimeseries.class.getSimpleName())) { + if (commandLine.hasOption("read_batch_size")) { + AppBase.appConfig.cassandraReadBatchSize = + Integer.parseInt(commandLine.getOptionValue("read_batch_size")); + LOG.info("CassandraBatchTimeseries batch size: " + + AppBase.appConfig.cassandraReadBatchSize); + } + if (commandLine.hasOption("read_back_delta_from_now")) { + AppBase.appConfig.readBackDeltaTimeFromNow = + Integer.parseInt(commandLine.getOptionValue("read_back_delta_from_now")); + LOG.info("CassandraBatchTimeseries delta read: " + + AppBase.appConfig.readBackDeltaTimeFromNow); + } + } + if (appName.equals(CassandraEventData.class.getSimpleName())) { + if (commandLine.hasOption("read_batch_size")) { + AppBase.appConfig.cassandraReadBatchSize = Integer + .parseInt(commandLine.getOptionValue("read_batch_size")); + LOG.info("CassandraEventData batch size: " + AppBase.appConfig.cassandraReadBatchSize); + } + if (commandLine.hasOption("num_devices")) { + AppBase.appConfig.num_devices = Integer + .parseInt(commandLine.getOptionValue("num_devices")); + LOG.info("CassandraEventData num_devices: " + AppBase.appConfig.num_devices); + } + if (commandLine.hasOption("num_event_types")) { + AppBase.appConfig.num_event_types = Integer + .parseInt(commandLine.getOptionValue("num_event_types")); + LOG.info("CassandraEventData num_event_types: " + AppBase.appConfig.num_event_types); + } + if (commandLine.hasOption("read_back_delta_from_now")) { + AppBase.appConfig.readBackDeltaTimeFromNow = Integer + .parseInt(commandLine.getOptionValue("read_back_delta_from_now")); + LOG.info("CassandraEventData delta read: " + AppBase.appConfig.readBackDeltaTimeFromNow); } } + if (commandLine.hasOption("batch_size")) { + AppBase.appConfig.cassandraBatchSize = + Integer.parseInt(commandLine.getOptionValue("batch_size")); + if (AppBase.appConfig.cassandraBatchSize > AppBase.appConfig.numUniqueKeysToWrite) { + LOG.fatal("The batch size cannot be more than the number of unique keys"); + System.exit(-1); + } + LOG.info("Batch size : " + AppBase.appConfig.cassandraBatchSize); + } + if (appName.equals(CassandraPersonalization.class.getSimpleName())) { + if (commandLine.hasOption("num_stores")) { + AppBase.appConfig.numStores = Integer.parseInt(commandLine.getOptionValue("num_stores")); + } + LOG.info("CassandraPersonalization number of stores : " + AppBase.appConfig.numStores); + if (commandLine.hasOption("num_new_coupons_per_customer")) { + AppBase.appConfig.numNewCouponsPerCustomer = + Integer.parseInt(commandLine.getOptionValue("num_new_coupons_per_customer")); + } + LOG.info("CassandraPersonalization number of new coupons per costomer : " + + AppBase.appConfig.numNewCouponsPerCustomer); + if (commandLine.hasOption("max_coupons_per_customer")) { + AppBase.appConfig.maxCouponsPerCustomer = + Integer.parseInt(commandLine.getOptionValue("max_coupons_per_customer")); + } + if (AppBase.appConfig.numNewCouponsPerCustomer > + AppBase.appConfig.maxCouponsPerCustomer) { + LOG.fatal( + "The number of new coupons cannot exceed the maximum number of coupons per customer"); + System.exit(-1); + } + LOG.info("CassandraPersonalization maximum number of coupons per costomer : " + + AppBase.appConfig.maxCouponsPerCustomer); + } + if (appName.equals(CassandraSecondaryIndex.class.getSimpleName())) { + if (commandLine.hasOption("non_transactional_index")) { + AppBase.appConfig.nonTransactionalIndex = true; + } + LOG.info("CassandraSecondaryIndex non-transactional index"); + if (commandLine.hasOption("batch_write")) { + AppBase.appConfig.batchWrite = true; + } + LOG.info("CassandraSecondaryIndex batch write"); + } + if (appName.equals(RedisPipelinedKeyValue.class.getSimpleName()) || + appName.equals(RedisHashPipelined.class.getSimpleName())) { + if (commandLine.hasOption("pipeline_length")) { + AppBase.appConfig.redisPipelineLength = + Integer.parseInt(commandLine.getOptionValue("pipeline_length")); + if (AppBase.appConfig.redisPipelineLength > AppBase.appConfig.numUniqueKeysToWrite) { + LOG.fatal("The pipeline length cannot be more than the number of unique keys"); + System.exit(-1); + } + } + LOG.info("RedisPipelinedKeyValue pipeline length : " + AppBase.appConfig.redisPipelineLength); + } + if (appName.equals(RedisHashPipelined.class.getSimpleName())) { + if (commandLine.hasOption("num_subkeys_per_key")) { + AppBase.appConfig.numSubkeysPerKey = + Integer.parseInt(commandLine.getOptionValue("num_subkeys_per_key")); + if (AppBase.appConfig.redisPipelineLength > + AppBase.appConfig.numUniqueKeysToWrite) { + LOG.fatal("The pipeline length cannot be more than the number of unique keys"); + System.exit(-1); + } + } + if (commandLine.hasOption("key_freq_zipf_exponent")) { + AppBase.appConfig.keyUpdateFreqZipfExponent = Double.parseDouble( + commandLine.getOptionValue("key_freq_zipf_exponent")); + } + if (commandLine.hasOption("subkey_freq_zipf_exponent")) { + AppBase.appConfig.subkeyUpdateFreqZipfExponent = Double.parseDouble( + commandLine.getOptionValue("subkey_freq_zipf_exponent")); + } + if (commandLine.hasOption("subkey_value_size_zipf_exponent")) { + AppBase.appConfig.valueSizeZipfExponent = Double.parseDouble( + commandLine.getOptionValue("subkey_value_size_zipf_exponent")); + } + if (commandLine.hasOption("subkey_value_max_size")) { + AppBase.appConfig.maxValueSize = Integer.parseInt( + commandLine.getOptionValue("subkey_value_max_size")); + } + if (commandLine.hasOption("num_subkeys_per_write")) { + AppBase.appConfig.numSubkeysPerWrite = Integer.parseInt( + commandLine.getOptionValue("num_subkeys_per_write")); + if (AppBase.appConfig.numSubkeysPerWrite > + AppBase.appConfig.numSubkeysPerKey) { + LOG.fatal("Writing more subkeys than the number of subkeys per key."); + System.exit(-1); + } + } + if (commandLine.hasOption("num_subkeys_per_read")) { + AppBase.appConfig.numSubkeysPerRead = Integer.parseInt( + commandLine.getOptionValue("num_subkeys_per_read")); + if (AppBase.appConfig.numSubkeysPerRead > + AppBase.appConfig.numSubkeysPerKey) { + LOG.fatal("Writing more subkeys than the number of subkeys per key."); + System.exit(-1); + } + } + } + if (commandLine.hasOption("with_local_dc")) { + if (AppBase.appConfig.disableYBLoadBalancingPolicy == true) { + LOG.error("--disable_yb_load_balancing_policy cannot be used with --with_local_dc"); + System.exit(1); + } + AppBase.appConfig.localDc = commandLine.getOptionValue("with_local_dc"); + } + if (commandLine.hasOption("use_redis_cluster")) { + AppBase.appConfig.useRedisCluster = true; + } + if (commandLine.hasOption("yql_username")) { + if (!commandLine.hasOption("yql_password")) { + LOG.error("--yql_username requires --yql_password to be set"); + System.exit(1); + } + AppBase.appConfig.cassandraUsername = commandLine.getOptionValue("yql_username"); + } + if (commandLine.hasOption("yql_password")) { + if (!commandLine.hasOption("yql_username")) { + LOG.error("--yql_password requires --yql_username to be set"); + System.exit(1); + } + AppBase.appConfig.cassandraPassword = commandLine.getOptionValue("yql_password"); + } + if (commandLine.hasOption("concurrent_clients")) { + AppBase.appConfig.concurrentClients = Integer.parseInt( + commandLine.getOptionValue("concurrent_clients")); + } + if (commandLine.hasOption("ssl_cert")) { + AppBase.appConfig.sslCert = commandLine.getOptionValue("ssl_cert"); + } + } + + /** + * Creates new instance of the app. + * @return the app instance. + */ + public AppBase createAppInstance() { + return createAppInstance(true /* enableMetrics */); + } + + /** + * Creates new instance of the app. + * @param enableMetrics Should metrics tracker be enabled. + * @return the app instance. + */ + public AppBase createAppInstance(boolean enableMetrics) { + AppBase workload = null; + try { + // Create a new workload object. + workload = appClass.newInstance(); + // Initialize the workload. + workload.workloadInit(this, enableMetrics); + } catch (Exception e) { + LOG.error("Could not create instance of " + appClass.getName(), e); + } + return workload; + } + + public CommandLine getCommandLine() { + return commandLine; + } + + public List getContactPoints() { + return contactPoints; + } + + public ContactPoint getRandomContactPoint() { + int contactPointId = random.nextInt(contactPoints.size()); + LOG.debug("Returning random contact point id " + contactPointId); + return contactPoints.get(contactPointId); + } + + public int getNumReaderThreads() { + return numReaderThreads; + } + + public int getNumWriterThreads() { + return numWriterThreads; + } + + public boolean getReadOnly() { + return readOnly; + } + + public boolean doErrorChecking() { + return AppBase.appConfig.sanityCheckAtEnd; + } + + public boolean shouldDropTable() { + return AppBase.appConfig.shouldDropTable; + } + + public boolean skipWorkload() { + return AppBase.appConfig.skipWorkload; + } + + public String appName() { + return AppBase.appConfig.appName; + } + + private static Class getAppClass(String workloadType) + throws ClassNotFoundException { + // Get the workload class. + return Class.forName("com.yugabyte.sample.apps." + workloadType) + .asSubclass(AppBase.class); + } + + private void initializeThreadCount(CommandLine cmd) { + // Check if there are a fixed number of threads or variable. + String numThreadsStr = cmd.getOptionValue("num_threads"); + if (readOnly) { + numReaderThreads = AppBase.appConfig.numReaderThreads; + numWriterThreads = 0; + } else if (AppBase.appConfig.readIOPSPercentage == -1) { + numReaderThreads = AppBase.appConfig.numReaderThreads; + numWriterThreads = AppBase.appConfig.numWriterThreads; + } else { + int numThreads = 0; + if (numThreadsStr != null) { + numThreads = Integer.parseInt(numThreadsStr); + } else { + // Default to 8 * num-cores + numThreads = 8 * Runtime.getRuntime().availableProcessors(); + } + numReaderThreads = + (int) Math.round(1.0 * numThreads * AppBase.appConfig.readIOPSPercentage / 100); + numWriterThreads = numThreads - numReaderThreads; + } + + // If number of read and write threads are specified on the command line, that overrides all + // the other values. + if (cmd.hasOption("num_threads_read")) { + numReaderThreads = Integer.parseInt(cmd.getOptionValue("num_threads_read")); + } + if (cmd.hasOption("num_threads_write")) { + if (readOnly) { + LOG.warn("Ignoring num_threads_write option. It shouldn't be used with read_only."); + } else { + numWriterThreads = Integer.parseInt(cmd.getOptionValue("num_threads_write")); + } + } + LOG.info("Num reader threads: " + numReaderThreads + + ", num writer threads: " + numWriterThreads); + } + + private void initializeNumKeys(CommandLine cmd) { + if (cmd.hasOption("num_writes")) { + AppBase.appConfig.numKeysToWrite = Long.parseLong(cmd.getOptionValue("num_writes")); + } + if (cmd.hasOption("num_reads")) { + AppBase.appConfig.numKeysToRead = Long.parseLong(cmd.getOptionValue("num_reads")); + } + if (cmd.hasOption("num_unique_keys")) { + AppBase.appConfig.numUniqueKeysToWrite = + Long.parseLong(cmd.getOptionValue("num_unique_keys")); + } + AppBase.appConfig.maxWrittenKey = Long.parseLong(cmd.getOptionValue("max_written_key", + String.valueOf(AppBase.appConfig.maxWrittenKey))); + if (cmd.hasOption("value_size")) { + AppBase.appConfig.valueSize = Integer.parseInt(cmd.getOptionValue("value_size")); + } + if (cmd.hasOption("sleep_time")) { + AppBase.appConfig.sleepTime = + Integer.parseInt(cmd.getOptionValue("sleep_time")); + } + if (cmd.hasOption("socket_timeout")) { + AppBase.appConfig.jedisSocketTimeout = + Integer.parseInt(cmd.getOptionValue("socket_timeout")); + } + if (cmd.hasOption("use_ascii_values")) { + AppBase.appConfig.restrictValuesToAscii = true; + } + if (cmd.hasOption("sanity_check_at_end")) { + AppBase.appConfig.sanityCheckAtEnd = true; + } + if (cmd.hasOption("disable_yb_load_balancing_policy")) { + AppBase.appConfig.disableYBLoadBalancingPolicy = true; + } + if (cmd.hasOption("print_all_exceptions")) { + AppBase.appConfig.printAllExceptions = true; + } + if (cmd.hasOption("create_table_name") && cmd.hasOption("drop_table_name")) { + LOG.error("Both create and drop table options cannot be provided together."); + System.exit(1); + } + if (cmd.hasOption("create_table_name")) { + AppBase.appConfig.tableName = cmd.getOptionValue("create_table_name"); + LOG.info("Create table name: " + AppBase.appConfig.tableName); + } + if (cmd.hasOption("default_postgres_database")) { + AppBase.appConfig.defaultPostgresDatabase = cmd.getOptionValue("default_postgres_database"); + LOG.info("Default postgres database: " + AppBase.appConfig.defaultPostgresDatabase); + } + if (cmd.hasOption("drop_table_name")) { + AppBase.appConfig.tableName = cmd.getOptionValue("drop_table_name"); + LOG.info("Drop table name: " + AppBase.appConfig.tableName); + AppBase.appConfig.shouldDropTable = true; + } + LOG.info("Num unique keys to insert: " + AppBase.appConfig.numUniqueKeysToWrite); + LOG.info("Num keys to update: " + + (AppBase.appConfig.numKeysToWrite - AppBase.appConfig.numUniqueKeysToWrite)); + LOG.info("Num keys to read: " + AppBase.appConfig.numKeysToRead); + LOG.info("Value size: " + AppBase.appConfig.valueSize); + LOG.info("Restrict values to ASCII strings: " + AppBase.appConfig.restrictValuesToAscii); + LOG.info("Perform sanity check at end of app run: " + AppBase.appConfig.sanityCheckAtEnd); + } + + private void initializeTableProperties(CommandLine cmd) { + // Initialize the TTL. + if (cmd.hasOption("table_ttl_seconds")) { + AppBase.appConfig.tableTTLSeconds = + Long.parseLong(cmd.getOptionValue("table_ttl_seconds")); + } + + LOG.info("Table TTL (secs): " + AppBase.appConfig.tableTTLSeconds); + } + + /** + * Creates a command line opts object from the arguments specified on the command line. + * @param args command line args. + * @return a CmdLineOpts object. + * @throws java.lang.Exception exceptions during parsing and preparing of options. + */ + public static CmdLineOpts createFromArgs(String[] args) throws Exception { + Options options = new Options(); + + Option appType = OptionBuilder.create("workload"); + appType.setDescription("The workload to run."); + appType.setRequired(true); + appType.setLongOpt("workload"); + appType.setArgs(1); + options.addOption(appType); + + Option proxyAddrs = OptionBuilder.create("nodes"); + proxyAddrs.setDescription("Comma separated proxies, host1:port1,....,hostN:portN"); + proxyAddrs.setRequired(true); + proxyAddrs.setLongOpt("nodes"); + proxyAddrs.setArgs(1); + options.addOption(proxyAddrs); + + options.addOption("help", false, "Show help message."); + options.addOption("verbose", false, "Enable debug level logging."); + + options.addOption("uuid", true, "The UUID to use for this loadtester."); + options.addOption("nouuid", false, + "Do not use a UUID. Keys will be key:1, key:2, key:3, " + + "instead of :1, :2, :3 etc."); + options.addOption("create_table_name", true, "The name of the CQL table to create."); + options.addOption("default_postgres_database", true, "The name of the default postgres db."); + options.addOption("drop_table_name", true, "The name of the CQL table to drop."); + options.addOption("read_only", false, "Read-only workload. " + + "Values must have been written previously and uuid must be provided. " + + "num_threads_write will be ignored."); + options.addOption("local_reads", false, "Use consistency ONE for reads."); + options.addOption("num_threads", true, "The total number of threads."); + options.addOption("num_threads_read", true, "The number of threads that perform reads."); + options.addOption("num_threads_write", true, "The number of threads that perform writes."); + options.addOption("num_writes", true, "The total number of writes to perform."); + options.addOption("num_reads", true, "The total number of reads to perform."); + options.addOption( + "sleep_time", true, + "How long (in ms) to sleep between multiple pipeline batches."); + options.addOption("socket_timeout", true, + "How long (in ms) to wait for a response from jedis."); + options.addOption("value_size", true, "Size in bytes of the value. " + + "The bytes are random. Value size should be more than 5 (9) bytes for binary (ascii) " + + "values in order to have checksum for read verification. First byte is used as a " + + "binary/ascii marker. If value size is more than 16 bytes, random content is prepended " + + "with \"val: $key\" string, so we can check if value matches the key during read."); + options.addOption("use_ascii_values", false, "[RedisKeyValue] If " + + "specified, values are restricted to ASCII strings."); + options.addOption("table_ttl_seconds", true, "The TTL in seconds to create the table with."); + options.addOption("sanity_check_at_end", false, + "Add FATAL logs to ensure no failures before terminating the app."); + options.addOption("disable_yb_load_balancing_policy", false, + "Disable Yugabyte load-balancing policy."); + options.addOption("print_all_exceptions", false, + "Print all exceptions encountered on the client, instead of sampling."); + options.addOption("skip_workload", false, "Skip running workload."); + options.addOption("run_time", true, + "Run time for workload. Negative value means forever (default)."); + options.addOption("use_redis_cluster", false, "Use redis cluster client."); + options.addOption("yql_username", true, + "Use authentication with the YQL client using the provided username. " + + "If this option is set, yql_password option should be used too."); + options.addOption("yql_password", true, + "Use authentication with the YQL client using the provided password. " + + "If this option is set, yql_username option should be used too."); + options.addOption("concurrent_clients", true, + "The number of client connections to establish to each host in the YugaByte DB cluster."); + options.addOption("ssl_cert", true, + "Use an SSL connection while connecting to YugaByte."); + + // Options for CassandraTimeseries workload. + options.addOption("num_users", true, "[CassandraTimeseries] The total number of users."); + options.addOption("min_nodes_per_user", true, + "[CassandraTimeseries] The min number of nodes per user."); + options.addOption("min_nodes_per_user", true, + "[CassandraTimeseries] The min number of nodes per user."); + options.addOption("max_nodes_per_user", true, + "[CassandraTimeseries] The max number of nodes per user."); + options.addOption("min_metrics_count", true, + "[CassandraTimeseries] The min number of metrics for all nodes of user."); + options.addOption("max_metrics_count", true, + "[CassandraTimeseries] The max number of metrics for all nodes of user."); + options.addOption("data_emit_rate_millis", true, + "[CassandraTimeseries] The rate at which each node emits all metrics."); + + // Options for CassandraStockTicker workload. + options.addOption("num_ticker_symbols", true, + "[CassandraStockTicker] The total number of stock ticker symbols."); + + // Options for the key-value workloads. + options.addOption("num_unique_keys", true, + "[KV workloads only] Number of unique keys to write into the DB."); + options.addOption("max_written_key", true, + "[KV workloads only, reusing existing table] Max written key number."); + + // Options for CassandraBatchKeyValue app. + options.addOption("batch_size", true, + "[CassandraBatchKeyValue] Number of keys to write in a batch."); + + // Options for CassandraBatchTimeseries app. + options.addOption("read_batch_size", true, + "[CassandraBatchTimeseries] Number of keys to read in a batch."); + options.addOption("read_back_delta_from_now", true, + "[CassandraBatchTimeseries] Time unit delta back from current time unit."); + + options.addOption("with_local_dc", true, "Local DC name."); + // Options for CassandraEventData app. + options.addOption("read_batch_size", true, "[CassandraEventData] Number of keys to read in a batch."); + options.addOption("num_devices", true, "[CassandraEventData] Number of devices to generate data"); + options.addOption("num_event_types", true, "[CassandraEventData] Number of event yypes to generate per device"); + options.addOption("read_batch_size", true, "[CassandraEventData] Number of keys to read in a batch."); + options.addOption("read_back_delta_from_now", true, + "[CassandraEventData] Time unit delta back from current time unit."); + + // Options for CassandraPersonalization app. + options.addOption("num_stores", true, + "[CassandraPersonalization] Number of stores."); + options.addOption("num_new_coupons_per_customer", true, + "[CassandraPersonalization] Number of new coupons per customer."); + options.addOption("max_coupons_per_customer", true, + "[CassandraPersonalization] Maximum number of coupons per customer."); + + // Options for CassandraSecondaryIndex app. + options.addOption("non_transactional_index", false, + "[CassandraSecondaryIndex] Create secondary index without transactions " + + "enabled."); + options.addOption("batch_write", false, + "[CassandraSecondaryIndex] Enable batch write of key values."); + + // Options for Redis Pipelined Key Value + options.addOption( + "pipeline_length", true, + "[RedisPipelinedKeyValue/RedisHashPipelined] Number of commands to be sent out" + + " in a redis pipelined sync."); + + options.addOption( + "num_subkeys_per_key", true, + "[RedisHashPipelined] Number of subkeys in each key for the RedisHashPipelined workload."); + options.addOption( + "num_subkeys_per_write", true, + "[RedisHashPipelined] Number of subkeys updated in each HMSet " + + "for the RedisHashPipelined workload."); + options.addOption( + "num_subkeys_per_read", true, + "[RedisHashPipelined] Number of subkeys read in each HMGet " + + "for the RedisHashPipelined workload."); + + options.addOption( + "key_freq_zipf_exponent", true, + "[RedisHashPipelined] The zipf distribution exponent, if keys " + + "should be picked using a Zipf distribution. If <= 0, we use " + + "a uniform distribution"); + options.addOption( + "subkey_freq_zipf_exponent", true, + "[RedisHashPipelined] The zipf distribution exponent, if subkeys " + + "should be picked using a Zipf distribution. If <= 0, we use " + + "a uniform distribution"); + options.addOption( + "subkey_value_size_zipf_exponent", true, + "[RedisHashPipelined] The zipf distribution exponent, if the value " + + "sizes should be picked using a Zipf distribution. Value sizes are " + + "chosen such that the expected mean is the value specified by --value_size. " + + "If <= 0, all subkeys will have the value specified by --value_size"); + options.addOption( + "subkey_value_max_size", true, + "[RedisHashPipelined] If using zipf distribution to choose value sizes, " + + "specifies an upper bound on the value sizes."); + + // First check if a "--help" argument is passed with a simple parser. Note that if we add + // required args, then the help string would not work. See: + // https://stackoverflow.com/questions/36720946/apache-cli-required-options-contradicts-with-help-option + // The first function check if help was called with an app name to print details. The second + // function just check if help was called without any args to print the overview. + parseHelpDetailed(args, options); + parseHelpOverview(args, options); + + // Do the actual arg parsing. + CommandLineParser parser = new BasicParser(); + CommandLine commandLine = null; + + try { + commandLine = parser.parse(options, args); + } catch (ParseException e) { + LOG.error("Error in args, use the --help option to see usage. Exception:", e); + System.exit(0); + } + + CmdLineOpts configuration = new CmdLineOpts(); + configuration.initialize(commandLine); + return configuration; + } + + private static void parseHelpOverview(String[] args, Options options) throws Exception { + Options helpOptions = new Options(); + helpOptions.addOption("help", false, "Print help."); + CommandLineParser helpParser = new BasicParser(); + CommandLine helpCommandLine = null; + try { + helpCommandLine = helpParser.parse(helpOptions, args); + } catch (ParseException e) { + return; + } + if (helpCommandLine.hasOption("help")) { + printUsage(options, "Usage:"); + System.exit(0); + } + } + + private static void parseHelpDetailed(String[] args, Options options) throws Exception { + Options helpOptions = new Options(); + helpOptions.addOption("help", true, "Print help."); + CommandLineParser helpParser = new BasicParser(); + CommandLine helpCommandLine = null; + try { + helpCommandLine = helpParser.parse(helpOptions, args); + } catch (org.apache.commons.cli.MissingArgumentException e1) { + // This is ok since there was no help argument passed. + return; + } catch (ParseException e) { + return; + } + if (helpCommandLine.hasOption("help")) { + printUsageDetails(options, "Usage:", helpCommandLine.getOptionValue("help")); + System.exit(0); + } + } + + private static int getAppPort(String appName) { + if (appName.startsWith("Cassandra")) return 9042; + else if (appName.startsWith("Redis")) return 6379; + else if (appName.startsWith("Sql")) return 5433; + return 0; + } + + private static void printUsage(Options options, String header) throws Exception { + StringBuilder footer = new StringBuilder(); + + footer.append("****************************************************************************\n"); + footer.append("* *\n"); + footer.append("* YugaByte DB Sample Apps *\n"); + footer.append("* *\n"); + footer.append("****************************************************************************\n"); + footer.append("\n"); + footer.append("Use this sample app to try out a variety of workloads against YugaByte DB.\n"); + footer.append(" Use the --help option to get more details on how to run it.\n"); + String optsPrefix = "\t\t\t"; + String optsSuffix = " \\\n"; + for (Class cls: HELP_WORKLOADS) { + String workloadType = cls.getSimpleName(); + AppBase workload = getAppClass(workloadType).newInstance(); + String formattedName = String.format("%-35s: ", workloadType); + footer.append("\n * " + formattedName); + List description = workload.getWorkloadDescription(); + if (!description.isEmpty()) { + footer.append(description.get(0)); + } + } + footer.append("\n"); + System.out.println(footer.toString()); + System.exit(0); + } + + private static void printUsageDetails(Options options, String header, String appName) throws Exception { + StringBuilder footer = new StringBuilder(); + + footer.append("Usage and options for workload " + appName + " in YugaByte DB Sample Apps.\n"); + String optsPrefix = "\t\t\t"; + String optsSuffix = " \\\n"; + int port = getAppPort(appName); + AppBase workload = getAppClass(appName).newInstance(); + + footer.append("\n - " + appName + " :\n"); + footer.append(" "); + for (int idx = 0; idx < appName.length(); idx++) { + footer.append("-"); + } + footer.append("\n"); + + List description = workload.getWorkloadDescription(); + if (!description.isEmpty()) { + for (String line : description) { + footer.append("\t\t"); + footer.append(line); + footer.append("\n"); + } + footer.append("\n"); + } + footer.append("\t\tUsage:\n"); + footer.append(optsPrefix); + footer.append("java -jar yb-sample-apps.jar"); + footer.append(optsSuffix); + footer.append(optsPrefix + "--workload " + appName + optsSuffix); + footer.append(optsPrefix + "--nodes 127.0.0.1:" + port); + + List requiredArgs = workload.getWorkloadRequiredArguments(); + for (String line : requiredArgs) { + footer.append(optsSuffix).append(optsPrefix).append(line); + } + + List optionalArgs = workload.getWorkloadOptionalArguments(); + if (!optionalArgs.isEmpty()) { + footer.append("\n\n\t\tOther options (with default values):\n"); + for (String line : optionalArgs) { + footer.append(optsPrefix + "[ "); + footer.append(line); + footer.append(" ]\n"); + } + } + footer.append("\n"); + System.out.println(footer.toString()); + System.exit(0); + } + + /** + * One contact point could be resolved to multiple nodes IPs. For example for Kubernetes + * yb-tservers.default.svc.cluster.local contact point is resolved to all tservers IPs. + */ + public static class ContactPoint { + String host; + int port; + + public ContactPoint(String host, int port) { + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public static ContactPoint fromHostPort(String hostPort) { + String[] parts = hostPort.split(":"); + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + return new ContactPoint(host, port); + } + + public String ToString() { return host + ":" + port; } + } }