Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add socket channel, unclean #6

Open
wants to merge 3 commits into
base: messaging_refactor_develop_phase3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ set (MESSAGE_INFRASTRUCTURE_SRCS
"message_infrastructure/csrc/multiprocessing.cc"
"message_infrastructure/csrc/posix_actor.cc"
"message_infrastructure/csrc/shm.cc"
"message_infrastructure/csrc/socket.cc"
"message_infrastructure/csrc/shmem_channel.cc"
"message_infrastructure/csrc/shmem_port.cc"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ class ChannelFactory {
break;
case DDSCHANNEL:
break;
case SOCKETCHANNEL:

default:
return GetShmemChannel(size, nbytes, src_name, dst_name);
return GetShmemChannel(channel_type, size, nbytes, src_name, dst_name);
}
return NULL;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (C) 2021 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause
// See: https://spdx.org/licenses/

#ifndef COMMUNICATOR_H_
#define COMMUNICATOR_H_

#include <functional>

namespace message_infrastructure {

using HandleFn = std::function<void(void *)>;

class SharedCommunicator {
public:
SharedCommunicator() {}
virtual void Start() = 0;
virtual bool Load(HandleFn consume_fn) = 0;
virtual void Store(HandleFn store_fn) = 0;
};

using SharedCommunicatorPtr = std::shared_ptr<SharedCommunicator>;

} // namespace message_infrastructure

#endif // COMMUNICATOR_H_
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#define LOG_LAYER (0)
#define DEBUG_MODE (0)
#define LOG_SMMP (0) // log for shmemport
#define LOG_SSKP (0) // log for socketport

#define LAVA_LOG(_cond, _fmt, ...) { \
if ((_cond)) { \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ PYBIND11_MODULE(MessageInfrastructurePywrapper, m) {
.value("SHMEMCHANNEL", SHMEMCHANNEL)
.value("RPCCHANNEL", RPCCHANNEL)
.value("DDSCHANNEL", DDSCHANNEL)
.value("SOCKETCHANNEL", SOCKETCHANNEL)
.export_values();
py::class_<PortProxy, std::shared_ptr<PortProxy>> (m, "AbstractTransferPort")
.def(py::init<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ctime>

#include "message_infrastructure_logging.h"
#include "communicator.h"

namespace message_infrastructure {

Expand All @@ -30,7 +31,7 @@ namespace message_infrastructure {

using HandleFn = std::function<void(void *)>;

class SharedMemory {
class SharedMemory : public SharedCommunicator {
public:
SharedMemory() {}
SharedMemory(const size_t &mem_size, const int &shmfd, const int &key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@

namespace message_infrastructure {

ShmemChannel::ShmemChannel(const std::string &src_name,
ShmemChannel::ShmemChannel(const ChannelType &channel_type,
const std::string &src_name,
const std::string &dst_name,
const size_t &size,
const size_t &nbytes) {
unsigned long shmem_size = nbytes + sizeof(MetaData);

shm_ = GetSharedMemManager().AllocChannelSharedMemory<SharedMemory>(shmem_size);

send_port_ = std::make_shared<ShmemSendPort>(src_name, shm_, size, shmem_size);
recv_port_ = std::make_shared<ShmemRecvPort>(dst_name, shm_, size, shmem_size);
size_t items_size = size;
switch (channel_type) {
case RPCCHANNEL:
break;
case DDSCHANNEL:
break;
case SOCKETCHANNEL:
items_size = 0;
shm_ = GetSharedSktManager().AllocChannelSharedSocket<SharedSocket>(shmem_size);
break;
default:
shm_ = GetSharedMemManager().AllocChannelSharedMemory<SharedMemory>(shmem_size);
}
// shm_ = GetSharedMemManager().AllocChannelSharedMemory<SharedMemory>(shmem_size);

send_port_ = std::make_shared<ShmemSendPort>(src_name, shm_, items_size, shmem_size);
recv_port_ = std::make_shared<ShmemRecvPort>(dst_name, shm_, items_size, shmem_size);
}

AbstractSendPortPtr ShmemChannel::GetSendPort() {
Expand All @@ -28,11 +41,13 @@ AbstractRecvPortPtr ShmemChannel::GetRecvPort() {
return recv_port_;
}

std::shared_ptr<ShmemChannel> GetShmemChannel(const size_t &size,
std::shared_ptr<ShmemChannel> GetShmemChannel(const ChannelType &channel_type,
const size_t &size,
const size_t &nbytes,
const std::string &src_name,
const std::string &dst_name) {
return (std::make_shared<ShmemChannel>(src_name,
return (std::make_shared<ShmemChannel>(channel_type,
src_name,
dst_name,
size,
nbytes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,29 @@
#include "abstract_channel.h"
#include "abstract_port.h"
#include "shm.h"
#include "socket.h"
#include "shmem_port.h"

namespace message_infrastructure {

class ShmemChannel : public AbstractChannel {
public:
ShmemChannel() {}
ShmemChannel(const std::string &src_name,
ShmemChannel(const ChannelType &channel_type,
const std::string &src_name,
const std::string &dst_name,
const size_t &size,
const size_t &nbytes);
AbstractSendPortPtr GetSendPort();
AbstractRecvPortPtr GetRecvPort();
private:
SharedMemoryPtr shm_ = nullptr;
ShmemSendPortPtr send_port_ = nullptr;
ShmemRecvPortPtr recv_port_ = nullptr;
SharedCommunicatorPtr shm_ = NULL;
ShmemSendPortPtr send_port_ = NULL;
ShmemRecvPortPtr recv_port_ = NULL;
};

std::shared_ptr<ShmemChannel> GetShmemChannel(const size_t &size,
std::shared_ptr<ShmemChannel> GetShmemChannel(const ChannelType &channel_type,
const size_t &size,
const size_t &nbytes,
const std::string &src_name,
const std::string &dst_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ShmemRecvQueue::~ShmemRecvQueue() {
}

ShmemSendPort::ShmemSendPort(const std::string &name,
SharedMemoryPtr shm,
SharedCommunicatorPtr shm,
const size_t &size,
const size_t &nbytes)
: AbstractSendPort(name, size, nbytes), shm_(shm), done_(false)
Expand Down Expand Up @@ -146,15 +146,17 @@ void ShmemSendPort::Join() {
}

ShmemRecvPort::ShmemRecvPort(const std::string &name,
SharedMemoryPtr shm,
SharedCommunicatorPtr shm,
const size_t &size,
const size_t &nbytes)
: AbstractRecvPort(name, size, nbytes), shm_(shm), done_(false)
{
if (size_ != 0)
queue_ = std::make_shared<ShmemRecvQueue>(name_, size_, nbytes_);
}

void ShmemRecvPort::Start() {
if (size_ != 0)
recv_queue_thread_ = std::make_shared<std::thread>(&message_infrastructure::ShmemRecvPort::QueueRecv, this);
}

Expand All @@ -173,12 +175,41 @@ void ShmemRecvPort::QueueRecv() {
}
}

char * ShmemRecvPort::NoQueueRecv(){
if(!done_.load()) {
bool ret = false;
// if (this->queue_->AvailableCount() > 0) {
void *ptr = malloc(nbytes_);
ret = shm_->Load([this, ptr](void* data){
//this->queue_->Push(data);
std::memcpy(ptr, data, nbytes_);
});
// }
if (!ret) {
// sleep
// helper::Sleep();
free(ptr);
return NULL;
}
return (char *)ptr;
}
return NULL;
}

bool ShmemRecvPort::Probe() {
return queue_->Probe();
}

MetaDataPtr ShmemRecvPort::Recv() {
char *cptr = (char *)queue_->Pop(true);
char *cptr;
if (size_ != 0){
cptr = (char *)queue_->Pop(true);
}else{
cptr = NoQueueRecv();
}
if (cptr == NULL){

}
MetaDataPtr metadata_res = std::make_shared<MetaData>();
std::memcpy(metadata_res.get(), cptr, sizeof(MetaData));
metadata_res->mdata = (void*)(cptr + sizeof(MetaData));
Expand All @@ -188,8 +219,10 @@ MetaDataPtr ShmemRecvPort::Recv() {
void ShmemRecvPort::Join() {
if (!done_) {
done_ = true;
if (size_ != 0){
recv_queue_thread_->join();
queue_->Stop();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ using ThreadPtr = std::shared_ptr<std::thread>;
class ShmemSendPort final : public AbstractSendPort {
public:
ShmemSendPort(const std::string &name,
SharedMemoryPtr shm,
SharedCommunicatorPtr shm,
const size_t &size,
const size_t &nbytes);
void Start();
Expand All @@ -30,7 +30,7 @@ class ShmemSendPort final : public AbstractSendPort {
bool Probe();

private:
SharedMemoryPtr shm_ = nullptr;
SharedCommunicatorPtr shm_ = nullptr;
int idx_ = 0;
std::atomic_bool done_;
ThreadPtr ack_callback_thread_ = nullptr;
Expand Down Expand Up @@ -68,7 +68,7 @@ using ShmemRecvQueuePtr = std::shared_ptr<ShmemRecvQueue>;
class ShmemRecvPort final : public AbstractRecvPort {
public:
ShmemRecvPort(const std::string &name,
SharedMemoryPtr shm,
SharedCommunicatorPtr shm,
const size_t &size,
const size_t &nbytes);
void Start();
Expand All @@ -77,9 +77,10 @@ class ShmemRecvPort final : public AbstractRecvPort {
void Join();
MetaDataPtr Peek();
void QueueRecv();
char *NoQueueRecv();

private:
SharedMemoryPtr shm_ = nullptr;
SharedCommunicatorPtr shm_ = nullptr;
int idx_ = 0;
std::atomic_bool done_;
ShmemRecvQueuePtr queue_ = nullptr;
Expand Down
Loading