diff --git a/cmd/cli/bulk.go b/cmd/cli/bulk.go index 536bf29..59b75e8 100644 --- a/cmd/cli/bulk.go +++ b/cmd/cli/bulk.go @@ -2,12 +2,14 @@ package main import ( "bufio" + "bytes" "encoding/json" "errors" "fmt" - "log" + "io" "os" "strings" + "sync" "github.com/multiplay/go-svrquery/lib/svrquery" "github.com/multiplay/go-svrquery/lib/svrquery/protocol" @@ -15,9 +17,6 @@ import ( const ( numWorkers = 100 - - // maxQueries is the maximum number of queries that can be queried in one bulk request. - maxQueries = 10000 ) var ( @@ -28,11 +27,27 @@ var ( // BulkResponseItem contains the information about the query being performed // against a single server. type BulkResponseItem struct { - Address string `json:"address"` + Address string `json:"address,omitempty"` ServerInfo *BulkResponseServerInfoItem `json:"serverInfo,omitempty"` Error string `json:"error,omitempty"` } +// encode writes the JSON encoded version of i to w using the encoder e which writes to b. +// It strips the trailing \n from the output before writing to w. +func (i *BulkResponseItem) encode(w io.Writer, b *bytes.Buffer, e *json.Encoder) error { + defer b.Reset() + + if err := e.Encode(i); err != nil { + return fmt.Errorf("encode item %v: %w", i, err) + } + + if _, err := w.Write(bytes.TrimRight(b.Bytes(), "\n")); err != nil { + return fmt.Errorf("write item: %w", err) + } + + return nil +} + // BulkResponseServerInfoItem containing basic server information. type BulkResponseServerInfoItem struct { CurrentPlayers int64 `json:"currentPlayers"` @@ -40,132 +55,145 @@ type BulkResponseServerInfoItem struct { Map string `json:"map"` } -// BulkResponseItemWork is an item returned by an worker containing the data item -// plus any terminal error it encountered. -type BulkResponseItemWork struct { - Item *BulkResponseItem - Err error -} +// queryBulk queries a bulk set of servers from a query file writing the JSON results to output. +func queryBulk(file string, output io.Writer) (err error) { + work := make(chan string, numWorkers) // Buffered to ensure we can busy all workers. + results := make(chan BulkResponseItem, numWorkers) // Buffered to improve worker concurrency. -// queryBulk queries a bulk set of servers using a query file. -func queryBulk(file string) error { - // To simplify the workerpool load all the entries we are going to work on - lines := fileLines(file) + // Create a pool of workers to process work. + var wgWorkers sync.WaitGroup + wgWorkers.Add(numWorkers) + for w := 1; w <= numWorkers; w++ { + c, err := svrquery.NewBulkClient() + if err != nil { + close(work) // Ensure that existing workers return. + return fmt.Errorf("bulk client: %w", err) + } - if len(lines) > maxQueries { - return fmt.Errorf("too many servers requested %d (max %d)", len(lines), maxQueries) + go func() { + defer wgWorkers.Done() + worker(work, results, c) + }() } - // Make a jobs channel and a number of workers to processes - // work off of the channel. - jobChan := make(chan string, len(lines)) - resultsChan := make(chan BulkResponseItemWork) - for w := 1; w <= numWorkers; w++ { - go worker(jobChan, resultsChan) + // Create a writer to write the results to output as they become available. + errc := make(chan error) + go func() { + errc <- writer(output, results) + }() + + // Queue work onto the channel. + if err = producer(file, work); err != nil { + err = fmt.Errorf("producer: %w", err) + } + + // Wait for all workers to complete so that we can safely close results + // that will trigger writer to return once its processed all results. + wgWorkers.Wait() + close(results) + + if werr := <-errc; werr != nil { + if err != nil { + return fmt.Errorf("%w, writer: %s", err, werr) + } + return fmt.Errorf("writer: %w", werr) } - items := make([]BulkResponseItem, 0, len(lines)) + return err +} - // Queue work onto the channel - for _, line := range lines { - jobChan <- line +// writer writes results as JSON encoded array to w. +func writer(w io.Writer, results <-chan BulkResponseItem) (err error) { + if _, err = w.Write([]byte{'['}); err != nil { + return fmt.Errorf("write header: %w", err) } - close(jobChan) - // Receive results from workers. - var err error - for i := 0; i < len(lines); i++ { - v := <-resultsChan - switch { - case errors.Is(v.Err, errNoItem): - // Not fatal, but no response for this entry was created. - continue - case v.Err != nil: - // We had a major issue processing the list + // Do our best to write valid JSON by ensuring we always write + // the closing ]. If a previous encode fails, this could still + // be insufficient. + defer func() { + if _, werr := w.Write([]byte("]\n")); werr != nil { + werr = fmt.Errorf("write trailer: %w", err) if err == nil { - err = fmt.Errorf("fatal error: %w", v.Err) - continue + err = werr } - err = fmt.Errorf("additional error: %w", v.Err) - continue } - // add the item to our list of items. - items = append(items, *v.Item) + }() + + var b bytes.Buffer + e := json.NewEncoder(&b) + + // Process the first item before looping so separating + // comma can be written easily. + i, ok := <-results + if !ok { + return nil } - if err != nil { + if err := i.encode(w, &b, e); err != nil { return err } - b, err := json.MarshalIndent(items, "", "\t") - if err != nil { - return err + for i := range results { + if _, err := w.Write([]byte(",")); err != nil { + return fmt.Errorf("write set: %w", err) + } + + if err := i.encode(w, &b, e); err != nil { + return err + } } - fmt.Printf("%s\n", b) + return nil } -func fileLines(file string) []string { +// producer reads lines from file sending them to work. +// work will be closed before return. +func producer(file string, work chan<- string) error { + defer close(work) + f, err := os.Open(file) if err != nil { - log.Fatal(err) + return err } defer f.Close() - result := make([]string, 0, 1000) - scanner := bufio.NewScanner(f) - for scanner.Scan() { - line := scanner.Text() - result = append(result, line) + s := bufio.NewScanner(f) + for s.Scan() { + work <- s.Text() } - return result + + return s.Err() } -// worker is run in a goroutine to provide processing for the items. -func worker(jobChan <-chan string, results chan<- BulkResponseItemWork) { - for entry := range jobChan { - item, err := processBulkEntry(entry) - results <- BulkResponseItemWork{ - Item: item, - Err: err, - } +// worker calls processBulkEntry for each item read from work, writing the result to results. +func worker(work <-chan string, results chan<- BulkResponseItem, client *svrquery.BulkClient) { + for e := range work { + results <- processBulkEntry(e, client) } } -// processBulkEntry processes an entry and returns an item containing the result or error. -func processBulkEntry(entry string) (*BulkResponseItem, error) { +// processBulkEntry decodes and processes an entry returning an item containing the result or error. +func processBulkEntry(entry string, client *svrquery.BulkClient) (item BulkResponseItem) { querySection, addressSection, err := parseEntry(entry) if err != nil { - return nil, fmt.Errorf("parse file entry: %w", err) + item.Error = fmt.Sprintf("parse file entry: %s", err) + return item } - item := &BulkResponseItem{ - Address: addressSection, - } + item.Address = addressSection - // If the query contains any options retrieve them and + // If the query contains any options retrieve and use them. querySection, options, err := parseOptions(querySection) if err != nil { - // These errors are non fatal, as we know which server it is for item.Error = err.Error() - return item, nil - } - - if !protocol.Supported(querySection) { - item.Error = fmt.Sprintf("unsupported protocol: %s", querySection) - return item, nil + return item } - client, err := svrquery.NewClient(querySection, addressSection, options...) - if err != nil { - item.Error = fmt.Sprintf("create client: %s", err) - return item, nil - } - - resp, err := client.Query() + resp, err := client.Query(querySection, addressSection, options...) if err != nil { item.Error = fmt.Sprintf("query client: %s", err) - return item, nil + return item } item.ServerInfo = &BulkResponseServerInfoItem{ @@ -177,23 +205,26 @@ func processBulkEntry(entry string) (*BulkResponseItem, error) { if currentMap, ok := resp.(protocol.Mapper); ok { item.ServerInfo.Map = currentMap.Map() } - return item, nil + return item } +// pareEntry parses the details from entry returning the query and address sections. func parseEntry(entry string) (querySection, addressSection string, err error) { entry = strings.TrimSpace(entry) if entry == "" { - return "", "", fmt.Errorf("process entry: %w", errNoItem) + return "", "", fmt.Errorf("parse entry %q: %w", entry, errNoItem) } + sections := strings.Split(entry, " ") if len(sections) != 2 { - return "", "", fmt.Errorf("%w: wrong number of sections", errEntryInvalid) + return "", "", fmt.Errorf("%w %q: wrong number of sections %d", errEntryInvalid, entry, len(sections)) } return sections[0], sections[1], nil } -func parseOptions(querySection string) (baseQuery string, options []svrquery.Option, error error) { +// parseOptions parses querySection returning the baseQuery and query options. +func parseOptions(querySection string) (baseQuery string, options []svrquery.Option, err error) { options = make([]svrquery.Option, 0) protocolSections := strings.Split(querySection, ",") for i := 1; i < len(protocolSections); i++ { @@ -202,6 +233,7 @@ func parseOptions(querySection string) (baseQuery string, options []svrquery.Opt return "", nil, fmt.Errorf("key value pair invalid: %v", keyVal) } + // Only support key at the moment. switch strings.ToLower(keyVal[0]) { case "key": diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 15ad762..d8e0c46 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -26,7 +26,7 @@ func main() { if *file != "" { // Use bulk file mode - if err := queryBulk(*file); err != nil { + if err := queryBulk(*file, os.Stdout); err != nil { l.Fatal(err) } return diff --git a/lib/svrquery/bulk_client.go b/lib/svrquery/bulk_client.go new file mode 100644 index 0000000..7fc2782 --- /dev/null +++ b/lib/svrquery/bulk_client.go @@ -0,0 +1,54 @@ +package svrquery + +import ( + "net" + + "github.com/multiplay/go-svrquery/lib/svrquery/protocol" +) + +// BulkClient is a client which can be reused with multiple requests. +type BulkClient struct { + client *Client +} + +// NewBulkClient creates a new client with no protocol or +func NewBulkClient(options ...Option) (*BulkClient, error) { + c := &Client{ + network: DefaultNetwork, + timeout: DefaultTimeout, + } + + for _, o := range options { + if err := o(c); err != nil { + return nil, err + } + } + + return &BulkClient{client: c}, nil +} + +// Query runs a query against addr with proto and options. +func (b *BulkClient) Query(proto, addr string, options ...Option) (protocol.Responser, error) { + f, err := protocol.Get(proto) + if err != nil { + return nil, err + } + + for _, o := range options { + if err := o(b.client); err != nil { + return nil, err + } + } + + b.client.Queryer = f(b.client) + + if b.client.ua, err = net.ResolveUDPAddr(b.client.network, addr); err != nil { + return nil, err + } + + if b.client.c, err = net.DialUDP(b.client.network, nil, b.client.ua); err != nil { + return nil, err + } + + return b.client.Query() +}