Skip to content

Commit

Permalink
add e2e-test log
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh committed Dec 12, 2024
1 parent 21b0900 commit ce00e76
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 13 deletions.
8 changes: 4 additions & 4 deletions e2e_test/docker-compose-env/flink/conf/flink-conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ jobmanager.rpc.port: 6123
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.

jobmanager.memory.process.size: 1600m
jobmanager.memory.process.size: 16000m


# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

taskmanager.memory.process.size: 1728m
taskmanager.memory.process.size: 17280m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
Expand All @@ -55,7 +55,7 @@ taskmanager.memory.process.size: 1728m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 3
taskmanager.numberOfTaskSlots: 20

# taskmanager.cpu.cores: 1
# taskmanager.memory.task.heap.size: 1200m
Expand Down Expand Up @@ -157,7 +157,7 @@ jobmanager.execution.failover-strategy: region
# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
#rest.port: 8282
rest.port: 8282

# The address to which the REST client will connect to
#
Expand Down
26 changes: 26 additions & 0 deletions e2e_test/tools/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from requests import Session
import urllib.parse as urlparse

from login import url, assertRespOk


def addCluster(session: Session) -> int:
    """
    en: Register a standalone Flink cluster instance and return its id
    zh: 添加一个集群实例
    :param session: requests.Session (already logged in)
    :return: clusterId assigned by the server
    :raises Exception: if the cluster cannot be found after registration
    """
    cluster_name = 'flink-standalone'
    payload = {
        "name": cluster_name,
        "type": "standalone",
        "hosts": "jobmanager:8282"
    }
    register_resp = session.put(url("api/cluster"), json=payload)
    assertRespOk(register_resp, "Add cluster")

    # The PUT response does not return the id, so look it up by name.
    list_resp = session.get(
        url(f"api/cluster/list?searchKeyWord={urlparse.quote(cluster_name)}&isAutoCreate=false"))
    assertRespOk(list_resp, "Get cluster list")

    cluster_id = next(
        (row['id'] for row in list_resp.json()['data'] if row['name'] == cluster_name),
        None)
    if cluster_id is None:
        raise Exception(f"Cluster {cluster_name} not found")
    return cluster_id
10 changes: 7 additions & 3 deletions e2e_test/tools/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import requests

from env import addCluster
from login import login
from task import addCatalogue, runFlinkLocalTask
from task import addCatalogue, runFlinkLocalTask, runFlinkSessionTask

