-
Notifications
You must be signed in to change notification settings - Fork 0
/
TestTimer.java
55 lines (50 loc) · 2.08 KB
/
TestTimer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package org.apache.beam.runners.samza.adapter;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.TestSamzaRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.junit.Test;
/**
* Created by xiliu on 2/5/18.
*/
public class TestTimer {
@Test
public void testTimer() {
SamzaPipelineOptions options = PipelineOptionsFactory.as(SamzaPipelineOptions.class);
options.setRunner(TestSamzaRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(KafkaIO.<String, String>read()
.withTopic("TestTimerTopic")
.withBootstrapServers("localhost:9092")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
.apply(Values.create())
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5)))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(1)))
.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
.apply(MapElements
.into(TypeDescriptors.strings())
.via(count -> {
String msg = "Count in window is " + count;
System.out.println(msg);
return msg;
}));
pipeline.run().waitUntilFinish();
}
}