From 5608a874cede45d18836ebecacc5d201ece2c3d9 Mon Sep 17 00:00:00 2001 From: Justin Brooks Date: Tue, 10 Sep 2024 17:54:46 -0400 Subject: [PATCH] fix kubernetes job agent --- .../providers-kubernetes-job-agent.yaml | 60 ++++++ pnpm-lock.yaml | 18 +- providers/kubernetes-job-agent/package.json | 6 +- providers/kubernetes-job-agent/src/config.ts | 14 +- .../kubernetes-job-agent/src/gke-connect.ts | 43 ---- providers/kubernetes-job-agent/src/gke.ts | 121 ----------- providers/kubernetes-job-agent/src/index.ts | 195 ++++++++++++++---- providers/kubernetes-job-agent/src/k8s.ts | 88 ++++++++ providers/kubernetes-job-agent/src/utils.ts | 6 + 9 files changed, 326 insertions(+), 225 deletions(-) create mode 100644 .github/workflows/providers-kubernetes-job-agent.yaml delete mode 100644 providers/kubernetes-job-agent/src/gke-connect.ts delete mode 100644 providers/kubernetes-job-agent/src/gke.ts create mode 100644 providers/kubernetes-job-agent/src/k8s.ts diff --git a/.github/workflows/providers-kubernetes-job-agent.yaml b/.github/workflows/providers-kubernetes-job-agent.yaml new file mode 100644 index 00000000..f0ab027d --- /dev/null +++ b/.github/workflows/providers-kubernetes-job-agent.yaml @@ -0,0 +1,60 @@ +name: Providers / Kubernetes Job Agent + +on: + pull_request: + branches: ["*"] + paths: + - providers/kubernetes-job-agent/** + - .github/workflows/providers-kubernetes-job-agent.yaml + - pnpm-lock.yaml + push: + branches: ["main"] + paths: + - providers/kubernetes-job-agent/** + - .github/workflows/providers-kubernetes-job-agent.yaml + - pnpm-lock.yaml + +jobs: + build: + runs-on: ubuntu-latest + permissions: + contents: read + id-token: write + steps: + - uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v4 + with: + images: ctrlplane/kubernetes-job-agent + tags: | + type=sha,format=short,prefix= + + - name: Build + uses: docker/build-push-action@v6 + if: github.ref != 'refs/heads/main' + with: + push: false + file: providers/kubernetes-job-agent/Dockerfile + tags: ${{ steps.meta.outputs.tags }} + + - name: Build and Push + uses: docker/build-push-action@v6 + if: github.ref == 'refs/heads/main' + with: + push: true + file: providers/kubernetes-job-agent/Dockerfile + tags: ${{ steps.meta.outputs.tags }} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 39bbbb7e..ea91936b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1184,9 +1184,6 @@ importers: '@ctrlplane/validators': specifier: workspace:* version: link:../../packages/validators - '@google-cloud/container': - specifier: ^5.16.0 - version: 5.16.0 '@kubernetes/client-node': specifier: ^0.21.0 version: 0.21.0 @@ -1199,21 +1196,18 @@ importers: dotenv: specifier: ^16.4.5 version: 16.4.5 - google-auth-library: - specifier: ^9.13.0 - version: 9.14.0 handlebars: specifier: ^4.7.8 version: 4.7.8 + js-yaml: + specifier: ^4.1.0 + version: 4.1.0 lodash: specifier: ^4.17.21 version: 4.17.21 p-retry: specifier: ^6.2.0 version: 6.2.0 - semver: - specifier: ^7.6.2 - version: 7.6.3 zod: specifier: 'catalog:' version: 3.23.8 @@ -1227,12 +1221,12 @@ importers: '@ctrlplane/tsconfig': specifier: workspace:* version: link:../../tooling/typescript + '@types/js-yaml': + specifier: ^4.0.9 + version: 4.0.9 '@types/lodash': specifier: ^4.17.5 version: 4.17.7 - '@types/semver': - specifier: ^7.5.8 - version: 7.5.8 eslint: specifier: 'catalog:' version: 9.9.1(jiti@1.21.6) diff --git a/providers/kubernetes-job-agent/package.json b/providers/kubernetes-job-agent/package.json index 5ea380e8..fe55c7ff 100644 --- a/providers/kubernetes-job-agent/package.json +++ b/providers/kubernetes-job-agent/package.json @@ -22,24 +22,22 @@ "@ctrlplane/logger": "workspace:*", "@ctrlplane/node-sdk": "workspace:*", "@ctrlplane/validators": "workspace:*", - "@google-cloud/container": "^5.16.0", "@kubernetes/client-node": "^0.21.0", "@t3-oss/env-core": "^0.10.1", "cron": "^3.1.7", "dotenv": "^16.4.5", - "google-auth-library": "^9.13.0", "handlebars": "^4.7.8", + "js-yaml": "^4.1.0", "lodash": "^4.17.21", "p-retry": "^6.2.0", - "semver": "^7.6.2", "zod": "catalog:" }, "devDependencies": { "@ctrlplane/eslint-config": "workspace:*", "@ctrlplane/prettier-config": "workspace:*", "@ctrlplane/tsconfig": "workspace:*", + "@types/js-yaml": "^4.0.9", "@types/lodash": "^4.17.5", - "@types/semver": "^7.5.8", "eslint": "catalog:", "prettier": "catalog:", "typescript": "^5.4.5" diff --git a/providers/kubernetes-job-agent/src/config.ts b/providers/kubernetes-job-agent/src/config.ts index acd5614b..c6a683a3 100644 --- a/providers/kubernetes-job-agent/src/config.ts +++ b/providers/kubernetes-job-agent/src/config.ts @@ -9,19 +9,13 @@ export const env = createEnv({ CTRLPLANE_API_URL: z.string().default("http://localhost:3000"), CTRLPLANE_API_KEY: z.string(), CTRLPLANE_WORKSPACE: z.string(), - CTRLPLANE_SCANNER_NAME: z.string().default("offical-google-scanner"), - CTRLPLANE_GKE_TARGET_NAME: z - .string() - .default("gke-{{ projectId }}-{{ cluster.name }}"), - CTRLPLANE_COMPUTE_TARGET_NAME: z - .string() - .default("gc-{{ projectId }}-{{ vm.name }}"), + CTRLPLANE_AGENT_NAME: z.string().default("kubernetes-job-agent"), + + KUBE_CONFIG_PATH: z.string().optional(), + KUBE_NAMESPACE: z.string().default("default"), CRON_ENABLED: z.boolean().default(true), CRON_TIME: z.string().default("* * * * *"), - - GOOGLE_PROJECT_ID: z.string().min(1), - GOOGLE_SCAN_GKE: z.boolean().default(true), }, runtimeEnv: process.env, diff --git a/providers/kubernetes-job-agent/src/gke-connect.ts b/providers/kubernetes-job-agent/src/gke-connect.ts deleted file mode 100644 index 057fe84a..00000000 --- a/providers/kubernetes-job-agent/src/gke-connect.ts +++ /dev/null @@ -1,43 +0,0 @@ -import type { ClusterManagerClient } from "@google-cloud/container"; -import { KubeConfig } from "@kubernetes/client-node"; -import { GoogleAuth } from "google-auth-library"; - -const sourceCredentials = new GoogleAuth({ - scopes: ["https://www.googleapis.com/auth/cloud-platform"], -}); - -export const connectToCluster = async ( - clusterClient: ClusterManagerClient, - project: string, - clusterName: string, - clusterLocation: string, -) => { - const [credentials] = await clusterClient.getCluster({ - name: `projects/${project}/locations/${clusterLocation}/clusters/${clusterName}`, - }); - const kubeConfig = new KubeConfig(); - kubeConfig.loadFromOptions({ - clusters: [ - { - name: clusterName, - server: `https://${credentials.endpoint}`, - caData: credentials.masterAuth!.clusterCaCertificate!, - }, - ], - users: [ - { - name: clusterName, - token: (await sourceCredentials.getAccessToken())!, - }, - ], - contexts: [ - { - name: clusterName, - user: clusterName, - cluster: clusterName, - }, - ], - currentContext: clusterName, - }); - return kubeConfig; -}; diff --git a/providers/kubernetes-job-agent/src/gke.ts b/providers/kubernetes-job-agent/src/gke.ts deleted file mode 100644 index 3973739b..00000000 --- a/providers/kubernetes-job-agent/src/gke.ts +++ /dev/null @@ -1,121 +0,0 @@ -import type { KubernetesClusterAPIV1 } from "@ctrlplane/validators/targets"; -import type { google } from "@google-cloud/container/build/protos/protos.js"; -import Container from "@google-cloud/container"; -import { CoreV1Api } from "@kubernetes/client-node"; -import handlebars from "handlebars"; -import _ from "lodash"; -import { SemVer } from "semver"; - -import { logger } from "@ctrlplane/logger"; -import { kubernetesNamespaceV1 } from "@ctrlplane/validators/targets"; - -import { env } from "./config.js"; -import { connectToCluster } from "./gke-connect.js"; -import { omitNullUndefined } from "./utils.js"; - -export const gkeLogger = logger.child({ label: "gke" }); - -const clusterClient = new Container.v1.ClusterManagerClient(); - -const getClusters = async () => { - const request = { parent: `projects/${env.GOOGLE_PROJECT_ID}/locations/-` }; - const [response] = await clusterClient.listClusters(request); - const { clusters } = response; - return clusters; -}; - -const template = handlebars.compile(env.CTRLPLANE_GKE_TARGET_NAME); -const targetName = (cluster: google.container.v1.ICluster) => - template({ cluster, projectId: env.GOOGLE_PROJECT_ID }); - -export const getKubernetesClusters = async (): Promise< - Array<{ - cluster: google.container.v1.ICluster; - target: KubernetesClusterAPIV1; - }> -> => { - gkeLogger.info("Scanning Google Cloud GKE clusters"); - const clusters = (await getClusters()) ?? []; - return clusters.map((cluster) => { - const masterVersion = new SemVer(cluster.currentMasterVersion ?? "0"); - const nodeVersion = new SemVer(cluster.currentNodeVersion ?? "0"); - const autoscaling = String( - cluster.autoscaling?.enableNodeAutoprovisioning ?? false, - ); - - const appUrl = `https://console.cloud.google.com/kubernetes/clusters/details/${cluster.location}/${cluster.name}/details?project=${env.GOOGLE_PROJECT_ID}`; - - return { - cluster, - target: { - version: "kubernetes/v1", - kind: "ClusterAPI", - name: targetName(cluster), - identifier: `${env.GOOGLE_PROJECT_ID}/${cluster.name}`, - config: { - name: cluster.name!, - server: { - certificateAuthorityData: - cluster.masterAuth?.clusterCaCertificate ?? "", - endpoint: `https://${cluster.endpoint}`, - }, - }, - labels: omitNullUndefined({ - "ctrlplane/url": appUrl, - - "kubernetes/distribution": "gke", - "kubernetes/status": cluster.status, - "kubernetes/node-count": String(cluster.currentNodeCount ?? 0), - - "kubernetes/master-version": masterVersion.version, - "kubernetes/master-version-major": String(masterVersion.major), - "kubernetes/master-version-minor": String(masterVersion.minor), - "kubernetes/master-version-patch": String(masterVersion.patch), - - "kubernetes/node-version": nodeVersion.version, - "kubernetes/node-version-major": String(nodeVersion.major), - "kubernetes/node-version-minor": String(nodeVersion.minor), - "kubernetes/node-version-patch": String(nodeVersion.patch), - - "kubernetes/autoscaling-enabled": autoscaling, - }), - }, - }; - }); -}; - -export const getKubernetesNamespace = async ( - clusters: Array<{ - cluster: google.container.v1.ICluster; - target: KubernetesClusterAPIV1; - }>, -) => { - gkeLogger.info("Coverting GKE clusters to namespaces"); - - const namespaceTargets = clusters.map(async ({ cluster, target }) => { - const kubeConfig = await connectToCluster( - clusterClient, - env.GOOGLE_PROJECT_ID, - cluster.name!, - cluster.location!, - ); - const k8sApi = kubeConfig.makeApiClient(CoreV1Api); - const namespaces = await k8sApi - .listNamespace() - .then((r) => r.body.items.filter((n) => n.metadata != null)); - return namespaces.map((n) => - kubernetesNamespaceV1.parse( - _.merge(target, { - kind: "Namespace", - identifier: `${env.GOOGLE_PROJECT_ID}/${cluster.name}/${n.metadata!.name}`, - config: { namespace: n.metadata!.name }, - labels: { - "kubernetes/namespace": n.metadata!.name, - }, - }), - ), - ); - }); - - return Promise.all(namespaceTargets).then((v) => v.flat()); -}; diff --git a/providers/kubernetes-job-agent/src/index.ts b/providers/kubernetes-job-agent/src/index.ts index 8499ccb7..12de65c7 100644 --- a/providers/kubernetes-job-agent/src/index.ts +++ b/providers/kubernetes-job-agent/src/index.ts @@ -1,57 +1,182 @@ import { CronJob } from "cron"; +import handlebars from "handlebars"; +import yaml from "js-yaml"; import { logger } from "@ctrlplane/logger"; import { env } from "./config.js"; -import { - getKubernetesClusters, - getKubernetesNamespace, - gkeLogger, -} from "./gke.js"; +import { getBatchClient, getJobStatus } from "./k8s.js"; import { api } from "./sdk.js"; -const getScannerId = async () => { +const renderManifest = (manifestTemplate: string, variables: object) => { try { - const { id } = await api.upsertTargetProvider({ - workspace: env.CTRLPLANE_WORKSPACE, - name: env.CTRLPLANE_SCANNER_NAME, - }); - return id; + const template = handlebars.compile(manifestTemplate); + const manifestYaml = template(variables); + return yaml.load(manifestYaml) as any; } catch (error) { - logger.error(error); - logger.error( - `Failed to get scanner ID. This could be caused by incorrect workspace (${env.CTRLPLANE_WORKSPACE}), or API Key`, - { error }, + logger.error("Error rendering manifest", { error }); + throw error; + } +}; + +const deployManifest = async ( + jobExecutionId: string, + namespace: string, + manifest: any, +) => { + try { + const name = manifest?.metadata?.name; + logger.info(`Deploying manifest: ${namespace}/${name}`); + if (name == null) { + logger.error("Job name not found in manifest", { + jobExecutionId, + namespace, + }); + await api.updateJobExecution({ + executionId: jobExecutionId, + updateJobExecutionRequest: { + status: "invalid_job_agent", + message: "Job name not found in manifest.", + }, + }); + return; + } + + logger.info(`Creating job - ${namespace}/${name}`); + await getBatchClient().createNamespacedJob(namespace, manifest); + await api.updateJobExecution({ + executionId: jobExecutionId, + updateJobExecutionRequest: { + status: "in_progress", + externalRunId: `${namespace}/${name}`, + message: "Job created successfully.", + }, + }); + logger.info(`Job created successfully`, { + jobExecutionId, + namespace, + name, + }); + } catch (error: any) { + logger.error("Error deploying manifest", { + jobExecutionId, + namespace, + error: error.message, + }); + await api.updateJobExecution({ + executionId: jobExecutionId, + updateJobExecutionRequest: { + status: "invalid_job_agent", + message: error.body?.message || error.message, + }, + }); + } +}; + +const spinUpNewJobs = async (agentId: string) => { + try { + const { jobExecutions = [] } = await api.getNextJobs({ agentId }); + logger.info(`Found ${jobExecutions.length} jobExecution(s) to run.`); + await Promise.allSettled( + jobExecutions.map(async (jobExecution) => { + logger.info(`Running job execution ${jobExecution.id}`); + try { + const je = await api.getJobExecution({ + executionId: jobExecution.id, + }); + const manifest = renderManifest( + (jobExecution.jobAgentConfig as any).manifest, + je, + ); + const namespace = manifest?.metadata?.namespace ?? env.KUBE_NAMESPACE; + await api.acknowledgeJob({ executionId: jobExecution.id }); + await deployManifest(jobExecution.id, namespace, manifest); + } catch (error: any) { + logger.error(`Error processing job execution ${jobExecution.id}`, { + error: error.message, + }); + throw error; + } + }), ); + } catch (error: any) { + logger.error("Error spinning up new jobs", { + agentId, + error: error.message, + }); } - return null; }; -const scan = async () => { - const id = await getScannerId(); - if (id == null) return; +const updateExecutionStatus = async (agentId: string) => { + try { + const executions = await api.getAgentRunningExecutions({ agentId }); + logger.info(`Found ${executions.length} running execution(s)`); + await Promise.allSettled( + executions.map(async (exec) => { + const [namespace, name] = exec.externalRunId?.split("/") ?? []; + if (namespace == null || name == null) { + logger.error("Invalid external run ID", { + executionId: exec.id, + externalRunId: exec.externalRunId, + }); + return; + } - logger.info(`Scanner ID: ${id}`, { id }); - logger.info("Running google compute scanner", { - date: new Date().toISOString(), - }); + logger.debug(`Checking status of ${namespace}/${name}`); + try { + const { status, message } = await getJobStatus(namespace, name); + await api.updateJobExecution({ + executionId: exec.id, + updateJobExecutionRequest: { status, message }, + }); + logger.info(`Updated status for ${namespace}/${name}`, { + status, + message, + }); + } catch (error: any) { + logger.error(`Error updating status for ${namespace}/${name}`, { + error: error.message, + }); + } + }), + ); + } catch (error: any) { + logger.error("Error updating execution statuses", { + agentId, + error: error.message, + }); + } +}; - const targets = await getKubernetesClusters(); - gkeLogger.info(`Found ${targets.length} clusters`, { count: targets.length }); +const scan = async () => { + try { + const { id } = await api.updateJobAgent({ + workspace: env.CTRLPLANE_WORKSPACE, + updateJobAgentRequest: { + name: env.CTRLPLANE_AGENT_NAME, + type: "kubernetes-job", + }, + }); - const namespaces = await getKubernetesNamespace(targets); - gkeLogger.info(`Found ${namespaces.length} namespaces`, { - count: namespaces.length, - }); + logger.info(`Agent ID: ${id}`); + await spinUpNewJobs(id); + await updateExecutionStatus(id); + } catch (error: any) { + logger.error("Error during scan operation", { error: error.message }); + throw error; + } }; -logger.info( - `Starting google compute scanner from project '${env.GOOGLE_PROJECT_ID}' into workspace '${env.CTRLPLANE_WORKSPACE}'`, - env, -); +scan().catch((error) => { + logger.error("Unhandled error in scan operation", { error: error.message }); + console.error(error); +}); -scan().catch(console.error); if (env.CRON_ENABLED) { logger.info(`Enabling cron job, ${env.CRON_TIME}`, { time: env.CRON_TIME }); - new CronJob(env.CRON_TIME, scan).start(); + new CronJob(env.CRON_TIME, () => { + scan().catch((error) => { + logger.error("Unhandled error in cron job", { error: error.message }); + }); + }).start(); } diff --git a/providers/kubernetes-job-agent/src/k8s.ts b/providers/kubernetes-job-agent/src/k8s.ts new file mode 100644 index 00000000..593eecd5 --- /dev/null +++ b/providers/kubernetes-job-agent/src/k8s.ts @@ -0,0 +1,88 @@ +import k8s from "@kubernetes/client-node"; + +import { logger } from "@ctrlplane/logger"; + +import { env } from "./config.js"; + +const getKubeConfig = (configPath?: string | null) => { + const kc = new k8s.KubeConfig(); + try { + if (configPath) { + logger.info(`Loading config from file ${configPath}`); + kc.loadFromFile(configPath); + } else { + logger.info(`Loading config from default.`); + kc.loadFromDefault(); + } + return kc; + } catch (error) { + logger.error( + `Failed to load KubeConfig: ${error instanceof Error ? error.message : String(error)}`, + ); + throw error; + } +}; + +let _client: k8s.BatchV1Api | null = null; +export const getBatchClient = () => { + if (_client) return _client; + + try { + const kc = getKubeConfig(env.KUBE_CONFIG_PATH); + const cu = kc.getCurrentUser(); + logger.info(`Current user: ${cu?.name ?? cu?.username ?? "unknown"}`); + + logger.info("Creating BatchV1Api client..."); + const batchapi = kc.makeApiClient(k8s.BatchV1Api); + + logger.info("Batch V1 API client created successfully."); + _client = batchapi; + + return batchapi; + } catch (error) { + logger.error( + `Failed to create BatchV1Api client: ${error instanceof Error ? error.message : String(error)}`, + ); + throw error; + } +}; + +export const getJobStatus = async (namespace: string, name: string) => { + try { + logger.info(`Fetching job status for ${name} in namespace ${namespace}`); + const { body } = await getBatchClient().readNamespacedJob(name, namespace); + const { failed = 0, succeeded = 0, active = 0 } = body.status ?? {}; + const message = body.metadata?.name ?? ""; + + if (failed > 0) { + logger.warn(`Job ${name} in namespace ${namespace} failed`); + return { status: "failure" as const, message }; + } + + if (active > 0) { + logger.info(`Job ${name} in namespace ${namespace} is in progress`); + return { status: "in_progress" as const, message }; + } + + if (succeeded > 0) { + logger.info( + `Job ${name} in namespace ${namespace} completed successfully`, + ); + return { status: "completed" as const, message }; + } + + logger.warn(`Job ${name} in namespace ${namespace} has an unknown status`); + return {}; + } catch (error) { + logger.error( + `Error fetching job status for ${name} in namespace ${namespace}: ${error instanceof Error ? error.message : String(error)}`, + ); + return { + status: "invalid_job_agent" as const, + message: + error instanceof Error && "body" in error + ? (error.body as any).message + : "Unknown error occurred", + }; + } +}; diff --git a/providers/kubernetes-job-agent/src/utils.ts b/providers/kubernetes-job-agent/src/utils.ts index dc99eda3..70005e49 100644 --- a/providers/kubernetes-job-agent/src/utils.ts +++ b/providers/kubernetes-job-agent/src/utils.ts @@ -1,3 +1,5 @@ +import type { SetTargetProvidersTargetsRequest } from "@ctrlplane/node-sdk"; + export function omitNullUndefined(obj: object) { return Object.entries(obj).reduce>( (acc, [key, value]) => { @@ -7,3 +9,7 @@ export function omitNullUndefined(obj: object) { {}, ); } + +export type ScannerFunc = () => Promise< + SetTargetProvidersTargetsRequest["targets"] +>;