Skip to content

Commit

Permalink
Merge pull request #5 from ebremer/develop
Browse files Browse the repository at this point in the history
Fix memory leak in PAW, add License, and update README.md
  • Loading branch information
ebremer authored May 1, 2023
2 parents dbda61f + 3a6a491 commit a18086a
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 118 deletions.
12 changes: 6 additions & 6 deletions nbactions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
<goal>org.codehaus.mojo:exec-maven-plugin:3.0.0:exec</goal>
</goals>
<properties>
<exec.vmArgs>--enable-preview -Xmx32G -Xms32G --add-opens java.base/java.nio=ALL-UNNAMED</exec.vmArgs>
<exec.vmArgs>--enable-preview -Xmx32G -Xms32G --add-opens java.base/java.nio=ALL-UNNAMED -Darrow.memory.debug.allocator=true</exec.vmArgs>
<exec.args>${exec.vmArgs} -classpath %classpath ${exec.mainClass} ${exec.appArgs}</exec.args>
<exec.appArgs></exec.appArgs>
<exec.mainClass>com.ebremer.beakgraph.rdf.BeakGraph</exec.mainClass>
<exec.mainClass>com.ebremer.beakgraph.rdf.Test</exec.mainClass>
<exec.executable>java</exec.executable>
</properties>
</action>
Expand All @@ -27,10 +27,10 @@
<goal>org.codehaus.mojo:exec-maven-plugin:3.0.0:exec</goal>
</goals>
<properties>
<exec.vmArgs>--enable-preview -Xmx32G -Xms32G --add-opens java.base/java.nio=ALL-UNNAMED -agentlib:jdwp=transport=dt_socket,server=n,address=${jpda.address}</exec.vmArgs>
<exec.vmArgs>--enable-preview -Xmx32G -Xms32G --add-opens java.base/java.nio=ALL-UNNAMED -Darrow.memory.debug.allocator=true -agentlib:jdwp=transport=dt_socket,server=n,address=${jpda.address}</exec.vmArgs>
<exec.args>${exec.vmArgs} -classpath %classpath ${exec.mainClass} ${exec.appArgs}</exec.args>
<exec.appArgs></exec.appArgs>
<exec.mainClass>com.ebremer.beakgraph.rdf.BeakGraph</exec.mainClass>
<exec.mainClass>com.ebremer.beakgraph.rdf.Test</exec.mainClass>
<exec.executable>java</exec.executable>
<jpda.listen>true</jpda.listen>
</properties>
Expand All @@ -45,9 +45,9 @@
<goal>org.codehaus.mojo:exec-maven-plugin:3.0.0:exec</goal>
</goals>
<properties>
<exec.vmArgs>--enable-preview -Xmx32G -Xms32G --add-opens java.base/java.nio=ALL-UNNAMED</exec.vmArgs>
<exec.vmArgs>--enable-preview -Xmx32G -Xms32G --add-opens java.base/java.nio=ALL-UNNAMED -Darrow.memory.debug.allocator=true</exec.vmArgs>
<exec.args>${exec.vmArgs} -classpath %classpath ${exec.mainClass} ${exec.appArgs}</exec.args>
<exec.mainClass>com.ebremer.beakgraph.rdf.BeakGraph</exec.mainClass>
<exec.mainClass>com.ebremer.beakgraph.rdf.Test</exec.mainClass>
<exec.executable>java</exec.executable>
<exec.appArgs></exec.appArgs>
</properties>
Expand Down
16 changes: 11 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.ebremer</groupId>
<artifactId>BeakGraph</artifactId>
<version>0.0.0</version>
<version>0.1.0</version>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<version>3.11.0</version>
<configuration>
<compilerArgs>
<arg>--enable-preview</arg>
Expand All @@ -25,13 +25,14 @@
<maven.compiler.target>17</maven.compiler.target>
<exec.mainClass>com.ebremer.beakgraph.BeakGraph</exec.mainClass>
<jena.ver>4.6.1</jena.ver>
<arrow.version>11.0.0</arrow.version>
<arrow.version>11.0.0</arrow.version>
<zip4j.version>2.11.5</zip4j.version>
</properties>
<dependencies>
<dependency>
<groupId>com.ebremer</groupId>
<artifactId>RO-Crate4J</artifactId>
<version>0.0.0</version>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.jena</groupId>
Expand All @@ -51,6 +52,11 @@
<version>${jena.ver}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
Expand Down Expand Up @@ -84,7 +90,7 @@
<dependency>
<groupId>net.lingala.zip4j</groupId>
<artifactId>zip4j</artifactId>
<version>2.11.2</version>
<version>${zip4j.version}</version>
<type>jar</type>
</dependency>
</dependencies>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/ebremer/beakgraph/rdf/BeakGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public ReorderTransformation getReorderTransform() {
return null;
}

