region-hash to region-name mapping is now stored in its own table and the region-name can now be as long as 65536 characters #27
Nils Kübler committed May 20, 2014
1 parent fa29526 commit a95cfee
Showing 4 changed files with 250 additions and 108 deletions.
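
The change in a nutshell: the metric table no longer persists the (potentially very long) region name in a target_desc column. metric.target keeps the 32-character MD5 hex hash, and a new region table maps that hash back to the full region name. Below is a minimal sketch of what the mapping table could look like; the real schema ships as a Play evolution that is not part of this excerpt, so the object name, column types and sizes here are assumptions for illustration only.

// Hypothetical sketch only -- the actual DDL lives in a Play evolution not shown here.
// Column sizes are assumptions: MD5 hex digests are 32 characters, and the commit
// allows region names of up to 65536 characters (the exact type depends on the database).
import anorm._
import play.api.db.DB
import play.api.Play.current

object RegionSchemaSketch {
  val CREATE_REGION_TABLE = SQL("""
    CREATE TABLE region (
      hash VARCHAR(32)    NOT NULL PRIMARY KEY,
      name VARCHAR(65536) NOT NULL
    )
  """)

  def create() = DB.withConnection { implicit c =>
    CREATE_REGION_TABLE.execute()
  }
}
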
217 changes: 109 additions & 108 deletions app/models/Metric.scala
@@ -9,138 +9,137 @@ import play.api.Play.current
import play.api.db.DB
import play.api.Logger
import models.MetricDef._
import collection.mutable.{ListBuffer, MutableList}
import org.apache.hadoop.hbase.util.Bytes
import utils.ByteUtil
import java.security.MessageDigest

object MetricDef {

val MAX_TARGET_DESC_LENGTH = 1000
/**
* Note:
* The metrics were originally designed to be able to hold metrics other than region-based metrics, thus
* the parameter is called MetricDef.target and not regionHash. However, currently only regions are stored in
* the metrics, so the following currently holds:
* - target is always mapped to the hashed region name
* - targetDesc is always mapped to the region name
*/
object MetricDef {

val ALL_REGION_METRICS = Set("storefileSizeMB", "memstoreSizeMB", "storefiles", "compactions")

val STOREFILE_SIZE_MB = "storefileSizeMB"
def STOREFILE_SIZE_MB(region:String) : MetricDef = findRegionMetricDef(region, STOREFILE_SIZE_MB)

def STOREFILE_SIZE_MB(region: String): MetricDef = findRegionMetricDef(region, STOREFILE_SIZE_MB)

val MEMSTORE_SIZE_MB = "memstoreSizeMB"
def MEMSTORE_SIZE_MB(region:String) : MetricDef = findRegionMetricDef(region, MEMSTORE_SIZE_MB)

val STOREFILES = "storefiles"
def STOREFILES(region:String) : MetricDef = findRegionMetricDef(region, STOREFILES)
def MEMSTORE_SIZE_MB(region: String): MetricDef = findRegionMetricDef(region, MEMSTORE_SIZE_MB)

val STOREFILES = "storefiles"

def STOREFILES(region: String): MetricDef = findRegionMetricDef(region, STOREFILES)

val COMPACTIONS = "compactions"
def COMPACTIONS(region: String) : MetricDef = findRegionMetricDef(region, COMPACTIONS)

def findRegionMetricDef(region: String, name: String) = find(hash(region), name, region)

def find(target: String, name: String, targetDesc: String) = {
DB.withConnection { implicit c =>
val stream = SQL_FIND_METRIC.on("name" -> name, "target" -> target)()

val truncatedTargetDesc = if(targetDesc.length > MAX_TARGET_DESC_LENGTH) {
Logger.warn("truncating long region-name to %d characters".format(MAX_TARGET_DESC_LENGTH))
targetDesc.substring(0, MAX_TARGET_DESC_LENGTH)
} else {
targetDesc
}

if(stream.isEmpty) {
Logger.info("creating new metric for " + target + " : " + name)
val id = SQL_INSERT_METRIC.on("target" -> target, "name" -> name, "target_desc" -> truncatedTargetDesc).executeInsert()
MetricDef(id.get, target, name, 0.0, 0, truncatedTargetDesc)
} else {
val row = stream.head
if(row[String]("target_desc") != truncatedTargetDesc)
{
Logger.info("updating metric targetDesc in old metric '" + target +"' for " + name)
SQL_MIGRATE_METRIC_3.on("id" -> row[Long]("id"), "target_desc" -> truncatedTargetDesc).executeUpdate()
}
MetricDef(
row[Long]("id"),
row[String]("target"),
row[String]("name"),
row[Double]("last_value"),
row[Long]("last_update"),
truncatedTargetDesc
)
}
}
}

def findByName(name: String):Seq[MetricDef] = {
DB.withConnection { implicit c =>
val stream = SQL_FIND_METRIC_ALL.on("name" -> name)()
def COMPACTIONS(region: String): MetricDef = findRegionMetricDef(region, COMPACTIONS)

def findRegionMetricDef(region: String, name: String) = find(RegionHash.byName(region).hash, name, region)

def find(target: String, name: String, targetDesc:String) = {
DB.withConnection {
implicit c =>
val stream = SQL_FIND_METRIC.on("name" -> name, "target" -> target)()

if (stream.isEmpty) {
Logger.info("creating new metric for " + target + " : " + name)
val id = SQL_INSERT_METRIC.on("target" -> target, "name" -> name).executeInsert()
MetricDef(id.get, target, name, 0.0, 0, targetDesc)
} else {
val row = stream.head

if(stream.isEmpty) {
Logger.info("no metrics found for : " + name)
List()
} else {
stream.map( row => {
MetricDef(
row[Long]("id"),
row[String]("target"),
target,
row[String]("name"),
row[Double]("last_value"),
row[Long]("last_update"),
row[String]("target_desc")
targetDesc // this is currently always the region name
)
}).toList
}
}
}
}

def findByName(name: String): Seq[MetricDef] = {
DB.withConnection {
implicit c =>
val stream = SQL_FIND_METRIC_ALL.on("name" -> name)()

if (stream.isEmpty) {
Logger.info("no metrics found for : " + name)
List()
} else {
stream.map(row => {
val regionHash = RegionHash.byHash(row[String]("target"))
// NOTE: actually, using the RegionHash here is against the intended design. Metrics themselves were not
// designed specifically for regions. However, we currently don't use the metrics for anything else,
// so it's OK for now to resolve the targetDesc by using the RegionHash.
MetricDef(
row[Long]("id"),
regionHash.hash,
row[String]("name"),
row[Double]("last_value"),
row[Long]("last_update"),
regionHash.name
)
}).toList
}
}
}

def clean(until: Long = now() - 1000 * 3600 * 24 * 7) = {
var recordsCleaned = 0;
var metricsCleaned = 0;
DB.withConnection { implicit c =>
recordsCleaned = SQL_DELETE_RECORDS.on("until" -> until).executeUpdate()
metricsCleaned = SQL_DELETE_METRICS.on("until" -> until).executeUpdate()
DB.withConnection {
implicit c =>
recordsCleaned = SQL_DELETE_RECORDS.on("until" -> until).executeUpdate()
metricsCleaned = SQL_DELETE_METRICS.on("until" -> until).executeUpdate()
}
Tuple2(metricsCleaned, recordsCleaned);
}

def now() = new java.util.Date().getTime()

def hash(value:String) = ByteUtil.toHexString(MessageDigest.getInstance("MD5").digest(Bytes.toBytes(value)))

val SQL_FIND_METRIC_ALL = SQL("""
val SQL_FIND_METRIC_ALL = SQL( """
SELECT
id, target, name, last_value, last_update, target_desc
id, target, name, last_value, last_update
FROM
metric
WHERE
name={name}
""")
""")

val SQL_FIND_METRIC = SQL("""
val SQL_FIND_METRIC = SQL( """
SELECT
id, target, name, last_value, last_update, target_desc
id, target, name, last_value, last_update
FROM
metric
WHERE
name={name} AND target={target}
""")
""")

val SQL_INSERT_METRIC = SQL("""
val SQL_INSERT_METRIC = SQL( """
INSERT INTO
metric(target, name, last_value, last_update, target_desc)
metric(target, name, last_value, last_update)
VALUES
({target}, {name}, 0.0, 0, {target_desc})
""")
({target}, {name}, 0.0, 0)
""")

val SQL_UPDATE_METRIC = SQL("UPDATE metric SET last_value={last_value}, last_update={last_update} WHERE id={id}")

val SQL_MIGRATE_METRIC_3 = SQL("UPDATE metric SET target_desc={target_desc} WHERE id={id}")

val SQL_INSERT_RECORD = SQL("""
val SQL_INSERT_RECORD = SQL( """
INSERT INTO
record(metric_id, timestamp, prev_value, value)
VALUES
({metric_id}, {timestamp}, {prev_value}, {value})
""")
""")

val SQL_FIND_RECORDS = SQL("""
val SQL_FIND_RECORDS = SQL( """
SELECT
timestamp, prev_value, value
FROM
@@ -149,59 +148,61 @@ object MetricDef {
metric_id = {metric_id} AND timestamp > {since} AND timestamp <= {until}
ORDER BY
timestamp
""")
""")

val SQL_DELETE_RECORDS = SQL("""
val SQL_DELETE_RECORDS = SQL( """
DELETE FROM
record
WHERE
timestamp < {until}
""")
""")

val SQL_DELETE_METRICS = SQL("""
val SQL_DELETE_METRICS = SQL( """
DELETE FROM
metric
WHERE
last_update < {until}
""")
""")
}

case class MetricDef(id: Long, target: String, name: String, var lastValue: Double, var lastUpdate: Long, targetDesc:String) {
def update(value: Double, timestamp:Long = now) = {
case class MetricDef(id: Long, target: String, name: String, var lastValue: Double, var lastUpdate: Long, var targetDesc: String) {
def update(value: Double, timestamp: Long = now) = {
var updated = false
DB.withConnection { implicit c =>
if(lastValue != value) {
SQL_INSERT_RECORD.on("metric_id" -> id, "timestamp" -> timestamp, "prev_value" -> lastValue, "value" -> value).executeInsert()
lastValue = value
updated = true
}
if(lastUpdate < timestamp) {
lastUpdate = timestamp
SQL_UPDATE_METRIC.on("id" -> id, "last_update" -> lastUpdate, "last_value" -> lastValue).executeUpdate()
}
DB.withConnection {
implicit c =>
if (lastValue != value) {
SQL_INSERT_RECORD.on("metric_id" -> id, "timestamp" -> timestamp, "prev_value" -> lastValue, "value" -> value).executeInsert()
lastValue = value
updated = true
}
if (lastUpdate < timestamp) {
lastUpdate = timestamp
SQL_UPDATE_METRIC.on("id" -> id, "last_update" -> lastUpdate, "last_value" -> lastValue).executeUpdate()
}
}
updated
}

def metric(since: Long, until: Long) : Metric = {
var values:List[MetricRecord] = null;
var prevValue:Option[Double] = None;
DB.withConnection { implicit c =>
values = SQL_FIND_RECORDS.on("metric_id" -> id, "since" -> since, "until" -> until)().map( row => {
if (prevValue == None) {
prevValue = Some(row[Double]("prev_value"))
}
MetricRecord(row[Long]("timestamp"), row[Double]("value"));
}).toList
def metric(since: Long, until: Long): Metric = {
var values: List[MetricRecord] = null;
var prevValue: Option[Double] = None;
DB.withConnection {
implicit c =>
values = SQL_FIND_RECORDS.on("metric_id" -> id, "since" -> since, "until" -> until)().map(row => {
if (prevValue == None) {
prevValue = Some(row[Double]("prev_value"))
}
MetricRecord(row[Long]("timestamp"), row[Double]("value"));
}).toList
}
if(values.size < 1)
if (values.size < 1)
Metric(name, target, since, until, values, lastValue, lastUpdate == 0, targetDesc)
else
Metric(name, target, since, until, values, prevValue.get, lastUpdate == 0, targetDesc)
}
}

case class Metric(name: String, target: String, begin: Long, end: Long, values: Seq[MetricRecord], prevValue: Double, isEmpty: Boolean, targetDesc:String)
case class Metric(name: String, target: String, begin: Long, end: Long, values: Seq[MetricRecord], prevValue: Double, isEmpty: Boolean, targetDesc: String)

case class MetricRecord(ts: Long, v: Double)

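For context, a short usage sketch of the new lookup path in MetricDef (the region name below is invented, and a running Play application with the configured database is assumed): the region name is hashed through RegionHash, only the hash is stored in metric.target, and targetDesc is carried in memory rather than read back from the metric table.

// Hedged sketch; the region name is an invented example of an HBase region name.
val region = "usertable,row-0042,1400000000000.abcdef0123456789abcdef0123456789."
val metric = MetricDef.STOREFILE_SIZE_MB(region)  // inserts the metric row on first use
// metric.target     == RegionHash.byName(region).hash  (32-character MD5 hex string)
// metric.targetDesc == region                          (resolved via the region table)
metric.update(128.0)                              // records the value if it changed, returns true/false
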
75 changes: 75 additions & 0 deletions app/models/RegionHash.scala
@@ -0,0 +1,75 @@
package models

import utils.ByteUtil
import java.security.MessageDigest
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable
import anorm._
import play.api.db.DB
import play.api.Logger
import play.api.Play.current

/**
* Created by nkuebler on 20/05/14.
*/
case class RegionHash(name:String, hash:String) {
}

object RegionHash {

def byName(name:String) = {
val hash:String = nameToHash.get(name).getOrElse {
val hashedName = this.hash(name)
DB.withConnection { implicit c =>
val stream = SQL_FIND_NAME.on("hash" -> hashedName)()
if(stream.isEmpty) { // add record if not yet exists
SQL_INSERT_NAME.on("hash" -> hashedName, "name" -> name).executeInsert()
}
}
hashToName.put(hashedName, name)
nameToHash.put(name, hashedName)
hashedName
}
RegionHash(name, hash)
}

def byHash(hash:String) : RegionHash = {
val name = hashToName.get(hash).getOrElse {
DB.withConnection { implicit c =>
val stream = SQL_FIND_NAME.on("hash" -> hash)()
if(stream.isEmpty) {
Logger.info("no region found for hash: " +hash)
"-unknown-"
} else {
val row = stream.head
val name = row[String]("name")
hashToName.put(hash, name)
nameToHash.put(name, hash)
name
}
}
}
RegionHash(name, hash)
}

def hash(value:String) = ByteUtil.toHexString(MessageDigest.getInstance("MD5").digest(Bytes.toBytes(value)))

val hashToName:mutable.HashMap[String, String] = new mutable.HashMap()
val nameToHash:mutable.HashMap[String, String] = new mutable.HashMap()

val SQL_FIND_NAME = SQL("""
SELECT
name
FROM
region
WHERE
hash={hash}
""")

val SQL_INSERT_NAME = SQL("""
INSERT INTO
region(hash, name)
VALUES
({hash}, {name})
""")
}
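
A brief sketch of the caching behaviour of the new RegionHash object (again assuming a running application and an invented region name): the first byName call computes the MD5 hash, inserts the hash/name pair into the region table if it is missing, and warms both in-memory maps, so later lookups in either direction avoid the database; unknown hashes resolve to the placeholder name "-unknown-".

// Illustration only; the region name is invented.
val rh = RegionHash.byName("usertable,,1400000000000.0123456789abcdef0123456789abcdef.")
// first call: hashes the name, inserts into the region table, warms both caches
RegionHash.byName(rh.name)         // served from nameToHash, no database round trip
RegionHash.byHash(rh.hash).name    // resolves the hash back to the full region name
RegionHash.byHash("deadbeef")      // no matching row: logs a note, name is "-unknown-"

The two maps are plain (unsynchronized) mutable.HashMaps used as a process-local cache without eviction.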
