Skip to content

Commit

Permalink
logging: adds Kafka logging endpoint support
Browse files Browse the repository at this point in the history
  • Loading branch information
mccurdyc committed May 28, 2020
1 parent f9d47ef commit b5021da
Show file tree
Hide file tree
Showing 13 changed files with 1,406 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/api/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ type Interface interface {
UpdateHTTPS(*fastly.UpdateHTTPSInput) (*fastly.HTTPS, error)
DeleteHTTPS(*fastly.DeleteHTTPSInput) error

CreateKafka(*fastly.CreateKafkaInput) (*fastly.Kafka, error)
ListKafkas(*fastly.ListKafkasInput) ([]*fastly.Kafka, error)
GetKafka(*fastly.GetKafkaInput) (*fastly.Kafka, error)
UpdateKafka(*fastly.UpdateKafkaInput) (*fastly.Kafka, error)
DeleteKafka(*fastly.DeleteKafkaInput) error

GetUser(*fastly.GetUserInput) (*fastly.User, error)

GetRegions() (*fastly.RegionsResponse, error)
Expand Down
15 changes: 15 additions & 0 deletions pkg/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/fastly/cli/pkg/logging/heroku"
"github.com/fastly/cli/pkg/logging/honeycomb"
"github.com/fastly/cli/pkg/logging/https"
"github.com/fastly/cli/pkg/logging/kafka"
"github.com/fastly/cli/pkg/logging/logentries"
"github.com/fastly/cli/pkg/logging/loggly"
"github.com/fastly/cli/pkg/logging/logshuttle"
Expand Down Expand Up @@ -292,6 +293,13 @@ func Run(args []string, env config.Environment, file config.File, configFilePath
statsHistorical := stats.NewHistoricalCommand(statsRoot.CmdClause, &globals)
statsRealtime := stats.NewRealtimeCommand(statsRoot.CmdClause, &globals)

kafkaRoot := kafka.NewRootCommand(loggingRoot.CmdClause, &globals)
kafkaCreate := kafka.NewCreateCommand(kafkaRoot.CmdClause, &globals)
kafkaList := kafka.NewListCommand(kafkaRoot.CmdClause, &globals)
kafkaDescribe := kafka.NewDescribeCommand(kafkaRoot.CmdClause, &globals)
kafkaUpdate := kafka.NewUpdateCommand(kafkaRoot.CmdClause, &globals)
kafkaDelete := kafka.NewDeleteCommand(kafkaRoot.CmdClause, &globals)

commands := []common.Command{
configureRoot,
whoamiRoot,
Expand Down Expand Up @@ -490,6 +498,13 @@ func Run(args []string, env config.Environment, file config.File, configFilePath
httpsUpdate,
httpsDelete,

kafkaRoot,
kafkaCreate,
kafkaList,
kafkaDescribe,
kafkaUpdate,
kafkaDelete,

statsRoot,
statsRegions,
statsHistorical,
Expand Down
119 changes: 119 additions & 0 deletions pkg/app/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2475,6 +2475,125 @@ COMMANDS
--version=VERSION Number of service version
-n, --name=NAME The name of the HTTPS logging object
logging kafka create --name=NAME --version=VERSION --topic=TOPIC --brokers=BROKERS [<flags>]
Create a Kafka logging endpoint on a Fastly service version
-n, --name=NAME The name of the Kafka logging object. Used as
a primary key for API access
-s, --service-id=SERVICE-ID Service ID
--version=VERSION Number of service version
--topic=TOPIC The Kafka topic to send logs to
--brokers=BROKERS A comma-separated list of IP addresses or
hostnames of Kafka brokers
--compression-codec=COMPRESSION-CODEC
The codec used for compression of your logs.
One of: gzip, snappy, lz4
--required-acks=REQUIRED-ACKS
The Number of acknowledgements a leader must
receive before a write is considered
successful. One of: 1 (default) One server
needs to respond. 0 No servers need to
respond. -1 Wait for all in-sync replicas to
respond
--use-tls Whether to use TLS for secure logging. Can be
either true or false
--tls-ca-cert=TLS-CA-CERT A secure certificate to authenticate the
server with. Must be in PEM format
--tls-client-cert=TLS-CLIENT-CERT
The client certificate used to make
authenticated requests. Must be in PEM format
--tls-client-key=TLS-CLIENT-KEY
The client private key used to make
authenticated requests. Must be in PEM format
--tls-hostname=TLS-HOSTNAME
The hostname used to verify the server's
certificate. It can either be the Common Name
or a Subject Alternative Name (SAN)
--format=FORMAT Apache style log formatting. Your log must
produce valid JSON that Kafka can ingest
--format-version=FORMAT-VERSION
The version of the custom logging format used
for the configured endpoint. Can be either 2
(default) or 1
--placement=PLACEMENT Where in the generated VCL the logging call
should be placed, overriding any
format_version default. Can be none or
waf_debug
--response-condition=RESPONSE-CONDITION
The name of an existing condition in the
configured endpoint, or leave blank to always
execute
logging kafka list --version=VERSION [<flags>]
List Kafka endpoints on a Fastly service version
-s, --service-id=SERVICE-ID Service ID
--version=VERSION Number of service version
logging kafka describe --version=VERSION --name=NAME [<flags>]
Show detailed information about a Kafka logging endpoint on a Fastly service
version
-s, --service-id=SERVICE-ID Service ID
--version=VERSION Number of service version
-n, --name=NAME The name of the Kafka logging object
logging kafka update --version=VERSION --name=NAME [<flags>]
Update a Kafka logging endpoint on a Fastly service version
-s, --service-id=SERVICE-ID Service ID
--version=VERSION Number of service version
-n, --name=NAME The name of the Kafka logging object
--new-name=NEW-NAME New name of the Kafka logging object
--topic=TOPIC The Kafka topic to send logs to
--brokers=BROKERS A comma-separated list of IP addresses or
hostnames of Kafka brokers
--compression-codec=COMPRESSION-CODEC
The codec used for compression of your logs.
One of: gzip, snappy, lz4
--required-acks=REQUIRED-ACKS
The Number of acknowledgements a leader must
receive before a write is considered
successful. One of: 1 (default) One server
needs to respond. 0 No servers need to
respond. -1 Wait for all in-sync replicas to
respond
--use-tls Whether to use TLS for secure logging. Can be
either true or false
--tls-ca-cert=TLS-CA-CERT A secure certificate to authenticate the
server with. Must be in PEM format
--tls-client-cert=TLS-CLIENT-CERT
The client certificate used to make
authenticated requests. Must be in PEM format
--tls-client-key=TLS-CLIENT-KEY
The client private key used to make
authenticated requests. Must be in PEM format
--tls-hostname=TLS-HOSTNAME
The hostname used to verify the server's
certificate. It can either be the Common Name
or a Subject Alternative Name (SAN)
--format=FORMAT Apache style log formatting. Your log must
produce valid JSON that Kafka can ingest
--format-version=FORMAT-VERSION
The version of the custom logging format used
for the configured endpoint. Can be either 2
(default) or 1
--placement=PLACEMENT Where in the generated VCL the logging call
should be placed, overriding any
format_version default. Can be none or
waf_debug
--response-condition=RESPONSE-CONDITION
The name of an existing condition in the
configured endpoint, or leave blank to always
execute
logging kafka delete --version=VERSION --name=NAME [<flags>]
Delete a Kafka logging endpoint on a Fastly service version
-s, --service-id=SERVICE-ID Service ID
--version=VERSION Number of service version
-n, --name=NAME The name of the Kafka logging object
stats regions
List stats regions
Expand Down
144 changes: 144 additions & 0 deletions pkg/logging/kafka/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package kafka

import (
"io"

"github.com/fastly/cli/pkg/common"
"github.com/fastly/cli/pkg/compute/manifest"
"github.com/fastly/cli/pkg/config"
"github.com/fastly/cli/pkg/errors"
"github.com/fastly/cli/pkg/text"
"github.com/fastly/go-fastly/fastly"
)

// CreateCommand calls the Fastly API to create Kafka logging endpoints.
type CreateCommand struct {
common.Base
manifest manifest.Data

// required
EndpointName string // Can't shaddow common.Base method Name().
Version int
Topic string
Brokers string

// optional
UseTLS common.OptionalBool
CompressionCodec common.OptionalString
RequiredACKs common.OptionalString
TLSCACert common.OptionalString
TLSClientCert common.OptionalString
TLSClientKey common.OptionalString
TLSHostname common.OptionalString
Format common.OptionalString
FormatVersion common.OptionalUint
Placement common.OptionalString
ResponseCondition common.OptionalString
}

// NewCreateCommand returns a usable command registered under the parent.
func NewCreateCommand(parent common.Registerer, globals *config.Data) *CreateCommand {
var c CreateCommand
c.Globals = globals
c.manifest.File.Read(manifest.Filename)
c.CmdClause = parent.Command("create", "Create a Kafka logging endpoint on a Fastly service version").Alias("add")

c.CmdClause.Flag("name", "The name of the Kafka logging object. Used as a primary key for API access").Short('n').Required().StringVar(&c.EndpointName)
c.CmdClause.Flag("service-id", "Service ID").Short('s').StringVar(&c.manifest.Flag.ServiceID)
c.CmdClause.Flag("version", "Number of service version").Required().IntVar(&c.Version)

c.CmdClause.Flag("topic", "The Kafka topic to send logs to").Required().StringVar(&c.Topic)
c.CmdClause.Flag("brokers", "A comma-separated list of IP addresses or hostnames of Kafka brokers").Required().StringVar(&c.Brokers)

c.CmdClause.Flag("compression-codec", "The codec used for compression of your logs. One of: gzip, snappy, lz4").Action(c.CompressionCodec.Set).StringVar(&c.CompressionCodec.Value)
c.CmdClause.Flag("required-acks", "The Number of acknowledgements a leader must receive before a write is considered successful. One of: 1 (default) One server needs to respond. 0 No servers need to respond. -1 Wait for all in-sync replicas to respond").Action(c.RequiredACKs.Set).StringVar(&c.RequiredACKs.Value)
c.CmdClause.Flag("use-tls", "Whether to use TLS for secure logging. Can be either true or false").Action(c.UseTLS.Set).BoolVar(&c.UseTLS.Value)
c.CmdClause.Flag("tls-ca-cert", "A secure certificate to authenticate the server with. Must be in PEM format").Action(c.TLSCACert.Set).StringVar(&c.TLSCACert.Value)
c.CmdClause.Flag("tls-client-cert", "The client certificate used to make authenticated requests. Must be in PEM format").Action(c.TLSClientCert.Set).StringVar(&c.TLSClientCert.Value)
c.CmdClause.Flag("tls-client-key", "The client private key used to make authenticated requests. Must be in PEM format").Action(c.TLSClientKey.Set).StringVar(&c.TLSClientKey.Value)
c.CmdClause.Flag("tls-hostname", "The hostname used to verify the server's certificate. It can either be the Common Name or a Subject Alternative Name (SAN)").Action(c.TLSHostname.Set).StringVar(&c.TLSHostname.Value)
c.CmdClause.Flag("format", "Apache style log formatting. Your log must produce valid JSON that Kafka can ingest").Action(c.Format.Set).StringVar(&c.Format.Value)
c.CmdClause.Flag("format-version", "The version of the custom logging format used for the configured endpoint. Can be either 2 (default) or 1").Action(c.FormatVersion.Set).UintVar(&c.FormatVersion.Value)
c.CmdClause.Flag("placement", "Where in the generated VCL the logging call should be placed, overriding any format_version default. Can be none or waf_debug").Action(c.Placement.Set).StringVar(&c.Placement.Value)
c.CmdClause.Flag("response-condition", "The name of an existing condition in the configured endpoint, or leave blank to always execute").Action(c.ResponseCondition.Set).StringVar(&c.ResponseCondition.Value)

return &c
}

// createInput transforms values parsed from CLI flags into an object to be used by the API client library.
func (c *CreateCommand) createInput() (*fastly.CreateKafkaInput, error) {
var input fastly.CreateKafkaInput

serviceID, source := c.manifest.ServiceID()
if source == manifest.SourceUndefined {
return nil, errors.ErrNoServiceID
}

input.Service = serviceID
input.Version = c.Version
input.Name = fastly.String(c.EndpointName)
input.Topic = fastly.String(c.Topic)
input.Brokers = fastly.String(c.Brokers)

if c.CompressionCodec.Valid {
input.CompressionCodec = fastly.String(c.CompressionCodec.Value)
}

if c.RequiredACKs.Valid {
input.RequiredACKs = fastly.String(c.RequiredACKs.Value)
}

if c.UseTLS.Valid {
input.UseTLS = fastly.CBool(c.UseTLS.Value)
}

if c.TLSCACert.Valid {
input.TLSCACert = fastly.String(c.TLSCACert.Value)
}

if c.TLSClientCert.Valid {
input.TLSClientCert = fastly.String(c.TLSClientCert.Value)
}

if c.TLSClientKey.Valid {
input.TLSClientKey = fastly.String(c.TLSClientKey.Value)
}

if c.TLSHostname.Valid {
input.TLSHostname = fastly.String(c.TLSHostname.Value)
}

if c.Format.Valid {
input.Format = fastly.String(c.Format.Value)
}

if c.FormatVersion.Valid {
input.FormatVersion = fastly.Uint(c.FormatVersion.Value)
}

if c.ResponseCondition.Valid {
input.ResponseCondition = fastly.String(c.ResponseCondition.Value)
}

if c.Placement.Valid {
input.Placement = fastly.String(c.Placement.Value)
}

return &input, nil
}

// Exec invokes the application logic for the command.
func (c *CreateCommand) Exec(in io.Reader, out io.Writer) error {
input, err := c.createInput()
if err != nil {
return err
}

d, err := c.Globals.Client.CreateKafka(input)
if err != nil {
return err
}

text.Success(out, "Created Kafka logging endpoint %s (service %s version %d)", d.Name, d.ServiceID, d.Version)
return nil
}
47 changes: 47 additions & 0 deletions pkg/logging/kafka/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package kafka

import (
"io"

"github.com/fastly/cli/pkg/common"
"github.com/fastly/cli/pkg/compute/manifest"
"github.com/fastly/cli/pkg/config"
"github.com/fastly/cli/pkg/errors"
"github.com/fastly/cli/pkg/text"
"github.com/fastly/go-fastly/fastly"
)

// DeleteCommand calls the Fastly API to delete Kafka logging endpoints.
type DeleteCommand struct {
common.Base
manifest manifest.Data
Input fastly.DeleteKafkaInput
}

// NewDeleteCommand returns a usable command registered under the parent.
func NewDeleteCommand(parent common.Registerer, globals *config.Data) *DeleteCommand {
var c DeleteCommand
c.Globals = globals
c.manifest.File.Read(manifest.Filename)
c.CmdClause = parent.Command("delete", "Delete a Kafka logging endpoint on a Fastly service version").Alias("remove")
c.CmdClause.Flag("service-id", "Service ID").Short('s').StringVar(&c.manifest.Flag.ServiceID)
c.CmdClause.Flag("version", "Number of service version").Required().IntVar(&c.Input.Version)
c.CmdClause.Flag("name", "The name of the Kafka logging object").Short('n').Required().StringVar(&c.Input.Name)
return &c
}

// Exec invokes the application logic for the command.
func (c *DeleteCommand) Exec(in io.Reader, out io.Writer) error {
serviceID, source := c.manifest.ServiceID()
if source == manifest.SourceUndefined {
return errors.ErrNoServiceID
}
c.Input.Service = serviceID

if err := c.Globals.Client.DeleteKafka(&c.Input); err != nil {
return err
}

text.Success(out, "Deleted Kafka logging endpoint %s (service %s version %d)", c.Input.Name, c.Input.Service, c.Input.Version)
return nil
}
Loading

0 comments on commit b5021da

Please sign in to comment.