Skip to content

Commit

Permalink
Merge pull request druid-io#35 from gianm/tz-fixes
Browse files (browse the repository at this point in the history)
Use UTC DateTimes and Intervals whenever making decisions involving segments.
  • Loading branch information
fjy committed Oct 19, 2015
2 parents 8cc0ef2 + f60ade5 commit e006195
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 42 deletions.
47 changes: 32 additions & 15 deletions core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology
import org.scala_tools.time.Implicits._
import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -125,7 +127,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](

// We will refuse to create beams earlier than this timestamp. The purpose of this is to prevent recreating beams
// that we thought were closed.
@volatile private[this] var localLatestCloseTime = new DateTime(0)
@volatile private[this] var localLatestCloseTime = new DateTime(0, ISOChronology.getInstanceUTC)

private[this] val rand = new Random

Expand Down Expand Up @@ -171,15 +173,22 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](

@volatile private[this] var open = true

// Event timestamp extractor: delegates to the implicit Timestamper for EventType and
// converts the resulting DateTime to the UTC zone before handing it back.
val timestamper: EventType => DateTime = {
  val underlying = implicitly[Timestamper[EventType]]
  event => underlying.timestamp(event).withZone(DateTimeZone.UTC)
}

private[this] def beam(timestamp: DateTime, now: DateTime): Future[Beam[EventType]] = {
val bucket = tuning.segmentBucket(timestamp)
val creationInterval = new Interval(
tuning.segmentBucket(now - tuning.windowPeriod).start,
tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end
tuning.segmentBucket(now - tuning.windowPeriod).start.millis,
tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end.millis,
ISOChronology.getInstanceUTC
)
val windowInterval = new Interval(
tuning.segmentBucket(now - tuning.windowPeriod).start,
tuning.segmentBucket(now + tuning.windowPeriod).end
tuning.segmentBucket(now - tuning.windowPeriod).start.millis,
tuning.segmentBucket(now + tuning.windowPeriod).end.millis,
ISOChronology.getInstanceUTC
)
val futureBeamOption = beams.get(timestamp.millis) match {
case _ if !open => Future.value(None)
Expand Down Expand Up @@ -217,7 +226,11 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
// We might want to cover multiple time segments in advance.
val numSegmentsToCover = tuning.minSegmentsPerBeam +
rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1)
val intervalToCover = timestamp to tuning.segmentGranularity.increment(timestamp, numSegmentsToCover)
val intervalToCover = new Interval(
timestamp.millis,
tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).millis,
ISOChronology.getInstanceUTC
)
val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start)

// OK, create them where needed.
Expand Down Expand Up @@ -251,7 +264,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
(ts.millis, tsNewDicts)
})
val newLatestCloseTime = new DateTime(
(Seq(prev.latestCloseTime.millis) ++ (prev.beamDictss.keySet -- newBeamDictss.keySet)).max
(Seq(prev.latestCloseTime.millis) ++ (prev.beamDictss.keySet -- newBeamDictss.keySet)).max,
ISOChronology.getInstanceUTC
)
ClusteredBeamMeta(
newLatestCloseTime,
Expand Down Expand Up @@ -321,7 +335,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
}

