Skip to content

Commit

Permalink
🐛 Neptune ID Fix (#177)
Browse files Browse the repository at this point in the history
* Initial ID mapping, let's see where this goes

* Making some progress

* wip bulk delete

* wip bulk override

* Further separation of bulk transaction classes

* Handle ODB -> Neptune

* Id null safe

* Overrode wholegraph

* Seeing if mapVertex is fixed

* Enabling stdout

* Think I fixed mapVertex

* Resetting id tracker

* Fix id alloc

* Handling null under delete vertex

* Ready for completion

* Added neptune coverage data

* Documented changes

* Removed `GremlinOverriddenIdDriver` as it is no longer used.

* Added neptune coverage to jacoco
  • Loading branch information
DavidBakerEffendi authored Apr 13, 2021
1 parent 2fdfa3f commit a0123fb
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 92 deletions.
7 changes: 5 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ gradle-app.setting
.project
.settings
.classpath
/src/test/resources/conf/SFSRootCAG2.pem
/sootOutput/

# Misc build
/plume/src/test/resources/conf/SFSRootCAG2.pem
/plume/build/
!/plume/build/jacoco/neptuneIntTest.exec
/sootOutput/
/plume/sootOutput/
/testing/build/
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- `TigerGraphDriver` now has timeout as a configurable parameter.

### Fixed

- Neptune driver by mapping `Long` IDs to Neptune's native `String` IDs

### Changed

- Removed `GremlinOverriddenIdDriver` as it is no longer used.

## [0.4.4] - 2021-04-12

Expand Down
2 changes: 1 addition & 1 deletion plume/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ jacocoTestReport {
csv.enabled false
}
executionData test, extractorTest, tinkerGraphIntTest, overflowDbIntTest, janusGraphIntTest, tigerGraphIntTest,
neo4jIntTest
neo4jIntTest, neptuneIntTest
}

check.dependsOn jacocoTestCoverageVerification
Expand Down
Binary file added plume/build/jacoco/neptuneIntTest.exec
Binary file not shown.
63 changes: 41 additions & 22 deletions plume/src/main/kotlin/io/github/plume/oss/drivers/GremlinDriver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.github.plume.oss.drivers

