-
Notifications
You must be signed in to change notification settings - Fork 0
/
sensors.scala
82 lines (61 loc) · 2.24 KB
/
sensors.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.luxoft
package sensors
import java.nio.file.Path
import Reading._
/**
 * Running statistics for the readings of a single sensor.
 *
 * @param invalidCount number of invalid readings recorded
 * @param validCount   number of valid readings recorded
 * @param sumValue     sum of the values of all valid readings
 * @param minValue     smallest value among the valid readings
 * @param maxValue     largest value among the valid readings
 */
final case class SensorObservation(
  invalidCount: Long,
  validCount: Long,
  sumValue: Long,
  minValue: Long,
  maxValue: Long
) {

  /** True when at least one valid reading has been recorded. */
  def isValid: Boolean = validCount >= 1L

  /** Number of readings recorded, valid and invalid together. */
  def totalCount: Long = validCount + invalidCount
}
/** Companion providing the starting value for [[SensorObservation]]. */
object SensorObservation {

  /**
   * Observation with no readings recorded.
   *
   * `minValue` starts at `Long.MaxValue` and `maxValue` at `Long.MinValue`, so taking the
   * minimum/maximum against the first valid value yields that value.
   */
  val Empty: SensorObservation =
    SensorObservation(
      invalidCount = 0L,
      validCount = 0L,
      sumValue = 0L,
      minValue = Long.MaxValue,
      maxValue = Long.MinValue
    )
}
/** Operations for folding individual readings into per-sensor statistics. */
trait SensorsOps {

  /**
   * Returns a copy of `sensors` with one more reading taken into account.
   *
   * An invalid reading only increments `invalidCount`. A valid reading increments
   * `validCount`, adds its value to `sumValue`, and lowers `minValue` / raises `maxValue`
   * when the value falls outside the current range.
   *
   * @param sensors the statistics gathered so far
   * @param reading the reading to fold in
   * @return the updated statistics
   */
  def updateObservation(sensors: SensorObservation, reading: Reading): SensorObservation =
    reading match {
      case InvalidReading(_, _) =>
        sensors.copy(invalidCount = sensors.invalidCount + 1)

      case ValidReading(_, _, value) =>
        // Every field is derived from the incoming `sensors`, so a single copy suffices.
        sensors.copy(
          validCount = sensors.validCount + 1,
          sumValue = sensors.sumValue + value,
          minValue = Math.min(sensors.minValue, value),
          maxValue = Math.max(sensors.maxValue, value)
        )
    }
}
/** Default instance of [[SensorsOps]], so its methods can be called as `SensorsOps.updateObservation(...)`. */
object SensorsOps extends SensorsOps
/**
 * Immutable running aggregate of sensor readings.
 *
 * @param paths the distinct source paths of the readings accumulated so far
 * @param ref   the statistics gathered so far, keyed by sensor
 */
final case class Accumulator(private val paths: Set[Path], private val ref: Map[Sensor, SensorObservation]) {

  /**
   * Folds one reading into the aggregate.
   *
   * The reading's path is added to the set of known paths and the statistics of the
   * reading's sensor are updated, starting from [[SensorObservation.Empty]] when the
   * sensor has not been seen before.
   *
   * @param reading the reading to record
   * @return a new accumulator; this instance is not modified
   */
  def accumulate(reading: Reading): Accumulator = {
    val previous = ref.getOrElse(reading.sensor, SensorObservation.Empty)
    val updated  = SensorsOps.updateObservation(previous, reading)
    copy(paths = paths + reading.path, ref = ref.updated(reading.sensor, updated))
  }

  /**
   * Builds the overall summary from everything accumulated so far.
   *
   * Counts are summed over all sensors. Each sensor contributes one entry to
   * `observationsSummary`, in the iteration order of the underlying map: `Some` with
   * min, max and average when the sensor has at least one valid reading, `None`
   * otherwise. The average is `sumValue / validCount` computed on `Long`s, so any
   * fractional part is discarded.
   *
   * @return the summary, with `sourceCount` equal to the number of distinct paths
   */
  def makeSummary(): SensorsSummary = {
    val perSensorTotals = ref.foldLeft(SensorsSummary.Empty) {
      case (acc, (sensor, observation)) =>
        val summary: Option[ObservationSummary] =
          if (observation.isValid)
            Some(
              ObservationSummary(
                observation.minValue,
                observation.maxValue,
                observation.sumValue / observation.validCount
              )
            )
          else
            None

        acc.copy(
          observations = acc.observations + observation.totalCount,
          failedObservations = acc.failedObservations + observation.invalidCount,
          observationsSummary = acc.observationsSummary :+ (sensor -> summary)
        )
    }

    // The path count does not depend on any sensor, so it is set once here rather than
    // inside the fold; this also reports it when there are paths but no sensors.
    perSensorTotals.copy(sourceCount = paths.size)
  }
}
/** Factory and shared starting value for [[Accumulator]]. */
object Accumulator {

  /** Creates an accumulator with no paths and no sensor statistics. */
  def apply(): Accumulator =
    new Accumulator(Set.empty[Path], Map.empty[Sensor, SensorObservation])

  /** Accumulator with nothing recorded, built through the no-argument `apply`. */
  val Empty: Accumulator = apply()
}