/*
public static void main(String[] args) throws IOException {
//JenaSystem.init();
//File f = new File("D:\\HalcyonStorage\\heatmaps\\j3.zip");
Expand Down Expand Up @@ -155,5 +156,5 @@ public static void main(String[] args) throws IOException {
QueryExecution qe = QueryExecutionFactory.create(pss.toString(), m);
ResultSet rs = qe.execSelect();
ResultSetFormatter.out(System.out,rs);
}
}*/
}
25 changes: 13 additions & 12 deletions src/main/java/com/ebremer/beakgraph/rdf/BeakReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,20 @@ public BeakReader(URI uri) throws FileNotFoundException, IOException {
String x = qs.get("file").asResource().getURI();
try {
SeekableByteChannel xxx = reader.getSeekableByteChannel(x);
ArrowFileReader afr = new ArrowFileReader(xxx, root);
VectorSchemaRoot za = afr.getVectorSchemaRoot();
afr.loadNextBatch();
StructVector v = (StructVector) za.getVector(0);
String p = v.getName();
String dt = p.substring(0, 1);
numtriples = numtriples + v.getValueCount();
p = p.substring(1);
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAR(p));
try (ArrowFileReader afr = new ArrowFileReader(xxx, root)) {
VectorSchemaRoot za = afr.getVectorSchemaRoot();
afr.loadNextBatch();
StructVector v = (StructVector) za.getVector(0);
String p = v.getName();
String dt = p.substring(0, 1);
numtriples = numtriples + v.getValueCount();
p = p.substring(1);
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAR(p));
}
PAR par = byPredicate.get(p);
par.put(dt, v);
}
PAR par = byPredicate.get(p);
par.put(dt, v);
} catch (FileNotFoundException ex) {
Logger.getLogger(ROCrateReader.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
Expand Down
114 changes: 53 additions & 61 deletions src/main/java/com/ebremer/beakgraph/rdf/BeakWriter.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.ebremer.beakgraph.rdf;

import com.ebremer.rocrate4j.ROCrate;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
Expand All @@ -28,9 +27,11 @@
import net.lingala.zip4j.model.FileHeader;
import net.lingala.zip4j.model.enums.CompressionMethod;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.LargeVarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
Expand All @@ -49,48 +50,25 @@
*
* @author erich
*/
public final class BeakWriter {
public final class BeakWriter implements AutoCloseable {
private NodeTable nt;
private final BufferAllocator allocator;
private final MapDictionaryProvider provider = new MapDictionaryProvider();
private Dictionary dictionary;
private final CopyOnWriteArrayList<Field> fields = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<FieldVector> vectors = new CopyOnWriteArrayList<>();
private NodeTable nt;
private LargeVarCharVector dict;
private Resource metairi;
private final HashMap<String,PAW> byPredicate = new HashMap<>();
private final HashMap<String,Integer> blanknodes;
private final ConcurrentHashMap<String,Job> Jobs = new ConcurrentHashMap<>();
private final HashMap<String,Integer> resources = new HashMap<>();
private boolean locked = false;
private String base;
private ROCrate.ROCrateBuilder roc;
private BufferAllocator allocator;
private final String base;
private final BGVoID VoID = new BGVoID();
private LargeVarCharVector dict;

public BeakWriter(BufferAllocator allocator, ROCrate.ROCrateBuilder roc, String base) throws IOException {
//this.m = m;
this.allocator = allocator;
public BeakWriter(ROCrate.ROCrateBuilder roc, String base) {
this.allocator = new RootAllocator();
this.base = base;
this.roc = roc;
//StopWatch sw = new StopWatch();
//sw.LapStart("Create Dictionary");
blanknodes = new HashMap<>(2500000);
//CreateDictionary(allocator);
//nt = new NodeTable(dictionary);
//nt.setBlankNodes(blanknodes);
//sw.Lap("Dictionary Created");
//sw.LapStart("Create Predicate Vectors");

//System.out.println("# of vectors : "+vectors.size());
//sw.Lap("Predicate Vectors Created");

//sw.LapStart("Generate VoID Data");
//Model VoID = BGVoID.GenerateVoID(metairi, m);
//metairi.getModel().add(VoID);
//sw.Lap("VoID Data Generated");

//DisplayMeta();
//sw.Lapse("BeakGraph Completed");
}

public Resource getMetaIRI() {
Expand Down Expand Up @@ -143,11 +121,11 @@ public void Register(Model m) {

public void Add(Model m) {
m.listStatements().forEach(s->{
ProcessTriple(allocator, s);
ProcessTriple(s);
});
}

public void CreateDictionary(BufferAllocator allocator) {
public void CreateDictionary() {
System.out.println("Creating Dictionary...");
DictionaryEncoding dictionaryEncoding = new DictionaryEncoding(0, true, new ArrowType.Int(32, true));
dict = new LargeVarCharVector("Resource Dictionary", allocator);
Expand All @@ -161,9 +139,9 @@ public void CreateDictionary(BufferAllocator allocator) {
dict.setValueCount(resources.size());
System.out.println("RESOURCES # : "+resources.size());
System.out.println("BNODES # : "+blanknodes.size());
dictionary = new Dictionary(dict, dictionaryEncoding);
Dictionary dictionary = new Dictionary(dict, dictionaryEncoding);
provider.put(dictionary);
locked = true;
//locked = true;
nt = new NodeTable(dictionary);
//blanknodes.forEach((k,v)->{
// blanknodes.put(k,v-blanknodes.size());
Expand Down Expand Up @@ -229,34 +207,35 @@ public Resource WriteDataToFile(String base, ROCrate.ROCrateBuilder roc) {
root.setRowCount(v.getValueCount());
OutputStream zos = roc.getDestination().GetOutputStream(base+"/"+MD5(v.getName().getBytes()), CompressionMethod.STORE);
CountingOutputStream cos = new CountingOutputStream(zos);
ArrowFileWriter writer = new ArrowFileWriter(root, null, Channels.newChannel(cos));
writer.start();
writer.writeBatch();
writer.end();
long numbytes = cos.getNumberOfBytesWritten();
if (zos instanceof ZipOutputStream zz) {
FileHeader fh = zz.closeEntry();
fh.setUncompressedSize(numbytes);
try (SpecialArrowFileWriter writer = new SpecialArrowFileWriter(root, null, Channels.newChannel(cos))) {
writer.start();
writer.writeBatch();
writer.end();
long numbytes = cos.getNumberOfBytesWritten();
if (zos instanceof ZipOutputStream zz) {
FileHeader fh = zz.closeEntry();
fh.setUncompressedSize(numbytes);
}
roc
.Add(target, base, MD5(v.getField().getName().getBytes()), CompressionMethod.STORE, true)
.addProperty(SchemaDO.name, v.getField().getName().substring(1))
.addProperty(BG.property, ResourceFactory.createResource(v.getName().substring(1,v.getName().length())))
.addProperty(SchemaDO.encodingFormat, "application/vnd.apache.arrow.file")
.addLiteral(SchemaDO.contentSize, numbytes)
.addProperty(RDF.type, SchemaDO.MediaObject)
.addLiteral(BG.triples, v.getValueCount())
.addProperty(RDF.type, BG.PredicateVector);
}
roc
.Add(target, base, MD5(v.getField().getName().getBytes()), CompressionMethod.STORE, true)
.addProperty(SchemaDO.name, v.getField().getName().substring(1))
.addProperty(BG.property, ResourceFactory.createResource(v.getName().substring(1,v.getName().length())))
.addProperty(SchemaDO.encodingFormat, "application/vnd.apache.arrow.file")
.addLiteral(SchemaDO.contentSize, numbytes)
.addProperty(RDF.type, SchemaDO.MediaObject)
.addLiteral(BG.triples, v.getValueCount())
.addProperty(RDF.type, BG.PredicateVector);
} catch (IOException ex) {
Logger.getLogger(BeakWriter.class.getName()).log(Level.SEVERE, null, ex);
}
v.close();
});
System.out.println("================== FILE WRITTEN =====================================");
return target;
}

public void ProcessTriple(BufferAllocator allocator, Statement stmt) {
//System.out.println("ProcessTriple : "+stmt);
public void ProcessTriple(Statement stmt) {
VoID.Add(stmt);
Resource res = stmt.getSubject();
String s;
Expand Down Expand Up @@ -293,47 +272,47 @@ public void ProcessTriple(BufferAllocator allocator, Statement stmt) {
} else switch (ct) {
case "org.apache.jena.rdf.model.impl.ResourceImpl": {
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAW(allocator, nt, p));
byPredicate.put(p, new PAW(allocator.newChildAllocator("PAW -> "+p, 0, 1024*1024*1024), nt, p));
}
byPredicate.get(p).set(res, o.asResource());
break;
}
case "java.math.BigInteger": {
long oo = o.asLiteral().getLong();
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAW(allocator, nt, p));
byPredicate.put(p, new PAW(allocator.newChildAllocator("PAW -> "+p, 0, 1024*1024*1024), nt, p));
}
byPredicate.get(p).set(res, oo);
break;
}
case "java.lang.Integer": {
int oo = o.asLiteral().getInt();
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAW(allocator, nt, p));
byPredicate.put(p, new PAW(allocator.newChildAllocator("PAW -> "+p, 0, 1024*1024*1024), nt, p));
}
byPredicate.get(p).set(res, oo);
break;
}
case "java.lang.Long": {
long oo = o.asLiteral().getLong();
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAW(allocator, nt, p));
byPredicate.put(p, new PAW(allocator.newChildAllocator("PAW -> "+p, 0, 1024*1024*1024), nt, p));
}
byPredicate.get(p).set(res, oo);
break;
}
case "java.lang.Float": {
float oo = o.asLiteral().getFloat();
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAW(allocator, nt, p));
byPredicate.put(p, new PAW(allocator.newChildAllocator("PAW -> "+p, 0, 1024*1024*1024), nt, p));
}
byPredicate.get(p).set(res, oo);
break;
}
case "java.lang.String": {
String oo = o.asLiteral().toString();
if (!byPredicate.containsKey(p)) {
byPredicate.put(p, new PAW(allocator, nt, p));
byPredicate.put(p, new PAW(allocator.newChildAllocator("PAW -> "+p, 0, 1024*1024*1024), nt, p));
}
byPredicate.get(p).set(res, oo);
break;
Expand All @@ -346,7 +325,7 @@ public void ProcessTriple(BufferAllocator allocator, Statement stmt) {
}
}

public void Create(BufferAllocator allocator) throws IOException {
public void Create(ROCrate.ROCrateBuilder roc) throws IOException {
System.out.println("Creating BeakGraph...");
int cores = Runtime.getRuntime().availableProcessors();
System.out.println(cores+" cores available");
Expand Down Expand Up @@ -383,6 +362,19 @@ public void run() {
metairi = WriteDictionaryToFile(base, roc);
WriteDataToFile(base, roc);
}

/**
 * Releases the Arrow resources held by this writer.
 *
 * <p>Closes, in order: every registered field vector, every per-predicate
 * writer, the node table, the dictionary vector, and finally the allocator.
 *
 * <p>In the code visible here the node table and the dictionary vector are
 * only assigned by {@code CreateDictionary()}, so both are skipped while they
 * are still {@code null}. Without that guard, closing a writer whose
 * dictionary was never built would throw a {@link NullPointerException}
 * before the allocator could be closed, leaking the allocator.
 */
@Override
public void close() {
    vectors.forEach(v -> v.close());
    byPredicate.forEach((k, paw) -> paw.close());
    if (nt != null) {
        nt.close();
    }
    if (dict != null) {
        dict.close();
    }
    // Closed last: the dictionary vector and the per-predicate child
    // allocators are all created from this allocator.
    allocator.close();
}

class PredicateProcessor implements Callable<Model> {
private final PAW pa;
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/com/ebremer/beakgraph/rdf/Job.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
*/
package com.ebremer.beakgraph.rdf;

import java.util.concurrent.Callable;
Expand Down
Loading

0 comments on commit a18086a

Please sign in to comment.