-
-
Notifications
You must be signed in to change notification settings - Fork 413
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #409 from spiral/plugin/resetter
Plugin/resetter
- Loading branch information
Showing
8 changed files
with
272 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package resetter | ||
|
||
type Resetter interface { | ||
Reset() error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package resetter | ||
|
||
import ( | ||
"github.com/spiral/endure" | ||
"github.com/spiral/errors" | ||
"github.com/spiral/roadrunner/v2/interfaces/log" | ||
"github.com/spiral/roadrunner/v2/interfaces/resetter" | ||
) | ||
|
||
const PluginName = "resetter" | ||
|
||
type Plugin struct { | ||
registry map[string]resetter.Resetter | ||
log log.Logger | ||
} | ||
|
||
func (p *Plugin) Init(log log.Logger) error { | ||
p.registry = make(map[string]resetter.Resetter) | ||
p.log = log | ||
return nil | ||
} | ||
|
||
// Reset named service. | ||
func (p *Plugin) Reset(name string) error { | ||
svc, ok := p.registry[name] | ||
if !ok { | ||
return errors.E("no such service", errors.Str(name)) | ||
} | ||
|
||
return svc.Reset() | ||
} | ||
|
||
// RegisterTarget resettable service. | ||
func (p *Plugin) RegisterTarget(name endure.Named, r resetter.Resetter) error { | ||
p.registry[name.Name()] = r | ||
return nil | ||
} | ||
|
||
// Collects declares services to be collected. | ||
func (p *Plugin) Collects() []interface{} { | ||
return []interface{}{ | ||
p.RegisterTarget, | ||
} | ||
} | ||
|
||
// Name of the service. | ||
func (p *Plugin) Name() string { | ||
return PluginName | ||
} | ||
|
||
// RPCService returns associated rpc service. | ||
func (p *Plugin) RPC() interface{} { | ||
return &rpc{srv: p, log: p.log} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package resetter | ||
|
||
import "github.com/spiral/roadrunner/v2/interfaces/log" | ||
|
||
type rpc struct { | ||
srv *Plugin | ||
log log.Logger | ||
} | ||
|
||
// List all resettable services. | ||
func (rpc *rpc) List(_ bool, list *[]string) error { | ||
rpc.log.Info("started List method") | ||
*list = make([]string, 0) | ||
|
||
for name := range rpc.srv.registry { | ||
*list = append(*list, name) | ||
} | ||
rpc.log.Debug("services list", "services", *list) | ||
|
||
rpc.log.Info("finished List method") | ||
return nil | ||
} | ||
|
||
// Reset named service. | ||
func (rpc *rpc) Reset(service string, done *bool) error { | ||
rpc.log.Info("started Reset method for the service", "service", service) | ||
defer rpc.log.Info("finished Reset method for the service", "service", service) | ||
*done = true | ||
return rpc.srv.Reset(service) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
server: | ||
command: "php ../../../tests/client.php echo pipes" | ||
user: "" | ||
group: "" | ||
env: | ||
"RR_CONFIG": "/some/place/on/the/C134" | ||
"RR_CONFIG2": "C138" | ||
relay: "pipes" | ||
relayTimeout: "20s" | ||
|
||
rpc: | ||
listen: tcp://127.0.0.1:6001 | ||
disabled: false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package tests | ||
|
||
import ( | ||
"net" | ||
"net/rpc" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
"testing" | ||
"time" | ||
|
||
"github.com/spiral/endure" | ||
"github.com/spiral/goridge/v2" | ||
"github.com/spiral/roadrunner/v2/plugins/config" | ||
"github.com/spiral/roadrunner/v2/plugins/logger" | ||
"github.com/spiral/roadrunner/v2/plugins/resetter" | ||
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" | ||
"github.com/spiral/roadrunner/v2/plugins/server" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestInformerInit(t *testing.T) { | ||
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
cfg := &config.Viper{ | ||
Path: ".rr-resetter.yaml", | ||
Prefix: "rr", | ||
} | ||
|
||
err = cont.RegisterAll( | ||
cfg, | ||
&server.Plugin{}, | ||
&logger.ZapLogger{}, | ||
&resetter.Plugin{}, | ||
&rpcPlugin.Plugin{}, | ||
&Plugin1{}, | ||
) | ||
assert.NoError(t, err) | ||
|
||
err = cont.Init() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
ch, err := cont.Serve() | ||
assert.NoError(t, err) | ||
|
||
sig := make(chan os.Signal, 1) | ||
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) | ||
|
||
tt := time.NewTimer(time.Second * 15) | ||
|
||
t.Run("InformerRpcTest", resetterRpcTest) | ||
|
||
for { | ||
select { | ||
case e := <-ch: | ||
assert.Fail(t, "error", e.Error.Error()) | ||
err = cont.Stop() | ||
if err != nil { | ||
assert.FailNow(t, "error", err.Error()) | ||
} | ||
case <-sig: | ||
err = cont.Stop() | ||
if err != nil { | ||
assert.FailNow(t, "error", err.Error()) | ||
} | ||
return | ||
case <-tt.C: | ||
// timeout | ||
err = cont.Stop() | ||
if err != nil { | ||
assert.FailNow(t, "error", err.Error()) | ||
} | ||
return | ||
} | ||
} | ||
} | ||
|
||
func resetterRpcTest(t *testing.T) { | ||
conn, err := net.Dial("tcp", "127.0.0.1:6001") | ||
assert.NoError(t, err) | ||
client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) | ||
// WorkerList contains list of workers. | ||
|
||
var ret bool | ||
err = client.Call("resetter.Reset", "resetter.plugin1", &ret) | ||
assert.NoError(t, err) | ||
assert.True(t, ret) | ||
ret = false | ||
|
||
var services []string | ||
err = client.Call("resetter.List", nil, &services) | ||
assert.NoError(t, err) | ||
if services[0] != "resetter.plugin1" { | ||
t.Fatal("no enough services") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package tests | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/spiral/roadrunner/v2" | ||
"github.com/spiral/roadrunner/v2/interfaces/server" | ||
"github.com/spiral/roadrunner/v2/plugins/config" | ||
) | ||
|
||
var testPoolConfig = roadrunner.PoolConfig{ | ||
NumWorkers: 10, | ||
MaxJobs: 100, | ||
AllocateTimeout: time.Second * 10, | ||
DestroyTimeout: time.Second * 10, | ||
Supervisor: &roadrunner.SupervisorConfig{ | ||
WatchTick: 60, | ||
TTL: 1000, | ||
IdleTTL: 10, | ||
ExecTTL: 10, | ||
MaxWorkerMemory: 1000, | ||
}, | ||
} | ||
|
||
// Gauge ////////////// | ||
type Plugin1 struct { | ||
config config.Configurer | ||
server server.Server | ||
} | ||
|
||
func (p1 *Plugin1) Init(cfg config.Configurer, server server.Server) error { | ||
p1.config = cfg | ||
p1.server = server | ||
return nil | ||
} | ||
|
||
func (p1 *Plugin1) Serve() chan error { | ||
errCh := make(chan error, 1) | ||
return errCh | ||
} | ||
|
||
func (p1 *Plugin1) Stop() error { | ||
return nil | ||
} | ||
|
||
func (p1 *Plugin1) Name() string { | ||
return "resetter.plugin1" | ||
} | ||
|
||
func (p1 *Plugin1) Reset() error { | ||
pool, err := p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) | ||
if err != nil { | ||
panic(err) | ||
} | ||
pool.Destroy(context.Background()) | ||
|
||
pool, err = p1.server.NewWorkerPool(context.Background(), testPoolConfig, nil) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
_ = pool | ||
|
||
return nil | ||
} |