Skip to content

Commit

Permalink
Merge pull request #180 from bersler/fix_bugs
Browse files Browse the repository at this point in the history
enhancement: swap memory to disk in low memory situations
  • Loading branch information
bersler authored Sep 30, 2024
2 parents b8cdd73 + 212c423 commit fe92725
Show file tree
Hide file tree
Showing 32 changed files with 87 additions and 85 deletions.
2 changes: 1 addition & 1 deletion src/builder/Builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ namespace OpenLogReplicator {
}

void Builder::processDml(typeScn scn, typeSeq sequence, time_t timestamp, LobCtx* lobCtx, const XmlCtx* xmlCtx,
std::deque<const RedoLogRecord*>& redo1, std::deque<const RedoLogRecord*>& redo2,
const std::deque<const RedoLogRecord*>& redo1, const std::deque<const RedoLogRecord*>& redo2,
uint64_t type, bool system, bool schema, bool dump) {
uint8_t fb;
typeObj obj;
Expand Down
4 changes: 2 additions & 2 deletions src/builder/Builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -1330,8 +1330,8 @@ namespace OpenLogReplicator {
const RedoLogRecord* redoLogRecord2, bool system, bool schema, bool dump);
void processDeleteMultiple(typeScn scn, typeSeq sequence, time_t timestamp, LobCtx* lobCtx, const XmlCtx* xmlCtx, const RedoLogRecord* redoLogRecord1,
const RedoLogRecord* redoLogRecord2, bool system, bool schema, bool dump);
void processDml(typeScn scn, typeSeq sequence, time_t timestamp, LobCtx* lobCtx, const XmlCtx* xmlCtx, std::deque<const RedoLogRecord*>& redo1,
std::deque<const RedoLogRecord*>& redo2, uint64_t type, bool system, bool schema, bool dump);
void processDml(typeScn scn, typeSeq sequence, time_t timestamp, LobCtx* lobCtx, const XmlCtx* xmlCtx, const std::deque<const RedoLogRecord*>& redo1,
const std::deque<const RedoLogRecord*>& redo2, uint64_t type, bool system, bool schema, bool dump);
void processDdlHeader(typeScn scn, typeSeq sequence, time_t timestamp, const RedoLogRecord* redoLogRecord1);
virtual void initialize();
virtual void processCommit(typeScn scn, typeSeq sequence, time_t timestamp) = 0;
Expand Down
60 changes: 19 additions & 41 deletions src/common/Ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,17 +738,6 @@ namespace OpenLogReplicator {
return ret;
}

uint64_t Ctx::getUsedMemory(Thread* t) const {
uint64_t ret;
{
t->contextSet(Thread::CONTEXT_MUTEX, Thread::CTX_GET_USED);
std::unique_lock<std::mutex> lck(memoryMtx);
ret = (memoryChunksAllocated - memoryChunksFree) * MEMORY_CHUNK_SIZE_MB;
}
t->contextSet(Thread::CONTEXT_CPU);
return ret;
}

uint8_t* Ctx::getMemoryChunk(Thread* t, uint64_t module, bool swap) {
uint64_t allocatedModule = 0, usedTotal = 0, allocatedTotal = 0;
uint8_t* chunk = nullptr;
Expand All @@ -773,23 +762,24 @@ namespace OpenLogReplicator {
if (!swap)
reservedChunks += memoryChunksUnswapBufferMin;

if (memoryChunksFree > reservedChunks)
break;
if (module != MEMORY_MODULE_BUILDER || memoryModulesAllocated[MEMORY_MODULE_BUILDER] < memoryChunksWriteBufferMax) {
if (memoryChunksFree > reservedChunks)
break;

if (memoryChunksAllocated < memoryChunksMax &&
(module != MEMORY_MODULE_BUILDER || memoryModulesAllocated[MEMORY_MODULE_BUILDER] < memoryChunksWriteBufferMax)) {
t->contextSet(Thread::CONTEXT_OS, Thread::REASON_OS);
memoryChunks[memoryChunksFree] = reinterpret_cast<uint8_t*>(aligned_alloc(MEMORY_ALIGNMENT, MEMORY_CHUNK_SIZE));
t->contextSet(Thread::CONTEXT_MEM, Thread::REASON_MEM);
if (unlikely(memoryChunks[memoryChunksFree] == nullptr))
throw RuntimeException(10016, "couldn't allocate " + std::to_string(MEMORY_CHUNK_SIZE_MB) +
" bytes memory for: " + memoryModules[module]);
++memoryChunksFree;
allocatedTotal = ++memoryChunksAllocated;

if (memoryChunksAllocated > memoryChunksHWM)
memoryChunksHWM = memoryChunksAllocated;
break;
if (memoryChunksAllocated < memoryChunksMax) {
t->contextSet(Thread::CONTEXT_OS, Thread::REASON_OS);
memoryChunks[memoryChunksFree] = reinterpret_cast<uint8_t*>(aligned_alloc(MEMORY_ALIGNMENT, MEMORY_CHUNK_SIZE));
t->contextSet(Thread::CONTEXT_MEM, Thread::REASON_MEM);
if (unlikely(memoryChunks[memoryChunksFree] == nullptr))
throw RuntimeException(10016, "couldn't allocate " + std::to_string(MEMORY_CHUNK_SIZE_MB) +
" bytes memory for: " + memoryModules[module]);
++memoryChunksFree;
allocatedTotal = ++memoryChunksAllocated;

if (memoryChunksAllocated > memoryChunksHWM)
memoryChunksHWM = memoryChunksAllocated;
break;
}
}

if (module == MEMORY_MODULE_PARSER)
Expand Down Expand Up @@ -935,12 +925,6 @@ namespace OpenLogReplicator {
throw RuntimeException(50070, "swap chunk not found for xid: " + xid.toString() + " during memory get");
SwapChunk* sc = it->second;

if (index == sc->lockedChunk) {
sc->breakLock = true;
while (index == sc->lockedChunk)
chunksTransaction.wait(lck);
}

while (!hardShutdown) {
if (index < sc->swappedMin || index > sc->swappedMax) {
t->contextSet(Thread::CONTEXT_CPU);
Expand Down Expand Up @@ -1008,21 +992,15 @@ namespace OpenLogReplicator {
throw RuntimeException(50070, "swap chunk not found for xid: " + xid.toString() + " during memory shrink");
sc = it->second;
tc = sc->chunks.back();

if (static_cast<int64_t>(sc->chunks.size() - 1) == sc->lockedChunk) {
sc->breakLock = true;
while (static_cast<int64_t>(sc->chunks.size() - 1) == sc->lockedChunk)
chunksTransaction.wait(lck);
}
sc->chunks.pop_back();
}

freeMemoryChunk(t, Ctx::MEMORY_MODULE_TRANSACTIONS, tc);

{
t->contextSet(Thread::CONTEXT_MUTEX, Thread::CTX_SWAPPED_SHRINK2);
std::unique_lock<std::mutex> lck(swapMtx);
sc->chunks.pop_back();
if (sc->chunks.size() <= 0) {
if (sc->chunks.size() == 0) {
t->contextSet(Thread::CONTEXT_CPU);
return nullptr;
}
Expand Down
4 changes: 1 addition & 3 deletions src/common/Ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ namespace OpenLogReplicator {
int64_t swappedMax;
int64_t lockedChunk;
bool release;
bool breakLock;

SwapChunk() :
swappedMin(-1), swappedMax(-1), lockedChunk(-1), release(false), breakLock(false) {};
swappedMin(-1), swappedMax(-1), lockedChunk(-1), release(false) {};
};


Expand Down Expand Up @@ -624,7 +623,6 @@ namespace OpenLogReplicator {
[[nodiscard]] uint64_t getMemoryHWM() const;
[[nodiscard]] uint64_t getAllocatedMemory() const;
[[nodiscard]] uint64_t getSwapMemory(Thread* t) const;
[[nodiscard]] uint64_t getUsedMemory(Thread* t) const;
[[nodiscard]] uint64_t getFreeMemory(Thread* t) const;
[[nodiscard]] uint8_t* getMemoryChunk(Thread* t, uint64_t module, bool swap = false);
void freeMemoryChunk(Thread* t, uint64_t module, uint8_t* chunk);
Expand Down
29 changes: 15 additions & 14 deletions src/common/MemoryManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ namespace OpenLogReplicator {
throw RuntimeException(50072, "swap file: " + fileName + " - unswapping: " + std::to_string(index) + " not in range " +
std::to_string(sc->swappedMin) + "-" + std::to_string(sc->swappedMax));
}
contextSet(CONTEXT_CPU);
}

bool MemoryManager::swap(typeXid xid, int64_t index) {
Expand All @@ -326,8 +325,13 @@ namespace OpenLogReplicator {
return false;
}

sc->lockedChunk = index;
tc = sc->chunks[index];

sc->swappedMax = index;
if (sc->swappedMin == -1)
sc->swappedMin = sc->swappedMax;

sc->chunks[index] = nullptr;
}
contextSet(CONTEXT_CPU);

Expand Down Expand Up @@ -365,21 +369,18 @@ namespace OpenLogReplicator {
contextSet(CONTEXT_MUTEX, MEMORY_SWAP2);
std::unique_lock<std::mutex> lck(ctx->swapMtx);

sc->lockedChunk = -1;
if (sc->release || sc->breakLock) {
sc->breakLock = false;
if (sc->swappedMax == -1)
if (ctx->swappedShrinkXid == xid) {
sc->chunks[index] = tc;

if (sc->swappedMax == 0) {
sc->swappedMin = sc->swappedMax = -1;
remove = true;
else
} else {
--sc->swappedMax;
truncateSize = (sc->swappedMax + 1) * Ctx::MEMORY_CHUNK_SIZE;
} else {
sc->swappedMax = index;
if (sc->swappedMin == -1)
sc->swappedMin = sc->swappedMax;

sc->chunks[index] = nullptr;
}
ctx->chunksTransaction.notify_all();
}
ctx->chunksTransaction.notify_all();
}
contextSet(CONTEXT_CPU);

Expand Down
2 changes: 2 additions & 0 deletions src/common/exception/BootException.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace OpenLogReplicator {
explicit BootException(int newCode, const std::string& newMsg);
explicit BootException(int newCode, const char* newMsg);
virtual ~BootException();
BootException(const BootException&) = delete;
BootException& operator=(const BootException&) = delete;

friend std::ostream& operator<<(std::ostream& os, const BootException& exception);
};
Expand Down
2 changes: 2 additions & 0 deletions src/common/exception/ConfigurationException.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace OpenLogReplicator {
explicit ConfigurationException(int newCode, const std::string& newMsg);
explicit ConfigurationException(int newCode, const char* newMsg);
~ConfigurationException() override;
ConfigurationException(const ConfigurationException&) = delete;
ConfigurationException& operator=(const ConfigurationException&) = delete;

friend std::ostream& operator<<(std::ostream& os, const ConfigurationException& exception);
};
Expand Down
2 changes: 2 additions & 0 deletions src/common/exception/DataException.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace OpenLogReplicator {
explicit DataException(int newCode, const std::string& newMsg);
explicit DataException(int newCode, const char* newMsg);
~DataException() override;
DataException(const DataException&) = delete;
DataException& operator=(const DataException&) = delete;

friend std::ostream& operator<<(std::ostream& os, const DataException& exception);
};
Expand Down
2 changes: 2 additions & 0 deletions src/common/exception/NetworkException.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace OpenLogReplicator {
explicit NetworkException(int newCode, const std::string& newMsg);
explicit NetworkException(int newCode, const char* newMsg);
~NetworkException() override;
NetworkException(const NetworkException&) = delete;
NetworkException& operator=(const NetworkException&) = delete;

friend std::ostream& operator<<(std::ostream& os, const NetworkException& exception);
};
Expand Down
2 changes: 2 additions & 0 deletions src/common/exception/RedoLogException.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace OpenLogReplicator {
explicit RedoLogException(int newCode, std::string newMsg);
explicit RedoLogException(int newCode, const char* newMsg);
~RedoLogException() override;
RedoLogException(const RedoLogException&) = delete;
RedoLogException& operator=(const RedoLogException&) = delete;

friend std::ostream& operator<<(std::ostream& os, const RedoLogException& exception);
};
Expand Down
2 changes: 2 additions & 0 deletions src/common/exception/RuntimeException.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace OpenLogReplicator {
explicit RuntimeException(int newCode, const std::string& newMsg, int newSupCode = 0);
explicit RuntimeException(int newCode, const char* newMsg, int newSupCode = 0);
virtual ~RuntimeException();
RuntimeException(const RuntimeException&) = delete;
RuntimeException& operator=(const RuntimeException&) = delete;

friend std::ostream& operator<<(std::ostream& os, const RuntimeException& exception);
};
Expand Down
1 change: 0 additions & 1 deletion src/common/metrics/Metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ along with OpenLogReplicator; see the file LICENSE; If not see
<http:////www.gnu.org/licenses/>. */

#include <fstream>
#include <thread>
#include <unistd.h>

#include "Metrics.h"
Expand Down
10 changes: 5 additions & 5 deletions src/common/table/SysCCol.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ along with OpenLogReplicator; see the file LICENSE; If not see
namespace OpenLogReplicator {
class SysCColKey final {
public:
SysCColKey(typeObj newObj, typeCol newIntCol, typeCon newCon) :
SysCColKey(typeObj newObj, typeCon newCon, typeCol newIntCol) :
obj(newObj),
intCol(newIntCol),
con(newCon) {
con(newCon),
intCol(newIntCol) {
}

bool operator<(const SysCColKey& other) const {
bool operator<(const SysCColKey other) const {
if (obj < other.obj)
return true;
if (other.obj < obj)
Expand All @@ -48,8 +48,8 @@ namespace OpenLogReplicator {
}

typeObj obj;
typeCol intCol;
typeCon con;
typeCol intCol;
};

class SysCCol final {
Expand Down
2 changes: 2 additions & 0 deletions src/locales/CharacterSetJA16EUCTILDE.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ namespace OpenLogReplicator {
public:
CharacterSetJA16EUCTILDE();
~CharacterSetJA16EUCTILDE() override;
CharacterSetJA16EUCTILDE(const CharacterSetJA16EUCTILDE&) = delete;
CharacterSetJA16EUCTILDE& operator=(const CharacterSetJA16EUCTILDE&) = delete;
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/locales/CharacterSetZHS32GB18030.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ namespace OpenLogReplicator {
public:
CharacterSetZHS32GB18030();
~CharacterSetZHS32GB18030() override;
CharacterSetZHS32GB18030(const CharacterSetZHS32GB18030&) = delete;
CharacterSetZHS32GB18030& operator=(const CharacterSetZHS32GB18030&) = delete;

virtual typeUnicode decode(const Ctx* ctx, typeXid xid, const uint8_t*& str, uint64_t& length) const override;
};
Expand Down
2 changes: 2 additions & 0 deletions src/locales/CharacterSetZHT32EUC.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace OpenLogReplicator {
public:
CharacterSetZHT32EUC();
~CharacterSetZHT32EUC() override;
CharacterSetZHT32EUC(const CharacterSetZHT32EUC&) = delete;
CharacterSetZHT32EUC& operator=(const CharacterSetZHT32EUC&) = delete;

virtual typeUnicode decode(const Ctx* ctx, typeXid xid, const uint8_t*& str, uint64_t& length) const override;
};
Expand Down
6 changes: 3 additions & 3 deletions src/metadata/Schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ namespace OpenLogReplicator {
", OBJ#: " + std::to_string(sysCCol->obj) +
", SPARE1: " + sysCCol->spare1.toString() + ")");

SysCColKey sysCColKey(sysCCol->obj, sysCCol->intCol, sysCCol->con);
SysCColKey sysCColKey(sysCCol->obj, sysCCol->con, sysCCol->intCol);
auto sysCColMapKeyIt = sysCColMapKey.find(sysCColKey);
if (unlikely(sysCColMapKeyIt != sysCColMapKey.end()))
throw DataException(50024, "duplicate SYS.CCOL$ value for unique (OBJ#: " + std::to_string(sysCCol->obj) + ", INTCOL#: " +
Expand Down Expand Up @@ -1554,7 +1554,7 @@ namespace OpenLogReplicator {
return;
sysCColMapRowId.erase(sysCColMapRowIdIt);

SysCColKey sysCColKey(sysCCol->obj, sysCCol->intCol, sysCCol->con);
SysCColKey sysCColKey(sysCCol->obj, sysCCol->con, sysCCol->intCol);
auto sysCColMapKeyIt = sysCColMapKey.find(sysCColKey);
if (sysCColMapKeyIt != sysCColMapKey.end())
sysCColMapKey.erase(sysCColMapKeyIt);
Expand Down Expand Up @@ -2666,7 +2666,7 @@ namespace OpenLogReplicator {
}
}

SysCColKey sysCColKeyFirst(sysObj->obj, sysCol->intCol, 0);
SysCColKey sysCColKeyFirst(sysObj->obj, 0, sysCol->intCol);
for (auto sysCColMapKeyIt = sysCColMapKey.upper_bound(sysCColKeyFirst);
sysCColMapKeyIt != sysCColMapKey.end() && sysCColMapKeyIt->first.obj == sysObj->obj && sysCColMapKeyIt->first.intCol == sysCol->intCol;
++sysCColMapKeyIt) {
Expand Down
2 changes: 2 additions & 0 deletions src/metadata/SerializerJson.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ namespace OpenLogReplicator {
public:
SerializerJson();
~SerializerJson() override;
SerializerJson(const SerializerJson&) = delete;
SerializerJson& operator=(const SerializerJson&) = delete;

[[nodiscard]] bool deserialize(Metadata* metadata, const std::string& ss, const std::string& fileName, std::vector<std::string>& msgs,
bool loadMetadata, bool storeSchema) override;
Expand Down
2 changes: 1 addition & 1 deletion src/parser/OpCode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1897,7 +1897,7 @@ namespace OpenLogReplicator {
tablespaceUndo = " No";
}

const char* userOnly(" No");
const char* userOnly;
if ((redoLogRecord->flg & FLG_USERONLY) != 0)
userOnly = "Yes";
else {
Expand Down
2 changes: 1 addition & 1 deletion src/parser/OpCode0501.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ namespace OpenLogReplicator {
}
}

void OpCode0501::kdilk(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize) {
void OpCode0501::kdilk(const Ctx* ctx, const RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize) {
if (unlikely(fieldSize < 20))
throw RedoLogException(50061, "too short field kdilk: " + std::to_string(fieldSize) + " offset: " +
std::to_string(redoLogRecord->dataOffset));
Expand Down
2 changes: 1 addition & 1 deletion src/parser/OpCode0501.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace OpenLogReplicator {
static void init(const Ctx* ctx, RedoLogRecord* redoLogRecord);
static void ktudb(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize);
static void kteoputrn(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize);
static void kdilk(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize);
static void kdilk(const Ctx* ctx, const RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize);
static void rowDeps(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize);
static void suppLog(Ctx* ctx, RedoLogRecord* redoLogRecord, typeField& fieldNum, typePos& fieldPos, typeSize& fieldSize);
static void opc0A16(const Ctx* ctx, RedoLogRecord* redoLogRecord, typeField& fieldNum, typePos& fieldPos, typeSize& fieldSize);
Expand Down
2 changes: 1 addition & 1 deletion src/parser/OpCode0513.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ along with OpenLogReplicator; see the file LICENSE; If not see
#include "Transaction.h"

namespace OpenLogReplicator {
void OpCode0513::attribute(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize, const char* header,
void OpCode0513::attribute(const Ctx* ctx, const RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize, const char* header,
const char* name, Transaction* transaction) {
std::string value(reinterpret_cast<const char*>(redoLogRecord->data() + fieldPos), fieldSize);
if (value != "")
Expand Down
2 changes: 1 addition & 1 deletion src/parser/OpCode0513.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace OpenLogReplicator {

class OpCode0513 : public OpCode {
protected:
static void attribute(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize, const char* header,
static void attribute(const Ctx* ctx, const RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize, const char* header,
const char* name, Transaction* transaction);
static void attributeSessionSerial(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize, Transaction* transaction);
static void attributeFlags(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize, Transaction* transaction);
Expand Down
2 changes: 1 addition & 1 deletion src/parser/OpCode0A08.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ namespace OpenLogReplicator {
dumpMemory(ctx, redoLogRecord, fieldPos, fieldSize);
}

void OpCode0A08::kdxln(const Ctx* ctx, RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize) {
void OpCode0A08::kdxln(const Ctx* ctx, const RedoLogRecord* redoLogRecord, typePos fieldPos, typeSize fieldSize) {
if (fieldSize < 16) {
ctx->warning(70001, "too short field kdxln: " + std::to_string(fieldSize) + " offset: " +
std::to_string(redoLogRecord->dataOffset));
Expand Down
Loading

0 comments on commit fe92725

Please sign in to comment.