if __name__ == '__main__':
session = requests.session()
login(session)
clusterId = addCluster(session)
catalogue = addCatalogue(session, "flink-sql-task")
runFlinkLocalTask(session, catalogue.id, "flink-sql-datagen-test",
"DROP TABLE IF EXISTS source_table3;\r\nCREATE TABLE IF NOT EXISTS source_table3(\r\n--订单id\r\n`order_id` BIGINT,\r\n--产品\r\n\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n\r\n--支付时间\r\n`order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- `在这里插入代码片`\r\n--WATERMARK\r\nWATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND\r\n) WITH(\r\n'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_id.min' = '1',\r\n 'fields.order_id.max' = '2',\r\n 'fields.amount.min' = '1',\r\n 'fields.amount.max' = '10',\r\n 'fields.product.min' = '1',\r\n 'fields.product.max' = '2'\r\n);\r\n\r\n-- SELECT * FROM source_table3 LIMIT 10;\r\n\r\nDROP TABLE IF EXISTS sink_table5;\r\nCREATE TABLE IF NOT EXISTS sink_table5(\r\n--产品\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n--支付时间\r\n`order_time` TIMESTAMP(3),\r\n--1分钟时间聚合总数\r\n`one_minute_sum` BIGINT\r\n) WITH(\r\n'connector'='print'\r\n);\r\n\r\nINSERT INTO sink_table5\r\nSELECT\r\nproduct,\r\namount,\r\norder_time,\r\nSUM(amount) OVER(\r\nPARTITION BY product\r\nORDER BY order_time\r\n-- 标识统计范围是1个 product 的最近 1 分钟的数据\r\nRANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW\r\n) as one_minute_sum\r\nFROM source_table3;")
sql = "DROP TABLE IF EXISTS source_table3;\r\nCREATE TABLE IF NOT EXISTS source_table3(\r\n--订单id\r\n`order_id` BIGINT,\r\n--产品\r\n\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n\r\n--支付时间\r\n`order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- `在这里插入代码片`\r\n--WATERMARK\r\nWATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND\r\n) WITH(\r\n'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_id.min' = '1',\r\n 'fields.order_id.max' = '2',\r\n 'fields.amount.min' = '1',\r\n 'fields.amount.max' = '10',\r\n 'fields.product.min' = '1',\r\n 'fields.product.max' = '2'\r\n);\r\n\r\n-- SELECT * FROM source_table3 LIMIT 10;\r\n\r\nDROP TABLE IF EXISTS sink_table5;\r\nCREATE TABLE IF NOT EXISTS sink_table5(\r\n--产品\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n--支付时间\r\n`order_time` TIMESTAMP(3),\r\n--1分钟时间聚合总数\r\n`one_minute_sum` BIGINT\r\n) WITH(\r\n'connector'='print'\r\n);\r\n\r\nINSERT INTO sink_table5\r\nSELECT\r\nproduct,\r\namount,\r\norder_time,\r\nSUM(amount) OVER(\r\nPARTITION BY product\r\nORDER BY order_time\r\n-- 标识统计范围是1个 product 的最近 1 分钟的数据\r\nRANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW\r\n) as one_minute_sum\r\nFROM source_table3;"
runFlinkLocalTask(session, catalogue.id, "flink-sql-datagen-test",sql )
runFlinkSessionTask(session, catalogue.id, clusterId, "flink-sql-datagen-test-session", sql)
33 changes: 27 additions & 6 deletions e2e_test/tools/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def addCatalogue(session: requests.Session, name: str, isLeaf: bool = False, par
return getTask(data_list, name)


def addTask(session: requests.Session, name: str, parent_id: int = 0, type: str = "FlinkSql",
statement: str = "") -> CatalogueTree:
def addTask(session: requests.Session, name: str, parent_id: int = 0, dialect: str = "FlinkSql",
statement: str = "", runtModel: str = "local", clusterId: int = -1) -> CatalogueTree:
"""
en: Add a task
zh: 添加一个任务
Expand All @@ -60,18 +60,19 @@ def addTask(session: requests.Session, name: str, parent_id: int = 0, type: str
"""
add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogueAndTask"), json={
"name": name,
"type": type,
"type": dialect,
"firstLevelOwner": 1,
"task": {
"savePointStrategy": 0,
"parallelism": 1,
"envId": -1,
"step": 1,
"alertGroupId": -1,
"type": "local",
"dialect": type,
"type": runtModel,
"dialect": dialect,
"statement": statement,
"firstLevelOwner": 1
"firstLevelOwner": 1,
"clusterId":clusterId
},
"isLeaf": False,
"parentId": parent_id
Expand Down Expand Up @@ -129,3 +130,23 @@ def runFlinkLocalTask(session: requests.Session, parentId: int, name: str, state
sleep(waitTime)
status = getFlinkTaskStatus(session, jobInstanceId)
assertFlinkTaskIsRunning(status, name)


def runFlinkSessionTask(session: requests.Session, parentId: int, clusterId: int, name: str, statement: str,
                        waitTime: int = 10) -> None:
    """
    en: Run a Flink Session task on a standalone cluster
    zh: 运行一个 Flink Session 任务
    :param session: requests.Session
    :param parentId: dir id
    :param clusterId: id of the standalone cluster the task is submitted to
    :param name: task name
    :param statement: statement
    :param waitTime: zh:等待时间 (seconds to wait before checking the task status)
    """
    log.info(
        f"======================\nA Session Flink task is currently executed,name: {name}, statement: \n{statement}\n ======================")
    # Submit the task in "standalone" (session) run mode, bound to clusterId.
    task = addTask(session, name, parentId, "FlinkSql", statement, "standalone", clusterId)
    jobInstanceId = runTask(session, task.taskId)
    # Give the job time to start before asserting it is running.
    sleep(waitTime)
    status = getFlinkTaskStatus(session, jobInstanceId)
    assertFlinkTaskIsRunning(status, name)

0 comments on commit ce00e76

Please sign in to comment.