Skip to content

Commit

Permalink
[breaking] feature: Added gRPC close signal to Monitor call (allo…
Browse files Browse the repository at this point in the history
…ws graceful close of monitor) (#2276)

* Refactored gRPC Monitor API

* Added Close request to gRPC Monitor API

* Updated docs

* Made CreateEnvForDaeamon available in all integration tests

* Added integration test

* lint: avoid redefinition of the built-in function close

* lint: comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error

* Allow up to 5 seconds for a pluggable monitor to gracefully close

* Made the gRPC daemon actually wait for port close completion
  • Loading branch information
cmaglie authored Jan 2, 2024
1 parent 07cf265 commit 0dbd871
Show file tree
Hide file tree
Showing 13 changed files with 572 additions and 181 deletions.
30 changes: 26 additions & 4 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/arduino/arduino-cli/commands"
"github.com/arduino/arduino-cli/commands/board"
Expand Down Expand Up @@ -477,7 +478,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
return err
}

portProxy, _, err := monitor.Monitor(stream.Context(), req)
openReq := req.GetOpenRequest()
if openReq == nil {
return &cmderrors.InvalidInstanceError{}
}
portProxy, _, err := monitor.Monitor(stream.Context(), openReq)
if err != nil {
return err
}
Expand All @@ -486,6 +491,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})

cancelCtx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())

// gRPC stream receiver (gRPC data -> monitor, config, close)
go func() {
defer cancel()
for {
Expand All @@ -497,13 +506,20 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if conf := msg.GetPortConfiguration(); conf != nil {
if conf := msg.GetUpdatedConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := portProxy.Config(c.GetSettingId(), c.GetValue()); err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
}
}
}
if closeMsg := msg.GetClose(); closeMsg {
gracefulCloseInitiated.Store(true)
if err := portProxy.Close(); err != nil {
logrus.WithError(err).Debug("Error closing monitor port")
}
gracefulCloseCancel()
}
tx := msg.GetTxData()
for len(tx) > 0 {
n, err := portProxy.Write(tx)
Expand All @@ -519,8 +535,9 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}
}()

// gRPC stream sender (monitor -> gRPC)
go func() {
defer cancel()
defer cancel() // unlock the receiver
buff := make([]byte, 4096)
for {
n, err := portProxy.Read(buff)
Expand All @@ -538,6 +555,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}()

<-cancelCtx.Done()
portProxy.Close()
if gracefulCloseInitiated.Load() {
// Port closing has been initiated in the receiver
<-gracefuleCloseCtx.Done()
} else {
portProxy.Close()
}
return nil
}
10 changes: 6 additions & 4 deletions commands/daemon/term_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Ins
log.Fatal("Error opening Monitor:", err)
}
if err := monitorClient.Send(&commands.MonitorRequest{
Instance: instance,
Port: port,
Message: &commands.MonitorRequest_OpenRequest{OpenRequest: &commands.MonitorPortOpenRequest{
Instance: instance,
Port: port,
}},
}); err != nil {
log.Fatal("Error sending Monitor config:", err)
}
Expand All @@ -106,9 +108,9 @@ func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Ins
}
}()

