diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e085749b68..f380c33d93 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,3 +1,3 @@ # Contributing to Flyte -For information related to contributing to Flyte, please check out the [Contributing to Flyte](https://docs.flyte.org/en/latest/community/contribute.html) section of the documentation. +For information related to contributing to Flyte, please check out the [Contributing to Flyte](https://docs.flyte.org/en/latest/community/contribute/index.html) section of the documentation. diff --git a/deployment/stats/prometheus/flyteuser-dashboard.json b/deployment/stats/prometheus/flyteuser-dashboard.json index 55c7ad5851..36eb2bb7bf 100644 --- a/deployment/stats/prometheus/flyteuser-dashboard.json +++ b/deployment/stats/prometheus/flyteuser-dashboard.json @@ -12,7 +12,7 @@ "annotations": { "list": [] }, - "description": "Flyte User Dashboard. This is great to get a birds-eye and drill down view of executions in your Flyte cluster. Useful for the user.", + "description": "Flyte User Dashboard. 
It's designed to give an overview of execution status and resource consumption.", "editable": false, "gnetId": null, "graphTooltip": 0, @@ -40,7 +40,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -96,7 +97,7 @@ "targets": [ { "datasource": null, - "expr": "sum(rate(flyte:propeller:all:workflow:accepted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}[5m]))", + "expr": "avg(flyte:propeller:all:workflow:accepted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"})", "format": "time_series", "hide": false, "instant": false, @@ -104,7 +105,7 @@ "intervalFactor": 2, "legendFormat": "", "metric": "", - "query": "sum(rate(flyte:propeller:all:workflow:accepted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}[5m]))", + "query": "avg(flyte:propeller:all:workflow:accepted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"})", "refId": "A", "step": 10, "target": "" @@ -113,7 +114,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Accepted Workflow", + "title": "Accepted Workflows (avg)", "tooltip": { "msResolution": true, "shared": true, @@ -167,7 +168,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -240,7 +242,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Successful Workflow", + "title": "Workflow success rate", "tooltip": { "msResolution": true, "shared": true, @@ -294,7 +296,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -367,7 +370,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Failed Workflow", + "title": "Workflow failure rate", "tooltip": { "msResolution": true, "shared": true, @@ -421,7 +424,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -477,7 +481,7 @@ "targets": [ { "datasource": null, - "expr": 
"sum(rate(flyte:propeller:all:workflow:workflow_aborted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}[5m]))", + "expr": "avg_over_time(flyte:propeller:all:workflow:workflow_aborted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}[5m])", "format": "time_series", "hide": false, "instant": false, @@ -485,7 +489,7 @@ "intervalFactor": 2, "legendFormat": "", "metric": "", - "query": "sum(rate(flyte:propeller:all:workflow:workflow_aborted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}[5m]))", + "query": "avg_over_time(flyte:propeller:all:workflow:workflow_aborted{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}[5m])", "refId": "A", "step": 10, "target": "" @@ -494,7 +498,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Aborted Workflow", + "title": "Aborted Workflows (avg)", "tooltip": { "msResolution": true, "shared": true, @@ -513,7 +517,7 @@ "yaxes": [ { "decimals": null, - "format": "ops", + "format": "short", "label": null, "logBase": 1, "max": null, @@ -536,8 +540,6 @@ } }, { - "aliasColors": {}, - "bars": false, "cacheTimeout": null, "datasource": "${DS_PROM}", "description": null, @@ -547,64 +549,84 @@ "defaults": { "thresholds": { "mode": "absolute", - "steps": [] + "steps": [ + { + "color": "green", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + }, + { + "color": "red", + "index": 1, + "line": true, + "op": "gt", + "value": 80.0, + "yaxis": "left" + } + ] } } }, - "fill": 1, - "grid": { - "threshold1": null, - "threshold1Color": "rgba(216, 200, 27, 0.27)", - "threshold2": null, - "threshold2Color": "rgba(234, 112, 112, 0.22)" - }, "gridPos": null, "height": null, "hideTimeOverride": false, "id": 5, "interval": null, - "isNew": true, - "legend": { - "alignAsTable": false, - "avg": false, - "current": false, - "hideEmpty": false, - "hideZero": false, - "max": false, - "min": false, - "rightSide": false, - "show": true, - "sideWidth": null, - 
"sort": null, - "sortDesc": false, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 2, "links": [], "maxDataPoints": 100, "maxPerRow": null, "minSpan": null, - "nullPointMode": "connected", "options": { - "alertThreshold": true, - "dataLinks": [] + "displayMode": "lcd", + "fieldOptions": { + "calcs": [ + "mean" + ], + "defaults": { + "decimals": null, + "links": [], + "max": 100, + "min": 0, + "title": null, + "unit": "s" + }, + "limit": null, + "mappings": [], + "override": {}, + "thresholds": [ + { + "color": "green", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + }, + { + "color": "red", + "index": 1, + "line": true, + "op": "gt", + "value": 80.0, + "yaxis": "left" + } + ], + "values": false + }, + "orientation": "horizontal", + "showThresholdLabels": false, + "showThresholdMarkers": true }, - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", "repeat": null, "repeatDirection": null, - "seriesOverrides": [], "span": 2, - "stack": false, - "steppedLine": false, "targets": [ { "datasource": null, - "expr": "sum(flyte:propeller:all:workflow:success_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by (quantile)", + "expr": "(avg(flyte:propeller:all:workflow:success_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by(quantile))/1000", "format": "time_series", "hide": false, "instant": false, @@ -612,59 +634,20 @@ "intervalFactor": 2, "legendFormat": "", "metric": "", - "query": "sum(flyte:propeller:all:workflow:success_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by (quantile)", + "query": "(avg(flyte:propeller:all:workflow:success_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by(quantile))/1000", "refId": "A", "step": 10, "target": "" } ], - "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Successful workflow execution time by Quantile", - 
"tooltip": { - "msResolution": true, - "shared": true, - "sort": 0, - "value_type": "cumulative" - }, + "title": "Successful wf execution duration by quantile", "transformations": [], "transparent": false, - "type": "graph", - "xaxis": { - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": null, - "format": "ms", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": 0 - } + "type": "bargauge" }, { - "aliasColors": {}, - "bars": false, "cacheTimeout": null, "datasource": "${DS_PROM}", "description": null, @@ -674,191 +657,84 @@ "defaults": { "thresholds": { "mode": "absolute", - "steps": [] + "steps": [ + { + "color": "green", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + }, + { + "color": "red", + "index": 1, + "line": true, + "op": "gt", + "value": 80.0, + "yaxis": "left" + } + ] } } }, - "fill": 1, - "grid": { - "threshold1": null, - "threshold1Color": "rgba(216, 200, 27, 0.27)", - "threshold2": null, - "threshold2Color": "rgba(234, 112, 112, 0.22)" - }, "gridPos": null, "height": null, "hideTimeOverride": false, "id": 6, "interval": null, - "isNew": true, - "legend": { - "alignAsTable": false, - "avg": false, - "current": false, - "hideEmpty": false, - "hideZero": false, - "max": false, - "min": false, - "rightSide": false, - "show": true, - "sideWidth": null, - "sort": null, - "sortDesc": false, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 2, "links": [], "maxDataPoints": 100, "maxPerRow": null, "minSpan": null, - "nullPointMode": "connected", "options": { - "alertThreshold": true, - "dataLinks": [] - }, - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "repeat": null, - "repeatDirection": null, - 
"seriesOverrides": [], - "span": 2, - "stack": false, - "steppedLine": false, - "targets": [ - { - "datasource": null, - "expr": "sum(flyte:propeller:all:workflow:failure_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by (quantile)", - "format": "time_series", - "hide": false, - "instant": false, - "interval": "", - "intervalFactor": 2, - "legendFormat": "", - "metric": "", - "query": "sum(flyte:propeller:all:workflow:failure_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by (quantile)", - "refId": "A", - "step": 10, - "target": "" - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Failed workflow execution time by Quantile", - "tooltip": { - "msResolution": true, - "shared": true, - "sort": 0, - "value_type": "cumulative" - }, - "transformations": [], - "transparent": false, - "type": "graph", - "xaxis": { - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": null, - "format": "ms", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true + "displayMode": "lcd", + "fieldOptions": { + "calcs": [ + "mean" + ], + "defaults": { + "decimals": null, + "links": [], + "max": 100, + "min": 0, + "title": null, + "unit": "s" + }, + "limit": null, + "mappings": [], + "override": {}, + "thresholds": [ + { + "color": "green", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + }, + { + "color": "red", + "index": 1, + "line": true, + "op": "gt", + "value": 80.0, + "yaxis": "left" + } + ], + "values": false }, - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": 0 - } - }, - { - "aliasColors": {}, - "bars": false, - "cacheTimeout": null, - "datasource": "${DS_PROM}", - "description": null, - "editable": true, - "error": false, - "fieldConfig": { - "defaults": { - 
"thresholds": { - "mode": "absolute", - "steps": [] - } - } - }, - "fill": 1, - "grid": { - "threshold1": null, - "threshold1Color": "rgba(216, 200, 27, 0.27)", - "threshold2": null, - "threshold2Color": "rgba(234, 112, 112, 0.22)" + "orientation": "horizontal", + "showThresholdLabels": false, + "showThresholdMarkers": true }, - "gridPos": null, - "height": null, - "hideTimeOverride": false, - "id": 7, - "interval": null, - "isNew": true, - "legend": { - "alignAsTable": false, - "avg": false, - "current": false, - "hideEmpty": false, - "hideZero": false, - "max": false, - "min": false, - "rightSide": false, - "show": true, - "sideWidth": null, - "sort": null, - "sortDesc": false, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 2, - "links": [], - "maxDataPoints": 100, - "maxPerRow": null, - "minSpan": null, - "nullPointMode": "connected", - "options": { - "alertThreshold": true, - "dataLinks": [] - }, - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", "repeat": null, "repeatDirection": null, - "seriesOverrides": [], "span": 2, - "stack": false, - "steppedLine": false, "targets": [ { "datasource": null, - "expr": "sum(flyte:propeller:all:node:queueing_latency_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by (quantile)", + "expr": "(avg(flyte:propeller:all:workflow:failure_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by(quantile))/1000", "format": "time_series", "hide": false, "instant": false, @@ -866,55 +742,18 @@ "intervalFactor": 2, "legendFormat": "", "metric": "", - "query": "sum(flyte:propeller:all:node:queueing_latency_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by (quantile)", + "query": "(avg(flyte:propeller:all:workflow:failure_duration_ms{project=~\"$project\", domain=~\"$domain\", wf=~\"$workflow\"}) by(quantile))/1000", "refId": "A", "step": 10, "target": "" } ], - "thresholds": [], "timeFrom": null, "timeShift": null, - 
"title": "Node queuing latency by Quantile", - "tooltip": { - "msResolution": true, - "shared": true, - "sort": 0, - "value_type": "cumulative" - }, + "title": "Failed wf execution duration by quantile", "transformations": [], "transparent": false, - "type": "graph", - "xaxis": { - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": null, - "format": "ms", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": 0 - } + "type": "bargauge" } ], "repeat": null, @@ -939,7 +778,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -952,7 +792,7 @@ "gridPos": null, "height": null, "hideTimeOverride": false, - "id": 8, + "id": 7, "interval": null, "isNew": true, "legend": { @@ -1001,7 +841,7 @@ "instant": false, "interval": "", "intervalFactor": 2, - "legendFormat": "max cpu", + "legendFormat": "CPU limit", "metric": "", "query": "kube_resourcequota{resource=\"limits.cpu\", namespace=\"$project-$domain\", type=\"hard\"}", "refId": "A", @@ -1016,7 +856,7 @@ "instant": false, "interval": "", "intervalFactor": 2, - "legendFormat": "used cpu", + "legendFormat": "CPU requested", "metric": "", "query": "kube_resourcequota{resource=\"limits.cpu\", namespace=\"$project-$domain\", type=\"used\"}", "refId": "B", @@ -1027,7 +867,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "CPU Limits vs usage", + "title": "CPU Limit vs requested by namespace", "tooltip": { "msResolution": true, "shared": true, @@ -1046,7 +886,7 @@ "yaxes": [ { "decimals": null, - "format": "ops", + "format": "short", "label": null, "logBase": 1, "max": null, @@ -1081,7 +921,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -1094,7 +935,7 @@ "gridPos": null, 
"height": null, "hideTimeOverride": false, - "id": 9, + "id": 8, "interval": null, "isNew": true, "legend": { @@ -1137,30 +978,30 @@ "targets": [ { "datasource": null, - "expr": "kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"hard\"}", + "expr": "(kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"hard\"})*9.5367e-7", "format": "time_series", "hide": false, "instant": false, "interval": "", "intervalFactor": 2, - "legendFormat": "max mem", + "legendFormat": "Memory limit (MiB)", "metric": "", - "query": "kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"hard\"}", + "query": "(kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"hard\"})*9.5367e-7", "refId": "A", "step": 10, "target": "" }, { "datasource": null, - "expr": "kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"used\"}", + "expr": "(kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"used\"})*9.5367e-7", "format": "time_series", "hide": false, "instant": false, "interval": "", "intervalFactor": 2, - "legendFormat": "used mem", + "legendFormat": "Memory requested (MiB)", "metric": "", - "query": "kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"used\"}", + "query": "(kube_resourcequota{resource=\"limits.memory\", namespace=\"$project-$domain\", type=\"used\"})*9.5367e-7", "refId": "B", "step": 10, "target": "" @@ -1169,7 +1010,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Mem Limits vs usage", + "title": "Memory limit vs requested by namespace (MiB)", "tooltip": { "msResolution": true, "shared": true, @@ -1188,7 +1029,7 @@ "yaxes": [ { "decimals": null, - "format": "ops", + "format": "short", "label": null, "logBase": 1, "max": null, @@ -1213,7 +1054,7 @@ ], "repeat": null, "showTitle": true, - "title": "Kubernetes Quota Usage stats" + 
"title": "Kubernetes Resource Quota Usage" }, { "collapse": true, @@ -1233,7 +1074,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -1246,7 +1088,7 @@ "gridPos": null, "height": null, "hideTimeOverride": false, - "id": 10, + "id": 9, "interval": null, "isNew": true, "legend": { @@ -1289,7 +1131,7 @@ "targets": [ { "datasource": null, - "expr": "sum(kube_pod_container_status_waiting * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\",namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"}) by (namespace, label_execution_id, label_task_name, label_node_id, label_workflow_name) > 0", + "expr": "sum(kube_pod_status_phase{phase=\"Pending\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_workflow_name=~\"$workflow\"}) by (namespace, label_task_name, label_node_id, label_workflow_name) > 0", "format": "time_series", "hide": false, "instant": false, @@ -1297,7 +1139,7 @@ "intervalFactor": 2, "legendFormat": "", "metric": "", - "query": "sum(kube_pod_container_status_waiting * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\",namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"}) by (namespace, label_execution_id, label_task_name, label_node_id, label_workflow_name) > 0", + "query": "sum(kube_pod_status_phase{phase=\"Pending\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_workflow_name=~\"$workflow\"}) by (namespace, label_task_name, label_node_id, label_workflow_name) > 0", "refId": "A", "step": 10, "target": "" @@ -1306,7 +1148,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Pending tasks", + "title": "Pending Tasks", "tooltip": { "msResolution": true, "shared": true, @@ -1348,8 +1190,6 @@ } }, { - "aliasColors": 
{}, - "bars": false, "cacheTimeout": null, "datasource": "${DS_PROM}", "description": null, @@ -1357,66 +1197,80 @@ "error": false, "fieldConfig": { "defaults": { + "color": { + "fixedColor": "none", + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + } + }, + "mappings": [], "thresholds": { "mode": "absolute", - "steps": [] + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] } - } - }, - "fill": 1, - "grid": { - "threshold1": null, - "threshold1Color": "rgba(216, 200, 27, 0.27)", - "threshold2": null, - "threshold2Color": "rgba(234, 112, 112, 0.22)" + }, + "overrides": [] }, "gridPos": null, "height": null, "hideTimeOverride": false, - "id": 11, + "id": 10, "interval": null, - "isNew": true, - "legend": { - "alignAsTable": false, - "avg": false, - "current": false, - "hideEmpty": false, - "hideZero": false, - "max": false, - "min": false, - "rightSide": false, - "show": true, - "sideWidth": null, - "sort": null, - "sortDesc": false, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 2, "links": [], "maxDataPoints": 100, "maxPerRow": null, "minSpan": null, - "nullPointMode": "connected", "options": { - "alertThreshold": true, - "dataLinks": [] + "barRadius": 0.0, + "barWidth": 0.97, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "true", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 }, - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", "repeat": null, "repeatDirection": null, - 
"seriesOverrides": [], "span": 4, - "stack": false, - "steppedLine": false, "targets": [ { "datasource": null, - "expr": "(100 * max(container_memory_rss{image!=\"\"} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\",namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name) / max(kube_pod_container_resource_limits_memory_bytes{container!=\"\"} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name)) > 0", + "expr": "(100 * (max(container_memory_working_set_bytes{container!=\"\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name) / max(cluster:namespace:pod_memory:active:kube_pod_container_resource_limits{container!=\"\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name))) > 0", "format": "time_series", "hide": false, "instant": false, @@ -1424,59 +1278,20 @@ "intervalFactor": 2, "legendFormat": "", "metric": "", - "query": "(100 * max(container_memory_rss{image!=\"\"} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id 
!=\"\",namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name) / max(kube_pod_container_resource_limits_memory_bytes{container!=\"\"} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name)) > 0", + "query": "(100 * (max(container_memory_working_set_bytes{container!=\"\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name) / max(cluster:namespace:pod_memory:active:kube_pod_container_resource_limits{container!=\"\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name))) > 0", "refId": "A", "step": 10, "target": "" } ], - "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "Memory Usage Percentage", - "tooltip": { - "msResolution": true, - "shared": true, - "sort": 0, - "value_type": "cumulative" - }, + "title": "Memory Usage per Task(%)", "transformations": [], "transparent": false, - "type": "graph", - "xaxis": { - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": 
null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": 0 - } + "type": "barchart" }, { - "aliasColors": {}, - "bars": false, "cacheTimeout": null, "datasource": "${DS_PROM}", "description": null, @@ -1484,66 +1299,80 @@ "error": false, "fieldConfig": { "defaults": { + "color": { + "fixedColor": "none", + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + } + }, + "mappings": [], "thresholds": { "mode": "absolute", - "steps": [] + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] } - } - }, - "fill": 1, - "grid": { - "threshold1": null, - "threshold1Color": "rgba(216, 200, 27, 0.27)", - "threshold2": null, - "threshold2Color": "rgba(234, 112, 112, 0.22)" + }, + "overrides": [] }, "gridPos": null, "height": null, "hideTimeOverride": false, - "id": 12, + "id": 11, "interval": null, - "isNew": true, - "legend": { - "alignAsTable": false, - "avg": false, - "current": false, - "hideEmpty": false, - "hideZero": false, - "max": false, - "min": false, - "rightSide": false, - "show": true, - "sideWidth": null, - "sort": null, - "sortDesc": false, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 2, "links": [], "maxDataPoints": 100, "maxPerRow": null, "minSpan": null, - "nullPointMode": "connected", "options": { - "alertThreshold": true, - "dataLinks": [] + "barRadius": 0.0, + "barWidth": 0.97, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "true", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 }, - 
"percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", "repeat": null, "repeatDirection": null, - "seriesOverrides": [], "span": 4, - "stack": false, - "steppedLine": false, "targets": [ { "datasource": null, - "expr": "(100* sum(rate(container_cpu_usage_seconds_total{image!=\"\"}[2m]) * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\",namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name) / sum(kube_pod_container_resource_limits_cpu_cores{container!=\"\"} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name)) > 0", + "expr": "(100 * (sum(rate(container_cpu_usage_seconds_total{image!=\"\"}[2m]) * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name) / sum(cluster:namespace:pod_cpu:active:kube_pod_container_resource_limits{container!=\"\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name))) > 0", "format": "time_series", "hide": false, "instant": false, @@ -1551,55 +1380,18 @@ "intervalFactor": 2, "legendFormat": "", "metric": "", - "query": "(100* sum(rate(container_cpu_usage_seconds_total{image!=\"\"}[2m]) * on(pod) 
group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\",namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name) / sum(kube_pod_container_resource_limits_cpu_cores{container!=\"\"} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=\"\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name)) > 0", + "query": "(100 * (sum(rate(container_cpu_usage_seconds_total{image!=\"\"}[2m]) * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{namespace=~\"$project-$domain\",label_workflow_name=~\"$workflow\"} * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name) / sum(cluster:namespace:pod_cpu:active:kube_pod_container_resource_limits{container!=\"\"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels * on(pod) group_left(phase) kube_pod_status_phase{phase=\"Running\"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name))) > 0", "refId": "A", "step": 10, "target": "" } ], - "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "CPU Usage Percentage", - "tooltip": { - "msResolution": true, - "shared": true, - "sort": 0, - "value_type": "cumulative" - }, + "title": "CPU Usage per Task(%)", "transformations": [], "transparent": false, - "type": "graph", - "xaxis": { - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": 
true - }, - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": 0 - } + "type": "barchart" } ], "repeat": null, @@ -1624,7 +1416,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -1637,7 +1430,7 @@ "gridPos": null, "height": null, "hideTimeOverride": false, - "id": 13, + "id": 12, "interval": null, "isNew": true, "legend": { @@ -1697,7 +1490,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "User errors", + "title": "User error rate", "tooltip": { "msResolution": true, "shared": true, @@ -1751,7 +1544,8 @@ "thresholds": { "mode": "absolute", "steps": [] - } + }, + "unit": "" } }, "fill": 1, @@ -1764,7 +1558,7 @@ "gridPos": null, "height": null, "hideTimeOverride": false, - "id": 14, + "id": 13, "interval": null, "isNew": true, "legend": { @@ -1824,7 +1618,7 @@ "thresholds": [], "timeFrom": null, "timeShift": null, - "title": "System errors", + "title": "System error rate", "tooltip": { "msResolution": true, "shared": true, @@ -1868,7 +1662,7 @@ ], "repeat": null, "showTitle": true, - "title": "Error (System vs user)" + "title": "Error (System vs User)" } ], "schemaVersion": 12, @@ -1971,6 +1765,7 @@ }, "timepicker": { "hidden": false, + "nowDelay": null, "refresh_intervals": [ "5s", "10s", diff --git a/docs/user_guide/concepts/main_concepts/data_management.rst b/docs/user_guide/concepts/main_concepts/data_management.rst index 0d4edbd0a8..bc492a56f8 100644 --- a/docs/user_guide/concepts/main_concepts/data_management.rst +++ b/docs/user_guide/concepts/main_concepts/data_management.rst @@ -9,47 +9,47 @@ Understand How Flyte Handles Data Types of Data ============= -There are two parts to the data in Flyte: +In Flyte, data is categorized into metadata and raw data to optimize data handling and improve performance and security. -1. 
Metadata +* **Metadata**: Small values, like integers and strings, are treated as "stack parameters" (passed by value). This metadata is globally accessible to Flyte components (FlytePropeller, FlyteAdmin, and other running pods/jobs). Each entry is limited to 10MB and is passed directly between tasks. On top of that, metadata allow in-memory computations for branches, partial outputs, and composition of multiple outputs as input for other tasks. -* It consists of data about inputs to a task, and other artifacts. -* It is configured globally for FlytePropeller, FlyteAdmin etc., and the running pods/jobs need access to this bucket to get the data. +* **Raw data**: Larger data, such as files and dataframes, are treated as "heap parameters" (passed by reference). Flyte stores raw data in an object store (e.g., S3), uploading it on first use and passing only a reference thereafter. Tasks can then access this data via Flyte’s automated download or streaming, enabling efficient access to large datasets without needing to transfer full copies. -2. Raw data +*Source code reference for auto-offloading value sizes limitation:* -* It is the actual data (such as the Pandas DataFrame, Spark DataFrame, etc.). -* Raw data paths are unique for every execution, and the prefixes can be modified per execution. -* None of the Flyte control plane components would access the raw data. This provides great separation of data between the control plane and the data plane. +.. raw:: html -.. note: - Metadata and raw data can be present in entirely separate buckets. + View source code on GitHub +Data Flow and Security +~~~~~~~~~~~~~~~~~~~~~~ -Let us consider a simple Python task: +Flyte’s data separation avoids bottlenecks and security risks: + +* **Metadata** remains within Flyte’s control plane, making it accessible through the Flyte Console or CLI. 
+* **Raw Data** is accessible only by tasks, stored securely in an external blob store, preventing Flyte’s control plane from directly handling large data files. + +Moreover, a unique property of this separation is that all meta values are read by FlytePropeller engine and available on the FlyteConsole or CLI from the control plane. + +Example +~~~~~~~ + +Consider a basic Flyte task: .. code-block:: python - @task - def my_task(m: int, n: str, o: FlyteFile) -> pd.DataFrame: - ... + @task + def my_task(m: int, n: str, o: FlyteFile) -> pd.DataFrame: + ... -In the above code sample, ``m``, ``n``, ``o`` are inputs to the task. -``m`` of type ``int`` and ``n`` of type ``str`` are simple primitive types, while ``o`` is an arbitrarily sized file. -All of them from Flyte's point of view are ``data``. -The difference lies in how Flyte stores and passes each of these data items. -For every task that receives input, Flyte sends an **Inputs Metadata** object, which contains all the primitive or simple scalar values inlined, but in the case of -complex, large objects, they are offloaded and the `Metadata` simply stores a reference to the object. In our example, ``m`` and ``n`` are inlined while -``o`` and the output ``pd.DataFrame`` are offloaded to an object store, and their reference is captured in the metadata. +In this task, ``m``, ``n``, and ``o`` are inputs: ``m`` (int) and ``n`` (str) are simple types, while ``o`` is a large, arbitrarily sized file. +Flyte treats each differently: -`Flytekit TypeTransformers` make it possible to use complex objects as if they are available locally - just like persistent filehandles. But Flyte backend only deals with -the references. +* Metadata: Small values like ``m`` and ``n`` are inlined within Flyte’s metadata and passed directly between tasks. +* Raw data: Objects like ``o`` and the output pd.DataFrame are offloaded to an object store (e.g., S3), with only references retained in metadata. 
-Thus, primitive data types and references to large objects fall under Metadata - `Meta input` or `Meta output`, and the actual large object is known as **Raw data**. -A unique property of this separation is that all `meta values` are read by FlytePropeller engine and available on the FlyteConsole or CLI from the control plane. -`Raw` data is not read by any of the Flyte components and hence it is possible to store it in a completely separate blob storage or alternate stores, which can't be accessed by Flyte control plane components -but can be accessed by users's container/tasks. +Flytekit TypeTransformers make it possible to use complex objects as if they are available locally, just like persistent filehandles. However, the Flyte backend only deals with the references. Raw Data Prefix ~~~~~~~~~~~~~~~ @@ -57,22 +57,17 @@ Raw Data Prefix Every task can read/write its own data files. If ``FlyteFile`` or any natively supported type like ``pandas.DataFrame`` is used, Flyte will automatically offload and download data from the configured object-store paths. These paths are completely customizable per `LaunchPlan` or `Execution`. -- The default Rawoutput path (prefix in an object store like S3/GCS) can be configured during registration as shown in :std:ref:`flytectl_register_files`. +* The default Rawoutput path (prefix in an object store like S3/GCS) can be configured during registration as shown in :std:ref:`flytectl_register_files`. The argument ``--outputLocationPrefix`` allows us to set the destination directory for all the raw data produced. Flyte will create randomized folders in this path to store the data. 
-- To override the ``RawOutput`` path (prefix in an object store like S3/GCS), you can specify an alternate location when invoking a Flyte execution, as shown in the following screenshot of the LaunchForm in FlyteConsole: +* To override the ``RawOutput`` path (prefix in an object store like S3/GCS), + you can specify an alternate location when invoking a Flyte execution, as shown in the following screenshot of the LaunchForm in FlyteConsole: - .. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/data_movement/launch_raw_output.png + .. image:: https://raw.githubusercontent.com/flyteorg/static-resources/9cb3d56d7f3b88622749b41ff7ad2d3ebce92726/flyte/concepts/data_movement/launch_raw_output.png -- In the sandbox, the default Rawoutput-prefix is configured to be the root of the local bucket. Hence Flyte will write all the raw data (reference types like blob, file, df/schema/parquet, etc.) under a path defined by the execution. +* In the sandbox, the default Rawoutput-prefix is configured to be the root of the local bucket. + Hence Flyte will write all the raw data (reference types like blob, file, df/schema/parquet, etc.) under a path defined by the execution. -Metadata -~~~~~~~~ - -Metadata in Flyte is critical to enable the passing of data between tasks. It allows to perform in-memory computations for branches or send partial outputs from one task to another or compose outputs from multiple tasks into one input to be sent to a task. - -Thus, metadata is restricted due to its omnipresence. Each `meta output`/`input` cannot be larger than 1MB. If you have `List[int]`, it cannot be larger than 1MB, considering other input entities. In scenarios where large lists or strings need to be sent between tasks, file abstraction is preferred. 
- ``LiteralType`` & Literals ~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -154,15 +149,15 @@ The illustration below explains how data flows from engine to the task and how t We could use fast metadata stores to speed up data movement or exploit locality. Between Flytepropeller and Tasks -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/data_movement/flyte_data_movement.png +.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/9cb3d56d7f3b88622749b41ff7ad2d3ebce92726/flyte/concepts/data_movement/flyte_data_movement.png Between Tasks -~~~~~~~~~~~~~~ +~~~~~~~~~~~~~ -.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/data_movement/flyte_data_transfer.png +.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/9cb3d56d7f3b88622749b41ff7ad2d3ebce92726/flyte/concepts/data_movement/flyte_data_transfer.png Bringing in Your Own Datastores for Raw Data @@ -174,3 +169,75 @@ For example, it is theoretically possible to use S3 ``s3://`` for metadata and G But for Metadata, the data should be accessible to Flyte control plane. Data persistence is also pluggable. By default, it supports all major blob stores and uses an interface defined in Flytestdlib. + +Practical Example +~~~~~~~~~~~~~~~~~ + +Let's consider a simple example where we have some tasks that needs to operate huge dataframes. + +The first task reads a file from the object store, shuffles the data, saves to local disk, and passes the path to the next task. + +.. code-block:: python + + @task() + def task_remove_column(input_file: FlyteFile, column_name: str) -> FlyteFile: + """ + Reads the input file as a DataFrame, removes a specified column, and outputs it as a new file. 
+ """ + input_file.download() + df = pd.read_csv(input_file.path) + + # remove column + if column_name in df.columns: + df = df.drop(columns=[column_name]) + + output_file_path = "data_finished.csv" + df.to_csv(output_file_path, index=False) + + return FlyteFile(output_file_path) + ... + +The second task reads the file from the previous task, removes a column, saves to local disk, and returns the path. + +.. code-block:: python + + @task() + def task_remove_column(input_file: FlyteFile, column_name: str) -> FlyteFile: + """ + Reads the input file as a DataFrame, removes a specified column, and outputs it as a new file. + """ + input_file.download() + df = pd.read_csv(input_file.path) + + # remove column + if column_name in df.columns: + df = df.drop(columns=[column_name]) + + output_file_path = "data_finished.csv" + df.to_csv(output_file_path, index=False) + + return FlyteFile(output_file_path) + ... + +And here is the workflow: + +.. code-block:: python + + @workflow + def wf() -> FlyteFile: + existed_file = FlyteFile("s3://custom-bucket/data.csv") + shuffled_file = task_read_and_shuffle_file(input_file=existed_file) + result_file = task_remove_column(input_file=shuffled_file, column_name="County") + return result_file + ... + +This example shows how to access an existing file in a MinIO bucket from the Flyte Sandbox and pass it between tasks with ``FlyteFile``. +When a workflow outputs a local file as a ``FlyteFile``, Flyte automatically uploads it to MinIO and provides an S3 URL for downstream tasks, no manual uploads needed. Take a look at the following: + +First task output metadata: + +.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/9cb3d56d7f3b88622749b41ff7ad2d3ebce92726/flyte/concepts/data_movement/flyte_data_movement_example_output.png + +Second task input metadata: + +.. 
image:: https://raw.githubusercontent.com/flyteorg/static-resources/9cb3d56d7f3b88622749b41ff7ad2d3ebce92726/flyte/concepts/data_movement/flyte_data_movement_example_input.png diff --git a/docs/user_guide/data_types_and_io/flytedirectory.md b/docs/user_guide/data_types_and_io/flytedirectory.md index 121a7d9b67..4ad2316ded 100644 --- a/docs/user_guide/data_types_and_io/flytedirectory.md +++ b/docs/user_guide/data_types_and_io/flytedirectory.md @@ -86,4 +86,21 @@ You can run the workflow locally as follows: :lines: 94-114 ``` + +## Streaming support + +Flyte `1.5` introduced support for streaming `FlyteDirectory` types via the `fsspec` library. +The `FlyteDirectory` streaming feature enables efficient streaming and handling of entire directories, simplifying operations involving multiple files. + +:::{note} +This feature is marked as experimental. We'd love feedback on the API! +::: + +Here is a simple example, you can accept a `FlyteDirectory` as an input, walk through it and copy the files to another `FlyteDirectory` one by one. + +```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/ddce0448141ea6d2cb148df52bf408874adb15ad/examples/data_types_and_io/data_types_and_io/file_streaming.py +:caption: data_types_and_io/file_streaming.py +:lines: 23-33 +``` + [flytesnacks]: https://github.com/flyteorg/flytesnacks/tree/master/examples/data_types_and_io/ diff --git a/docs/user_guide/data_types_and_io/flytefile.md b/docs/user_guide/data_types_and_io/flytefile.md index e9c02e2132..44378a315a 100644 --- a/docs/user_guide/data_types_and_io/flytefile.md +++ b/docs/user_guide/data_types_and_io/flytefile.md @@ -90,4 +90,20 @@ You can enable type validation if you have the [python-magic](https://pypi.org/p Currently, type validation is only supported on the `Mac OS` and `Linux` platforms. ::: +## Streaming support + +Flyte `1.5` introduced support for streaming `FlyteFile` types via the `fsspec` library. 
+This integration enables efficient, on-demand access to remote files, eliminating the need for fully downloading them to local storage. + +:::{note} +This feature is marked as experimental. We'd love feedback on the API! +::: + +Here is a simple example of removing some columns from a CSV file and writing the result to a new file: + +```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/ddce0448141ea6d2cb148df52bf408874adb15ad/examples/data_types_and_io/data_types_and_io/file_streaming.py +:caption: data_types_and_io/file_streaming.py +:lines: 8-20 +``` + [flytesnacks]: https://github.com/flyteorg/flytesnacks/tree/master/examples/data_types_and_io/ diff --git a/docs/user_guide/data_types_and_io/structureddataset.md b/docs/user_guide/data_types_and_io/structureddataset.md index e4eed0a956..9a82610590 100644 --- a/docs/user_guide/data_types_and_io/structureddataset.md +++ b/docs/user_guide/data_types_and_io/structureddataset.md @@ -39,7 +39,7 @@ To begin, import the dependencies for the example: ```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 1-18 +:lines: 1-19 ``` Define a task that returns a Pandas DataFrame. @@ -68,7 +68,7 @@ First, initialize column types you want to extract from the `StructuredDataset`. ```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 30-31 +:lines: 31-32 ``` Define a task that opens a structured dataset by calling `all()`. 
@@ -78,7 +78,7 @@ For instance, you can use ``pa.Table`` to convert the Pandas DataFrame to a PyAr ```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 41-51 +:lines: 42-52 ``` The code may result in runtime failures if the columns do not match. @@ -91,7 +91,7 @@ and enable the CSV serialization by annotating the structured dataset with the C ```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 57-71 +:lines: 58-72 ``` ## Storage driver and location @@ -230,14 +230,14 @@ and the byte format, which in this case is `PARQUET`. ```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 127-129 +:lines: 128-130 ``` You can now use `numpy.ndarray` to deserialize the parquet file to NumPy and serialize a task's output (NumPy array) to a parquet file. ```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 134-149 +:lines: 135-148 ``` :::{note} @@ -248,7 +248,7 @@ You can run the code locally as follows: ```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 153-157 +:lines: 152-156 ``` ### The nested typed columns @@ -261,7 +261,7 @@ Nested field StructuredDataset should be run when flytekit version > 1.11.0. 
```{rli} https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/data_types_and_io/data_types_and_io/structured_dataset.py :caption: data_types_and_io/structured_dataset.py -:lines: 159-270 +:lines: 158-285 ``` [flytesnacks]: https://github.com/flyteorg/flytesnacks/tree/master/examples/data_types_and_io/ diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index 840d0d9f17..3c5197d6c8 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -516,9 +516,19 @@ func serveGatewaySecure(ctx context.Context, pluginRegistry *plugins.Registry, c panic(err) } + handler := grpcHandlerFunc(grpcServer, httpServer) + if cfg.Security.AllowCors { + handler = handlers.CORS( + handlers.AllowCredentials(), + handlers.AllowedOrigins(cfg.Security.AllowedOrigins), + handlers.AllowedHeaders(append(defaultCorsHeaders, cfg.Security.AllowedHeaders...)), + handlers.AllowedMethods([]string{"GET", "POST", "DELETE", "HEAD", "PUT", "PATCH"}), + )(handler) + } + srv := &http.Server{ Addr: cfg.GetHostAddress(), - Handler: grpcHandlerFunc(grpcServer, httpServer), + Handler: handler, // #nosec G402 TLSConfig: &tls.Config{ Certificates: []tls.Certificate{*cert}, diff --git a/rfc/system/0008-community-plugins.md b/rfc/system/0008-community-plugins.md new file mode 100644 index 0000000000..0283657c87 --- /dev/null +++ b/rfc/system/0008-community-plugins.md @@ -0,0 +1,59 @@ +# Management of community-contributed plugins + +**Authors:** + +- @davidmirror-ops + + +## 1 Executive Summary + +The Flyte community as a self-governed and productive collective of individuals, values contributions. This proposal aims to discuss the process community contributors should follow to submit a new `flytekit` plugin, with special attention to mechanisms that ensure stability and maintainability of core flyte code. 
+
+## 2 Motivation
+
+- With the current "in-tree" approach, plugins developed by the community land in the `flytekit` repo ([example](https://github.com/flyteorg/flytekit/pull/2537)). It results in Flyte maintainers having to take care of CI test failures due to plugin code or flytekit updates incompatible with plugin code, etc. Flyte maintainers are also expected to provide support about and fix bugs in plugins integrating 3rd party libraries that they might have little knowledge of.
+
+- The goal is to agree on a process for contributors to follow when submitting new integrations in an "out-of-tree" way that clearly communicates that it is a community-contributed -and then- community-supported integration.
+
+## 3 Proposed Implementation
+
+- Create a `community` folder under `flytekit/plugins` and keep releasing the plugins in that folder as separate `pypi` packages.
+- Configure CI to only run tests on `plugins/community` when there are changes to a respective plugin.
+- Keep releasing community plugins alongside flytekit, even if there are no changes.
+- Explicitly mark plugins as community maintained in the import via `import flytekitplugins.contrib.x`
+- Plugin authors are responsible for maintaining their plugins. In case there are PRs to change a community plugin, the plugin maintainers review the PR and give a non-binding approval. Once a community plugin maintainer has given a non-binding approval, a `flytekit` maintainer has to give a binding approval in order for the PR to be merged.
+
+This proposal includes agent plugins.
+### Promotion process to official plugin
+
+An official plugin is one that is maintained by the core Flyte team and is made part of the official `flytekit` documentation.
+
+- Plugin maintainers or community members can propose the promotion of a plugin to official by creating an Issue on the `flytekit` repo.
+- The supermajority of the TSC must approve publicly before promoting a plugin.
+
+To consider it for promotion, a plugin must meet the following criteria:
+
+- Production readiness testing performed by the core Flyte team or documented by plugin users or maintainers
+- Evidence of ongoing usage through Github issues or Slack threads
+- Documented in flytekit's documentation
+
+
+
+## 4 Drawbacks
+
+- Potential overhead: CI configuration changes in flytekit (probably a one-time change)
+
+## 5 Alternatives
+
+- Maintain community plugins on a separate repo
+  - Against the monorepo initiative
+- Have community packages be its own org
+  - Significantly higher management overhead
+- `flytekit` plugins built into their own package
+  - Potentially heavier development process
+
+- Adding plugin authors as CODEOWNERS won't be considered due to a [Github permission model](https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners) limitation:
+
+>The people you choose as code owners must have write permissions for the repository.
+
+Getting write permissions in `flytekit` via contributing plugins is not part of the [current Governance model](https://github.com/flyteorg/community/blob/main/GOVERNANCE.md#community-roles-and-path-to-maintainership) for flyte.
diff --git a/stats/flyteuser.dashboard.py b/stats/flyteuser.dashboard.py index d554532b24..763362f004 100644 --- a/stats/flyteuser.dashboard.py +++ b/stats/flyteuser.dashboard.py @@ -1,6 +1,6 @@ import typing from grafanalib.core import ( - Alert, AlertCondition, Dashboard, Graph, + Alert, AlertCondition, Dashboard, Graph,BarChart,BarGauge, GreaterThan, OP_AND, OPS_FORMAT, Row, RTYPE_SUM, SECONDS_FORMAT, SHORT_FORMAT, single_y_axis, Target, TimeRange, YAxes, YAxis, MILLISECONDS_FORMAT, Templating, Template, DataSourceInput @@ -19,11 +19,11 @@ def workflow_stats(collapse: bool) -> Row: collapse=collapse, panels=[ Graph( - title="Accepted Workflow", + title="Accepted Workflows (avg)", dataSource=DATASOURCE, targets=[ Target( - expr='sum(rate(flyte:propeller:all:workflow:accepted{project=~"$project", domain=~"$domain", wf=~"$workflow"}[5m]))', + expr='avg(flyte:propeller:all:workflow:accepted{project=~"$project", domain=~"$domain", wf=~"$workflow"})', refId='A', ), ], @@ -33,7 +33,7 @@ def workflow_stats(collapse: bool) -> Row: ), ), Graph( - title="Successful Workflow", + title="Workflow success rate", dataSource=DATASOURCE, targets=[ Target( @@ -41,13 +41,10 @@ def workflow_stats(collapse: bool) -> Row: refId='A', ), ], - yAxes=YAxes( - YAxis(format=OPS_FORMAT), - YAxis(format=SHORT_FORMAT), - ), + yAxes=single_y_axis(format=OPS_FORMAT), ), Graph( - title="Failed Workflow", + title="Workflow failure rate", dataSource=DATASOURCE, targets=[ Target( @@ -55,105 +52,85 @@ def workflow_stats(collapse: bool) -> Row: refId='A', ), ], - yAxes=YAxes( - YAxis(format=OPS_FORMAT), - YAxis(format=SHORT_FORMAT), - ), + yAxes=single_y_axis(format=OPS_FORMAT), ), Graph( - title="Aborted Workflow", + title="Aborted Workflows (avg)", dataSource=DATASOURCE, targets=[ Target( - expr='sum(rate(flyte:propeller:all:workflow:workflow_aborted{project=~"$project", domain=~"$domain", wf=~"$workflow"}[5m]))', + 
expr='avg_over_time(flyte:propeller:all:workflow:workflow_aborted{project=~"$project", domain=~"$domain", wf=~"$workflow"}[5m])', refId='A', ), ], - yAxes=YAxes( - YAxis(format=OPS_FORMAT), - YAxis(format=SHORT_FORMAT), - ), + yAxes=single_y_axis(format=SHORT_FORMAT), ), - Graph( - title="Successful workflow execution time by Quantile", + BarGauge( + title="Successful wf execution duration by quantile", dataSource=DATASOURCE, targets=[ Target( - expr='sum(flyte:propeller:all:workflow:success_duration_ms{project=~"$project", domain=~"$domain", wf=~"$workflow"}) by (quantile)', + expr='(avg(flyte:propeller:all:workflow:success_duration_ms{project=~"$project", domain=~"$domain", wf=~"$workflow"}) by(quantile))/1000', refId='A', ), ], - yAxes=single_y_axis(format=MILLISECONDS_FORMAT), + orientation='horizontal', + format=SECONDS_FORMAT, ), - Graph( - title="Failed workflow execution time by Quantile", + BarGauge( + title="Failed wf execution duration by quantile", dataSource=DATASOURCE, targets=[ Target( - expr='sum(flyte:propeller:all:workflow:failure_duration_ms{project=~"$project", domain=~"$domain", wf=~"$workflow"}) by (quantile)', + expr='(avg(flyte:propeller:all:workflow:failure_duration_ms{project=~"$project", domain=~"$domain", wf=~"$workflow"}) by(quantile))/1000', refId='A', ), ], - yAxes=single_y_axis(format=MILLISECONDS_FORMAT), - ), - Graph( - title="Node queuing latency by Quantile", - dataSource=DATASOURCE, - targets=[ - Target( - expr='sum(flyte:propeller:all:node:queueing_latency_ms{project=~"$project", domain=~"$domain", wf=~"$workflow"}) by (quantile)', - refId='A', - ), - ], - yAxes=single_y_axis(format=MILLISECONDS_FORMAT), + orientation='horizontal', + format=SECONDS_FORMAT, ), ]) @staticmethod def quota_stats(collapse: bool) -> Row: return Row( - title="Kubernetes Quota Usage stats", + title="Kubernetes Resource Quota Usage", collapse=collapse, panels=[ Graph( - title="CPU Limits vs usage", + title="CPU Limit vs requested by namespace", 
dataSource=DATASOURCE, targets=[ Target( expr='kube_resourcequota{resource="limits.cpu", namespace="$project-$domain", type="hard"}', refId='A', - legendFormat="max cpu", + legendFormat="CPU limit", ), Target( expr='kube_resourcequota{resource="limits.cpu", namespace="$project-$domain", type="used"}', refId='B', - legendFormat="used cpu", + legendFormat="CPU requested", ), ], yAxes=YAxes( - YAxis(format=OPS_FORMAT), YAxis(format=SHORT_FORMAT), ), ), Graph( - title="Mem Limits vs usage", + title="Memory limit vs requested by namespace (MiB)", dataSource=DATASOURCE, targets=[ Target( - expr='kube_resourcequota{resource="limits.memory", namespace="$project-$domain", type="hard"}', + expr='(kube_resourcequota{resource="limits.memory", namespace="$project-$domain", type="hard"})*9.5367e-7', refId='A', - legendFormat="max mem", + legendFormat="Memory limit (MiB)", ), Target( - expr='kube_resourcequota{resource="limits.memory", namespace="$project-$domain", type="used"}', + expr='(kube_resourcequota{resource="limits.memory", namespace="$project-$domain", type="used"})*9.5367e-7', refId='B', - legendFormat="used mem", + legendFormat="Memory requested (MiB)", ), ], - yAxes=YAxes( - YAxis(format=OPS_FORMAT), - YAxis(format=SHORT_FORMAT), - ), ), ]) @@ -164,48 +141,48 @@ def resource_stats(collapse: bool) -> Row: collapse=collapse, panels=[ Graph( - title="Pending tasks", + title="Pending Tasks", dataSource=DATASOURCE, targets=[ Target( - expr='sum(kube_pod_container_status_waiting * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !="",namespace=~"$project-$domain",label_workflow_name=~"$workflow"}) by (namespace, label_execution_id, label_task_name, label_node_id, label_workflow_name) > 0', + expr='sum(kube_pod_status_phase{phase="Pending"} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_workflow_name=~"$workflow"}) by (namespace, label_task_name, 
label_node_id, label_workflow_name) > 0', refId='A', ), ], yAxes=single_y_axis(format=SHORT_FORMAT), ), - Graph( - title="Memory Usage Percentage", + BarChart( + title="Memory Usage per Task(%)", dataSource=DATASOURCE, targets=[ Target( - expr='(100 * max(container_memory_rss{image!=""} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !="",namespace=~"$project-$domain",label_workflow_name=~"$workflow"} * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name) / max(kube_pod_container_resource_limits_memory_bytes{container!=""} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=""} * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name)) > 0', + expr='(100 * (max(container_memory_working_set_bytes{container!=""} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{namespace=~"$project-$domain",label_workflow_name=~"$workflow"} * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name) / max(cluster:namespace:pod_memory:active:kube_pod_container_resource_limits{container!=""} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name))) > 0', refId='A', ), ], - yAxes=single_y_axis(format=SHORT_FORMAT), + showValue='true', ), - Graph( - title="CPU Usage Percentage", + BarChart( + title="CPU Usage per Task(%)", dataSource=DATASOURCE, targets=[ Target( - expr='(100* sum(rate(container_cpu_usage_seconds_total{image!=""}[2m]) 
* on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !="",namespace=~"$project-$domain",label_workflow_name=~"$workflow"} * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name) / sum(kube_pod_container_resource_limits_cpu_cores{container!=""} * on(pod) group_left(label_execution_id, label_task_name, label_node_id, label_workflow_name) kube_pod_labels{label_execution_id !=""} * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_execution_id, label_task_name, label_node_id, label_workflow_name)) > 0', + expr='(100 * (sum(rate(container_cpu_usage_seconds_total{image!=""}[2m]) * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels{namespace=~"$project-$domain",label_workflow_name=~"$workflow"} * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name) / sum(cluster:namespace:pod_cpu:active:kube_pod_container_resource_limits{container!=""} * on(pod) group_left(label_task_name, label_node_id, label_workflow_name) kube_pod_labels * on(pod) group_left(phase) kube_pod_status_phase{phase="Running"}) by (namespace, pod, label_task_name, label_node_id, label_workflow_name))) > 0', refId='A', ), ], - yAxes=single_y_axis(format=SHORT_FORMAT), + showValue='true', ), ]) @staticmethod def errors(collapse: bool) -> Row: return Row( - title="Error (System vs user)", + title="Error (System vs User)", collapse=collapse, panels=[ Graph( - title="User errors", + title="User error rate", dataSource=DATASOURCE, targets=[ Target( @@ -216,7 +193,7 @@ def errors(collapse: bool) -> Row: yAxes=single_y_axis(format=SHORT_FORMAT), ), Graph( - title="System errors", + title="System error rate", dataSource=DATASOURCE, targets=[ Target( @@ -280,7 +257,7 @@ def 
create_all_rows(interval: int) -> typing.List[Row]: domain_template, wf_template, ]), - description="Flyte User Dashboard. This is great to get a birds-eye and drill down view of executions in your Flyte cluster. Useful for the user.", + description="Flyte User Dashboard. It's designed to give an overview of execution status and resource consumption.", ).auto_panel_ids() if __name__ == "__main__":