Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CONFIG-129] Ctlstore reflector metrics inconsistency in prod euw 1 #106

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions ldb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (reader *LDBReader) GetLedgerLatency(ctx context.Context) (time.Duration, e
}
}

// GetRowsByKeyPrefix returns a *Rows iterator that will supply all of the rows in
// GetRowsByKeyPrefix returns a *Rows iterator that will supply all the rows in
// the family and table match the supplied primary key prefix.
func (reader *LDBReader) GetRowsByKeyPrefix(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error) {
ctx = discardContext()
Expand Down Expand Up @@ -241,7 +241,7 @@ func (reader *LDBReader) GetRowByKey(
// very likely use the global singleton reader, this means that we
// must assume that the cache will be shared across the whole process.
// The way that a PK would be changed on a table is that it would need
// to be dropped and re-created. In the mean time, this cache will
// to be dropped and re-created. In the meantime, this cache will
// go stale. The way that this is dealt with is to clear the cache if
// the statement encounters any execution errors.
pk, err := reader.getPrimaryKey(ctx, ldbTable) // assumes RLock held
Expand Down Expand Up @@ -368,11 +368,11 @@ func (reader *LDBReader) Ping(ctx context.Context) bool {
// ensure that a supplied key is converted appropriately with respect
// to the type of each PK column.
func convertKeyBeforeQuery(pk schema.PrimaryKey, key []interface{}) error {
// sanity check on th length of the pk field type slice
if len(key) > len(pk.Types) {
return errors.New("insufficient key field type data")
}
for i, k := range key {
// sanity check on th elength of the pk field type slice
if i >= len(pk.Types) {
return errors.New("insufficient key field type data")
}
pkt := pk.Types[i]
switch k := k.(type) {
case string:
Expand All @@ -399,7 +399,7 @@ func (reader *LDBReader) unlock() {
func (reader *LDBReader) invalidatePKCache(ldbTable string) {
if reader.pkCache == nil {
// Cache hasn't even been initialized yet, so invalidation would
// do nothing anyways.
// do nothing anyway.
return
}

Expand Down
72 changes: 72 additions & 0 deletions ldb_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,78 @@ func TestGetRowsByKeyPrefix(t *testing.T) {
},
err: nil,
},
{
zhou-hongyu marked this conversation as resolved.
Show resolved Hide resolved
desc: "empty string key [map]",
family: "foo",
table: "multirow",
key: []interface{}{""},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "empty string keys [struct]",
family: "foo",
table: "multirow",
key: []interface{}{""},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "incomplete keys [map]",
family: "foo",
table: "multirow",
key: []interface{}{"a", ""},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "incomplete keys [struct]",
family: "foo",
table: "multirow",
key: []interface{}{"a", ""},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "out-of-ordered keys [map]",
family: "foo",
table: "multirow",
key: []interface{}{"A", "a"},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "out-of-ordered keys [struct]",
family: "foo",
table: "multirow",
key: []interface{}{"A", "a"},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "not the first key [map]",
family: "foo",
table: "multirow",
key: []interface{}{"A"},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "not the first key [struct]",
family: "foo",
table: "multirow",
key: []interface{}{"A"},
targetFunc: func() interface{} { return map[string]interface{}{} },
expected: nil,
err: nil,
},
{
desc: "no keys [map]",
family: "foo",
Expand Down
44 changes: 23 additions & 21 deletions pkg/ledger/ledger_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,27 @@ func (m *Monitor) Start(ctx context.Context) {
}
// always instrument ledger latency even if ECS behavior is disabled.
stats.Set("reflector-ledger-latency", latency)
if !m.cfg.DisableECSBehavior {
switch {
case latency <= m.cfg.MaxHealthyLatency && (health == nil || *health != true):
// set a healthy attribute
if err := m.setHealthAttribute(ctx, m.cfg.HealthyAttributeValue); err != nil {
return errors.Wrap(err, "set healthy")
}
health = pointer.ToBool(true)
case latency > m.cfg.MaxHealthyLatency && (health == nil || *health != false):
// set an unhealthy attribute
if err := m.setHealthAttribute(ctx, m.cfg.UnhealthyAttributeValue); err != nil {
return errors.Wrap(err, "set unhealthy")
}
health = pointer.ToBool(false)
switch {
case latency <= m.cfg.MaxHealthyLatency && (health == nil || *health != true):
// set a healthy attribute
if err := m.setHealthAttribute(ctx, m.cfg.HealthyAttributeValue); err != nil {
zhou-hongyu marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrap(err, "set healthy")
}
switch {
case health == nil:
stats.Set("ledger-health", 1, stats.T("status", "unknown"))
case *health == false:
stats.Set("ledger-health", 1, stats.T("status", "unhealthy"))
case *health == true:
stats.Set("ledger-health", 1, stats.T("status", "healthy"))
health = pointer.ToBool(true)
case latency > m.cfg.MaxHealthyLatency && (health == nil || *health != false):
// set an unhealthy attribute
if err := m.setHealthAttribute(ctx, m.cfg.UnhealthyAttributeValue); err != nil {
return errors.Wrap(err, "set unhealthy")
}
health = pointer.ToBool(false)
}
switch {
case health == nil:
zhou-hongyu marked this conversation as resolved.
Show resolved Hide resolved
stats.Set("ledger_health", 1, stats.T("status", "unknown"))
case *health == false:
stats.Set("ledger_health", 1, stats.T("status", "unhealthy"))
case *health == true:
stats.Set("ledger_health", 1, stats.T("status", "healthy"))
}
return nil
}()
Expand All @@ -125,6 +123,10 @@ func (m *Monitor) Start(ctx context.Context) {
}

func (m *Monitor) setHealthAttribute(ctx context.Context, attrValue string) error {
if m.cfg.DisableECSBehavior {
return nil
}

events.Log("Setting ECS instance attribute: %s=%s", m.cfg.AttributeName, attrValue)
ecsMeta, err := m.getECSMetadata(ctx)
if err != nil {
Expand Down