Skip to content

Commit

Permalink
fix build
Browse files Browse the repository at this point in the history
  • Loading branch information
junwen12221 committed Dec 16, 2021
1 parent 8507e32 commit a7d9d66
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions mycat2/src/main/java/io/mycat/commands/ReceiverImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public Future<Void> onComplete() {
}



protected boolean hasMoreResultSet() {
return count < this.stmtSize;
}
Expand Down Expand Up @@ -354,17 +355,23 @@ public void onComplete() {
}

@Override
public Future<Void> sendVectorResultSet(PrepareExecutor.ArrowObservable rootObservable) {
MycatRowMetaData mycatRowMetaData = rootObservable.getMycatRowMetaData();
class Writer implements io.reactivex.rxjava3.functions.Function<VectorSchemaRoot, ObservableSource<? extends MysqlPayloadObject>> {
public Future<Void> sendVectorResultSet(Observable<VectorSchemaRoot> rootObservable) {
Observable<MysqlPayloadObject> mysqlPacketObservable = rootObservable.flatMap(new io.reactivex.rxjava3.functions.Function<VectorSchemaRoot, ObservableSource<? extends MysqlPayloadObject>>() {

InnerType[] types = null;

@Override
public ObservableSource<? extends MysqlPayloadObject> apply(VectorSchemaRoot vectorRowBatch) throws Throwable {
int rowCount = vectorRowBatch.getRowCount();
ArrayList<MysqlPayloadObject> objects = new ArrayList<>(rowCount);
ArrayList<MysqlPayloadObject> objects;
if (types == null) {
types = SchemaBuilder.getInnerTypes(vectorRowBatch);
MycatRowMetaData rowMetaData = ResultWriterUtil.vectorRowBatchToResultSetColumn(vectorRowBatch.getSchema());
MySQLColumnDef mySQLColumnDef = MySQLColumnDef.of(rowMetaData);
objects = new ArrayList<>(rowCount + 1);
objects.add(mySQLColumnDef);
} else {
objects = new ArrayList<>(rowCount);
}
for (int rowId = 0; rowId < rowCount; rowId++) {
ResultSetWriter newWriter = binary ? new SimpleBinaryWriterImpl() : new SimpleTextWriterImpl();
Expand All @@ -373,9 +380,7 @@ public ObservableSource<? extends MysqlPayloadObject> apply(VectorSchemaRoot vec
}
return Observable.fromIterable(objects);
}
};
Writer writer = new Writer();
Observable<MysqlPayloadObject> mysqlPacketObservable = Observable.concat(Observable.fromArray(MySQLColumnDef.of(mycatRowMetaData)), rootObservable.getObservable().flatMap(writer));
});
return sendResultSet(mysqlPacketObservable);
}

Expand Down

0 comments on commit a7d9d66

Please sign in to comment.