diff --git a/app/models/Metric.scala b/app/models/Metric.scala index 6e2ea61..c6fd3fb 100644 --- a/app/models/Metric.scala +++ b/app/models/Metric.scala @@ -9,138 +9,137 @@ import play.api.Play.current import play.api.db.DB import play.api.Logger import models.MetricDef._ -import collection.mutable.{ListBuffer, MutableList} -import org.apache.hadoop.hbase.util.Bytes -import utils.ByteUtil -import java.security.MessageDigest -object MetricDef { - val MAX_TARGET_DESC_LENGTH = 1000 +/** + * Note: + * The metrics were originally designed to be able to hold metrics other than region-based metrics, thus + * the paramater is called MetricDef.target and not regionHash. However currently, there is always regions stored in + * the metrics, so that currently the following is true: + * - target is always mapped to the hashed region name + * - targetDesc is always mapped to the region name + */ +object MetricDef { val ALL_REGION_METRICS = Set("storefileSizeMB", "memstoreSizeMB", "storefiles", "compactions") val STOREFILE_SIZE_MB = "storefileSizeMB" - def STOREFILE_SIZE_MB(region:String) : MetricDef = findRegionMetricDef(region, STOREFILE_SIZE_MB) + + def STOREFILE_SIZE_MB(region: String): MetricDef = findRegionMetricDef(region, STOREFILE_SIZE_MB) val MEMSTORE_SIZE_MB = "memstoreSizeMB" - def MEMSTORE_SIZE_MB(region:String) : MetricDef = findRegionMetricDef(region, MEMSTORE_SIZE_MB) - val STOREFILES = "storefiles" - def STOREFILES(region:String) : MetricDef = findRegionMetricDef(region, STOREFILES) + def MEMSTORE_SIZE_MB(region: String): MetricDef = findRegionMetricDef(region, MEMSTORE_SIZE_MB) + + val STOREFILES = "storefiles" + + def STOREFILES(region: String): MetricDef = findRegionMetricDef(region, STOREFILES) val COMPACTIONS = "compactions" - def COMPACTIONS(region: String) : MetricDef = findRegionMetricDef(region, COMPACTIONS) - - def findRegionMetricDef(region: String, name: String) = find(hash(region), name, region) - - def find(target: String, name: String, targetDesc: 
String) = { - DB.withConnection { implicit c => - val stream = SQL_FIND_METRIC.on("name" -> name, "target" -> target)() - - val truncatedTargetDesc = if(targetDesc.length > MAX_TARGET_DESC_LENGTH) { - Logger.warn("truncating long region-name to %d characters".format(MAX_TARGET_DESC_LENGTH)) - targetDesc.substring(0, MAX_TARGET_DESC_LENGTH) - } else { - targetDesc - } - - if(stream.isEmpty) { - Logger.info("creating new metric for " + target + " : " + name) - val id = SQL_INSERT_METRIC.on("target" -> target, "name" -> name, "target_desc" -> truncatedTargetDesc).executeInsert() - MetricDef(id.get, target, name, 0.0, 0, truncatedTargetDesc) - } else { - val row = stream.head - if(row[String]("target_desc") != truncatedTargetDesc) - { - Logger.info("updating metric targetDesc in old metric '" + target +"' for " + name) - SQL_MIGRATE_METRIC_3.on("id" -> row[Long]("id"), "target_desc" -> truncatedTargetDesc).executeUpdate() - } - MetricDef( - row[Long]("id"), - row[String]("target"), - row[String]("name"), - row[Double]("last_value"), - row[Long]("last_update"), - truncatedTargetDesc - ) - } - } - } - def findByName(name: String):Seq[MetricDef] = { - DB.withConnection { implicit c => - val stream = SQL_FIND_METRIC_ALL.on("name" -> name)() + def COMPACTIONS(region: String): MetricDef = findRegionMetricDef(region, COMPACTIONS) + + def findRegionMetricDef(region: String, name: String) = find(RegionHash.byName(region).hash, name, region) + + def find(target: String, name: String, targetDesc:String) = { + DB.withConnection { + implicit c => + val stream = SQL_FIND_METRIC.on("name" -> name, "target" -> target)() + + if (stream.isEmpty) { + Logger.info("creating new metric for " + target + " : " + name) + val id = SQL_INSERT_METRIC.on("target" -> target, "name" -> name).executeInsert() + MetricDef(id.get, target, name, 0.0, 0, targetDesc) + } else { + val row = stream.head - if(stream.isEmpty) { - Logger.info("no metrics found for : " + name) - List() - } else { - stream.map( 
row => { MetricDef( row[Long]("id"), - row[String]("target"), + target, row[String]("name"), row[Double]("last_value"), row[Long]("last_update"), - row[String]("target_desc") + targetDesc // this is currently always the region name ) - }).toList - } + } + } + } + + def findByName(name: String): Seq[MetricDef] = { + DB.withConnection { + implicit c => + val stream = SQL_FIND_METRIC_ALL.on("name" -> name)() + + if (stream.isEmpty) { + Logger.info("no metrics found for : " + name) + List() + } else { + stream.map(row => { + val regionHash = RegionHash.byHash(row[String]("target")) + // NOTE: actually, using the RegionHash here is against the intended design. Metrics itself were not + // designed specific for regions. However we currently don't use the metrics for anything else, + // so it's OK for now to resolve the targetDesc by using the RegionHash. + MetricDef( + row[Long]("id"), + regionHash.hash, + row[String]("name"), + row[Double]("last_value"), + row[Long]("last_update"), + regionHash.name + ) + }).toList + } } } def clean(until: Long = now() - 1000 * 3600 * 24 * 7) = { var recordsCleaned = 0; var metricsCleaned = 0; - DB.withConnection { implicit c => - recordsCleaned = SQL_DELETE_RECORDS.on("until" -> until).executeUpdate() - metricsCleaned = SQL_DELETE_METRICS.on("until" -> until).executeUpdate() + DB.withConnection { + implicit c => + recordsCleaned = SQL_DELETE_RECORDS.on("until" -> until).executeUpdate() + metricsCleaned = SQL_DELETE_METRICS.on("until" -> until).executeUpdate() } Tuple2(metricsCleaned, recordsCleaned); } def now() = new java.util.Date().getTime() - def hash(value:String) = ByteUtil.toHexString(MessageDigest.getInstance("MD5").digest(Bytes.toBytes(value))) - - val SQL_FIND_METRIC_ALL = SQL(""" + val SQL_FIND_METRIC_ALL = SQL( """ SELECT - id, target, name, last_value, last_update, target_desc + id, target, name, last_value, last_update FROM metric WHERE name={name} - """) + """) - val SQL_FIND_METRIC = SQL(""" + val SQL_FIND_METRIC = SQL( 
""" SELECT - id, target, name, last_value, last_update, target_desc + id, target, name, last_value, last_update FROM metric WHERE name={name} AND target={target} - """) + """) - val SQL_INSERT_METRIC = SQL(""" + val SQL_INSERT_METRIC = SQL( """ INSERT INTO - metric(target, name, last_value, last_update, target_desc) + metric(target, name, last_value, last_update) VALUES - ({target}, {name}, 0.0, 0, {target_desc}) - """) + ({target}, {name}, 0.0, 0) + """) val SQL_UPDATE_METRIC = SQL("UPDATE metric SET last_value={last_value}, last_update={last_update} WHERE id={id}") - val SQL_MIGRATE_METRIC_3 = SQL("UPDATE metric SET target_desc={target_desc} WHERE id={id}") - - val SQL_INSERT_RECORD = SQL(""" + val SQL_INSERT_RECORD = SQL( """ INSERT INTO record(metric_id, timestamp, prev_value, value) VALUES ({metric_id}, {timestamp}, {prev_value}, {value}) - """) + """) - val SQL_FIND_RECORDS = SQL(""" + val SQL_FIND_RECORDS = SQL( """ SELECT timestamp, prev_value, value FROM @@ -149,59 +148,61 @@ object MetricDef { metric_id = {metric_id} AND timestamp > {since} AND timestamp <= {until} ORDER BY timestamp - """) + """) - val SQL_DELETE_RECORDS = SQL(""" + val SQL_DELETE_RECORDS = SQL( """ DELETE FROM record WHERE timestamp < {until} - """) + """) - val SQL_DELETE_METRICS = SQL(""" + val SQL_DELETE_METRICS = SQL( """ DELETE FROM metric WHERE last_update < {until} - """) + """) } -case class MetricDef(id: Long, target: String, name: String, var lastValue: Double, var lastUpdate: Long, targetDesc:String) { - def update(value: Double, timestamp:Long = now) = { +case class MetricDef(id: Long, target: String, name: String, var lastValue: Double, var lastUpdate: Long, var targetDesc: String) { + def update(value: Double, timestamp: Long = now) = { var updated = false - DB.withConnection { implicit c => - if(lastValue != value) { - SQL_INSERT_RECORD.on("metric_id" -> id, "timestamp" -> timestamp, "prev_value" -> lastValue, "value" -> value).executeInsert() - lastValue = value - 
updated = true - } - if(lastUpdate < timestamp) { - lastUpdate = timestamp - SQL_UPDATE_METRIC.on("id" -> id, "last_update" -> lastUpdate, "last_value" -> lastValue).executeUpdate() - } + DB.withConnection { + implicit c => + if (lastValue != value) { + SQL_INSERT_RECORD.on("metric_id" -> id, "timestamp" -> timestamp, "prev_value" -> lastValue, "value" -> value).executeInsert() + lastValue = value + updated = true + } + if (lastUpdate < timestamp) { + lastUpdate = timestamp + SQL_UPDATE_METRIC.on("id" -> id, "last_update" -> lastUpdate, "last_value" -> lastValue).executeUpdate() + } } updated } - def metric(since: Long, until: Long) : Metric = { - var values:List[MetricRecord] = null; - var prevValue:Option[Double] = None; - DB.withConnection { implicit c => - values = SQL_FIND_RECORDS.on("metric_id" -> id, "since" -> since, "until" -> until)().map( row => { - if (prevValue == None) { - prevValue = Some(row[Double]("prev_value")) - } - MetricRecord(row[Long]("timestamp"), row[Double]("value")); - }).toList + def metric(since: Long, until: Long): Metric = { + var values: List[MetricRecord] = null; + var prevValue: Option[Double] = None; + DB.withConnection { + implicit c => + values = SQL_FIND_RECORDS.on("metric_id" -> id, "since" -> since, "until" -> until)().map(row => { + if (prevValue == None) { + prevValue = Some(row[Double]("prev_value")) + } + MetricRecord(row[Long]("timestamp"), row[Double]("value")); + }).toList } - if(values.size < 1) + if (values.size < 1) Metric(name, target, since, until, values, lastValue, lastUpdate == 0, targetDesc) else Metric(name, target, since, until, values, prevValue.get, lastUpdate == 0, targetDesc) } } -case class Metric(name: String, target: String, begin: Long, end: Long, values: Seq[MetricRecord], prevValue: Double, isEmpty: Boolean, targetDesc:String) +case class Metric(name: String, target: String, begin: Long, end: Long, values: Seq[MetricRecord], prevValue: Double, isEmpty: Boolean, targetDesc: String) case class 
MetricRecord(ts: Long, v: Double) diff --git a/app/models/RegionHash.scala b/app/models/RegionHash.scala new file mode 100644 index 0000000..1f11625 --- /dev/null +++ b/app/models/RegionHash.scala @@ -0,0 +1,75 @@ +package models + +import utils.ByteUtil +import java.security.MessageDigest +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.mutable +import anorm._ +import play.api.db.DB +import play.api.Logger +import play.api.Play.current + +/** + * Created by nkuebler on 20/05/14. + */ +case class RegionHash(name:String, hash:String) { +} + +object RegionHash { + + def byName(name:String) = { + val hash:String = nameToHash.get(name).getOrElse { + val hashedName = this.hash(name) + DB.withConnection { implicit c => + val stream = SQL_FIND_NAME.on("hash" -> hashedName)() + if(stream.isEmpty) { // add record if not yet exists + SQL_INSERT_NAME.on("hash" -> hashedName, "name" -> name).executeInsert() + } + } + hashToName.put(hashedName, name) + nameToHash.put(name, hashedName) + hashedName + } + RegionHash(name, hash) + } + + def byHash(hash:String) : RegionHash = { + val name = hashToName.get(hash).getOrElse { + DB.withConnection { implicit c => + val stream = SQL_FIND_NAME.on("hash" -> hash)() + if(stream.isEmpty) { + Logger.info("no region found for hash: " +hash) + "-unknown-" + } else { + val row = stream.head + val name = row[String]("name") + hashToName.put(hash, name) + nameToHash.put(name, hash) + name + } + } + } + RegionHash(name, hash) + } + + def hash(value:String) = ByteUtil.toHexString(MessageDigest.getInstance("MD5").digest(Bytes.toBytes(value))) + + val hashToName:mutable.HashMap[String, String] = new mutable.HashMap() + val nameToHash:mutable.HashMap[String, String] = new mutable.HashMap() + + val SQL_FIND_NAME = SQL(""" + SELECT + name + FROM + region + WHERE + hash={hash} + """) + + val SQL_INSERT_NAME = SQL(""" + INSERT INTO + region(hash, name) + VALUES + ({hash}, {name}) + """) +} diff --git a/conf/evolutions/default/5.sql 
b/conf/evolutions/default/5.sql new file mode 100644 index 0000000..143b7e1 --- /dev/null +++ b/conf/evolutions/default/5.sql @@ -0,0 +1,19 @@ +# Metrics schema + +# --- !Ups + +CREATE TABLE region ( + hash VARCHAR(255), + name VARCHAR(65536) +); +CREATE UNIQUE INDEX region_hash ON region(hash); + +INSERT INTO region (hash, name) SELECT DISTINCT target, target_desc FROM metric; + +ALTER TABLE metric DROP target_desc; + +# --- !Downs + +ALTER TABLE metric ADD target_desc VARCHAR(1000) DEFAULT '-unknown-'; + +DROP TABLE region; \ No newline at end of file diff --git a/test/models/RegionHashSpec.scala b/test/models/RegionHashSpec.scala new file mode 100644 index 0000000..b37188d --- /dev/null +++ b/test/models/RegionHashSpec.scala @@ -0,0 +1,47 @@ +package models + +import org.specs2.mutable._ +import play.api.test._ +import play.api.test.Helpers._ + +class RegionHashSpec extends Specification { + +  "RegionHashSpec" should { + +    "provide a method: #byName" >> { +      "should return correct Regionhash" >> { +        running(FakeApplication(additionalConfiguration = inMemoryDatabase())) { +          val regionHash = RegionHash.byName("a") +          regionHash.name must equalTo("a") +          regionHash.hash must equalTo(RegionHash.hash("a")) +        } +      } + +      "should not be equal to another hash" >> { +        running(FakeApplication(additionalConfiguration = inMemoryDatabase())) { +          val regionHashA = RegionHash.byName("a") +          val regionHashB = RegionHash.byName("b") +          regionHashA.hash mustNotEqual(regionHashB.hash) +        } +      } +    } +    "provide a method: #byHash" >> { +      "should return -unknown-" >> { +        running(FakeApplication(additionalConfiguration = inMemoryDatabase())) { +          val regionHash = RegionHash.byHash(RegionHash.hash("c")) +          regionHash.name must equalTo("-unknown-") +        } +      } + +      "should return c" >> { +        running(FakeApplication(additionalConfiguration = inMemoryDatabase())) { +          val md5Sum = RegionHash.hash("c") +          RegionHash.byHash(md5Sum) // this should not be cached +          RegionHash.byName("c") // this should be cached + 
val regionHash = RegionHash.byHash(md5Sum) + regionHash.name must equalTo("c") + } + } + } + } +} \ No newline at end of file