From 3d957a39cb012a5a837fe69c5b148b260da67577 Mon Sep 17 00:00:00 2001 From: Kylie Smith Date: Wed, 4 Oct 2023 22:23:23 +1000 Subject: [PATCH] Database State Server (#15) --- config.example.yml | 14 + src/database/database_utils.cpp | 64 ++-- src/database/database_utils.h | 14 +- src/messagedirector/channel_subscriber.cpp | 89 ++++- src/messagedirector/channel_subscriber.h | 19 +- src/messagedirector/message_director.cpp | 14 +- src/messagedirector/message_director.h | 3 + src/net/datagram.cpp | 3 +- src/stateserver/database_state_server.cpp | 325 ++++++++++++++++-- src/stateserver/database_state_server.h | 42 ++- src/stateserver/distributed_object.cpp | 31 +- src/stateserver/distributed_object.h | 23 +- src/stateserver/loading_object.cpp | 207 ++++++++++- src/stateserver/loading_object.h | 51 ++- src/stateserver/state_server.cpp | 4 +- src/stateserver/state_server.h | 7 +- src/stateserver/state_server_implementation.h | 15 + src/util/globals.h | 2 + 18 files changed, 838 insertions(+), 89 deletions(-) create mode 100644 src/stateserver/state_server_implementation.h diff --git a/config.example.yml b/config.example.yml index 0147a0d..1ed7504 100644 --- a/config.example.yml +++ b/config.example.yml @@ -17,6 +17,9 @@ want-client-agent: true # Do we want to handle database operations on this instance? want-database: true +# Do we want database-backed distributed objects to live on this instance? +want-db-state-server: true + # Do we want metrics collection on this instance? want-metrics: true @@ -87,3 +90,14 @@ database-server: generate: min: 100000000 max: 399999999 + +# Database State Server configuration. +db-state-server: + # The channel of the database server we should use for querying. + database: 4003 + + # The range of DoId's we should be listening for. + # Notice how this matches our database server generate range. + ranges: + min: 100000000 + max: 399999999 diff --git a/src/database/database_utils.cpp b/src/database/database_utils.cpp index 58d57a8..ee6c2fc 100644 --- a/src/database/database_utils.cpp +++ b/src/database/database_utils.cpp @@ -100,6 +100,22 @@ bool DatabaseUtils::UnpackFields(DatagramIterator &dgi, void DatabaseUtils::FieldToBson( bsoncxx::builder::stream::single_context builder, DCPacker &packer) { + // Check if we have an atomic field. + // If we do, recursively unpack into an array. + auto atomicField = packer.get_current_field()->as_field()->as_atomic_field(); + if (atomicField != nullptr) { + auto arrayBuilder = builder << bsoncxx::builder::stream::open_array; + + packer.push(); + while (packer.more_nested_fields()) { + FieldToBson(arrayBuilder, packer); + } + packer.pop(); + + arrayBuilder << bsoncxx::builder::stream::close_array; + return; + } + // Unpack the field into a bson format. // Note that this function can be recursively called with certain pack types. DCPackType packType = packer.get_pack_type(); @@ -192,37 +208,37 @@ void DatabaseUtils::FieldToBson( void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, const std::string &fieldName, const bsoncxx::types::bson_value::view &value, - Datagram &dg) { + const int &divisor, Datagram &dg) { try { switch (fieldType) { case ST_invalid: throw ConversionException("Got invalid field type"); case ST_int8: - dg.AddInt8(BsonToNumber(value)); + dg.AddInt8(BsonToNumber(value, divisor)); break; case ST_int16: - dg.AddInt16(BsonToNumber(value)); + dg.AddInt16(BsonToNumber(value, divisor)); break; case ST_int32: - dg.AddInt32(BsonToNumber(value)); + dg.AddInt32(BsonToNumber(value, divisor)); break; case ST_int64: - dg.AddInt64(BsonToNumber(value)); + dg.AddInt64(BsonToNumber(value, divisor)); break; case ST_uint8: - dg.AddUint8(BsonToNumber(value)); + dg.AddUint8(BsonToNumber(value, divisor)); break; case ST_uint16: - dg.AddUint16(BsonToNumber(value)); + dg.AddUint16(BsonToNumber(value, divisor)); break; case ST_uint32: - dg.AddUint32(BsonToNumber(value)); + dg.AddUint32(BsonToNumber(value, divisor)); break; case ST_uint64: - dg.AddUint64(BsonToNumber(value)); + dg.AddUint64(BsonToNumber(value, divisor)); break; case ST_float64: - dg.AddFloat64(BsonToNumber(value)); + dg.AddFloat64(BsonToNumber(value, divisor)); break; case ST_string: if (value.type() != bsoncxx::type::k_string) { @@ -244,7 +260,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, Datagram arrDg; for (const auto &it : value.get_array().value) { - arrDg.AddInt16(BsonToNumber(it.get_value())); + arrDg.AddInt16(BsonToNumber(it.get_value(), divisor)); } dg.AddBlob(arrDg.GetData(), arrDg.Size()); @@ -257,7 +273,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, Datagram arrDg; for (const auto &it : value.get_array().value) { - arrDg.AddInt32(BsonToNumber(it.get_value())); + arrDg.AddInt32(BsonToNumber(it.get_value(), divisor)); } dg.AddBlob(arrDg.GetData(), arrDg.Size()); @@ -270,7 +286,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, Datagram arrDg; for (const auto &it : value.get_array().value) { - arrDg.AddUint16(BsonToNumber(it.get_value())); + arrDg.AddUint16(BsonToNumber(it.get_value(), divisor)); } dg.AddBlob(arrDg.GetData(), arrDg.Size()); @@ -283,7 +299,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, Datagram arrDg; for (const auto &it : value.get_array().value) { - arrDg.AddUint32(BsonToNumber(it.get_value())); + arrDg.AddUint32(BsonToNumber(it.get_value(), divisor)); } dg.AddBlob(arrDg.GetData(), arrDg.Size()); @@ -296,7 +312,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, Datagram arrDg; for (const auto &it : value.get_array().value) { - arrDg.AddInt8(BsonToNumber(it.get_value())); + arrDg.AddInt8(BsonToNumber(it.get_value(), divisor)); } dg.AddBlob(arrDg.GetData(), arrDg.Size()); @@ -309,7 +325,7 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, Datagram arrDg; for (const auto &it : value.get_array().value) { - arrDg.AddUint8(BsonToNumber(it.get_value())); + arrDg.AddUint8(BsonToNumber(it.get_value(), divisor)); } dg.AddBlob(arrDg.GetData(), arrDg.Size()); @@ -321,10 +337,10 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType, } dg.AddUint16(value.get_array().value.length()); for (size_t i = 0; i < value.get_array().value.length();) { - dg.AddUint32( - BsonToNumber(value.get_array().value[i].get_value())); - dg.AddUint8( - BsonToNumber(value.get_array().value[i + 1].get_value())); + dg.AddUint32(BsonToNumber( + value.get_array().value[i].get_value(), divisor)); + dg.AddUint8(BsonToNumber( + value.get_array().value[i + 1].get_value(), divisor)); i += 2; } break; @@ -352,7 +368,8 @@ void DatabaseUtils::PackField(const DCField *field, // Unpack each sub-element that composes the field. auto numFields = atomicField->get_num_elements(); for (int i = 0; i < numFields; i++) { - PackField(atomicField->get_element(i), value, dg); + PackField(atomicField->get_element(i), + value.get_array().value[i].get_value(), dg); } return; } @@ -364,7 +381,7 @@ void DatabaseUtils::PackField(const DCField *field, auto fieldSimple = fieldParameter->as_simple_parameter(); if (fieldSimple != nullptr) { DatabaseUtils::BsonToField(fieldSimple->get_type(), field->get_name(), - value, dg); + value, fieldSimple->get_divisor(), dg); } // Do we have a class field type? @@ -387,7 +404,8 @@ void DatabaseUtils::PackField(const DCField *field, Datagram arrDg; for (const auto &arrVal : value.get_array().value) { DatabaseUtils::BsonToField(fieldType, field->get_name(), - arrVal.get_value(), arrDg); + arrVal.get_value(), + elemParamSimple->get_divisor(), arrDg); } dg.AddBlob(arrDg.GetData(), arrDg.Size()); diff --git a/src/database/database_utils.h b/src/database/database_utils.h index d8bd8c6..2e9d443 100644 --- a/src/database/database_utils.h +++ b/src/database/database_utils.h @@ -1,6 +1,9 @@ #ifndef ARDOS_DATABASE_UTILS_H #define ARDOS_DATABASE_UTILS_H +#include /* modf */ +#include + #include #include #include @@ -8,11 +11,10 @@ #include #include "../net/datagram_iterator.h" +#include "../util/globals.h" namespace Ardos { -typedef std::map> FieldMap; - /** * Exception thrown by many database utility functions. */ @@ -75,7 +77,7 @@ class DatabaseUtils { static void BsonToField(const DCSubatomicType &fieldType, const std::string &fieldName, const bsoncxx::types::bson_value::view &value, - Datagram &dg); + const int &divisor, Datagram &dg); static void PackField(const DCField *field, const bsoncxx::types::bson_value::view &value, @@ -101,7 +103,8 @@ class DatabaseUtils { * @return */ template - static T BsonToNumber(const bsoncxx::types::bson_value::view &value) { + static T BsonToNumber(const bsoncxx::types::bson_value::view &value, + const int &divisor = 1) { // TODO: Can we prevent a double alloc here? int64_t i; double d; @@ -111,10 +114,13 @@ class DatabaseUtils { // We've got three fundamental number types if (value.type() == bsoncxx::type::k_int32) { i = value.get_int32().value; + i = i * divisor; } else if (value.type() == bsoncxx::type::k_int64) { i = value.get_int64().value; + i = i * divisor; } else if (value.type() == bsoncxx::type::k_double) { d = value.get_double().value; + d = d * divisor; isDouble = true; } else { throw ConversionException("Non-numeric BSON type encountered"); diff --git a/src/messagedirector/channel_subscriber.cpp b/src/messagedirector/channel_subscriber.cpp index 74de7ec..e5b2dda 100644 --- a/src/messagedirector/channel_subscriber.cpp +++ b/src/messagedirector/channel_subscriber.cpp @@ -12,6 +12,8 @@ namespace Ardos { std::unordered_map ChannelSubscriber::_globalChannels = std::unordered_map(); +std::map ChannelSubscriber::_globalRanges = + std::map(); ChannelSubscriber::ChannelSubscriber() { // Fetch the global channel and our local queue. @@ -21,29 +23,36 @@ ChannelSubscriber::ChannelSubscriber() { MessageDirector::Instance()->AddSubscriber(this); } -ChannelSubscriber::~ChannelSubscriber() {} - void ChannelSubscriber::Shutdown() { MessageDirector::Instance()->RemoveSubscriber(this); // Cleanup our local channel subscriptions. - std::unordered_set channels(_localChannels); - for (const auto &channel : channels) { - UnsubscribeChannel(std::stoull(channel)); + while (!_localChannels.empty()) { + auto channel = std::stoull(_localChannels.back()); + _localChannels.pop_back(); + + UnsubscribeChannel(channel); } - _localChannels.clear(); + // Cleanup our local range subscriptions. + while (!_localRanges.empty()) { + auto range = _localRanges.back(); + _localRanges.pop_back(); + + UnsubscribeRange(range.first, range.second); + } } void ChannelSubscriber::SubscribeChannel(const uint64_t &channel) { std::string channelStr = std::to_string(channel); // Don't add duplicate channels. - if (_localChannels.contains(channelStr)) { + if (std::find(_localChannels.begin(), _localChannels.end(), channelStr) != + _localChannels.end()) { return; } - _localChannels.insert(channelStr); + _localChannels.push_back(channelStr); // Next, lets check if this channel is already being listened to elsewhere. // If it is, increment the subscriber count. @@ -63,11 +72,13 @@ void ChannelSubscriber::UnsubscribeChannel(const uint64_t &channel) { std::string channelStr = std::to_string(channel); // Make sure we've subscribed to this channel. - if (!_localChannels.contains(channelStr)) { + auto position = + std::find(_localChannels.begin(), _localChannels.end(), channelStr); + if (position == _localChannels.end()) { return; } - _localChannels.erase(channelStr); + _localChannels.erase(position); // We can safely assume the channel exists in a global context. _globalChannels[channelStr]--; @@ -80,10 +91,49 @@ void ChannelSubscriber::UnsubscribeChannel(const uint64_t &channel) { } } -/** - * Routes a datagram through the message director to the target channels. - * @param dg - */ +void ChannelSubscriber::SubscribeRange(const uint64_t &min, + const uint64_t &max) { + // Make sure we're not adding a duplicate range. + auto range = std::make_pair(min, max); + if (std::find(_localRanges.begin(), _localRanges.end(), range) != + _localRanges.end()) { + return; + } + + _localRanges.push_back(range); + + // Next, lets check if this channel range is already being listened to + // elsewhere. If it is, increment the subscriber count. + if (_globalRanges.contains(range)) { + _globalRanges[range]++; + return; + } + + // Register it as a newly opened global channel range. + _globalRanges[range] = 1; +} + +void ChannelSubscriber::UnsubscribeRange(const uint64_t &min, + const uint64_t &max) { + auto range = std::make_pair(min, max); + + auto position = std::find(_localRanges.begin(), _localRanges.end(), range); + if (position == _localRanges.end()) { + return; + } + + _localRanges.erase(position); + + // We can safely assume the channel range exists in a global context. + _globalRanges[range]--; + + // If we have 0 current listeners for this channel range, let RabbitMQ know we + // no longer care about it. + if (!_globalRanges[range]) { + _globalRanges.erase(range); + } +} + void ChannelSubscriber::PublishDatagram(const std::shared_ptr &dg) { DatagramIterator dgi(dg); @@ -99,7 +149,9 @@ void ChannelSubscriber::PublishDatagram(const std::shared_ptr &dg) { void ChannelSubscriber::HandleUpdate(const std::string &channel, const std::shared_ptr &dg) { // First, check if this ChannelSubscriber cares about the message. - if (!_localChannels.contains(channel)) { + if (std::find(_localChannels.begin(), _localChannels.end(), channel) == + _localChannels.end() && + !WithinLocalRange(channel)) { return; } @@ -107,4 +159,11 @@ void ChannelSubscriber::HandleUpdate(const std::string &channel, HandleDatagram(dg); } +bool ChannelSubscriber::WithinLocalRange(const std::string &routingKey) { + auto channel = std::stoull(routingKey); + return std::any_of( + _localRanges.begin(), _localRanges.end(), + [channel](auto i) { return channel >= i.first && channel <= i.second; }); +} + } // namespace Ardos diff --git a/src/messagedirector/channel_subscriber.h b/src/messagedirector/channel_subscriber.h index 2219517..ac5907d 100644 --- a/src/messagedirector/channel_subscriber.h +++ b/src/messagedirector/channel_subscriber.h @@ -2,6 +2,7 @@ #define ARDOS_CHANNEL_SUBSCRIBER_H #include +#include #include @@ -9,17 +10,27 @@ namespace Ardos { +typedef std::pair ChannelRange; + class ChannelSubscriber { public: friend class MessageDirector; ChannelSubscriber(); - virtual ~ChannelSubscriber(); + virtual ~ChannelSubscriber() = default; virtual void Shutdown(); void SubscribeChannel(const uint64_t &channel); void UnsubscribeChannel(const uint64_t &channel); + + void SubscribeRange(const uint64_t &min, const uint64_t &max); + void UnsubscribeRange(const uint64_t &min, const uint64_t &max); + + /** + * Routes a datagram through the message director to the target channels. + * @param dg + */ void PublishDatagram(const std::shared_ptr &dg); protected: @@ -29,11 +40,15 @@ class ChannelSubscriber { void HandleUpdate(const std::string &channel, const std::shared_ptr &dg); + bool WithinLocalRange(const std::string &routingKey); + // A static map of globally registered channels. static std::unordered_map _globalChannels; + static std::map _globalRanges; // List of channels that this ChannelSubscriber is listening to. - std::unordered_set _localChannels; + std::vector _localChannels; + std::vector _localRanges; AMQP::Channel *_globalChannel; std::string _localQueue; diff --git a/src/messagedirector/message_director.cpp b/src/messagedirector/message_director.cpp index fd0a6fc..ea810fa 100644 --- a/src/messagedirector/message_director.cpp +++ b/src/messagedirector/message_director.cpp @@ -176,7 +176,7 @@ void MessageDirector::onReady(AMQP::Connection *connection) { #endif } - if (Config::Instance()->GetBool("want-dbss")) { + if (Config::Instance()->GetBool("want-db-state-server")) { new DatabaseStateServer(); } @@ -345,7 +345,8 @@ void MessageDirector::StartConsuming() { // First, check if we have at least one channel subscriber listening to // the channel in this cluster. if (!ChannelSubscriber::_globalChannels.contains( - message.routingkey())) { + message.routingkey()) && + !WithinGlobalRange(message.routingkey())) { return; } @@ -388,4 +389,13 @@ void MessageDirector::StartConsuming() { }); } +bool MessageDirector::WithinGlobalRange(const std::string &routingKey) { + auto channel = std::stoull(routingKey); + return std::any_of(ChannelSubscriber::_globalRanges.begin(), + ChannelSubscriber::_globalRanges.end(), [channel](auto i) { + return channel >= i.first.first && + channel <= i.first.second; + }); +} + } // namespace Ardos diff --git a/src/messagedirector/message_director.h b/src/messagedirector/message_director.h index 3483a3c..26f71ec 100644 --- a/src/messagedirector/message_director.h +++ b/src/messagedirector/message_director.h @@ -38,8 +38,11 @@ class MessageDirector : public AMQP::ConnectionHandler { MessageDirector(); void InitMetrics(); + void StartConsuming(); + static bool WithinGlobalRange(const std::string &channel); + static MessageDirector *_instance; std::unordered_set _subscribers; diff --git a/src/net/datagram.cpp b/src/net/datagram.cpp index f568fca..f621a41 100644 --- a/src/net/datagram.cpp +++ b/src/net/datagram.cpp @@ -40,9 +40,8 @@ Datagram::~Datagram() { delete[] _buf; } * Good for re-using datagrams rather than re-alloc. */ void Datagram::Clear() { - // Wipe out the buffer without deleting it. + // Wipe out the buffer offset without deleting it. // This should prevent redundant re-sizing. - std::fill(_buf, _buf + _bufOffset, 0); _bufOffset = 0; } diff --git a/src/stateserver/database_state_server.cpp b/src/stateserver/database_state_server.cpp index a0aaae6..dd73c69 100644 --- a/src/stateserver/database_state_server.cpp +++ b/src/stateserver/database_state_server.cpp @@ -4,6 +4,7 @@ #include "../util/config.h" #include "../util/logger.h" +#include "loading_object.h" namespace Ardos { @@ -12,39 +13,35 @@ DatabaseStateServer::DatabaseStateServer() : ChannelSubscriber() { // Database State Server configuration. auto config = Config::Instance()->GetNode("db-state-server"); - if (!config["channel"]) { - Logger::Error("[DBSS] Missing or invalid channel!"); + if (!config["database"]) { + Logger::Error("[DBSS] Missing or invalid database channel!"); exit(1); } - // Start listening to our channel. - _channel = config["channel"].as(); - SubscribeChannel(_channel); + // Start listening to our broadcast channel. SubscribeChannel(BCHAN_STATESERVERS); // Database channel. _dbChannel = config["database"].as(); - auto rangeParam = config["range"]; - auto min = rangeParam["min"].as(); - auto max = rangeParam["max"].as(); + auto rangeParam = config["ranges"]; + auto min = rangeParam["min"].as(); + auto max = rangeParam["max"].as(); // Start listening to DoId's in our listening range. SubscribeRange(min, max); } -/** - * Subscribe to the DoId's of all database objects within our range. - * TODO: Evaluate the performance of this. We cannot subscribe to a *range* of - * channels exactly due to RabbitMQ wildcard limitations. - */ -void DatabaseStateServer::SubscribeRange(const uint32_t &min, - const uint32_t &max) { - // TODO: It might be more efficient to get a list of existing DoId's in the - // database by sending a message to the DB server. - for (auto i = min; i < max; i++) { - SubscribeChannel(i); - } +void DatabaseStateServer::ReceiveObject(DistributedObject *distObj) { + _distObjs[distObj->GetDoId()] = distObj; +} + +void DatabaseStateServer::RemoveDistributedObject(const uint32_t &doId) { + _distObjs.erase(doId); +} + +void DatabaseStateServer::DiscardLoader(const uint32_t &doId) { + _loadObjs.erase(doId); } void DatabaseStateServer::HandleDatagram(const std::shared_ptr &dg) { @@ -63,6 +60,30 @@ void DatabaseStateServer::HandleDatagram(const std::shared_ptr &dg) { case DBSS_OBJECT_ACTIVATE_WITH_DEFAULTS_OTHER: HandleActivate(dgi, true); break; + case DBSS_OBJECT_DELETE_DISK: + HandleDeleteDisk(dgi, sender); + break; + case STATESERVER_OBJECT_SET_FIELD: + case STATESERVER_OBJECT_SET_FIELDS: + HandleSetField(dgi, msgType == STATESERVER_OBJECT_SET_FIELDS); + break; + case STATESERVER_OBJECT_GET_FIELD: + case STATESERVER_OBJECT_GET_FIELDS: + HandleGetField(dgi, sender, msgType == STATESERVER_OBJECT_GET_FIELDS); + break; + case DBSERVER_OBJECT_GET_FIELD_RESP: + case DBSERVER_OBJECT_GET_FIELDS_RESP: + HandleGetFieldResp(dgi, msgType == DBSERVER_OBJECT_GET_FIELDS_RESP); + break; + case STATESERVER_OBJECT_GET_ALL: + HandleGetAll(dgi, sender); + break; + case DBSERVER_OBJECT_GET_ALL_RESP: + HandleGetAllResp(dgi); + break; + case DBSS_OBJECT_GET_ACTIVATED: + HandleGetActivated(dgi, sender); + break; default: // Hopefully we managed to unpack the sender... Logger::Verbose(std::format("[DBSS] Ignoring message: {} from sender: {}", @@ -86,8 +107,270 @@ void DatabaseStateServer::HandleActivate(DatagramIterator &dgi, return; } - if (other) { + // We're loading without any additional fields set. + if (!other) { + if (!_inactiveLoads.contains(doId)) { + _loadObjs[doId] = new LoadingObject(this, doId, parentId, zoneId); + _loadObjs[doId]->Start(); + } else { + _loadObjs[doId] = + new LoadingObject(this, doId, parentId, zoneId, _inactiveLoads[doId]); + } + return; + } + + // We have some additional fields provided with our activate. + uint16_t dcId = dgi.GetUint16(); + + // Make sure we have a valid distributed class. + DCClass *dcClass = g_dc_file->get_class(dcId); + if (!dcClass) { + Logger::Error(std::format( + "[DBSS] Received ACTIVATE_OTHER with unknown distributed class {}: {}", + doId, dcId)); + return; + } + + if (!_inactiveLoads.contains(doId)) { + _loadObjs[doId] = + new LoadingObject(this, doId, parentId, zoneId, dcClass, dgi); + _loadObjs[doId]->Start(); + } else { + _loadObjs[doId] = new LoadingObject(this, doId, parentId, zoneId, dcClass, + dgi, _inactiveLoads[doId]); + } +} + +void DatabaseStateServer::HandleDeleteDisk(DatagramIterator &dgi, + const uint64_t &sender) { + auto doId = dgi.GetUint32(); + if (_loadObjs.contains(doId)) { + // Ignore this message for now, it'll be bounced back to us + // from the loading object if it succeeds or fails at loading. + return; + } + + // If the object is loaded in memory, broadcast a delete message. + if (_distObjs.contains(doId)) { + auto distObj = _distObjs[doId]; + std::unordered_set targets; + + // Add location to broadcast. + if (distObj->GetLocation()) { + targets.insert(distObj->GetLocation()); + } + + // Add AI broadcast. + if (distObj->GetAI()) { + targets.insert(distObj->GetAI()); + } + + // Add owner to broadcast. + if (distObj->GetOwner()) { + targets.insert(distObj->GetOwner()); + } + + // Send the datagram! + auto dg = + std::make_shared(targets, sender, DBSS_OBJECT_DELETE_DISK); + dg->AddUint32(doId); + PublishDatagram(dg); + } + + // Send delete message to the database. + auto dg = + std::make_shared(_dbChannel, doId, DBSERVER_OBJECT_DELETE); + dg->AddUint32(doId); + PublishDatagram(dg); +} + +void DatabaseStateServer::HandleSetField(DatagramIterator &dgi, + const bool &multiple) { + auto doId = dgi.GetUint32(); + if (_loadObjs.contains(doId)) { + // Ignore this message for now, it'll be bounced back to us + // from the loading object if it succeeds or fails at loading. + return; + } + + auto fieldCount = multiple ? dgi.GetUint16() : 1; + + auto responseType = + multiple ? DBSERVER_OBJECT_SET_FIELDS : DBSERVER_OBJECT_SET_FIELD; + + FieldMap objectFields; + for (size_t i = 0; i < fieldCount; ++i) { + auto fieldId = dgi.GetUint16(); + + auto field = g_dc_file->get_field_by_index(fieldId); + if (!field) { + Logger::Warn(std::format("[DBSS] Distributed object: {} received set " + "field(s) with invalid field id: {}", + doId, fieldId)); + continue; + } + + if (field->is_db()) { + dgi.UnpackField(field, objectFields[field]); + } else { + dgi.SkipField(field); + } } + + // We didn't unpack any fields for the database. + if (objectFields.empty()) { + return; + } + + auto dg = std::make_shared(_dbChannel, doId, responseType); + dg->AddUint32(doId); + if (multiple) { + dg->AddUint16(objectFields.size()); + } + for (const auto &it : objectFields) { + dg->AddUint16(it.first->get_number()); + dg->AddData(it.second); + } + PublishDatagram(dg); +} + +void DatabaseStateServer::HandleGetField(DatagramIterator &dgi, + const uint64_t &sender, + const bool &multiple) { + auto ctx = dgi.GetUint32(); + auto doId = dgi.GetUint32(); + + if (_distObjs.contains(doId) || _loadObjs.contains(doId)) { + return; + } + + auto fieldCount = multiple ? dgi.GetUint16() : 1; + + auto responseType = multiple ? STATESERVER_OBJECT_GET_FIELDS_RESP + : STATESERVER_OBJECT_GET_FIELD_RESP; + + std::vector dbFields; + std::vector ramFields; + for (size_t i = 0; i < fieldCount; ++i) { + auto fieldId = dgi.GetUint16(); + + auto field = g_dc_file->get_field_by_index(fieldId); + if (!field) { + auto dg = std::make_shared(sender, doId, responseType); + dg->AddUint32(ctx); + dg->AddBool(false); + PublishDatagram(dg); + return; + } + + if (field->is_required() || field->is_ram()) { + if (field->is_db()) { + dbFields.push_back(field); + } else { + ramFields.push_back(field); + } + } + } + + if (!dbFields.empty()) { + // Get a new database context. + auto dbCtx = _nextContext++; + + // Prepare response datagram. + auto dg = std::make_shared(sender, doId, responseType); + dg->AddUint32(ctx); + dg->AddBool(true); + if (multiple) { + dg->AddUint16(ramFields.size() + dbFields.size()); + } + for (const auto &field : ramFields) { + dg->AddUint16(field->get_number()); + dg->AddData(field->get_default_value()); + } + + _contextDatagrams[dbCtx] = dg; + + // Send query off to the database. + auto dbDg = std::make_shared( + _dbChannel, doId, + multiple ? DBSERVER_OBJECT_GET_FIELDS : DBSERVER_OBJECT_GET_FIELD); + dbDg->AddUint32(dbCtx); + dbDg->AddUint32(doId); + if (multiple) { + dbDg->AddUint16(dbFields.size()); + } + for (const auto &field : dbFields) { + dg->AddUint16(field->get_number()); + } + PublishDatagram(dbDg); + } else if (!ramFields.empty() && ramFields.back()->has_default_value()) { + // If no database fields exist, and we have a RAM field with a default + // value... + auto dg = std::make_shared(sender, doId, responseType); + dg->AddUint32(ctx); + dg->AddBool(true); + if (multiple) { + dg->AddUint16(ramFields.size()); + } + for (const auto &field : ramFields) { + dg->AddUint16(field->get_number()); + dg->AddData(field->get_default_value()); + } + PublishDatagram(dg); + } else { + // Otherwise, return false. + auto dg = std::make_shared(sender, doId, responseType); + dg->AddUint32(ctx); + dg->AddBool(false); + PublishDatagram(dg); + } +} + +void DatabaseStateServer::HandleGetFieldResp(DatagramIterator &dgi, + const bool &multiple) {} + +void DatabaseStateServer::HandleGetAll(DatagramIterator &dgi, + const uint64_t &sender) {} + +void DatabaseStateServer::HandleGetAllResp(DatagramIterator &dgi) {} + +void DatabaseStateServer::HandleGetActivated(DatagramIterator &dgi, + const uint64_t &sender) { + auto ctx = dgi.GetUint32(); + auto doId = dgi.GetUint32(); + + // An object is considered active if it's in memory as a distributed object. + // If it doesn't exist, or is loading, return false. + auto dg = + std::make_shared(sender, doId, DBSS_OBJECT_GET_ACTIVATED_RESP); + dg->AddUint32(ctx); + dg->AddUint32(doId); + dg->AddBool(_distObjs.contains(doId)); + PublishDatagram(dg); +} + +bool UnpackDBFields(DatagramIterator &dgi, DCClass *dclass, FieldMap &required, + FieldMap &ram) { + // Unload RAM and REQUIRED fields from database response. + auto fieldCount = dgi.GetUint16(); + for (size_t i = 0; i < fieldCount; ++i) { + auto fieldId = dgi.GetUint16(); + + auto field = dclass->get_field_by_index(fieldId); + if (!field) { + return false; + } + + if (field->is_required()) { + dgi.UnpackField(field, required[field]); + } else if (field->is_ram()) { + dgi.UnpackField(field, ram[field]); + } else { + dgi.SkipField(field); + } + } + + return true; } } // namespace Ardos diff --git a/src/stateserver/database_state_server.h b/src/stateserver/database_state_server.h index 6ac27da..3aadcbb 100644 --- a/src/stateserver/database_state_server.h +++ b/src/stateserver/database_state_server.h @@ -3,27 +3,55 @@ #include "../messagedirector/channel_subscriber.h" #include "../net/datagram_iterator.h" +#include "../util/globals.h" #include "distributed_object.h" -#include "loading_object.h" namespace Ardos { -class DatabaseStateServer : public ChannelSubscriber { +bool UnpackDBFields(DatagramIterator &dgi, DCClass *dclass, FieldMap &required, + FieldMap &ram); + +class LoadingObject; + +class DatabaseStateServer : public StateServerImplementation, + public ChannelSubscriber { public: + friend class LoadingObject; + DatabaseStateServer(); -private: - void SubscribeRange(const uint32_t &min, const uint32_t &max); + void RemoveDistributedObject(const uint32_t &doId) override; +private: void HandleDatagram(const std::shared_ptr &dg) override; + void ReceiveObject(DistributedObject *distObj); + void DiscardLoader(const uint32_t &doId); + void HandleActivate(DatagramIterator &dgi, const bool &other); + void HandleDeleteDisk(DatagramIterator &dgi, const uint64_t &sender); + void HandleSetField(DatagramIterator &dgi, const bool &multiple); + + void HandleGetField(DatagramIterator &dgi, const uint64_t &sender, + const bool &multiple); + void HandleGetFieldResp(DatagramIterator &dgi, const bool &multiple); + + void HandleGetAll(DatagramIterator &dgi, const uint64_t &sender); + void HandleGetAllResp(DatagramIterator &dgi); + + void HandleGetActivated(DatagramIterator &dgi, const uint64_t &sender); - uint64_t _channel; uint64_t _dbChannel; - std::unordered_map> _distObjs; - std::unordered_map> _loadObjs; + std::unordered_map _distObjs; + std::unordered_map _loadObjs; + + // DoId -> Request context + std::unordered_map> _inactiveLoads; + + std::unordered_map> _contextDatagrams; + + uint32_t _nextContext = 0; }; } // namespace Ardos diff --git a/src/stateserver/distributed_object.cpp b/src/stateserver/distributed_object.cpp index fb78370..004f317 100644 --- a/src/stateserver/distributed_object.cpp +++ b/src/stateserver/distributed_object.cpp @@ -10,7 +10,7 @@ namespace Ardos { -DistributedObject::DistributedObject(StateServer *stateServer, +DistributedObject::DistributedObject(StateServerImplementation *stateServer, const uint32_t &doId, const uint32_t &parentId, const uint32_t &zoneId, DCClass *dclass, @@ -61,6 +61,25 @@ DistributedObject::DistributedObject(StateServer *stateServer, WakeChildren(); } +DistributedObject::DistributedObject(StateServerImplementation *stateServer, + const uint64_t &sender, + const uint32_t &doId, + const uint32_t &parentId, + const uint32_t &zoneId, DCClass *dclass, + FieldMap &reqFields, FieldMap &ramFields) + : ChannelSubscriber(), _stateServer(stateServer), _doId(doId), + _parentId(INVALID_DO_ID), _zoneId(INVALID_DO_ID), _dclass(dclass), + _requiredFields(reqFields), _ramFields(ramFields) { + SubscribeChannel(_doId); + + Logger::Verbose( + std::format("[SS] Distributed Object: '{}' generated with DoId: {}", + _dclass->get_name(), _doId)); + + HandleLocationChange(parentId, zoneId, sender); + WakeChildren(); +} + size_t DistributedObject::Size() const { size_t objectSize{0}; @@ -78,6 +97,14 @@ uint64_t DistributedObject::GetAI() const { return _aiChannel; } bool DistributedObject::IsAIExplicitlySet() const { return _aiExplicitlySet; } +uint32_t DistributedObject::GetDoId() const { return _doId; } + +uint64_t DistributedObject::GetLocation() const { + return LocationAsChannel(_parentId, _zoneId); +} + +uint64_t DistributedObject::GetOwner() const { return _ownerChannel; } + void DistributedObject::Annihilate(const uint64_t &sender, const bool ¬ifyParent) { std::unordered_set targets; @@ -110,6 +137,8 @@ void DistributedObject::Annihilate(const uint64_t &sender, DeleteChildren(sender); _stateServer->RemoveDistributedObject(_doId); + ChannelSubscriber::Shutdown(); + Logger::Verbose(std::format("[SS] Distributed Object: '{}' deleted.", _doId)); } diff --git a/src/stateserver/distributed_object.h b/src/stateserver/distributed_object.h index 947669c..7ab1172 100644 --- a/src/stateserver/distributed_object.h +++ b/src/stateserver/distributed_object.h @@ -4,21 +4,34 @@ #include #include "../net/message_types.h" +#include "../util/globals.h" #include "state_server.h" namespace Ardos { class DistributedObject : public ChannelSubscriber { public: - DistributedObject(StateServer *stateServer, const uint32_t &doId, + friend class LoadingObject; + + DistributedObject(StateServerImplementation *stateServer, + const uint32_t &doId, const uint32_t &parentId, + const uint32_t &zoneId, DCClass *dclass, + DatagramIterator &dgi, const bool &other); + DistributedObject(StateServerImplementation *stateServer, + const uint64_t &sender, const uint32_t &doId, const uint32_t &parentId, const uint32_t &zoneId, - DCClass *dclass, DatagramIterator &dgi, const bool &other); + DCClass *dclass, FieldMap &reqFields, FieldMap &ramFields); [[nodiscard]] size_t Size() const; [[nodiscard]] uint64_t GetAI() const; [[nodiscard]] bool IsAIExplicitlySet() const; + [[nodiscard]] uint32_t GetDoId() const; + + [[nodiscard]] uint64_t GetLocation() const; + [[nodiscard]] uint64_t GetOwner() const; + private: void Annihilate(const uint64_t &sender, const bool ¬ifyParent = true); void DeleteChildren(const uint64_t &sender); @@ -50,14 +63,14 @@ class DistributedObject : public ChannelSubscriber { const bool &succeedIfUnset = false, const bool &isSubfield = false); - StateServer *_stateServer; + StateServerImplementation *_stateServer; uint32_t _doId; uint32_t _parentId; uint32_t _zoneId; DCClass *_dclass; - std::unordered_map> _requiredFields; - std::map> _ramFields; + FieldMap _requiredFields; + FieldMap _ramFields; std::unordered_map> _zoneObjects; diff --git a/src/stateserver/loading_object.cpp b/src/stateserver/loading_object.cpp index af2ec53..574d3b3 100644 --- a/src/stateserver/loading_object.cpp +++ b/src/stateserver/loading_object.cpp @@ -1,3 +1,208 @@ #include "loading_object.h" -namespace Ardos {} // namespace Ardos \ No newline at end of file +#include + +#include "../util/logger.h" + +namespace Ardos { + +LoadingObject::LoadingObject(DatabaseStateServer *stateServer, + const uint32_t &doId, const uint32_t &parentId, + const uint32_t &zoneId, + const std::unordered_set &contexts) + : ChannelSubscriber(), _stateServer(stateServer), _doId(doId), + _parentId(parentId), _zoneId(zoneId), + _context(stateServer->_nextContext++), _validContexts(contexts) { + SubscribeChannel(doId); +} + +LoadingObject::LoadingObject(DatabaseStateServer *stateServer, + const uint32_t &doId, const uint32_t &parentId, + const uint32_t &zoneId, DCClass *dclass, + DatagramIterator &dgi, + const std::unordered_set &contexts) + : ChannelSubscriber(), _stateServer(stateServer), _doId(doId), + _parentId(parentId), _zoneId(zoneId), + _context(stateServer->_nextContext++), _validContexts(contexts), + _dclass(dclass) { + SubscribeChannel(doId); + + // Unpack the RAM fields we received in the generate message. + auto fieldCount = dgi.GetUint16(); + for (size_t i = 0; i < fieldCount; ++i) { + auto fieldId = dgi.GetUint16(); + + auto field = _dclass->get_field_by_index(fieldId); + if (!field) { + Logger::Error(std::format("[DBSS] Loading object: {} received invalid " + "field index on generate: {}", + _doId, fieldId)); + return; + } + + if (field->is_ram() || field->is_required()) { + dgi.UnpackField(field, _fieldUpdates[field]); + } else { + Logger::Error(std::format( + "[DBSS] Loading object: {} received non-RAM field on generate: {}", + _doId, field->get_name())); + } + } +} + +void LoadingObject::Start() { + if (!_validContexts.size()) { + // Fetch our stored fields from the database. + auto dg = std::make_shared(_stateServer->_dbChannel, _doId, + DBSERVER_OBJECT_GET_ALL); + dg->AddUint32(_context); + dg->AddUint32(_doId); + PublishDatagram(dg); + } +} + +void LoadingObject::HandleDatagram(const std::shared_ptr &dg) { + DatagramIterator dgi(dg); + + // Skip MD routing headers. + dgi.SeekPayload(); + + try { + uint64_t sender = dgi.GetUint64(); + uint16_t msgType = dgi.GetUint16(); + switch (msgType) { + case DBSERVER_OBJECT_GET_ALL_RESP: { + if (_isLoaded) { + break; + } + + // Make sure the context from the database is valid. + uint32_t context = dgi.GetUint32(); + if (context != _context && !_validContexts.contains(context)) { + Logger::Warn(std::format("[DBSS] Loading object: {} received " + "GET_ALL_RESP with invalid context: {}", + _doId, context)); + break; + } + + Logger::Verbose(std::format( + "[DBSS] Loading object: {} received GET_ALL_RESP", _doId)); + _isLoaded = true; + + if (!dgi.GetBool()) { + Logger::Verbose(std::format( + "[DBSS] Loading object: {} was not found in database", _doId)); + Finalize(); + break; + } + + uint16_t dcId = dgi.GetUint16(); + auto dcClass = g_dc_file->get_class(dcId); + if (!dcClass) { + Logger::Error(std::format("[DBSS] Loading object: {} received invalid " + "dclass from database: {}", + _doId, dcId)); + Finalize(); + break; + } + + // Make sure both dclass's match if we were supplied with one. + if (_dclass && dcClass != _dclass) { + Logger::Error(std::format( + "[DBSS] Loading object: {} received mismatched dclass: {} - {}", + _doId, _dclass->get_name(), dcClass->get_name())); + Finalize(); + break; + } + + // Unpack fields from database. + if (!UnpackDBFields(dgi, dcClass, _requiredFields, _ramFields)) { + Logger::Error(std::format( + "[DBSS] Loading object: {} failed to unpack fields from database.", + _doId)); + Finalize(); + break; + } + + // Add default values and update values. + auto numFields = dcClass->get_num_inherited_fields(); + for (size_t i = 0; i < numFields; ++i) { + auto field = dcClass->get_inherited_field(i); + if (!field->as_molecular_field()) { + if (field->is_required()) { + if (_fieldUpdates.contains(field)) { + _requiredFields[field] = _fieldUpdates[field]; + } else if (!_requiredFields.contains(field)) { + _requiredFields[field] = field->get_default_value(); + } + } else if (field->is_ram()) { + if (_fieldUpdates.contains(field)) { + _ramFields[field] = _fieldUpdates[field]; + } + } + } + } + + // Create object on state server. + auto distObj = new DistributedObject( + _stateServer, _stateServer->_dbChannel, _doId, _parentId, _zoneId, + dcClass, _requiredFields, _ramFields); + + // Tell DBSS about object and handle datagram queue. + _stateServer->ReceiveObject(distObj); + ReplayDatagrams(distObj); + + // Cleanup this loader. + Finalize(); + break; + } + case DBSS_OBJECT_ACTIVATE_WITH_DEFAULTS: + case DBSS_OBJECT_ACTIVATE_WITH_DEFAULTS_OTHER: + // Don't cache these messages in the queue, they are received and + // handled by the DBSS. Since the object is already loading they + // are simply ignored (the DBSS may generate a warning/error). + break; + default: + _datagramQueue.push_back(dg); + break; + } + } catch (const DatagramIteratorEOF &) { + Logger::Error(std::format( + "[DBSS] Loading object: {} received a truncated datagram!", _doId)); + } +} + +void LoadingObject::Finalize() { + _stateServer->DiscardLoader(_doId); + ForwardDatagrams(); + ChannelSubscriber::Shutdown(); +} + +void LoadingObject::ReplayDatagrams(DistributedObject *distObj) { + Logger::Verbose(std::format( + "[DBSS] Loading object: {} replaying datagrams received while loading...", + _doId)); + for (const auto &dg : _datagramQueue) { + if (!_stateServer->_distObjs.contains(_doId)) { + Logger::Verbose( + std::format("[DBSS] Deleted while replaying, aborting...", _doId)); + return; + } + + distObj->HandleDatagram(dg); + } + + Logger::Verbose(std::format("[DBSS] Replay finished.", _doId)); +} + +void LoadingObject::ForwardDatagrams() { + Logger::Verbose(std::format("[DBSS] Loading object: {} forwarding datagrams " + "received while loading...", + _doId)); + for (const auto &dg : _datagramQueue) { + _stateServer->HandleDatagram(dg); + } + Logger::Verbose(std::format("[DBSS] Finished forwarding.", _doId)); +} + +} // namespace Ardos \ No newline at end of file diff --git a/src/stateserver/loading_object.h b/src/stateserver/loading_object.h index 39d47c1..8e9aaa3 100644 --- a/src/stateserver/loading_object.h +++ b/src/stateserver/loading_object.h @@ -1,9 +1,58 @@ #ifndef ARDOS_LOADING_OBJECT_H #define ARDOS_LOADING_OBJECT_H +#include + +#include "../messagedirector/channel_subscriber.h" +#include "../net/datagram_iterator.h" +#include "../util/globals.h" +#include "database_state_server.h" + namespace Ardos { -class LoadingObject {}; +class LoadingObject : public ChannelSubscriber { +public: + friend class DatabaseStateServer; + + LoadingObject(DatabaseStateServer *stateServer, const uint32_t &doId, + const uint32_t &parentId, const uint32_t &zoneId, + const std::unordered_set &contexts = + std::unordered_set()); + LoadingObject(DatabaseStateServer *stateServer, const uint32_t &doId, + const uint32_t &parentId, const uint32_t &zoneId, + DCClass *dclass, DatagramIterator &dgi, + const std::unordered_set &contexts = + std::unordered_set()); + + void Start(); + +private: + void HandleDatagram(const std::shared_ptr &dg) override; + + void Finalize(); + + void ReplayDatagrams(DistributedObject *distObj); + void ForwardDatagrams(); + + DatabaseStateServer *_stateServer; + + uint32_t _doId; + uint32_t _parentId; + uint32_t _zoneId; + + uint32_t _context; + std::unordered_set _validContexts; + + DCClass *_dclass = nullptr; + + bool _isLoaded = false; + + FieldMap _fieldUpdates; + FieldMap _requiredFields; + FieldMap _ramFields; + + std::vector> _datagramQueue; +}; } // namespace Ardos diff --git a/src/stateserver/state_server.cpp b/src/stateserver/state_server.cpp index 3f11bdd..5d68768 100644 --- a/src/stateserver/state_server.cpp +++ b/src/stateserver/state_server.cpp @@ -90,8 +90,8 @@ void StateServer::HandleGenerate(DatagramIterator &dgi, const bool &other) { } // Create the distributed object. - _distObjs[doId] = std::make_unique( - this, doId, parentId, zoneId, dcClass, dgi, other); + _distObjs[doId] = + new DistributedObject(this, doId, parentId, zoneId, dcClass, dgi, other); if (_objectsGauge) { _objectsGauge->Increment(); diff --git a/src/stateserver/state_server.h b/src/stateserver/state_server.h index 46e7b76..ba90b28 100644 --- a/src/stateserver/state_server.h +++ b/src/stateserver/state_server.h @@ -9,16 +9,17 @@ #include "../messagedirector/channel_subscriber.h" #include "../net/datagram.h" #include "../net/datagram_iterator.h" +#include "state_server_implementation.h" namespace Ardos { class DistributedObject; -class StateServer : public ChannelSubscriber { +class StateServer : public StateServerImplementation, public ChannelSubscriber { public: StateServer(); - void RemoveDistributedObject(const uint32_t &doId); + void RemoveDistributedObject(const uint32_t &doId) override; private: void HandleDatagram(const std::shared_ptr &dg) override; @@ -28,7 +29,7 @@ class StateServer : public ChannelSubscriber { void InitMetrics(); uint64_t _channel; - std::unordered_map> _distObjs; + std::unordered_map _distObjs; prometheus::Gauge *_objectsGauge = nullptr; prometheus::Histogram *_objectsSizeHistogram = nullptr; diff --git a/src/stateserver/state_server_implementation.h b/src/stateserver/state_server_implementation.h new file mode 100644 index 0000000..6f5f92b --- /dev/null +++ b/src/stateserver/state_server_implementation.h @@ -0,0 +1,15 @@ +#ifndef ARDOS_STATE_SERVER_IMPLEMENTATION_H +#define ARDOS_STATE_SERVER_IMPLEMENTATION_H + +#include + +namespace Ardos { + +class StateServerImplementation { +public: + virtual void RemoveDistributedObject(const uint32_t &doId) = 0; +}; + +} // namespace Ardos + +#endif // ARDOS_STATE_SERVER_IMPLEMENTATION_H diff --git a/src/util/globals.h b/src/util/globals.h index fef0094..93cc5c4 100644 --- a/src/util/globals.h +++ b/src/util/globals.h @@ -18,6 +18,8 @@ extern DCFile *g_dc_file; extern std::thread::id g_main_thread_id; extern std::shared_ptr g_loop; +typedef std::map> FieldMap; + } // namespace Ardos #endif // ARDOS_GLOBALS_H