diff --git a/CHANGELOG.md b/CHANGELOG.md index b3ac9c3..26e819b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,17 @@ # Change Log All notable changes to this project will be documented in this file. +## 3.0.0 (2018-08-28) +- The daemon now serves as a proxy to the X-Ray SDK for API calls that are related to sampling rules. The proxy runs on TCP port 2000 and relays calls to get sampling rules and report sampling statistics to X-Ray. The TCP proxy server address can be configured from command line using `t` flag or by using `cfg.yaml` version `2` file. +- `cfg.yaml` file version is changed to `2` and has an extra attribute. The daemon supports version `1` of cfg.yaml: -## 2.1.3 (2018-06-13) -- Updated AWS SDK dependency version to `1.14.1` -- Refactored `MockTimerClient` -- Moving version number variable to `cfg` package +``` + Socket: + # Change the address and port on which the daemon listens for HTTP requests to proxy to AWS X-Ray. + TCPAddress: "127.0.0.1:2000" +``` +- Adds timestamp header `X-Amzn-Xray-Timestamp` to PutTraceSegments API calls made by the daemon +- Adding support for configuring `ProxyAddress` through command line: PR [#10](https://github.com/aws/aws-xray-daemon/pull/10) ## 2.1.2 (2018-05-14) - SystemD service file updates for Debian and Linux binaries: PR [#3](https://github.com/aws/aws-xray-daemon/pull/3) diff --git a/README.md b/README.md index afa7974..d2c0f37 100644 --- a/README.md +++ b/README.md @@ -46,15 +46,16 @@ Usage: X-Ray [options] 1. -a --resource-arn Amazon Resource Name (ARN) of the AWS resource running the daemon. 2. -o --local-mode Don't check for EC2 instance metadata. 3. -m --buffer-memory Change the amount of memory in MB that buffers can use (minimum 3). -4. -n --region Send segments to the X-Ray service in a specific region. -5. -b --bind Overrides default UDP address (127.0.0.1:2000). -6. -r --role-arn Assume the specified IAM role to upload segments to a different account. -7. -c --config Load a configuration file from the specified path. -8. -f --log-file Output logs to the specified file path. -9. -l --log-level Log level, from most verbose to least: dev, debug, info, warn, error, prod (default). -10. -p --proxy-address Proxy address through which to upload segments. -11. -v --version Show AWS X-Ray daemon version. -12. -h --help Show this screen +4. -n --region Send segments to X-Ray service in a specific region. +5. -b --bind Overrides default UDP address (127.0.0.1:2000). +6. -t --bind-tcp Overrides default TCP address (127.0.0.1:2000). +7. -r --role-arn Assume the specified IAM role to upload segments to a different account. +8. -c --config Load a configuration file from the specified path. +9. -f --log-file Output logs to the specified file path. +10. -l --log-level Log level, from most verbose to least: dev, debug, info, warn, error, prod (default). +11. -p --proxy-address Proxy address through which to upload segments. +12. -v --version Show AWS X-Ray daemon version. +13. -h --help Show this screen ## Build diff --git a/VERSION b/VERSION index abae0d9..56fea8a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.1.3 \ No newline at end of file +3.0.0 \ No newline at end of file diff --git a/daemon/cfg.yaml b/daemon/cfg.yaml index f85c740..a5daa10 100644 --- a/daemon/cfg.yaml +++ b/daemon/cfg.yaml @@ -9,6 +9,8 @@ Endpoint: "" Socket: # Change the address and port on which the daemon listens for UDP packets containing segment documents. UDPAddress: "127.0.0.1:2000" + # Change the address and port on which the daemon listens for HTTP requests to proxy to AWS X-Ray. + TCPAddress: "127.0.0.1:2000" Logging: LogRotation: true # Change the log level, from most verbose to least: dev, debug, info, warn, error, prod (default). @@ -26,4 +28,4 @@ NoVerifySSL: false # Upload segments to AWS X-Ray through a proxy. ProxyAddress: "" # Daemon configuration file format version. -Version: 1 \ No newline at end of file +Version: 2 diff --git a/daemon/cfg/cfg.go b/daemon/cfg/cfg.go index 507b1e7..8cbb627 100644 --- a/daemon/cfg/cfg.go +++ b/daemon/cfg/cfg.go @@ -17,13 +17,16 @@ import ( "reflect" "github.com/aws/aws-xray-daemon/daemon/util" + "gopkg.in/yaml.v2" log "github.com/cihub/seelog" ) // Version number of the X-Ray daemon. -const Version = "2.1.3" +const Version = "3.0.0" + +var cfgFileVersions = [...]int{1, 2} // Supported versions of cfg.yaml file. var configLocations = []string{ "/etc/amazon/xray/cfg.yaml", @@ -54,8 +57,15 @@ type Config struct { Socket struct { // Address and port on which the daemon listens for UDP packets containing segment documents. UDPAddress string `yaml:"UDPAddress"` + TCPAddress string `yaml:"TCPAddress"` } `yaml:"Socket"` + ProxyServer struct { + IdleConnTimeout int + MaxIdleConnsPerHost int + MaxIdleConns int + } + // Structure for logging. Logging struct { // LogRotation, if true, will rotate log after 50 MB size of current log file. @@ -94,8 +104,20 @@ func DefaultConfig() *Config { Region: "", Socket: struct { UDPAddress string `yaml:"UDPAddress"` + TCPAddress string `yaml:"TCPAddress"` }{ UDPAddress: "127.0.0.1:2000", + TCPAddress: "127.0.0.1:2000", + }, + ProxyServer: struct { + IdleConnTimeout int + MaxIdleConnsPerHost int + MaxIdleConns int + }{ + + IdleConnTimeout: 30, + MaxIdleConnsPerHost: 2, + MaxIdleConns: 0, }, Logging: struct { LogRotation *bool `yaml:"LogRotation"` @@ -321,11 +343,23 @@ func contains(s []string, e string) bool { func merge(configFile string) *Config { userConfig := loadConfigFromFile(configFile) - if userConfig.Version != 1 { + versionMatch := false + for i := 0; i < len(cfgFileVersions); i++ { + if cfgFileVersions[i] == userConfig.Version { + versionMatch = true + break + } + } + + if !versionMatch { errorAndExit("Config Version Setting is not correct. Use X-Ray Daemon Config Migration Script to update the config file. Please refer to AWS X-Ray Documentation for more information.", nil) } userConfig.Socket.UDPAddress = getStringValue(userConfig.Socket.UDPAddress, DefaultConfig().Socket.UDPAddress) + userConfig.Socket.TCPAddress = getStringValue(userConfig.Socket.TCPAddress, DefaultConfig().Socket.TCPAddress) + userConfig.ProxyServer.IdleConnTimeout = DefaultConfig().ProxyServer.IdleConnTimeout + userConfig.ProxyServer.MaxIdleConnsPerHost = DefaultConfig().ProxyServer.MaxIdleConnsPerHost + userConfig.ProxyServer.MaxIdleConns = DefaultConfig().ProxyServer.MaxIdleConns userConfig.TotalBufferSizeMB = getIntValue(userConfig.TotalBufferSizeMB, DefaultConfig().TotalBufferSizeMB) userConfig.ResourceARN = getStringValue(userConfig.ResourceARN, DefaultConfig().ResourceARN) userConfig.RoleARN = getStringValue(userConfig.RoleARN, DefaultConfig().RoleARN) diff --git a/daemon/cfg/cfg_test.go b/daemon/cfg/cfg_test.go index 1d9bc21..bef9506 100644 --- a/daemon/cfg/cfg_test.go +++ b/daemon/cfg/cfg_test.go @@ -11,6 +11,7 @@ package cfg import ( "errors" + "io/ioutil" "os" "os/exec" @@ -23,6 +24,8 @@ import ( var errFile = "error.log" var tstFileName = "test_config.yaml" var tstFilePath string +var version2 = 2 +var version1 = 1 func setupTestCase() { LogFile = errFile @@ -55,6 +58,7 @@ func TestLoadConfigFromBytes(t *testing.T) { configString := `Socket: UDPAddress: "127.0.0.1:2000" + TCPAddress: "127.0.0.1:2000" TotalBufferSizeMB: 16 Region: "us-east-1" Endpoint: "https://xxxx.xxxx.com" @@ -68,11 +72,12 @@ Logging: NoVerifySSL: false LocalMode: false ProxyAddress: "" -Version: 1` +Version: 2` c := loadConfigFromBytes([]byte(configString)) assert.EqualValues(t, c.Socket.UDPAddress, "127.0.0.1:2000") + assert.EqualValues(t, c.Socket.TCPAddress, "127.0.0.1:2000") assert.EqualValues(t, c.TotalBufferSizeMB, 16) assert.EqualValues(t, c.Region, "us-east-1") assert.EqualValues(t, c.Endpoint, "https://xxxx.xxxx.com") @@ -85,7 +90,7 @@ Version: 1` assert.EqualValues(t, *c.NoVerifySSL, false) assert.EqualValues(t, *c.LocalMode, false) assert.EqualValues(t, c.ProxyAddress, "") - assert.EqualValues(t, c.Version, 1) + assert.EqualValues(t, c.Version, version2) } func TestLoadConfigFromBytesTypeError(t *testing.T) { @@ -116,6 +121,7 @@ func TestLoadConfigFromFile(t *testing.T) { configString := `Socket: UDPAddress: "127.0.0.1:2000" + TCPAddress: "127.0.0.1:2000" TotalBufferSizeMB: 16 Region: "us-east-1" Endpoint: "https://xxxx.xxxx.com" @@ -129,7 +135,7 @@ Logging: NoVerifySSL: false LocalMode: false ProxyAddress: "" -Version: 1` +Version: 2` setupTestFile(configString) c := loadConfigFromFile(tstFilePath) @@ -147,7 +153,7 @@ Version: 1` assert.EqualValues(t, *c.NoVerifySSL, false) assert.EqualValues(t, *c.LocalMode, false) assert.EqualValues(t, c.ProxyAddress, "") - assert.EqualValues(t, c.Version, 1) + assert.EqualValues(t, c.Version, version2) clearTestFile() } @@ -177,7 +183,7 @@ func TestLoadConfigFromFileDoesNotExist(t *testing.T) { tearTestCase() } -func TestLoadConfig(t *testing.T) { +func TestLoadConfigVersion1(t *testing.T) { configString := `Socket: UDPAddress: "127.0.0.1:2000" @@ -201,6 +207,49 @@ Version: 1` c := LoadConfig("") assert.EqualValues(t, c.Socket.UDPAddress, "127.0.0.1:2000") + assert.EqualValues(t, c.Socket.TCPAddress, "127.0.0.1:2000") // TCP address for V! cfg.yaml + assert.EqualValues(t, c.TotalBufferSizeMB, 16) + assert.EqualValues(t, c.Region, "us-east-1") + assert.EqualValues(t, c.Endpoint, "https://xxxx.xxxx.com") + assert.EqualValues(t, c.ResourceARN, "") + assert.EqualValues(t, c.RoleARN, "") + assert.EqualValues(t, c.Concurrency, 8) + assert.EqualValues(t, c.Logging.LogLevel, "prod") + assert.EqualValues(t, c.Logging.LogPath, "") + assert.EqualValues(t, *c.Logging.LogRotation, true) + assert.EqualValues(t, *c.NoVerifySSL, false) + assert.EqualValues(t, *c.LocalMode, false) + assert.EqualValues(t, c.ProxyAddress, "") + assert.EqualValues(t, c.Version, version1) + clearTestFile() +} + +func TestLoadConfigVersion2(t *testing.T) { + configString := + `Socket: + UDPAddress: "127.0.0.1:2000" + TCPAddress : "127.0.0.2:3000" +TotalBufferSizeMB: 16 +Region: "us-east-1" +Endpoint: "https://xxxx.xxxx.com" +ResourceARN: "" +RoleARN: "" +Concurrency: 8 +Logging: + LogRotation: true + LogPath: "" + LogLevel: "prod" +NoVerifySSL: false +LocalMode: false +ProxyAddress: "" +Version: 2` + setupTestFile(configString) + configLocations = append([]string{tstFilePath}, configLocations...) + + c := LoadConfig("") + + assert.EqualValues(t, c.Socket.UDPAddress, "127.0.0.1:2000") + assert.EqualValues(t, c.Socket.TCPAddress, "127.0.0.2:3000") assert.EqualValues(t, c.TotalBufferSizeMB, 16) assert.EqualValues(t, c.Region, "us-east-1") assert.EqualValues(t, c.Endpoint, "https://xxxx.xxxx.com") @@ -213,7 +262,7 @@ Version: 1` assert.EqualValues(t, *c.NoVerifySSL, false) assert.EqualValues(t, *c.LocalMode, false) assert.EqualValues(t, c.ProxyAddress, "") - assert.EqualValues(t, c.Version, 1) + assert.EqualValues(t, c.Version, version2) clearTestFile() } @@ -239,11 +288,12 @@ RoleARN: "" Concurrency: 8 Logging: LogRotation: false -Version: 1` +Version: 2` setupTestFile(configString) c := merge(tstFilePath) assert.EqualValues(t, c.Socket.UDPAddress, "127.0.0.1:3000") + assert.EqualValues(t, c.Socket.TCPAddress, "127.0.0.1:2000") // set to default value assert.EqualValues(t, c.TotalBufferSizeMB, 8) assert.EqualValues(t, c.Region, "us-east-2") assert.EqualValues(t, c.Endpoint, "https://xxxx.xxxx.com") @@ -256,7 +306,7 @@ Version: 1` assert.EqualValues(t, *c.NoVerifySSL, false) assert.EqualValues(t, *c.LocalMode, false) assert.EqualValues(t, c.ProxyAddress, "") - assert.EqualValues(t, c.Version, 1) + assert.EqualValues(t, c.Version, version2) clearTestFile() } @@ -310,6 +360,57 @@ Concurrency: 8` tearTestCase() } +func TestConfigUnsupportedVersionSet(t *testing.T) { + setupTestCase() + configString := + `Socket: + UDPAddress: "127.0.0.1:3000" +TotalBufferSizeMB: 8 +Region: "us-east-2" +Endpoint: "https://xxxx.xxxx.com" +ResourceARN: "" +RoleARN: "" +Concurrency: 8 +Version: 10000` + + goPath, err := setupTestFile(configString) + + // Only run the failing part when a specific env variable is set + if os.Getenv("TEST_CONFIG_UNSUPPORTED_VERSION") == "1" { + merge(tstFilePath) + return + } + + // Start the actual test in a different subprocess + cmd := exec.Command(os.Args[0], "-test.run=TestConfigUnsupportedVersionSet") + cmd.Env = append(os.Environ(), "TEST_CONFIG_UNSUPPORTED_VERSION=1") + if cmdErr := cmd.Start(); cmdErr != nil { + t.Fatal(cmdErr) + } + + // Check that the program exited + error := cmd.Wait() + if e, ok := error.(*exec.ExitError); !ok || e.Success() { + t.Fatalf("Process ran with err %v, want exit status 1", err) + } + + // Check if the log message is what we expected + if _, logErr := os.Stat(goPath + "/" + errFile); os.IsNotExist(logErr) { + t.Fatal(logErr) + } + gotBytes, err := ioutil.ReadFile(goPath + "/" + errFile) + if err != nil { + t.Fatal(err) + } + got := string(gotBytes) + expected := "Config Version Setting is not correct." + if !strings.Contains(got, expected) { + t.Fatalf("Unexpected log message. Got %s but should contain %s", got, expected) + } + clearTestFile() + tearTestCase() +} + func TestUseMemoryLimitInConfig(t *testing.T) { setupTestCase() configString := @@ -321,7 +422,7 @@ Endpoint: "https://xxxx.xxxx.com" ResourceARN: "" RoleARN: "" Concurrency: 8 -Version: 1` +Version: 2` goPath, err := setupTestFile(configString) @@ -366,7 +467,7 @@ func TestConfigValidationForNotSupportFlags(t *testing.T) { configString := `Socket: BufferSizeKB: 128 -Version: 1` +Version: 2` goPath, err := setupTestFile(configString) @@ -411,7 +512,7 @@ func TestConfigValidationForNeedMigrationFlag(t *testing.T) { configString := `Processor: Region: "" -Version: 1` +Version: 2` goPath, err := setupTestFile(configString) @@ -454,7 +555,7 @@ Version: 1` func TestConfigValidationForInvalidFlag(t *testing.T) { setupTestCase() configString := `ABCDE: true -Version: 1` +Version: 2` goPath := os.Getenv("PWD") if goPath == "" { @@ -505,14 +606,13 @@ Version: 1` } func TestValidConfigArray(t *testing.T) { - validString := []string{"TotalBufferSizeMB", "Concurrency", "Endpoint", "Region", "Socket.UDPAddress", "Logging.LogRotation", "Logging.LogLevel", "Logging.LogPath", - "LocalMode", "ResourceARN", "RoleARN", "NoVerifySSL", "ProxyAddress", "Version"} + validString := []string{"TotalBufferSizeMB", "Concurrency", "Endpoint", "Region", "Socket.UDPAddress", "Socket.TCPAddress", "ProxyServer.IdleConnTimeout", "ProxyServer.MaxIdleConnsPerHost", "ProxyServer.MaxIdleConns", "Logging.LogRotation", "Logging.LogLevel", "Logging.LogPath", "LocalMode", "ResourceARN", "RoleARN", "NoVerifySSL", "ProxyAddress", "Version"} testString := validConfigArray() if len(validString) != len(testString) { t.Fatalf("Unexpect test array length. Got %v but should be %v", len(testString), len(validString)) } for i, v := range validString { - if v != testString[i] { + if !strings.EqualFold(v, testString[i]) { t.Fatalf("Unexpect Flag in test array. Got %v but should be %v", testString[i], v) } } @@ -527,7 +627,7 @@ Region: "us-east-2" Endpoint: "https://xxxx.xxxx.com" ResourceARN: "" RoleARN: "" -Version: 1` +Version: 2` setupTestFile(configString) diff --git a/daemon/conn/conn.go b/daemon/conn/conn.go index adc3706..591d021 100644 --- a/daemon/conn/conn.go +++ b/daemon/conn/conn.go @@ -128,6 +128,35 @@ func GetAWSConfigSession(cn connAttr, c *cfg.Config, roleArn string, region stri return config, s } +// ProxyServerTransport configures HTTP transport for TCP Proxy Server. +func ProxyServerTransport(config *cfg.Config) *http.Transport { + tls := &tls.Config{ + InsecureSkipVerify: *config.NoVerifySSL, + } + + proxyAddr := getProxyAddress(config.ProxyAddress) + proxyURL := getProxyURL(proxyAddr) + + // Connection timeout in seconds + idleConnTimeout := time.Duration(config.ProxyServer.IdleConnTimeout) * time.Second + + transport := &http.Transport{ + MaxIdleConns: config.ProxyServer.MaxIdleConns, + MaxIdleConnsPerHost: config.ProxyServer.MaxIdleConnsPerHost, + IdleConnTimeout: idleConnTimeout, + Proxy: http.ProxyURL(proxyURL), + TLSClientConfig: tls, + + // If not disabled the transport will add a gzip encoding header + // to requests with no `accept-encoding` header value. The header + // is added after we sign the request which invalidates the + // signature. + DisableCompression: true, + } + + return transport +} + func (c *Conn) newAWSSession(roleArn string) *session.Session { var s *session.Session var err error diff --git a/daemon/daemon.go b/daemon/daemon.go index 06a4b63..6afd969 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -27,6 +27,7 @@ import ( "github.com/aws/aws-xray-daemon/daemon/logger" "github.com/aws/aws-xray-daemon/daemon/processor" "github.com/aws/aws-xray-daemon/daemon/profiler" + "github.com/aws/aws-xray-daemon/daemon/proxy" "github.com/aws/aws-xray-daemon/daemon/ringbuffer" "github.com/aws/aws-xray-daemon/daemon/socketconn" "github.com/aws/aws-xray-daemon/daemon/socketconn/udp" @@ -49,6 +50,8 @@ const protocolSeparator = "\n" const logRotationSize int64 = 50 * 1024 * 1024 var udpAddress string +var tcpAddress string + var stdFlag int var socketConnection string var cpuProfile string @@ -85,6 +88,9 @@ type Daemon struct { // Reference to Processor. processor *processor.Processor + + // HTTP Proxy server + server *proxy.Server } func init() { @@ -114,6 +120,7 @@ func initCli(configFile string) (*cli.Flag, *cfg.Config) { defaultLogPath = cnfg.Logging.LogPath defaultLogLevel = cnfg.Logging.LogLevel defaultUDPAddress = cnfg.Socket.UDPAddress + defaultTCPAddress = cnfg.Socket.TCPAddress defaultRoleARN = cnfg.RoleARN defaultLocalMode = cnfg.LocalMode defaultRegion = cnfg.Region @@ -127,6 +134,7 @@ func initCli(configFile string) (*cli.Flag, *cfg.Config) { flag.IntVarF(&daemonProcessBufferMemoryMB, "buffer-memory", "m", defaultDaemonProcessSpaceLimitMB, "Change the amount of memory in MB that buffers can use (minimum 3).") flag.StringVarF(®ionFlag, "region", "n", defaultRegion, "Send segments to X-Ray service in a specific region.") flag.StringVarF(&udpAddress, "bind", "b", defaultUDPAddress, "Overrides default UDP address (127.0.0.1:2000).") + flag.StringVarF(&tcpAddress, "bind-tcp", "t", defaultTCPAddress, "Overrides default TCP address (127.0.0.1:2000).") flag.StringVarF(&roleArn, "role-arn", "r", defaultRoleARN, "Assume the specified IAM role to upload segments to a different account.") flag.StringVarF(&configFilePath, "config", "c", "", "Load a configuration file from the specified path.") flag.StringVarF(&logFile, "log-file", "f", defaultLogPath, "Output logs to the specified file path.") @@ -195,12 +203,21 @@ func initDaemon(config *cfg.Config) *Daemon { // If calculated number of buffer is lower than our default, use calculated one. Otherwise, use default value. parameterConfig.Processor.BatchSize = util.GetMinIntValue(parameterConfig.Processor.BatchSize, bufferLimit) + config.Socket.TCPAddress = tcpAddress // assign final tcp address either through config file or cmd line + // Create proxy http server + server, err := proxy.NewServer(config, awsConfig, session) + if err != nil { + log.Errorf("Unable to start http proxy server: %v", err) + os.Exit(1) + } + daemon := &Daemon{ done: make(chan bool), std: std, pool: bufferPool, count: 0, sock: sock, + server: server, processor: processor.New(awsConfig, session, processorCount, std, bufferPool, parameterConfig), } @@ -208,6 +225,9 @@ func initDaemon(config *cfg.Config) *Daemon { } func runDaemon(daemon *Daemon) { + // Start http server for proxying requests to xray + go daemon.server.Serve() + for i := 0; i < receiverCount; i++ { go daemon.poll() } @@ -232,6 +252,7 @@ func (d *Daemon) close() { func (d *Daemon) stop() { d.sock.Close() + d.server.Close() } // Returns number of bytes read from socket connection. diff --git a/daemon/proxy/server.go b/daemon/proxy/server.go new file mode 100644 index 0000000..2bf2451 --- /dev/null +++ b/daemon/proxy/server.go @@ -0,0 +1,144 @@ +// Package proxy provides an http server to act as a signing proxy for SDKs calling AWS X-Ray APIs +package proxy + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/http/httputil" + "net/url" + "os" + "time" + + "github.com/aws/aws-sdk-go/aws/endpoints" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/aws/signer/v4" + "github.com/aws/aws-xray-daemon/daemon/cfg" + "github.com/aws/aws-xray-daemon/daemon/conn" + log "github.com/cihub/seelog" +) + +const service = "xray" +const connHeader = "Connection" + +// Server represents HTTP server. +type Server struct { + *http.Server +} + +// NewServer returns a proxy server listening on the given address. +// Requests are forwarded to the endpoint in the given config. +// Requests are signed using credentials from the given config. +func NewServer(cfg *cfg.Config, awsCfg *aws.Config, sess *session.Session) (*Server, error) { + + _, err := net.ResolveTCPAddr("tcp", cfg.Socket.TCPAddress) + if err != nil { + log.Errorf("%v", err) + os.Exit(1) + } + + endPoint, er := getServiceEndpoint(awsCfg) + if er != nil { + return nil, fmt.Errorf("%v", er) + } + // Parse url from endpoint + url, err := url.Parse(endPoint) + if err != nil { + return nil, fmt.Errorf("unable to parse xray endpoint: %v", err) + } + + signer := &v4.Signer{ + Credentials: sess.Config.Credentials, + } + + transport := conn.ProxyServerTransport(cfg) + + // Reverse proxy handler + handler := &httputil.ReverseProxy{ + Transport: transport, + + // Handler for modifying and forwarding requests + Director: func(req *http.Request) { + // Remove connection header before signing request, otherwise the + // reverse-proxy will remove the header before forwarding to X-Ray + // resulting in a signed header being missing from the request. + req.Header.Del(connHeader) + + // Set req url to xray endpoint + req.URL.Scheme = url.Scheme + req.URL.Host = url.Host + req.Host = url.Host + + // Consume body and convert to io.ReadSeeker for signer to consume + body, err := consume(req.Body) + if err != nil { + log.Errorf("Unable to consume request body: %v", err) + + // Forward unsigned request + return + } + + // Sign request. signer.Sign() also repopulates the request body. + _, err = signer.Sign(req, body, service, *awsCfg.Region, time.Now()) + if err != nil { + log.Errorf("Unable to sign request: %v", err) + } + }, + } + + server := &http.Server{ + Addr: cfg.Socket.TCPAddress, + Handler: handler, + } + + p := &Server{server} + + return p, nil +} + +// consume readsAll() the body and creates a new io.ReadSeeker from the content. v4.Signer +// requires an io.ReadSeeker to be able to sign requests. May return a nil io.ReadSeeker. +func consume(body io.ReadCloser) (io.ReadSeeker, error) { + var buf []byte + + // Return nil ReadSeeker if body is nil + if body == nil { + return nil, nil + } + + // Consume body + buf, err := ioutil.ReadAll(body) + if err != nil { + return nil, err + } + + return bytes.NewReader(buf), nil +} + +// Serve starts server. +func (s *Server) Serve() { + log.Infof("Starting proxy http server on %s", s.Addr) + s.ListenAndServe() +} + +// Close stops server. +func (s *Server) Close() { + s.Server.Close() +} + +func getServiceEndpoint(awsCfg *aws.Config) (string, error) { + if awsCfg.Endpoint == nil || *awsCfg.Endpoint == "" { + if awsCfg.Region == nil || *awsCfg.Region == "" { + return "", errors.New("unable to generate endpoint from region with nil value") + } + resolved, err := endpoints.DefaultResolver().EndpointFor(service, *awsCfg.Region) + return resolved.URL, err + } + return *awsCfg.Endpoint, nil +} diff --git a/daemon/proxy/server_test.go b/daemon/proxy/server_test.go new file mode 100644 index 0000000..9a2c9ff --- /dev/null +++ b/daemon/proxy/server_test.go @@ -0,0 +1,157 @@ +package proxy + +import ( + "io" + "io/ioutil" + "net/http" + "net/http/httputil" + "net/url" + "strings" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-xray-daemon/daemon/cfg" + "github.com/stretchr/testify/assert" +) + +// Assert that consume returns a ReadSeeker with the same content as the +// ReadCloser passed in. +func TestConsume(t *testing.T) { + // Create an io.Reader + r := strings.NewReader("Content") + + // Create an io.ReadCloser + rc := ioutil.NopCloser(r) + + // Consume ReadCloser and create ReadSeeker + rs, err := consume(rc) + assert.Nil(t, err) + + // Read from ReadSeeker + bytes, err := ioutil.ReadAll(rs) + assert.Nil(t, err) + + // Assert contents of bytes are same as contents of original Reader + assert.Equal(t, "Content", string(bytes)) +} + +// Assert that consume returns a nil ReadSeeker when a nil ReadCloser is passed in +func TestConsumeNilBody(t *testing.T) { + // Create a nil io.ReadCloser + var rc io.ReadCloser + + // Consume ReadCloser and create ReadSeeker + rs, err := consume(rc) + assert.Nil(t, err) + assert.Nil(t, rs) +} + +// Assert that Director modifies the passed in http.Request +func TestDirector(t *testing.T) { + // Create dummy credentials to sign with + cred := credentials.NewStaticCredentials("id", "secret", "token") + + // Create dummy aws Config + awsCfg := &aws.Config{ + Endpoint: aws.String("https://xray.us-east-1.amazonaws.com"), + Region: aws.String("us-east-1"), + Credentials: cred, + } + + // Create dummy aws Session + sess := &session.Session{ + Config: awsCfg, + } + + // Create proxy server + s, err := NewServer(cfg.DefaultConfig(), awsCfg, sess) + assert.Nil(t, err) + + // Extract director from server + d := s.Handler.(*httputil.ReverseProxy).Director + + // Create http request to pass to director + url, err := url.Parse("http://127.0.0.1:2000") + assert.Nil(t, err) + + header := map[string][]string{ + "Connection": []string{}, + } + + req := &http.Request{ + URL: url, + Host: "127.0.0.1", + Header: header, + Body: ioutil.NopCloser(strings.NewReader("Body")), + } + + // Apply director to request + d(req) + + // Assert that the url was changed to point to AWS X-Ray + assert.Equal(t, "https", req.URL.Scheme) + assert.Equal(t, "xray.us-east-1.amazonaws.com", req.URL.Host) + assert.Equal(t, "xray.us-east-1.amazonaws.com", req.Host) + + // Assert that additional headers were added by the signer + assert.Contains(t, req.Header, "Authorization") + assert.Contains(t, req.Header, "X-Amz-Security-Token") + assert.Contains(t, req.Header, "X-Amz-Date") + assert.NotContains(t, req.Header, "Connection") +} + +// Fetching endpoint from aws config instance +func TestEndpoint1(t *testing.T) { + e := "https://xray.us-east-1.amazonaws.com" + awsCfg := &aws.Config{ + Endpoint: aws.String(e), // Endpoint value has higher priority than region value + Region: aws.String("us-west-1"), + } + result, err := getServiceEndpoint(awsCfg) + assert.Equal(t, e, result, "Fetching endpoint from config instance") + assert.Nil(t, err) +} + +// Generating endpoint from region value of awsCfg instance +func TestEndpoint2(t *testing.T) { + e := "https://xray.us-west-1.amazonaws.com" + awsCfg := &aws.Config{ + Region: aws.String("us-west-1"), // No endpoint + } + result, err := getServiceEndpoint(awsCfg) + assert.Equal(t, e, result, "Fetching endpoint from region") + assert.Nil(t, err) +} + +// Error received when no endpoint and region value present in awsCfg instance +func TestEndpoint3(t *testing.T) { + awsCfg := &aws.Config{ + // No endpoint and region value + } + result, err := getServiceEndpoint(awsCfg) + assert.Equal(t, "", result, "Endpoint cannot be created") + assert.NotNil(t, err) +} + +func TestEndpoint4(t *testing.T) { + awsCfg := &aws.Config{ + // region value set to "" + Region: aws.String(""), + } + result, err := getServiceEndpoint(awsCfg) + assert.Equal(t, "", result, "Endpoint cannot be created") + assert.NotNil(t, err) +} + +func TestEndpoint5(t *testing.T) { + e := "https://xray.us-west-1.amazonaws.com" + awsCfg := &aws.Config{ + Endpoint: aws.String(""), // Endpoint set to "" + Region: aws.String("us-west-1"), // No endpoint + } + result, err := getServiceEndpoint(awsCfg) + assert.Equal(t, e, result, "Endpoint created from region value") + assert.Nil(t, err) +} diff --git a/daemon/telemetry/telemetry.go b/daemon/telemetry/telemetry.go index 2bd1243..1424caa 100644 --- a/daemon/telemetry/telemetry.go +++ b/daemon/telemetry/telemetry.go @@ -12,9 +12,10 @@ package telemetry import ( "sync/atomic" "time" + "unsafe" + "github.com/aws/aws-xray-daemon/daemon/conn" "github.com/aws/aws-xray-daemon/daemon/util/timer" - "unsafe" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" diff --git a/daemon/util/test/mock_timer_client.go b/daemon/util/test/mock_timer_client.go index 4e8ca6d..f60415c 100644 --- a/daemon/util/test/mock_timer_client.go +++ b/daemon/util/test/mock_timer_client.go @@ -14,6 +14,7 @@ type timer struct { c chan time.Time } +// MockTimerClient contains mock timer client. type MockTimerClient struct { sync.RWMutex @@ -42,19 +43,21 @@ func (m *MockTimerClient) newTimer(d time.Duration, repeat bool) *timer { return t } +// After is mock of time.After(). func (m *MockTimerClient) After(d time.Duration) <-chan time.Time { atomic.AddUint64(&m.afterCalled, 1) return m.newTimer(d, false).c } +// Tick is mock of time.Tick(). func (m *MockTimerClient) Tick(d time.Duration) <-chan time.Time { atomic.AddUint64(&m.tickCalled, 1) return m.newTimer(d, true).c } -// simulate time passing and signal timers / tickers accordingly +// Advance simulates time passing and signal timers / tickers accordingly func (m *MockTimerClient) Advance(d time.Duration) { m.Lock() m.current = m.current.Add(d) diff --git a/daemon/util/util.go b/daemon/util/util.go index 112d158..6be33d3 100644 --- a/daemon/util/util.go +++ b/daemon/util/util.go @@ -48,6 +48,7 @@ func GetMinIntValue(a, b int) int { return b } +// Bool return pointer to input parameter func Bool(b bool) *bool { return &b }