Skip to content

Commit

Permalink
rest probe endpoints(conduktor#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
eroznik committed Jun 25, 2021
1 parent 621186c commit 0214a78
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 3 deletions.
5 changes: 5 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ ksm {

}

server {
port = 8080
port = ${?SERVER_PORT}
}

parser {
csv {
delimiter = ","
Expand Down
11 changes: 9 additions & 2 deletions src/main/scala/io/conduktor/ksm/AclSynchronizer.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.conduktor.ksm

import io.conduktor.ksm.source.SourceAcl
import io.conduktor.ksm.notification.Notification
import io.conduktor.ksm.source.{ParsingContext, SourceAcl}
import io.conduktor.ksm.web.Probe
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.slf4j.{Logger, LoggerFactory}

Expand Down Expand Up @@ -62,12 +62,13 @@ class AclSynchronizer(
notification: Notification,
numFailedRefreshesBeforeNotification: Int,
readOnly: Boolean = false
) extends Runnable {
) extends Runnable with Probe {

import AclSynchronizer._

private var sourceAclsCache: Set[(Resource, Acl)] = _
private var failedRefreshes: Int = 0
private var isRefreshFailing = false

if (readOnly) {
log.warn("""
Expand All @@ -84,6 +85,7 @@ class AclSynchronizer(
Try(sourceAcl.refresh()) match {
case Success(result) =>
failedRefreshes = 0
isRefreshFailing = false
result match {
// the source has not changed
case None =>
Expand Down Expand Up @@ -125,6 +127,7 @@ class AclSynchronizer(
case Failure(e) =>
// errors such as HTTP exceptions when refreshing
failedRefreshes += 1
isRefreshFailing = true
try {
log.error("Exceptions while refreshing ACL source:", e)
if(failedRefreshes >= numFailedRefreshesBeforeNotification){
Expand All @@ -147,4 +150,8 @@ class AclSynchronizer(
sourceAcl.close()
notification.close()
}

override def isSuccessful: Boolean = {
!isRefreshFailing
}
}
5 changes: 5 additions & 0 deletions src/main/scala/io/conduktor/ksm/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class AppConfig(config: Config) {
val readOnly: Boolean = ksmConfig.getBoolean("readonly")
}

object Server {
private val serverConfig = config.getConfig("server")
val port: Int = serverConfig.getInt("port")
}

object Parser {
private val aclParserConfig = config.getConfig("parser")
val csvDelimiter: Char =
Expand Down
7 changes: 6 additions & 1 deletion src/main/scala/io/conduktor/ksm/KafkaSecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.conduktor.ksm

import com.typesafe.config.ConfigFactory
import io.conduktor.ksm.parser.AclParserRegistry
import io.conduktor.ksm.web.Server
import org.slf4j.LoggerFactory

import java.util.concurrent.atomic.AtomicBoolean
Expand All @@ -18,6 +19,7 @@ object KafkaSecurityManager extends App {
var aclSynchronizer: AclSynchronizer = _
val parserRegistry: AclParserRegistry = new AclParserRegistry(appConfig)
val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
var server: Server = _

// For backward compatibility, see https://github.com/conduktor/kafka-security-manager/issues/103
val oldExtractConfig = sys.env.get("KSM_EXTRACT")
Expand All @@ -37,6 +39,7 @@ object KafkaSecurityManager extends App {
appConfig.KSM.numFailedRefreshesBeforeNotification,
appConfig.KSM.readOnly
)
server = new Server(appConfig.Server.port, List(aclSynchronizer))

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
Expand All @@ -54,6 +57,7 @@ object KafkaSecurityManager extends App {
log.info(
"Continuous mode: ACL will be synchronized every " + appConfig.KSM.refreshFrequencyMs + " ms."
)
server.start()
val handle = scheduler.scheduleAtFixedRate(
aclSynchronizer,
0,
Expand All @@ -68,13 +72,14 @@ object KafkaSecurityManager extends App {
} finally {
shutdown()
}

}

def shutdown(): Unit = {
log.info("Kafka Security Manager is shutting down...")
isCancelled = new AtomicBoolean(true)
aclSynchronizer.close()
if (server != null)
server.stop()
scheduler.shutdownNow()
}
}
23 changes: 23 additions & 0 deletions src/main/scala/io/conduktor/ksm/web/ProbeHandler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.conduktor.ksm.web

import com.sun.net.httpserver.{HttpExchange, HttpHandler}

trait Probe {
// implementation should be non blocking
def isSuccessful: Boolean
}

class ProbeHandler(probes: List[Probe]) extends HttpHandler {
override def handle(exc: HttpExchange): Unit = {
val checkup = probes.forall(p => p.isSuccessful)
val payload = Server.responseMapper
.createObjectNode()
.put("success", checkup)
val response = Server.responseMapper.writeValueAsString(payload)
val responseCode = if (checkup) 200 else 500
exc.sendResponseHeaders(responseCode, response.length())
val os = exc.getResponseBody
os.write(response.getBytes)
os.close()
}
}
29 changes: 29 additions & 0 deletions src/main/scala/io/conduktor/ksm/web/Server.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.conduktor.ksm.web

import com.fasterxml.jackson.databind.ObjectMapper
import com.sun.net.httpserver.HttpServer
import org.slf4j.LoggerFactory

import java.net.InetSocketAddress
import java.util.concurrent.Executors

object Server {
val responseMapper = new ObjectMapper()
}

class Server(port: Int, livenessProbes: List[Probe]) {
private val log = LoggerFactory.getLogger(Server.getClass)
private val server = HttpServer.create(new InetSocketAddress(port), 0)
server.createContext("/api/probe/ready", new ProbeHandler(List()))
server.createContext("/api/probe/alive", new ProbeHandler(livenessProbes))

def start(): Unit = {
log.info("Staring server on {}", port)
server.setExecutor(Executors.newSingleThreadExecutor())
server.start()
}

def stop(): Unit = {
server.stop(0)
}
}
3 changes: 3 additions & 0 deletions src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class AclSynchronizerTest
aclSynchronizer.run()
dummyNotification.addedAcls.size shouldBe 3
dummyNotification.removedAcls.size shouldBe 0
aclSynchronizer.isSuccessful shouldBe true
eventually(timeout(3000 milliseconds), interval(200 milliseconds)) {
simpleAclAuthorizer
.getAcls() shouldBe Map(res1 -> Set(acl1, acl2), res2 -> Set(acl3))
Expand All @@ -251,6 +252,7 @@ class AclSynchronizerTest
dummyNotification.reset()
dummySourceAcl.setErrorNext()
aclSynchronizer.run()
aclSynchronizer.isSuccessful shouldBe false
dummyNotification.addedAcls.size shouldBe 0
dummyNotification.removedAcls.size shouldBe 0
dummyNotification.errorCounter shouldBe 1
Expand All @@ -264,6 +266,7 @@ class AclSynchronizerTest
aclSynchronizer.run()
dummyNotification.addedAcls.size shouldBe 1
dummyNotification.removedAcls.size shouldBe 1
aclSynchronizer.isSuccessful shouldBe true
eventually(timeout(3000 milliseconds), interval(200 milliseconds)) {
simpleAclAuthorizer.getAcls() shouldBe Map(
res1 -> Set(acl1),
Expand Down
60 changes: 60 additions & 0 deletions src/test/scala/io/conduktor/ksm/web/ServerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.conduktor.ksm.web

import org.scalatest.{BeforeAndAfterAll, FlatSpec}
import skinny.http.{HTTP, Request}

class LivenessTestProbe extends Probe {
var success = false;

override def isSuccessful: Boolean = {
success
}
}

class ServerTest extends FlatSpec with BeforeAndAfterAll {
private val livenessTestProbe1 = new LivenessTestProbe
private val livenessTestProbe2 = new LivenessTestProbe
private val testSubject =
new Server(7777, List(livenessTestProbe1, livenessTestProbe2))
private val testServerUrl = "http://localhost:7777";
private val testReadyEndpoint = testServerUrl + "/api/probe/ready"
private val testAliveEndpoint = testServerUrl + "/api/probe/alive"

override protected def beforeAll(): Unit = {
testSubject.start()
}

override protected def afterAll(): Unit = {
testSubject.stop()
}

"get ready probe endpoint" should "return 200 with success true" in {
val response = HTTP.get(Request(testReadyEndpoint))
assert(response.status == 200)
assert(new String(response.body) == "{\"success\":true}")
}

"get alive probe endpoint" should "return 200 with success true, if all probes are successful" in {
livenessTestProbe1.success = true
livenessTestProbe2.success = true
val response = HTTP.get(Request(testAliveEndpoint))
assert(response.status == 200)
assert(new String(response.body) == "{\"success\":true}")
}

"get alive probe endpoint" should "return 500 with success false, if some probes are un-successful" in {
livenessTestProbe1.success = true
livenessTestProbe2.success = false
val response = HTTP.get(Request(testAliveEndpoint))
assert(response.status == 500)
assert(new String(response.body) == "{\"success\":false}")
}

"get alive probe endpoint" should "return 500 with success false, if all probes are un-successful" in {
livenessTestProbe1.success = false
livenessTestProbe2.success = false
val response = HTTP.get(Request(testAliveEndpoint))
assert(response.status == 500)
assert(new String(response.body) == "{\"success\":false}")
}
}

0 comments on commit 0214a78

Please sign in to comment.