From 3f11d0e37f8792d14a4c409e3d9de3f1263a1562 Mon Sep 17 00:00:00 2001 From: Stephen Birch Date: Mon, 29 Nov 2021 16:53:25 +0000 Subject: [PATCH 1/9] Temp attempt at AES encryption --- lib/svrquery/protocol/titanfall/query.go | 88 ++++++++++++++++++- lib/svrquery/protocol/titanfall/query_test.go | 52 ++++++++++- 2 files changed, 135 insertions(+), 5 deletions(-) diff --git a/lib/svrquery/protocol/titanfall/query.go b/lib/svrquery/protocol/titanfall/query.go index 60e877b..63596d9 100644 --- a/lib/svrquery/protocol/titanfall/query.go +++ b/lib/svrquery/protocol/titanfall/query.go @@ -1,18 +1,27 @@ package titanfall import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" "encoding/binary" + "encoding/hex" "fmt" - "github.com/multiplay/go-svrquery/lib/svrquery/common" "github.com/multiplay/go-svrquery/lib/svrquery/protocol" "github.com/netdata/go-orchestrator/module" + "io" + "os" ) var ( // minLength is the smallest packet we can expect. minLength = 26 + nonceSize = 12 + tagSize = 16 + packetSize = 1200 ) type queryer struct { @@ -29,6 +38,69 @@ func newQueryer(version byte) func(c protocol.Client) protocol.Queryer { } } +func encrypt(b []byte) ([]byte, error) { + key := os.Getenv("AES_KEY") + if key == "" { + return nil, fmt.Errorf("no aes key found. (Is AES_KEY env var set?)") + } + + keyBytes, err := base64.StdEncoding.DecodeString(key) + if err != nil { + return nil, err + } + + c, err := aes.NewCipher([]byte(hex.EncodeToString(keyBytes))) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(c) + if err != nil { + fmt.Println(err) + } + + nonce := make([]byte, gcm.NonceSize()) + if _, err = io.ReadFull(rand.Reader, nonce); err != nil { + fmt.Println(err) + } + + return gcm.Seal(nonce, nonce, b, nil), nil +} + +func decrypt(b []byte) ([]byte, error) { + key := os.Getenv("AES_KEY") + if key == "" { + return nil, fmt.Errorf("no aes key found. (Is AES_KEY env var set?)") + } + + keyBytes, err := base64.StdEncoding.DecodeString(key) + if err != nil { + return nil, err + } + + c, err := aes.NewCipher([]byte(hex.EncodeToString(keyBytes))) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(c) + if err != nil { + fmt.Println(err) + } + + if len(b) < gcm.NonceSize() { + return nil, fmt.Errorf("incoming bytes smaller than %d", gcm.NonceSize()) + } + + nonce, b := b[:gcm.NonceSize()], b[gcm.NonceSize():] + plaintext, err := gcm.Open(nil, nonce, b, nil) + if err != nil { + return nil, err + } + + return plaintext, nil +} + // Query implements protocol.Queryer. func (q *queryer) Query() (protocol.Responser, error) { b := make([]byte, 1200) @@ -42,17 +114,27 @@ func (q *queryer) Query() (protocol.Responser, error) { copy(b[6:], key) } - if _, err := q.c.Write(b); err != nil { + b, err := encrypt(b) + if err != nil { return nil, err } + if _, err := q.c.Write(b); err != nil { + return nil, fmt.Errorf("query write: %w", err) + } + n, err := q.c.Read(b) if err != nil { - return nil, err + return nil, fmt.Errorf("query read: %w", err) } else if n < minLength { return nil, fmt.Errorf("packet too short (len: %d)", n) } + b, err = decrypt(b) + if err != nil { + return nil, err + } + r := common.NewBinaryReader(b[:n], binary.LittleEndian) i := &Info{} diff --git a/lib/svrquery/protocol/titanfall/query_test.go b/lib/svrquery/protocol/titanfall/query_test.go index 1edb3ae..98106d5 100644 --- a/lib/svrquery/protocol/titanfall/query_test.go +++ b/lib/svrquery/protocol/titanfall/query_test.go @@ -1,6 +1,9 @@ package titanfall import ( + "encoding/base64" + "fmt" + "os" "testing" "github.com/multiplay/go-svrquery/lib/svrquery/clienttest" @@ -56,6 +59,10 @@ var ( ) func TestQuery(t *testing.T) { + key := "12345678901234567890123456789012" + + os.Setenv("AES_KEY", base64.StdEncoding.EncodeToString([]byte(key))) + keyed := base keyed.Version = 5 keyed.AverageFrameTime = 1.2347187 @@ -120,11 +127,15 @@ func TestQuery(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { req := clienttest.LoadData(t, testDir, tc.request) + encodedReq, err := encrypt(req) + require.NoError(t, err) resp := clienttest.LoadData(t, testDir, tc.response) + encodedResp, err := encrypt(resp) + require.NoError(t, err) m := &clienttest.MockClient{} - m.On("Write", req).Return(len(req), nil) - m.On("Read", mock.AnythingOfType("[]uint8")).Return(resp, nil) + m.On("Write", mock.AnythingOfType("[]uint8")).Return(len(encodedReq), nil) + m.On("Read", mock.AnythingOfType("[]uint8")).Return(encodedResp, nil) m.On("Key").Return(tc.key) p := newQueryer(tc.version)(m) @@ -136,3 +147,40 @@ func TestQuery(t *testing.T) { }) } } + +func TestEncryptAndDecrypt(t *testing.T) { + //key := "12345678901234567890123456789012" // 32 + key := "1234567890123456" // 16 + + os.Setenv("AES_KEY", base64.StdEncoding.EncodeToString([]byte(key))) + + startText := "Some test text to be encrypted and decrypted" + encoded, err := encrypt([]byte(startText)) + require.NoError(t, err) + + fmt.Println(encoded) + + decoded, err := decrypt(encoded) + require.NoError(t, err) + + fmt.Println(string(decoded)) + + bigText := `Line 1: Some test text to be encrypted and decrypted +Line 2: Some test text to be encrypted and decrypted +Line 3: Some test text to be encrypted and decrypted +Line 4: Some test text to be encrypted and decrypted +Line 5: Some test text to be encrypted and decrypted +Line 6: Some test text to be encrypted and decrypted +Line 7: Some test text to be encrypted and decrypted +Line 8: Some test text to be encrypted and decrypted` + + encoded, err = encrypt([]byte(bigText)) + require.NoError(t, err) + + fmt.Println(encoded) + + decoded, err = decrypt(encoded) + require.NoError(t, err) + + fmt.Println(string(decoded)) +} \ No newline at end of file From ac634797f77fe8d20a0e0bcc5b85ac0732e9a571 Mon Sep 17 00:00:00 2001 From: lwaddicor Date: Mon, 29 Nov 2021 19:18:51 +0000 Subject: [PATCH 2/9] Simplify the read to just anything and rearange the tag/data/nonce --- lib/svrquery/protocol/titanfall/query.go | 207 +++++++++++++---------- 1 file changed, 117 insertions(+), 90 deletions(-) diff --git a/lib/svrquery/protocol/titanfall/query.go b/lib/svrquery/protocol/titanfall/query.go index 63596d9..39a5b94 100644 --- a/lib/svrquery/protocol/titanfall/query.go +++ b/lib/svrquery/protocol/titanfall/query.go @@ -5,22 +5,22 @@ import ( "crypto/cipher" "crypto/rand" "encoding/base64" - "encoding/binary" "encoding/hex" "fmt" + "io" + "os" + "github.com/multiplay/go-svrquery/lib/svrquery/common" "github.com/multiplay/go-svrquery/lib/svrquery/protocol" "github.com/netdata/go-orchestrator/module" - "io" - "os" ) var ( // minLength is the smallest packet we can expect. - minLength = 26 - nonceSize = 12 - tagSize = 16 + minLength = 26 + nonceSize = 12 + tagSize = 16 packetSize = 1200 ) @@ -49,22 +49,40 @@ func encrypt(b []byte) ([]byte, error) { return nil, err } - c, err := aes.NewCipher([]byte(hex.EncodeToString(keyBytes))) + hexKey := []byte(hex.EncodeToString(keyBytes)) + fmt.Println("key length", len(hexKey)) + fmt.Println("key", hexKey) + + c, err := aes.NewCipher(hexKey) if err != nil { return nil, err } - gcm, err := cipher.NewGCM(c) - if err != nil { - fmt.Println(err) - } + gcm, err := cipher.NewGCM(c) + if err != nil { + return nil, err + } + + fmt.Println("Nonce size", gcm.NonceSize()) + nonce := make([]byte, gcm.NonceSize()) + if _, err = io.ReadFull(rand.Reader, nonce); err != nil { + return nil, err + } + + fmt.Println("Nonce: ", nonce) + + // This will output in the form CipherTest | Tag and will need rearranging + ciperTextAndTag := gcm.Seal(nil, nonce, b, nil) + fmt.Println(ciperTextAndTag) - nonce := make([]byte, gcm.NonceSize()) - if _, err = io.ReadFull(rand.Reader, nonce); err != nil { - fmt.Println(err) - } + // Rearange output to nonce | tag | ciphertext + newCipherText := nonce + newCipherText = append(newCipherText, ciperTextAndTag[len(ciperTextAndTag)-tagSize:]...) + newCipherText = append(newCipherText, ciperTextAndTag[:len(ciperTextAndTag)-tagSize]...) - return gcm.Seal(nonce, nonce, b, nil), nil + fmt.Println(newCipherText) + + return newCipherText, nil } func decrypt(b []byte) ([]byte, error) { @@ -83,20 +101,20 @@ func decrypt(b []byte) ([]byte, error) { return nil, err } - gcm, err := cipher.NewGCM(c) - if err != nil { - fmt.Println(err) - } + gcm, err := cipher.NewGCM(c) + if err != nil { + fmt.Println(err) + } - if len(b) < gcm.NonceSize() { - return nil, fmt.Errorf("incoming bytes smaller than %d", gcm.NonceSize()) - } + if len(b) < gcm.NonceSize() { + return nil, fmt.Errorf("incoming bytes smaller than %d", gcm.NonceSize()) + } - nonce, b := b[:gcm.NonceSize()], b[gcm.NonceSize():] - plaintext, err := gcm.Open(nil, nonce, b, nil) - if err != nil { - return nil, err - } + nonce, b := b[:gcm.NonceSize()], b[gcm.NonceSize():] + plaintext, err := gcm.Open(nil, nonce, b, nil) + if err != nil { + return nil, err + } return plaintext, nil } @@ -119,72 +137,81 @@ func (q *queryer) Query() (protocol.Responser, error) { return nil, err } + fmt.Printf("Length: %d\n", len(b)) + if _, err := q.c.Write(b); err != nil { return nil, fmt.Errorf("query write: %w", err) } - n, err := q.c.Read(b) - if err != nil { - return nil, fmt.Errorf("query read: %w", err) - } else if n < minLength { - return nil, fmt.Errorf("packet too short (len: %d)", n) - } - - b, err = decrypt(b) - if err != nil { - return nil, err - } - - r := common.NewBinaryReader(b[:n], binary.LittleEndian) - i := &Info{} - - // Header. - if err = r.Read(&i.Header); err != nil { - return nil, err - } else if i.Command != ServerInfoResponse { - return nil, fmt.Errorf("unexpected cmd %x", i.Command) - } - - if i.Version > 1 { - // InstanceInfo. - if err = q.instanceInfo(r, i); err != nil { - return nil, err - } - } - - // BasicInfo. - if err = q.basicInfo(r, i); err != nil { - return nil, err - } - - if i.Version > 4 { - // PerformanceInfo. - if err = r.Read(&i.PerformanceInfo); err != nil { - return nil, err - } - } - - if i.Version > 2 { - if i.Version > 5 { - // MatchState and Teams. - if err = r.Read(&i.MatchState); err != nil { - return nil, err - } - } else if err = r.Read(&i.MatchState.MatchStateV2); err != nil { - return nil, err - } - - if err = q.teams(r, i); err != nil { - return nil, err - } - } - - // Clients - if err = q.clients(r, i); err != nil { - return nil, err - } - - return i, nil + testRead := make([]byte, 16) + n, err := q.c.Read(testRead) + // Output everything + fmt.Println(n, err, testRead) + + return nil, fmt.Errorf("debug") + // + //n, err := q.c.Read(b) + //if err != nil { + // return nil, fmt.Errorf("query read: %w", err) + //} else if n < minLength { + // return nil, fmt.Errorf("packet too short (len: %d)", n) + //} + // + //b, err = decrypt(b) + //if err != nil { + // return nil, err + //} + // + //r := common.NewBinaryReader(b[:n], binary.LittleEndian) + //i := &Info{} + // + //// Header. + //if err = r.Read(&i.Header); err != nil { + // return nil, err + //} else if i.Command != ServerInfoResponse { + // return nil, fmt.Errorf("unexpected cmd %x", i.Command) + //} + // + //if i.Version > 1 { + // // InstanceInfo. + // if err = q.instanceInfo(r, i); err != nil { + // return nil, err + // } + //} + // + //// BasicInfo. + //if err = q.basicInfo(r, i); err != nil { + // return nil, err + //} + // + //if i.Version > 4 { + // // PerformanceInfo. + // if err = r.Read(&i.PerformanceInfo); err != nil { + // return nil, err + // } + //} + // + //if i.Version > 2 { + // if i.Version > 5 { + // // MatchState and Teams. + // if err = r.Read(&i.MatchState); err != nil { + // return nil, err + // } + // } else if err = r.Read(&i.MatchState.MatchStateV2); err != nil { + // return nil, err + // } + // + // if err = q.teams(r, i); err != nil { + // return nil, err + // } + //} + // + //// Clients + //if err = q.clients(r, i); err != nil { + // return nil, err + //} + // + //return i, nil } // instanceInfo decodes the instance information from a response. From 309257b00a39446e049b35b6d74c0c6453315b9a Mon Sep 17 00:00:00 2001 From: Michael Durnhofer Date: Tue, 30 Nov 2021 08:18:10 -0800 Subject: [PATCH 3/9] Updates for working tf2e query protocol encryption (#20) --- lib/svrquery/protocol/titanfall/query.go | 160 +++++++++++------------ 1 file changed, 77 insertions(+), 83 deletions(-) diff --git a/lib/svrquery/protocol/titanfall/query.go b/lib/svrquery/protocol/titanfall/query.go index 39a5b94..3e9f2d8 100644 --- a/lib/svrquery/protocol/titanfall/query.go +++ b/lib/svrquery/protocol/titanfall/query.go @@ -5,7 +5,7 @@ import ( "crypto/cipher" "crypto/rand" "encoding/base64" - "encoding/hex" + "encoding/binary" "fmt" "io" "os" @@ -22,6 +22,7 @@ var ( nonceSize = 12 tagSize = 16 packetSize = 1200 + gcmAdditionalData = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} ) type queryer struct { @@ -49,11 +50,10 @@ func encrypt(b []byte) ([]byte, error) { return nil, err } - hexKey := []byte(hex.EncodeToString(keyBytes)) - fmt.Println("key length", len(hexKey)) - fmt.Println("key", hexKey) + //fmt.Println("key length", len(keyBytes)) + //fmt.Println("key", keyBytes) - c, err := aes.NewCipher(hexKey) + c, err := aes.NewCipher(keyBytes) if err != nil { return nil, err } @@ -63,24 +63,24 @@ func encrypt(b []byte) ([]byte, error) { return nil, err } - fmt.Println("Nonce size", gcm.NonceSize()) + //fmt.Println("Nonce size", gcm.NonceSize()) nonce := make([]byte, gcm.NonceSize()) if _, err = io.ReadFull(rand.Reader, nonce); err != nil { return nil, err } - fmt.Println("Nonce: ", nonce) + //fmt.Println("Nonce: ", nonce) // This will output in the form CipherTest | Tag and will need rearranging - ciperTextAndTag := gcm.Seal(nil, nonce, b, nil) - fmt.Println(ciperTextAndTag) + ciperTextAndTag := gcm.Seal(nil, nonce, b, gcmAdditionalData) + //fmt.Println(ciperTextAndTag) // Rearange output to nonce | tag | ciphertext newCipherText := nonce newCipherText = append(newCipherText, ciperTextAndTag[len(ciperTextAndTag)-tagSize:]...) newCipherText = append(newCipherText, ciperTextAndTag[:len(ciperTextAndTag)-tagSize]...) - fmt.Println(newCipherText) + //fmt.Println(newCipherText) return newCipherText, nil } @@ -96,7 +96,7 @@ func decrypt(b []byte) ([]byte, error) { return nil, err } - c, err := aes.NewCipher([]byte(hex.EncodeToString(keyBytes))) + c, err := aes.NewCipher(keyBytes) if err != nil { return nil, err } @@ -110,8 +110,9 @@ func decrypt(b []byte) ([]byte, error) { return nil, fmt.Errorf("incoming bytes smaller than %d", gcm.NonceSize()) } - nonce, b := b[:gcm.NonceSize()], b[gcm.NonceSize():] - plaintext, err := gcm.Open(nil, nonce, b, nil) + nonce, tag, b := b[:gcm.NonceSize()], b[gcm.NonceSize():gcm.NonceSize()+tagSize], b[gcm.NonceSize()+tagSize:] + b = append(b, tag...) + plaintext, err := gcm.Open(nil, nonce, b, gcmAdditionalData) if err != nil { return nil, err } @@ -137,81 +138,74 @@ func (q *queryer) Query() (protocol.Responser, error) { return nil, err } - fmt.Printf("Length: %d\n", len(b)) + //fmt.Printf("Length: %d\n", len(b)) if _, err := q.c.Write(b); err != nil { return nil, fmt.Errorf("query write: %w", err) } - testRead := make([]byte, 16) - n, err := q.c.Read(testRead) - // Output everything - fmt.Println(n, err, testRead) - - return nil, fmt.Errorf("debug") - // - //n, err := q.c.Read(b) - //if err != nil { - // return nil, fmt.Errorf("query read: %w", err) - //} else if n < minLength { - // return nil, fmt.Errorf("packet too short (len: %d)", n) - //} - // - //b, err = decrypt(b) - //if err != nil { - // return nil, err - //} - // - //r := common.NewBinaryReader(b[:n], binary.LittleEndian) - //i := &Info{} - // - //// Header. - //if err = r.Read(&i.Header); err != nil { - // return nil, err - //} else if i.Command != ServerInfoResponse { - // return nil, fmt.Errorf("unexpected cmd %x", i.Command) - //} - // - //if i.Version > 1 { - // // InstanceInfo. - // if err = q.instanceInfo(r, i); err != nil { - // return nil, err - // } - //} - // - //// BasicInfo. - //if err = q.basicInfo(r, i); err != nil { - // return nil, err - //} - // - //if i.Version > 4 { - // // PerformanceInfo. - // if err = r.Read(&i.PerformanceInfo); err != nil { - // return nil, err - // } - //} - // - //if i.Version > 2 { - // if i.Version > 5 { - // // MatchState and Teams. - // if err = r.Read(&i.MatchState); err != nil { - // return nil, err - // } - // } else if err = r.Read(&i.MatchState.MatchStateV2); err != nil { - // return nil, err - // } - // - // if err = q.teams(r, i); err != nil { - // return nil, err - // } - //} - // - //// Clients - //if err = q.clients(r, i); err != nil { - // return nil, err - //} - // - //return i, nil + n, err := q.c.Read(b) + if err != nil { + return nil, fmt.Errorf("query read: %w", err) + } else if n < minLength { + return nil, fmt.Errorf("packet too short (len: %d)", n) + } + + b, err = decrypt(b[:n]) + if err != nil { + return nil, err + } + + r := common.NewBinaryReader(b, binary.LittleEndian) + i := &Info{} + + // Header. + if err = r.Read(&i.Header); err != nil { + return nil, err + } else if i.Command != ServerInfoResponse { + return nil, fmt.Errorf("unexpected cmd %x", i.Command) + } + + if i.Version > 1 { + // InstanceInfo. + if err = q.instanceInfo(r, i); err != nil { + return nil, err + } + } + + // BasicInfo. + if err = q.basicInfo(r, i); err != nil { + return nil, err + } + + if i.Version > 4 { + // PerformanceInfo. + if err = r.Read(&i.PerformanceInfo); err != nil { + return nil, err + } + } + + if i.Version > 2 { + if i.Version > 5 { + // MatchState and Teams. + if err = r.Read(&i.MatchState); err != nil { + return nil, err + } + } else if err = r.Read(&i.MatchState.MatchStateV2); err != nil { + return nil, err + } + + if err = q.teams(r, i); err != nil { + return nil, err + } + } + + // Clients + if err = q.clients(r, i); err != nil { + return nil, err + } + + return i, nil } // instanceInfo decodes the instance information from a response. From 273098e35f9e8b7b2583faee54d82b2bc16a70a5 Mon Sep 17 00:00:00 2001 From: lwaddicor Date: Thu, 2 Dec 2021 15:57:36 +0000 Subject: [PATCH 4/9] cleanup, add cli support and tests --- cmd/cli/bulk.go | 185 ++++++++++++++++++ cmd/cli/bulk_test.go | 101 ++++++++++ cmd/cli/main.go | 21 +- go.mod | 5 +- go.sum | 7 + lib/svrquery/protocol/interfaces.go | 5 + lib/svrquery/protocol/sqp/types.go | 5 + lib/svrquery/protocol/titanfall/query.go | 104 +++++----- lib/svrquery/protocol/titanfall/query_test.go | 102 +++++----- .../protocol/titanfall/testdata/request-v8 | Bin 0 -> 1200 bytes .../protocol/titanfall/testdata/response-v8 | Bin 0 -> 191 bytes lib/svrquery/protocol/titanfall/types.go | 5 + lib/svrquery/protocol/titanfall/types_test.go | 4 +- 13 files changed, 431 insertions(+), 113 deletions(-) create mode 100644 cmd/cli/bulk.go create mode 100644 cmd/cli/bulk_test.go create mode 100644 lib/svrquery/protocol/titanfall/testdata/request-v8 create mode 100644 lib/svrquery/protocol/titanfall/testdata/response-v8 diff --git a/cmd/cli/bulk.go b/cmd/cli/bulk.go new file mode 100644 index 0000000..e778817 --- /dev/null +++ b/cmd/cli/bulk.go @@ -0,0 +1,185 @@ +package main + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "strings" + + "github.com/multiplay/go-svrquery/lib/svrquery" + "github.com/multiplay/go-svrquery/lib/svrquery/protocol" +) + +const ( + numWorkers = 100 +) + +var ( + errNoItem = errors.New("no item") + errEntryInvalid = errors.New("invalid entry") +) + +// BulkResponseItem contains the information about the query being performed +// against a single server. +type BulkResponseItem struct { + Address string `json:"address"` + ServerInfo *BulkResponseServerInfoItem `json:"serverInfo,omitempty"` + Error string `json:"error,omitempty"` +} + +// BulkResponseServerInfoItem container the server information. +type BulkResponseServerInfoItem struct { + CurrentPlayers int64 `json:"currentPlayers"` + MaxPlayers int64 `json:"maxPlayers"` + Map string `json:"map"` +} + +// BulkResponseItemWork is an item returned by an worker containing the data item +// plus any terminal error it encountered. +type BulkResponseItemWork struct { + Item *BulkResponseItem + Err error +} + +// queryBulk queries a bulk set of servers using a query file. +func queryBulk(file string) error { + f, err := os.Open(file) + if err != nil { + log.Fatal(err) + } + defer f.Close() + + // Make a jobs channel and a number of workers to processes + // work off of the channel. + jobChan := make(chan string) + resultsChan := make(chan BulkResponseItemWork) + for w := 1; w <= numWorkers; w++ { + go worker(jobChan, resultsChan) + } + + items := make([]BulkResponseItem, 0, 1000) + + // Queue work onto the channel + scanner := bufio.NewScanner(f) + jobCount := 0 + for scanner.Scan() { + jobCount++ + jobChan <- scanner.Text() + } + + // Receive results from workers. + for i := 0; i < jobCount; i++ { + v := <-resultsChan + switch { + case errors.Is(v.Err, errNoItem): + // Not fatal, but no response for this entry was created. + continue + case v.Err != nil: + // We had a major issue, force immediate stop. + return fmt.Errorf("fatal error from worker: %w", v.Err) + } + + // add the item to our list of items. + items = append(items, *v.Item) + } + + b, err := json.MarshalIndent(items, "", "\t") + if err != nil { + return err + } + fmt.Printf("%s\n", b) + return nil +} + +// worker is run in a goroutine to provide processing for the items. +func worker(jobChan <-chan string, results chan<- BulkResponseItemWork) { + for entry := range jobChan { + item, err := processBulkEntry(entry) + results <- BulkResponseItemWork{ + Item: item, + Err: err, + } + } +} + +// processBulkEntry processes an entry and returns an item containing the result or error. +func processBulkEntry(entry string) (*BulkResponseItem, error) { + querySection, addressSection, err := parseEntry(entry) + if err != nil { + return nil, fmt.Errorf("parse file entry: %w", err) + } + + item := &BulkResponseItem{ + Address: addressSection, + } + + // If the query contains any options retrieve them and + querySection, options, err := parseOptions(querySection) + if err != nil { + // These errors are non fatal, as we know which server it is for + item.Error = err.Error() + return item, nil + } + + if !protocol.Supported(querySection) { + item.Error = fmt.Sprintf("unsupported protocol: %s", querySection) + return item, nil + } + + client, err := svrquery.NewClient(querySection, addressSection, options...) + if err != nil { + item.Error = fmt.Sprintf("create client: %s", err.Error()) + return item, nil + } + + resp, err := client.Query() + if err != nil { + item.Error = fmt.Sprintf("query client: %s", err.Error()) + return item, nil + } + + item.ServerInfo = &BulkResponseServerInfoItem{ + CurrentPlayers: resp.NumClients(), + MaxPlayers: resp.MaxClients(), + Map: "UNKNOWN", + } + + if currentMap, ok := resp.(protocol.Mapper); ok { + item.ServerInfo.Map = currentMap.Map() + } + return item, nil +} + +func parseEntry(entry string) (querySection, addressSection string, err error) { + entry = strings.TrimSpace(entry) + if entry == "" { + return "", "", fmt.Errorf("process entry: %w", errNoItem) + } + sections := strings.Split(entry, " ") + if len(sections) != 2 { + return "", "", fmt.Errorf("%w: wrong number of sections", errEntryInvalid) + } + + return sections[0], sections[1], nil +} + +func parseOptions(querySection string) (baseQuery string, options []svrquery.Option, error error) { + options = make([]svrquery.Option, 0) + protocolSections := strings.Split(querySection, ",") + for i := 1; i < len(protocolSections); i++ { + keyVal := strings.SplitN(protocolSections[i], "=", 2) + if len(keyVal) != 2 { + return "", nil, fmt.Errorf("key value pair invalid: %v", keyVal) + + } + // Only support key at the moment. + switch strings.ToLower(keyVal[0]) { + case "key": + options = append(options, svrquery.WithKey(keyVal[1])) + } + } + return protocolSections[0], options, nil +} diff --git a/cmd/cli/bulk_test.go b/cmd/cli/bulk_test.go new file mode 100644 index 0000000..e0807ee --- /dev/null +++ b/cmd/cli/bulk_test.go @@ -0,0 +1,101 @@ +package main + +import ( + "testing" + + "github.com/multiplay/go-svrquery/lib/svrquery" + "github.com/stretchr/testify/require" +) + +func TestParseEntry(t *testing.T) { + testCases := []struct { + name string + input string + expQuery string + expAddress string + expErr error + }{ + { + name: "ok", + input: "sqp 1.2.3.4:1234", + expQuery: "sqp", + expAddress: "1.2.3.4:1234", + }, + { + name: "empty line", + input: "", + expErr: errNoItem, + }, + { + name: "invalid entry", + input: "sqp 1.2.3.4:1234 extra", + expErr: errEntryInvalid, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + query, addr, err := parseEntry(tc.input) + if err != nil { + require.ErrorIs(t, err, tc.expErr) + return + } + require.NoError(t, err) + require.Equal(t, tc.expQuery, query) + require.Equal(t, tc.expAddress, addr) + }) + } +} + +func TestCreateClient(t *testing.T) { + testCases := []struct { + name string + query string + expQuery string + expKey string + expErr error + }{ + { + name: "ok", + query: "tf2e", + expQuery: "tf2e", + }, + { + name: "with_key", + query: "tf2e,key=val", + expKey: "val", + expQuery: "tf2e", + }, + { + name: "with_unsupported_other", + query: "tf2e,other=val", + expQuery: "tf2e", + }, + { + name: "invalid entry", + query: "tf2e", + expQuery: "tf2e", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + baseQuery, options, err := parseOptions(tc.query) + if err != nil { + require.ErrorIs(t, err, tc.expErr) + return + } + require.NoError(t, err) + require.Equal(t, tc.expQuery, baseQuery) + + // Validate key setting + if tc.expKey != "" { + require.Len(t, options, 1) + c := svrquery.Client{} + require.NoError(t, options[0](&c)) + require.Equal(t, tc.expKey, c.Key()) + } + require.NotNil(t, options) + }) + } +} diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 9069025..a22146d 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -13,10 +13,20 @@ import ( func main() { address := flag.String("addr", "", "Address e.g. 127.0.0.1:12345") proto := flag.String("proto", "", "Protocol e.g. sqp, tf2e, tf2e-v7, tf2e-v8") + key := flag.String("key", "", "Key to use to authenticate") + file := flag.String("file", "", "Bulk file to execute") flag.Parse() l := log.New(os.Stderr, "", 0) + if *file != "" { + // Use bulk file mode + if err := queryBulk(*file); err != nil { + l.Fatal(err) + } + return + } + if *address == "" { l.Println("No address provided") flag.PrintDefaults() @@ -29,13 +39,18 @@ func main() { os.Exit(1) } - if err := query(*proto, *address); err != nil { + if err := query(*proto, *address, *key); err != nil { l.Fatal(err) } } -func query(proto, address string) error { - c, err := svrquery.NewClient(proto, address) +func query(proto, address, key string) error { + options := make([]svrquery.Option, 0) + if key != "" { + options = append(options, svrquery.WithKey(key)) + } + + c, err := svrquery.NewClient(proto, address, options...) if err != nil { return err } diff --git a/go.mod b/go.mod index fb175e8..28fce3e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,8 @@ go 1.13 require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/netdata/go-orchestrator v0.0.0-20190905093727-c793edba0e8f - github.com/stretchr/objx v0.1.1 // indirect - github.com/stretchr/testify v1.4.0 + github.com/stretchr/objx v0.3.0 // indirect + github.com/stretchr/testify v1.7.0 golang.org/x/sys v0.0.0-20190422165155-953cdadca894 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index 50d2660..3e4c916 100644 --- a/go.sum +++ b/go.sum @@ -14,9 +14,13 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= @@ -25,3 +29,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lib/svrquery/protocol/interfaces.go b/lib/svrquery/protocol/interfaces.go index 150d6d2..a4895e8 100644 --- a/lib/svrquery/protocol/interfaces.go +++ b/lib/svrquery/protocol/interfaces.go @@ -17,6 +17,11 @@ type Responser interface { MaxClients() int64 } +// Mapper represents something which can return the current map. +type Mapper interface { + Map() string +} + // Client is an interface which is implemented by types which can act a query transport. type Client interface { io.ReadWriteCloser diff --git a/lib/svrquery/protocol/sqp/types.go b/lib/svrquery/protocol/sqp/types.go index 9fc345f..6f617a2 100644 --- a/lib/svrquery/protocol/sqp/types.go +++ b/lib/svrquery/protocol/sqp/types.go @@ -67,6 +67,11 @@ func (q *QueryResponse) NumClients() int64 { return int64(q.ServerInfo.CurrentPlayers) } +// Map implements protocol.Mapper. +func (q *QueryResponse) Map() string { + return q.ServerInfo.Map +} + type infoHeader struct { Name string Type DataType diff --git a/lib/svrquery/protocol/titanfall/query.go b/lib/svrquery/protocol/titanfall/query.go index 3e9f2d8..bd01281 100644 --- a/lib/svrquery/protocol/titanfall/query.go +++ b/lib/svrquery/protocol/titanfall/query.go @@ -8,7 +8,6 @@ import ( "encoding/binary" "fmt" "io" - "os" "github.com/multiplay/go-svrquery/lib/svrquery/common" "github.com/multiplay/go-svrquery/lib/svrquery/protocol" @@ -18,10 +17,9 @@ import ( var ( // minLength is the smallest packet we can expect. - minLength = 26 - nonceSize = 12 - tagSize = 16 - packetSize = 1200 + minLength = 26 + tagSize = 16 + packetSize = 1200 gcmAdditionalData = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} ) @@ -39,71 +37,54 @@ func newQueryer(version byte) func(c protocol.Client) protocol.Queryer { } } -func encrypt(b []byte) ([]byte, error) { - key := os.Getenv("AES_KEY") - if key == "" { - return nil, fmt.Errorf("no aes key found. (Is AES_KEY env var set?)") - } - - keyBytes, err := base64.StdEncoding.DecodeString(key) +// encrypt encrypts the byte buffer given to it. +func (q *queryer) encrypt(b []byte) ([]byte, error) { + keyBytes, err := base64.StdEncoding.DecodeString(q.c.Key()) if err != nil { - return nil, err + return nil, fmt.Errorf("decode key: %w", err) } - //fmt.Println("key length", len(keyBytes)) - //fmt.Println("key", keyBytes) - c, err := aes.NewCipher(keyBytes) if err != nil { - return nil, err + return nil, fmt.Errorf("new aes cipher: %w", err) } gcm, err := cipher.NewGCM(c) if err != nil { - return nil, err + return nil, fmt.Errorf("new gcm: %w", err) } - //fmt.Println("Nonce size", gcm.NonceSize()) nonce := make([]byte, gcm.NonceSize()) if _, err = io.ReadFull(rand.Reader, nonce); err != nil { - return nil, err + return nil, fmt.Errorf("read random nonce: %w", err) } - //fmt.Println("Nonce: ", nonce) + // This will output in the form CipherTest | Tag and will need rearranging. + cipherTextAndTag := gcm.Seal(nil, nonce, b, gcmAdditionalData) - // This will output in the form CipherTest | Tag and will need rearranging - ciperTextAndTag := gcm.Seal(nil, nonce, b, gcmAdditionalData) - //fmt.Println(ciperTextAndTag) - - // Rearange output to nonce | tag | ciphertext + // Rearrange output to nonce | tag | ciphertext. newCipherText := nonce - newCipherText = append(newCipherText, ciperTextAndTag[len(ciperTextAndTag)-tagSize:]...) - newCipherText = append(newCipherText, ciperTextAndTag[:len(ciperTextAndTag)-tagSize]...) - - //fmt.Println(newCipherText) + newCipherText = append(newCipherText, cipherTextAndTag[len(cipherTextAndTag)-tagSize:]...) + newCipherText = append(newCipherText, cipherTextAndTag[:len(cipherTextAndTag)-tagSize]...) return newCipherText, nil } -func decrypt(b []byte) ([]byte, error) { - key := os.Getenv("AES_KEY") - if key == "" { - return nil, fmt.Errorf("no aes key found. (Is AES_KEY env var set?)") - } - - keyBytes, err := base64.StdEncoding.DecodeString(key) +// decrypt decrypts the byte buffer given to it. +func (q *queryer) decrypt(b []byte) ([]byte, error) { + keyBytes, err := base64.StdEncoding.DecodeString(q.c.Key()) if err != nil { - return nil, err + return nil, fmt.Errorf("decode key: %w", err) } c, err := aes.NewCipher(keyBytes) if err != nil { - return nil, err + return nil, fmt.Errorf("new aes cipher: %w", err) } gcm, err := cipher.NewGCM(c) if err != nil { - fmt.Println(err) + return nil, fmt.Errorf("new gcm: %w", err) } if len(b) < gcm.NonceSize() { @@ -121,10 +102,11 @@ func decrypt(b []byte) ([]byte, error) { } // Query implements protocol.Queryer. -func (q *queryer) Query() (protocol.Responser, error) { - b := make([]byte, 1200) +func (q *queryer) Query() (resp protocol.Responser, err error) { + b := make([]byte, packetSize) copy(b, q.serverInfoPkt()) + // For older query versions we use a keyed magic section to auth. For newer versions we use encryption if key := q.c.Key(); key != "" { if q.version < 5 { // If keyed data asked for bump version sent to supported version level. @@ -133,13 +115,13 @@ func (q *queryer) Query() (protocol.Responser, error) { copy(b[6:], key) } - b, err := encrypt(b) - if err != nil { - return nil, err + if q.version >= 8 && q.c.Key() != "" { + b, err = q.encrypt(b) + if err != nil { + return nil, err + } } - //fmt.Printf("Length: %d\n", len(b)) - if _, err := q.c.Write(b); err != nil { return nil, fmt.Errorf("query write: %w", err) } @@ -150,41 +132,43 @@ func (q *queryer) Query() (protocol.Responser, error) { } else if n < minLength { return nil, fmt.Errorf("packet too short (len: %d)", n) } - - b, err = decrypt(b[:n]) - if err != nil { - return nil, err + + if q.version >= 8 && q.c.Key() != "" { + b, err = q.decrypt(b[:n]) + if err != nil { + return nil, err + } } - + r := common.NewBinaryReader(b, binary.LittleEndian) i := &Info{} - + // Header. if err = r.Read(&i.Header); err != nil { return nil, err } else if i.Command != ServerInfoResponse { return nil, fmt.Errorf("unexpected cmd %x", i.Command) } - + if i.Version > 1 { // InstanceInfo. if err = q.instanceInfo(r, i); err != nil { return nil, err } } - + // BasicInfo. if err = q.basicInfo(r, i); err != nil { return nil, err } - + if i.Version > 4 { // PerformanceInfo. if err = r.Read(&i.PerformanceInfo); err != nil { return nil, err } } - + if i.Version > 2 { if i.Version > 5 { // MatchState and Teams. @@ -194,17 +178,17 @@ func (q *queryer) Query() (protocol.Responser, error) { } else if err = r.Read(&i.MatchState.MatchStateV2); err != nil { return nil, err } - + if err = q.teams(r, i); err != nil { return nil, err } } - + // Clients if err = q.clients(r, i); err != nil { return nil, err } - + return i, nil } diff --git a/lib/svrquery/protocol/titanfall/query_test.go b/lib/svrquery/protocol/titanfall/query_test.go index 98106d5..71b7efb 100644 --- a/lib/svrquery/protocol/titanfall/query_test.go +++ b/lib/svrquery/protocol/titanfall/query_test.go @@ -1,9 +1,6 @@ package titanfall import ( - "encoding/base64" - "fmt" - "os" "testing" "github.com/multiplay/go-svrquery/lib/svrquery/clienttest" @@ -59,10 +56,6 @@ var ( ) func TestQuery(t *testing.T) { - key := "12345678901234567890123456789012" - - os.Setenv("AES_KEY", base64.StdEncoding.EncodeToString([]byte(key))) - keyed := base keyed.Version = 5 keyed.AverageFrameTime = 1.2347187 @@ -84,13 +77,26 @@ func TestQuery(t *testing.T) { } v7.TeamsLeftWithPlayersNum = 6 + v8 := v7 + v8.Version = 8 + v8.InstanceInfoV8 = InstanceInfoV8{ + Retail: 1, + InstanceType: 2, + ClientCRC: 4294967295, + NetProtocol: 526, + HealthFlags: 0, + RandomServerID: 0, + } + v8.InstanceInfo = InstanceInfo{} + cases := []struct { - name string - version byte - request string - response string - key string - expected Info + name string + version byte + request string + response string + key string + expected Info + expEncypted bool }{ { name: "v3", @@ -106,6 +112,15 @@ func TestQuery(t *testing.T) { response: "response-v7", expected: v7, }, + { + name: "v8", + version: 8, + request: "request-v8", + response: "response-v8", + expected: v8, + key: testKey, + expEncypted: true, + }, { name: "keyed", version: 5, @@ -126,46 +141,44 @@ func TestQuery(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + var err error + mc := &clienttest.MockClient{} + mc.On("Key").Return("Z2ZkZ3Nnbmpza2U0cnRyZQ==") + p := queryer{ + c: mc, + version: tc.version, + } + req := clienttest.LoadData(t, testDir, tc.request) - encodedReq, err := encrypt(req) - require.NoError(t, err) resp := clienttest.LoadData(t, testDir, tc.response) - encodedResp, err := encrypt(resp) - require.NoError(t, err) - m := &clienttest.MockClient{} - m.On("Write", mock.AnythingOfType("[]uint8")).Return(len(encodedReq), nil) - m.On("Read", mock.AnythingOfType("[]uint8")).Return(encodedResp, nil) - m.On("Key").Return(tc.key) + if tc.expEncypted { + req, err = p.encrypt(req) + require.NoError(t, err) + resp, err = p.encrypt(resp) + require.NoError(t, err) + } + + mc.On("Write", mock.AnythingOfType("[]uint8")).Return(len(req), nil) + mc.On("Read", mock.AnythingOfType("[]uint8")).Return(resp, nil) - p := newQueryer(tc.version)(m) i, err := p.Query() require.NoError(t, err) require.IsType(t, &Info{}, i) require.Equal(t, &tc.expected, i) - m.AssertExpectations(t) + mc.AssertExpectations(t) }) } } func TestEncryptAndDecrypt(t *testing.T) { - //key := "12345678901234567890123456789012" // 32 - key := "1234567890123456" // 16 - - os.Setenv("AES_KEY", base64.StdEncoding.EncodeToString([]byte(key))) - - startText := "Some test text to be encrypted and decrypted" - encoded, err := encrypt([]byte(startText)) - require.NoError(t, err) - - fmt.Println(encoded) - - decoded, err := decrypt(encoded) - require.NoError(t, err) - - fmt.Println(string(decoded)) + mc := &clienttest.MockClient{} + mc.On("Key").Return("Z2ZkZ3Nnbmpza2U0cnRyZQ==") + p := queryer{ + c: mc, + } - bigText := `Line 1: Some test text to be encrypted and decrypted + text := `Line 1: Some test text to be encrypted and decrypted Line 2: Some test text to be encrypted and decrypted Line 3: Some test text to be encrypted and decrypted Line 4: Some test text to be encrypted and decrypted @@ -174,13 +187,10 @@ Line 6: Some test text to be encrypted and decrypted Line 7: Some test text to be encrypted and decrypted Line 8: Some test text to be encrypted and decrypted` - encoded, err = encrypt([]byte(bigText)) + encoded, err := p.encrypt([]byte(text)) require.NoError(t, err) - fmt.Println(encoded) - - decoded, err = decrypt(encoded) + decoded, err := p.decrypt(encoded) require.NoError(t, err) - - fmt.Println(string(decoded)) -} \ No newline at end of file + require.Equal(t, text, string(decoded)) +} diff --git a/lib/svrquery/protocol/titanfall/testdata/request-v8 b/lib/svrquery/protocol/titanfall/testdata/request-v8 new file mode 100644 index 0000000000000000000000000000000000000000..4c925c03e47ad01340e3d21fb67dec1aa46c4921 GIT binary patch literal 1200 acmezW|Nnnq4u(-M8UmvsFd71bJ_G Date: Thu, 2 Dec 2021 16:09:42 +0000 Subject: [PATCH 5/9] fix a few merge conflicts --- cmd/cli/main.go | 4 +++- lib/svrsample/query.go | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/cli/main.go b/cmd/cli/main.go index deb3609..6f5ae1c 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -24,7 +24,6 @@ func main() { l := log.New(os.Stderr, "", 0) - if *file != "" { // Use bulk file mode if err := queryBulk(*file); err != nil { @@ -100,6 +99,9 @@ func server(l *log.Logger, proto, address string) error { Map: "Map", Port: 1000, }) + if err != nil { + return err + } addr, err := net.ResolveUDPAddr("udp4", address) if err != nil { diff --git a/lib/svrsample/query.go b/lib/svrsample/query.go index 8cb019e..8fde442 100644 --- a/lib/svrsample/query.go +++ b/lib/svrsample/query.go @@ -10,8 +10,8 @@ import ( ) var ( - // ErrProtoNotFound returned when a protocol is not found - ErrProtoNotFound = errors.New("protocol not found") + // ErrProtoNotSupported returned when a protocol is not supported + ErrProtoNotSupported = errors.New("protocol not supported") ) // GetResponder gets the appropriate responder for the protocol provided @@ -20,5 +20,5 @@ func GetResponder(proto string, state common.QueryState) (common.QueryResponder, case "sqp": return sqp.NewQueryResponder(state) } - return nil, fmt.Errorf("%w: %s", ErrProtoNotFound, proto) + return nil, fmt.Errorf("%w: %s", ErrProtoNotSupported, proto) } From 9a1a7e02309d3e8130ebf8a8d523da3ef150ae23 Mon Sep 17 00:00:00 2001 From: lwaddicor Date: Thu, 2 Dec 2021 16:42:45 +0000 Subject: [PATCH 6/9] Add very simple action to test on push --- .github/workflows/build-test.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 .github/workflows/build-test.yml diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml new file mode 100644 index 0000000..109eff0 --- /dev/null +++ b/.github/workflows/build-test.yml @@ -0,0 +1,14 @@ +on: [push, pull_request] +name: Test +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Install Go + uses: actions/setup-go@v2 + with: + go-version: 1.17 + - name: Checkout code + uses: actions/checkout@v2 + - name: Test + run: go test ./... From 289e79d29fef88719d1a59e4b0cfe134fa5b1a75 Mon Sep 17 00:00:00 2001 From: lwaddicor Date: Fri, 14 Jan 2022 09:50:09 +0000 Subject: [PATCH 7/9] rework --- cmd/cli/bulk.go | 64 ++++++++++++++++++++++++++++++++++++------------- cmd/cli/main.go | 2 +- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/cmd/cli/bulk.go b/cmd/cli/bulk.go index e778817..9889819 100644 --- a/cmd/cli/bulk.go +++ b/cmd/cli/bulk.go @@ -8,6 +8,7 @@ import ( "log" "os" "strings" + "sync" "github.com/multiplay/go-svrquery/lib/svrquery" "github.com/multiplay/go-svrquery/lib/svrquery/protocol" @@ -30,7 +31,7 @@ type BulkResponseItem struct { Error string `json:"error,omitempty"` } -// BulkResponseServerInfoItem container the server information. +// BulkResponseServerInfoItem containing basic server information. type BulkResponseServerInfoItem struct { CurrentPlayers int64 `json:"currentPlayers"` MaxPlayers int64 `json:"maxPlayers"` @@ -46,45 +47,57 @@ type BulkResponseItemWork struct { // queryBulk queries a bulk set of servers using a query file. func queryBulk(file string) error { - f, err := os.Open(file) - if err != nil { - log.Fatal(err) + // To simplify the workerpool load all the entries we are going to work on + lines := fileLines(file) + + if len(lines) > 10000 { + return fmt.Errorf("too many servers requested %d (max 10000)", len(lines)) } - defer f.Close() // Make a jobs channel and a number of workers to processes // work off of the channel. - jobChan := make(chan string) + jobChan := make(chan string, len(lines)) resultsChan := make(chan BulkResponseItemWork) + wg := sync.WaitGroup{} for w := 1; w <= numWorkers; w++ { - go worker(jobChan, resultsChan) + wg.Add(1) + go worker(jobChan, resultsChan, &wg) } - items := make([]BulkResponseItem, 0, 1000) + items := make([]BulkResponseItem, 0, len(lines)) // Queue work onto the channel - scanner := bufio.NewScanner(f) - jobCount := 0 - for scanner.Scan() { - jobCount++ - jobChan <- scanner.Text() + for _, line := range lines { + jobChan <- line } // Receive results from workers. - for i := 0; i < jobCount; i++ { + var err error + for i := 0; i < len(lines); i++ { v := <-resultsChan switch { case errors.Is(v.Err, errNoItem): // Not fatal, but no response for this entry was created. continue case v.Err != nil: - // We had a major issue, force immediate stop. - return fmt.Errorf("fatal error from worker: %w", v.Err) + // We had a major issue processing the list + if err == nil { + err = fmt.Errorf("fatal error: %w", v.Err) + continue + } + err = fmt.Errorf("additional error: %w", v.Err) + continue } // add the item to our list of items. items = append(items, *v.Item) } + close(jobChan) + wg.Wait() + + if err != nil { + return err + } b, err := json.MarshalIndent(items, "", "\t") if err != nil { @@ -94,8 +107,25 @@ func queryBulk(file string) error { return nil } +func fileLines(file string) []string { + f, err := os.Open(file) + if err != nil { + log.Fatal(err) + } + defer f.Close() + + result := make([]string, 0, 1000) + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + result = append(result, line) + } + return result +} + // worker is run in a goroutine to provide processing for the items. -func worker(jobChan <-chan string, results chan<- BulkResponseItemWork) { +func worker(jobChan <-chan string, results chan<- BulkResponseItemWork, wg *sync.WaitGroup) { + defer wg.Done() for entry := range jobChan { item, err := processBulkEntry(entry) results <- BulkResponseItemWork{ diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 6f5ae1c..15ad762 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -18,7 +18,7 @@ func main() { clientAddr := flag.String("addr", "", "Address to connect to e.g. 127.0.0.1:12345") proto := flag.String("proto", "", "Protocol e.g. sqp, tf2e, tf2e-v7, tf2e-v8") key := flag.String("key", "", "Key to use to authenticate") - file := flag.String("file", "", "Bulk file to execute") + file := flag.String("file", "", "Bulk file to execute to get basic server information") serverAddr := flag.String("server", "", "Address to start server e.g. 127.0.0.1:12121, :23232") flag.Parse() From 42f5a0be47198dd0cbe59cfca8b8858258950d5f Mon Sep 17 00:00:00 2001 From: lwaddicor Date: Fri, 21 Jan 2022 18:14:51 +0000 Subject: [PATCH 8/9] review rework --- cmd/cli/bulk.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/cmd/cli/bulk.go b/cmd/cli/bulk.go index 9889819..536bf29 100644 --- a/cmd/cli/bulk.go +++ b/cmd/cli/bulk.go @@ -8,7 +8,6 @@ import ( "log" "os" "strings" - "sync" "github.com/multiplay/go-svrquery/lib/svrquery" "github.com/multiplay/go-svrquery/lib/svrquery/protocol" @@ -16,6 +15,9 @@ import ( const ( numWorkers = 100 + + // maxQueries is the maximum number of queries that can be queried in one bulk request. + maxQueries = 10000 ) var ( @@ -50,18 +52,16 @@ func queryBulk(file string) error { // To simplify the workerpool load all the entries we are going to work on lines := fileLines(file) - if len(lines) > 10000 { - return fmt.Errorf("too many servers requested %d (max 10000)", len(lines)) + if len(lines) > maxQueries { + return fmt.Errorf("too many servers requested %d (max %d)", len(lines), maxQueries) } // Make a jobs channel and a number of workers to processes // work off of the channel. jobChan := make(chan string, len(lines)) resultsChan := make(chan BulkResponseItemWork) - wg := sync.WaitGroup{} for w := 1; w <= numWorkers; w++ { - wg.Add(1) - go worker(jobChan, resultsChan, &wg) + go worker(jobChan, resultsChan) } items := make([]BulkResponseItem, 0, len(lines)) @@ -70,6 +70,7 @@ func queryBulk(file string) error { for _, line := range lines { jobChan <- line } + close(jobChan) // Receive results from workers. var err error @@ -88,12 +89,9 @@ func queryBulk(file string) error { err = fmt.Errorf("additional error: %w", v.Err) continue } - // add the item to our list of items. items = append(items, *v.Item) } - close(jobChan) - wg.Wait() if err != nil { return err @@ -124,8 +122,7 @@ func fileLines(file string) []string { } // worker is run in a goroutine to provide processing for the items. -func worker(jobChan <-chan string, results chan<- BulkResponseItemWork, wg *sync.WaitGroup) { - defer wg.Done() +func worker(jobChan <-chan string, results chan<- BulkResponseItemWork) { for entry := range jobChan { item, err := processBulkEntry(entry) results <- BulkResponseItemWork{ @@ -161,13 +158,13 @@ func processBulkEntry(entry string) (*BulkResponseItem, error) { client, err := svrquery.NewClient(querySection, addressSection, options...) if err != nil { - item.Error = fmt.Sprintf("create client: %s", err.Error()) + item.Error = fmt.Sprintf("create client: %s", err) return item, nil } resp, err := client.Query() if err != nil { - item.Error = fmt.Sprintf("query client: %s", err.Error()) + item.Error = fmt.Sprintf("query client: %s", err) return item, nil } From 33ed3b7b1a705e22fb064fbdcb6da5cc73298d4a Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Sat, 22 Jan 2022 00:45:30 +0000 Subject: [PATCH 9/9] feat: bulk streamed processing Add streamed processing of bulk, eliminating the limits needed to prevent memory requirements as well as recreation of svrquery.Client for each processed element by implementing a reusable BulkClient. This changes the processing to best effort and can result in entries without an address being output. Also: * Output to a provided stream. * Eliminate the use of log.Fatal in the library. * Include element details when parsing fails. --- cmd/cli/bulk.go | 214 +++++++++++++++++++++--------------- cmd/cli/main.go | 2 +- lib/svrquery/bulk_client.go | 54 +++++++++ 3 files changed, 178 insertions(+), 92 deletions(-) create mode 100644 lib/svrquery/bulk_client.go diff --git a/cmd/cli/bulk.go b/cmd/cli/bulk.go index 536bf29..59b75e8 100644 --- a/cmd/cli/bulk.go +++ b/cmd/cli/bulk.go @@ -2,12 +2,14 @@ package main import ( "bufio" + "bytes" "encoding/json" "errors" "fmt" - "log" + "io" "os" "strings" + "sync" "github.com/multiplay/go-svrquery/lib/svrquery" "github.com/multiplay/go-svrquery/lib/svrquery/protocol" @@ -15,9 +17,6 @@ import ( const ( numWorkers = 100 - - // maxQueries is the maximum number of queries that can be queried in one bulk request. - maxQueries = 10000 ) var ( @@ -28,11 +27,27 @@ var ( // BulkResponseItem contains the information about the query being performed // against a single server. type BulkResponseItem struct { - Address string `json:"address"` + Address string `json:"address,omitempty"` ServerInfo *BulkResponseServerInfoItem `json:"serverInfo,omitempty"` Error string `json:"error,omitempty"` } +// encode writes the JSON encoded version of i to w using the encoder e which writes to b. +// It strips the trailing \n from the output before writing to w. +func (i *BulkResponseItem) encode(w io.Writer, b *bytes.Buffer, e *json.Encoder) error { + defer b.Reset() + + if err := e.Encode(i); err != nil { + return fmt.Errorf("encode item %v: %w", i, err) + } + + if _, err := w.Write(bytes.TrimRight(b.Bytes(), "\n")); err != nil { + return fmt.Errorf("write item: %w", err) + } + + return nil +} + // BulkResponseServerInfoItem containing basic server information. type BulkResponseServerInfoItem struct { CurrentPlayers int64 `json:"currentPlayers"` @@ -40,132 +55,145 @@ type BulkResponseServerInfoItem struct { Map string `json:"map"` } -// BulkResponseItemWork is an item returned by an worker containing the data item -// plus any terminal error it encountered. -type BulkResponseItemWork struct { - Item *BulkResponseItem - Err error -} +// queryBulk queries a bulk set of servers from a query file writing the JSON results to output. +func queryBulk(file string, output io.Writer) (err error) { + work := make(chan string, numWorkers) // Buffered to ensure we can busy all workers. + results := make(chan BulkResponseItem, numWorkers) // Buffered to improve worker concurrency. -// queryBulk queries a bulk set of servers using a query file. -func queryBulk(file string) error { - // To simplify the workerpool load all the entries we are going to work on - lines := fileLines(file) + // Create a pool of workers to process work. + var wgWorkers sync.WaitGroup + wgWorkers.Add(numWorkers) + for w := 1; w <= numWorkers; w++ { + c, err := svrquery.NewBulkClient() + if err != nil { + close(work) // Ensure that existing workers return. + return fmt.Errorf("bulk client: %w", err) + } - if len(lines) > maxQueries { - return fmt.Errorf("too many servers requested %d (max %d)", len(lines), maxQueries) + go func() { + defer wgWorkers.Done() + worker(work, results, c) + }() } - // Make a jobs channel and a number of workers to processes - // work off of the channel. - jobChan := make(chan string, len(lines)) - resultsChan := make(chan BulkResponseItemWork) - for w := 1; w <= numWorkers; w++ { - go worker(jobChan, resultsChan) + // Create a writer to write the results to output as they become available. + errc := make(chan error) + go func() { + errc <- writer(output, results) + }() + + // Queue work onto the channel. + if err = producer(file, work); err != nil { + err = fmt.Errorf("producer: %w", err) + } + + // Wait for all workers to complete so that we can safely close results + // that will trigger writer to return once its processed all results. + wgWorkers.Wait() + close(results) + + if werr := <-errc; werr != nil { + if err != nil { + return fmt.Errorf("%w, writer: %s", err, werr) + } + return fmt.Errorf("writer: %w", werr) } - items := make([]BulkResponseItem, 0, len(lines)) + return err +} - // Queue work onto the channel - for _, line := range lines { - jobChan <- line +// writer writes results as JSON encoded array to w. +func writer(w io.Writer, results <-chan BulkResponseItem) (err error) { + if _, err = w.Write([]byte{'['}); err != nil { + return fmt.Errorf("write header: %w", err) } - close(jobChan) - // Receive results from workers. - var err error - for i := 0; i < len(lines); i++ { - v := <-resultsChan - switch { - case errors.Is(v.Err, errNoItem): - // Not fatal, but no response for this entry was created. - continue - case v.Err != nil: - // We had a major issue processing the list + // Do our best to write valid JSON by ensuring we always write + // the closing ]. If a previous encode fails, this could still + // be insufficient. + defer func() { + if _, werr := w.Write([]byte("]\n")); werr != nil { + werr = fmt.Errorf("write trailer: %w", err) if err == nil { - err = fmt.Errorf("fatal error: %w", v.Err) - continue + err = werr } - err = fmt.Errorf("additional error: %w", v.Err) - continue } - // add the item to our list of items. - items = append(items, *v.Item) + }() + + var b bytes.Buffer + e := json.NewEncoder(&b) + + // Process the first item before looping so separating + // comma can be written easily. + i, ok := <-results + if !ok { + return nil } - if err != nil { + if err := i.encode(w, &b, e); err != nil { return err } - b, err := json.MarshalIndent(items, "", "\t") - if err != nil { - return err + for i := range results { + if _, err := w.Write([]byte(",")); err != nil { + return fmt.Errorf("write set: %w", err) + } + + if err := i.encode(w, &b, e); err != nil { + return err + } } - fmt.Printf("%s\n", b) + return nil } -func fileLines(file string) []string { +// producer reads lines from file sending them to work. +// work will be closed before return. +func producer(file string, work chan<- string) error { + defer close(work) + f, err := os.Open(file) if err != nil { - log.Fatal(err) + return err } defer f.Close() - result := make([]string, 0, 1000) - scanner := bufio.NewScanner(f) - for scanner.Scan() { - line := scanner.Text() - result = append(result, line) + s := bufio.NewScanner(f) + for s.Scan() { + work <- s.Text() } - return result + + return s.Err() } -// worker is run in a goroutine to provide processing for the items. -func worker(jobChan <-chan string, results chan<- BulkResponseItemWork) { - for entry := range jobChan { - item, err := processBulkEntry(entry) - results <- BulkResponseItemWork{ - Item: item, - Err: err, - } +// worker calls processBulkEntry for each item read from work, writing the result to results. +func worker(work <-chan string, results chan<- BulkResponseItem, client *svrquery.BulkClient) { + for e := range work { + results <- processBulkEntry(e, client) } } -// processBulkEntry processes an entry and returns an item containing the result or error. -func processBulkEntry(entry string) (*BulkResponseItem, error) { +// processBulkEntry decodes and processes an entry returning an item containing the result or error. +func processBulkEntry(entry string, client *svrquery.BulkClient) (item BulkResponseItem) { querySection, addressSection, err := parseEntry(entry) if err != nil { - return nil, fmt.Errorf("parse file entry: %w", err) + item.Error = fmt.Sprintf("parse file entry: %s", err) + return item } - item := &BulkResponseItem{ - Address: addressSection, - } + item.Address = addressSection - // If the query contains any options retrieve them and + // If the query contains any options retrieve and use them. querySection, options, err := parseOptions(querySection) if err != nil { - // These errors are non fatal, as we know which server it is for item.Error = err.Error() - return item, nil - } - - if !protocol.Supported(querySection) { - item.Error = fmt.Sprintf("unsupported protocol: %s", querySection) - return item, nil + return item } - client, err := svrquery.NewClient(querySection, addressSection, options...) - if err != nil { - item.Error = fmt.Sprintf("create client: %s", err) - return item, nil - } - - resp, err := client.Query() + resp, err := client.Query(querySection, addressSection, options...) if err != nil { item.Error = fmt.Sprintf("query client: %s", err) - return item, nil + return item } item.ServerInfo = &BulkResponseServerInfoItem{ @@ -177,23 +205,26 @@ func processBulkEntry(entry string) (*BulkResponseItem, error) { if currentMap, ok := resp.(protocol.Mapper); ok { item.ServerInfo.Map = currentMap.Map() } - return item, nil + return item } +// pareEntry parses the details from entry returning the query and address sections. func parseEntry(entry string) (querySection, addressSection string, err error) { entry = strings.TrimSpace(entry) if entry == "" { - return "", "", fmt.Errorf("process entry: %w", errNoItem) + return "", "", fmt.Errorf("parse entry %q: %w", entry, errNoItem) } + sections := strings.Split(entry, " ") if len(sections) != 2 { - return "", "", fmt.Errorf("%w: wrong number of sections", errEntryInvalid) + return "", "", fmt.Errorf("%w %q: wrong number of sections %d", errEntryInvalid, entry, len(sections)) } return sections[0], sections[1], nil } -func parseOptions(querySection string) (baseQuery string, options []svrquery.Option, error error) { +// parseOptions parses querySection returning the baseQuery and query options. +func parseOptions(querySection string) (baseQuery string, options []svrquery.Option, err error) { options = make([]svrquery.Option, 0) protocolSections := strings.Split(querySection, ",") for i := 1; i < len(protocolSections); i++ { @@ -202,6 +233,7 @@ func parseOptions(querySection string) (baseQuery string, options []svrquery.Opt return "", nil, fmt.Errorf("key value pair invalid: %v", keyVal) } + // Only support key at the moment. switch strings.ToLower(keyVal[0]) { case "key": diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 15ad762..d8e0c46 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -26,7 +26,7 @@ func main() { if *file != "" { // Use bulk file mode - if err := queryBulk(*file); err != nil { + if err := queryBulk(*file, os.Stdout); err != nil { l.Fatal(err) } return diff --git a/lib/svrquery/bulk_client.go b/lib/svrquery/bulk_client.go new file mode 100644 index 0000000..7fc2782 --- /dev/null +++ b/lib/svrquery/bulk_client.go @@ -0,0 +1,54 @@ +package svrquery + +import ( + "net" + + "github.com/multiplay/go-svrquery/lib/svrquery/protocol" +) + +// BulkClient is a client which can be reused with multiple requests. +type BulkClient struct { + client *Client +} + +// NewBulkClient creates a new client with no protocol or +func NewBulkClient(options ...Option) (*BulkClient, error) { + c := &Client{ + network: DefaultNetwork, + timeout: DefaultTimeout, + } + + for _, o := range options { + if err := o(c); err != nil { + return nil, err + } + } + + return &BulkClient{client: c}, nil +} + +// Query runs a query against addr with proto and options. +func (b *BulkClient) Query(proto, addr string, options ...Option) (protocol.Responser, error) { + f, err := protocol.Get(proto) + if err != nil { + return nil, err + } + + for _, o := range options { + if err := o(b.client); err != nil { + return nil, err + } + } + + b.client.Queryer = f(b.client) + + if b.client.ua, err = net.ResolveUDPAddr(b.client.network, addr); err != nil { + return nil, err + } + + if b.client.c, err = net.DialUDP(b.client.network, nil, b.client.ua); err != nil { + return nil, err + } + + return b.client.Query() +}