Skip to content

Commit

Permalink
add write stress
Browse files Browse the repository at this point in the history
Signed-off-by: Liqi Geng <[email protected]>
  • Loading branch information
gengliqi committed May 13, 2020
1 parent 2725790 commit 31a04ec
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ default: tidy fmt lint build

build: tidb pocket tpcc ledger txn-rand-pessimistic on-dup sqllogic block-writer \
region-available deadlock-detector crud bank bank2 abtest cdc-pocket tiflash-pocket vbank \
read-stress rawkv-linearizability tiflash-abtest tiflash-cdc follower-read
read-stress rawkv-linearizability tiflash-abtest tiflash-cdc follower-read write-stress

tidb:
$(GOBUILD) $(GOMOD) -o bin/chaos-tidb cmd/tidb/main.go
Expand Down Expand Up @@ -97,6 +97,9 @@ tiflash-cdc:
follower-read:
$(GOBUILD) $(GOMOD) -o bin/follower-read cmd/follower-read/*.go

write-stress:
$(GOBUILD) $(GOMOD) -o bin/write-stress cmd/write-stress/*.go

fmt: groupimports
go fmt ./...

Expand Down
61 changes: 61 additions & 0 deletions cmd/write-stress/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"

// use mysql
_ "github.com/go-sql-driver/mysql"

test_infra "github.com/pingcap/tipocket/pkg/test-infra"
writestress "github.com/pingcap/tipocket/tests/write-stress"

"github.com/pingcap/tipocket/cmd/util"
"github.com/pingcap/tipocket/pkg/cluster"
"github.com/pingcap/tipocket/pkg/control"
"github.com/pingcap/tipocket/pkg/test-infra/fixture"
)

var (
dataNum = flag.Int("dataNum", 2000, "the number of data(the unit is 10 thoudstand)")
concurrency = flag.Int("concurrency", 400, "concurrency of worker")
batch = flag.Int("batch", 100, "batch of insert sql")
)

func main() {
flag.Parse()
cfg := control.Config{
Mode: control.ModeSelfScheduled,
ClientCount: 1,
RunTime: fixture.Context.RunTime,
RunRound: 1,
}
kvs := []string{"127.0.0.1:20160", "127.0.0.1:20162", "127.0.0.1:20161"}
suit := util.Suit{
Config: &cfg,
//Provisioner: cluster.NewK8sProvisioner(),
Provisioner: cluster.NewLocalClusterProvisioner([]string{"127.0.0.1:4000"}, []string{"127.0.0.1:2379"}, kvs),
ClientCreator: writestress.ClientCreator{Cfg: &writestress.Config{
DataNum: *dataNum,
Concurrency: *concurrency,
Batch: *batch,
}},
NemesisGens: util.ParseNemesisGenerators(fixture.Context.Nemesis),
ClusterDefs: test_infra.NewDefaultCluster(fixture.Context.Namespace, fixture.Context.Namespace,
fixture.Context.TiDBClusterConfig),
}
suit.Run(context.Background())
}
207 changes: 207 additions & 0 deletions tests/write-stress/write_stress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package writestress

import (
"context"
"database/sql"
"encoding/binary"
"fmt"
"math/rand"
"sync"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"

"github.com/pingcap/tipocket/pkg/cluster/types"
"github.com/pingcap/tipocket/pkg/core"
"github.com/pingcap/tipocket/util"
)

// Table schema comes from the bank of pufa `tmp_jieb_instmnt_daily`
// CREATE TABLE `tmp_jieb_instmnt_daily` (
// `ID` bigint(20) DEFAULT NULL COMMENT '主键ID',
// `TABLE_ID` int(11) NOT NULL COMMENT '分库ID',
// `FILE_DATE` char(8) NOT NULL COMMENT '文件日期',
// `CONTRACT_NO` varchar(128) NOT NULL COMMENT '借据号',
// `SETTLE_DATE` char(8) NOT NULL COMMENT '减免会计日期',
// `TERM_NO` int(11) NOT NULL COMMENT '期次号',
//
// `INPT_DATE` char(8) DEFAULT NULL COMMENT '录入日期',
// `INPT_TIME` varchar(20) DEFAULT NULL COMMENT '录入时间',
// `RCRD_ST_CODE` varchar(1) DEFAULT NULL COMMENT '记录状态代码',
// UNIQUE KEY `TMP_JIEB_INSTMNT_DAILY_IDX1` (`CONTRACT_NO`,`TERM_NO`),
// KEY `TMP_JIEB_INSTMNT_DAILY_IDX2` (`TABLE_ID`,`CONTRACT_NO`)
// ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=5 PRE_SPLIT_REGIONS=5 */ COMMENT='借呗日终(分期)信息临时表';
const (
stmtDrop = `DROP TABLE IF EXISTS write_stress`
stmtCreate = `
CREATE TABLE write_stress (
TABLE_ID int(11) NOT NULL COMMENT '分库ID',
CONTRACT_NO varchar(128) NOT NULL COMMENT '借据号',
TERM_NO int(11) NOT NULL COMMENT '期次号',
NOUSE char(60) NOT NULL COMMENT '填充位',
UNIQUE KEY TMP_JIEB_INSTMNT_DAILY_IDX1 (CONTRACT_NO, TERM_NO),
KEY TMP_JIEB_INSTMNT_DAILY_IDX2 (TABLE_ID, CONTRACT_NO)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
`
)

