Skip to content

Commit

Permalink
Merge pull request #5 from yubing744/owen/bugfix-close-position
Browse files Browse the repository at this point in the history
Owen/bugfix close position
  • Loading branch information
yubing744 authored Apr 14, 2024
2 parents e7cfb6a + c2f4f19 commit 5def674
Show file tree
Hide file tree
Showing 7 changed files with 775 additions and 11 deletions.
182 changes: 180 additions & 2 deletions pkg/exchange/okex/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
batchCancelOrderLimiter = rate.NewLimiter(rate.Every(33*time.Millisecond), 1)
// Rate Limit: 60 requests per 2 seconds, Rate limit rule: UserID
queryOpenOrderLimiter = rate.NewLimiter(rate.Every(33*time.Millisecond), 1)
// Rate Limit: 60 requests per 2 seconds, Rate limit rule: UserID
queryAlgoOpenOrderLimiter = rate.NewLimiter(rate.Every(33*time.Millisecond), 1)
// Rate Limit: 20 requests per 2 seconds, Rate limit rule: UserID
queryClosedOrderRateLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
// Rate Limit: 10 requests per 2 seconds, Rate limit rule: UserID
Expand Down Expand Up @@ -325,6 +327,37 @@ func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder)

func (e *Exchange) submitMarginOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) {
if order.ClosePosition {
orders, err1 := e.QueryAlgoOpenOrders(ctx, order.Symbol)
ocoOrders, err2 := e.QueryOCOAlgoOpenOrders(ctx, order.Symbol)

log.WithField("orders", orders).
WithField("ocoOrders", ocoOrders).
WithField("error1", err1).
WithField("error2", err2).
Info("before_ClosePosition_QueryOpenOrders_result")

if len(orders)+len(ocoOrders) > 0 {
allOrders := make([]types.Order, 0)

if len(orders) > 0 {
allOrders = append(allOrders, orders...)
}

if len(ocoOrders) > 0 {
allOrders = append(allOrders, ocoOrders...)
}

err := e.CancelAlgoOrders(ctx, allOrders...)
if err != nil {
log.WithField("Symbol", order.Symbol).
WithError(err).
Error("before_ClosePosition_CancelOrders_fail")
}

log.WithField("Symbol", order.Symbol).
Info("before_ClosePosition_CancelOrders_ok")
}

return e.submitClosePositionOrder(ctx, order)
}

Expand Down Expand Up @@ -421,14 +454,19 @@ func (e *Exchange) submitClosePositionOrder(ctx context.Context, order types.Sub
}

orderReq.Tag(order.Tag)
orderReq.AutoCxl(true)

params, _ := orderReq.GetParameters()
log.WithField("params", params).
Info("order_req_start")

orderHead, err := orderReq.Do(ctx)
if err != nil {
return nil, err
}

log.WithField("orderHead", orderHead).
Debug("order req result")
Info("order_req_end")

return &types.Order{
SubmitOrder: order,
Expand All @@ -455,9 +493,15 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
return nil, fmt.Errorf("query open orders rate limiter wait error: %w", err)
}

req := e.client.NewGetOpenOrdersRequest().
req := e.client.NewGetOpenOrdersRequest()
if e.IsMargin {
req = e.client.NewGetMarginOpenOrdersRequest()
}

req.
InstrumentID(instrumentID).
After(strconv.FormatInt(nextCursor, 10))

openOrders, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query open orders: %w", err)
Expand Down Expand Up @@ -487,6 +531,104 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
return orders, err
}

func (e *Exchange) QueryAlgoOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
instrumentID := toLocalSymbol(symbol)

for {
if err := queryAlgoOpenOrderLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("query open orders rate limiter wait error: %w", err)
}

req := e.client.NewGetAlgoOrdersRequest()
req.
InstrumentID(instrumentID)

params, _ := req.GetQueryParameters()
log.WithField("symbol", symbol).
WithField("params", params).
Info("QueryAlgoOpenOrders_start")

openOrders, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query open orders: %w", err)
}

log.WithField("symbol", symbol).
WithField("openOrders", openOrders).
Info("QueryAlgoOpenOrders_result")

for _, o := range openOrders {
orders = append(orders, types.Order{
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
UUID: o.AlgoID,
})
}

orderLen := len(openOrders)
// a defensive programming to ensure the length of order response is expected.
if orderLen > defaultQueryLimit {
return nil, fmt.Errorf("unexpected open orders length %d", orderLen)
}

if orderLen < defaultQueryLimit {
break
}
}

return orders, err
}

func (e *Exchange) QueryOCOAlgoOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
instrumentID := toLocalSymbol(symbol)

for {
if err := queryAlgoOpenOrderLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("query open orders rate limiter wait error: %w", err)
}

req := e.client.NewGetOCOAlgoOrdersRequest()
req.
InstrumentID(instrumentID)

params, _ := req.GetQueryParameters()
log.WithField("symbol", symbol).
WithField("params", params).
Info("QueryOCOAlgoOpenOrders_start")

openOrders, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query open orders: %w", err)
}

log.WithField("symbol", symbol).
WithField("openOrders", openOrders).
Info("QueryOCOAlgoOpenOrders_result")

for _, o := range openOrders {
orders = append(orders, types.Order{
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
UUID: o.AlgoID,
})
}

orderLen := len(openOrders)
// a defensive programming to ensure the length of order response is expected.
if orderLen > defaultQueryLimit {
return nil, fmt.Errorf("unexpected open orders length %d", orderLen)
}

