From fc4ac39fb51a340be208b9832b49dccc4df5c5ba Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Thu, 7 Oct 2021 09:13:23 +0530 Subject: [PATCH 01/12] Cleanup memory on cleanup --- .../bigqueue/page/MappedPageImpl.java | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java index 6fb1249..c204cbd 100644 --- a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java +++ b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -1,14 +1,15 @@ package com.leansoft.bigqueue.page; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; + import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.Method; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class MappedPageImpl implements IMappedPage, Closeable { private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); @@ -86,36 +87,26 @@ private static void unmap(MappedByteBuffer buffer) * Helper class allowing to clean direct buffers. */ private static class Cleaner { - public static final boolean CLEAN_SUPPORTED; - private static final Method directBufferCleaner; - private static final Method directBufferCleanerClean; + + private static Unsafe unsafe; static { - Method directBufferCleanerX = null; - Method directBufferCleanerCleanX = null; - boolean v; try { - directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner"); - directBufferCleanerX.setAccessible(true); - directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean"); - directBufferCleanerCleanX.setAccessible(true); - v = true; + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe) f.get(null); } catch (Exception e) { - v = false; + logger.warn("Unsafe Not support {}", e.getMessage(), e); } - CLEAN_SUPPORTED = v; - directBufferCleaner = directBufferCleanerX; - directBufferCleanerClean = directBufferCleanerCleanX; } public static void clean(ByteBuffer buffer) { if (buffer == null) return; - if (CLEAN_SUPPORTED && buffer.isDirect()) { - try { - Object cleaner = directBufferCleaner.invoke(buffer); - directBufferCleanerClean.invoke(cleaner); - } catch (Exception e) { - // silently ignore exception + if (buffer.isDirect()) { + if (unsafe != null) { + unsafe.invokeCleaner(buffer); + } else { + logger.warn("Unable to clean bytebuffer"); } } } From f52700f1261d93cb76e542cf33f4660c31f0b377 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Thu, 7 Oct 2021 09:46:40 +0530 Subject: [PATCH 02/12] Version Bump --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index e85e829..0cff475 100644 --- a/pom.xml +++ b/pom.xml @@ -2,9 +2,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.leansoft + com.platform bigqueue - 0.7.2 + 0.7.3-SNAPSHOT jar bigqueue From 5fa2394f569889ed6bab4191497e44adb737be22 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Mon, 11 Oct 2021 11:12:25 +0530 Subject: [PATCH 03/12] Version Bump --- pom.xml | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index 0cff475..80af7c4 100644 --- a/pom.xml +++ b/pom.xml @@ -2,9 +2,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.platform + com.platform.bigqueue bigqueue - 0.7.3-SNAPSHOT + 0.7.3 jar bigqueue @@ -37,15 +37,9 @@ - github.release.repo - Release Repository - file:///dev/bulldog-repo/repo/releases + clojars + https://repo.clojars.org/ - - github.snapshot.repo - Snapshot Repository - file:///dev/bulldog-repo/repo/snapshots - @@ -102,8 +96,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.6 - 1.6 + 9 + 9 UTF-8 From 8bbeaff73717e1c5e039a82de8cee344913c2e93 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Mon, 11 Oct 2021 11:13:41 +0530 Subject: [PATCH 04/12] Change group name --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 80af7c4..6bda544 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.platform.bigqueue + org.clojars.nitishgoyal13 bigqueue 0.7.3 jar From d17e2f2f48e8d6b4120c7db88085ecdae9fefb91 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Tue, 12 Oct 2021 12:40:30 +0530 Subject: [PATCH 05/12] Upstream memory leak fix --- pom.xml | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 6bda544..4a2ec89 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.clojars.nitishgoyal13 + com.leansoft bigqueue 0.7.3 jar @@ -37,9 +37,15 @@ - clojars - https://repo.clojars.org/ + github.release.repo + Release Repository + file:///dev/bulldog-repo/repo/releases + + github.snapshot.repo + Snapshot Repository + file:///dev/bulldog-repo/repo/snapshots + @@ -96,8 +102,8 @@ org.apache.maven.plugins maven-compiler-plugin - 9 - 9 + 11 + 11 UTF-8 From c9c5f6dc7ff4dd4dd9e4d18db6d846d6298bd9e3 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Tue, 23 Nov 2021 15:20:11 +0530 Subject: [PATCH 06/12] Change groupname to io.appform --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4a2ec89..e88e6f8 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.leansoft + io.appform.bigqueue bigqueue 0.7.3 jar From 91e8a74c8cdcad66e9c4cb83d8e603ca8322d72f Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Thu, 25 Nov 2021 18:26:49 +0530 Subject: [PATCH 07/12] Make it multi module --- pom.xml | 50 ++++-- .../bigqueue/page/MappedPageImpl.java | 150 ++++++++++++++++++ 2 files changed, 185 insertions(+), 15 deletions(-) create mode 100644 src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java diff --git a/pom.xml b/pom.xml index e88e6f8..e755a52 100644 --- a/pom.xml +++ b/pom.xml @@ -2,9 +2,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - io.appform.bigqueue + org.clojars.nitishgoyal13 bigqueue - 0.7.3 + 0.7.4-SNAPSHOT jar bigqueue @@ -37,15 +37,10 @@ - github.release.repo - Release Repository - file:///dev/bulldog-repo/repo/releases + clojars + Clojars repository + https://clojars.org/repo - - github.snapshot.repo - Snapshot Repository - file:///dev/bulldog-repo/repo/snapshots - @@ -101,11 +96,33 @@ org.apache.maven.plugins maven-compiler-plugin - - 11 - 11 - UTF-8 - + 3.8.1 + + + compile-java-8 + + compile + + + 1.8 + 1.8 + + + + compile-java-11 + compile + + compile + + + 11 + + ${project.basedir}/src/main/java11 + + ${project.build.outputDirectory}/META-INF/versions/11 + + + org.apache.maven.plugins @@ -134,6 +151,9 @@ src/main/resources/META-INF/MANIFEST.MF + + true + diff --git a/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java new file mode 100644 index 0000000..0accfd3 --- /dev/null +++ b/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -0,0 +1,150 @@ +package com.leansoft.bigqueue.page; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; + +public class MappedPageImpl implements IMappedPage, Closeable { + + private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); + + private ThreadLocalByteBuffer threadLocalBuffer; + private volatile boolean dirty = false; + private volatile boolean closed = false; + private String pageFile; + private long index; + + public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) { + this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb); + this.pageFile = pageFile; + this.index = index; + } + + public void close() throws IOException { + synchronized(this) { + if (closed) return; + + flush(); + + MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer(); + unmap(srcBuf); + + this.threadLocalBuffer = null; // hint GC + + closed = true; + if (logger.isDebugEnabled()) { + logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed."); + } + } + } + + @Override + public void setDirty(boolean dirty) { + this.dirty = dirty; + } + + @Override + public void flush() { + synchronized(this) { + if (closed) return; + if (dirty) { + MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer(); + srcBuf.force(); // flush the changes + dirty = false; + if (logger.isDebugEnabled()) { + logger.debug("Mapped page for " + this.pageFile + " was just flushed."); + } + } + } + } + + public byte[] getLocal(int position, int length) { + ByteBuffer buf = this.getLocal(position); + byte[] data = new byte[length]; + buf.get(data); + return data; + } + + @Override + public ByteBuffer getLocal(int position) { + ByteBuffer buf = this.threadLocalBuffer.get(); + buf.position(position); + return buf; + } + + private static void unmap(MappedByteBuffer buffer) + { + Cleaner.clean(buffer); + } + + /** + * Helper class allowing to clean direct buffers. + */ + private static class Cleaner { + + private static Unsafe unsafe; + + static { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe) f.get(null); + } catch (Exception e) { + logger.warn("Unsafe Not support {}", e.getMessage(), e); + } + } + + public static void clean(ByteBuffer buffer) { + if (buffer == null) return; + if (buffer.isDirect()) { + if (unsafe != null) { + unsafe.invokeCleaner(buffer); + } else { + logger.warn("Unable to clean bytebuffer"); + } + } + } + } + + private static class ThreadLocalByteBuffer extends ThreadLocal { + private ByteBuffer _src; + + public ThreadLocalByteBuffer(ByteBuffer src) { + _src = src; + } + + public ByteBuffer getSourceBuffer() { + return _src; + } + + @Override + protected synchronized ByteBuffer initialValue() { + ByteBuffer dup = _src.duplicate(); + return dup; + } + } + + @Override + public boolean isClosed() { + return closed; + } + + public String toString() { + return "Mapped page for " + this.pageFile + ", index = " + this.index + "."; + } + + @Override + public String getPageFile() { + return this.pageFile; + } + + @Override + public long getPageIndex() { + return this.index; + } +} From e20009c0e8b62a7c7ca78db503fbe722dab6f98e Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Thu, 25 Nov 2021 18:31:07 +0530 Subject: [PATCH 08/12] Revert code to original in java 8 version --- .../bigqueue/page/MappedPageImpl.java | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java index c204cbd..6fb1249 100644 --- a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java +++ b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -1,15 +1,14 @@ package com.leansoft.bigqueue.page; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.misc.Unsafe; - import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class MappedPageImpl implements IMappedPage, Closeable { private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); @@ -87,26 +86,36 @@ private static void unmap(MappedByteBuffer buffer) * Helper class allowing to clean direct buffers. */ private static class Cleaner { - - private static Unsafe unsafe; + public static final boolean CLEAN_SUPPORTED; + private static final Method directBufferCleaner; + private static final Method directBufferCleanerClean; static { + Method directBufferCleanerX = null; + Method directBufferCleanerCleanX = null; + boolean v; try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - unsafe = (Unsafe) f.get(null); + directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner"); + directBufferCleanerX.setAccessible(true); + directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean"); + directBufferCleanerCleanX.setAccessible(true); + v = true; } catch (Exception e) { - logger.warn("Unsafe Not support {}", e.getMessage(), e); + v = false; } + CLEAN_SUPPORTED = v; + directBufferCleaner = directBufferCleanerX; + directBufferCleanerClean = directBufferCleanerCleanX; } public static void clean(ByteBuffer buffer) { if (buffer == null) return; - if (buffer.isDirect()) { - if (unsafe != null) { - unsafe.invokeCleaner(buffer); - } else { - logger.warn("Unable to clean bytebuffer"); + if (CLEAN_SUPPORTED && buffer.isDirect()) { + try { + Object cleaner = directBufferCleaner.invoke(buffer); + directBufferCleanerClean.invoke(cleaner); + } catch (Exception e) { + // silently ignore exception } } } From 3d972593c172e5f11b40b1949d30df96d56b3854 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Thu, 25 Nov 2021 18:34:19 +0530 Subject: [PATCH 09/12] Release version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e755a52..01a730c 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.clojars.nitishgoyal13 bigqueue - 0.7.4-SNAPSHOT + 0.7.4 jar bigqueue From c7a41f3a756b017616d5b78c45e077e2c113137f Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Tue, 30 Nov 2021 12:32:59 +0530 Subject: [PATCH 10/12] Updated .gitignore --- .gitignore | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/.gitignore b/.gitignore index c708c36..1baf13f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,16 @@ /.settings /.classpath /.project +.idea/.gitignore +.idea/codeStyles/ +.idea/compiler.xml +.idea/encodings.xml +.idea/jarRepositories.xml +.idea/libraries/ +.idea/misc.xml +.idea/modules.xml +.idea/sbt.xml +.idea/sonarlint/ +.idea/vcs.xml +.java-version +bigqueue.iml From 4c11c1e1ccb2aa9cdde05fd923b33006d9244e36 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Tue, 30 Nov 2021 12:38:40 +0530 Subject: [PATCH 11/12] Version Bump --- pom.xml | 2 +- .../leansoft/bigqueue/page/MappedPageImpl.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 01a730c..e04dcd7 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.clojars.nitishgoyal13 bigqueue - 0.7.4 + 0.7.5 jar bigqueue diff --git a/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java index 0accfd3..110c44a 100644 --- a/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java +++ b/src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -63,20 +63,20 @@ public void flush() { } } - public byte[] getLocal(int position, int length) { - ByteBuffer buf = this.getLocal(position); - byte[] data = new byte[length]; - buf.get(data); - return data; - } - @Override public ByteBuffer getLocal(int position) { ByteBuffer buf = this.threadLocalBuffer.get(); buf.position(position); return buf; } - + + public byte[] getLocal(int position, int length) { + ByteBuffer buf = this.getLocal(position); + byte[] data = new byte[length]; + buf.get(data); + return data; + } + private static void unmap(MappedByteBuffer buffer) { Cleaner.clean(buffer); From 0c0a515450a1a46bf1b032a82c43488921de8925 Mon Sep 17 00:00:00 2001 From: Nitish Goyal Date: Wed, 1 Dec 2021 17:29:48 +0530 Subject: [PATCH 12/12] Make it java 8 compatible at runtime --- pom.xml | 2 +- .../bigqueue/page/MappedPageImpl.java | 37 ++++++++++--------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index e04dcd7..b981e30 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.clojars.nitishgoyal13 bigqueue - 0.7.5 + 0.7.6 jar bigqueue diff --git a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java index 6fb1249..02e8b65 100644 --- a/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java +++ b/src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java @@ -3,6 +3,7 @@ import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Method; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; @@ -10,44 +11,44 @@ import org.slf4j.LoggerFactory; public class MappedPageImpl implements IMappedPage, Closeable { - + private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class); - + private ThreadLocalByteBuffer threadLocalBuffer; private volatile boolean dirty = false; private volatile boolean closed = false; private String pageFile; private long index; - + public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) { this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb); this.pageFile = pageFile; this.index = index; } - + public void close() throws IOException { synchronized(this) { if (closed) return; flush(); - + MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer(); unmap(srcBuf); - + this.threadLocalBuffer = null; // hint GC - + closed = true; if (logger.isDebugEnabled()) { logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed."); } } } - + @Override public void setDirty(boolean dirty) { this.dirty = dirty; } - + @Override public void flush() { synchronized(this) { @@ -69,19 +70,19 @@ public byte[] getLocal(int position, int length) { buf.get(data); return data; } - + @Override public ByteBuffer getLocal(int position) { ByteBuffer buf = this.threadLocalBuffer.get(); - buf.position(position); + ((Buffer)buf).position(position); return buf; } - + private static void unmap(MappedByteBuffer buffer) { Cleaner.clean(buffer); } - + /** * Helper class allowing to clean direct buffers. */ @@ -120,18 +121,18 @@ public static void clean(ByteBuffer buffer) { } } } - + private static class ThreadLocalByteBuffer extends ThreadLocal { private ByteBuffer _src; - + public ThreadLocalByteBuffer(ByteBuffer src) { _src = src; } - + public ByteBuffer getSourceBuffer() { return _src; } - + @Override protected synchronized ByteBuffer initialValue() { ByteBuffer dup = _src.duplicate(); @@ -143,7 +144,7 @@ protected synchronized ByteBuffer initialValue() { public boolean isClosed() { return closed; } - + public String toString() { return "Mapped page for " + this.pageFile + ", index = " + this.index + "."; }