Skip to content

Commit

Permalink
Text实现,但是读取显示依然有错
Browse files Browse the repository at this point in the history
  • Loading branch information
CGHu committed May 5, 2024
1 parent 320554b commit 180e3d4
Show file tree
Hide file tree
Showing 24 changed files with 1,214 additions and 652 deletions.
1 change: 1 addition & 0 deletions src/observer/common/rc.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ See the Mulan PSL v2 for more details. */
DEFINE_RC(IOERR_ACCESS) \
DEFINE_RC(IOERR_OPEN) \
DEFINE_RC(IOERR_CLOSE) \
DEFINE_RC(TEXT_OVERFLOW) \
DEFINE_RC(IOERR_SEEK) \
DEFINE_RC(IOERR_TOO_LONG) \
DEFINE_RC(IOERR_SYNC) \
Expand Down
77 changes: 75 additions & 2 deletions src/observer/net/plain_communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ See the Mulan PSL v2 for more details. */
#include "net/buffered_writer.h"
#include "session/session.h"
#include "sql/expr/tuple.h"
#include <memory>
#include "sql/operator/project_physical_operator.h"

using namespace std;

Expand All @@ -39,7 +41,7 @@ RC PlainCommunicator::read_event(SessionEvent *&event)
int data_len = 0;
int read_len = 0;

const int max_packet_size = 8192;
const int max_packet_size = 81920;
vector<char> buf(max_packet_size);

// 持续接收消息,直到遇到'\0'。将'\0'遇到的后续数据直接丢弃没有处理,因为目前仅支持一收一发的模式
Expand Down Expand Up @@ -180,6 +182,62 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
return rc;
}

RC PlainCommunicator::write_tuple(SqlResult *sql_result) {
RC rc = RC::SUCCESS;

Tuple *tuple = nullptr;
bool aggregate = false;
std::vector<Value> last_values;


while (RC::SUCCESS == (rc = sql_result->next_tuple(tuple))) {
int cell_num = tuple->cell_num();


for (int i = 0; i < cell_num; i++) {
if (i != 0 && !aggregate) {
const char *delim = " | ";
writer_->writen(delim, strlen(delim));
}

Value value;
rc = tuple->cell_at(i, value);
if (rc != RC::SUCCESS)
return rc;

if (value.attr_type() == TEXTS)
{
RID *rid = reinterpret_cast<RID *>(const_cast<char *>(value.data()));
size_t len = rid->text_value;
char *ss = new char[len + 1];
char *p = ss;
while (rid != nullptr && rid->init) {
Record rec_new;
tuple->get_text_record(rec_new, rid);
memcpy(p, rec_new.data(), rec_new.len());
p += rec_new.len();
rid = rid->next_RID;
}
writer_->writen(ss, len);
delete[] ss;
}
if (value.attr_type() != TEXTS) {
writer_->writen(value.to_string().c_str(), value.to_string().size());
}

if (last_values.size() == (size_t)cell_num)
last_values[i] = value;
else
last_values.push_back(value);
}
return RC::SUCCESS;

writer_->writen("\n", 1);
}

return rc;
}

