From bf82e0e1d6d53eb44c5364645bd16f4d41fdf989 Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 26 May 2020 15:51:23 +0200 Subject: [PATCH 1/7] traefik2 support wip --- cowait/engine/docker.py | 4 +- cowait/engine/kubernetes.py | 4 +- cowait/engine/routers/__init__.py | 14 +++ cowait/engine/routers/traefik2_router.py | 136 +++++++++++++++++++++++ 4 files changed, 154 insertions(+), 4 deletions(-) create mode 100644 cowait/engine/routers/traefik2_router.py diff --git a/cowait/engine/docker.py b/cowait/engine/docker.py index a13df691..d5aeb46b 100644 --- a/cowait/engine/docker.py +++ b/cowait/engine/docker.py @@ -3,8 +3,8 @@ from cowait.tasks import TaskDefinition, RemoteTask from .cluster import ClusterProvider from .const import LABEL_TASK_ID, LABEL_PARENT_ID -from .routers import LocalPortRouter from .errors import ProviderError +from .routers import create_router DEFAULT_NETWORK = 'cowait' @@ -28,7 +28,7 @@ class DockerProvider(ClusterProvider): def __init__(self, args={}): super().__init__('docker', args) self.docker = docker.from_env() - self.router = LocalPortRouter(self) + self.router = create_router(self) @property def network(self): diff --git a/cowait/engine/kubernetes.py b/cowait/engine/kubernetes.py index c27151b5..48d2bf15 100644 --- a/cowait/engine/kubernetes.py +++ b/cowait/engine/kubernetes.py @@ -6,8 +6,8 @@ from cowait.tasks import TaskDefinition, RemoteTask from .const import ENV_TASK_CLUSTER, LABEL_TASK_ID, LABEL_PARENT_ID from .cluster import ClusterProvider -from .routers import TraefikRouter from .errors import TaskCreationError, ProviderError +from .routers import create_router DEFAULT_NAMESPACE = 'default' @@ -43,7 +43,7 @@ def __init__(self, args={}): self.ext = client.ExtensionsV1beta1Api(self.client) self.custom = client.CustomObjectsApi(self.client) - self.router = TraefikRouter(self) + self.router = create_router(self) @property def namespace(self): diff --git a/cowait/engine/routers/__init__.py b/cowait/engine/routers/__init__.py index 76ee4960..b4b1389a 100644 --- a/cowait/engine/routers/__init__.py +++ b/cowait/engine/routers/__init__.py @@ -3,3 +3,17 @@ from .router import Router from .local_port_router import LocalPortRouter from .traefik_router import TraefikRouter +from .traefik2_router import Traefik2Router + + +def create_router(cluster): + kind = cluster.args.get('router', 'local').lower() + + if kind == 'local': + return LocalPortRouter(cluster) + if kind == 'traefik': + return TraefikRouter(cluster) + if kind == 'traefik2': + return Traefik2Router(cluster) + + raise RuntimeError(f'Unknown router type {kind}') diff --git a/cowait/engine/routers/traefik2_router.py b/cowait/engine/routers/traefik2_router.py new file mode 100644 index 00000000..20dc7387 --- /dev/null +++ b/cowait/engine/routers/traefik2_router.py @@ -0,0 +1,136 @@ +from kubernetes import client +from .router import Router +from ..const import LABEL_TASK_ID + + +TRAEFIK2_API_GROUP = 'traefik.containo.us' +TRAEFIK2_API_VERSION = 'v1alpha1' +TRAEFIK2_API_NAMESPACE = 'default' +TRAEFIK2_INGRESSROUTE = 'IngressRoute' +TRAEFIK2_INGRESSROUTE_PLURAL = 'ingressroutes' + + +class Traefik2Router(Router): + def __init__(self, cluster): + super().__init__(cluster) + cluster.on('prepare', self.on_prepare) + cluster.on('spawn', self.on_spawn) + cluster.on('kill', self.on_kill) + + self.config = cluster.args.get('traefik2', {}) + self.secure = self.config.get('secure', False) + self.middlewares = self.config.get('middlewares', []) + self.entrypoints = self.config.get('entrypoints', ['web']) + + def on_prepare(self, taskdef): + domain = self.cluster.domain + if domain is None: + raise RuntimeError('No cluster domain configured') + + protocol = 'https' if self.secure else 'http' + + for path, port in taskdef.routes.items(): + if len(path) < 1 and path[0] != '/': + raise RuntimeError(f'Paths must start with /, got {path}') + + taskdef.routes[path] = { + 'port': port, + 'path': path, + 'url': f'{protocol}://{taskdef.id}.{domain}{path}', + } + + return taskdef + + def on_spawn(self, task): + ports = [] + routes = [] + + idx = 0 + for path, route in task.routes.items(): + port = route['port'] + idx += 1 + + ports.append(client.V1ServicePort( + port=port, + target_port=port, + )) + + host = f'{task.id}.{self.cluster.domain}' + routes.append({ + 'match': f'Host(`{host}`) && PathPrefix(`{path}`)', + 'middlewares': self.middlewares, + 'kind': 'Rule', + 'services': [ + {'name': task.id, 'port': port} + ] + }) + + if len(routes) == 0: + return + + print('~~ creating task ingress', path, '-> port', port) + + self.cluster.core.create_namespaced_service( + namespace=self.cluster.namespace, + body=client.V1Service( + metadata=client.V1ObjectMeta( + name=task.id, + namespace=self.cluster.namespace, + labels={ + LABEL_TASK_ID: task.id, + }, + ), + spec=client.V1ServiceSpec( + selector={ + LABEL_TASK_ID: task.id, + }, + ports=ports, + ), + ), + ) + + self.cluster.custom.create_namespaced_custom_object( + group=TRAEFIK2_API_GROUP, + version=TRAEFIK2_API_VERSION, + plural=TRAEFIK2_INGRESSROUTE_PLURAL, + namespace=TRAEFIK2_API_NAMESPACE, + body={ + 'apiVersion': f'{TRAEFIK2_API_GROUP}/{TRAEFIK2_API_VERSION}', + 'kind': TRAEFIK2_INGRESSROUTE, + 'metadata': { + 'name': f'{task.id}', + 'namespace': self.cluster.namespace, + 'labels': { + LABEL_TASK_ID: task.id, + 'ingress-for': task.id + }, + }, + 'spec': { + 'entryPoints': self.entrypoints, + 'routes': routes, + } + }, + ) + + def on_kill(self, task_id): + try: + self.cluster.core.delete_namespaced_service( + namespace=self.cluster.namespace, + name=task_id, + ) + except client.rest.ApiException: + pass + + try: + self.cluster.ext.delete_namespaced_object( + namespace=self.cluster.namespace, + name=task_id + ) + self.cluster.custom.delete_cluster_custom_object_0( + group=TRAEFIK2_API_GROUP, + version=TRAEFIK2_API_VERSION, + plural=TRAEFIK2_INGRESSROUTE_PLURAL, + name=task_id, + ) + except client.rest.ApiException: + pass From 6b9af37c87da04fb39804b7c94ceaf8ebe66d8dd Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 26 May 2020 16:03:25 +0200 Subject: [PATCH 2/7] add nop router, per-cluster default router --- cowait/engine/docker.py | 2 +- cowait/engine/kubernetes.py | 2 +- cowait/engine/routers/__init__.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cowait/engine/docker.py b/cowait/engine/docker.py index d5aeb46b..f797dcd3 100644 --- a/cowait/engine/docker.py +++ b/cowait/engine/docker.py @@ -28,7 +28,7 @@ class DockerProvider(ClusterProvider): def __init__(self, args={}): super().__init__('docker', args) self.docker = docker.from_env() - self.router = create_router(self) + self.router = create_router(self, self.args.get('router', 'local')) @property def network(self): diff --git a/cowait/engine/kubernetes.py b/cowait/engine/kubernetes.py index 48d2bf15..30ced1f7 100644 --- a/cowait/engine/kubernetes.py +++ b/cowait/engine/kubernetes.py @@ -43,7 +43,7 @@ def __init__(self, args={}): self.ext = client.ExtensionsV1beta1Api(self.client) self.custom = client.CustomObjectsApi(self.client) - self.router = create_router(self) + self.router = create_router(self, self.args.get('router', 'none')) @property def namespace(self): diff --git a/cowait/engine/routers/__init__.py b/cowait/engine/routers/__init__.py index b4b1389a..7bbb40a1 100644 --- a/cowait/engine/routers/__init__.py +++ b/cowait/engine/routers/__init__.py @@ -6,9 +6,9 @@ from .traefik2_router import Traefik2Router -def create_router(cluster): - kind = cluster.args.get('router', 'local').lower() - +def create_router(cluster, kind: str = 'none'): + if kind == 'none': + return Router(cluster) if kind == 'local': return LocalPortRouter(cluster) if kind == 'traefik': From 282d474380a7e97633a4f1148dc5009245d04fd9 Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 26 May 2020 16:31:12 +0200 Subject: [PATCH 3/7] add context option to kubernetes provider --- cowait/engine/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cowait/engine/kubernetes.py b/cowait/engine/kubernetes.py index 30ced1f7..7032d5a6 100644 --- a/cowait/engine/kubernetes.py +++ b/cowait/engine/kubernetes.py @@ -35,7 +35,7 @@ def __init__(self, args={}): if ENV_TASK_CLUSTER in os.environ: config.load_incluster_config() else: - config.load_kube_config() + config.load_kube_config(context=self.args.get('context', None)) configuration = client.Configuration() self.client = kubernetes.client.ApiClient(configuration) From 17cc5d4715d5f7cd1d44bb2cc74d6ce56ea3cefd Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 26 May 2020 16:31:28 +0200 Subject: [PATCH 4/7] fix incorrect await in node.close() --- cowait/worker/worker_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cowait/worker/worker_node.py b/cowait/worker/worker_node.py index 348d029f..67dacbff 100644 --- a/cowait/worker/worker_node.py +++ b/cowait/worker/worker_node.py @@ -28,7 +28,7 @@ async def close(): if self.parent: await self.parent.close() - await self.io.create_task(close()) + self.io.create_task(close()) def capture_logs(self) -> StreamCapturing: """ Sets up a stream capturing context, forwarding logs to the node """ From 8c53336a5789f389888bc372ecb1e87bc2235da7 Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Tue, 26 May 2020 16:40:09 +0200 Subject: [PATCH 5/7] add permissions for traefik apis to default k8s setup --- k8setup.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/k8setup.yml b/k8setup.yml index a2022fa3..5c210929 100644 --- a/k8setup.yml +++ b/k8setup.yml @@ -11,6 +11,9 @@ rules: - apiGroups: ["extensions"] resources: ["ingresses"] verbs: ["get", "create", "list", "delete", "deletecollection"] +- apiGroups: ["traefik.containo.us"] + resources: ["ingressroutes"] + verbs: ["get", "create", "list", "delete", "deletecollection"] --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 From fe1b5bc502cb99e4352d58d64b4f4dab9bdaa5be Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Wed, 27 May 2020 15:42:12 +0200 Subject: [PATCH 6/7] fix traefik2 ingress removal --- cowait/engine/routers/traefik2_router.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cowait/engine/routers/traefik2_router.py b/cowait/engine/routers/traefik2_router.py index 20dc7387..7ee8dbdb 100644 --- a/cowait/engine/routers/traefik2_router.py +++ b/cowait/engine/routers/traefik2_router.py @@ -122,10 +122,6 @@ def on_kill(self, task_id): pass try: - self.cluster.ext.delete_namespaced_object( - namespace=self.cluster.namespace, - name=task_id - ) self.cluster.custom.delete_cluster_custom_object_0( group=TRAEFIK2_API_GROUP, version=TRAEFIK2_API_VERSION, From acd15cff1248c8c50fb8622c7bde94e6efe1511e Mon Sep 17 00:00:00 2001 From: Johan Henriksson Date: Fri, 29 May 2020 13:33:00 +0200 Subject: [PATCH 7/7] rename cowait rm to cowait clean --- bin/cowait | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bin/cowait b/bin/cowait index b8c3878f..2130c66e 100755 --- a/bin/cowait +++ b/bin/cowait @@ -151,11 +151,15 @@ def test(ctx, cluster: str, push: bool): @cli.command(help='destroy tasks') +@click.option('-c', '--cluster', + default=None, + type=str, + help='cluster name') @click.pass_context def rm(ctx, cluster: str): if cluster is not None: ctx.obj.default_cluster = cluster - cowait.cli.destroy(ctx.obj, cluster) + cowait.cli.destroy(ctx.obj) @cli.command(help='list tasks')