def propagate(events: Seq[EventType]) = {
val timestamper = implicitly[Timestamper[EventType]].timestamp _
val now = timekeeper.now.withZone(DateTimeZone.UTC)
val grouped = events.groupBy(x => tuning.segmentBucket(timestamper(x)).start).toSeq.sortBy(_._1.millis)
// Possibly warm up future beams
def toBeWarmed(dt: DateTime, end: DateTime): List[DateTime] = {
Expand All @@ -336,11 +350,11 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
tbwTimestamp <- toBeWarmed(latestEvent, latestEvent + tuning.warmingPeriod) if tbwTimestamp > latestEvent
) yield {
// Create beam asynchronously
beam(tbwTimestamp, timekeeper.now)
beam(tbwTimestamp, now)
})
// Propagate data
val countFutures = for ((timestamp, eventGroup) <- grouped) yield {
beam(timestamp, timekeeper.now) onFailure {
beam(timestamp, now) onFailure {
e =>
emitAlert(e, log, emitter, WARN, "Failed to create merged beam: %s" format identifier, alertMap)
} flatMap {
Expand Down Expand Up @@ -413,27 +427,30 @@ case class ClusteredBeamMeta(latestCloseTime: DateTime, beamDictss: Map[Long, Se
def toBytes(objectMapper: ObjectMapper) = objectMapper.writeValueAsBytes(
Dict(
// latestTime is only being written for backwards compatibility
"latestTime" -> new DateTime((Seq(latestCloseTime.millis) ++ beamDictss.map(_._1)).max).toString(),
"latestTime" -> new DateTime(
(Seq(latestCloseTime.millis) ++ beamDictss.map(_._1)).max,
ISOChronology.getInstanceUTC
).toString(),
"latestCloseTime" -> latestCloseTime.toString(),
"beams" -> beamDictss.map(kv => (new DateTime(kv._1).toString(), kv._2))
"beams" -> beamDictss.map(kv => (new DateTime(kv._1, ISOChronology.getInstanceUTC).toString(), kv._2))
)
)
}

object ClusteredBeamMeta
{
def empty = ClusteredBeamMeta(new DateTime(0), Map.empty)
def empty = ClusteredBeamMeta(new DateTime(0, ISOChronology.getInstanceUTC), Map.empty)

def fromBytes[A](objectMapper: ObjectMapper, bytes: Array[Byte]): Either[Exception, ClusteredBeamMeta] = {
try {
val d = objectMapper.readValue(bytes, classOf[Dict])
val beams: Map[Long, Seq[Dict]] = dict(d.getOrElse("beams", Dict())) map {
case (k, vs) =>
val ts = new DateTime(k)
val ts = new DateTime(k, ISOChronology.getInstanceUTC)
val beamDicts = list(vs) map (dict(_))
(ts.millis, beamDicts)
}
val latestCloseTime = new DateTime(d.getOrElse("latestCloseTime", 0L))
val latestCloseTime = new DateTime(d.getOrElse("latestCloseTime", 0L), ISOChronology.getInstanceUTC)
Right(ClusteredBeamMeta(latestCloseTime, beams))
}
catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.metamx.common.scala.Logging
import com.metamx.common.scala.Predef._
import com.metamx.common.scala.event.WARN
import com.metamx.common.scala.event.emit.emitAlert
import com.metamx.common.scala.timekeeper.Timekeeper
import com.metamx.common.scala.untyped._
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.Beam
Expand Down Expand Up @@ -51,7 +50,6 @@ class DruidBeam[A : Timestamper](
finagleRegistry: FinagleRegistry,
indexService: IndexService,
emitter: ServiceEmitter,
timekeeper: Timekeeper,
objectWriter: ObjectWriter[A]
) extends Beam[A] with Logging
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,35 @@ package com.metamx.tranquility.druid

import com.metamx.common.Granularity
import com.metamx.common.scala.Logging
import com.metamx.common.scala.timekeeper.Timekeeper
import com.metamx.common.scala.untyped._
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.{BeamMaker, ClusteredBeamTuning}
import com.metamx.tranquility.beam.BeamMaker
import com.metamx.tranquility.beam.ClusteredBeamTuning
import com.metamx.tranquility.finagle.FinagleRegistry
import com.metamx.tranquility.typeclass.{ObjectWriter, Timestamper}
import com.twitter.util.{Await, Future}
import io.druid.data.input.impl.{JSONParseSpec, MapInputRowParser, TimestampSpec}
import io.druid.indexing.common.task.{RealtimeIndexTask, Task, TaskResource}
import com.metamx.tranquility.typeclass.ObjectWriter
import com.metamx.tranquility.typeclass.Timestamper
import com.twitter.util.Await
import com.twitter.util.Future
import io.druid.data.input.impl.JSONParseSpec
import io.druid.data.input.impl.MapInputRowParser
import io.druid.data.input.impl.TimestampSpec
import io.druid.indexing.common.task.RealtimeIndexTask
import io.druid.indexing.common.task.Task
import io.druid.indexing.common.task.TaskResource
import io.druid.segment.indexing.granularity.UniformGranularitySpec
import io.druid.segment.indexing.{DataSchema, RealtimeIOConfig, RealtimeTuningConfig}
import io.druid.segment.indexing.DataSchema
import io.druid.segment.indexing.RealtimeIOConfig
import io.druid.segment.indexing.RealtimeTuningConfig
import io.druid.segment.realtime.FireDepartment
import io.druid.segment.realtime.firehose.{ClippedFirehoseFactory, EventReceiverFirehoseFactory, TimedShutoffFirehoseFactory}
import io.druid.segment.realtime.plumber.{ServerTimeRejectionPolicyFactory, NoopRejectionPolicyFactory}
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory
import io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory
import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory
import io.druid.timeline.partition.LinearShardSpec
import org.joda.time.chrono.ISOChronology
import org.joda.time.{DateTime, DateTimeZone, Interval}
import org.joda.time.DateTime
import org.joda.time.Interval
import org.scala_tools.time.Implicits._
import scala.util.Random

Expand All @@ -48,7 +60,6 @@ class DruidBeamMaker[A: Timestamper](
finagleRegistry: FinagleRegistry,
indexService: IndexService,
emitter: ServiceEmitter,
timekeeper: Timekeeper,
objectWriter: ObjectWriter[A]
) extends BeamMaker[A, DruidBeam[A]] with Logging
{
Expand Down Expand Up @@ -154,7 +165,6 @@ class DruidBeamMaker[A: Timestamper](
finagleRegistry,
indexService,
emitter,
timekeeper,
objectWriter
)
}
Expand All @@ -175,10 +185,10 @@ class DruidBeamMaker[A: Timestamper](

override def fromDict(d: Dict) = {
val interval = if (d contains "interval") {
new Interval(d("interval"))
new Interval(d("interval"), ISOChronology.getInstanceUTC)
} else {
// Backwards compatibility (see toDict).
beamTuning.segmentBucket(new DateTime(d("timestamp")))
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
}
require(
beamTuning.segmentGranularity.widen(interval) == interval,
Expand All @@ -199,7 +209,6 @@ class DruidBeamMaker[A: Timestamper](
finagleRegistry,
indexService,
emitter,
timekeeper,
objectWriter
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ object DruidBeams
things.finagleRegistry,
indexService,
things.emitter,
things.timekeeper,
things.objectWriter
)
val clusteredBeam = new ClusteredBeam(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,25 @@ import com.metamx.common.scala.timekeeper.TestingTimekeeper
import com.metamx.common.scala.untyped._
import com.metamx.emitter.core.LoggingEmitter
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.{Beam, BeamMaker, ClusteredBeam, ClusteredBeamMeta, ClusteredBeamTuning, DefunctBeamException, RoundRobinBeam}
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.BeamMaker
import com.metamx.tranquility.beam.ClusteredBeam
import com.metamx.tranquility.beam.ClusteredBeamMeta
import com.metamx.tranquility.beam.ClusteredBeamTuning
import com.metamx.tranquility.beam.DefunctBeamException
import com.metamx.tranquility.beam.RoundRobinBeam
import com.metamx.tranquility.test.common.CuratorRequiringSuite
import com.metamx.tranquility.typeclass.Timestamper
import com.twitter.util.{Await, Future}
import com.twitter.util.Await
import com.twitter.util.Future
import java.util.UUID
import org.apache.curator.framework.CuratorFramework
import org.joda.time.{DateTime, Interval}
import org.joda.time.DateTimeZone
import org.joda.time.DateTime
import org.joda.time.Interval
import org.scala_tools.time.Implicits._
import org.scalatest.{BeforeAndAfter, FunSuite}
import java.util.UUID
import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -66,9 +76,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
val _beams = new ArrayBuffer[TestingBeam]()
val _buffers = mutable.HashMap[String, EventBuffer]()
val _lock = new AnyRef
val localZone = new DateTime().getZone

def buffers = _lock.synchronized {
_buffers.values.map(x => (x.timestamp, x.partition, x.open, x.buffer.toSeq)).toSet
_buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
}

def beamsList = _lock.synchronized {
Expand Down Expand Up @@ -555,16 +566,16 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
test("MetaSerde") {
val s = """{
| "beams" : {
| "2000-01-01T15:00:00.000Z" : [
| "2000-01-01T20:30:00.000+05:30" : [
| {
| "partition" : 123
| }
| ]
| },
| "latestCloseTime" : "2000-01-01T14:00:00.000Z"
| "latestCloseTime" : "2000-01-01T19:30:00.000+05:30"
|}""".stripMargin
def checkMeta(meta: ClusteredBeamMeta) {
assert(meta.latestCloseTime === new DateTime("2000-01-01T14:00:00.000Z"))
assert(meta.latestCloseTime === new DateTime("2000-01-01T14:00:00.000Z").withZone(DateTimeZone.UTC))
assert(meta.beamDictss.keys.toSet === Set(
new DateTime("2000-01-01T15:00:00.000Z").millis
))
Expand Down

0 comments on commit e006195

Please sign in to comment.