Skip to content
This repository has been archived by the owner on Feb 17, 2023. It is now read-only.

Commit

Permalink
refactor: change SimpleStorage API
Browse files Browse the repository at this point in the history
  • Loading branch information
rockjam committed Jul 3, 2016
1 parent d5d39e8 commit e0f9bb6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package im.actor.storage.slick

import java.util.concurrent.ConcurrentHashMap

import com.github.tminglei.slickpg._
import im.actor.storage.Connector
import im.actor.storage.api._
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

Expand All @@ -14,33 +14,36 @@ import im.actor.storage.slick.Driver.api._

class SlickConnector(db: Database)(implicit ec: ExecutionContext) extends Connector {

private val tables = new ConcurrentHashMap[String, Unit]()
private val log = LoggerFactory.getLogger(this.getClass)
private val tables = mutable.Set.empty[String]

override def createSync(name: String): Unit = synchronized {
if(tables.containsKey(name)) {
()
} else {
Await.result(create(name), 10 seconds)
tables.put(name, ())
}
}

override def create(name: String): Future[Unit] =
db.run(sqlu"""CREATE TABLE IF NOT EXISTS #${tableName(name)} (key TEXT, value BYTEA, PRIMARY KEY (key))""") map (_ => ())
override def runSync[R](action: Action[R])(implicit timeout: FiniteDuration): R =
Await.result(run(action), timeout)

override def run[R](action: Action[R]): Future[R] = {
createSync(action.name) // create if not exists
createTableIfNotExists(action.name)
for {
result <- (action match {
case GetAction(name, key) => get(name, key)
case GetByPrefix(name, key) => getByPrefix(name, key)
case PutAction(name, key, value) => put(name, key, value)
case GetByPrefixAction(name, key) => getByPrefix(name, key)
case UpsertAction(name, key, value) => upsert(name, key, value)
case DeleteAction(name, key) => delete(name, key)
case GetKeys(name) => getKeys(name)
case GetKeysAction(name) => getKeys(name)
}).asInstanceOf[Future[R]]
} yield result
}

private def createTableIfNotExists(name: String): Unit = synchronized {
if (!tables.contains(name)) {
val tName = tableName(name)
Await.result(
db.run(sqlu"""CREATE TABLE IF NOT EXISTS #$tName (key TEXT, value BYTEA, PRIMARY KEY (key))""") map (_ => ()),
10 seconds)
log.debug("Created table: {}", tName)
tables += name
}
}

private def tableName(name: String) = s"kv_$name"

private def get(name: String, key: String): Future[Option[Array[Byte]]] =
Expand All @@ -49,8 +52,19 @@ class SlickConnector(db: Database)(implicit ec: ExecutionContext) extends Connec
private def getByPrefix(name: String, keyPrefix: String): Future[Vector[(String, Array[Byte])]] =
db.run(sql"""SELECT (key, value) FROM #${tableName(name)} WHERE key like $keyPrefix%""".as[(String, Array[Byte])])

private def put(name: String, key: String, value: Array[Byte]): Future[Int] =
db.run(sqlu"""INSERT INTO #${tableName(name)} VALUES ($key, $value)""")
private def upsert(name: String, key: String, value: Array[Byte]): Future[Int] = {
val tName = tableName(name)
val action: DBIO[Int] = for {
count <- sql"SELECT COUNT(*) FROM #$tName WHERE KEY = $key".as[Int]
exists = count.headOption.exists(_>0)
result <- if(exists)
sqlu"UPDATE #$tName SET value = $value WHERE key = $key"
else
sqlu"INSERT INTO #$tName VALUES ($key, $value)"

} yield result
db.run(action.transactionally)
}

private def delete(name: String, key: String) =
db.run(sqlu"""DELETE FROM #${tableName(name)} WHERE key = $key""")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package im.actor.storage

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

trait Connector {
def run[R](action: api.Action[R]): Future[R]

def createSync(name: String): Unit

def create(name: String): Future[Unit]
def runSync[R](action: api.Action[R])(implicit timeout: FiniteDuration): R
}

class SimpleStorage(val name: String) {
import api._

final def get(key: String) = GetAction(name, key)

final def getByPrefix(keyPrefix: String) = GetByPrefix(name, keyPrefix)
final def getByPrefix(keyPrefix: String) = GetByPrefixAction(name, keyPrefix)

final def put(key: String, value: Array[Byte]) = PutAction(name, key, value)
final def upsert(key: String, value: Array[Byte]) = UpsertAction(name, key, value)

final def delete(key: String) = DeleteAction(name, key)

final def getKeys = GetKeys(name)
final def getKeys = GetKeysAction(name)
}

object api {
Expand All @@ -31,12 +30,12 @@ object api {

final case class GetAction(name: String, key: String) extends Action[Option[Array[Byte]]]

final case class GetByPrefix(name: String, keyPrefix: String) extends Action[Vector[(String, Array[Byte])]]
final case class GetByPrefixAction(name: String, keyPrefix: String) extends Action[Vector[(String, Array[Byte])]]

final case class PutAction(name: String, key: String, value: Array[Byte]) extends Action[Int]
final case class UpsertAction(name: String, key: String, value: Array[Byte]) extends Action[Int]

final case class DeleteAction(name: String, key: String) extends Action[Int]

final case class GetKeys(name: String) extends Action[Seq[String]]
final case class GetKeysAction(name: String) extends Action[Seq[String]]

}

0 comments on commit e0f9bb6

Please sign in to comment.