From 98b0daf3990610501d4a07a12204766691ad0a88 Mon Sep 17 00:00:00 2001 From: Kirill Yankov Date: Thu, 9 Jun 2022 17:05:13 +0200 Subject: [PATCH] finally caching of context #15 --- .../scala/org/dbpedia/databus/ApiImpl.scala | 39 +++---- .../databus/CachingJsonldContext.scala | 67 +++++++++++ .../org/dbpedia/databus/SparqlClient.scala | 109 +++++++++++++----- .../org/dbpedia/databus/CacheTests.scala | 50 ++++++++ .../dbpedia/databus/DatabusScalatraTest.scala | 5 +- 5 files changed, 208 insertions(+), 62 deletions(-) create mode 100644 src/main/scala/org/dbpedia/databus/CachingJsonldContext.scala create mode 100644 src/test/scala/org/dbpedia/databus/CacheTests.scala diff --git a/src/main/scala/org/dbpedia/databus/ApiImpl.scala b/src/main/scala/org/dbpedia/databus/ApiImpl.scala index dea1384..5e4a558 100644 --- a/src/main/scala/org/dbpedia/databus/ApiImpl.scala +++ b/src/main/scala/org/dbpedia/databus/ApiImpl.scala @@ -10,7 +10,7 @@ import org.apache.jena.rdf.model.Model import org.apache.jena.riot.Lang import org.apache.jena.shared.JenaException import org.dbpedia.databus.ApiImpl.Config -import org.dbpedia.databus.RdfConversions.{generateGraphId, graphToBytes, jsonLdContextUriString, mapContentType, readModel} +import org.dbpedia.databus.RdfConversions.{contextUri, generateGraphId, graphToBytes, mapContentType, readModel} import org.dbpedia.databus.swagger.api.DatabusApi import org.dbpedia.databus.swagger.model.{OperationFailure, OperationSuccess} import sttp.model.Uri @@ -31,7 +31,7 @@ class ApiImpl(config: Config) extends DatabusApi { override def dataidSubgraph(body: String)(request: HttpServletRequest): Try[String] = - readModel(body.getBytes, defaultLang) + readModel(body.getBytes, defaultLang, contextUri(body.getBytes, defaultLang)) .flatMap(m => Tractate.extract(m.getGraph, TractateV1.Version)) .map(_.stringForSigning) @@ -63,18 +63,11 @@ class ApiImpl(config: Config) extends DatabusApi { .map(_.toLowerCase) .getOrElse("") val lang = mapContentType(ct, defaultLang) - readModel(body.getBytes, lang) + val ctxUri = contextUri(body.getBytes, lang) + readModel(body.getBytes, lang, ctxUri) .flatMap(model => { saveToVirtuoso(model, graphId)({ - val ctxUriString = { - // TODO maybe extract it somehow from reader (custom reader needed) - if (lang.getName == Lang.JSONLD.getName) { - jsonLdContextUriString(body) - } else { - None - } - } - graphToBytes(model.getGraph, defaultLang, ctxUriString) + graphToBytes(model.getGraph, defaultLang, ctxUri) .flatMap(a => saveFiles(username, Map( pa -> a )).map(hash => OperationSuccess(graphId, hash))) @@ -124,19 +117,13 @@ class ApiImpl(config: Config) extends DatabusApi { val lang = getLangFromAcceptHeader(request) setResponseHeaders(Map("Content-Type" -> lang.getContentType.toHeaderString))(request) client.readFile(username, p) - .flatMap(body => - readModel(body, defaultLang) - .flatMap(m => { - val ctxUriString = { - // TODO maybe extract it somehow from reader (custom reader needed) - if (lang.getName == Lang.JSONLD.getName) { - jsonLdContextUriString(new String(body)) - } else { - None - } - } - graphToBytes(m.getGraph, lang, ctxUriString) - })) + .flatMap(body => { + val ctxUri = contextUri(body, defaultLang) + readModel(body, defaultLang, ctxUri) + .flatMap(m => + graphToBytes(m.getGraph, lang, ctxUri) + ) + }) .map(new String(_)) } @@ -155,7 +142,7 @@ class ApiImpl(config: Config) extends DatabusApi { } private[databus] def saveToVirtuoso[T](data: Array[Byte], lang: Lang, graphId: String)(execInTransaction: => Try[T]): Try[T] = - readModel(data, lang) + readModel(data, lang, contextUri(data, lang)) .flatMap(saveToVirtuoso(_, graphId)(execInTransaction)) private[databus] def saveToVirtuoso[T](model: Model, graphId: String)(execInTransaction: => Try[T]): Try[T] = { diff --git a/src/main/scala/org/dbpedia/databus/CachingJsonldContext.scala b/src/main/scala/org/dbpedia/databus/CachingJsonldContext.scala new file mode 100644 index 0000000..7981001 --- /dev/null +++ b/src/main/scala/org/dbpedia/databus/CachingJsonldContext.scala @@ -0,0 +1,67 @@ +package org.dbpedia.databus + +import java.util.concurrent.ConcurrentHashMap + +import com.github.jsonldjava.core.{Context, JsonLdOptions} +import org.dbpedia.databus.CachingJsonldContext.ApproxSizeStringKeyCache + +import scala.collection.JavaConverters._ + +class CachingJsonldContext(sizeLimit: Int, opts: JsonLdOptions) extends Context(opts) { + + private val cache = new ApproxSizeStringKeyCache[Context](sizeLimit) + + override def parse(ctx: Object): Context = + ctx match { + case s: String => + cache.get(s) + .map(c => super.parse(c)) + .getOrElse({ + val re = super.parse(ctx) + cache.put(s, re) + re + }) + case _ => super.parse(ctx) + } + + + +} + +object CachingJsonldContext { + + // not the most efficient impl, but should work for now :) + class ApproxSizeStringKeyCache[T](sizeLimit: Int) { + private val cache = new ConcurrentHashMap[StringCacheKey, T](sizeLimit) + + def put(s: String, c: T) = { + // not trying to keep the size strictly equal to the limit + cache.put(new StringCacheKey(s), c) + if (cache.size() > sizeLimit) { + keysSorted + .take(cache.size() - sizeLimit) + .foreach(cache.remove) + } + } + + def get(s: String): Option[T] = + Option(cache.get(new StringCacheKey(s))) + + def keysSorted: Seq[StringCacheKey] = + cache.keySet() + .asScala.toSeq.sorted + + } + + class StringCacheKey(val str: String, val order: Long = System.nanoTime()) extends Comparable[StringCacheKey] { + override def equals(other: Any): Boolean = other match { + case that: StringCacheKey => that.str == this.str + case _ => false + } + + override def hashCode(): Int = str.hashCode + + override def compareTo(o: StringCacheKey): Int = this.order.compareTo(o.order) + } + +} diff --git a/src/main/scala/org/dbpedia/databus/SparqlClient.scala b/src/main/scala/org/dbpedia/databus/SparqlClient.scala index 795ce79..b4cc8fc 100644 --- a/src/main/scala/org/dbpedia/databus/SparqlClient.scala +++ b/src/main/scala/org/dbpedia/databus/SparqlClient.scala @@ -3,15 +3,18 @@ package org.dbpedia.databus import java.io.{ByteArrayInputStream, ByteArrayOutputStream} -import com.github.jsonldjava.core.JsonLdConsts +import com.github.jsonldjava.core.{JsonLdConsts, JsonLdOptions} import com.github.jsonldjava.utils.JsonUtils import com.mchange.v2.c3p0.ComboPooledDataSource import org.apache.jena.atlas.json.JsonString import org.apache.jena.graph.{Graph, Node} import org.apache.jena.rdf.model.{Model, ModelFactory} +import org.apache.jena.riot.lang.JsonLDReader +import org.apache.jena.riot.system.StreamRDFLib import org.apache.jena.riot.writer.JsonLDWriter -import org.apache.jena.riot.{Lang, RDFDataMgr, RDFFormat, RDFLanguages, RDFWriter} +import org.apache.jena.riot.{Lang, RDFDataMgr, RDFFormat, RDFLanguages, RDFParserBuilder, RDFWriter, RIOT} import org.apache.jena.shacl.{ShaclValidator, Shapes, ValidationReport} +import org.apache.jena.sparql.util import org.dbpedia.databus.ApiImpl.Config import org.slf4j.LoggerFactory import sttp.client3.{DigestAuthenticationBackend, HttpURLConnectionBackend, basicRequest} @@ -151,15 +154,46 @@ class FusekiJDBCClient(host: String, port: Int, user: String, pass: String, data object RdfConversions { + private lazy val CachingContext = initCachingContext() + private val DefaultShaclLang = Lang.TTL - def readModel(data: Array[Byte], lang: Lang): Try[Model] = Try { + def readModel(data: Array[Byte], lang: Lang, context: Option[String]): Try[Model] = Try { val model = ModelFactory.createDefaultModel() val dataStream = new ByteArrayInputStream(data) - RDFDataMgr.read(model, dataStream, lang) + val dest = StreamRDFLib.graph(model.getGraph) + val parser = RDFParserBuilder.create() + .source(dataStream) + .base(null) + .lang(lang) + + context.foreach(cs => + parser.context( + jenaContext(CachingContext.parse(cs)) + ) + ) + + parser.parse(dest) model } + def graphToBytes(model: Graph, outputLang: Lang, context: Option[String]): Try[Array[Byte]] = Try { + val str = new ByteArrayOutputStream() + val builder = RDFWriter.create.format(langToFormat(outputLang)) + .source(model) + + context.foreach(ctx => { + val jctx = jenaContext(CachingContext.parse(ctx)) + builder.context(jctx) + builder.set(JsonLDWriter.JSONLD_CONTEXT_SUBSTITUTION, new JsonString(ctx)) + }) + + builder + .build() + .output(str) + str.toByteArray + } + def validateWithShacl(model: Model, shacl: Graph): Try[ValidationReport] = Try( ShaclValidator.get() @@ -168,29 +202,20 @@ object RdfConversions { def validateWithShacl(file: Array[Byte], shaclData: Array[Byte], modelLang: Lang): Try[ValidationReport] = for { - shaclGra <- readModel(shaclData, DefaultShaclLang) - model <- readModel(file, modelLang) + shaclGra <- readModel(shaclData, DefaultShaclLang, contextUri(shaclData, DefaultShaclLang)) + ctxUri = contextUri(file, modelLang) + model <- readModel(file, modelLang, ctxUri) re <- validateWithShacl(model, shaclGra.getGraph) } yield re def validateWithShacl(file: Array[Byte], shaclUri: String, modelLang: Lang): Try[ValidationReport] = for { shaclGra <- Try(RDFDataMgr.loadGraph(shaclUri)) - model <- readModel(file, modelLang) + ctxUri = contextUri(file, modelLang) + model <- readModel(file, modelLang, ctxUri) re <- validateWithShacl(model, shaclGra) } yield re - def graphToBytes(model: Graph, outputLang: Lang, context: Option[String]): Try[Array[Byte]] = Try { - val str = new ByteArrayOutputStream() - val builder = RDFWriter.create.format(langToFormat(outputLang)) - .source(model) - context.foreach(ctx => builder.set(JsonLDWriter.JSONLD_CONTEXT_SUBSTITUTION, new JsonString(ctx))) - builder - .build() - .output(str) - str.toByteArray - } - def langToFormat(lang: Lang): RDFFormat = lang match { case RDFLanguages.TURTLE => RDFFormat.TURTLE_PRETTY case RDFLanguages.TTL => RDFFormat.TTL @@ -203,20 +228,6 @@ object RdfConversions { case RDFLanguages.TRIX => RDFFormat.TRIX } - def jsonLdContextUriString(data: String): Option[String] = { - val jsonObject = JsonUtils.fromString(new String(data)) - Option( - jsonObject - .asInstanceOf[java.util.Map[String, Object]] - .get(JsonLdConsts.CONTEXT) - ) - .map(_.toString) - .flatMap(ctx => Uri.parse(ctx) match { - case Left(_) => None - case Right(uri) => Some(uri.toString()) - }) - } - def mapFilenameToContentType(fn: String): String = fn.split('.').last match { case "ttl" => "text/turtle" @@ -287,6 +298,40 @@ object RdfConversions { bld.append(">") } + // TODO implement extraction of context as an object and then setting it directly + def contextUri(data: Array[Byte], lang: Lang): Option[String] = + if (lang.getName == Lang.JSONLD.getName) jsonLdContextUriString(new String(data)) else None + + private def jsonLdContextUriString(data: String): Option[String] = { + val jsonObject = JsonUtils.fromString(new String(data)) + Option( + jsonObject + .asInstanceOf[java.util.Map[String, Object]] + .get(JsonLdConsts.CONTEXT) + ) + .map(_.toString) + .flatMap(ctx => Uri.parse(ctx) match { + case Left(_) => None + case Right(uri) => Some(uri.toString()) + }) + } + + import com.github.jsonldjava.core.Context + + private def initCachingContext() = { + val opts = new JsonLdOptions(null) + opts.useNamespaces = true + new CachingJsonldContext(30, opts) + } + + private def jenaContext(jsonLdCtx: Context) = { + val context: util.Context = RIOT.getContext.copy() + jsonLdCtx.putAll(jsonLdCtx.getPrefixes(true)) + context.put(JsonLDWriter.JSONLD_CONTEXT, jsonLdCtx) + context.put(JsonLDReader.JSONLD_CONTEXT, jsonLdCtx) + context + } + private def escapeString(s: String) = { val sb = new StringBuilder(s.length()) val slen = s.length() diff --git a/src/test/scala/org/dbpedia/databus/CacheTests.scala b/src/test/scala/org/dbpedia/databus/CacheTests.scala new file mode 100644 index 0000000..14732b2 --- /dev/null +++ b/src/test/scala/org/dbpedia/databus/CacheTests.scala @@ -0,0 +1,50 @@ +package org.dbpedia.databus + +import java.util.UUID + +import org.dbpedia.databus.CachingJsonldContext.ApproxSizeStringKeyCache +import org.scalatest.{FlatSpec, Matchers} + +class CacheTests extends FlatSpec with Matchers { + + "CacheKey" should "be sorted by time of creation" in { + + val caches = + Seq( + new CachingJsonldContext.StringCacheKey("scsc", 0), + new CachingJsonldContext.StringCacheKey("scsc", -10), + new CachingJsonldContext.StringCacheKey("scsc", 100), + new CachingJsonldContext.StringCacheKey("zzzz", 0), + new CachingJsonldContext.StringCacheKey("aaaa", 0) + ) + + caches.sorted.map(k => k.order) should contain theSameElementsInOrderAs (Seq(-10, 0, 0, 0, 100)) + + } + + "CacheKey" should "be equal with same string" in { + val re = new CachingJsonldContext.StringCacheKey("scsc", 0) == new CachingJsonldContext.StringCacheKey("scsc", -10) + re should be(true) + + val re2 = new CachingJsonldContext.StringCacheKey("scsc", 0) == new CachingJsonldContext.StringCacheKey("aaaa", 0) + re2 should be(false) + } + + "ApproxSizeCache" should "not overflow the size" in { + val cache = new ApproxSizeStringKeyCache[Int](10) + val seq = (1 to 100).map(i => (UUID.randomUUID().toString, i)) + seq.foreach(p => cache.put(p._1, p._2)) + + cache.keysSorted.map(_.str) should contain theSameElementsInOrderAs (seq.drop(90).map(_._1)) + } + + "ApproxSizeCache" should "have same size for same string key" in { + val cache = new ApproxSizeStringKeyCache[Int](10) + val seq = Seq("a", "a", "a") + seq.foreach(p => cache.put(p, UUID.randomUUID().hashCode())) + + cache.keysSorted.size should be(1) + } + + +} diff --git a/src/test/scala/org/dbpedia/databus/DatabusScalatraTest.scala b/src/test/scala/org/dbpedia/databus/DatabusScalatraTest.scala index 425c7c8..1e1530a 100644 --- a/src/test/scala/org/dbpedia/databus/DatabusScalatraTest.scala +++ b/src/test/scala/org/dbpedia/databus/DatabusScalatraTest.scala @@ -49,10 +49,7 @@ class DatabusScalatraTest extends ScalatraFlatSpec { } } - "File read" should "work" in { - - val file = "group.jsonld" - val bytes = Files.readAllBytes(Paths.get(getClass.getClassLoader.getResource(file).getFile)) + "File read" should "return 500" in { get("/databus/graph/read?repo=kuckuck&path=pa/not_existing.jsonld") { status should equal(500)