Skip to content

Commit

Permalink
Add async dir parser
Browse files Browse the repository at this point in the history
  • Loading branch information
ate47 committed Jan 9, 2023
1 parent 74dd119 commit 8335366
Show file tree
Hide file tree
Showing 18 changed files with 519 additions and 151 deletions.
8 changes: 6 additions & 2 deletions hdt-api/src/main/java/org/rdfhdt/hdt/enums/RDFNotation.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,12 @@ public static RDFNotation guess(String fileName) throws IllegalArgumentException

throw new IllegalArgumentException("Could not guess the format for "+fileName);
}

public static RDFNotation guess(File fileName) throws IllegalArgumentException {
return guess(fileName.getName());
return guess(fileName.getAbsolutePath());
}

public static RDFNotation guess(Path fileName) throws IllegalArgumentException {
return guess(fileName.toAbsolutePath().toString());
}
}
54 changes: 54 additions & 0 deletions hdt-api/src/main/java/org/rdfhdt/hdt/options/HDTOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.rdfhdt.hdt.rdf.RDFFluxStop;
import org.rdfhdt.hdt.util.Profiler;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.DoubleSupplier;
Expand All @@ -42,6 +44,58 @@
* @author mario.arias
*/
public interface HDTOptions {
/**
* empty option, can be used to set values
*/
HDTOptions EMPTY = new HDTOptions() {
@Override
public void clear() {
// already empty
}

@Override
public String get(String key) {
// no value for key
return null;
}

@Override
public void set(String key, String value) {
throw new NotImplementedException("set");
}
};

/**
* @return create empty, modifiable options
*/
static HDTOptions of() {
return of(Map.of());
}

/**
* create modifiable options starting from the copy of the data map
* @param data data map
* @return options
*/
static HDTOptions of(Map<String, String> data) {
Map<String, String> map = new HashMap<>(data);
return new HDTOptions() {
@Override
public void clear() {
map.clear();
}

@Override
public String get(String key) {
return map.get(key);
}

@Override
public void set(String key, String value) {
map.put(key, value);
}
};
}


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ public class HDTOptionsKeys {
*/
@Key(type = Key.Type.BOOLEAN, desc = "Use the canonical NT file parser, removing checks")
public static final String NT_SIMPLE_PARSER_KEY = "parser.ntSimpleParser";
/**
* Key for setting the maximum amount of file loaded with the directory parser, 1 for no async parsing, 0
* for the number of processors, default 1. Number value
*/
@Key(type = Key.Type.NUMBER, desc = "Use async dir parser")
public static final String ASYNC_DIR_PARSER_KEY = "parser.dir.async";
/**
* Key for setting the triple order. see {@link org.rdfhdt.hdt.enums.TripleComponentOrder}'s names to have the values
* default to {@link org.rdfhdt.hdt.enums.TripleComponentOrder#SPO}
Expand Down
11 changes: 11 additions & 0 deletions hdt-api/src/main/java/org/rdfhdt/hdt/rdf/RDFParserCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public interface RDFParserCallback {
@FunctionalInterface
interface RDFCallback {
void processTriple(TripleString triple, long pos);

/**
* @return an async version of this callback
*/
default RDFCallback async() {
return ((triple, pos) -> {
synchronized (this) {
this.processTriple(triple, pos);
}
});
}
}

void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) throws ParserException;
Expand Down
15 changes: 5 additions & 10 deletions hdt-java-core/src/main/java/org/rdfhdt/hdt/hdt/HDTManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@
public class HDTManagerImpl extends HDTManager {
private static final Logger logger = LoggerFactory.getLogger(HDTManagerImpl.class);

private boolean useSimple(HDTOptions spec) {
String value = spec.get(HDTOptionsKeys.NT_SIMPLE_PARSER_KEY);
return value != null && !value.isEmpty() && !value.equals("false");
}

@Override
public HDTOptions doReadOptions(String file) throws IOException {
return new HDTSpecification(file);
Expand Down Expand Up @@ -161,13 +156,13 @@ public HDT doGenerateHDT(String rdfFileName, String baseURI, RDFNotation rdfNota
} else if (HDTOptionsKeys.LOADER_TYPE_VALUE_CAT.equals(loaderType)) {
return doHDTCatTree(readFluxStopOrSizeLimit(spec), HDTSupplier.fromSpec(spec), rdfFileName, baseURI, rdfNotation, spec, listener);
} else if (HDTOptionsKeys.LOADER_TYPE_VALUE_TWO_PASS.equals(loaderType)) {
loader = new TempHDTImporterTwoPass(useSimple(spec));
loader = new TempHDTImporterTwoPass(spec);
} else {
if (loaderType != null && !HDTOptionsKeys.LOADER_TYPE_VALUE_ONE_PASS.equals(loaderType)) {
logger.warn("Used the option {} with value {}, which isn't recognize, using default value {}",
HDTOptionsKeys.LOADER_TYPE_KEY, loaderType, HDTOptionsKeys.LOADER_TYPE_VALUE_ONE_PASS);
}
loader = new TempHDTImporterOnePass(useSimple(spec));
loader = new TempHDTImporterOnePass(spec);
}

// Create TempHDT
Expand Down Expand Up @@ -229,7 +224,7 @@ public HDT doGenerateHDT(Iterator<TripleString> triples, String baseURI, HDTOpti
HDTOptionsKeys.LOADER_TYPE_KEY, loaderType, HDTOptionsKeys.LOADER_TYPE_VALUE_ONE_PASS);
}
}
loader = new TempHDTImporterOnePass(useSimple(spec));
loader = new TempHDTImporterOnePass(spec);
}

