Skip to content

Commit

Permalink
Database State Server (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
ksmit799 authored Oct 4, 2023
1 parent 3eb3d18 commit 3d957a3
Show file tree
Hide file tree
Showing 18 changed files with 838 additions and 89 deletions.
14 changes: 14 additions & 0 deletions config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ want-client-agent: true
# Do we want to handle database operations on this instance?
want-database: true

# Do we want database-backed distributed objects to live on this instance?
want-db-state-server: true

# Do we want metrics collection on this instance?
want-metrics: true

Expand Down Expand Up @@ -87,3 +90,14 @@ database-server:
generate:
min: 100000000
max: 399999999

# Database State Server configuration.
db-state-server:
# The channel of the database server we should use for querying.
database: 4003

# The range of DoId's we should be listening for.
# Notice how this matches our database server generate range.
ranges:
min: 100000000
max: 399999999
64 changes: 41 additions & 23 deletions src/database/database_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,22 @@ bool DatabaseUtils::UnpackFields(DatagramIterator &dgi,

void DatabaseUtils::FieldToBson(
bsoncxx::builder::stream::single_context builder, DCPacker &packer) {
// Check if we have an atomic field.
// If we do, recursively unpack into an array.
auto atomicField = packer.get_current_field()->as_field()->as_atomic_field();
if (atomicField != nullptr) {
auto arrayBuilder = builder << bsoncxx::builder::stream::open_array;

packer.push();
while (packer.more_nested_fields()) {
FieldToBson(arrayBuilder, packer);
}
packer.pop();

arrayBuilder << bsoncxx::builder::stream::close_array;
return;
}

// Unpack the field into a bson format.
// Note that this function can be recursively called with certain pack types.
DCPackType packType = packer.get_pack_type();
Expand Down Expand Up @@ -192,37 +208,37 @@ void DatabaseUtils::FieldToBson(
void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,
const std::string &fieldName,
const bsoncxx::types::bson_value::view &value,
Datagram &dg) {
const int &divisor, Datagram &dg) {
try {
switch (fieldType) {
case ST_invalid:
throw ConversionException("Got invalid field type");
case ST_int8:
dg.AddInt8(BsonToNumber<int8_t>(value));
dg.AddInt8(BsonToNumber<int8_t>(value, divisor));
break;
case ST_int16:
dg.AddInt16(BsonToNumber<int16_t>(value));
dg.AddInt16(BsonToNumber<int16_t>(value, divisor));
break;
case ST_int32:
dg.AddInt32(BsonToNumber<int32_t>(value));
dg.AddInt32(BsonToNumber<int32_t>(value, divisor));
break;
case ST_int64:
dg.AddInt64(BsonToNumber<int64_t>(value));
dg.AddInt64(BsonToNumber<int64_t>(value, divisor));
break;
case ST_uint8:
dg.AddUint8(BsonToNumber<uint8_t>(value));
dg.AddUint8(BsonToNumber<uint8_t>(value, divisor));
break;
case ST_uint16:
dg.AddUint16(BsonToNumber<uint16_t>(value));
dg.AddUint16(BsonToNumber<uint16_t>(value, divisor));
break;
case ST_uint32:
dg.AddUint32(BsonToNumber<uint32_t>(value));
dg.AddUint32(BsonToNumber<uint32_t>(value, divisor));
break;
case ST_uint64:
dg.AddUint64(BsonToNumber<uint64_t>(value));
dg.AddUint64(BsonToNumber<uint64_t>(value, divisor));
break;
case ST_float64:
dg.AddFloat64(BsonToNumber<double>(value));
dg.AddFloat64(BsonToNumber<double>(value, divisor));
break;
case ST_string:
if (value.type() != bsoncxx::type::k_string) {
Expand All @@ -244,7 +260,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,

Datagram arrDg;
for (const auto &it : value.get_array().value) {
arrDg.AddInt16(BsonToNumber<int16_t>(it.get_value()));
arrDg.AddInt16(BsonToNumber<int16_t>(it.get_value(), divisor));
}

dg.AddBlob(arrDg.GetData(), arrDg.Size());
Expand All @@ -257,7 +273,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,

Datagram arrDg;
for (const auto &it : value.get_array().value) {
arrDg.AddInt32(BsonToNumber<int32_t>(it.get_value()));
arrDg.AddInt32(BsonToNumber<int32_t>(it.get_value(), divisor));
}

dg.AddBlob(arrDg.GetData(), arrDg.Size());
Expand All @@ -270,7 +286,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,

Datagram arrDg;
for (const auto &it : value.get_array().value) {
arrDg.AddUint16(BsonToNumber<uint16_t>(it.get_value()));
arrDg.AddUint16(BsonToNumber<uint16_t>(it.get_value(), divisor));
}

dg.AddBlob(arrDg.GetData(), arrDg.Size());
Expand All @@ -283,7 +299,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,

Datagram arrDg;
for (const auto &it : value.get_array().value) {
arrDg.AddUint32(BsonToNumber<uint32_t>(it.get_value()));
arrDg.AddUint32(BsonToNumber<uint32_t>(it.get_value(), divisor));
}

dg.AddBlob(arrDg.GetData(), arrDg.Size());
Expand All @@ -296,7 +312,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,

Datagram arrDg;
for (const auto &it : value.get_array().value) {
arrDg.AddInt8(BsonToNumber<int8_t>(it.get_value()));
arrDg.AddInt8(BsonToNumber<int8_t>(it.get_value(), divisor));
}

dg.AddBlob(arrDg.GetData(), arrDg.Size());
Expand All @@ -309,7 +325,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,

Datagram arrDg;
for (const auto &it : value.get_array().value) {
arrDg.AddUint8(BsonToNumber<uint8_t>(it.get_value()));
arrDg.AddUint8(BsonToNumber<uint8_t>(it.get_value(), divisor));
}

dg.AddBlob(arrDg.GetData(), arrDg.Size());
Expand All @@ -321,10 +337,10 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,
}
dg.AddUint16(value.get_array().value.length());
for (size_t i = 0; i < value.get_array().value.length();) {
dg.AddUint32(
BsonToNumber<uint32_t>(value.get_array().value[i].get_value()));
dg.AddUint8(
BsonToNumber<uint8_t>(value.get_array().value[i + 1].get_value()));
dg.AddUint32(BsonToNumber<uint32_t>(
value.get_array().value[i].get_value(), divisor));
dg.AddUint8(BsonToNumber<uint8_t>(
value.get_array().value[i + 1].get_value(), divisor));
i += 2;
}
break;
Expand Down Expand Up @@ -352,7 +368,8 @@ void DatabaseUtils::PackField(const DCField *field,
// Unpack each sub-element that composes the field.
auto numFields = atomicField->get_num_elements();
for (int i = 0; i < numFields; i++) {
PackField(atomicField->get_element(i), value, dg);
PackField(atomicField->get_element(i),
value.get_array().value[i].get_value(), dg);
}
return;
}
Expand All @@ -364,7 +381,7 @@ void DatabaseUtils::PackField(const DCField *field,
auto fieldSimple = fieldParameter->as_simple_parameter();
if (fieldSimple != nullptr) {
DatabaseUtils::BsonToField(fieldSimple->get_type(), field->get_name(),
value, dg);
value, fieldSimple->get_divisor(), dg);
}

// Do we have a class field type?
Expand All @@ -387,7 +404,8 @@ void DatabaseUtils::PackField(const DCField *field,
Datagram arrDg;
for (const auto &arrVal : value.get_array().value) {
DatabaseUtils::BsonToField(fieldType, field->get_name(),
arrVal.get_value(), arrDg);
arrVal.get_value(),
elemParamSimple->get_divisor(), arrDg);
}

dg.AddBlob(arrDg.GetData(), arrDg.Size());
Expand Down
14 changes: 10 additions & 4 deletions src/database/database_utils.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
#ifndef ARDOS_DATABASE_UTILS_H
#define ARDOS_DATABASE_UTILS_H

#include <cmath> /* modf */
#include <list>

#include <bsoncxx/builder/stream/document.hpp>
#include <bsoncxx/types/bson_value/view.hpp>
#include <dcClass.h>
#include <dcClassParameter.h>
#include <dcPacker.h>

#include "../net/datagram_iterator.h"
#include "../util/globals.h"

namespace Ardos {

typedef std::map<const DCField *, std::vector<uint8_t>> FieldMap;

/**
* Exception thrown by many database utility functions.
*/
Expand Down Expand Up @@ -75,7 +77,7 @@ class DatabaseUtils {
static void BsonToField(const DCSubatomicType &fieldType,
const std::string &fieldName,
const bsoncxx::types::bson_value::view &value,
Datagram &dg);
const int &divisor, Datagram &dg);

static void PackField(const DCField *field,
const bsoncxx::types::bson_value::view &value,
Expand All @@ -101,7 +103,8 @@ class DatabaseUtils {
* @return
*/
template <typename T>
static T BsonToNumber(const bsoncxx::types::bson_value::view &value) {
static T BsonToNumber(const bsoncxx::types::bson_value::view &value,
const int &divisor = 1) {
// TODO: Can we prevent a double alloc here?
int64_t i;
double d;
Expand All @@ -111,10 +114,13 @@ class DatabaseUtils {
// We've got three fundamental number types
if (value.type() == bsoncxx::type::k_int32) {
i = value.get_int32().value;
i = i * divisor;
} else if (value.type() == bsoncxx::type::k_int64) {
i = value.get_int64().value;
i = i * divisor;
} else if (value.type() == bsoncxx::type::k_double) {
d = value.get_double().value;
d = d * divisor;
isDouble = true;
} else {
throw ConversionException("Non-numeric BSON type encountered");
Expand Down
89 changes: 74 additions & 15 deletions src/messagedirector/channel_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace Ardos {
std::unordered_map<std::string, unsigned int>
ChannelSubscriber::_globalChannels =
std::unordered_map<std::string, unsigned int>();
std::map<ChannelRange, unsigned int> ChannelSubscriber::_globalRanges =
std::map<ChannelRange, unsigned int>();

ChannelSubscriber::ChannelSubscriber() {
// Fetch the global channel and our local queue.
Expand All @@ -21,29 +23,36 @@ ChannelSubscriber::ChannelSubscriber() {
MessageDirector::Instance()->AddSubscriber(this);
}

ChannelSubscriber::~ChannelSubscriber() {}

void ChannelSubscriber::Shutdown() {
MessageDirector::Instance()->RemoveSubscriber(this);

// Cleanup our local channel subscriptions.
std::unordered_set<std::string> channels(_localChannels);
for (const auto &channel : channels) {
UnsubscribeChannel(std::stoull(channel));
while (!_localChannels.empty()) {
auto channel = std::stoull(_localChannels.back());
_localChannels.pop_back();

UnsubscribeChannel(channel);
}

_localChannels.clear();
// Cleanup our local range subscriptions.
while (!_localRanges.empty()) {
auto range = _localRanges.back();
_localRanges.pop_back();

UnsubscribeRange(range.first, range.second);
}
}

void ChannelSubscriber::SubscribeChannel(const uint64_t &channel) {
std::string channelStr = std::to_string(channel);

// Don't add duplicate channels.
if (_localChannels.contains(channelStr)) {
if (std::find(_localChannels.begin(), _localChannels.end(), channelStr) !=
_localChannels.end()) {
return;
}

_localChannels.insert(channelStr);
_localChannels.push_back(channelStr);

// Next, lets check if this channel is already being listened to elsewhere.
// If it is, increment the subscriber count.
Expand All @@ -63,11 +72,13 @@ void ChannelSubscriber::UnsubscribeChannel(const uint64_t &channel) {
std::string channelStr = std::to_string(channel);

// Make sure we've subscribed to this channel.
if (!_localChannels.contains(channelStr)) {
auto position =
std::find(_localChannels.begin(), _localChannels.end(), channelStr);
if (position == _localChannels.end()) {
return;
}

_localChannels.erase(channelStr);
_localChannels.erase(position);

// We can safely assume the channel exists in a global context.
_globalChannels[channelStr]--;
Expand All @@ -80,10 +91,49 @@ void ChannelSubscriber::UnsubscribeChannel(const uint64_t &channel) {
}
}

/**
* Routes a datagram through the message director to the target channels.
* @param dg
*/
void ChannelSubscriber::SubscribeRange(const uint64_t &min,
const uint64_t &max) {
// Make sure we're not adding a duplicate range.
auto range = std::make_pair(min, max);
if (std::find(_localRanges.begin(), _localRanges.end(), range) !=
_localRanges.end()) {
return;
}

_localRanges.push_back(range);

// Next, lets check if this channel range is already being listened to
// elsewhere. If it is, increment the subscriber count.
if (_globalRanges.contains(range)) {
_globalRanges[range]++;
return;
}

// Register it as a newly opened global channel range.
_globalRanges[range] = 1;
}

void ChannelSubscriber::UnsubscribeRange(const uint64_t &min,
const uint64_t &max) {
auto range = std::make_pair(min, max);

auto position = std::find(_localRanges.begin(), _localRanges.end(), range);
if (position == _localRanges.end()) {
return;
}

_localRanges.erase(position);

// We can safely assume the channel range exists in a global context.
_globalRanges[range]--;

// If we have 0 current listeners for this channel range, let RabbitMQ know we
// no longer care about it.
if (!_globalRanges[range]) {
_globalRanges.erase(range);
}
}

void ChannelSubscriber::PublishDatagram(const std::shared_ptr<Datagram> &dg) {
DatagramIterator dgi(dg);

Expand All @@ -99,12 +149,21 @@ void ChannelSubscriber::PublishDatagram(const std::shared_ptr<Datagram> &dg) {
void ChannelSubscriber::HandleUpdate(const std::string &channel,
const std::shared_ptr<Datagram> &dg) {
// First, check if this ChannelSubscriber cares about the message.
if (!_localChannels.contains(channel)) {
if (std::find(_localChannels.begin(), _localChannels.end(), channel) ==
_localChannels.end() &&
!WithinLocalRange(channel)) {
return;
}

// We do care about the message, handle it!
HandleDatagram(dg);
}

bool ChannelSubscriber::WithinLocalRange(const std::string &routingKey) {
auto channel = std::stoull(routingKey);
return std::any_of(
_localRanges.begin(), _localRanges.end(),
[channel](auto i) { return channel >= i.first && channel <= i.second; });
}

} // namespace Ardos
Loading

0 comments on commit 3d957a3

Please sign in to comment.