HoraeDBClient 是 Apache HoraeDB (Incubating) 的高性能 Java 版客户端。
- 通信层基于 SPI 可扩展,默认提供 gRPC 的实现
- 提供纯异步的流式高性能写入 API
- 默认提供丰富的性能指标采集,可输指标统计到本地文件
- 支持关键对象内存状态快照或配置输出到本地文件以协助排查问题
┌─────────────────────┐
│ HoraeDBClient │
└─────────────────────┘
│
▼
┌─────────────────────┐
│ WriteClient │───┐
└─────────────────────┘ │
│ Async to retry and merge responses
│ │
┌────Split requests │
│ │
│ ┌─────────────────────┐ │ ┌─────────────────────┐ ┌─────────────────────┐
└─▶│ RouterClient │◀─┴──▶│ RouterCache │◀─────▶│ RouterFor │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
▲ │
│ │
▼ │
┌─────────────────────┐ │
│ RpcClient │◀──────────────────────────────────────────────┘
└─────────────────────┘
▲
│
▼
┌─────────────────────┐
│ Default gRPC impl │
└─────────────────────┘
▲
│
┌───────────────────┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ HoraeDB Node1 │ │ HoraeDB Node2 │ │ ... │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
┌─────────────────────┐
│ HoraeDBClient │
└─────────────────────┘
│
▼
┌─────────────────────┐
│ QueryClient │───┐
└─────────────────────┘ │
│ │Async to retry
│ │
┌────────────┘ │
│ │
│ ┌─────────────────────┐ │ ┌─────────────────────┐ ┌─────────────────────┐
└─▶│ RouterClient │◀─┴──▶│ RouterCache │◀─────▶│ RouterFor │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
▲ │
│ │
▼ │
┌─────────────────────┐ │
│ RpcClient │◀──────────────────────────────────────────────┘
└─────────────────────┘
▲
│
▼
┌─────────────────────┐
│ Default gRPC impl │
└─────────────────────┘
▲
│
┌───────────────────┴ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ HoraeDB Node1 │ │ HoraeDB Node2 │ │ ... │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
编译需要 Java 8 及以上
<dependency>
<groupId>io.ceresdb</groupId>
<artifactId>horaedb-all</artifactId>
<version>1.0.0-alpha</version>
</dependency>
// HoraeDB options
final HoraeOptions opts = HoraeOptions.newBuilder("127.0.0.1", 8831, DIRECT) // 默认 gprc 端口号,DIRECT 模式
.database("public") // Client所使用的database,可被RequestContext的database覆盖
.writeMaxRetries(1) // 写入失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效)
.readMaxRetries(1) // 查询失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效)
.build();
final HoraeClient client = new HoraeClient();
if (!client.init(opts)) {
throw new IllegalStateException("Fail to start HoraeClient");
}
配置详情见 configuration
HoraeDB 是一个 Schema-less 的时序数据引擎,你可以不必创建 schema 就立刻写入数据(HoraeDB 会根据你的第一次写入帮你创建一个默认的 schema)。 当然你也可以自行创建一个 schema 来更精细化的管理的表(比如索引等)
下面的建表语句(使用 SDK 的 SQL API)包含了 HoraeDB 支持的所有字段类型:
String createTableSql = "CREATE TABLE IF NOT EXISTS machine_table(" + "ts TIMESTAMP NOT NULL," + //
"ts TIMESTAMP NOT NULL," +
"city STRING TAG NOT NULL," +
"ip STRING TAG NOT NULL," +
"cpu DOUBLE NULL," +
"mem DOUBLE NULL," +
"TIMESTAMP KEY(ts)" + // 建表时必须指定时间戳序列
") ENGINE=Analytic";
Result<SqlQueryOk, Err> createResult = client.sqlQuery(new SqlQueryRequest(createTableSql)).get();
if (!createResult.isOk()) {
throw new IllegalStateException("Fail to create table");
}
详情见 table
final Point point = Point.newPointBuilder("machine_table")
.setTimestamp(t0)
.addTag("city", "Singapore")
.addTag("ip", "10.0.0.1")
.addField("cpu", Value.withDouble(0.23))
.addField("mem", Value.withDouble(0.55))
.build();
final CompletableFuture<Result<WriteOk, Err>> wf = client.write(new WriteRequest(pointList));
// 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程
final Result<WriteOk, Err> writeResult = wf.get();
Assert.assertTrue(writeResult.isOk());
Assert.assertEquals(3, writeResult.getOk().getSuccess());
// `Result` 类参考了 Rust 语言,提供了丰富的 mapXXX、andThen 类 function 方便对结果值进行转换,提高编程效率,欢迎参考 API 文档使用
Assert.assertEquals(3, writeResult.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, writeResult.mapOr(-1, WriteOk::getFailed).intValue());
详情见 write
final SqlQueryRequest queryRequest = SqlQueryRequest.newBuilder()
.forTables("machine_table") // 这里表名是可选的,如果未提供,SDK将自动解析SQL填充表名并自动路由
.sql("select * from machine_table where ts = %d", t0) //
.build();
final CompletableFuture<Result<SqlQueryOk, Err>> qf = client.sqlQuery(queryRequest);
// 这里用 `future.get` 只是方便演示,推荐借助 CompletableFuture 强大的 API 实现异步编程
final Result<SqlQueryOk, Err> queryResult = qf.get();
Assert.assertTrue(queryResult.isOk());
final SqlQueryOk queryOk = queryResult.getOk();
Assert.assertEquals(1, queryOk.getRowCount());
// 直接获取结果数组
final List<Row> rows = queryOk.getRowList();
// 获取结果流
final Stream<Row> rowStream = queryOk.stream();
rowStream.forEach(row -> System.out.println(row.toString()));
详情见 read
HoraeDB 支持流式读写,适用于大规模数据读写。
final StreamWriteBuf<Point, WriteOk> writeBuf = client.streamWrite("machine_table");
for (int i = 0; i < 1000; i++) {
final Point point = Point.newPointBuilder("machine_table")
.setTimestamp(timestamp)
.addTag("city", "Beijing")
.addTag("ip", "10.0.0.3")
.addField("cpu", Value.withDouble(0.42))
.addField("mem", Value.withDouble(0.67))
.build();
writeBuf.writeAndFlush(Arrays.asList(point));
timestamp = timestamp+1;
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
详情见 streaming
- 搜索并加入钉钉社区与用户群 44602802