Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Wiesinger <[email protected]>
  • Loading branch information
mowies committed Sep 10, 2024
1 parent 125d3e9 commit f2f8fc5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 49 deletions.
2 changes: 1 addition & 1 deletion internal/testbed/load/tests/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestMetric10kDPS(t *testing.T) {
"batch": `
batch:
send_batch_max_size: 1000
timeout: 20s
timeout: 30s
send_batch_size : 800
`,
}
Expand Down
70 changes: 22 additions & 48 deletions internal/testbed/load/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,51 +126,12 @@ func Scenario10kItemsPerSecond(
processors map[string]string,
extensions map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

options := testbed.LoadOptions{
loadOptions := testbed.LoadOptions{
DataItemsPerSecond: 10_000,
ItemsPerBatch: 100,
Parallel: 1,
}

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, extensions)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

dataProvider := testbed.NewPerfTestDataProvider(options)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.PerfTestValidator{},
resultsSummary,
testbed.WithResourceLimits(resourceSpec),
)
t.Cleanup(tc.Stop)

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(options)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

tc.Sleep(tc.Duration)

tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() },
time.Second*30,
"all data items received")

tc.ValidateData()
GenericScenario(t, sender, receiver, resourceSpec, resultsSummary, processors, extensions, loadOptions)
}

// Scenario100kItemsPerSecond runs 10k data items/sec test using specified sender and receiver protocols.
Expand All @@ -183,14 +144,26 @@ func Scenario100kItemsPerSecond(
processors map[string]string,
extensions map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

options := testbed.LoadOptions{
loadOptions := testbed.LoadOptions{
DataItemsPerSecond: 100_000,
ItemsPerBatch: 100,
Parallel: 1,
}
GenericScenario(t, sender, receiver, resourceSpec, resultsSummary, processors, extensions, loadOptions)
}

func GenericScenario(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
resourceSpec testbed.ResourceSpec,
resultsSummary testbed.TestResultsSummary,
processors map[string]string,
extensions map[string]string,
loadOptions testbed.LoadOptions,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

Expand All @@ -199,7 +172,7 @@ func Scenario100kItemsPerSecond(
require.NoError(t, err)
defer configCleanup()

dataProvider := testbed.NewPerfTestDataProvider(options)
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
tc := testbed.NewTestCase(
t,
dataProvider,
Expand All @@ -215,15 +188,16 @@ func Scenario100kItemsPerSecond(
tc.StartBackend()
tc.StartAgent()

tc.StartLoad(options)
tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

tc.Sleep(tc.Duration)

tc.StopLoad()

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() },
tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() },
time.Second*30,
"all data items received")

tc.ValidateData()
Expand Down

0 comments on commit f2f8fc5

Please sign in to comment.