Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…wss-common into zmq
  • Loading branch information
divyagayathri-hcl committed Dec 3, 2024
2 parents c19af49 + aa1021f commit ffd7396
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 8 deletions.
2 changes: 1 addition & 1 deletion common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class RedisPipeline {
return;

m_channels.insert(channel);
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');";
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G')\n";
m_shaPub = loadRedisScript(m_luaPub);
}

Expand Down
1 change: 1 addition & 0 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ bool ZmqClient::wait(std::string& dbName,
rc = zmq_recv(m_socket, m_sendbuffer.data(), m_sendbuffer.size(), 0);
if (rc < 0)
{
SWSS_LOG_DEBUG("MQ_MAX_RETRY value is: %d", MQ_MAX_RETRY);
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
continue;
Expand Down
2 changes: 2 additions & 0 deletions common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "table.h"
#include "zmqserver.h"

#define MQ_MAX_RETRY 100

namespace swss {

class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMessageHandler
Expand Down
66 changes: 66 additions & 0 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,70 @@ void ZmqServer::mqPollThread()
SWSS_LOG_NOTICE("mqPollThread end");
}

void ZmqServer::sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
m_buffer.data(),
m_buffer.size(),
dbName,
tableName,
values);
SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
int retry_delay = 10;
int rc = 0;
for (int i = 0; i <= MQ_MAX_RETRY; ++i)
{
rc = zmq_send(m_socket, m_buffer.data(), serializedlen, 0);

if (rc >= 0)
{
m_allowZmqPoll = true;
SWSS_LOG_DEBUG("zmq sent %d bytes", serializedlen);
return;
}
zmq_err = zmq_errno();
// sleep (2 ^ retry time) * 10 ms
retry_delay *= 2;
if (zmq_err == EINTR
|| zmq_err== EFSM)
{
// EINTR: interrupted by signal
// EFSM: socket state not ready
// For example when ZMQ socket still not receive reply message from last sended package.
// There was state machine inside ZMQ socket, when the socket is not in ready to send state, this
// error will happen.
// for more detail, please check: http://api.zeromq.org/2-1:zmq-send
SWSS_LOG_DEBUG("zmq send retry, endpoint: %s, error: %d", m_endpoint.c_str(), zmq_err);

retry_delay = 0;
}
else if (zmq_err == EAGAIN)
{
// EAGAIN: ZMQ is full to need try again
SWSS_LOG_WARN("zmq is full, will retry in %d ms, endpoint: %s, error: %d", retry_delay, m_endpoint.c_str(), zmq_err);
}
else if (zmq_err == ETERM)
{
auto message = "zmq connection break, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
throw system_error(make_error_code(errc::connection_reset), message);
}
else
{
// for other error, send failed immediately.
auto message = "zmq send failed, endpoint: " + m_endpoint + ", error: " + to_string(rc);
SWSS_LOG_ERROR("%s", message.c_str());
// throw system_error(make_error_code(errc::io_error), message);
}
usleep(retry_delay * 1000);
}

// failed after retry
auto message = "zmq send failed, endpoint: " + m_endpoint + ", zmqerrno: " + to_string(zmq_err) + ":" + zmq_strerror(zmq_err) + ", msg length:" + to_string(serializedlen);
SWSS_LOG_ERROR("%s", message.c_str());
// throw system_error(make_error_code(errc::io_error), message);
}

}
7 changes: 6 additions & 1 deletion common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define MQ_RESPONSE_MAX_COUNT (16*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_MAX_RETRY 100
#define MQ_POLL_TIMEOUT (1000)
#define MQ_WATERMARK 10000

Expand Down Expand Up @@ -39,6 +39,9 @@ class ZmqServer
const std::string tableName,
ZmqMessageHandler* handler);

void sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values);

private:
void handleReceivedData(const char* buffer, const size_t size);

Expand All @@ -56,6 +59,8 @@ class ZmqServer

std::string m_vrf;

void* m_socket;

bool m_allowZmqPoll;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
Expand Down
104 changes: 98 additions & 6 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,6 @@ static void testMethod(bool producerPersistence)
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to start.
sleep(1);

cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl;
/* Starting the producer before the producer */
for (int i = 0; i < NUMBER_OF_THREADS; i++)
Expand Down Expand Up @@ -354,9 +351,6 @@ static void testBatchMethod(bool producerPersistence)
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to start.
sleep(1);

cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl;
/* Starting the producer before the producer */
for (int i = 0; i < NUMBER_OF_THREADS; i++)
Expand Down Expand Up @@ -471,3 +465,101 @@ TEST(ZmqProducerStateTableDeleteAfterSend, test)
table.getKeys(keys);
EXPECT_EQ(keys.front(), testKey);
}

static bool zmq_done = false;

static void zmqConsumerWorker(string tableName, string endpoint, bool dbPersistence)
{
cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint);
ZmqConsumerStateTable c(&db, tableName, server, 128, 0, dbPersistence);
Select cs;
cs.addSelectable(&c);
//validate received data
Selectable *selectcs;
std::deque<KeyOpFieldsValuesTuple> vkco;
int ret = 0;
while (!zmq_done)
{
ret = cs.select(&selectcs, 10, true);
if (ret == Select::OBJECT)
{
c.pops(vkco);
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
server.sendMsg(TEST_DB, tableName, values);
}
}

allDataReceived = true;
if (dbPersistence)
{
// wait all persist data write to redis
while (c.dbUpdaterQueueSize() > 0)
{
sleep(1);
}
}

cout << "Consumer thread ended: " << tableName << endl;
}

static void ZmqWithResponse(bool producerPersistence)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
std::string pullEndpoint = "tcp://*:1234";
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to be ready.
sleep(1);
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint);
ZmqProducerStateTable p(&db, testTableName, client, true);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
for (int i =0; i < 3; ++i)
{
p.send(kcos);
ASSERT_TRUE(p.wait(dbName, tableName, kcos_p));
EXPECT_EQ(dbName, TEST_DB);
EXPECT_EQ(tableName, testTableName);
ASSERT_EQ(kcos_p.size(), 1);
EXPECT_EQ(kfvKey(*kcos_p[0]), "k");
EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND);
std::vector<FieldValueTuple> cos = std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}};
EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos);
}

zmq_done = true;
consumerThread->join();
delete consumerThread;
}

TEST(ZmqWithResponse, test)
{
// test with persist by consumer
ZmqWithResponse(false);
}

TEST(ZmqWithResponseClientError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint);
ZmqProducerStateTable p(&db, testTableName, client, true);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
p.send(kcos);
// Wait will timeout without server reply.
EXPECT_FALSE(p.wait(dbName, tableName, kcos_p));
// Send will return error without server reply.
}

0 comments on commit ffd7396

Please sign in to comment.