Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lsps0 implementation #114

Merged
merged 9 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .github/workflows/integration_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,35 @@ jobs:
CLIENT_REF: ${{ env.CLIENT_REF }}
GO_VERSION: ${{ env.GO_VERSION }}
CLN_VERSION: ${{ env.CLN_VERSION }}

run-lsps2-test:
runs-on: ubuntu-22.04
needs:
- setup-itest
- setup-bitcoin-core
- setup-cln
- build-lspd
name: test ${{ matrix.implementation }} ${{ matrix.test }}
strategy:
max-parallel: 6
matrix:
test: [
testLsps0GetProtocolVersions
]
implementation: [
CLN
]
steps:
- name: Checkout
uses: actions/checkout@v3

- name: Run and Process Test State
uses: ./.github/actions/test-lspd
with:
TESTRE: "TestLspd/${{ matrix.implementation }}-lspd:_${{ matrix.test }}"
artifact-name: TestLspd-${{ matrix.implementation }}-lspd_${{ matrix.test }}
bitcoin-version: ${{ env.BITCOIN_VERSION }}
LSP_REF: ${{ env.LSP_REF }}
CLIENT_REF: ${{ env.CLIENT_REF }}
GO_VERSION: ${{ env.GO_VERSION }}
CLN_VERSION: ${{ env.CLN_VERSION }}
175 changes: 175 additions & 0 deletions cln/custom_msg_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package cln

import (
"context"
"encoding/binary"
"encoding/hex"
"log"
"sync"
"time"

"github.com/breez/lspd/cln_plugin/proto"
"github.com/breez/lspd/config"
"github.com/breez/lspd/lightning"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)

type CustomMsgClient struct {
lightning.CustomMsgClient
pluginAddress string
client *ClnClient
pluginClient proto.ClnPluginClient
initWg sync.WaitGroup
stopRequested bool
ctx context.Context
cancel context.CancelFunc
recvQueue chan *lightning.CustomMessage
}

func NewCustomMsgClient(conf *config.ClnConfig, client *ClnClient) *CustomMsgClient {
c := &CustomMsgClient{
pluginAddress: conf.PluginAddress,
client: client,
recvQueue: make(chan *lightning.CustomMessage, 10000),
}

c.initWg.Add(1)
return c
}

func (c *CustomMsgClient) Start() error {
ctx, cancel := context.WithCancel(context.Background())
log.Printf("Dialing cln plugin on '%s'", c.pluginAddress)
conn, err := grpc.DialContext(
ctx,
c.pluginAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(10) * time.Second,
Timeout: time.Duration(10) * time.Second,
}),
)
if err != nil {
log.Printf("grpc.Dial error: %v", err)
cancel()
return err
}

c.pluginClient = proto.NewClnPluginClient(conn)
c.ctx = ctx
c.cancel = cancel
c.stopRequested = false
return c.listen()
}

func (i *CustomMsgClient) WaitStarted() {
i.initWg.Wait()
}

func (i *CustomMsgClient) listen() error {
inited := false

defer func() {
if !inited {
i.initWg.Done()
}
log.Printf("CLN custom msg listen(): stopping.")
}()

for {
if i.ctx.Err() != nil {
return i.ctx.Err()
}

log.Printf("Connecting CLN msg stream.")
msgClient, err := i.pluginClient.CustomMsgStream(i.ctx, &proto.CustomMessageRequest{})
if err != nil {
log.Printf("pluginClient.CustomMsgStream(): %v", err)
<-time.After(time.Second)
continue
}

for {
if i.ctx.Err() != nil {
return i.ctx.Err()
}

if !inited {
inited = true
i.initWg.Done()
}

// Stop receiving if stop if requested.
if i.stopRequested {
return nil
}

request, err := msgClient.Recv()
if err != nil {
// If it is just the error result of the context cancellation
// the we exit silently.
status, ok := status.FromError(err)
if ok && status.Code() == codes.Canceled {
log.Printf("Got code canceled. Break.")
break
}

// Otherwise it an unexpected error, we log.
log.Printf("unexpected error in interceptor.Recv() %v", err)
break
}

payload, err := hex.DecodeString(request.Payload)
if err != nil {
log.Printf("Error hex decoding cln custom msg payload from peer '%s': %v", request.PeerId, err)
continue
}

if len(payload) < 3 {
log.Printf("UNUSUAL: Custom msg payload from peer '%s' is too small", request.PeerId)
continue
}

t := binary.BigEndian.Uint16(payload)
payload = payload[2:]
i.recvQueue <- &lightning.CustomMessage{
PeerId: request.PeerId,
Type: uint32(t),
Data: payload,
}
}

<-time.After(time.Second)
}
}

