diff --git a/mycat2/src/main/java/io/mycat/commands/ReceiverImpl.java b/mycat2/src/main/java/io/mycat/commands/ReceiverImpl.java index b4d32c180..28b9b38d6 100644 --- a/mycat2/src/main/java/io/mycat/commands/ReceiverImpl.java +++ b/mycat2/src/main/java/io/mycat/commands/ReceiverImpl.java @@ -284,6 +284,7 @@ public Future onComplete() { } + protected boolean hasMoreResultSet() { return count < this.stmtSize; } @@ -354,17 +355,23 @@ public void onComplete() { } @Override - public Future sendVectorResultSet(PrepareExecutor.ArrowObservable rootObservable) { - MycatRowMetaData mycatRowMetaData = rootObservable.getMycatRowMetaData(); - class Writer implements io.reactivex.rxjava3.functions.Function> { + public Future sendVectorResultSet(Observable rootObservable) { + Observable mysqlPacketObservable = rootObservable.flatMap(new io.reactivex.rxjava3.functions.Function>() { + InnerType[] types = null; @Override public ObservableSource apply(VectorSchemaRoot vectorRowBatch) throws Throwable { int rowCount = vectorRowBatch.getRowCount(); - ArrayList objects = new ArrayList<>(rowCount); + ArrayList objects; if (types == null) { types = SchemaBuilder.getInnerTypes(vectorRowBatch); + MycatRowMetaData rowMetaData = ResultWriterUtil.vectorRowBatchToResultSetColumn(vectorRowBatch.getSchema()); + MySQLColumnDef mySQLColumnDef = MySQLColumnDef.of(rowMetaData); + objects = new ArrayList<>(rowCount + 1); + objects.add(mySQLColumnDef); + } else { + objects = new ArrayList<>(rowCount); } for (int rowId = 0; rowId < rowCount; rowId++) { ResultSetWriter newWriter = binary ? new SimpleBinaryWriterImpl() : new SimpleTextWriterImpl(); @@ -373,9 +380,7 @@ public ObservableSource apply(VectorSchemaRoot vec } return Observable.fromIterable(objects); } - }; - Writer writer = new Writer(); - Observable mysqlPacketObservable = Observable.concat(Observable.fromArray(MySQLColumnDef.of(mycatRowMetaData)), rootObservable.getObservable().flatMap(writer)); + }); return sendResultSet(mysqlPacketObservable); }