From 5704c84d89e5ff18dd3fe06827e76a53b90199d3 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Sun, 1 Nov 2015 20:25:26 -0600 Subject: [PATCH] handle gran changes --- .../com/metamx/tranquility/beam/Beam.scala | 10 +- .../tranquility/beam/ClusteredBeam.scala | 220 ++++++++++++------ .../tranquility/beam/HashPartitionBeam.scala | 8 + .../metamx/tranquility/beam/HttpBeam.scala | 3 + .../metamx/tranquility/beam/MemoryBeam.scala | 2 + .../metamx/tranquility/beam/NoopBeam.scala | 4 + .../tranquility/beam/RoundRobinBeam.scala | 8 + .../metamx/tranquility/druid/DruidBeam.scala | 5 +- .../tranquility/druid/DruidBeamMaker.scala | 9 +- .../metamx/tranquility/druid/DruidBeams.scala | 3 + .../tranquility/test/BeamPacketizerTest.scala | 2 + .../tranquility/test/ClusteredBeamTest.scala | 4 + .../tranquility/test/StormBoltTest.scala | 2 + 13 files changed, 196 insertions(+), 84 deletions(-) diff --git a/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala b/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala index 2c9b5ae..f8cb0ae 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala @@ -17,12 +17,13 @@ package com.metamx.tranquility.beam import com.twitter.util.Future +import org.joda.time.Interval /** * Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means * the beam should be discarded (after calling close()). */ -trait Beam[A] +trait Beam[A] extends DiscoverableInterval { /** * Request propagation of events. 
The operation may fail in various ways, which tend to be specific to @@ -40,3 +41,10 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t) { def this(s: String) = this(s, null) } + +trait DiscoverableInterval +{ + /** + * */ + def getInterval(): Option[Interval] +} diff --git a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala index 192ca78..a2a432e 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala @@ -18,9 +18,8 @@ package com.metamx.tranquility.beam import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.util.concurrent.ThreadFactoryBuilder -import com.metamx.common.scala.Logging +import com.metamx.common.scala.{untyped, Logging} import com.metamx.common.scala.Predef._ -import com.metamx.common.scala.collection.mutable.ConcurrentMap import com.metamx.common.scala.event._ import com.metamx.common.scala.event.emit.emitAlert import com.metamx.common.scala.option._ @@ -88,6 +87,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap: Dict ) extends Beam[EventType] with Logging { + def getInterval() = None + require(tuning.partitions > 0, "tuning.partitions > 0") require(tuning.minSegmentsPerBeam > 0, "tuning.minSegmentsPerBeam > 0") require(tuning.maxSegmentsPerBeam >= tuning.minSegmentsPerBeam, "tuning.maxSegmentsPerBeam >= tuning.minSegmentsPerBeam") @@ -132,7 +133,40 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( private[this] val rand = new Random // Merged beams we are currently aware of, interval start millis -> merged beam. 
- private[this] val beams = ConcurrentMap[Long, Beam[EventType]]() + private[this] var beams: List[Beam[EventType]] = + { + try { + val dataPath = zpathWithDefault("data", ClusteredBeamMeta.empty.toBytes(objectMapper)) + curator.sync().forPath(dataPath) + val zkMetaData = ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold( + e => { + emitAlert(e, log, emitter, WARN, "Failed to read beam data from cache: %s" format identifier, alertMap) + throw e + }, + meta => meta + ) + log.info("Synced from ZK - [%s]", zkMetaData) + (zkMetaData.beamDictss map { + beamDicts => + beamMergeFn( + beamDicts._2.zipWithIndex map { + case (beamDict, partitionNum) => + val decorate = beamDecorateFn( + new Interval(beamDict.get("interval").get), + partitionNum + ) + decorate(beamMaker.fromDict(beamDict)) + } + ) + }).toList sortBy (-_.getInterval().get.start.millis) + } + catch { + case e: Throwable => + // Log Throwables to avoid invisible errors caused by https://github.com/twitter/util/issues/100. + log.error(e, "Failed to sync with zookeeper: %s", identifier) + throw e + } + } // Lock updates to "localLatestCloseTime" and "beams" to prevent races. 
private[this] val beamWriteMonitor = new AnyRef @@ -140,7 +174,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( private[this] lazy val data = new { val dataPath = zpathWithDefault("data", ClusteredBeamMeta.empty.toBytes(objectMapper)) - def modify(f: ClusteredBeamMeta => ClusteredBeamMeta): Future[ClusteredBeamMeta] = zkFuturePool { + def modify(f: ClusteredBeamMeta => (ClusteredBeamMeta, Interval)): Future[(ClusteredBeamMeta, Interval)] = zkFuturePool { mutex.acquire() try { curator.sync().forPath(dataPath) @@ -151,13 +185,14 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( }, meta => meta ) - val newMeta = f(prevMeta) + val newMetaAndInterval = f(prevMeta) + val newMeta = newMetaAndInterval._1 if (newMeta != prevMeta) { val newMetaBytes = newMeta.toBytes(objectMapper) log.info("Writing new beam data to[%s]: %s", dataPath, new String(newMetaBytes)) curator.setData().forPath(dataPath, newMetaBytes) } - newMeta + newMetaAndInterval } catch { case e: Throwable => @@ -178,67 +213,78 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( t => theImplicit(t).withZone(DateTimeZone.UTC) } - private[this] def beam(timestamp: DateTime, now: DateTime): Future[Beam[EventType]] = { - val bucket = tuning.segmentBucket(timestamp) + private[this] def beam(beamPair :(Interval, Option[Beam[EventType]]), now: DateTime): Future[Beam[EventType]] = { val creationInterval = new Interval( - tuning.segmentBucket(now - tuning.windowPeriod).start.millis, - tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end.millis, - ISOChronology.getInstanceUTC + tuning.segmentBucket(now - tuning.windowPeriod).start, + tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end ) - val windowInterval = new Interval( - tuning.segmentBucket(now - tuning.windowPeriod).start.millis, - tuning.segmentBucket(now + 
tuning.windowPeriod).end.millis, - ISOChronology.getInstanceUTC - ) - val futureBeamOption = beams.get(timestamp.millis) match { + + val futureBeamOption = beamPair match { case _ if !open => Future.value(None) - case Some(x) if windowInterval.overlaps(bucket) => Future.value(Some(x)) - case Some(x) => Future.value(None) - case None if timestamp <= localLatestCloseTime => Future.value(None) - case None if !creationInterval.overlaps(bucket) => Future.value(None) - case None => - // We may want to create new merged beam(s). Acquire the zk mutex and examine the situation. - // This could be more efficient, but it's happening infrequently so it's probably not a big deal. + case (interval, Some(foundBeam)) if foundBeam.getInterval().get.end + tuning.windowPeriod <= now => Future.value(None) + case (interval, Some(foundBeam)) => Future.value(Some(foundBeam)) + //case (interval, None) if interval.start <= localLatestCloseTime => Future.value(None) + case (interval, None) if !creationInterval.overlaps(interval) => Future.value(None) + case null => Future.value(None) + case (interval, None) => data.modify { prev => - val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.millis, Nil) - if (prevBeamDicts.size >= tuning.partitions) { + log.info("Trying to create new beam with interval [%s]", interval) + // We want to create this new beam + // But first let us check in ZK if some other tranquility already created the beam for this interval meanwhile + + val beamDicts: Seq[untyped.Dict] = prev.beamDictss.collectFirst[Seq[untyped.Dict]]({ + case x if new Interval(x._2.head.get("interval").get).overlaps(interval) => x._2 + }) getOrElse Nil + + if (beamDicts.size >= tuning.partitions && new Interval(beamDicts.head.get("interval").get).contains(interval)) { log.info( "Merged beam already created for identifier[%s] timestamp[%s], with sufficient partitions (target = %d, actual = %d)", identifier, - timestamp, + interval.start, tuning.partitions, - prevBeamDicts.size + 
beamDicts.size ) - prev - } else if (timestamp <= prev.latestCloseTime) { + (prev, new Interval(beamDicts.head.get("interval").get)) + } else if (interval.start <= prev.latestCloseTime && beamDicts.isEmpty) { log.info( "Global latestCloseTime[%s] for identifier[%s] has moved past timestamp[%s], not creating merged beam", prev.latestCloseTime, identifier, - timestamp + interval.start ) - prev + (prev, interval) } else { - assert(prevBeamDicts.size < tuning.partitions) - assert(timestamp > prev.latestCloseTime) + // Create the new beam + //assert(beamDicts.size < tuning.partitions) + //assert(interval.start > prev.latestCloseTime) - // We might want to cover multiple time segments in advance. val numSegmentsToCover = tuning.minSegmentsPerBeam + rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1) - val intervalToCover = new Interval( - timestamp.millis, - tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).millis, + var intervalToCover = new Interval( + interval.start.millis, + tuning.segmentGranularity.increment(interval.start, numSegmentsToCover).millis, ISOChronology.getInstanceUTC ) - val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start) + var timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start) + // If we are here and beamDicts is not Nil then there is an existing valid overlapping beam in ZK + // so truncate the new beam so that it does not overlap with any existing beam + if (beamDicts.nonEmpty) { + log.info("Found overlapping beams [%s] with interval [%s]", beamDicts, interval) + intervalToCover = new Interval(beamDicts.head.get("interval").get) match { + case x if interval.end.millis > x.end.millis => new Interval(x.end, interval.end) + case x => new Interval(interval.start, x.start) + } + timestampsToCover = Iterable(intervalToCover.start) + } // OK, create them where needed. 
val newInnerBeamDictsByPartition = new mutable.HashMap[Int, Dict] val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot { case (millis, beam) => // Expire old beamDicts - tuning.segmentGranularity.increment(new DateTime(millis)) + tuning.windowPeriod < now + // Should use segment gran and window period saved in ZK + new Interval(beam.head.get("interval").get).end + tuning.windowPeriod < now }) ++ (for (ts <- timestampsToCover) yield { val tsPrevDicts = prev.beamDictss.getOrElse(ts.millis, Nil) log.info( @@ -250,7 +296,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( ) val tsNewDicts = tsPrevDicts ++ ((tsPrevDicts.size until tuning.partitions) map { partition => - newInnerBeamDictsByPartition.getOrElseUpdate(partition, { + newInnerBeamDictsByPartition.getOrElseUpdate( + partition, { // Create sub-beams and then immediately close them, just so we can get the dict representations. // Close asynchronously, ignore return value. beamMaker.newBeam(intervalToCover, partition).withFinally(_.close()) { @@ -259,7 +306,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( log.info("Created beam: %s", objectMapper.writeValueAsString(beamDict)) beamDict } - }) + } + ) }) (ts.millis, tsNewDicts) }) @@ -267,48 +315,54 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( (Seq(prev.latestCloseTime.millis) ++ (prev.beamDictss.keySet -- newBeamDictss.keySet)).max, ISOChronology.getInstanceUTC ) - ClusteredBeamMeta( - newLatestCloseTime, - newBeamDictss - ) + (ClusteredBeamMeta(newLatestCloseTime, newBeamDictss), intervalToCover) } } rescue { case e: Throwable => Future.exception( new IllegalStateException( - "Failed to save new beam for identifier[%s] timestamp[%s]" format(identifier, timestamp), e + "Failed to save new beam for identifier[%s] timestamp[%s]" format(identifier, interval.start), e ) ) } map { - meta => + metaAndInterval => // Update local stuff with our goodies 
from zk. + val meta = metaAndInterval._1 + val interval = metaAndInterval._2 + localLatestCloseTime = meta.latestCloseTime beamWriteMonitor.synchronized { - localLatestCloseTime = meta.latestCloseTime + val timestamp = interval.start // Only add the beams we actually wanted at this time. This is because there might be other beams in ZK // that we don't want to add just yet, on account of maybe they need their partitions expanded (this only // happens when they are the necessary ones). - if (!beams.contains(timestamp.millis) && meta.beamDictss.contains(timestamp.millis)) { + if (!beams.exists(beam => beam.getInterval().get.start.eq(timestamp)) && + meta.beamDictss.contains(timestamp.millis)) { + val beamDicts = meta.beamDictss(timestamp.millis) log.info("Adding beams for identifier[%s] timestamp[%s]: %s", identifier, timestamp, beamDicts) // Should have better handling of unparseable zk data. Changing BeamMaker implementations currently // just causes exceptions until the old dicts are cleared out. - beams(timestamp.millis) = beamMergeFn( + beams = beamMergeFn( beamDicts.zipWithIndex map { case (beamDict, partitionNum) => val decorate = beamDecorateFn(tuning.segmentBucket(timestamp), partitionNum) decorate(beamMaker.fromDict(beamDict)) } - ) + ) +: beams } // Remove beams that are gone from ZK metadata. They have expired. - for ((timestamp, beam) <- beams -- meta.beamDictss.keys) { - log.info("Removing beams for identifier[%s] timestamp[%s]", identifier, timestamp) + val expiredBeams = beams.filterNot(beam => meta.beamDictss.contains(beam.getInterval().get.start.millis)) + + for (beam <- expiredBeams) { + log.info("Removing beams for identifier[%s] timestamp[%s]", identifier, beam.getInterval().get.start) // Close asynchronously, ignore return value. - beams(timestamp).close() - beams.remove(timestamp) + beam.close() } + beams = beams.diff(expiredBeams) + beams = beams.sortBy(-_.getInterval().get.start.millis) + // Return requested beam. 
It may not have actually been created, so it's an Option. - beams.get(timestamp.millis) ifEmpty { + beams.find(beam => beam.getInterval().get.contains(timestamp)) ifEmpty { log.info( "Turns out we decided not to actually make beams for identifier[%s] timestamp[%s]. Returning None.", identifier, @@ -327,16 +381,35 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( beamMergeFn( (0 until tuning.partitions) map { partition => - beamDecorateFn(bucket, partition)(new NoopBeam[EventType]) + beamDecorateFn(beamPair._1, partition)(new NoopBeam[EventType]) } ) ) } } + def groupEvents(event: EventType): (Interval, Option[Beam[EventType]]) = { + val eventTimestamp = timestamper(event) + if (beams.headOption.nonEmpty && beams.headOption.get.getInterval().get.contains(eventTimestamp)) { + // we expect to hit this case the majority of times + (beams.head.getInterval().get, beams.headOption) + } else { + // looks in sequential order for the predicate to be true + val foundBeam = beams.find(_.getInterval().get.contains(eventTimestamp)) + if(foundBeam.nonEmpty) { + (foundBeam.get.getInterval().get, foundBeam) + } else { + // We did not find any beam that can handle this event + // We may create a new beam if no overlapping beam exists in the ZK, see beam method + val requiredInterval = tuning.segmentBucket(eventTimestamp) + (requiredInterval, None) + } + } + } + def propagate(events: Seq[EventType]) = { val now = timekeeper.now.withZone(DateTimeZone.UTC) - val grouped = events.groupBy(x => tuning.segmentBucket(timestamper(x)).start).toSeq.sortBy(_._1.millis) + val grouped = events.groupBy(groupEvents).toSeq.sortBy(_._1._1.start.millis) // Possibly warm up future beams def toBeWarmed(dt: DateTime, end: DateTime): List[DateTime] = { if (dt <= end) { @@ -350,11 +423,11 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( tbwTimestamp <- toBeWarmed(latestEvent, latestEvent + tuning.warmingPeriod) if tbwTimestamp > latestEvent ) 
yield { // Create beam asynchronously - beam(tbwTimestamp, now) + beam((tuning.segmentBucket(tbwTimestamp), None), now) }) // Propagate data - val countFutures = for ((timestamp, eventGroup) <- grouped) yield { - beam(timestamp, now) onFailure { + val countFutures = for ((beamToUse, eventGroup) <- grouped) yield { + beam(beamToUse, now) onFailure { e => emitAlert(e, log, emitter, WARN, "Failed to create merged beam: %s" format identifier, alertMap) } flatMap { @@ -362,6 +435,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( // We expect beams to handle retries, so if we get an exception here let's drop the batch beam.propagate(eventGroup) rescue { case e: DefunctBeamException => + val timestamp = beam.getInterval().get.start // Just drop data until the next segment starts. At some point we should look at doing something // more intelligent. emitAlert( @@ -369,20 +443,20 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap ++ Dict( "eventCount" -> eventGroup.size, - "timestamp" -> timestamp.toString(), + "timestamp" -> timestamp.toString, "beam" -> beam.toString ) ) data.modify { prev => - ClusteredBeamMeta( + (ClusteredBeamMeta( Seq(prev.latestCloseTime, timestamp).maxBy(_.millis), prev.beamDictss - timestamp.millis - ) + ), beam.getInterval().get) } onSuccess { meta => beamWriteMonitor.synchronized { - beams.remove(timestamp.millis) + beams = beams diff List(beam) } } map (_ => 0) @@ -392,7 +466,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap ++ Dict( "eventCount" -> eventGroup.size, - "timestamp" -> timestamp.toString(), + "timestamp" -> beam.getInterval().get.toString, "beams" -> beam.toString ) ) @@ -407,8 +481,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( def close() = { beamWriteMonitor.synchronized { open = false - val closeFuture = Future.collect(beams.values.toList map (_.close())) map (_ => ()) - 
beams.clear() + val closeFuture = Future.collect(beams map (_.close())) map (_ => ()) + beams = beams diff beams closeFuture } } @@ -428,11 +502,11 @@ case class ClusteredBeamMeta(latestCloseTime: DateTime, beamDictss: Map[Long, Se Dict( // latestTime is only being written for backwards compatibility "latestTime" -> new DateTime( - (Seq(latestCloseTime.millis) ++ beamDictss.map(_._1)).max, + (Seq(latestCloseTime.millis) ++ beamDictss.keys).max, ISOChronology.getInstanceUTC - ).toString(), - "latestCloseTime" -> latestCloseTime.toString(), - "beams" -> beamDictss.map(kv => (new DateTime(kv._1, ISOChronology.getInstanceUTC).toString(), kv._2)) + ).toString, + "latestCloseTime" -> latestCloseTime.toString, + "beams" -> beamDictss.map(kv => (new DateTime(kv._1, ISOChronology.getInstanceUTC).toString, kv._2)) ) ) } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala index 042064a..f1be60b 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala @@ -41,4 +41,12 @@ class HashPartitionBeam[A]( } override def toString = "HashPartitionBeam(%s)" format delegates.mkString(", ") + + def getInterval() = { + val headOptional = delegates.headOption + if(headOptional.isEmpty) + None + else + headOptional.get.getInterval() + } } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala index c682f29..cbc056f 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala @@ -60,6 +60,9 @@ class HttpBeam[A: Timestamper]( emitter: ServiceEmitter ) extends Beam[A] with Logging { + + def getInterval() = None + private[this] implicit val timer: Timer = DefaultTimer.twitter private[this] val port = if (uri.port > 0) { 
diff --git a/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala index cd713b7..6058458 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala @@ -28,6 +28,8 @@ class MemoryBeam[A]( jsonWriter: JsonWriter[A] ) extends Beam[A] { + def getInterval() = None + def propagate(events: Seq[A]) = { events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach { d => diff --git a/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala index 52acdbf..b5047c3 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala @@ -17,6 +17,8 @@ package com.metamx.tranquility.beam import com.twitter.util.Future +import org.joda.time.Interval +import org.joda.time.chrono.ISOChronology class NoopBeam[A] extends Beam[A] { @@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A] def close() = Future.Done override def toString = "NoopBeam()" + + def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC)) } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala index 3be692c..2931614 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala @@ -38,4 +38,12 @@ class RoundRobinBeam[A]( } override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ") + + def getInterval() = { + val headOptional = beams.headOption + if(headOptional.isEmpty) + None + else + headOptional.get.getInterval() + } } diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala index 
dc0ef89..f90bf75 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala @@ -24,8 +24,7 @@ import com.metamx.common.scala.event.WARN import com.metamx.common.scala.event.emit.emitAlert import com.metamx.common.scala.untyped._ import com.metamx.emitter.service.ServiceEmitter -import com.metamx.tranquility.beam.Beam -import com.metamx.tranquility.beam.DefunctBeamException +import com.metamx.tranquility.beam.{DiscoverableInterval, Beam, DefunctBeamException} import com.metamx.tranquility.finagle._ import com.metamx.tranquility.typeclass.ObjectWriter import com.metamx.tranquility.typeclass.Timestamper @@ -53,6 +52,8 @@ class DruidBeam[A : Timestamper]( objectWriter: ObjectWriter[A] ) extends Beam[A] with Logging { + def getInterval() = Some(interval) + private[this] implicit val timer = DefaultTimer.twitter // Keeps track of each task's client and most recently checked status diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala index 4944dca..7f6b620 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala @@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper]( } override def newBeam(interval: Interval, partition: Int) = { - require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) - ) val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId( location.dataSource, beamTuning.segmentGranularity, @@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper]( // Backwards compatibility (see toDict). 
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC)) } - require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) - ) + val partition = int(d("partition")) val tasks = if (d contains "tasks") { list(d("tasks")).map(dict(_)).map(d => DruidTaskPointer(str(d("id")), str(d("firehoseId")))) diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala index d58e935..9c8185d 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala @@ -194,6 +194,9 @@ object DruidBeams def close() = clusteredBeam.close() map (_ => lifecycle.stop()) override def toString = clusteredBeam.toString + + def getInterval() = clusteredBeam.getInterval() + } } diff --git a/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala b/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala index f9da7e9..6379b1a 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala @@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging } override def close() = memoryBeam.close() + + def getInterval() = None } val acked = new AtomicLong() diff --git a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala index 5cf51be..23380d1 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala @@ -109,6 +109,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA def close() = { beam.close() } + + def getInterval() = None } 
class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString) @@ -118,6 +120,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA _beams += this } + def getInterval() = None + def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized { if (_events.contains(events("defunct"))) { Future.exception(new DefunctBeamException("Defunct")) diff --git a/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala b/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala index 5b79b66..a3e5b71 100644 --- a/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala +++ b/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala @@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent] } def close() = Future.Done + + def getInterval() = None } object SimpleBeam