// Create TempHDT
Expand Down Expand Up @@ -264,7 +259,7 @@ public HDT doGenerateHDTDisk(InputStream fileStream, String baseURI, RDFNotation
// uncompress the stream if required
fileStream = IOUtil.asUncompressed(fileStream, compressionType);
// create a parser for this rdf stream
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, useSimple(hdtFormat));
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, hdtFormat);
// read the stream as triples
try (PipedCopyIterator<TripleString> iterator = RDFParserFactory.readAsIterator(parser, fileStream, baseURI, true, rdfNotation)) {
return doGenerateHDTDisk0(iterator, true, baseURI, hdtFormat, listener);
Expand Down Expand Up @@ -380,7 +375,7 @@ protected HDT doHDTCatTree(RDFFluxStop fluxStop, HDTSupplier supplier, String fi

@Override
protected HDT doHDTCatTree(RDFFluxStop fluxStop, HDTSupplier supplier, InputStream stream, String baseURI, RDFNotation rdfNotation, HDTOptions hdtFormat, ProgressListener listener) throws IOException, ParserException {
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, useSimple(hdtFormat));
RDFParserCallback parser = RDFParserFactory.getParserCallback(rdfNotation, hdtFormat);
try (PipedCopyIterator<TripleString> iterator = RDFParserFactory.readAsIterator(parser, stream, baseURI, true, rdfNotation)) {
return doHDTCatTree(fluxStop, supplier, iterator, baseURI, hdtFormat, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ public void processTriple(TripleString triple, long pos) {
}
}

private final boolean useSimple;
private final HDTOptions spec;

public TempHDTImporterOnePass(boolean useSimple) {
this.useSimple = useSimple;
public TempHDTImporterOnePass(HDTOptions spec) {
this.spec = spec;
}

@Override
public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RDFNotation notation, ProgressListener listener)
throws ParserException {

RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, useSimple);
RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, spec);

// Create Modifiable Instance
TempHDT modHDT = new TempHDTImpl(specs, baseUri, ModeOfLoading.ONE_PASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,17 @@ public void processTriple(TripleString triple, long pos) {
}
}

private final boolean useSimple;
private final HDTOptions spec;

public TempHDTImporterTwoPass(boolean useSimple) {
this.useSimple = useSimple;
public TempHDTImporterTwoPass(HDTOptions spec) {
this.spec = spec;
}

@Override
public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RDFNotation notation, ProgressListener listener)
throws ParserException {

RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, useSimple);
RDFParserCallback parser = RDFParserFactory.getParserCallback(notation, spec);

// Create Modifiable Instance and parser
TempHDT modHDT = new TempHDTImpl(specs, baseUri, ModeOfLoading.TWO_PASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.rdfhdt.hdt.enums.RDFNotation;
import org.rdfhdt.hdt.exceptions.NotImplementedException;
import org.rdfhdt.hdt.iterator.utils.PipedCopyIterator;
import org.rdfhdt.hdt.options.HDTOptions;
import org.rdfhdt.hdt.options.HDTOptionsKeys;
import org.rdfhdt.hdt.rdf.parsers.RDFParserDir;
import org.rdfhdt.hdt.rdf.parsers.RDFParserHDT;
import org.rdfhdt.hdt.rdf.parsers.RDFParserList;
Expand All @@ -40,23 +42,23 @@
import org.rdfhdt.hdt.rdf.parsers.RDFParserZip;
import org.rdfhdt.hdt.triples.TripleString;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;

/**
* @author mario.arias
*
*/
public class RDFParserFactory {
public static boolean useSimple(HDTOptions options) {
return options != null && options.getBoolean(HDTOptionsKeys.NT_SIMPLE_PARSER_KEY, false);
}
public static RDFParserCallback getParserCallback(RDFNotation notation) {
return getParserCallback(notation, false);
return getParserCallback(notation, HDTOptions.EMPTY);
}
public static RDFParserCallback getParserCallback(RDFNotation notation, boolean useSimple) {
public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptions spec) {
switch(notation) {
case NTRIPLES:
if (useSimple) {
if (useSimple(spec)) {
return new RDFParserSimple();
}
case NQUAD:
Expand All @@ -65,15 +67,15 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, boolean
case RDFXML:
return new RDFParserRIOT();
case DIR:
return new RDFParserDir(useSimple);
return new RDFParserDir(spec);
case LIST:
return new RDFParserList();
return new RDFParserList(spec);
case ZIP:
return new RDFParserZip(useSimple);
return new RDFParserZip(spec);
case TAR:
return new RDFParserTar(useSimple);
return new RDFParserTar(spec);
case RAR:
return new RDFParserRAR(useSimple);
return new RDFParserRAR(spec);
case HDT:
return new RDFParserHDT();
case JSONLD:
Expand Down
Loading

0 comments on commit 8335366

Please sign in to comment.