diff --git a/actor-storage-slick/src/main/scala/im/actor/storage/slick/SlickConnector.scala b/actor-storage-slick/src/main/scala/im/actor/storage/slick/SlickConnector.scala index 30fa8be..8bef41f 100644 --- a/actor-storage-slick/src/main/scala/im/actor/storage/slick/SlickConnector.scala +++ b/actor-storage-slick/src/main/scala/im/actor/storage/slick/SlickConnector.scala @@ -1,11 +1,11 @@ package im.actor.storage.slick -import java.util.concurrent.ConcurrentHashMap - import com.github.tminglei.slickpg._ import im.actor.storage.Connector import im.actor.storage.api._ +import org.slf4j.LoggerFactory +import scala.collection.mutable import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -14,33 +14,36 @@ import im.actor.storage.slick.Driver.api._ class SlickConnector(db: Database)(implicit ec: ExecutionContext) extends Connector { - private val tables = new ConcurrentHashMap[String, Unit]() + private val log = LoggerFactory.getLogger(this.getClass) + private val tables = mutable.Set.empty[String] - override def createSync(name: String): Unit = synchronized { - if(tables.containsKey(name)) { - () - } else { - Await.result(create(name), 10 seconds) - tables.put(name, ()) - } - } - - override def create(name: String): Future[Unit] = - db.run(sqlu"""CREATE TABLE IF NOT EXISTS #${tableName(name)} (key TEXT, value BYTEA, PRIMARY KEY (key))""") map (_ => ()) + override def runSync[R](action: Action[R])(implicit timeout: FiniteDuration): R = + Await.result(run(action), timeout) override def run[R](action: Action[R]): Future[R] = { - createSync(action.name) // create if not exists + createTableIfNotExists(action.name) for { result <- (action match { case GetAction(name, key) => get(name, key) - case GetByPrefix(name, key) => getByPrefix(name, key) - case PutAction(name, key, value) => put(name, key, value) + case GetByPrefixAction(name, key) => getByPrefix(name, key) + case UpsertAction(name, key, value) => upsert(name, key, value) case DeleteAction(name, key) => delete(name, key) - case GetKeys(name) => getKeys(name) + case GetKeysAction(name) => getKeys(name) }).asInstanceOf[Future[R]] } yield result } + private def createTableIfNotExists(name: String): Unit = synchronized { + if (!tables.contains(name)) { + val tName = tableName(name) + Await.result( + db.run(sqlu"""CREATE TABLE IF NOT EXISTS #$tName (key TEXT, value BYTEA, PRIMARY KEY (key))""") map (_ => ()), + 10 seconds) + log.debug("Created table: {}", tName) + tables += name + } + } + private def tableName(name: String) = s"kv_$name" private def get(name: String, key: String): Future[Option[Array[Byte]]] = @@ -49,8 +52,19 @@ class SlickConnector(db: Database)(implicit ec: ExecutionContext) extends Connec private def getByPrefix(name: String, keyPrefix: String): Future[Vector[(String, Array[Byte])]] = db.run(sql"""SELECT (key, value) FROM #${tableName(name)} WHERE key like $keyPrefix%""".as[(String, Array[Byte])]) - private def put(name: String, key: String, value: Array[Byte]): Future[Int] = - db.run(sqlu"""INSERT INTO #${tableName(name)} VALUES ($key, $value)""") + private def upsert(name: String, key: String, value: Array[Byte]): Future[Int] = { + val tName = tableName(name) + val action: DBIO[Int] = for { + count <- sql"SELECT COUNT(*) FROM #$tName WHERE KEY = $key".as[Int] + exists = count.headOption.exists(_>0) + result <- if(exists) + sqlu"UPDATE #$tName SET value = $value WHERE key = $key" + else + sqlu"INSERT INTO #$tName VALUES ($key, $value)" + + } yield result + db.run(action.transactionally) + } private def delete(name: String, key: String) = db.run(sqlu"""DELETE FROM #${tableName(name)} WHERE key = $key""") diff --git a/actor-storage/src/main/scala/im/actor/storage/SimpleStorage.scala b/actor-storage/src/main/scala/im/actor/storage/SimpleStorage.scala index ea748e7..669e96d 100644 --- a/actor-storage/src/main/scala/im/actor/storage/SimpleStorage.scala +++ b/actor-storage/src/main/scala/im/actor/storage/SimpleStorage.scala @@ -1,13 +1,12 @@ package im.actor.storage import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration trait Connector { def run[R](action: api.Action[R]): Future[R] - def createSync(name: String): Unit - - def create(name: String): Future[Unit] + def runSync[R](action: api.Action[R])(implicit timeout: FiniteDuration): R } class SimpleStorage(val name: String) { @@ -15,13 +14,13 @@ class SimpleStorage(val name: String) { final def get(key: String) = GetAction(name, key) - final def getByPrefix(keyPrefix: String) = GetByPrefix(name, keyPrefix) + final def getByPrefix(keyPrefix: String) = GetByPrefixAction(name, keyPrefix) - final def put(key: String, value: Array[Byte]) = PutAction(name, key, value) + final def upsert(key: String, value: Array[Byte]) = UpsertAction(name, key, value) final def delete(key: String) = DeleteAction(name, key) - final def getKeys = GetKeys(name) + final def getKeys = GetKeysAction(name) } object api { @@ -31,12 +30,12 @@ object api { final case class GetAction(name: String, key: String) extends Action[Option[Array[Byte]]] - final case class GetByPrefix(name: String, keyPrefix: String) extends Action[Vector[(String, Array[Byte])]] + final case class GetByPrefixAction(name: String, keyPrefix: String) extends Action[Vector[(String, Array[Byte])]] - final case class PutAction(name: String, key: String, value: Array[Byte]) extends Action[Int] + final case class UpsertAction(name: String, key: String, value: Array[Byte]) extends Action[Int] final case class DeleteAction(name: String, key: String) extends Action[Int] - final case class GetKeys(name: String) extends Action[Seq[String]] + final case class GetKeysAction(name: String) extends Action[Seq[String]] }