if orderLen < defaultQueryLimit {
break
}
}

return orders, err
}

func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error {
if len(orders) == 0 {
return nil
Expand Down Expand Up @@ -519,6 +661,42 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro
return err
}

func (e *Exchange) CancelAlgoOrders(ctx context.Context, orders ...types.Order) error {
if len(orders) == 0 {
return nil
}

var reqs []*okexapi.CancelAlgoOrder
for _, order := range orders {
if len(order.Symbol) == 0 {
return ErrSymbolRequired
}

reqs = append(reqs, &okexapi.CancelAlgoOrder{
InstrumentID: toLocalSymbol(order.Symbol),
AlgoOrderID: order.UUID,
})
}

if err := batchCancelOrderLimiter.Wait(ctx); err != nil {
return fmt.Errorf("batch cancel order rate limiter wait error: %w", err)
}
batchReq := e.client.NewCancelAlgoOrderRequest()
batchReq.SetPayload(reqs)

params, _ := batchReq.GetParameters()
log.WithField("params", params).
Info("CancelAlgoOrders_start")

resp, err := batchReq.Do(ctx)

log.WithField("resp", resp).
WithField("error", err).
Info("CancelAlgoOrders_result")

return err
}

func (e *Exchange) NewStream() types.Stream {
return NewStream(e.client, e)
}
Expand Down
163 changes: 163 additions & 0 deletions pkg/exchange/okex/okexapi/cancel_algo_order_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package okexapi

import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"regexp"

"github.com/c9s/requestgen"
)

type CancelAlgoOrderResponse struct {
AlgoID string `json:"algoId"`
SCode string `json:"sCode"`
SMsg string `json:"sMsg"`
}

type CancelAlgoOrder struct {
InstrumentID string `json:"instId"`
AlgoOrderID string `json:"algoId"`
}

type CancelAlgoOrderRequest struct {
client requestgen.AuthenticatedAPIClient

Payload []*CancelAlgoOrder
}

func (c *RestClient) NewCancelAlgoOrderRequest() *CancelAlgoOrderRequest {
return &CancelAlgoOrderRequest{
client: c,
}
}

func (c *CancelAlgoOrderRequest) SetPayload(CancelAlgoOrders []*CancelAlgoOrder) *CancelAlgoOrderRequest {
c.Payload = CancelAlgoOrders
return c
}

// GetQueryParameters builds and checks the query parameters and returns url.Values
func (c *CancelAlgoOrderRequest) GetQueryParameters() (url.Values, error) {
var params = map[string]interface{}{}

query := url.Values{}
for _k, _v := range params {
query.Add(_k, fmt.Sprintf("%v", _v))
}

return query, nil
}

// GetParameters builds and checks the parameters and return the result in a map object
func (c *CancelAlgoOrderRequest) GetParameters() (interface{}, error) {
return c.Payload, nil
}

// GetParametersJSON converts the parameters from GetParameters into the JSON format
func (c *CancelAlgoOrderRequest) GetParametersJSON() ([]byte, error) {
params, err := c.GetParameters()
if err != nil {
return nil, err
}

return json.Marshal(params)
}

// GetSlugParameters builds and checks the slug parameters and return the result in a map object
func (c *CancelAlgoOrderRequest) GetSlugParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}

return params, nil
}

func (c *CancelAlgoOrderRequest) applySlugsToUrl(url string, slugs map[string]string) string {
for _k, _v := range slugs {
needleRE := regexp.MustCompile(":" + _k + "\\b")
url = needleRE.ReplaceAllString(url, _v)
}

return url
}

func (c *CancelAlgoOrderRequest) iterateSlice(slice interface{}, _f func(it interface{})) {
sliceValue := reflect.ValueOf(slice)
for _i := 0; _i < sliceValue.Len(); _i++ {
it := sliceValue.Index(_i).Interface()
_f(it)
}
}

func (c *CancelAlgoOrderRequest) isVarSlice(_v interface{}) bool {
rt := reflect.TypeOf(_v)
switch rt.Kind() {
case reflect.Slice:
return true
}
return false
}

func (c *CancelAlgoOrderRequest) GetSlugsMap() (map[string]string, error) {
slugs := map[string]string{}
params, err := c.GetSlugParameters()
if err != nil {
return slugs, nil
}

for _k, _v := range params {
slugs[_k] = fmt.Sprintf("%v", _v)
}

return slugs, nil
}

// GetPath returns the request path of the API
func (c *CancelAlgoOrderRequest) GetPath() string {
return "/api/v5/trade/cancel-algos"
}

// Do generates the request object and send the request object to the API endpoint
func (c *CancelAlgoOrderRequest) Do(ctx context.Context) ([]CancelAlgoOrderResponse, error) {

params, err := c.GetParameters()
if err != nil {
return nil, err
}
query := url.Values{}

var apiURL string

apiURL = c.GetPath()

req, err := c.client.NewAuthenticatedRequest(ctx, "POST", apiURL, query, params)
if err != nil {
return nil, err
}

response, err := c.client.SendRequest(req)
if err != nil {
return nil, err
}

var apiResponse APIResponse
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}

type responseValidator interface {
Validate() error
}
validator, ok := interface{}(apiResponse).(responseValidator)
if ok {
if err := validator.Validate(); err != nil {
return nil, err
}
}
var data []CancelAlgoOrderResponse
if err := json.Unmarshal(apiResponse.Data, &data); err != nil {
return nil, err
}
return data, nil
}
Loading

0 comments on commit 5def674

Please sign in to comment.