diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index ec6505d..f1f31d6 100644 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -49,75 +49,8 @@ echo -e "Environment variables:\n\ MARATHON_APP_ID: $MARATHON_APP_ID\n\ MESOS_TASK_ID: $MESOS_TASK_ID\n" -KAFKA_GROUP=`echo ${MARATHON_APP_ID} | cut -d / -f 2` -echo -e "Derived the group this application is running in from the marathon app ID: $KAFKA_GROUP\n" - -# DNS = -# marathon app id, -# split on / -# reverse the list -# remove empty lines -# replace line endings by . -# append marathon.mesos -# Which becomes something like container_name.tenant.marathon.mesos - -DNS=`echo ${MARATHON_APP_ID} | tr "/" "\n" | sed '1!G;h;$!d' |grep -v "^$" |tr "\n" "." | sed "s/$/marathon.mesos/g"` -echo -e "Derived the DNS name from the marathon app ID: $DNS\n" - # Get DSH CA certificate into a file echo "${DSH_CA_CERTIFICATE}" > ${PKI_CONFIG_DIR}/ca.crt - -echo "Using the CA certificate to request DN from Pikachu:" -DN=`curl --cacert ${PKI_CONFIG_DIR}/ca.crt -s "https://${KAFKA_CONFIG_HOST}/dn/${KAFKA_GROUP}/${MESOS_TASK_ID}"` - -# Bail out if we get back an invalid DN -if echo "${DN}" | grep "^CN=" -then - echo -else - echo "Error: Expected the DN received from Pikachu to start with 'CN=' but received '${DN}'" - exit 1 -fi - -# DN comes back as a comma delimited list, split it into separate lines: -echo $DN > ${PKI_CONFIG_DIR}/dndata -sed -i 's/,/\n/g' ${PKI_CONFIG_DIR}/dndata -#echo "DN before sed/=/ = /g: $(cat ${PKI_CONFIG_DIR}/dndata)" - -# Format the DN in the temp file -sed -i 's/=/ = /g' ${PKI_CONFIG_DIR}/dndata -# Read into a variable so we can place it in the csr.conf -DNDATA=$(cat ${PKI_CONFIG_DIR}/dndata) -#echo "DN after sed/=/ = /g: ${DNDATA}" - -echo "DN data: ${DNDATA}" -echo "DNS data: ${DNS}" - -echo "Prepare CSR data: ${PKI_CONFIG_DIR}/csr.conf" -tee ${PKI_CONFIG_DIR}/csr.conf < ${PKI_CONFIG_DIR}/client.pem - -# Remove intermediate files -rm -f ${PKI_CONFIG_DIR}/client.csr ${PKI_CONFIG_DIR}/csr.conf - set +u if [ -n "${KAFKA_PROXY_CA_CHAIN:-}" ]; then @@ -173,24 +106,17 @@ done # this should be picked up by kafka-proxy subjects="$( printf "%s" "$subjects" | cut -c2- )" - -printf "%s\n" "$subjects" - # ------------------------------------------------------------------ # using an if to avoid passing a variable as parameters # this can cause issues with string splitting # which than makes the subjects fail in case of CN=[MY Space Company] # Do not try to be clever with the shell skills and leave it as is! # ------------------------------------------------------------------ - # --tls-client-cert-file /app/pki/client.pem \ - # --tls-client-key-file /app/pki/key.pem \ if [[ "x$subjects" = "x" ]] then ./kafka-proxy server \ --tls-enable \ --tls-ca-chain-cert-file /app/pki/ca.crt \ - --tls-client-cert-file /app/pki/client.pem \ - --tls-client-key-file /app/pki/key.pem \ --proxy-listener-tls-enable \ --proxy-listener-cipher-suites TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256 \ --proxy-listener-cipher-suites TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 \ @@ -215,8 +141,6 @@ else ./kafka-proxy server \ --tls-enable \ --tls-ca-chain-cert-file /app/pki/ca.crt \ - --tls-client-cert-file /app/pki/client.pem \ - --tls-client-key-file /app/pki/key.pem \ --proxy-listener-tls-enable \ --proxy-listener-cipher-suites TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256 \ --proxy-listener-cipher-suites TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 \ diff --git a/proxy/client.go b/proxy/client.go index 4337992..6dfe018 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -2,19 +2,11 @@ package proxy import ( "context" - "crypto/rand" - "crypto/rsa" "crypto/tls" "crypto/x509" - "crypto/x509/pkix" - "encoding/pem" "fmt" - "io" - "io/ioutil" "net" - "net/http" "os" - "strings" "sync" "time" @@ -34,123 +26,7 @@ type CertStore interface { Get(clientID string) (tls.Certificate, error) } -func NewPikachuCertStore() (*PikachuCertStore, error) { - caCertPool := x509.NewCertPool() - caCert := os.Getenv("DSH_CA_CERTIFICATE") - if ok := caCertPool.AppendCertsFromPEM([]byte(caCert)); !ok { - return nil, errors.New("failed to parse DSH CA certificate") - } - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - } - transport := &http.Transport{ - TLSClientConfig: tlsConfig, - } - - httpClient := http.Client{ - Transport: transport, - } - - - pikachuAddr := os.Getenv("KAFKA_CONFIG_HOST") - - certStore := PikachuCertStore{ - httpClient: httpClient, - pikachuAddr: pikachuAddr, - } - - return &certStore, nil -} - -type PikachuCertStore struct { - httpClient http.Client; - pikachuAddr string; -} - -func (s *PikachuCertStore) Get(clientID string) (tls.Certificate, error) { - startedAt := time.Now() - logrus.WithField( "clientID",clientID ).Info("Getting certificate from Pikachu") - - marathonAppId := os.Getenv("MARATHON_APP_ID") - mesosTaskId := os.Getenv("MESOS_TASK_ID") - - if marathonAppId == "" { - return tls.Certificate{}, errors.New("MARATHON_APP_ID is not set") - } - - pieces := strings.Split(marathonAppId, "/") - appName := pieces[2] - tenant := pieces[1] - - dnsName := fmt.Sprintf("%s.%s.marathon.mesos", appName, tenant) - - privateKey, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - return tls.Certificate{}, fmt.Errorf("failed to generate private key: %v", err) - } - - csrTemplate := &x509.CertificateRequest{ - Subject: pkix.Name{ - CommonName: fmt.Sprintf( "%s.acl.%s", clientID, tenant), - OrganizationalUnit: []string{tenant}, - Organization: []string{"dsh"}, - }, - SignatureAlgorithm: x509.SHA256WithRSA, - DNSNames: []string{dnsName}, - } - - csrBytes, err := x509.CreateCertificateRequest(rand.Reader, csrTemplate, privateKey) - if err != nil { - fmt.Printf("Failed to create CSR: %v\n", err) - return tls.Certificate{}, err - } - - csrPEM := pem.EncodeToMemory(&pem.Block{ - Type: "CERTIFICATE REQUEST", - Bytes: csrBytes, - }) - - req, err := http.NewRequest("POST", fmt.Sprintf( "https://%s/sign/%s/%s", s.pikachuAddr, tenant, mesosTaskId ), strings.NewReader(string(csrPEM))) - if err != nil { - return tls.Certificate{}, fmt.Errorf("failed to create request: %v", err) - } - req.Header.Set("X-Kafka-Config-Token", os.Getenv("DSH_SECRET_TOKEN")) - - resp, err := s.httpClient.Do(req) - if err != nil { - logrus.WithField("clientID",clientID).Error("Failed to send request to Pikachu") - return tls.Certificate{}, fmt.Errorf("failed to send request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, err := io.ReadAll(resp.Body) // The body often contains more information about the error - if err == nil { - logrus.WithField("status", resp.Status).WithField("clientID",clientID).Error("Failed to get certificate from Pikachu") - } else { - logrus.WithField("status", resp.Status).WithField("clientID",clientID).Errorf("Failed to get certificate from Pikachu: %s", string(body)) - } - return tls.Certificate{}, fmt.Errorf("received unexpected response: %s", resp.Status) - } - - crtPEM, err := io.ReadAll(resp.Body) - if err != nil { - return tls.Certificate{}, fmt.Errorf("failed to read response body: %v", err) - } - - cert, err := tls.X509KeyPair(crtPEM, pem.EncodeToMemory(&pem.Block{ - Type: "RSA PRIVATE KEY", - Bytes: x509.MarshalPKCS1PrivateKey(privateKey), - })) - if err != nil { - return tls.Certificate{}, fmt.Errorf("failed to parse certificate and key: %v", err) - } - - logrus.WithField("clientID",clientID).WithField("duration", time.Since(startedAt)).Info("Got certificate from Pikachu") - - return cert, nil -} type SingleCertStore struct { cert tls.Certificate @@ -160,34 +36,6 @@ func (s *SingleCertStore) Get(clientID string) (tls.Certificate, error) { return s.cert, nil } -type InMemoryCertStore struct { - mutex sync.Mutex - byCommonName map[string]tls.Certificate -} - -func NewInMemoryCertStore() *InMemoryCertStore { - return &InMemoryCertStore{ - byCommonName: make(map[string]tls.Certificate), - } -} - -func (cs *InMemoryCertStore) Add(clientID string, cert tls.Certificate) { - cs.mutex.Lock() - defer cs.mutex.Unlock() - cs.byCommonName[clientID] = cert -} - -func (cs *InMemoryCertStore) Get(clientID string) (tls.Certificate, error) { - cs.mutex.Lock() - defer cs.mutex.Unlock() - - if cert, ok := cs.byCommonName[clientID]; !ok { - return tls.Certificate{}, fmt.Errorf("certificate not found for clientID %s", clientID) - } else { - return cert, nil - } -} - // Client is a type to handle connecting to a Server. All fields are required // unless otherwise specified. type Client struct { @@ -405,7 +253,6 @@ func newDialer(c *config.Config, tlsConfig *tls.Config) (DialerWithContext, erro return nil, errors.New("tlsConfig must not be nil") } - // certStore, err := hardcodedCertStore() // certStore, err := createCertStore(c) certStore, err := NewPikachuCertStore() if err != nil { @@ -423,161 +270,13 @@ func newDialer(c *config.Config, tlsConfig *tls.Config) (DialerWithContext, erro return rawDialer, nil } -func hardcodedCertStore() (CertStore, error) { - certStore := NewInMemoryCertStore() - - password := os.Getenv("KAFKA_PROXY_TENANT_CERT_PASSWORD") - - test1CertPEMBlock := []byte(`-----BEGIN CERTIFICATE----- -MIIDhTCCAyugAwIBAgIQJ6YChcNKEEsF5mZF3Vsj0jAKBggqhkjOPQQDAjAOMQww -CgYDVQQDEwNkc2gwHhcNMjQxMDA5MTIyODU0WhcNMjUxMDA5MTIyODU0WjA2MRUw -EwYDVQQDDAx0ZXN0MS5kc2hkZXYxDzANBgNVBAsMBmRzaGRldjEMMAoGA1UECgwD -ZHNoMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAtxWWb7D09gW0iUi8 -4OV/KgPAg7jHadTK254BXzALnuF//L56T2vpzNxlg8q1jkKO7/tClr97P1kDuhsX -cxMYC30JFozJpmAoWLUH+ckN8W5MMZfVhMasxyOqFMgLKx1H3lewMJwXm6tio+Td -GP9b5mPNpQxRuBRe3a0zjYUwLAmL1Ile2lNFdDu/WGNfZMh1fLZfY0e7Kb/cde9f -I4Bp4CTPahWQk+xOIAjFmFvfJ0LnnFXgJ2YKD2KSR3dKSBYHsutBz2+T4HHzB/dy -xQEPIzcynFPF58Vsr3CS3mDk9All+ZIkUKJ2eYTgkoQBmpGU5awuJvrbhWs/f3TH -RH6YfDFAL8iLCDLLyEMKOT7dXv9h5aFckv2CrVe0aF+1jJ3XUFKO4JPn2CoMyJTZ -TFQiDDaPeqSNTBneD22xMhcpdWsF3CzH7X6sauFsYl8neEuseNFvFCdyrBYVde7q -dSCD/v/dLJaPgYd+dbenvKxpcf+o7dOKzLs1J1F/we8D52vcQW4HleJFfFu7QS+f -M2vK/vuHYp1EJ4om9r6P+vFskuFyIaHWnB/+hNFvjPaheRpKN1skiTvNRTXi/A/a -SmoXjTwZAsAqVvsWvE+cUknvZZ0lrsj0DN8U0nylnvpo1SW97Mtg6SQ0ti0OkeEc -zoQmJ+ZuZk3RUpYeGvCtAPGxOfcCAwEAAaN4MHYwDgYDVR0PAQH/BAQDAgWgMAwG -A1UdEwEB/wQCMAAwHwYDVR0jBBgwFoAU7r3L9vVXYMb+u1UvfxnXLqI13iQwNQYD -VR0RBC4wLIIqa2Fma2EtZHluY2VydC1tYW51YWwuZHNoZGV2Lm1hcmF0aG9uLm1l -c29zMAoGCCqGSM49BAMCA0gAMEUCIQDHyumuKfa2Y2C7w38C0skuKUszpQGi17Xy -9oAwPVNNYgIgf4CpDeYaUnSRuNEbEyheRp5XhRGKu/bDzCeIFyv0cz8= ------END CERTIFICATE-----`) - test1KeyPEMBlock := []byte(`-----BEGIN PRIVATE KEY----- -MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQC3FZZvsPT2BbSJ -SLzg5X8qA8CDuMdp1MrbngFfMAue4X/8vnpPa+nM3GWDyrWOQo7v+0KWv3s/WQO6 -GxdzExgLfQkWjMmmYChYtQf5yQ3xbkwxl9WExqzHI6oUyAsrHUfeV7AwnBebq2Kj -5N0Y/1vmY82lDFG4FF7drTONhTAsCYvUiV7aU0V0O79YY19kyHV8tl9jR7spv9x1 -718jgGngJM9qFZCT7E4gCMWYW98nQuecVeAnZgoPYpJHd0pIFgey60HPb5PgcfMH -93LFAQ8jNzKcU8XnxWyvcJLeYOT0CWX5kiRQonZ5hOCShAGakZTlrC4m+tuFaz9/ -dMdEfph8MUAvyIsIMsvIQwo5Pt1e/2HloVyS/YKtV7RoX7WMnddQUo7gk+fYKgzI -lNlMVCIMNo96pI1MGd4PbbEyFyl1awXcLMftfqxq4WxiXyd4S6x40W8UJ3KsFhV1 -7up1IIP+/90slo+Bh351t6e8rGlx/6jt04rMuzUnUX/B7wPna9xBbgeV4kV8W7tB -L58za8r++4dinUQniib2vo/68WyS4XIhodacH/6E0W+M9qF5Gko3WySJO81FNeL8 -D9pKaheNPBkCwCpW+xa8T5xSSe9lnSWuyPQM3xTSfKWe+mjVJb3sy2DpJDS2LQ6R -4RzOhCYn5m5mTdFSlh4a8K0A8bE59wIDAQABAoICACbZC8yzTacOWtpudrV5vNvb -Y/QPNzD7l/e1tTDPbwfNbWzhte44RanUci3ey1teQBonxF9cTfiJGMNakEU4nfeC -FnWUdg05hhYcg4663JyH/N7hCstbpLm0lZhHQidp1SAWlyoZVOERIqMIIun09TkT -oEUPMh/PIroC0FeNFUWi2OrGIzxMQwaAzv8gRtOJvXQy9laIl+wp68uSeSt9Sfnn -laM65CyfDYy6j8Lea6FXDvIbPq7ddfEBx2wmobpeAlAynwo28LfFNf5QwSeQK/ga -9EA7fljJiWBIkMFi6fHmNJ8aJV5+uojaaxRrsjNsBJ4fRMO8ouvSb+zJiKa9/fiw -Szyy5fN2mvAjK1PrfOaUVBA7154/AxDQswdZkbPajgFih/qFHhxWWR8/PQ0+5d3g -p4kra8xm9klrMA/QEGSJQwozuze4RYzTgUHIij578XDxYOrTPaq646vDB9OGisY7 -2WlMSOdsx0aC8jWjRSZvLigVKaS6C9+4qX7oLItEk7riAVYKpBR6Ja+Hr/a1VyAm -7Lt5dT8SV60eEWSQ4zj09BVv35EoviHGqNHs4O7MxlZBRLbgXv5PT9JZU44RFlp5 -ta4mx0PtoVOJoZHmauRl2PKXleC6QDFlvgCt7T6jWf4M1LaFEgsyayedh8QW1zy/ -Qvbxh7tsMc8wN8ytOCjRAoIBAQDd+pDrDa2MZhJ6lxGDUjk4kvlH/DarCpyGRevK -Ol9CL5ZmZunNFDJs8UwtpwLRPUMbXnTy1ee04LH5YXLWGEakLMXIAoMxLdqQJeff -MzkD098OgGSjro20kWDADV2IKDlN9kr4tANAe7wpkyM0m9bxQijgPTyi4gZXUmiB -kq8TwuX1GG25Kptqa4UdcKxRX8plqcFpuhLlZWPGb/qEcRT7UXT72RZ/gp3uBXHf -jq6TuDXzocaCF9iwL7TeK7OfxsqciozH/2TQATTtYrdp1x4LasqNdh3x8tqEPKOq -QR9SzN0LlX3lmp9xkpq0zLSTB+Wkqf0AHKA6inKFCFXhGlVLAoIBAQDTJPshVZ7B -FVuvkf06lu2kcdXay/FtEJJZs0NR5ksQ4Q3SgVccYbQpxOVdzVu8yfeiqHcT965G -hyuTI2Ymlzp0Yc4RUGD/Oz0yybaXcaHzfve+jbOZQlVgstSXGtOocVVsZRMR1ReY -ln24e3XDc3E5ZQ1sCU2bNDSXexTXKJ+/Pxs6jzTmdgVjjoQxHmTWfc7kMxQSbRg4 -+WauXi06lOGQUjAzwqqykzLUo6SAuiNDk4hs8S1VDIY3T53FN07MS8LOYw3f6j4b -0W3/fm16bIrcMHtvzXSvoOz/5TM0gXa0s7IkHIFp0Q3ZviDD+vJjMdqabovnaUZL -3o7xBNveGH6FAoIBAClbL3i7eaP4uGSilJ60yPhjWZIxyTWgqnzRthZdPbYlC45U -iiatJp/pbNnsjbfSBBirRnqfo6WtXzspDq4Yo8CentKetTaKZngIt+R7tJmj9aDM -BvP/52LG2xGs68ocRGDkDB+92gU/iFTultaMWkVkoBz3C78FAU1viwzo+TljTFiX -jom7CG2RTqGnhlM614iNUf2Sr0aR1z73SeM29kPEb1kknu5utWHNHeQavyfCYDrd -3xAJxVM98CKcXcDc1o7HuWbhjDS8pNG9XJKZg315DffBE98p8dBFQ/DKZT2V8XDh -Dy2h2ybbu3sH0HS8OeqtyPcmXivJ84AZh3aC43UCggEBAJgjT2hnfyp1L0lEkghD -dBxHgeZTiTQi+rDzgBBlNbWF1CXX2GK069Ik83gxXmabyHlFlfenQcXD49vzZ8lq -sk3lo0vKdPmzP5d2zMA2upDu7ag6BizqJQa2FISEafo3H2wpMCJrPsNNvGzPWL1q -Uy0d6RvtYCPy6+oyX4Pg26o72GzLfzyH0b44lHuoANSkdgzr2Qyl2wxCeIAvj7tk -R1tsPRsuqzxc4AVjnEa82Gf6yBRne1iK27LDBVVzUq3V8RoxOXAcIoB82Rt9h3js -+OlLbcXTbr6V3+9KwSiWGGiACl4GxCDw51/7pTRRf5GF/JPGD7zAmExWovx7mclg -MNkCggEBAMZvA/cwM+awUARNtKWJXXm0MPa8ZoIQb85e2cQaNHcJww8GFm/WZZdl -62yKPMynJZYquO8C+PgLHQETe1wU1uROUC8FdaA1+1zTfMYtSNcCv+TZCbDpDdtg -bzFwROVwRmVbnGrMkIGJ3KA2SR4IVW/vmH67RnMiz66Wh2ULV846G1BIeSVBB3PF -nyxN8YCCoDVMJIDsuh8wtmxWrKq52ipxYM22pzjrggB81UWc6g1Rc740geNoN0kb -pg39fTpGfv1dR30GlRsrYml8Yyd+77H6lsUeftBDv57SStZBNB1/G3cby+a3vbRt -OfHoHMHDoBCO8uWksf5o5nGL/bJnKpQ= ------END PRIVATE KEY-----`) - // test1KeyPEMBlock, err := decryptPEM(test1KeyPEMBlock, password ) - // if err != nil { - // return nil, err - // } - - test1, err := tls.X509KeyPair(test1CertPEMBlock, test1KeyPEMBlock) - if err != nil { - return nil, err - } - - certStore.Add("test1", test1) - test2CertPEMBlock := []byte(`-----BEGIN CERTIFICATE----- -MIICTjCCAfWgAwIBAgIRAIHHXqg/VV7LNeakO6qlr00wCgYIKoZIzj0EAwIwDjEM -MAoGA1UEAxMDZHNoMB4XDTI0MTAwNDE5MTYzNVoXDTI1MDEwMjE5MTYzNVowNjEM -MAoGA1UEChMDZHNoMQ8wDQYDVQQLEwZkc2hkZXYxFTATBgNVBAMTDHRlc3QyLmRz -aGRldjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANpKJ/lQIKxWdSM7 -rqrcZVI+37GhMFgqt4Bo/A61qbjYHr1QD/Sf5cjN5s7+D7VGNj195am3cuwQbNX4 -X/sQYZLs/7BlBNRgl8XR3+aMZWP1q/PIFsIS95TeJvreUkhg18tWaWDrN3n1+sJX -5WU9BAFkkwWSIYS5wE1mRa7pLHpW6GwHf6TZjGOUrcQOamW2oZz15g6iTwtXrnY/ -oFAvHm9UaiYgAAiv5yOjHacyPHMEOAFLfeiaBLLhRvkB/CTjL9rqaS1SkciF5QHT -oIVpi/SnaLLTTCe56pyzP+AWA712/RmcqprfI8RaXPCvxRj39WvAWA5iImraQLcR -0JW+JG8CAwEAAaNBMD8wDgYDVR0PAQH/BAQDAgWgMAwGA1UdEwEB/wQCMAAwHwYD -VR0jBBgwFoAU8SGFhS6C5YEbZTVdYYvqmK9CEYEwCgYIKoZIzj0EAwIDRwAwRAIg -VBTbGbkxeiCyYZZQXWEpTi74+nYvvb7OwXpEaek0amgCIB6CtckF5QCFrmYXkJ3g -vepmYaCBf0ltAa6LF4WkqthA ------END CERTIFICATE-----`) - test2KeyPEMBlock := []byte(`-----BEGIN ENCRYPTED PRIVATE KEY----- -MIIFNTBfBgkqhkiG9w0BBQ0wUjAxBgkqhkiG9w0BBQwwJAQQoZBZN2ffiH5uZjNs -G/CVgwICCAAwDAYIKoZIhvcNAgkFADAdBglghkgBZQMEASoEELd9Q46y0aB1etfV -Wp9KYDcEggTQExaJ8gtsDi33ZIJgsEouOmMK61XFfmiNPmL4Wo2mAqbECaA3TISX -GE0cPBHWDFwEgJO2Yd0dn/bl0m9kzJlg29dwOPHbVz9akdw6gVX10j0rPRQ0Yy0A -nSyc7yNW/P9hhZ8WW8UvpYXpaf2o9M4au2WFnrWt07ezkppl0gErjk7zBylBcGMU -ivHJFfmmg+0CDhW0VFclSDpifRZ7AN8btw8SU8HYucGoqIIkZEBypgTp7I/hYIaK -UcYV3KFTVhK1w7dp6+aT1gxoZtyqTvfYVWG0dW5op2AUAHfbmdu9lM8RDYPSvA85 -blZdupjYdAlT3mDgW8lPKVXU+3K1Em8HK+U3u1n2kNNsKTdyg/blSUmg2k30ENP5 -vl/oe0aKN9IWfTrnXtqxLQQVneeU6TfzBx8n6sl0M64VC7t03A3IUw1sL2hCLAy8 -shBOb1cPF/Lgkh4q6sgqQc8nIPUHHTUClMU38PO7bmZiLx+rARHKgYURfPrSmKHj -UC15E8P4gAWRnuhVwCVs6cuD2Qfl+P1zLP7YfUnYrWgdZOm16NZfasfsmwWo4EEd -N4iclWpf3rSlNP5tkXsVqBwedhOJrnBrIQkCHHEwOn01qmGo3N8MZnl7bssPDtzK -86p2mPvkaKs1XazJILPeN/iTNU9jDeKPE2foz/wLfK7BZQqq+usfBqaa2TlVFjoe -CoTkR3Yp8bkfQH4O8ZBl+9sUSmZsyGuqLRG/QeucWexaAwburXiG8IPiL83e6QkG -5W0hQ6f5RCRgf5zqHmefBa0aLb4kAJTxAJsemKqzHHYD5JwkGx5iUhdDAbCGxiRu -30y3n3j++jnVRDtW3gOt2LtTcBBYl7GABgu8+Nxue+m8tKaK1FQXPSMyQmjlHSX/ -7Te0BQwz0UfRMkOCnOFm00WKkVb3FmGWEUCTN+W5QvZ5kjAeRbSJX2Bu63/vxSrU -ZacSN+Uu7UWbGRpA88lXsRW8p6y5Fda0WxgFsLAotXSV7uhJBq0MljmSuG7QyYYR -ukaYlaZvIQCZsL+88VIL7I7sui01oYhb3AjrtVNGlhIS+T1vlRvJDJx+nubox6lQ -XzYzsviPWtvtzX93TxtBv5JINxOR2NL/wryxDU+O8NlnZHCoSQhCHDc3qPihCgvZ -ZslAHL6Fuqt5omkZMargs/fdCKS8R8EPseRuGUx7gajfrBSAvi7kwEqQjKDM491Z -TbP6MBxf3AN0LUmjcZCj2AJ+rjYOf5rNl+Nq/xg31WAQ6FjA56q9wZvKmZ11jsJj -+9G54fg0k6OhlEVgc73grVfOttkunrXCjy2DscmNrLLTIo8HhbiwjuC/C3ByHhaa -UJ3F7zLnrBJSlzhvsOFnabnxoMc0DAdqgB9s58WoL6cBHsKlYmkpYmGtVLA9Vcaw -sZ31Fdtkb42nPI6OdmDl0V3qN3/ULDCXvXIvwfqbTy2gnz1E+DOLAAt06h29Nm27 -3u62LD6eukc1orGxC/coU/eI8D/Oh0B6oFMAnuOrqQNiUVP6A03hVpWicn5wszZA -dmr8Zsm8ItAk7IA1aYCCpfI813gNtMs5aBg7ckrbW25KybFsc3NpqumbxoyaUfsc -tlImLNAe4zfg1Q5bsvwp0KCg3PtW3ErwH+M19TbtEcc2kojz3kVg4Rg= ------END ENCRYPTED PRIVATE KEY-----`) - test2KeyPEMBlock, err = decryptPEM(test2KeyPEMBlock, password ) - if err != nil { - return nil, err - } - - test2, err := tls.X509KeyPair(test2CertPEMBlock, test2KeyPEMBlock) - if err != nil { - return nil, err - } - - certStore.Add("test2", test2) - return certStore, nil -} - func createCertStore(c *config.Config) (CertStore, error) { if c.Kafka.TLS.ClientCertFile != "" && c.Kafka.TLS.ClientKeyFile != "" { - certPEMBlock, err := ioutil.ReadFile(c.Kafka.TLS.ClientCertFile) + certPEMBlock, err := os.ReadFile(c.Kafka.TLS.ClientCertFile) if err != nil { return nil, err } - keyPEMBlock, err := ioutil.ReadFile(c.Kafka.TLS.ClientKeyFile) + keyPEMBlock, err := os.ReadFile(c.Kafka.TLS.ClientKeyFile) if err != nil { return nil, err } diff --git a/proxy/dial.go b/proxy/dial.go index c27dfca..00ac3ed 100644 --- a/proxy/dial.go +++ b/proxy/dial.go @@ -109,17 +109,12 @@ func (d tlsDialer) DialWithContext(network, addr string, ctx *ConnectionContext) return nil, errors.New("clientID must not be nil or empty") } - logrus.WithField("clientID", *ctx.clientID).Info("Dialing with TLS") - config, err := d.getTLSConfig(addr, ctx) if err != nil { return nil, err } - logrus.WithField("clientID", *ctx.clientID).Debug("Handshaking with TLS") - timeout := d.timeout - var errChannel chan error if timeout != 0 { @@ -183,7 +178,6 @@ func (d tlsDialer) getTLSConfig(addr string, ctx *ConnectionContext) (*tls.Confi if err != nil { return nil, errors.Wrapf(err, "Failed to get certificate for clientID %s", *ctx.clientID) } else { - logrus.Debugf("Certificate found: %+v", cert) c := config.Clone() c.Certificates = []tls.Certificate{cert} config = c diff --git a/proxy/pikachu.go b/proxy/pikachu.go new file mode 100644 index 0000000..b76c198 --- /dev/null +++ b/proxy/pikachu.go @@ -0,0 +1,265 @@ +package proxy + +import ( + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "io" + "net/http" + "os" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type PikachuConfig struct { + Addr string; + Tenant string; + AppName string; + MarathonAppId string; + MesosTaskId string; + SecretToken string; +} + +func NewPikachuConfig() (PikachuConfig, error) { + marathonAppId := os.Getenv("MARATHON_APP_ID") + pieces := strings.Split(marathonAppId, "/") + if len(pieces) != 3 || pieces[1] == "" || pieces[2] == "" { + return PikachuConfig{}, fmt.Errorf("invalid MARATHON_APP_ID: %s", marathonAppId) + } + appName := pieces[2] + tenant := pieces[1] + + pikachuAddr := os.Getenv("KAFKA_CONFIG_HOST") + if pikachuAddr == "" { + return PikachuConfig{}, errors.New("missing KAFKA_CONFIG_HOST") + } + + mesosTaskId := os.Getenv("MESOS_TASK_ID") + if mesosTaskId == "" { + return PikachuConfig{}, errors.New("missing MESOS_TASK_ID") + } + + secretToken := os.Getenv("DSH_SECRET_TOKEN") + if secretToken == "" { + return PikachuConfig{}, errors.New("missing DSH_SECRET_TOKEN") + } + + return PikachuConfig { + Addr: pikachuAddr, + Tenant: tenant, + AppName: appName, + MesosTaskId: mesosTaskId, + SecretToken: secretToken, // TODO: Now we read the secret token from the environment. We should read it from a file. + }, nil +} + +type PikachuClient struct { + config PikachuConfig; + httpClient http.Client; +} + +func NewPikachuClient(config PikachuConfig, httpClient http.Client) PikachuClient { + return PikachuClient{ + config: config, + httpClient: httpClient, + } +} + +func NewHttpClientFromEnv() (http.Client, error) { + caCertPool := x509.NewCertPool() + caCert := os.Getenv("DSH_CA_CERTIFICATE") + + if caCert == "" { + return http.Client{}, errors.New("missing DSH CA certificate") + } + + if ok := caCertPool.AppendCertsFromPEM([]byte(caCert)); !ok { + return http.Client{}, errors.New("failed to parse DSH CA certificate") + } + tlsConfig := &tls.Config{ + RootCAs: caCertPool, + } + transport := &http.Transport{ + TLSClientConfig: tlsConfig, + } + + return http.Client{ + Transport: transport, + }, nil +} + +func NewPikachuClientFromEnv() (PikachuClient, error) { + config, err := NewPikachuConfig() + if err != nil { + return PikachuClient{}, err + } + + httpClient, err := NewHttpClientFromEnv() + if err != nil { + return PikachuClient{}, err + } + + return NewPikachuClient(config, httpClient), nil +} + +func (c *PikachuClient) makePrivateKey() (*rsa.PrivateKey, error) { + privateKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return nil, fmt.Errorf("failed to generate private key: %v", err) + } + + return privateKey, nil +} + +func (c *PikachuClient) makeCertificateRequest(clientID string, privateKey *rsa.PrivateKey) ([]byte,error) { + dnsName := fmt.Sprintf("%s.%s.marathon.mesos", c.config.AppName, c.config.Tenant) + + csrTemplate := &x509.CertificateRequest{ + Subject: pkix.Name{ + CommonName: fmt.Sprintf("acl.%s", clientID), + OrganizationalUnit: []string{c.config.Tenant}, + Organization: []string{"dsh"}, + }, + SignatureAlgorithm: x509.SHA256WithRSA, + DNSNames: []string{dnsName}, + ExtraExtensions: []pkix.Extension{ + { + Id: []int{2, 5, 29, 15}, // Key Usage OID + Critical: true, + Value: []byte{0x03, 0x02, 0x05, 0xa0}, // keyEncipherment and digitalSignature + }, + { + Id: []int{2, 5, 29, 37}, // Extended Key Usage OID + Critical: false, + Value: []byte{0x30, 0x0a, 0x06, 0x08, 0x2a, 0x86, 0x48, 0xce, 0x3d, 0x04, 0x03, 0x02}, // id-kp-clientAuth + }, + { + Id: []int{2, 5, 29, 19}, // Basic Constraints OID + Critical: true, + Value: []byte{0x30, 0x00}, // CA: FALSE + }, + }, + } + + csrBytes, err := x509.CreateCertificateRequest(rand.Reader, csrTemplate, privateKey) + if err != nil { + fmt.Printf("Failed to create CSR: %v\n", err) + return nil, err + } + + return pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE REQUEST", + Bytes: csrBytes, + }), nil +} + +func (c *PikachuClient) requestSignedCertificate(clientID string, csr []byte) ([]byte, error) { + req, err := http.NewRequest("POST", fmt.Sprintf( "https://%s/sign/%s/%s", c.config.Addr, c.config.Tenant, c.config.MesosTaskId ), strings.NewReader(string(csr))) + if err != nil { + return []byte{}, errors.Wrap(err, "failed to create certificate signing http request for Pikachu") + } + req.Header.Set("X-Kafka-Config-Token", c.config.SecretToken) + + resp, err := c.httpClient.Do(req) + if err != nil { + logrus.WithError( err ).WithField("clientID",clientID).Error("Failed to send request to Pikachu") + return []byte{}, errors.Wrap(err, "failed to send request to Pikachu") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := io.ReadAll(resp.Body) // The body often contains more information about the error + if err == nil { + logrus.WithField("status", resp.Status).WithField("clientID",clientID).Error("Failed to get certificate from Pikachu") + } else { + logrus.WithField("status", resp.Status).WithField("clientID",clientID).Errorf("Failed to get certificate from Pikachu: %s", string(body)) + } + return []byte{}, fmt.Errorf("received unexpected response: %s", resp.Status) + } + + csr, nil := io.ReadAll(resp.Body) + if err != nil { + return []byte{}, errors.Wrap(err, "failed to read response body of certificate signing request") + } + + return csr, nil +} + +func (c *PikachuClient) RequestCertificate(clientID string) (tls.Certificate, error) { + startedAt := time.Now() + logrus.WithField( "clientID",clientID ).Info("Getting certificate from Pikachu") + + privateKey, err := c.makePrivateKey() + if err != nil { + return tls.Certificate{}, errors.Wrap(err, "failed to generate private key") + } + + csrPEM, err := c.makeCertificateRequest(clientID, privateKey) + if err != nil { + return tls.Certificate{}, errors.Wrap(err, "failed to generate certificate request") + } + + crtPEM, err := c.requestSignedCertificate(clientID, csrPEM) + if err != nil { + return tls.Certificate{}, errors.Wrap(err, "failed to get signed certificate") + } + + cert, err := tls.X509KeyPair(crtPEM, pem.EncodeToMemory(&pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(privateKey), + })) + if err != nil { + return tls.Certificate{}, errors.Wrap(err, "failed to parse signed certificate") + } + + logrus.WithField("clientID",clientID).WithField("duration", time.Since(startedAt)).Info("Got signed certificate from Pikachu") + + return cert, nil +} + +type PikachuCertStore struct { + client PikachuClient; + cache map[string]tls.Certificate; + lock sync.Mutex; +} + +func NewPikachuCertStore() (*PikachuCertStore, error) { + client, err := NewPikachuClientFromEnv() + if err != nil { + return nil, err + } + + certStore := PikachuCertStore{ + client: client, + cache: make(map[string]tls.Certificate), + lock: sync.Mutex{}, + } + + return &certStore, nil +} + +func (s *PikachuCertStore) Get(clientID string) (tls.Certificate, error) { + s.lock.Lock() + defer s.lock.Unlock() + + if cert, ok := s.cache[clientID]; ok { + logrus.WithField("clientID",clientID).Info("Cache hit: Got certificate from cache") + return cert, nil + } + + logrus.WithField("clientID",clientID).Info("Cache miss: Getting certificate from Pikachu") + cert, err := s.client.RequestCertificate(clientID) + if err != nil { + return tls.Certificate{}, err + } + s.cache[clientID] = cert + return cert, nil +} \ No newline at end of file diff --git a/proxy/tls.go b/proxy/tls.go index 77b8e64..b5b5096 100644 --- a/proxy/tls.go +++ b/proxy/tls.go @@ -155,7 +155,7 @@ func newTLSClientConfig(conf *config.Config) (*tls.Config, error) { cfg := &tls.Config{InsecureSkipVerify: opts.InsecureSkipVerify} if opts.CAChainCertFile != "" { - caCertPEMBlock, err := ioutil.ReadFile(opts.CAChainCertFile) + caCertPEMBlock, err := os.ReadFile(opts.CAChainCertFile) if err != nil { return nil, err } diff --git a/test1.yaml b/test1.yaml deleted file mode 100644 index 77a6670..0000000 --- a/test1.yaml +++ /dev/null @@ -1,34 +0,0 @@ ---- -apiVersion: cert-manager.io/v1 -kind: Certificate -metadata: - name: kafka-proxy-test1 - namespace: dshdev-default -spec: - secretName: kafka-proxy-test1-secret - issuerRef: - name: dsh - kind: ClusterIssuer - commonName: test1.dshdev - subject: - organizations: - - dsh - organizationalUnits: - - dshdev ---- -apiVersion: cert-manager.io/v1 -kind: Certificate -metadata: - name: kafka-proxy-test2 - namespace: dshdev-default -spec: - secretName: kafka-proxy-test2-secret - issuerRef: - name: dsh - kind: ClusterIssuer - commonName: test2.dshdev - subject: - organizations: - - dsh - organizationalUnits: - - dshdev