From c301deab31675a5558c717decdb398bea55e8c63 Mon Sep 17 00:00:00 2001 From: Yoshiyuki Mineo Date: Sat, 16 Nov 2024 21:48:32 +0900 Subject: [PATCH] Rename --- v2/distributed_gobreaker.go | 95 +++++++++++---------- v2/distributed_gobreaker_test.go | 138 +++++++++++++++---------------- 2 files changed, 116 insertions(+), 117 deletions(-) diff --git a/v2/distributed_gobreaker.go b/v2/distributed_gobreaker.go index bc7fbd6..2565d90 100644 --- a/v2/distributed_gobreaker.go +++ b/v2/distributed_gobreaker.go @@ -6,7 +6,7 @@ import ( "time" ) -// SharedState represents the CircuitBreaker state stored in Distributed Storage +// SharedState represents the shared state of DistributedCircuitBreaker. type SharedState struct { State State `json:"state"` Generation uint64 `json:"generation"` @@ -26,32 +26,32 @@ type DistributedCircuitBreaker[T any] struct { } // NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker configured with the given StorageSettings -func NewDistributedCircuitBreaker[T any](storageClient SharedStateStore, settings Settings) *DistributedCircuitBreaker[T] { +func NewDistributedCircuitBreaker[T any](store SharedStateStore, settings Settings) *DistributedCircuitBreaker[T] { cb := NewCircuitBreaker[T](settings) return &DistributedCircuitBreaker[T]{ CircuitBreaker: cb, - store: storageClient, + store: store, } } -func (rcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State { - if rcb.store == nil { - return rcb.CircuitBreaker.State() +func (dcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State { + if dcb.store == nil { + return dcb.CircuitBreaker.State() } - state, err := rcb.store.GetState(ctx) + state, err := dcb.store.GetState(ctx) if err != nil { // Fallback to in-memory state if Storage fails - return rcb.CircuitBreaker.State() + return dcb.CircuitBreaker.State() } now := time.Now() - currentState, _ := rcb.currentState(state, now) + currentState, _ := dcb.currentState(state, now) // Update the state in Storage if it has changed if currentState != state.State { state.State = currentState - if err := rcb.store.SetState(ctx, state); err != nil { + if err := dcb.store.SetState(ctx, state); err != nil { // Log the error, but continue with the current state fmt.Printf("Failed to update state in storage: %v\n", err) } @@ -61,11 +61,11 @@ func (rcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State { } // Execute runs the given request if the DistributedCircuitBreaker accepts it -func (rcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) { - if rcb.store == nil { - return rcb.CircuitBreaker.Execute(req) +func (dcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) { + if dcb.store == nil { + return dcb.CircuitBreaker.Execute(req) } - generation, err := rcb.beforeRequest(ctx) + generation, err := dcb.beforeRequest(ctx) if err != nil { var zero T return zero, err @@ -74,28 +74,28 @@ func (rcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() defer func() { e := recover() if e != nil { - rcb.afterRequest(ctx, generation, false) + dcb.afterRequest(ctx, generation, false) panic(e) } }() result, err := req() - rcb.afterRequest(ctx, generation, rcb.isSuccessful(err)) + dcb.afterRequest(ctx, generation, dcb.isSuccessful(err)) return result, err } -func (rcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) { - state, err := rcb.store.GetState(ctx) +func (dcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) { + state, err := dcb.store.GetState(ctx) if err != nil { return 0, err } now := time.Now() - currentState, generation := rcb.currentState(state, now) + currentState, generation := dcb.currentState(state, now) if currentState != state.State { - rcb.setState(&state, currentState, now) - err = rcb.store.SetState(ctx, state) + dcb.setState(&state, currentState, now) + err = dcb.store.SetState(ctx, state) if err != nil { return 0, err } @@ -103,12 +103,12 @@ func (rcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uin if currentState == StateOpen { return generation, ErrOpenState - } else if currentState == StateHalfOpen && state.Counts.Requests >= rcb.maxRequests { + } else if currentState == StateHalfOpen && state.Counts.Requests >= dcb.maxRequests { return generation, ErrTooManyRequests } state.Counts.onRequest() - err = rcb.store.SetState(ctx, state) + err = dcb.store.SetState(ctx, state) if err != nil { return 0, err } @@ -116,27 +116,27 @@ func (rcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uin return generation, nil } -func (rcb *DistributedCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) { - state, err := rcb.store.GetState(ctx) +func (dcb *DistributedCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) { + state, err := dcb.store.GetState(ctx) if err != nil { return } now := time.Now() - currentState, generation := rcb.currentState(state, now) + currentState, generation := dcb.currentState(state, now) if generation != before { return } if success { - rcb.onSuccess(&state, currentState, now) + dcb.onSuccess(&state, currentState, now) } else { - rcb.onFailure(&state, currentState, now) + dcb.onFailure(&state, currentState, now) } - rcb.store.SetState(ctx, state) + dcb.store.SetState(ctx, state) } -func (rcb *DistributedCircuitBreaker[T]) onSuccess(state *SharedState, currentState State, now time.Time) { +func (dcb *DistributedCircuitBreaker[T]) onSuccess(state *SharedState, currentState State, now time.Time) { if state.State == StateOpen { state.State = currentState } @@ -146,39 +146,39 @@ func (rcb *DistributedCircuitBreaker[T]) onSuccess(state *SharedState, currentSt state.Counts.onSuccess() case StateHalfOpen: state.Counts.onSuccess() - if state.Counts.ConsecutiveSuccesses >= rcb.maxRequests { - rcb.setState(state, StateClosed, now) + if state.Counts.ConsecutiveSuccesses >= dcb.maxRequests { + dcb.setState(state, StateClosed, now) } } } -func (rcb *DistributedCircuitBreaker[T]) onFailure(state *SharedState, currentState State, now time.Time) { +func (dcb *DistributedCircuitBreaker[T]) onFailure(state *SharedState, currentState State, now time.Time) { switch currentState { case StateClosed: state.Counts.onFailure() - if rcb.readyToTrip(state.Counts) { - rcb.setState(state, StateOpen, now) + if dcb.readyToTrip(state.Counts) { + dcb.setState(state, StateOpen, now) } case StateHalfOpen: - rcb.setState(state, StateOpen, now) + dcb.setState(state, StateOpen, now) } } -func (rcb *DistributedCircuitBreaker[T]) currentState(state SharedState, now time.Time) (State, uint64) { +func (dcb *DistributedCircuitBreaker[T]) currentState(state SharedState, now time.Time) (State, uint64) { switch state.State { case StateClosed: if !state.Expiry.IsZero() && state.Expiry.Before(now) { - rcb.toNewGeneration(&state, now) + dcb.toNewGeneration(&state, now) } case StateOpen: if state.Expiry.Before(now) { - rcb.setState(&state, StateHalfOpen, now) + dcb.setState(&state, StateHalfOpen, now) } } return state.State, state.Generation } -func (rcb *DistributedCircuitBreaker[T]) setState(state *SharedState, newState State, now time.Time) { +func (dcb *DistributedCircuitBreaker[T]) setState(state *SharedState, newState State, now time.Time) { if state.State == newState { return } @@ -186,28 +186,27 @@ func (rcb *DistributedCircuitBreaker[T]) setState(state *SharedState, newState S prev := state.State state.State = newState - rcb.toNewGeneration(state, now) + dcb.toNewGeneration(state, now) - if rcb.onStateChange != nil { - rcb.onStateChange(rcb.name, prev, newState) + if dcb.onStateChange != nil { + dcb.onStateChange(dcb.name, prev, newState) } } -func (rcb *DistributedCircuitBreaker[T]) toNewGeneration(state *SharedState, now time.Time) { - +func (dcb *DistributedCircuitBreaker[T]) toNewGeneration(state *SharedState, now time.Time) { state.Generation++ state.Counts.clear() var zero time.Time switch state.State { case StateClosed: - if rcb.interval == 0 { + if dcb.interval == 0 { state.Expiry = zero } else { - state.Expiry = now.Add(rcb.interval) + state.Expiry = now.Add(dcb.interval) } case StateOpen: - state.Expiry = now.Add(rcb.timeout) + state.Expiry = now.Add(dcb.timeout) default: // StateHalfOpen state.Expiry = zero } diff --git a/v2/distributed_gobreaker_test.go b/v2/distributed_gobreaker_test.go index 7ffbf6e..c9edf6f 100644 --- a/v2/distributed_gobreaker_test.go +++ b/v2/distributed_gobreaker_test.go @@ -12,8 +12,8 @@ import ( "github.com/stretchr/testify/assert" ) -var defaultRCB *DistributedCircuitBreaker[any] -var customRCB *DistributedCircuitBreaker[any] +var defaultDCB *DistributedCircuitBreaker[any] +var customDCB *DistributedCircuitBreaker[any] type storageAdapter struct { client *redis.Client @@ -65,24 +65,24 @@ func setupTestWithMiniredis() (*DistributedCircuitBreaker[any], *miniredis.Minir }), mr, client } -func pseudoSleepStorage(ctx context.Context, rcb *DistributedCircuitBreaker[any], period time.Duration) { - state, _ := rcb.store.GetState(ctx) +func pseudoSleepStorage(ctx context.Context, dcb *DistributedCircuitBreaker[any], period time.Duration) { + state, _ := dcb.store.GetState(ctx) state.Expiry = state.Expiry.Add(-period) // Reset counts if the interval has passed if time.Now().After(state.Expiry) { state.Counts = Counts{} } - rcb.store.SetState(ctx, state) + dcb.store.SetState(ctx, state) } -func successRequest(ctx context.Context, rcb *DistributedCircuitBreaker[any]) error { - _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, nil }) +func successRequest(ctx context.Context, dcb *DistributedCircuitBreaker[any]) error { + _, err := dcb.Execute(ctx, func() (interface{}, error) { return nil, nil }) return err } -func failRequest(ctx context.Context, rcb *DistributedCircuitBreaker[any]) error { - _, err := rcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("fail") }) +func failRequest(ctx context.Context, dcb *DistributedCircuitBreaker[any]) error { + _, err := dcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("fail") }) if err != nil && err.Error() == "fail" { return nil } @@ -90,74 +90,74 @@ func failRequest(ctx context.Context, rcb *DistributedCircuitBreaker[any]) error } func TestDistributedCircuitBreakerInitialization(t *testing.T) { - rcb, mr, _ := setupTestWithMiniredis() + dcb, mr, _ := setupTestWithMiniredis() defer mr.Close() ctx := context.Background() - assert.Equal(t, "TestBreaker", rcb.Name()) - assert.Equal(t, uint32(3), rcb.maxRequests) - assert.Equal(t, time.Second, rcb.interval) - assert.Equal(t, time.Second*2, rcb.timeout) - assert.NotNil(t, rcb.readyToTrip) + assert.Equal(t, "TestBreaker", dcb.Name()) + assert.Equal(t, uint32(3), dcb.maxRequests) + assert.Equal(t, time.Second, dcb.interval) + assert.Equal(t, time.Second*2, dcb.timeout) + assert.NotNil(t, dcb.readyToTrip) - state := rcb.State(ctx) + state := dcb.State(ctx) assert.Equal(t, StateClosed, state) } func TestDistributedCircuitBreakerStateTransitions(t *testing.T) { - rcb, mr, _ := setupTestWithMiniredis() + dcb, mr, _ := setupTestWithMiniredis() defer mr.Close() ctx := context.Background() // Check if initial state is closed - assert.Equal(t, StateClosed, rcb.State(ctx)) + assert.Equal(t, StateClosed, dcb.State(ctx)) // StateClosed to StateOpen for i := 0; i < 6; i++ { - assert.NoError(t, failRequest(ctx, rcb)) + assert.NoError(t, failRequest(ctx, dcb)) } - assert.Equal(t, StateOpen, rcb.State(ctx)) + assert.Equal(t, StateOpen, dcb.State(ctx)) // Ensure requests fail when circuit is open - err := failRequest(ctx, rcb) + err := failRequest(ctx, dcb) assert.Error(t, err) assert.Equal(t, ErrOpenState, err) // Wait for timeout to transition to half-open - pseudoSleepStorage(ctx, rcb, rcb.timeout) - assert.Equal(t, StateHalfOpen, rcb.State(ctx)) + pseudoSleepStorage(ctx, dcb, dcb.timeout) + assert.Equal(t, StateHalfOpen, dcb.State(ctx)) // StateHalfOpen to StateClosed - for i := 0; i < int(rcb.maxRequests); i++ { - assert.NoError(t, successRequest(ctx, rcb)) + for i := 0; i < int(dcb.maxRequests); i++ { + assert.NoError(t, successRequest(ctx, dcb)) } - assert.Equal(t, StateClosed, rcb.State(ctx)) + assert.Equal(t, StateClosed, dcb.State(ctx)) // StateClosed to StateOpen (again) for i := 0; i < 6; i++ { - assert.NoError(t, failRequest(ctx, rcb)) + assert.NoError(t, failRequest(ctx, dcb)) } - assert.Equal(t, StateOpen, rcb.State(ctx)) + assert.Equal(t, StateOpen, dcb.State(ctx)) } func TestDistributedCircuitBreakerExecution(t *testing.T) { - rcb, mr, _ := setupTestWithMiniredis() + dcb, mr, _ := setupTestWithMiniredis() defer mr.Close() ctx := context.Background() // Test successful execution - result, err := rcb.Execute(ctx, func() (interface{}, error) { + result, err := dcb.Execute(ctx, func() (interface{}, error) { return "success", nil }) assert.NoError(t, err) assert.Equal(t, "success", result) // Test failed execution - _, err = rcb.Execute(ctx, func() (interface{}, error) { + _, err = dcb.Execute(ctx, func() (interface{}, error) { return nil, errors.New("test error") }) assert.Error(t, err) @@ -165,25 +165,25 @@ func TestDistributedCircuitBreakerExecution(t *testing.T) { } func TestDistributedCircuitBreakerCounts(t *testing.T) { - rcb, mr, _ := setupTestWithMiniredis() + dcb, mr, _ := setupTestWithMiniredis() defer mr.Close() ctx := context.Background() for i := 0; i < 5; i++ { - assert.Nil(t, successRequest(ctx, rcb)) + assert.Nil(t, successRequest(ctx, dcb)) } - state, _ := rcb.store.GetState(ctx) + state, _ := dcb.store.GetState(ctx) assert.Equal(t, Counts{5, 5, 0, 5, 0}, state.Counts) - assert.Nil(t, failRequest(ctx, rcb)) - state, _ = rcb.store.GetState(ctx) + assert.Nil(t, failRequest(ctx, dcb)) + state, _ = dcb.store.GetState(ctx) assert.Equal(t, Counts{6, 5, 1, 0, 1}, state.Counts) } func TestDistributedCircuitBreakerFallback(t *testing.T) { - rcb, mr, _ := setupTestWithMiniredis() + dcb, mr, _ := setupTestWithMiniredis() defer mr.Close() ctx := context.Background() @@ -191,14 +191,14 @@ func TestDistributedCircuitBreakerFallback(t *testing.T) { // Test when Storage is unavailable mr.Close() // Simulate Storage being unavailable - rcb.store = nil + dcb.store = nil - state := rcb.State(ctx) + state := dcb.State(ctx) assert.Equal(t, StateClosed, state, "Should fallback to in-memory state when Storage is unavailable") // Ensure operations still work without Storage - assert.Nil(t, successRequest(ctx, rcb)) - assert.Nil(t, failRequest(ctx, rcb)) + assert.Nil(t, successRequest(ctx, dcb)) + assert.Nil(t, failRequest(ctx, dcb)) } func TestCustomDistributedCircuitBreaker(t *testing.T) { @@ -214,7 +214,7 @@ func TestCustomDistributedCircuitBreaker(t *testing.T) { storageClient := &storageAdapter{client: client} - customRCB = NewDistributedCircuitBreaker[any](storageClient, Settings{ + customDCB = NewDistributedCircuitBreaker[any](storageClient, Settings{ Name: "CustomBreaker", MaxRequests: 3, Interval: time.Second * 30, @@ -229,54 +229,54 @@ func TestCustomDistributedCircuitBreaker(t *testing.T) { ctx := context.Background() t.Run("Initialization", func(t *testing.T) { - assert.Equal(t, "CustomBreaker", customRCB.Name()) - assert.Equal(t, StateClosed, customRCB.State(ctx)) + assert.Equal(t, "CustomBreaker", customDCB.Name()) + assert.Equal(t, StateClosed, customDCB.State(ctx)) }) t.Run("Counts and State Transitions", func(t *testing.T) { // Perform 5 successful and 5 failed requests for i := 0; i < 5; i++ { - assert.NoError(t, successRequest(ctx, customRCB)) - assert.NoError(t, failRequest(ctx, customRCB)) + assert.NoError(t, successRequest(ctx, customDCB)) + assert.NoError(t, failRequest(ctx, customDCB)) } - state, err := customRCB.store.GetState(ctx) + state, err := customDCB.store.GetState(ctx) assert.NoError(t, err) assert.Equal(t, StateClosed, state.State) assert.Equal(t, Counts{10, 5, 5, 0, 1}, state.Counts) // Perform one more successful request - assert.NoError(t, successRequest(ctx, customRCB)) - state, err = customRCB.store.GetState(ctx) + assert.NoError(t, successRequest(ctx, customDCB)) + state, err = customDCB.store.GetState(ctx) assert.NoError(t, err) assert.Equal(t, Counts{11, 6, 5, 1, 0}, state.Counts) // Simulate time passing to reset counts - pseudoSleepStorage(ctx, customRCB, time.Second*30) + pseudoSleepStorage(ctx, customDCB, time.Second*30) // Perform requests to trigger StateOpen - assert.NoError(t, successRequest(ctx, customRCB)) - assert.NoError(t, failRequest(ctx, customRCB)) - assert.NoError(t, failRequest(ctx, customRCB)) + assert.NoError(t, successRequest(ctx, customDCB)) + assert.NoError(t, failRequest(ctx, customDCB)) + assert.NoError(t, failRequest(ctx, customDCB)) // Check if the circuit breaker is now open - assert.Equal(t, StateOpen, customRCB.State(ctx)) + assert.Equal(t, StateOpen, customDCB.State(ctx)) - state, err = customRCB.store.GetState(ctx) + state, err = customDCB.store.GetState(ctx) assert.NoError(t, err) assert.Equal(t, Counts{0, 0, 0, 0, 0}, state.Counts) }) t.Run("Timeout and Half-Open State", func(t *testing.T) { // Simulate timeout to transition to half-open state - pseudoSleepStorage(ctx, customRCB, time.Second*90) - assert.Equal(t, StateHalfOpen, customRCB.State(ctx)) + pseudoSleepStorage(ctx, customDCB, time.Second*90) + assert.Equal(t, StateHalfOpen, customDCB.State(ctx)) // Successful requests in half-open state should close the circuit for i := 0; i < 3; i++ { - assert.NoError(t, successRequest(ctx, customRCB)) + assert.NoError(t, successRequest(ctx, customDCB)) } - assert.Equal(t, StateClosed, customRCB.State(ctx)) + assert.Equal(t, StateClosed, customDCB.State(ctx)) }) } @@ -308,43 +308,43 @@ func TestCustomDistributedCircuitBreakerStateTransitions(t *testing.T) { storageClient := &storageAdapter{client: client} - cb := NewDistributedCircuitBreaker[any](storageClient, customSt) + dcb := NewDistributedCircuitBreaker[any](storageClient, customSt) ctx := context.Background() // Test case t.Run("Circuit Breaker State Transitions", func(t *testing.T) { // Initial state should be Closed - assert.Equal(t, StateClosed, cb.State(ctx)) + assert.Equal(t, StateClosed, dcb.State(ctx)) // Cause two consecutive failures to trip the circuit for i := 0; i < 2; i++ { - err := failRequest(ctx, cb) + err := failRequest(ctx, dcb) assert.NoError(t, err, "Fail request should not return an error") } // Circuit should now be Open - assert.Equal(t, StateOpen, cb.State(ctx)) + assert.Equal(t, StateOpen, dcb.State(ctx)) assert.Equal(t, StateChange{"cb", StateClosed, StateOpen}, stateChange) // Requests should fail immediately when circuit is Open - err := successRequest(ctx, cb) + err := successRequest(ctx, dcb) assert.Error(t, err) assert.Equal(t, ErrOpenState, err) // Simulate timeout to transition to Half-Open - pseudoSleepStorage(ctx, cb, 6*time.Second) - assert.Equal(t, StateHalfOpen, cb.State(ctx)) + pseudoSleepStorage(ctx, dcb, 6*time.Second) + assert.Equal(t, StateHalfOpen, dcb.State(ctx)) assert.Equal(t, StateChange{"cb", StateOpen, StateHalfOpen}, stateChange) // Successful requests in Half-Open state should close the circuit - for i := 0; i < int(cb.maxRequests); i++ { - err := successRequest(ctx, cb) + for i := 0; i < int(dcb.maxRequests); i++ { + err := successRequest(ctx, dcb) assert.NoError(t, err) } // Circuit should now be Closed - assert.Equal(t, StateClosed, cb.State(ctx)) + assert.Equal(t, StateClosed, dcb.State(ctx)) assert.Equal(t, StateChange{"cb", StateHalfOpen, StateClosed}, stateChange) }) }