From 2af989108d219bd6e69a842ebac3774d2547b87b Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Wed, 14 Sep 2022 22:50:18 +0000 Subject: [PATCH 1/3] Extended query generator for dropping MySQL partitions --- src/replica/DatabaseMySQLGenerator.h | 18 ++++++++++++++---- src/replica/testQueryGenerator.cc | 1 + 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/replica/DatabaseMySQLGenerator.h b/src/replica/DatabaseMySQLGenerator.h index c9c18b8b22..5648455f51 100644 --- a/src/replica/DatabaseMySQLGenerator.h +++ b/src/replica/DatabaseMySQLGenerator.h @@ -906,8 +906,8 @@ class QueryGenerator { } std::string partition(TransactionId transactionId) const { - return " (PARTITION " + id("p" + std::to_string(transactionId)).str + " VALUES IN (" + - std::to_string(transactionId) + "))"; + return " (PARTITION " + partId(transactionId).str + " VALUES IN (" + std::to_string(transactionId) + + "))"; } // Generators for ALTER TABLE ... @@ -928,8 +928,18 @@ class QueryGenerator { return sql; } - std::string dropPartition(TransactionId transactionId) const { - return " DROP PARTITION " + id("p" + std::to_string(transactionId)).str; + /** + * @brief Generate " DROP PARTITION [IF EXISTS] `p<transaction-id>`". + * @param transactionId An identifier of the super-transaction corresponding to + * a partition to be removed. + * @param ifExists If 'true' then add 'IF EXISTS'. + * @return The complete query fragment. 
+ */ + std::string dropPartition(TransactionId transactionId, bool ifExists = false) const { + std::string sql = " DROP PARTITION "; + if (ifExists) sql += "IF EXISTS "; + sql += partId(transactionId).str; + return sql; } // Generators for LOAD DATA INFILE diff --git a/src/replica/testQueryGenerator.cc b/src/replica/testQueryGenerator.cc index dab6bb3094..07ceccc105 100644 --- a/src/replica/testQueryGenerator.cc +++ b/src/replica/testQueryGenerator.cc @@ -377,6 +377,7 @@ BOOST_AUTO_TEST_CASE(QueryGeneratorTest) { {" ADD PARTITION (PARTITION `p12` VALUES IN (12))", g.addPartition(12)}, {" ADD PARTITION IF NOT EXISTS (PARTITION `p12` VALUES IN (12))", g.addPartition(12, true)}, {" DROP PARTITION `p2`", g.dropPartition(2)}, + {" DROP PARTITION IF EXISTS `p3`", g.dropPartition(3, true)}, // LOAD DATA [LOCAL] INFILE ... {"LOAD DATA INFILE '/tmp/infile.csv' INTO TABLE `table` " + csv::Dialect().sqlOptions(), From 40110a6587d31fb8d80165e9c241ac42c8a65583 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Wed, 14 Sep 2022 22:50:53 +0000 Subject: [PATCH 2/3] Fixed bugs in the MySQL partition removal code --- src/replica/HttpIngestTransModule.cc | 4 +++- src/replica/IngestFileSvc.cc | 10 ++++++---- src/replica/WorkerSqlRequest.cc | 4 +++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/replica/HttpIngestTransModule.cc b/src/replica/HttpIngestTransModule.cc index 9c4fbb2f88..a501ad6eab 100644 --- a/src/replica/HttpIngestTransModule.cc +++ b/src/replica/HttpIngestTransModule.cc @@ -481,7 +481,9 @@ void HttpIngestTransModule::_removePartitionFromSecondaryIndex(DatabaseInfo cons database::mysql::ConnectionHandler const h(qservMasterDbConnection("qservMeta")); database::mysql::QueryGenerator const g(h.conn); - string const query = g.alterTable(database.name + "__" + table.name) + g.dropPartition(transactionId); + bool const ifExists = true; + string const query = + g.alterTable(database.name + "__" + table.name) + g.dropPartition(transactionId, ifExists); // 
Not having the specified partition is still fine as it couldn't be properly // created after the transaction was created. diff --git a/src/replica/IngestFileSvc.cc b/src/replica/IngestFileSvc.cc index cdd7a2e432..0298e7d5e1 100644 --- a/src/replica/IngestFileSvc.cc +++ b/src/replica/IngestFileSvc.cc @@ -239,9 +239,10 @@ void IngestFileSvc::loadDataIntoTable() { SqlId const sqlDestinationTable = _isOverlap ? sqlFullOverlapTable : sqlTable; bool const local = false; dataLoadQuery = g.loadDataInfile(_fileName, sqlDestinationTable, local, _dialect); - partitionRemovalQuery = - Query(g.alterTable(sqlDestinationTable) + g.dropPartition(_transactionId), - sqlDestinationTable.str); + bool const ifExists = true; + partitionRemovalQuery = Query( + g.alterTable(sqlDestinationTable) + g.dropPartition(_transactionId, ifExists), + sqlDestinationTable.str); } } } else { @@ -253,8 +254,9 @@ void IngestFileSvc::loadDataIntoTable() { g.alterTable(sqlTable) + g.addPartition(_transactionId, ifNotExists), sqlTable.str)); bool const local = false; dataLoadQuery = g.loadDataInfile(_fileName, sqlTable, local, _dialect); + bool const ifExists = true; partitionRemovalQuery = - Query(g.alterTable(sqlTable) + g.dropPartition(_transactionId), sqlTable.str); + Query(g.alterTable(sqlTable) + g.dropPartition(_transactionId, ifExists), sqlTable.str); } for (auto&& statement : tableMgtStatements) { LOGS(_log, LOG_LVL_DEBUG, context_ << "query: " << statement.query); diff --git a/src/replica/WorkerSqlRequest.cc b/src/replica/WorkerSqlRequest.cc index cfdc2f1e4d..b9dddffe89 100644 --- a/src/replica/WorkerSqlRequest.cc +++ b/src/replica/WorkerSqlRequest.cc @@ -309,7 +309,9 @@ Query WorkerSqlRequest::_query(Connection::Ptr const& conn, string const& table) return Query(query, databaseTable.str); } case ProtocolRequestSql::DROP_TABLE_PARTITION: { - string const query = g.alterTable(databaseTable) + g.dropPartition(_request.transaction_id()); + bool const ifExists = true; + string const query = + 
g.alterTable(databaseTable) + g.dropPartition(_request.transaction_id(), ifExists); return Query(query, databaseTable.str); } case ProtocolRequestSql::REMOVE_TABLE_PARTITIONING: { From 33129d5e5b2a9e1a70f90ea2b366ff5c3d6345a2 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Wed, 14 Sep 2022 23:19:01 +0000 Subject: [PATCH 3/3] Fixed a bug in the worker ingest service --- src/replica/IngestHttpSvcMod.cc | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/replica/IngestHttpSvcMod.cc b/src/replica/IngestHttpSvcMod.cc index 4d19bd9b20..e4321d5ec9 100644 --- a/src/replica/IngestHttpSvcMod.cc +++ b/src/replica/IngestHttpSvcMod.cc @@ -133,18 +133,20 @@ IngestRequest::Ptr IngestHttpSvcMod::_createRequest(bool async) const { string const url = body().required("url"); csv::DialectInput dialectInput; - // Allow "column_separator" for the sake of the backward compatibility with the older - // version of the API. The parameter "column_separator" if present will override the one - // of "fields_terminated_by" - dialectInput.fieldsTerminatedBy = body().optional( - "column_separator", - body().optional("fields_terminated_by", csv::Dialect::defaultFieldsTerminatedBy)); + // Allow an empty string in the input. If a parameter is present but empty, replace it with + // the corresponding default value of that parameter. 
+ auto const getDialectParam = [&](string const& param, string const& defaultValue) -> string { + string val = body().optional(param, defaultValue); + if (val.empty()) val = defaultValue; + return val; + }; + dialectInput.fieldsTerminatedBy = + getDialectParam("fields_terminated_by", csv::Dialect::defaultFieldsTerminatedBy); dialectInput.fieldsEnclosedBy = - body().optional("fields_enclosed_by", csv::Dialect::defaultFieldsEnclosedBy); - dialectInput.fieldsEscapedBy = - body().optional("fields_escaped_by", csv::Dialect::defaultFieldsEscapedBy); + getDialectParam("fields_enclosed_by", csv::Dialect::defaultFieldsEnclosedBy); + dialectInput.fieldsEscapedBy = getDialectParam("fields_escaped_by", csv::Dialect::defaultFieldsEscapedBy); dialectInput.linesTerminatedBy = - body().optional("lines_terminated_by", csv::Dialect::defaultLinesTerminatedBy); + getDialectParam("lines_terminated_by", csv::Dialect::defaultLinesTerminatedBy); string const httpMethod = body().optional("http_method", "GET"); string const httpData = body().optional("http_data", string());