hello := &commands.MonitorRequest{
hello := &commands.MonitorRequest{Message: &commands.MonitorRequest_TxData{
TxData: []byte("HELLO!"),
}
}}
fmt.Println("Send:", hello)
if err := monitorClient.Send(hello); err != nil {
log.Fatal("Monitor send HELLO:", err)
Expand Down
2 changes: 1 addition & 1 deletion commands/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *PortProxy) Close() error {

// Monitor opens a communication port. It returns a PortProxy to communicate with the port and a PortDescriptor
// that describes the available configuration settings.
func Monitor(ctx context.Context, req *rpc.MonitorRequest) (*PortProxy, *pluggableMonitor.PortDescriptor, error) {
func Monitor(ctx context.Context, req *rpc.MonitorPortOpenRequest) (*PortProxy, *pluggableMonitor.PortDescriptor, error) {
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil {
return nil, nil, err
Expand Down
59 changes: 59 additions & 0 deletions docs/UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,65 @@ Here you can find a list of migration guides to handle breaking changes between

We're dropping the `builtin.tools` support. It was the equivalent of Arduino IDE 1.x bundled tools directory.

### The gRPC `cc.arduino.cli.commands.v1.MonitorRequest` message has been changed.

Previously the `MonitorRequest` was a single message used to open the monitor, to stream data, and to change the port
configuration:

```proto
message MonitorRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
// Port to open, must be filled only on the first request
Port port = 2;
// The board FQBN we are trying to connect to. This is optional, and it's
// needed to disambiguate if more than one platform provides the pluggable
// monitor for a given port protocol.
string fqbn = 3;
// Data to send to the port
bytes tx_data = 4;
// Port configuration, optional, contains settings of the port to be applied
MonitorPortConfiguration port_configuration = 5;
}
```

Now the meaning of the fields has been clarified with the `oneof` clause, making it more explicit:

```proto
message MonitorRequest {
oneof message {
// Open request, it must be the first incoming message
MonitorPortOpenRequest open_request = 1;
// Data to send to the port
bytes tx_data = 2;
// Port configuration, contains settings of the port to be changed
MonitorPortConfiguration updated_configuration = 3;
// Close message, set to true to gracefully close a port (this ensure
// that the gRPC streaming call is closed by the daemon AFTER the port
// has been successfully closed)
bool close = 4;
}
}
message MonitorPortOpenRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
// Port to open, must be filled only on the first request
Port port = 2;
// The board FQBN we are trying to connect to. This is optional, and it's
// needed to disambiguate if more than one platform provides the pluggable
// monitor for a given port protocol.
string fqbn = 3;
// Port configuration, optional, contains settings of the port to be applied
MonitorPortConfiguration port_configuration = 4;
}
```

Now the message field `MonitorPortOpenRequest.open_request` must be sent in the first message after opening the
streaming gRPC call.

The identification number of the fields has been changed, this change is not binary compatible with old clients.

### Some golang modules from `github.com/arduino/arduino-cli/*` have been made private.

The following golang modules are no longer available as public API:
Expand Down
2 changes: 1 addition & 1 deletion internal/arduino/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (mon *PluggableMonitor) Close() error {
if err := mon.sendCommand("CLOSE\n"); err != nil {
return err
}
_, err := mon.waitMessage(time.Millisecond*250, "close")
_, err := mon.waitMessage(time.Millisecond*5000, "close")
return err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/cli/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func runMonitorCmd(
}
}
}
portProxy, _, err := monitor.Monitor(context.Background(), &rpc.MonitorRequest{
portProxy, _, err := monitor.Monitor(context.Background(), &rpc.MonitorPortOpenRequest{
Instance: inst,
Port: &rpc.Port{Address: portAddress, Protocol: portProtocol},
Fqbn: fqbn,
Expand Down
34 changes: 34 additions & 0 deletions internal/integrationtest/arduino-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,21 @@ func NewArduinoCliWithinEnvironment(env *Environment, config *ArduinoCLIConfig)
return cli
}

// CreateEnvForDaemon performs the minimum required operations to start the arduino-cli daemon.
// It returns a testsuite.Environment and an ArduinoCLI client to perform the integration tests.
// The Environment must be disposed by calling the CleanUp method via defer.
func CreateEnvForDaemon(t *testing.T) (*Environment, *ArduinoCLI) {
env := NewEnvironment(t)

cli := NewArduinoCliWithinEnvironment(env, &ArduinoCLIConfig{
ArduinoCLIPath: FindRepositoryRootPath(t).Join("arduino-cli"),
UseSharedStagingFolder: true,
})

_ = cli.StartDaemon(false)
return env, cli
}

// CleanUp closes the Arduino CLI client.
func (cli *ArduinoCLI) CleanUp() {
if cli.proc != nil {
Expand Down Expand Up @@ -596,3 +611,22 @@ func (inst *ArduinoCLIInstance) PlatformSearch(ctx context.Context, args string,
resp, err := inst.cli.daemonClient.PlatformSearch(ctx, req)
return resp, err
}

// Monitor calls the "Monitor" gRPC method and sends the OpenRequest message.
func (inst *ArduinoCLIInstance) Monitor(ctx context.Context, port *commands.Port) (commands.ArduinoCoreService_MonitorClient, error) {
req := &commands.MonitorRequest{}
logCallf(">>> Monitor(%+v)\n", req)
monitorClient, err := inst.cli.daemonClient.Monitor(ctx)
if err != nil {
return nil, err
}
err = monitorClient.Send(&commands.MonitorRequest{
Message: &commands.MonitorRequest_OpenRequest{
OpenRequest: &commands.MonitorPortOpenRequest{
Instance: inst.instance,
Port: port,
},
},
})
return monitorClient, err
}
3 changes: 2 additions & 1 deletion internal/integrationtest/daemon/daemon_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/arduino/arduino-cli/internal/integrationtest"
"github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/arduino/go-paths-helper"
"github.com/stretchr/testify/require"
Expand All @@ -31,7 +32,7 @@ import (
func TestArduinoCliDaemonCompileWithLotOfOutput(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/issues/2169

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

_, _, err := cli.Run("core", "install", "arduino:avr")
Expand Down
37 changes: 11 additions & 26 deletions internal/integrationtest/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func TestArduinoCliDaemon(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/pull/1804

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestArduinoCliDaemon(t *testing.T) {
func TestDaemonAutoUpdateIndexOnFirstInit(t *testing.T) {
// https://github.com/arduino/arduino-cli/issues/1529

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand All @@ -110,26 +110,11 @@ func TestDaemonAutoUpdateIndexOnFirstInit(t *testing.T) {
require.FileExists(t, cli.DataDir().Join("package_index.json").String())
}

// createEnvForDaemon performs the minimum required operations to start the arduino-cli daemon.
// It returns a testsuite.Environment and an ArduinoCLI client to perform the integration tests.
// The Environment must be disposed by calling the CleanUp method via defer.
func createEnvForDaemon(t *testing.T) (*integrationtest.Environment, *integrationtest.ArduinoCLI) {
env := integrationtest.NewEnvironment(t)

cli := integrationtest.NewArduinoCliWithinEnvironment(env, &integrationtest.ArduinoCLIConfig{
ArduinoCLIPath: integrationtest.FindRepositoryRootPath(t).Join("arduino-cli"),
UseSharedStagingFolder: true,
})

_ = cli.StartDaemon(false)
return env, cli
}

func TestDaemonCompileOptions(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/issues/1614
// See: https://github.com/arduino/arduino-cli/pull/1820

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand Down Expand Up @@ -203,7 +188,7 @@ func TestDaemonCompileOptions(t *testing.T) {
func TestDaemonCompileAfterFailedLibInstall(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/issues/1812

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand Down Expand Up @@ -233,7 +218,7 @@ func TestDaemonCompileAfterFailedLibInstall(t *testing.T) {
}

func TestDaemonCoreUpdateIndex(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand Down Expand Up @@ -269,7 +254,7 @@ func TestDaemonCoreUpdateIndex(t *testing.T) {
}

func TestDaemonBundleLibInstall(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand Down Expand Up @@ -409,7 +394,7 @@ func TestDaemonLibrariesRescanOnInstall(t *testing.T) {
with the gprc instance
The last attempt is expected to not raise an error
*/
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand Down Expand Up @@ -465,7 +450,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {

t.Run("upgraded successfully with additional urls", func(t *testing.T) {
t.Run("and install.json is present", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand All @@ -481,7 +466,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {
require.False(t, platform.GetRelease().GetMissingMetadata()) // install.json is present
})
t.Run("and install.json is missing", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand All @@ -504,7 +489,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {

t.Run("upgrade failed", func(t *testing.T) {
t.Run("without additional URLs", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand All @@ -524,7 +509,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {
require.False(t, platform.GetRelease().GetMissingMetadata()) // install.json is present
})
t.Run("missing both additional URLs and install.json", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
Expand Down
Loading

0 comments on commit 0dbd871

Please sign in to comment.