From 0214a78ae92b15f824dd7b493b7a05824062b234 Mon Sep 17 00:00:00 2001 From: Etien Roznik <12816736+eroznik@users.noreply.github.com> Date: Fri, 25 Jun 2021 11:10:30 +0200 Subject: [PATCH] rest probe endpoints(#108) --- src/main/resources/application.conf | 5 ++ .../io/conduktor/ksm/AclSynchronizer.scala | 11 +++- .../scala/io/conduktor/ksm/AppConfig.scala | 5 ++ .../conduktor/ksm/KafkaSecurityManager.scala | 7 ++- .../io/conduktor/ksm/web/ProbeHandler.scala | 23 +++++++ .../scala/io/conduktor/ksm/web/Server.scala | 29 +++++++++ .../conduktor/ksm/AclSynchronizerTest.scala | 3 + .../io/conduktor/ksm/web/ServerTest.scala | 60 +++++++++++++++++++ 8 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/io/conduktor/ksm/web/ProbeHandler.scala create mode 100644 src/main/scala/io/conduktor/ksm/web/Server.scala create mode 100644 src/test/scala/io/conduktor/ksm/web/ServerTest.scala diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 9bad7b5..2d45032 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -21,6 +21,11 @@ ksm { } +server { + port = 8080 + port = ${?SERVER_PORT} +} + parser { csv { delimiter = "," diff --git a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala index dccc56f..5bdd8c2 100644 --- a/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala +++ b/src/main/scala/io/conduktor/ksm/AclSynchronizer.scala @@ -1,8 +1,8 @@ package io.conduktor.ksm -import io.conduktor.ksm.source.SourceAcl import io.conduktor.ksm.notification.Notification import io.conduktor.ksm.source.{ParsingContext, SourceAcl} +import io.conduktor.ksm.web.Probe import kafka.security.auth.{Acl, Authorizer, Resource} import org.slf4j.{Logger, LoggerFactory} @@ -62,12 +62,13 @@ class AclSynchronizer( notification: Notification, numFailedRefreshesBeforeNotification: Int, readOnly: Boolean = false -) extends Runnable { +) extends Runnable with Probe { import AclSynchronizer._ private var sourceAclsCache: Set[(Resource, Acl)] = _ private var failedRefreshes: Int = 0 + private var isRefreshFailing = false if (readOnly) { log.warn(""" @@ -84,6 +85,7 @@ class AclSynchronizer( Try(sourceAcl.refresh()) match { case Success(result) => failedRefreshes = 0 + isRefreshFailing = false result match { // the source has not changed case None => @@ -125,6 +127,7 @@ class AclSynchronizer( case Failure(e) => // errors such as HTTP exceptions when refreshing failedRefreshes += 1 + isRefreshFailing = true try { log.error("Exceptions while refreshing ACL source:", e) if(failedRefreshes >= numFailedRefreshesBeforeNotification){ @@ -147,4 +150,8 @@ class AclSynchronizer( sourceAcl.close() notification.close() } + + override def isSuccessful: Boolean = { + !isRefreshFailing + } } diff --git a/src/main/scala/io/conduktor/ksm/AppConfig.scala b/src/main/scala/io/conduktor/ksm/AppConfig.scala index 6c7f5b0..40804d4 100644 --- a/src/main/scala/io/conduktor/ksm/AppConfig.scala +++ b/src/main/scala/io/conduktor/ksm/AppConfig.scala @@ -64,6 +64,11 @@ class AppConfig(config: Config) { val readOnly: Boolean = ksmConfig.getBoolean("readonly") } + object Server { + private val serverConfig = config.getConfig("server") + val port: Int = serverConfig.getInt("port") + } + object Parser { private val aclParserConfig = config.getConfig("parser") val csvDelimiter: Char = diff --git a/src/main/scala/io/conduktor/ksm/KafkaSecurityManager.scala b/src/main/scala/io/conduktor/ksm/KafkaSecurityManager.scala index 1db1932..c3a2ae8 100644 --- a/src/main/scala/io/conduktor/ksm/KafkaSecurityManager.scala +++ b/src/main/scala/io/conduktor/ksm/KafkaSecurityManager.scala @@ -2,6 +2,7 @@ package io.conduktor.ksm import com.typesafe.config.ConfigFactory import io.conduktor.ksm.parser.AclParserRegistry +import io.conduktor.ksm.web.Server import org.slf4j.LoggerFactory import java.util.concurrent.atomic.AtomicBoolean @@ -18,6 +19,7 @@ object KafkaSecurityManager extends App { var aclSynchronizer: AclSynchronizer = _ val parserRegistry: AclParserRegistry = new AclParserRegistry(appConfig) val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1) + var server: Server = _ // For backward compatibility, see https://github.com/conduktor/kafka-security-manager/issues/103 val oldExtractConfig = sys.env.get("KSM_EXTRACT") @@ -37,6 +39,7 @@ object KafkaSecurityManager extends App { appConfig.KSM.numFailedRefreshesBeforeNotification, appConfig.KSM.readOnly ) + server = new Server(appConfig.Server.port, List(aclSynchronizer)) Runtime.getRuntime.addShutdownHook(new Thread() { override def run(): Unit = { @@ -54,6 +57,7 @@ object KafkaSecurityManager extends App { log.info( "Continuous mode: ACL will be synchronized every " + appConfig.KSM.refreshFrequencyMs + " ms." ) + server.start() val handle = scheduler.scheduleAtFixedRate( aclSynchronizer, 0, @@ -68,13 +72,14 @@ object KafkaSecurityManager extends App { } finally { shutdown() } - } def shutdown(): Unit = { log.info("Kafka Security Manager is shutting down...") isCancelled = new AtomicBoolean(true) aclSynchronizer.close() + if (server != null) + server.stop() scheduler.shutdownNow() } } diff --git a/src/main/scala/io/conduktor/ksm/web/ProbeHandler.scala b/src/main/scala/io/conduktor/ksm/web/ProbeHandler.scala new file mode 100644 index 0000000..d9528cc --- /dev/null +++ b/src/main/scala/io/conduktor/ksm/web/ProbeHandler.scala @@ -0,0 +1,23 @@ +package io.conduktor.ksm.web + +import com.sun.net.httpserver.{HttpExchange, HttpHandler} + +trait Probe { + // implementation should be non blocking + def isSuccessful: Boolean +} + +class ProbeHandler(probes: List[Probe]) extends HttpHandler { + override def handle(exc: HttpExchange): Unit = { + val checkup = probes.forall(p => p.isSuccessful) + val payload = Server.responseMapper + .createObjectNode() + .put("success", checkup) + val response = Server.responseMapper.writeValueAsString(payload) + val responseCode = if (checkup) 200 else 500 + exc.sendResponseHeaders(responseCode, response.length()) + val os = exc.getResponseBody + os.write(response.getBytes) + os.close() + } +} diff --git a/src/main/scala/io/conduktor/ksm/web/Server.scala b/src/main/scala/io/conduktor/ksm/web/Server.scala new file mode 100644 index 0000000..c8976c7 --- /dev/null +++ b/src/main/scala/io/conduktor/ksm/web/Server.scala @@ -0,0 +1,29 @@ +package io.conduktor.ksm.web + +import com.fasterxml.jackson.databind.ObjectMapper +import com.sun.net.httpserver.HttpServer +import org.slf4j.LoggerFactory + +import java.net.InetSocketAddress +import java.util.concurrent.Executors + +object Server { + val responseMapper = new ObjectMapper() +} + +class Server(port: Int, livenessProbes: List[Probe]) { + private val log = LoggerFactory.getLogger(Server.getClass) + private val server = HttpServer.create(new InetSocketAddress(port), 0) + server.createContext("/api/probe/ready", new ProbeHandler(List())) + server.createContext("/api/probe/alive", new ProbeHandler(livenessProbes)) + + def start(): Unit = { + log.info("Staring server on {}", port) + server.setExecutor(Executors.newSingleThreadExecutor()) + server.start() + } + + def stop(): Unit = { + server.stop(0) + } +} diff --git a/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala b/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala index b69a100..aa6bc67 100644 --- a/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala +++ b/src/test/scala/io/conduktor/ksm/AclSynchronizerTest.scala @@ -242,6 +242,7 @@ class AclSynchronizerTest aclSynchronizer.run() dummyNotification.addedAcls.size shouldBe 3 dummyNotification.removedAcls.size shouldBe 0 + aclSynchronizer.isSuccessful shouldBe true eventually(timeout(3000 milliseconds), interval(200 milliseconds)) { simpleAclAuthorizer .getAcls() shouldBe Map(res1 -> Set(acl1, acl2), res2 -> Set(acl3)) @@ -251,6 +252,7 @@ class AclSynchronizerTest dummyNotification.reset() dummySourceAcl.setErrorNext() aclSynchronizer.run() + aclSynchronizer.isSuccessful shouldBe false dummyNotification.addedAcls.size shouldBe 0 dummyNotification.removedAcls.size shouldBe 0 dummyNotification.errorCounter shouldBe 1 @@ -264,6 +266,7 @@ class AclSynchronizerTest aclSynchronizer.run() dummyNotification.addedAcls.size shouldBe 1 dummyNotification.removedAcls.size shouldBe 1 + aclSynchronizer.isSuccessful shouldBe true eventually(timeout(3000 milliseconds), interval(200 milliseconds)) { simpleAclAuthorizer.getAcls() shouldBe Map( res1 -> Set(acl1), diff --git a/src/test/scala/io/conduktor/ksm/web/ServerTest.scala b/src/test/scala/io/conduktor/ksm/web/ServerTest.scala new file mode 100644 index 0000000..a489126 --- /dev/null +++ b/src/test/scala/io/conduktor/ksm/web/ServerTest.scala @@ -0,0 +1,60 @@ +package io.conduktor.ksm.web + +import org.scalatest.{BeforeAndAfterAll, FlatSpec} +import skinny.http.{HTTP, Request} + +class LivenessTestProbe extends Probe { + var success = false; + + override def isSuccessful: Boolean = { + success + } +} + +class ServerTest extends FlatSpec with BeforeAndAfterAll { + private val livenessTestProbe1 = new LivenessTestProbe + private val livenessTestProbe2 = new LivenessTestProbe + private val testSubject = + new Server(7777, List(livenessTestProbe1, livenessTestProbe2)) + private val testServerUrl = "http://localhost:7777"; + private val testReadyEndpoint = testServerUrl + "/api/probe/ready" + private val testAliveEndpoint = testServerUrl + "/api/probe/alive" + + override protected def beforeAll(): Unit = { + testSubject.start() + } + + override protected def afterAll(): Unit = { + testSubject.stop() + } + + "get ready probe endpoint" should "return 200 with success true" in { + val response = HTTP.get(Request(testReadyEndpoint)) + assert(response.status == 200) + assert(new String(response.body) == "{\"success\":true}") + } + + "get alive probe endpoint" should "return 200 with success true, if all probes are successful" in { + livenessTestProbe1.success = true + livenessTestProbe2.success = true + val response = HTTP.get(Request(testAliveEndpoint)) + assert(response.status == 200) + assert(new String(response.body) == "{\"success\":true}") + } + + "get alive probe endpoint" should "return 500 with success false, if some probes are un-successful" in { + livenessTestProbe1.success = true + livenessTestProbe2.success = false + val response = HTTP.get(Request(testAliveEndpoint)) + assert(response.status == 500) + assert(new String(response.body) == "{\"success\":false}") + } + + "get alive probe endpoint" should "return 500 with success false, if all probes are un-successful" in { + livenessTestProbe1.success = false + livenessTestProbe2.success = false + val response = HTTP.get(Request(testAliveEndpoint)) + assert(response.status == 500) + assert(new String(response.body) == "{\"success\":false}") + } +}