// Config is for writestressClient
type Config struct {
DataNum int `toml:"dataNum"`
Concurrency int `toml:"concurrency"`
Batch int `toml:"batch"`
}

// ClientCreator creates writestressClient
type ClientCreator struct {
Cfg *Config
}

// Create ...
func (l ClientCreator) Create(node types.ClientNode) core.Client {
return &writestressClient{
Config: l.Cfg,
}
}

// ledgerClient simulates a complete record of financial transactions over the
// life of a bank (or other company).
type writestressClient struct {
*Config
db *sql.DB
contract_ids [][]byte
}

func (c *writestressClient) SetUp(ctx context.Context, nodes []types.ClientNode, idx int) error {
if idx != 0 {
return nil
}

var err error
node := nodes[idx]
dsn := fmt.Sprintf("root@tcp(%s:%d)/test", node.IP, node.Port)

log.Infof("start to init...")
c.db, err = util.OpenDB(dsn, c.Concurrency)
if err != nil {
return err
}
defer func() {
log.Infof("init end...")
}()

if _, err := c.db.Exec(stmtDrop); err != nil {
log.Fatalf("execute statement %s error %v", stmtDrop, err)
}

if _, err := c.db.Exec(stmtCreate); err != nil {
log.Fatalf("execute statement %s error %v", stmtCreate, err)
}

return nil
}

func (c *writestressClient) TearDown(ctx context.Context, nodes []types.ClientNode, idx int) error {
return nil
}

func (c *writestressClient) Invoke(ctx context.Context, node types.ClientNode, r interface{}) core.UnknownResponse {
panic("implement me")
}

func (c *writestressClient) NextRequest() interface{} {
panic("implement me")
}

func (c *writestressClient) DumpState(ctx context.Context) (interface{}, error) {
panic("implement me")
}

func (c *writestressClient) Start(ctx context.Context, cfg interface{}, clientNodes []types.ClientNode) error {
log.Infof("start to test...")
defer func() {
log.Infof("test end...")
}()
c.contract_ids = make([][]byte, c.DataNum)
timeUnix := time.Now().Unix()
count := uint8(0)
b := make([]byte, 8)
for i := 0; i < c.DataNum; i++ {
// "abcd" + timestamp(8 bit) + count(8 bit)
c.contract_ids[i] = append(c.contract_ids[i], []byte("abcd")...)
binary.LittleEndian.PutUint64(b, uint64(timeUnix))
c.contract_ids[i] = append(c.contract_ids[i], b...)
binary.LittleEndian.PutUint64(b, uint64(count))
c.contract_ids[i] = append(c.contract_ids[i], b...)

count++
if count == 0 {
timeUnix++
}
}

var wg sync.WaitGroup
for i := 0; i < c.Concurrency; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
if err := c.ExecuteInsert(c.db, i); err != nil {
log.Fatalf("exec failed %v", err)
}
}
}(i)
}

wg.Wait()
return nil
}

// ExecuteInsert is run case
func (c *writestressClient) ExecuteInsert(db *sql.DB, pos int) error {
num := c.Config.DataNum * 10000 / c.Config.Concurrency

tx, err := db.Begin()
if err != nil {
return errors.Trace(err)
}
defer tx.Rollback()
str := make([]byte, 50)
rnd := rand.New(rand.NewSource(time.Now().Unix()))
for i := 0; i < num/c.Config.Batch; i++ {
n := num*pos + i*c.Config.Batch
if n >= c.DataNum {
break
}
query := fmt.Sprintf(`INSERT INTO write_stress (TABLE_ID, CONTRACT_NO, TERM_NO, NOUSE) VALUES `)
for j := 0; j < c.Config.Batch; j++ {
n := num*pos + i*c.Config.Batch + j
if n >= c.DataNum {
break
}
contract_id := c.contract_ids[n]
util.RandString(str, rnd)
if j != 0 {
query += ","
}
query += fmt.Sprintf(`(%v, %v, %v, %v)`, rnd.Uint32()%960+1, string(contract_id[:]), rnd.Uint32()%36+1, string(str[:]))
}
fmt.Println(query)
if _, err := tx.Exec(query); err != nil {
return errors.Trace(err)
}
}

if err := tx.Commit(); err != nil {
return errors.Trace(err)
}

return nil
}

0 comments on commit 31a04ec

Please sign in to comment.