diff --git a/build/dial.go b/build/dial.go new file mode 100644 index 00000000000..92c6adc7648 --- /dev/null +++ b/build/dial.go @@ -0,0 +1,59 @@ +package build + +import ( + "context" + "net" + + "github.com/containerd/containerd/platforms" + "github.com/docker/buildx/builder" + "github.com/docker/buildx/util/progress" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer, platform *v1.Platform) (net.Conn, error) { + nodes, err := filterAvailableNodes(nodes) + if err != nil { + return nil, err + } + + if len(nodes) == 0 { + return nil, errors.New("no nodes available") + } + + var pls []v1.Platform + if platform != nil { + pls = []v1.Platform{*platform} + } + + opts := map[string]Options{"": {Platforms: pls}} + resolved, err := resolveDrivers(ctx, nodes, opts, pw) + if err != nil { + return nil, err + } + + for _, ls := range resolved { + for _, rn := range ls { + if platform != nil { + p := *platform + var found bool + for _, pp := range rn.platforms { + if platforms.Only(p).Match(pp) { + found = true + break + } + } + if !found { + continue + } + } + + conn, err := nodes[rn.driverIndex].Driver.Dial(ctx) + if err == nil { + return conn, nil + } + } + } + + return nil, errors.Errorf("no nodes available") +} diff --git a/commands/dial_stdio.go b/commands/dial_stdio.go new file mode 100644 index 00000000000..15143f04aa4 --- /dev/null +++ b/commands/dial_stdio.go @@ -0,0 +1,130 @@ +package commands + +import ( + "io" + "net" + "os" + + "github.com/containerd/containerd/platforms" + "github.com/docker/buildx/build" + "github.com/docker/buildx/builder" + "github.com/docker/buildx/util/progress" + "github.com/docker/cli/cli/command" + "github.com/moby/buildkit/util/appcontext" + "github.com/moby/buildkit/util/progress/progressui" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +type stdioOptions struct { + builder string + platform string +} + +func runDialStdio(dockerCli command.Cli, opts stdioOptions) error { + ctx := appcontext.Context() + + contextPathHash, _ := os.Getwd() + b, err := builder.New(dockerCli, + builder.WithName(opts.builder), + builder.WithContextPathHash(contextPathHash), + ) + if err != nil { + return err + } + + if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil { + return errors.Wrapf(err, "failed to update builder last activity time") + } + nodes, err := b.LoadNodes(ctx) + if err != nil { + return err + } + + printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.AutoMode, progress.WithPhase("dial-stdio"), progress.WithDesc("builder: "+b.Name, "builder:"+b.Name)) + if err != nil { + return err + } + + var p *v1.Platform + if opts.platform != "" { + pp, err := platforms.Parse(opts.platform) + if err != nil { + return errors.Wrapf(err, "invalid platform %q", opts.platform) + } + p = &pp + } + + defer printer.Wait() + + return progress.Wrap("Proxying to builder", printer.Write, func(sub progress.SubLogger) error { + var conn net.Conn + + err := sub.Wrap("Dialing builder", func() error { + conn, err = build.Dial(ctx, nodes, printer, p) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + defer conn.Close() + + go func() { + <-ctx.Done() + closeWrite(conn) + }() + + var eg errgroup.Group + + eg.Go(func() error { + _, err := io.Copy(conn, os.Stdin) + closeWrite(conn) + return err + }) + eg.Go(func() error { + _, err := io.Copy(os.Stdout, conn) + closeRead(conn) + return err + }) + return eg.Wait() + }) +} + +func closeRead(conn net.Conn) error { + if c, ok := conn.(interface{ CloseRead() error }); ok { + return c.CloseRead() + } + return conn.Close() +} + +func closeWrite(conn net.Conn) error { + if c, ok := conn.(interface{ CloseWrite() error }); ok { + return c.CloseWrite() + } + return conn.Close() +} + +func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command { + opts := stdioOptions{} + + cmd := &cobra.Command{ + Use: "dial-stdio", + Short: "Dial stdio", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + opts.builder = rootOpts.builder + return runDialStdio(dockerCli, opts) + }, + } + + flags := cmd.Flags() + cmd.Flags() + flags.StringVar(&opts.platform, "platform", os.Getenv("DOCKER_DEFAULT_PLATFORM"), "Target platform: this is used for node selection") + return cmd +} diff --git a/commands/root.go b/commands/root.go index 43a43a4c10c..707ca97ad85 100644 --- a/commands/root.go +++ b/commands/root.go @@ -75,6 +75,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) { buildCmd(dockerCli, opts, nil), bakeCmd(dockerCli, opts), createCmd(dockerCli), + dialStdioCmd(dockerCli, opts), rmCmd(dockerCli, opts), lsCmd(dockerCli), useCmd(dockerCli, opts), diff --git a/docs/reference/buildx.md b/docs/reference/buildx.md index c1b760dd176..4e7ff0debda 100644 --- a/docs/reference/buildx.md +++ b/docs/reference/buildx.md @@ -15,6 +15,7 @@ Extended build capabilities with BuildKit | [`build`](buildx_build.md) | Start a build | | [`create`](buildx_create.md) | Create a new builder instance | | [`debug`](buildx_debug.md) | Start debugger | +| [`dial-stdio`](buildx_dial-stdio.md) | Dial stdio | | [`du`](buildx_du.md) | Disk usage | | [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry | | [`inspect`](buildx_inspect.md) | Inspect current builder instance | diff --git a/docs/reference/buildx_dial-stdio.md b/docs/reference/buildx_dial-stdio.md new file mode 100644 index 00000000000..d6218a2bf1d --- /dev/null +++ b/docs/reference/buildx_dial-stdio.md @@ -0,0 +1,15 @@ +# docker buildx dial-stdio + + +Dial stdio + +### Options + +| Name | Type | Default | Description | +|:-------------|:---------|:--------|:-------------------------------------------------| +| `--builder` | `string` | | Override the configured builder instance | +| `--platform` | `string` | | Target platform: this is used for node selection | + + + + diff --git a/driver/docker-container/driver.go b/driver/docker-container/driver.go index b79e2f00e9f..b6ad865d011 100644 --- a/driver/docker-container/driver.go +++ b/driver/docker-container/driver.go @@ -380,13 +380,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { _, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"}) if err != nil { return nil, err } - conn = demuxConn(conn) + return conn, nil +} + +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { + conn, err := d.Dial(ctx) + if err != nil { + return nil, err + } exp, err := detect.Exporter() if err != nil { diff --git a/driver/docker/driver.go b/driver/docker/driver.go index 0bed4187782..6e1d95e06c1 100644 --- a/driver/docker/driver.go +++ b/driver/docker/driver.go @@ -55,10 +55,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta) +} + func (d *Driver) Client(ctx context.Context) (*client.Client, error) { opts := []client.ClientOpt{ client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta) + return d.Dial(ctx) }), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) { return d.DockerAPI.DialHijack(ctx, "/session", proto, meta) }), diff --git a/driver/driver.go b/driver/driver.go index 16d43d7af3a..6d3c546737c 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -59,6 +59,7 @@ type Driver interface { Version(context.Context) (string, error) Stop(ctx context.Context, force bool) error Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error + Dial(ctx context.Context) (net.Conn, error) Client(ctx context.Context) (*client.Client, error) Features(ctx context.Context) map[Feature]bool HostGatewayIP(ctx context.Context) (net.IP, error) diff --git a/driver/kubernetes/driver.go b/driver/kubernetes/driver.go index d46448be4ed..6467950b26d 100644 --- a/driver/kubernetes/driver.go +++ b/driver/kubernetes/driver.go @@ -186,7 +186,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error { return nil } -func (d *Driver) Client(ctx context.Context) (*client.Client, error) { +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { restClient := d.clientset.CoreV1().RESTClient() restClientConfig, err := d.KubeClientConfig.ClientConfig() if err != nil { @@ -205,7 +205,10 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { if err != nil { return nil, err } + return conn, nil +} +func (d *Driver) Client(ctx context.Context) (*client.Client, error) { exp, err := detect.Exporter() if err != nil { return nil, err @@ -213,7 +216,7 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { var opts []client.ClientOpt opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) { - return conn, nil + return d.Dial(ctx) })) if td, ok := exp.(client.TracerDelegate); ok { opts = append(opts, client.WithTracerDelegate(td)) diff --git a/driver/remote/driver.go b/driver/remote/driver.go index 2efd2263651..43fc04a9e1e 100644 --- a/driver/remote/driver.go +++ b/driver/remote/driver.go @@ -2,13 +2,15 @@ package remote import ( "context" - "errors" + "fmt" "net" + "strings" "github.com/docker/buildx/driver" "github.com/docker/buildx/util/progress" "github.com/moby/buildkit/client" "github.com/moby/buildkit/util/tracing/detect" + "github.com/pkg/errors" ) type Driver struct { @@ -84,6 +86,23 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) { return client.New(ctx, d.InitConfig.EndpointAddr, opts...) } +func (d *Driver) Dial(ctx context.Context) (net.Conn, error) { + if d.tlsOpts != nil { + // TODO: add TLS support + return nil, errors.New("TLS dialer is not supported yet") + } + + network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://") + if !ok { + return nil, fmt.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr) + } + conn, err := net.Dial(network, addr) + if err != nil { + return nil, errors.Wrap(err, "error ") + } + return conn, nil +} + func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool { return map[driver.Feature]bool{ driver.OCIExporter: true, diff --git a/tests/dialstdio.go b/tests/dialstdio.go new file mode 100644 index 00000000000..6fcb6d54b76 --- /dev/null +++ b/tests/dialstdio.go @@ -0,0 +1,108 @@ +package tests + +import ( + "bytes" + "context" + "net" + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/docker/buildx/util/progress" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/client/llb" + gwclient "github.com/moby/buildkit/frontend/gateway/client" + "github.com/moby/buildkit/util/progress/progressui" + "github.com/moby/buildkit/util/testutil/integration" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var dialstdioTests = []func(t *testing.T, sb integration.Sandbox){ + testDialStdio, +} + +func testDialStdio(t *testing.T, sb integration.Sandbox) { + errBuf := bytes.NewBuffer(nil) + defer func() { + if t.Failed() { + t.Log(errBuf.String()) + } + }() + var cmd *exec.Cmd + c, err := client.New(sb.Context(), "", client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + c1, c2 := net.Pipe() + cmd = buildxCmd(sb, withArgs("dial-stdio"), func(cmd *exec.Cmd) { + cmd.Stdin = c1 + cmd.Stdout = c1 + cmd.Stderr = errBuf + }) + + if err := cmd.Start(); err != nil { + c1.Close() + c2.Close() + return nil, errors.Wrap(err, errBuf.String()) + } + + return c2, nil + })) + require.NoError(t, err) + + defer func() { + c.Close() + // Since the client is closed (and as such the connection shutdown), the buildx command should exit cleanly. + assert.NoError(t, cmd.Wait()) + }() + + _, err = c.Info(sb.Context()) + require.NoError(t, err) + + require.Contains(t, errBuf.String(), "builder: "+sb.Address()) + + dir := t.TempDir() + + f, err := os.CreateTemp(dir, "log") + require.NoError(t, err) + defer f.Close() + + defer func() { + if t.Failed() { + dt, _ := os.ReadFile(f.Name()) + t.Log(string(dt)) + } + }() + + p, err := progress.NewPrinter(sb.Context(), f, progressui.AutoMode) + require.NoError(t, err) + + ch, chDone := progress.NewChannel(p) + done := func() { + select { + case <-sb.Context().Done(): + case <-chDone: + } + } + + _, err = c.Build(sb.Context(), client.SolveOpt{ + Exports: []client.ExportEntry{ + {Type: "local", OutputDir: dir}, + }, + }, "", func(ctx context.Context, gwc gwclient.Client) (*gwclient.Result, error) { + def, err := llb.Scratch().File(llb.Mkfile("hello", 0o600, []byte("world"))).Marshal(ctx) + if err != nil { + return nil, err + } + + return gwc.Solve(ctx, gwclient.SolveRequest{ + Definition: def.ToPB(), + }) + }, ch) + done() + require.NoError(t, err) + + dt, err := os.ReadFile(filepath.Join(dir, "/hello")) + require.NoError(t, err) + require.Equal(t, "world", string(dt)) +} diff --git a/tests/integration_test.go b/tests/integration_test.go index db2e5d447b6..381ef25f244 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -27,6 +27,7 @@ func TestIntegration(t *testing.T) { tests = append(tests, lsTests...) tests = append(tests, imagetoolsTests...) tests = append(tests, versionTests...) + tests = append(tests, dialstdioTests...) testIntegration(t, tests...) }