Skip to content

Commit

Permalink
GH-4554 Fixed performance of Changeset in combination with MemoryOver…
Browse files Browse the repository at this point in the history
…flowModel

Increase the performance for large transactions requiring overflow to disk
for NativeStore and LmdbStore.
  • Loading branch information
kenwenzel committed May 17, 2023
1 parent 74158c7 commit 2b721c2
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public void approve(Statement statement) {
approved = createEmptyModel();
}
approved.add(statement);
approvedEmpty = approved == null || approved.isEmpty();
approvedEmpty = false;
if (statement.getContext() != null) {
if (approvedContexts == null) {
approvedContexts = new HashSet<>();
Expand Down Expand Up @@ -447,7 +447,7 @@ public void deprecate(Statement statement) {
deprecated = createEmptyModel();
}
deprecated.add(statement);
deprecatedEmpty = deprecated == null || deprecated.isEmpty();
deprecatedEmpty = false;
Resource ctx = statement.getContext();
if (approvedContexts != null && approvedContexts.contains(ctx)
&& !approved.contains(null, null, null, ctx)) {
Expand Down Expand Up @@ -833,8 +833,9 @@ void sinkApproved(SailSink sink) {

boolean readLock = readWriteLock.readLock();
try {
if (approved != null) {
sink.approveAll(approved, approvedContexts);
Model approvedLocal = approved;
if (approvedLocal != null) {
sink.approveAll(approvedLocal, approvedContexts);
}
} finally {
readWriteLock.unlockReader(readLock);
Expand All @@ -848,8 +849,9 @@ void sinkDeprecated(SailSink sink) {

boolean readLock = readWriteLock.readLock();
try {
if (deprecated != null) {
sink.deprecateAll(deprecated);
Model deprecatedLocal = deprecated;
if (deprecatedLocal != null) {
sink.deprecateAll(deprecatedLocal);
}
} finally {
readWriteLock.unlockReader(readLock);
Expand Down Expand Up @@ -885,7 +887,7 @@ public void approveAll(Set<Statement> approve, Set<Resource> approveContexts) {
approved = createEmptyModel();
}
approved.addAll(approve);
approvedEmpty = approved == null || approved.isEmpty();
approvedEmpty = false;

if (approveContexts != null) {
if (approvedContexts == null) {
Expand All @@ -912,7 +914,7 @@ public void deprecateAll(Set<Statement> deprecate) {
deprecated = createEmptyModel();
}
deprecated.addAll(deprecate);
deprecatedEmpty = deprecated == null || deprecated.isEmpty();
deprecatedEmpty = false;

for (Statement statement : deprecate) {
Resource ctx = statement.getContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ protected void initializeInternal() throws SailException {
FileUtils.writeStringToFile(versionFile, VERSION, StandardCharsets.UTF_8);
}
backingStore = new LmdbSailStore(dataDir, config);
this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel() {
this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel(false) {
@Override
protected SailStore createSailStore(File dataDir) throws IOException, SailException {
// Model can't fit into memory, use another LmdbSailStore to store delta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,17 @@ abstract class MemoryOverflowModel extends AbstractModel {

private long maxBlockSize = 0;

SimpleValueFactory vf = SimpleValueFactory.getInstance();
private final boolean verifyAdditions;

public MemoryOverflowModel() {
memory = new LinkedHashModel(LARGE_BLOCK);
}
private final SimpleValueFactory vf = SimpleValueFactory.getInstance();

public MemoryOverflowModel(Model model) {
this(model.getNamespaces());
addAll(model);
}

public MemoryOverflowModel(Set<Namespace> namespaces, Collection<? extends Statement> c) {
this(namespaces);
addAll(c);
public MemoryOverflowModel(boolean verifyAdditions) {
this.verifyAdditions = verifyAdditions;
memory = new LinkedHashModel(LARGE_BLOCK);
}

public MemoryOverflowModel(Set<Namespace> namespaces) {
public MemoryOverflowModel(Set<Namespace> namespaces, boolean verifyAdditions) {
this.verifyAdditions = verifyAdditions;
memory = new LinkedHashModel(namespaces, LARGE_BLOCK);
}

Expand Down Expand Up @@ -271,7 +265,7 @@ private synchronized void overflowToDisk() {
dataDir = Files.createTempDirectory("model").toFile();
logger.debug("memory overflow using temp directory {}", dataDir);
store = createSailStore(dataDir);
disk = new SailSourceModel(store) {
disk = new SailSourceModel(store, verifyAdditions) {

@Override
protected void finalize() throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class SailSourceModel extends AbstractModel {

private static final Logger logger = LoggerFactory.getLogger(SailSourceModel.class);

private final boolean verifyAdditions;

private final class StatementIterator implements Iterator<Statement> {

final CloseableIteration<? extends Statement, SailException> stmts;
Expand Down Expand Up @@ -97,16 +99,15 @@ public void remove() {

SailSink sink;

private long size;

private final IsolationLevels level = IsolationLevels.NONE;

public SailSourceModel(SailStore store) {
this(store.getExplicitSailSource());
public SailSourceModel(SailStore store, boolean verifyAdditions) {
this(store.getExplicitSailSource(), verifyAdditions);
}

public SailSourceModel(SailSource source) {
public SailSourceModel(SailSource source, boolean verifyAdditions) {
this.source = source;
this.verifyAdditions = verifyAdditions;
}

@Override
Expand Down Expand Up @@ -147,21 +148,20 @@ public String toString() {

@Override
public synchronized int size() {
if (size < 0) {
long size = 0;
try {
CloseableIteration<? extends Statement, SailException> iter;
iter = dataset().getStatements(null, null, null);
try {
CloseableIteration<? extends Statement, SailException> iter;
iter = dataset().getStatements(null, null, null);
try {
while (iter.hasNext()) {
iter.next();
size++;
}
} finally {
iter.close();
while (iter.hasNext()) {
iter.next();
size++;
}
} catch (SailException e) {
throw new ModelException(e);
} finally {
iter.close();
}
} catch (SailException e) {
throw new ModelException(e);
}
if (size > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
Expand Down Expand Up @@ -243,13 +243,10 @@ public synchronized boolean add(Resource subj, IRI pred, Value obj, Resource...
throw new UnsupportedOperationException("Incomplete statement");
}
try {
if (contains(subj, pred, obj, contexts)) {
if (verifyAdditions && contains(subj, pred, obj, contexts)) {
logger.trace("already contains statement {} {} {} {}", subj, pred, obj, contexts);
return false;
}
if (size >= 0) {
size++;
}
if (contexts == null || contexts.length == 0) {
sink().approve(subj, pred, obj, null);
} else {
Expand All @@ -268,7 +265,6 @@ public synchronized boolean clear(Resource... contexts) {
try {
if (contains(null, null, null, contexts)) {
sink().clear(contexts);
size = -1;
return true;
}
} catch (SailException e) {
Expand All @@ -279,25 +275,23 @@ public synchronized boolean clear(Resource... contexts) {

@Override
public synchronized boolean remove(Resource subj, IRI pred, Value obj, Resource... contexts) {
boolean removed = false;
try {
if (contains(subj, pred, obj, contexts)) {
size = -1;
CloseableIteration<? extends Statement, SailException> stmts;
stmts = dataset().getStatements(subj, pred, obj, contexts);
try {
while (stmts.hasNext()) {
Statement st = stmts.next();
sink().deprecate(st);
}
} finally {
stmts.close();
CloseableIteration<? extends Statement, SailException> stmts = dataset().getStatements(subj, pred, obj,
contexts);
try {
while (stmts.hasNext()) {
Statement st = stmts.next();
sink().deprecate(st);
removed = true;
}
return true;
} finally {
stmts.close();
}
} catch (SailException e) {
throw new ModelException(e);
}
return false;
return removed;
}

@Override
Expand Down Expand Up @@ -372,7 +366,6 @@ public synchronized void removeTermIteration(Iterator<Statement> iter, Resource
} finally {
stmts.close();
}
size = -1;
} catch (SailException e) {
throw new ModelException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected SailSourceModel getNewModel() {
LmdbSailStore store = new LmdbSailStore(Files.createTempDirectory("SailSourceModelTest-").toFile(),
new LmdbStoreConfig("spoc"));
stores.add(store);
return new SailSourceModel(store);
return new SailSourceModel(store, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class OverflowBenchmarkSynthetic {
@Setup(Level.Trial)
public void setup() {
((Logger) (LoggerFactory
.getLogger("org.eclipse.rdf4j.sail.lmdbrdf.MemoryOverflowModel")))
.getLogger("org.eclipse.rdf4j.sail.lmdb.MemoryOverflowModel")))
.setLevel(ch.qos.logback.classic.Level.DEBUG);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.Set;

import org.eclipse.rdf4j.common.io.FileUtil;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Namespace;
Expand All @@ -39,8 +40,6 @@
* Model implementation that stores in a {@link LinkedHashModel} until more than 10KB statements are added and the
* estimated memory usage is more than the amount of free memory available. Once the threshold is cross this
* implementation seamlessly changes to a disk based {@link SailSourceModel}.
*
* @author James Leigh
*/
abstract class MemoryOverflowModel extends AbstractModel {

Expand All @@ -56,35 +55,29 @@ abstract class MemoryOverflowModel extends AbstractModel {

final Logger logger = LoggerFactory.getLogger(MemoryOverflowModel.class);

private volatile LinkedHashModel memory;
private LinkedHashModel memory;

private transient File dataDir;
transient File dataDir;

private transient SailStore store;
transient SailStore store;

private transient volatile SailSourceModel disk;
transient SailSourceModel disk;

private long baseline = 0;

private long maxBlockSize = 0;

SimpleValueFactory vf = SimpleValueFactory.getInstance();

public MemoryOverflowModel() {
memory = new LinkedHashModel(LARGE_BLOCK);
}
private final boolean verifyAdditions;

public MemoryOverflowModel(Model model) {
this(model.getNamespaces());
addAll(model);
}
private final SimpleValueFactory vf = SimpleValueFactory.getInstance();

public MemoryOverflowModel(Set<Namespace> namespaces, Collection<? extends Statement> c) {
this(namespaces);
addAll(c);
public MemoryOverflowModel(boolean verifyAdditions) {
this.verifyAdditions = verifyAdditions;
memory = new LinkedHashModel(LARGE_BLOCK);
}

public MemoryOverflowModel(Set<Namespace> namespaces) {
public MemoryOverflowModel(Set<Namespace> namespaces, boolean verifyAdditions) {
this.verifyAdditions = verifyAdditions;
memory = new LinkedHashModel(namespaces, LARGE_BLOCK);
}

Expand Down Expand Up @@ -194,15 +187,11 @@ public synchronized void removeTermIteration(Iterator<Statement> iter, Resource

protected abstract SailStore createSailStore(File dataDir) throws IOException, SailException;

private Model getDelegate() {
LinkedHashModel memory = this.memory;
if (memory != null) {
synchronized Model getDelegate() {
if (disk == null) {
return memory;
} else {
synchronized (this) {
return disk;
}
}
return disk;
}

private void writeObject(ObjectOutputStream s) throws IOException {
Expand Down Expand Up @@ -272,15 +261,32 @@ private synchronized void checkMemoryOverflow() {

private synchronized void overflowToDisk() {
try {
LinkedHashModel memory = this.memory;
this.memory = null;

assert disk == null;
dataDir = Files.createTempDirectory("model").toFile();
logger.debug("memory overflow using temp directory {}", dataDir);
store = createSailStore(dataDir);
disk = new SailSourceModel(store);
disk = new SailSourceModel(store, verifyAdditions) {

@Override
protected void finalize() throws Throwable {
logger.debug("finalizing {}", dataDir);
if (disk == this) {
try {
store.close();
} catch (SailException e) {
logger.error(e.toString(), e);
} finally {
FileUtil.deleteDir(dataDir);
dataDir = null;
store = null;
disk = null;
}
}
super.finalize();
}
};
disk.addAll(memory);
memory = new LinkedHashModel(memory.getNamespaces(), LARGE_BLOCK);
logger.debug("overflow synced to disk");
} catch (IOException | SailException e) {
String path = dataDir != null ? dataDir.getAbsolutePath() : "(unknown)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public class NativeStore extends AbstractNotifyingSail implements FederatedServi
final static class MemoryOverflowIntoNativeStore extends MemoryOverflowModel {
private static final long serialVersionUID = 1L;

public MemoryOverflowIntoNativeStore() {
super(false);
}

/**
* The class is static to avoid taking a pointer which might make it hard to get a phantom reference.
*/
Expand Down
Loading

0 comments on commit 2b721c2

Please sign in to comment.