Skip to content

Commit

Permalink
Merge branch 'tickets/DM-36239' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Sep 15, 2022
2 parents 7284886 + 33129d5 commit 61a1a39
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 20 deletions.
18 changes: 14 additions & 4 deletions src/replica/DatabaseMySQLGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -906,8 +906,8 @@ class QueryGenerator {
}

std::string partition(TransactionId transactionId) const {
return " (PARTITION " + id("p" + std::to_string(transactionId)).str + " VALUES IN (" +
std::to_string(transactionId) + "))";
return " (PARTITION " + partId(transactionId).str + " VALUES IN (" + std::to_string(transactionId) +
"))";
}

// Generators for ALTER TABLE ...
Expand All @@ -928,8 +928,18 @@ class QueryGenerator {
return sql;
}

std::string dropPartition(TransactionId transactionId) const {
return " DROP PARTITION " + id("p" + std::to_string(transactionId)).str;
/**
* @brief Generate " DROP PARTITION [IF EXISTS] `p<transaction-id>`".
* @param transactionId An identifier of the super-transaction corresponding to
* a partition to be removed.
* @param ifExists If 'true' then add 'IF EXISTS'.
* @return The complete query fragment.
*/
std::string dropPartition(TransactionId transactionId, bool ifExists = false) const {
std::string sql = " DROP PARTITION ";
if (ifExists) sql += "IF EXISTS ";
sql += partId(transactionId).str;
return sql;
}

// Generators for LOAD DATA INFILE
Expand Down
4 changes: 3 additions & 1 deletion src/replica/HttpIngestTransModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ void HttpIngestTransModule::_removePartitionFromSecondaryIndex(DatabaseInfo cons

database::mysql::ConnectionHandler const h(qservMasterDbConnection("qservMeta"));
database::mysql::QueryGenerator const g(h.conn);
string const query = g.alterTable(database.name + "__" + table.name) + g.dropPartition(transactionId);
bool const ifExists = true;
string const query =
g.alterTable(database.name + "__" + table.name) + g.dropPartition(transactionId, ifExists);

// Not having the specified partition is still fine as it couldn't be properly
// created after the transaction was created.
Expand Down
10 changes: 6 additions & 4 deletions src/replica/IngestFileSvc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,10 @@ void IngestFileSvc::loadDataIntoTable() {
SqlId const sqlDestinationTable = _isOverlap ? sqlFullOverlapTable : sqlTable;
bool const local = false;
dataLoadQuery = g.loadDataInfile(_fileName, sqlDestinationTable, local, _dialect);
partitionRemovalQuery =
Query(g.alterTable(sqlDestinationTable) + g.dropPartition(_transactionId),
sqlDestinationTable.str);
bool const ifExists = true;
partitionRemovalQuery = Query(
g.alterTable(sqlDestinationTable) + g.dropPartition(_transactionId, ifExists),
sqlDestinationTable.str);
}
}
} else {
Expand All @@ -253,8 +254,9 @@ void IngestFileSvc::loadDataIntoTable() {
g.alterTable(sqlTable) + g.addPartition(_transactionId, ifNotExists), sqlTable.str));
bool const local = false;
dataLoadQuery = g.loadDataInfile(_fileName, sqlTable, local, _dialect);
bool const ifExists = true;
partitionRemovalQuery =
Query(g.alterTable(sqlTable) + g.dropPartition(_transactionId), sqlTable.str);
Query(g.alterTable(sqlTable) + g.dropPartition(_transactionId, ifExists), sqlTable.str);
}
for (auto&& statement : tableMgtStatements) {
LOGS(_log, LOG_LVL_DEBUG, context_ << "query: " << statement.query);
Expand Down
22 changes: 12 additions & 10 deletions src/replica/IngestHttpSvcMod.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,20 @@ IngestRequest::Ptr IngestHttpSvcMod::_createRequest(bool async) const {
string const url = body().required<string>("url");

csv::DialectInput dialectInput;
// Allow "column_separator" for the sake of the backward compatibility with the older
// version of the API. The parameter "column_separator" if present will override the one
// of "fields_terminated_by"
dialectInput.fieldsTerminatedBy = body().optional<string>(
"column_separator",
body().optional<string>("fields_terminated_by", csv::Dialect::defaultFieldsTerminatedBy));
// Allow an empty string in the input. Simply replace the one (if present) with
// the corresponding default value of the parameter.
auto const getDialectParam = [&](string const& param, string const& defaultValue) -> string {
string val = body().optional<string>(param, defaultValue);
if (val.empty()) val = defaultValue;
return val;
};
dialectInput.fieldsTerminatedBy =
getDialectParam("fields_terminated_by", csv::Dialect::defaultFieldsTerminatedBy);
dialectInput.fieldsEnclosedBy =
body().optional<string>("fields_enclosed_by", csv::Dialect::defaultFieldsEnclosedBy);
dialectInput.fieldsEscapedBy =
body().optional<string>("fields_escaped_by", csv::Dialect::defaultFieldsEscapedBy);
getDialectParam("fields_enclosed_by", csv::Dialect::defaultFieldsEnclosedBy);
dialectInput.fieldsEscapedBy = getDialectParam("fields_escaped_by", csv::Dialect::defaultFieldsEscapedBy);
dialectInput.linesTerminatedBy =
body().optional<string>("lines_terminated_by", csv::Dialect::defaultLinesTerminatedBy);
getDialectParam("lines_terminated_by", csv::Dialect::defaultLinesTerminatedBy);

string const httpMethod = body().optional<string>("http_method", "GET");
string const httpData = body().optional<string>("http_data", string());
Expand Down
4 changes: 3 additions & 1 deletion src/replica/WorkerSqlRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ Query WorkerSqlRequest::_query(Connection::Ptr const& conn, string const& table)
return Query(query, databaseTable.str);
}
case ProtocolRequestSql::DROP_TABLE_PARTITION: {
string const query = g.alterTable(databaseTable) + g.dropPartition(_request.transaction_id());
bool const ifExists = true;
string const query =
g.alterTable(databaseTable) + g.dropPartition(_request.transaction_id(), ifExists);
return Query(query, databaseTable.str);
}
case ProtocolRequestSql::REMOVE_TABLE_PARTITIONING: {
Expand Down
1 change: 1 addition & 0 deletions src/replica/testQueryGenerator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ BOOST_AUTO_TEST_CASE(QueryGeneratorTest) {
{" ADD PARTITION (PARTITION `p12` VALUES IN (12))", g.addPartition(12)},
{" ADD PARTITION IF NOT EXISTS (PARTITION `p12` VALUES IN (12))", g.addPartition(12, true)},
{" DROP PARTITION `p2`", g.dropPartition(2)},
{" DROP PARTITION IF EXISTS `p3`", g.dropPartition(3, true)},

// LOAD DATA [LOCAL] INFILE ...
{"LOAD DATA INFILE '/tmp/infile.csv' INTO TABLE `table` " + csv::Dialect().sqlOptions(),
Expand Down

0 comments on commit 61a1a39

Please sign in to comment.