RC PlainCommunicator::write_result_internal(SessionEvent *event, bool &need_disconnect)
{
RC rc = RC::SUCCESS;
Expand All @@ -199,7 +257,7 @@ RC PlainCommunicator::write_result_internal(SessionEvent *event, bool &need_disc
return write_state(event, need_disconnect);
}

const TupleSchema &schema = sql_result->tuple_schema();
const TupleSchema &schema = const_cast<TupleSchema &>(sql_result->tuple_schema());
const int cell_num = schema.cell_num();

for (int i = 0; i < cell_num; i++) {
Expand Down Expand Up @@ -238,6 +296,16 @@ RC PlainCommunicator::write_result_internal(SessionEvent *event, bool &need_disc
}
}

// rc = write_tuple(sql_result);
// if (rc == RC::RECORD_EOF) {
// rc = RC::SUCCESS;
// } else {
// LOG_WARN("write tuple failed: %s", strrc(rc));
// sql_result->close();
// sql_result->set_return_code(rc);
// return write_state(event, need_disconnect);
// }

rc = RC::SUCCESS;

Tuple *tuple = nullptr;
Expand Down Expand Up @@ -286,6 +354,11 @@ RC PlainCommunicator::write_result_internal(SessionEvent *event, bool &need_disc

if (rc == RC::RECORD_EOF) {
rc = RC::SUCCESS;
}else {
LOG_WARN("write tuple failed: %s", strrc(rc));
sql_result->close();
sql_result->set_return_code(rc);
return write_state(event, need_disconnect);
}

if (cell_num == 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/observer/net/plain_communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ See the Mulan PSL v2 for more details. */
#include <vector>

#include "net/communicator.h"
#include "sql/executor/sql_result.h"

/**
* @brief 与客户端进行通讯
Expand All @@ -35,6 +36,7 @@ class PlainCommunicator : public Communicator
private:
RC write_state(SessionEvent *event, bool &need_disconnect);
RC write_debug(SessionEvent *event, bool &need_disconnect);
RC write_tuple(SqlResult *sql_result);
RC write_result_internal(SessionEvent *event, bool &need_disconnect);

protected:
Expand Down
8 changes: 6 additions & 2 deletions src/observer/sql/executor/sql_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ class SqlResult
void set_state_string(const std::string &state_string) { state_string_ = state_string; }

void set_operator(std::unique_ptr<PhysicalOperator> oper);

std::unique_ptr<PhysicalOperator> &get_operator()
{
return operator_;
}
bool has_operator() const { return operator_ != nullptr; }
const TupleSchema &tuple_schema() const { return tuple_schema_; }
RC return_code() const { return return_code_; }
Expand All @@ -49,7 +52,8 @@ class SqlResult
RC open();
RC close();
RC next_tuple(Tuple *&tuple);


bool correlated_query_;
private:
Session *session_ = nullptr; ///< 当前所属会话
std::unique_ptr<PhysicalOperator> operator_; ///< 执行计划
Expand Down
31 changes: 31 additions & 0 deletions src/observer/sql/expr/tuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class Tuple
*/
virtual RC cell_at(int index, Value &cell) const = 0;

virtual RC get_text_record(Record &rec,RID *rid) = 0;

/**
* @brief 根据cell的描述,获取cell的值
*
Expand Down Expand Up @@ -194,6 +196,16 @@ class RowTuple : public Tuple
Record &record() { return *record_; }

const Record &record() const { return *record_; }
const Table *table() const { return table_; }
void set_table(const Table *table) { table_ = table; }

std::vector<FieldExpr *> &speces() { return speces_; }
void set_speces(std::vector<FieldExpr *> &speces) { speces_ = speces; };

RC get_text_record(Record &rec,RID *rid) override {
RC rc = const_cast<Table *>(table_)->get_record(*rid, rec);
return rc;
}

private:
Record *record_ = nullptr;
Expand Down Expand Up @@ -237,6 +249,11 @@ class ProjectTuple : public Tuple
const TupleCellSpec *spec = speces_[index];
return tuple_->find_cell(*spec, cell);
}
RC get_text_record(Record &rec,RID *rid) override {
static_cast<RowTuple *>(tuple_)->get_text_record(rec, rid);
RC rc = RC::SUCCESS;
return rc;
}

RC find_cell(const TupleCellSpec &spec, Value &cell) const override { return tuple_->find_cell(spec, cell); }

Expand Down Expand Up @@ -283,6 +300,10 @@ class ExpressionTuple : public Tuple
}
return RC::NOTFOUND;
}
RC get_text_record(Record &rec,RID *rid) override {
RC rc = RC::SUCCESS;
return rc;
}

private:
const std::vector<std::unique_ptr<Expression>> &expressions_;
Expand Down Expand Up @@ -313,6 +334,11 @@ class ValueListTuple : public Tuple
}

virtual RC find_cell(const TupleCellSpec &spec, Value &cell) const override { return RC::INTERNAL; }

RC get_text_record(Record &rec,RID *rid) override {
RC rc = RC::SUCCESS;
return rc;
}

private:
std::vector<Value> cells_;
Expand Down Expand Up @@ -357,6 +383,11 @@ class JoinedTuple : public Tuple

return right_->find_cell(spec, value);
}

RC get_text_record(Record &rec,RID *rid) override {
RC rc = RC::SUCCESS;
return rc;
}

private:
Tuple *left_ = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions src/observer/sql/operator/insert_physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ InsertPhysicalOperator::InsertPhysicalOperator(Table *table, vector<Value> &&val
RC InsertPhysicalOperator::open(Trx *trx)
{
Record record;
RC rc = table_->make_record(static_cast<int>(values_.size()), values_.data(), record);
RC rc = table_->make_record(static_cast<int>(values_.size()), values_.data(), record); //构建一个record
if (rc != RC::SUCCESS) {
LOG_WARN("failed to make record. rc=%s", strrc(rc));
return rc;
}

rc = trx->insert_record(table_, record);
rc = trx->insert_record(table_, record); //按照record插入到表中,调用RecordFileHandler
if (rc != RC::SUCCESS) {
LOG_WARN("failed to insert record by transaction. rc=%s", strrc(rc));
}
Expand Down
1 change: 1 addition & 0 deletions src/observer/sql/operator/update_physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ class UpdatePhysicalOperator : public PhysicalOperator
Field *field_ = nullptr;
Trx *trx_ = nullptr;
char *data_;
std::vector<const FieldMeta *> field_metas_;
};
Loading

0 comments on commit 180e3d4

Please sign in to comment.