Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi jar and memory leak fix for java 11 #42

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,16 @@
/.settings
/.classpath
/.project
.idea/.gitignore
.idea/codeStyles/
.idea/compiler.xml
.idea/encodings.xml
.idea/jarRepositories.xml
.idea/libraries/
.idea/misc.xml
.idea/modules.xml
.idea/sbt.xml
.idea/sonarlint/
.idea/vcs.xml
.java-version
bigqueue.iml
50 changes: 35 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.leansoft</groupId>
<groupId>org.clojars.nitishgoyal13</groupId>
<artifactId>bigqueue</artifactId>
<version>0.7.2</version>
<version>0.7.6</version>
<packaging>jar</packaging>

<name>bigqueue</name>
Expand Down Expand Up @@ -37,15 +37,10 @@

<distributionManagement>
<repository>
<id>github.release.repo</id>
<name>Release Repository</name>
<url>file:///dev/bulldog-repo/repo/releases</url>
<id>clojars</id>
<name>Clojars repository</name>
<url>https://clojars.org/repo</url>
</repository>
<snapshotRepository>
<id>github.snapshot.repo</id>
<name>Snapshot Repository</name>
<url>file:///dev/bulldog-repo/repo/snapshots</url>
</snapshotRepository>
</distributionManagement>

<properties>
Expand Down Expand Up @@ -101,11 +96,33 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>UTF-8</encoding>
</configuration>
<version>3.8.1</version>
<executions>
<execution>
<id>compile-java-8</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</execution>
<execution>
<id>compile-java-11</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<release>11</release>
<compileSourceRoots>
<compileSourceRoot>${project.basedir}/src/main/java11</compileSourceRoot>
</compileSourceRoots>
<outputDirectory>${project.build.outputDirectory}/META-INF/versions/11</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -134,6 +151,9 @@
<configuration>
<archive>
<manifestFile>src/main/resources/META-INF/MANIFEST.MF</manifestFile>
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</archive>
</configuration>
</plugin>
Expand Down
37 changes: 19 additions & 18 deletions src/main/java/com/leansoft/bigqueue/page/MappedPageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,52 @@
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MappedPageImpl implements IMappedPage, Closeable {

private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class);

private ThreadLocalByteBuffer threadLocalBuffer;
private volatile boolean dirty = false;
private volatile boolean closed = false;
private String pageFile;
private long index;

public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) {
this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb);
this.pageFile = pageFile;
this.index = index;
}

public void close() throws IOException {
synchronized(this) {
if (closed) return;

flush();

MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer();
unmap(srcBuf);

this.threadLocalBuffer = null; // hint GC

closed = true;
if (logger.isDebugEnabled()) {
logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed.");
}
}
}

@Override
public void setDirty(boolean dirty) {
this.dirty = dirty;
}

@Override
public void flush() {
synchronized(this) {
Expand All @@ -69,19 +70,19 @@ public byte[] getLocal(int position, int length) {
buf.get(data);
return data;
}

@Override
public ByteBuffer getLocal(int position) {
ByteBuffer buf = this.threadLocalBuffer.get();
buf.position(position);
((Buffer)buf).position(position);
return buf;
}

private static void unmap(MappedByteBuffer buffer)
{
Cleaner.clean(buffer);
}

/**
* Helper class allowing to clean direct buffers.
*/
Expand Down Expand Up @@ -120,18 +121,18 @@ public static void clean(ByteBuffer buffer) {
}
}
}

private static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private ByteBuffer _src;

public ThreadLocalByteBuffer(ByteBuffer src) {
_src = src;
}

public ByteBuffer getSourceBuffer() {
return _src;
}

@Override
protected synchronized ByteBuffer initialValue() {
ByteBuffer dup = _src.duplicate();
Expand All @@ -143,7 +144,7 @@ protected synchronized ByteBuffer initialValue() {
public boolean isClosed() {
return closed;
}

public String toString() {
return "Mapped page for " + this.pageFile + ", index = " + this.index + ".";
}
Expand Down
150 changes: 150 additions & 0 deletions src/main/java11/com/leansoft/bigqueue/page/MappedPageImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.leansoft.bigqueue.page;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Unsafe;

public class MappedPageImpl implements IMappedPage, Closeable {

private final static Logger logger = LoggerFactory.getLogger(MappedPageImpl.class);

private ThreadLocalByteBuffer threadLocalBuffer;
private volatile boolean dirty = false;
private volatile boolean closed = false;
private String pageFile;
private long index;

public MappedPageImpl(MappedByteBuffer mbb, String pageFile, long index) {
this.threadLocalBuffer = new ThreadLocalByteBuffer(mbb);
this.pageFile = pageFile;
this.index = index;
}

public void close() throws IOException {
synchronized(this) {
if (closed) return;

flush();

MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer();
unmap(srcBuf);

this.threadLocalBuffer = null; // hint GC

closed = true;
if (logger.isDebugEnabled()) {
logger.debug("Mapped page for " + this.pageFile + " was just unmapped and closed.");
}
}
}

@Override
public void setDirty(boolean dirty) {
this.dirty = dirty;
}

@Override
public void flush() {
synchronized(this) {
if (closed) return;
if (dirty) {
MappedByteBuffer srcBuf = (MappedByteBuffer)threadLocalBuffer.getSourceBuffer();
srcBuf.force(); // flush the changes
dirty = false;
if (logger.isDebugEnabled()) {
logger.debug("Mapped page for " + this.pageFile + " was just flushed.");
}
}
}
}

@Override
public ByteBuffer getLocal(int position) {
ByteBuffer buf = this.threadLocalBuffer.get();
buf.position(position);
return buf;
}

public byte[] getLocal(int position, int length) {
ByteBuffer buf = this.getLocal(position);
byte[] data = new byte[length];
buf.get(data);
return data;
}

private static void unmap(MappedByteBuffer buffer)
{
Cleaner.clean(buffer);
}

/**
* Helper class allowing to clean direct buffers.
*/
private static class Cleaner {

private static Unsafe unsafe;

static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
} catch (Exception e) {
logger.warn("Unsafe Not support {}", e.getMessage(), e);
}
}

public static void clean(ByteBuffer buffer) {
if (buffer == null) return;
if (buffer.isDirect()) {
if (unsafe != null) {
unsafe.invokeCleaner(buffer);
} else {
logger.warn("Unable to clean bytebuffer");
}
}
}
}

private static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> {
private ByteBuffer _src;

public ThreadLocalByteBuffer(ByteBuffer src) {
_src = src;
}

public ByteBuffer getSourceBuffer() {
return _src;
}

@Override
protected synchronized ByteBuffer initialValue() {
ByteBuffer dup = _src.duplicate();
return dup;
}
}

@Override
public boolean isClosed() {
return closed;
}

public String toString() {
return "Mapped page for " + this.pageFile + ", index = " + this.index + ".";
}

@Override
public String getPageFile() {
return this.pageFile;
}

@Override
public long getPageIndex() {
return this.index;
}
}