func (c *CustomMsgClient) Recv() (*lightning.CustomMessage, error) {
select {
case msg := <-c.recvQueue:
return msg, nil
case <-c.ctx.Done():
return nil, c.ctx.Err()
}
}

func (c *CustomMsgClient) Send(msg *lightning.CustomMessage) error {
var t [2]byte
binary.BigEndian.PutUint16(t[:], uint16(msg.Type))

m := hex.EncodeToString(t[:]) + hex.EncodeToString(msg.Data)
_, err := c.client.client.SendCustomMessage(msg.PeerId, m)
return err
}

func (i *CustomMsgClient) Stop() error {
// Setting stopRequested to true will make the interceptor stop receiving.
i.stopRequested = true

// Close the grpc connection.
i.cancel()
return nil
}
5 changes: 5 additions & 0 deletions cln_plugin/cln_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,8 @@ type LogNotification struct {
Level string `json:"level"`
Message string `json:"message"`
}

type CustomMessageRequest struct {
PeerId string `json:"peer_id"`
Payload string `json:"payload"`
}
63 changes: 57 additions & 6 deletions cln_plugin/cln_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
var (
DefaultSubscriberTimeout = "1m"
DefaultChannelAcceptorScript = ""
LspsFeatureBit = "0200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
)

const (
Expand Down Expand Up @@ -126,13 +127,37 @@ func (c *ClnPlugin) listenRequests() error {
}

// Listens to responses to htlc_accepted requests from the grpc server.
func (c *ClnPlugin) listenServer() {
func (c *ClnPlugin) htlcListenServer() {
for {
select {
case <-c.done:
return
default:
id, result := c.server.Receive()
id, result := c.server.ReceiveHtlcResolution()

// The server may return nil if it is stopped.
if result == nil {
continue
}

serid, _ := json.Marshal(&id)
c.sendToCln(&Response{
Id: serid,
JsonRpc: SpecVersion,
Result: result,
})
}
}
}

// Listens to responses to custommsg requests from the grpc server.
func (c *ClnPlugin) custommsgListenServer() {
for {
select {
case <-c.done:
return
default:
id, result := c.server.ReceiveCustomMessageResponse()

// The server may return nil if it is stopped.
if result == nil {
Expand Down Expand Up @@ -227,6 +252,8 @@ func (c *ClnPlugin) processRequest(request *Request) {
})
case "setchannelacceptscript":
c.handleSetChannelAcceptScript(request)
case "custommsg":
c.handleCustomMsg(request)
default:
c.sendError(
request.Id,
Expand Down Expand Up @@ -273,8 +300,9 @@ func (c *ClnPlugin) handleGetManifest(request *Request) {
Description: "Set the startlark channel acceptor script",
},
},
Dynamic: true,
Dynamic: false,
Hooks: []Hook{
{Name: "custommsg"},
{Name: "htlc_accepted"},
{Name: "openchannel"},
{Name: "openchannel2"},
Expand All @@ -283,6 +311,9 @@ func (c *ClnPlugin) handleGetManifest(request *Request) {
Subscriptions: []string{
"shutdown",
},
FeatureBits: &FeatureBits{
Node: &LspsFeatureBit,
},
},
})
}
Expand Down Expand Up @@ -404,8 +435,9 @@ func (c *ClnPlugin) handleInit(request *Request) {
return
}

// Listen for responses from the grpc server.
go c.listenServer()
// Listen for htlc and custommsg responses from the grpc server.
go c.htlcListenServer()
go c.custommsgListenServer()

// Let cln know the plugin is initialized.
c.sendToCln(&Response{
Expand Down Expand Up @@ -436,7 +468,7 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) {
return
}

c.server.Send(idToString(request.Id), &htlc)
c.server.SendHtlcAccepted(idToString(request.Id), &htlc)
}

func (c *ClnPlugin) handleSetChannelAcceptScript(request *Request) {
Expand Down Expand Up @@ -464,6 +496,25 @@ func (c *ClnPlugin) handleSetChannelAcceptScript(request *Request) {
})
}

func (c *ClnPlugin) handleCustomMsg(request *Request) {
var custommsg CustomMessageRequest
err := json.Unmarshal(request.Params, &custommsg)
if err != nil {
c.sendError(
request.Id,
ParseError,
fmt.Sprintf(
"Failed to unmarshal custommsg params:%s [%s]",
err.Error(),
request.Params,
),
)
return
}

c.server.SendCustomMessage(idToString(request.Id), &custommsg)
}

func unmarshalOpenChannel(request *Request) (r json.RawMessage, err error) {
switch request.Method {
case "openchannel":
Expand Down
Loading