Skip to content

Commit

Permalink
handle gran changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Parag Jain authored and pjain1 committed Nov 10, 2015
1 parent e006195 commit 5704c84
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 84 deletions.
10 changes: 9 additions & 1 deletion core/src/main/scala/com/metamx/tranquility/beam/Beam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval

/**
* Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means
* the beam should be discarded (after calling close()).
*/
trait Beam[A]
trait Beam[A] extends DiscoverableInterval
{
/**
* Request propagation of events. The operation may fail in various ways, which tend to be specific to
Expand All @@ -40,3 +41,10 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t)
{
def this(s: String) = this(s, null)
}

trait DiscoverableInterval
{
/**
* */
def getInterval(): Option[Interval]
}
220 changes: 147 additions & 73 deletions core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,12 @@ class HashPartitionBeam[A](
}

override def toString = "HashPartitionBeam(%s)" format delegates.mkString(", ")

def getInterval() = {
val headOptional = delegates.headOption
if(headOptional.isEmpty)
None
else
headOptional.get.getInterval()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class HttpBeam[A: Timestamper](
emitter: ServiceEmitter
) extends Beam[A] with Logging
{

def getInterval() = None

private[this] implicit val timer: Timer = DefaultTimer.twitter

private[this] val port = if (uri.port > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class MemoryBeam[A](
jsonWriter: JsonWriter[A]
) extends Beam[A]
{
def getInterval() = None

def propagate(events: Seq[A]) = {
events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach {
d =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology

class NoopBeam[A] extends Beam[A]
{
Expand All @@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A]
def close() = Future.Done

override def toString = "NoopBeam()"

def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC))
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,12 @@ class RoundRobinBeam[A](
}

override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ")

def getInterval() = {
val headOptional = beams.headOption
if(headOptional.isEmpty)
None
else
headOptional.get.getInterval()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import com.metamx.common.scala.event.WARN
import com.metamx.common.scala.event.emit.emitAlert
import com.metamx.common.scala.untyped._
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.DefunctBeamException
import com.metamx.tranquility.beam.{DiscoverableInterval, Beam, DefunctBeamException}
import com.metamx.tranquility.finagle._
import com.metamx.tranquility.typeclass.ObjectWriter
import com.metamx.tranquility.typeclass.Timestamper
Expand Down Expand Up @@ -53,6 +52,8 @@ class DruidBeam[A : Timestamper](
objectWriter: ObjectWriter[A]
) extends Beam[A] with Logging
{
def getInterval() = Some(interval)

private[this] implicit val timer = DefaultTimer.twitter

// Keeps track of each task's client and most recently checked status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper](
}

override def newBeam(interval: Interval, partition: Int) = {
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)
val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId(
location.dataSource,
beamTuning.segmentGranularity,
Expand Down Expand Up @@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper](
// Backwards compatibility (see toDict).
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
}
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)

val partition = int(d("partition"))
val tasks = if (d contains "tasks") {
list(d("tasks")).map(dict(_)).map(d => DruidTaskPointer(str(d("id")), str(d("firehoseId"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ object DruidBeams
def close() = clusteredBeam.close() map (_ => lifecycle.stop())

override def toString = clusteredBeam.toString

def getInterval() = clusteredBeam.getInterval()

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging
}

override def close() = memoryBeam.close()

def getInterval() = None
}

val acked = new AtomicLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
def close() = {
beam.close()
}

def getInterval() = None
}

class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
Expand All @@ -118,6 +120,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
_beams += this
}

def getInterval() = None

def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
if (_events.contains(events("defunct"))) {
Future.exception(new DefunctBeamException("Defunct"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent]
}

def close() = Future.Done

def getInterval() = None
}

object SimpleBeam
Expand Down

0 comments on commit 5704c84

Please sign in to comment.