Skip to content

Commit

Permalink
Merge pull request #96 from backtick-se/traefik2_support
Browse files Browse the repository at this point in the history
Traefik 2 Support
  • Loading branch information
johanhenriksson authored Jun 2, 2020
2 parents 0616c00 + acd15cf commit 3269900
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 7 deletions.
6 changes: 5 additions & 1 deletion bin/cowait
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,15 @@ def test(ctx, cluster: str, push: bool):


@cli.command(help='destroy tasks')
@click.option('-c', '--cluster',
default=None,
type=str,
help='cluster name')
@click.pass_context
def rm(ctx, cluster: str):
if cluster is not None:
ctx.obj.default_cluster = cluster
cowait.cli.destroy(ctx.obj, cluster)
cowait.cli.destroy(ctx.obj)


@cli.command(help='list tasks')
Expand Down
4 changes: 2 additions & 2 deletions cowait/engine/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from cowait.tasks import TaskDefinition, RemoteTask
from .cluster import ClusterProvider
from .const import LABEL_TASK_ID, LABEL_PARENT_ID
from .routers import LocalPortRouter
from .errors import ProviderError
from .routers import create_router

DEFAULT_NETWORK = 'cowait'

Expand All @@ -28,7 +28,7 @@ class DockerProvider(ClusterProvider):
def __init__(self, args={}):
super().__init__('docker', args)
self.docker = docker.from_env()
self.router = LocalPortRouter(self)
self.router = create_router(self, self.args.get('router', 'local'))

@property
def network(self):
Expand Down
6 changes: 3 additions & 3 deletions cowait/engine/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from cowait.tasks import TaskDefinition, RemoteTask
from .const import ENV_TASK_CLUSTER, LABEL_TASK_ID, LABEL_PARENT_ID
from .cluster import ClusterProvider
from .routers import TraefikRouter
from .errors import TaskCreationError, ProviderError
from .routers import create_router

DEFAULT_NAMESPACE = 'default'

Expand Down Expand Up @@ -35,15 +35,15 @@ def __init__(self, args={}):
if ENV_TASK_CLUSTER in os.environ:
config.load_incluster_config()
else:
config.load_kube_config()
config.load_kube_config(context=self.args.get('context', None))

configuration = client.Configuration()
self.client = kubernetes.client.ApiClient(configuration)
self.core = client.CoreV1Api(self.client)
self.ext = client.ExtensionsV1beta1Api(self.client)
self.custom = client.CustomObjectsApi(self.client)

self.router = TraefikRouter(self)
self.router = create_router(self, self.args.get('router', 'none'))

@property
def namespace(self):
Expand Down
14 changes: 14 additions & 0 deletions cowait/engine/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,17 @@
from .router import Router
from .local_port_router import LocalPortRouter
from .traefik_router import TraefikRouter
from .traefik2_router import Traefik2Router


def create_router(cluster, kind: str = 'none'):
if kind == 'none':
return Router(cluster)
if kind == 'local':
return LocalPortRouter(cluster)
if kind == 'traefik':
return TraefikRouter(cluster)
if kind == 'traefik2':
return Traefik2Router(cluster)

raise RuntimeError(f'Unknown router type {kind}')
132 changes: 132 additions & 0 deletions cowait/engine/routers/traefik2_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from kubernetes import client
from .router import Router
from ..const import LABEL_TASK_ID


TRAEFIK2_API_GROUP = 'traefik.containo.us'
TRAEFIK2_API_VERSION = 'v1alpha1'
TRAEFIK2_API_NAMESPACE = 'default'
TRAEFIK2_INGRESSROUTE = 'IngressRoute'
TRAEFIK2_INGRESSROUTE_PLURAL = 'ingressroutes'


class Traefik2Router(Router):
def __init__(self, cluster):
super().__init__(cluster)
cluster.on('prepare', self.on_prepare)
cluster.on('spawn', self.on_spawn)
cluster.on('kill', self.on_kill)

self.config = cluster.args.get('traefik2', {})
self.secure = self.config.get('secure', False)
self.middlewares = self.config.get('middlewares', [])
self.entrypoints = self.config.get('entrypoints', ['web'])

def on_prepare(self, taskdef):
domain = self.cluster.domain
if domain is None:
raise RuntimeError('No cluster domain configured')

protocol = 'https' if self.secure else 'http'

for path, port in taskdef.routes.items():
if len(path) < 1 and path[0] != '/':
raise RuntimeError(f'Paths must start with /, got {path}')

taskdef.routes[path] = {
'port': port,
'path': path,
'url': f'{protocol}://{taskdef.id}.{domain}{path}',
}

return taskdef

def on_spawn(self, task):
ports = []
routes = []

idx = 0
for path, route in task.routes.items():
port = route['port']
idx += 1

ports.append(client.V1ServicePort(
port=port,
target_port=port,
))

host = f'{task.id}.{self.cluster.domain}'
routes.append({
'match': f'Host(`{host}`) && PathPrefix(`{path}`)',
'middlewares': self.middlewares,
'kind': 'Rule',
'services': [
{'name': task.id, 'port': port}
]
})

if len(routes) == 0:
return

print('~~ creating task ingress', path, '-> port', port)

self.cluster.core.create_namespaced_service(
namespace=self.cluster.namespace,
body=client.V1Service(
metadata=client.V1ObjectMeta(
name=task.id,
namespace=self.cluster.namespace,
labels={
LABEL_TASK_ID: task.id,
},
),
spec=client.V1ServiceSpec(
selector={
LABEL_TASK_ID: task.id,
},
ports=ports,
),
),
)

self.cluster.custom.create_namespaced_custom_object(
group=TRAEFIK2_API_GROUP,
version=TRAEFIK2_API_VERSION,
plural=TRAEFIK2_INGRESSROUTE_PLURAL,
namespace=TRAEFIK2_API_NAMESPACE,
body={
'apiVersion': f'{TRAEFIK2_API_GROUP}/{TRAEFIK2_API_VERSION}',
'kind': TRAEFIK2_INGRESSROUTE,
'metadata': {
'name': f'{task.id}',
'namespace': self.cluster.namespace,
'labels': {
LABEL_TASK_ID: task.id,
'ingress-for': task.id
},
},
'spec': {
'entryPoints': self.entrypoints,
'routes': routes,
}
},
)

def on_kill(self, task_id):
try:
self.cluster.core.delete_namespaced_service(
namespace=self.cluster.namespace,
name=task_id,
)
except client.rest.ApiException:
pass

try:
self.cluster.custom.delete_cluster_custom_object_0(
group=TRAEFIK2_API_GROUP,
version=TRAEFIK2_API_VERSION,
plural=TRAEFIK2_INGRESSROUTE_PLURAL,
name=task_id,
)
except client.rest.ApiException:
pass
2 changes: 1 addition & 1 deletion cowait/worker/worker_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def close():
if self.parent:
await self.parent.close()

await self.io.create_task(close())
self.io.create_task(close())

def capture_logs(self) -> StreamCapturing:
""" Sets up a stream capturing context, forwarding logs to the node """
Expand Down
3 changes: 3 additions & 0 deletions k8setup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ rules:
- apiGroups: ["extensions"]
resources: ["ingresses"]
verbs: ["get", "create", "list", "delete", "deletecollection"]
- apiGroups: ["traefik.containo.us"]
resources: ["ingressroutes"]
verbs: ["get", "create", "list", "delete", "deletecollection"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down

0 comments on commit 3269900

Please sign in to comment.