Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle gran changes #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/src/main/scala/com/metamx/tranquility/beam/Beam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval

/**
* Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means
* the beam should be discarded (after calling close()).
*/
trait Beam[A]
trait Beam[A] extends DiscoverableInterval
{
/**
* Request propagation of events. The operation may fail in various ways, which tend to be specific to
Expand All @@ -40,3 +41,10 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t)
{
def this(s: String) = this(s, null)
}

trait DiscoverableInterval
{
/**
* */
def getInterval(): Option[Interval]
}
220 changes: 153 additions & 67 deletions core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,12 @@ class HashPartitionBeam[A](
}

override def toString = "HashPartitionBeam(%s)" format delegates.mkString(", ")

def getInterval() = {
val headOptional = delegates.headOption
if(headOptional.isEmpty)
None
else
headOptional.get.getInterval()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class HttpBeam[A: Timestamper](
emitter: ServiceEmitter
) extends Beam[A] with Logging
{

def getInterval() = None

private[this] implicit val timer: Timer = DefaultTimer.twitter

private[this] val port = if (uri.port > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class MemoryBeam[A](
jsonWriter: JsonWriter[A]
) extends Beam[A]
{
def getInterval() = None

def propagate(events: Seq[A]) = {
events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach {
d =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology

class NoopBeam[A] extends Beam[A]
{
Expand All @@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A]
def close() = Future.Done

override def toString = "NoopBeam()"

def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC))
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,12 @@ class RoundRobinBeam[A](
}

override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ")

def getInterval() = {
val headOptional = beams.headOption
if(headOptional.isEmpty)
None
else
headOptional.get.getInterval()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import com.metamx.common.scala.event.WARN
import com.metamx.common.scala.event.emit.emitAlert
import com.metamx.common.scala.untyped._
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.DefunctBeamException
import com.metamx.tranquility.beam.{DiscoverableInterval, Beam, DefunctBeamException}
import com.metamx.tranquility.finagle._
import com.metamx.tranquility.typeclass.ObjectWriter
import com.metamx.tranquility.typeclass.Timestamper
Expand Down Expand Up @@ -53,6 +52,8 @@ class DruidBeam[A : Timestamper](
objectWriter: ObjectWriter[A]
) extends Beam[A] with Logging
{
def getInterval() = Some(interval)

private[this] implicit val timer = DefaultTimer.twitter

// Keeps track of each task's client and most recently checked status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper](
}

override def newBeam(interval: Interval, partition: Int) = {
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)
val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId(
location.dataSource,
beamTuning.segmentGranularity,
Expand Down Expand Up @@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper](
// Backwards compatibility (see toDict).
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
}
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)

val partition = int(d("partition"))
val tasks = if (d contains "tasks") {
list(d("tasks")).map(dict(_)).map(d => DruidTaskPointer(str(d("id")), str(d("firehoseId"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ object DruidBeams
def close() = clusteredBeam.close() map (_ => lifecycle.stop())

override def toString = clusteredBeam.toString

def getInterval() = clusteredBeam.getInterval()

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging
}

override def close() = memoryBeam.close()

def getInterval() = None
}

val acked = new AtomicLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
def close() = {
beam.close()
}

def getInterval() = None
}

class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
Expand All @@ -118,6 +120,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
_beams += this
}

def getInterval() = None

def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
if (_events.contains(events("defunct"))) {
Future.exception(new DefunctBeamException("Defunct"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent]
}

def close() = Future.Done

def getInterval() = None
}

object SimpleBeam
Expand Down