import io.github.plume.oss.domain.exceptions.PlumeSchemaViolationException
import io.github.plume.oss.domain.mappers.ListMapper
import io.github.plume.oss.domain.mappers.VertexMapper
import io.github.plume.oss.domain.mappers.VertexMapper.checkSchemaConstraints
import io.github.plume.oss.domain.model.DeltaGraph
Expand All @@ -39,6 +40,7 @@ import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
import overflowdb.Config
import overflowdb.Node
import scala.collection.immutable.`$colon$colon`
import scala.collection.immutable.`Nil$`
import scala.jdk.CollectionConverters
import java.util.*
import io.shiftleft.codepropertygraph.generated.edges.Factories as EdgeFactories
Expand Down Expand Up @@ -138,24 +140,41 @@ abstract class GremlinDriver : IDriver {
val eAdds = mutableListOf<DeltaGraph.EdgeAdd>()
val vDels = mutableListOf<DeltaGraph.VertexDelete>()
val eDels = mutableListOf<DeltaGraph.EdgeDelete>()
PlumeTimer.measure(ExtractorTimeKey.DATABASE_READ) {
dg.changes.filterIsInstance<DeltaGraph.VertexAdd>().map { it.n }.toCollection(vAdds)
dg.changes.filterIsInstance<DeltaGraph.EdgeAdd>().toCollection(eAdds)
dg.changes.filterIsInstance<DeltaGraph.VertexDelete>().filter { g.V(it.id).hasNext() }
.toCollection(vDels)
dg.changes.filterIsInstance<DeltaGraph.EdgeDelete>().filter { exists(it.src, it.dst, it.e) }
.toCollection(eDels)
}
PlumeTimer.measure(ExtractorTimeKey.DATABASE_WRITE) {
val tx = if (graph.features().graph().supportsTransactions()) g.tx() else null
tx?.open()
vAdds.forEach { if (!exists(it)) createVertex(it) }
tx?.commit(); tx?.open()
eAdds.forEach { addEdge(it.src, it.dst, it.e) }
vDels.forEach { deleteVertex(it.id, it.label) }
eDels.forEach { g.V(it.src.id()).outE(it.e).where(un.otherV().hasId(it.dst.id())).drop().iterate() }
tx?.commit()
PlumeTimer.measure(ExtractorTimeKey.DATABASE_READ) { bulkTxReads(dg, vAdds, eAdds, vDels, eDels) }
PlumeTimer.measure(ExtractorTimeKey.DATABASE_WRITE) { bulkTxWrites(vAdds, eAdds, vDels, eDels) }
}

/**
 * Sorts the changes of [dg] into the supplied buckets.
 *
 * Vertex additions and edge additions are copied over unconditionally. Vertex deletions are kept only if a
 * vertex with that ID is currently found via `g.V(id)`, and edge deletions only if [exists] reports the edge.
 *
 * @param dg the delta graph whose changes are to be partitioned.
 * @param vAdds receives the node builder of every vertex addition.
 * @param eAdds receives every edge addition.
 * @param vDels receives the vertex deletions whose vertex is present in the database.
 * @param eDels receives the edge deletions whose edge is present in the database.
 */
protected open fun bulkTxReads(
    dg: DeltaGraph,
    vAdds: MutableList<NewNodeBuilder>,
    eAdds: MutableList<DeltaGraph.EdgeAdd>,
    vDels: MutableList<DeltaGraph.VertexDelete>,
    eDels: MutableList<DeltaGraph.EdgeDelete>,
) {
    dg.changes.filterIsInstance<DeltaGraph.VertexAdd>().mapTo(vAdds) { it.n }
    eAdds.addAll(dg.changes.filterIsInstance<DeltaGraph.EdgeAdd>())
    dg.changes.filterIsInstance<DeltaGraph.VertexDelete>().filterTo(vDels) { g.V(it.id).hasNext() }
    dg.changes.filterIsInstance<DeltaGraph.EdgeDelete>().filterTo(eDels) { exists(it.src, it.dst, it.e) }
}

/**
 * Applies the partitioned changes to the database.
 *
 * When the graph supports transactions, vertex additions are committed in a first transaction and the edge
 * additions, vertex deletions and edge deletions in a second one. Otherwise the same steps run without one.
 *
 * @param vAdds vertices to create; a vertex for which [exists] is already true is skipped.
 * @param eAdds edges to add.
 * @param vDels vertices to delete.
 * @param eDels edges to delete.
 */
protected open fun bulkTxWrites(
    vAdds: MutableList<NewNodeBuilder>,
    eAdds: MutableList<DeltaGraph.EdgeAdd>,
    vDels: MutableList<DeltaGraph.VertexDelete>,
    eDels: MutableList<DeltaGraph.EdgeDelete>,
) {
    val transactional = graph.features().graph().supportsTransactions()
    val tx = if (transactional) g.tx() else null
    tx?.open()
    for (vertex in vAdds) {
        if (!exists(vertex)) createVertex(vertex)
    }
    // Vertices are committed before any edge is written, then a fresh transaction is opened.
    tx?.commit()
    tx?.open()
    for (addition in eAdds) addEdge(addition.src, addition.dst, addition.e)
    for (deletion in vDels) deleteVertex(deletion.id, deletion.label)
    for (deletion in eDels) {
        // NOTE(review): the destination filter hands a traversal to `V(...)`; confirm that this selects the
        // intended destination vertex on every supported backend.
        findVertexTraversal(deletion.src)
            .outE(deletion.e)
            .where(un.otherV().V(findVertexTraversal(deletion.dst)))
            .drop()
            .iterate()
    }
    tx?.commit()
}

override fun clearGraph() = apply {
Expand Down Expand Up @@ -184,12 +203,12 @@ abstract class GremlinDriver : IDriver {
CollectionConverters.MapHasAsJava(v.build().properties()).asJava()
.mapValues { (_, value) ->
when (value) {
is `$colon$colon`<*> -> value.head()
is `$colon$colon`<*> -> ListMapper.scalaListToString(value)
is `Nil$` -> ListMapper.scalaListToString(value)
else -> value
}
}.toMap()


/**
* Wrapper method for creating an edge between two vertices.
*
Expand Down Expand Up @@ -288,7 +307,7 @@ abstract class GremlinDriver : IDriver {
n.properties().foreachEntry { key, value -> newNode.setProperty(key, value) }
}
PlumeTimer.measure(ExtractorTimeKey.DATABASE_READ) {
g.V(v.id())
findVertexTraversal(v)
.repeat(un.outE(AST).bothV())
.times(1)
.inE()
Expand Down Expand Up @@ -429,7 +448,7 @@ abstract class GremlinDriver : IDriver {
return overflowGraph
}

private fun addNodeToODB(graph: overflowdb.Graph, nBuilder: NewNodeBuilder): Node? {
protected fun addNodeToODB(graph: overflowdb.Graph, nBuilder: NewNodeBuilder): Node? {
val n = nBuilder.build()
val maybeNode = graph.node(nBuilder.id())
return if (maybeNode != null) return maybeNode
Expand All @@ -438,7 +457,7 @@ abstract class GremlinDriver : IDriver {
}
}

private fun newOverflowGraph(): overflowdb.Graph = overflowdb.Graph.open(
protected fun newOverflowGraph(): overflowdb.Graph = overflowdb.Graph.open(
Config.withDefaults(),
NodeFactories.allAsJava(),
EdgeFactories.allAsJava()
Expand Down

This file was deleted.

159 changes: 156 additions & 3 deletions plume/src/main/kotlin/io/github/plume/oss/drivers/NeptuneDriver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,40 @@
*/
package io.github.plume.oss.drivers

import io.github.plume.oss.domain.mappers.ListMapper
import io.github.plume.oss.domain.mappers.VertexMapper
import io.github.plume.oss.domain.model.DeltaGraph
import io.github.plume.oss.metrics.ExtractorTimeKey
import io.github.plume.oss.metrics.PlumeTimer
import io.github.plume.oss.store.LocalCache
import io.github.plume.oss.store.PlumeStorage
import io.shiftleft.codepropertygraph.generated.nodes.NewNodeBuilder
import org.apache.logging.log4j.LogManager
import org.apache.tinkerpop.gremlin.driver.Cluster
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection
import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal
import org.apache.tinkerpop.gremlin.process.traversal.Order
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
import org.apache.tinkerpop.gremlin.process.traversal.step.util.WithOptions
import org.apache.tinkerpop.gremlin.structure.Graph
import org.apache.tinkerpop.gremlin.structure.T
import org.apache.tinkerpop.gremlin.structure.Vertex
import scala.collection.immutable.`$colon$colon`
import scala.collection.immutable.`Nil$`
import scala.jdk.CollectionConverters
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.`__` as un


/**
* The driver used to connect to a remote Amazon Neptune instance.
*/
class NeptuneDriver internal constructor() : GremlinOverriddenIdDriver() {
class NeptuneDriver internal constructor() : GremlinDriver() {
private val logger = LogManager.getLogger(NeptuneDriver::class.java)

private val builder: Cluster.Builder = Cluster.build()
private lateinit var cluster: Cluster
private val idMapper = mutableMapOf<Long, String>()
private var id: Long = 0

init {
builder.port(DEFAULT_PORT).enableSsl(true)
Expand Down Expand Up @@ -74,6 +90,28 @@ class NeptuneDriver internal constructor() : GremlinOverriddenIdDriver() {
super.g = traversal().withRemote(DriverRemoteConnection.using(cluster))
graph = g.graph
connected = true
populateIdMapper()
}

/**
 * Empties [idMapper], re-registers the sentinel entry `-1 -> "null"` and restarts the [id] counter at zero.
 */
private fun resetIdMapper() {
    with(idMapper) {
        clear()
        put(-1L, "null")
    }
    id = 0
}

/**
 * When connecting to a database with a subgraph already loaded, create a mapping for existing graph data.
 *
 * The mapper is reset first. Every vertex ID currently in the database is then read in pages, ordered by
 * ID, and assigned consecutive `Long` keys starting at zero. Finally [id] is set to the first unused key so
 * that the next vertex created does not overwrite an existing mapping.
 */
private fun populateIdMapper() {
    resetIdMapper()
    val chunkSize = 10000L
    val vCount: Long = g.V().count().next()
    // After the reset this only holds the sentinel value, which must never be given a second key.
    val loadedIds = idMapper.values.toSet()
    var nextId = 0L
    var low = 0L
    // range(low, high) is zero-based with an exclusive upper bound, so pages are [0, 10000), [10000, 20000), ...
    while (low < vCount) {
        val high = minOf(low + chunkSize, vCount)
        g.V().order().by(T.id, Order.asc).range(low, high).id().toList()
            .map { it.toString() }
            .filterNot(loadedIds::contains)
            .forEach { neptuneId -> idMapper[nextId++] = neptuneId }
        low = high
    }
    // The counter is post-incremented when a vertex is created, so it has to point at the next free key.
    id = nextId
}

/**
Expand All @@ -88,9 +126,24 @@ class NeptuneDriver internal constructor() : GremlinOverriddenIdDriver() {
connected = false
} catch (e: Exception) {
logger.warn("Exception thrown while attempting to close graph.", e)
} finally {
// Have to also clear the cache otherwise the IDs won't be mapped correctly
LocalCache.clear()
PlumeStorage.clear()
resetIdMapper()
}
}

/**
 * Builds a traversal starting at the Neptune vertex mapped to the ID of [v].
 *
 * If [idMapper] holds no entry for that ID, the traversal is started from the literal ID `"null"` instead.
 * Building the traversal is recorded under [ExtractorTimeKey.DATABASE_READ].
 *
 * @param v the node builder whose ID is looked up.
 * @return the traversal positioned on the mapped vertex ID.
 */
override fun findVertexTraversal(v: NewNodeBuilder): GraphTraversal<Vertex, Vertex> {
    lateinit var traversal: GraphTraversal<Vertex, Vertex>
    PlumeTimer.measure(ExtractorTimeKey.DATABASE_READ) {
        traversal = g.V(idMapper[v.id()] ?: "null")
    }
    return traversal
}

/**
* Given a [NewNodeBuilder], creates a [Vertex] and translates the object's field properties to key-value
* pairs on the [Vertex] object. This is then added to this driver's [Graph].
Expand All @@ -100,9 +153,109 @@ class NeptuneDriver internal constructor() : GremlinOverriddenIdDriver() {
*/
override fun createVertex(v: NewNodeBuilder): Vertex {
val propertyMap = prepareVertexProperties(v)
var traversalPointer = g.addV(v.build().label()).property(T.id, v.id())
var traversalPointer = g.addV(v.build().label())
for ((key, value) in propertyMap) traversalPointer = traversalPointer.property(key, value)
return traversalPointer.next()
return traversalPointer.next().apply {
val newId = id++
idMapper[newId] = this.id().toString()
v.id(newId)
}
}

/**
 * Drops the vertex mapped to [id] and forgets its mapping.
 *
 * Nothing is deleted when [id] has no mapping or when the mapped vertex is not found in the database.
 *
 * @param id the `Long` ID of the vertex to delete.
 * @param label not used by this implementation.
 */
override fun deleteVertex(id: Long, label: String?) {
    val neptuneId = idMapper[id]
    var present = false
    PlumeTimer.measure(ExtractorTimeKey.DATABASE_READ) {
        present = neptuneId != null && g.V(neptuneId).hasNext()
    }
    if (present) {
        PlumeTimer.measure(ExtractorTimeKey.DATABASE_WRITE) { g.V(neptuneId).drop().iterate() }
        idMapper.remove(id)
    }
}

/**
 * Drops every [edge]-labelled edge going from [src] to [tgt], provided [exists] reports such an edge.
 *
 * @param src the source node; its ID is translated through [idMapper].
 * @param tgt the target node; its ID is translated through [idMapper].
 * @param edge the edge label.
 */
override fun deleteEdge(src: NewNodeBuilder, tgt: NewNodeBuilder, edge: String) {
    if (exists(src, tgt, edge)) {
        PlumeTimer.measure(ExtractorTimeKey.DATABASE_WRITE) {
            val fromId = idMapper[src.id()]
            val toId = idMapper[tgt.id()]
            g.V(fromId)
                .outE(edge)
                .where(un.otherV().hasId(toId))
                .drop()
                .iterate()
        }
    }
}

/**
 * Neptune -> ODB: converts a property map read from Neptune into one keyed by strings.
 *
 * The [T.id] entry is removed and every other key is stringified. The Neptune ID (or `"null"` when the map
 * has no [T.id] entry) is then looked up among the values of [idMapper]; if found, the matching `Long` key
 * is stored under `"id"`. The lookup scans [idMapper] linearly.
 *
 * @param props the raw property map, possibly containing a [T.id] token entry.
 * @return the string-keyed properties, with `"id"` set when a mapping was found.
 */
override fun mapVertexKeys(props: Map<Any, Any>): Map<String, Any> {
    val mapped: MutableMap<String, Any> = props
        .filterKeys { it != T.id }
        .mapKeys { it.key.toString() }
        .toMutableMap()
    val neptuneId = props.getOrDefault(T.id, "null")
    val match = idMapper.entries.find { it.value == neptuneId }
    if (match != null) mapped["id"] = match.key
    return mapped
}

// This handles ODB -> Neptune
/**
 * Builds the property map that is written to Neptune for [v].
 *
 * The properties of the built node are exposed as a Java map. Values that are Scala lists, i.e. `::`
 * (non-empty) or `Nil` (empty), are converted with [ListMapper.scalaListToString]; every other value is
 * kept as is. If the result contains an `"id"` entry, its value is replaced by the entry of [idMapper]
 * stored under it.
 *
 * @param v the node builder whose properties are prepared.
 * @return the prepared properties keyed by property name.
 */
override fun prepareVertexProperties(v: NewNodeBuilder): Map<String, Any> {
val outMap = CollectionConverters.MapHasAsJava(v.build().properties()).asJava()
.mapValues { (_, value) ->
when (value) {
// The two branches are kept separate so that `value` is smart cast in each of them.
is `$colon$colon`<*> -> ListMapper.scalaListToString(value)
is `Nil$` -> ListMapper.scalaListToString(value)
else -> value
}
}.toMutableMap()
if (outMap.containsKey("id")) {
// NOTE(review): the lookup yields null when no mapping exists for the ID; confirm that a null "id"
// property is acceptable to callers.
outMap["id"] = idMapper[outMap["id"]]
}
return outMap
}

/**
 * Sorts the changes of [dg] into the supplied buckets, translating vertex IDs through [idMapper].
 *
 * Vertex additions and edge additions are copied over unconditionally. A vertex deletion is kept only if
 * its ID has a mapping and the mapped vertex is found in the database; an ID without a mapping is treated
 * as absent and never sent to the database. Edge deletions are kept only if [exists] reports the edge.
 *
 * @param dg the delta graph whose changes are to be partitioned.
 * @param vAdds receives the node builder of every vertex addition.
 * @param eAdds receives every edge addition.
 * @param vDels receives the vertex deletions whose vertex is present in the database.
 * @param eDels receives the edge deletions whose edge is present in the database.
 */
override fun bulkTxReads(
    dg: DeltaGraph,
    vAdds: MutableList<NewNodeBuilder>,
    eAdds: MutableList<DeltaGraph.EdgeAdd>,
    vDels: MutableList<DeltaGraph.VertexDelete>,
    eDels: MutableList<DeltaGraph.EdgeDelete>,
) {
    dg.changes.filterIsInstance<DeltaGraph.VertexAdd>().map { it.n }.toCollection(vAdds)
    dg.changes.filterIsInstance<DeltaGraph.EdgeAdd>().toCollection(eAdds)
    dg.changes.filterIsInstance<DeltaGraph.VertexDelete>()
        // Guard the lookup so that a missing mapping is never passed to g.V(...) as a null ID.
        .filter { deletion -> idMapper[deletion.id]?.let { mappedId -> g.V(mappedId).hasNext() } ?: false }
        .toCollection(vDels)
    dg.changes.filterIsInstance<DeltaGraph.EdgeDelete>().filter { exists(it.src, it.dst, it.e) }
        .toCollection(eDels)
}

/**
 * Sets the property [key] to [value] on the vertex mapped to [id].
 *
 * Nothing is written when [id] has no mapping or when the mapped vertex is not found in the database. The
 * mapping is resolved once up front so that a missing entry is never passed to `g.V(...)` as a null ID.
 *
 * @param id the `Long` ID of the vertex to update.
 * @param label not used by this implementation.
 * @param key the property name.
 * @param value the new property value.
 */
override fun updateVertexProperty(id: Long, label: String?, key: String, value: Any) {
    val mappedId = idMapper[id] ?: return
    var res = false
    PlumeTimer.measure(ExtractorTimeKey.DATABASE_READ) { res = g.V(mappedId).hasNext() }
    if (!res) return
    PlumeTimer.measure(ExtractorTimeKey.DATABASE_WRITE) { g.V(mappedId).property(key, value).iterate() }
}

/**
 * Reads every vertex and edge from Neptune into a fresh OverflowDB graph.
 *
 * Vertices are converted through [mapVertexKeys] and [VertexMapper.mapToVertex]. Edge endpoints are
 * translated from Neptune IDs back to `Long` IDs through a reverse index of [idMapper] that is built once,
 * rather than by scanning the mapper for every endpoint.
 *
 * @return the populated OverflowDB graph.
 * @throws IllegalStateException if an edge refers to a Neptune vertex ID that has no mapping.
 */
override fun getWholeGraph(): overflowdb.Graph {
    val odbGraph = newOverflowGraph()
    PlumeTimer.measure(ExtractorTimeKey.DATABASE_READ) {
        g.V().valueMap<Any>()
            .by(un.unfold<Any>())
            .with(WithOptions.tokens).toList()
            .map { VertexMapper.mapToVertex(mapVertexKeys(it)) }
            .forEach { addNodeToODB(odbGraph, it) }
        // Reverse index (Neptune ID -> Long ID); the first key wins should a value ever occur twice.
        val longIdByNeptuneId = HashMap<String, Long>(idMapper.size)
        idMapper.forEach { (longId, neptuneId) -> longIdByNeptuneId.putIfAbsent(neptuneId, longId) }
        // Only string IDs can match a mapper value; anything else is reported as unmapped.
        fun longIdOf(neptuneId: Any?): Long =
            checkNotNull((neptuneId as? String)?.let { longIdByNeptuneId[it] }) {
                "No ID mapping found for Neptune vertex ID '$neptuneId'"
            }
        g.E().toList()
            .map { e ->
                Triple(
                    odbGraph.node(longIdOf(e.outVertex().id())),
                    odbGraph.node(longIdOf(e.inVertex().id())),
                    e.label()
                )
            }
            .forEach { (src, dst, label) ->
                src?.addEdge(label, dst)
            }
    }
    return odbGraph
}

/**
 * Resets the ID mapping and then clears the graph through the parent implementation.
 *
 * @return the result of the parent's `clearGraph`.
 */
override fun clearGraph(): GremlinDriver {
    resetIdMapper()
    val cleared = super.clearGraph()
    return cleared
}

companion object {
Expand Down
Loading

0 comments on commit a0123fb

Please sign in to comment.