From 0df936d60b06b08716d744797cec234ca0611002 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Thu, 10 Aug 2023 08:50:47 +0200 Subject: [PATCH 1/9] cln_plugin: rename htlc related fields/functions --- cln_plugin/cln_plugin.go | 12 +++++---- cln_plugin/server.go | 58 ++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/cln_plugin/cln_plugin.go b/cln_plugin/cln_plugin.go index 778fa33b..ba10f1f4 100644 --- a/cln_plugin/cln_plugin.go +++ b/cln_plugin/cln_plugin.go @@ -126,13 +126,13 @@ func (c *ClnPlugin) listenRequests() error { } // Listens to responses to htlc_accepted requests from the grpc server. -func (c *ClnPlugin) listenServer() { +func (c *ClnPlugin) htlcListenServer() { for { select { case <-c.done: return default: - id, result := c.server.Receive() + id, result := c.server.ReceiveHtlcResolution() // The server may return nil if it is stopped. if result == nil { @@ -227,6 +227,8 @@ func (c *ClnPlugin) processRequest(request *Request) { }) case "setchannelacceptscript": c.handleSetChannelAcceptScript(request) + case "custommsg": + c.handleCustomMsg(request) default: c.sendError( request.Id, @@ -404,8 +406,8 @@ func (c *ClnPlugin) handleInit(request *Request) { return } - // Listen for responses from the grpc server. - go c.listenServer() + // Listen for htlc responses from the grpc server. + go c.htlcListenServer() // Let cln know the plugin is initialized. c.sendToCln(&Response{ @@ -436,7 +438,7 @@ func (c *ClnPlugin) handleHtlcAccepted(request *Request) { return } - c.server.Send(idToString(request.Id), &htlc) + c.server.SendHtlcAccepted(idToString(request.Id), &htlc) } func (c *ClnPlugin) handleSetChannelAcceptScript(request *Request) { diff --git a/cln_plugin/server.go b/cln_plugin/server.go index b03c97c3..8b8160f6 100644 --- a/cln_plugin/server.go +++ b/cln_plugin/server.go @@ -31,14 +31,14 @@ type server struct { subscriberTimeout time.Duration grpcServer *grpc.Server mtx sync.Mutex - stream proto.ClnPlugin_HtlcStreamServer - newSubscriber chan struct{} started chan struct{} done chan struct{} completed chan struct{} startError chan error - sendQueue chan *htlcAcceptedMsg - recvQueue chan *htlcResultMsg + htlcnewSubscriber chan struct{} + htlcStream proto.ClnPlugin_HtlcStreamServer + htlcSendQueue chan *htlcAcceptedMsg + htlcRecvQueue chan *htlcResultMsg } // Creates a new grpc server @@ -48,13 +48,13 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server { listenAddress: listenAddress, subscriberTimeout: subscriberTimeout, // The send queue exists to buffer messages until a subscriber is active. - sendQueue: make(chan *htlcAcceptedMsg, 10000), + htlcSendQueue: make(chan *htlcAcceptedMsg, 10000), // The receive queue exists mainly to allow returning timeouts to the // cln plugin. If there is no subscriber active within the subscriber // timeout period these results can be put directly on the receive queue. - recvQueue: make(chan *htlcResultMsg, 10000), - started: make(chan struct{}), - startError: make(chan error, 1), + htlcRecvQueue: make(chan *htlcResultMsg, 10000), + started: make(chan struct{}), + startError: make(chan error, 1), } } @@ -78,7 +78,7 @@ func (s *server) Start() error { s.done = make(chan struct{}) s.completed = make(chan struct{}) - s.newSubscriber = make(chan struct{}) + s.htlcnewSubscriber = make(chan struct{}) s.grpcServer = grpc.NewServer( grpc.KeepaliveParams(keepalive.ServerParameters{ Time: time.Duration(1) * time.Second, @@ -132,7 +132,7 @@ func (s *server) Stop() { // from or to the subscriber, the subscription is closed. func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { s.mtx.Lock() - if s.stream == nil { + if s.htlcStream == nil { log.Printf("Got a new HTLC stream subscription request.") } else { s.mtx.Unlock() @@ -141,12 +141,12 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { return fmt.Errorf("already subscribed") } - s.stream = stream + s.htlcStream = stream // Notify listeners that a new subscriber is active. Replace the chan with // a new one immediately in case this subscriber is dropped later. - close(s.newSubscriber) - s.newSubscriber = make(chan struct{}) + close(s.htlcnewSubscriber) + s.htlcnewSubscriber = make(chan struct{}) s.mtx.Unlock() <-stream.Context().Done() @@ -154,15 +154,15 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error { // Remove the subscriber. s.mtx.Lock() - s.stream = nil + s.htlcStream = nil s.mtx.Unlock() return stream.Context().Err() } // Enqueues a htlc_accepted message for send to the grpc client. -func (s *server) Send(id string, h *HtlcAccepted) { - s.sendQueue <- &htlcAcceptedMsg{ +func (s *server) SendHtlcAccepted(id string, h *HtlcAccepted) { + s.htlcSendQueue <- &htlcAcceptedMsg{ id: id, htlc: h, timeout: time.Now().Add(s.subscriberTimeout), @@ -173,11 +173,11 @@ func (s *server) Send(id string, h *HtlcAccepted) { // and message. Blocks until a message is available. Returns a nil message if // the server is done. This function effectively waits until a subscriber is // active and has sent a message. -func (s *server) Receive() (string, interface{}) { +func (s *server) ReceiveHtlcResolution() (string, interface{}) { select { case <-s.done: return "", nil - case msg := <-s.recvQueue: + case msg := <-s.htlcRecvQueue: return msg.id, msg.result } } @@ -191,7 +191,7 @@ func (s *server) listenHtlcRequests() { case <-s.done: log.Printf("listenHtlcRequests received done. Stop listening.") return - case msg := <-s.sendQueue: + case msg := <-s.htlcSendQueue: s.handleHtlcAccepted(msg) } } @@ -202,8 +202,8 @@ func (s *server) listenHtlcRequests() { func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) { for { s.mtx.Lock() - stream := s.stream - ns := s.newSubscriber + stream := s.htlcStream + ns := s.htlcnewSubscriber s.mtx.Unlock() // If there is no active subscription, wait until there is a new @@ -228,7 +228,7 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) { // If the subscriber timeout expires while holding the htlc // we short circuit the htlc by sending the default result // (continue) to cln. - s.recvQueue <- &htlcResultMsg{ + s.htlcRecvQueue <- &htlcResultMsg{ id: msg.id, result: s.defaultResult(), } @@ -283,10 +283,10 @@ func (s *server) listenHtlcResponses() { log.Printf("listenHtlcResponses received done. Stopping listening.") return default: - resp := s.recv() - s.recvQueue <- &htlcResultMsg{ + resp := s.recvHtlcResolution() + s.htlcRecvQueue <- &htlcResultMsg{ id: resp.Correlationid, - result: s.mapResult(resp.Outcome), + result: s.mapHtlcResult(resp.Outcome), } } } @@ -295,14 +295,14 @@ func (s *server) listenHtlcResponses() { // Helper function that blocks until a message from a grpc client is received // or the server stops. Either returns a received message, or nil if the server // has stopped. -func (s *server) recv() *proto.HtlcResolution { +func (s *server) recvHtlcResolution() *proto.HtlcResolution { for { // make a copy of the used fields, to make sure state updates don't // surprise us. The newSubscriber chan is swapped whenever a new // subscriber arrives. s.mtx.Lock() - stream := s.stream - ns := s.newSubscriber + stream := s.htlcStream + ns := s.htlcnewSubscriber s.mtx.Unlock() if stream == nil { @@ -336,7 +336,7 @@ func (s *server) recv() *proto.HtlcResolution { // Maps a grpc result to the corresponding result for cln. The cln message // is a raw json message, so it's easiest to use a map directly. -func (s *server) mapResult(outcome interface{}) interface{} { +func (s *server) mapHtlcResult(outcome interface{}) interface{} { // result: continue cont, ok := outcome.(*proto.HtlcResolution_Continue) if ok { From abef3b98c8958f1be5edd2a8fdcf401921443f18 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Thu, 10 Aug 2023 09:36:13 +0200 Subject: [PATCH 2/9] cln_plugin: custommsg support --- cln_plugin/cln_messages.go | 5 + cln_plugin/cln_plugin.go | 47 +++++- cln_plugin/proto/cln_plugin.pb.go | 172 ++++++++++++++++++--- cln_plugin/proto/cln_plugin.proto | 7 + cln_plugin/proto/cln_plugin_grpc.pb.go | 65 +++++++- cln_plugin/server.go | 200 ++++++++++++++++++++++--- 6 files changed, 454 insertions(+), 42 deletions(-) diff --git a/cln_plugin/cln_messages.go b/cln_plugin/cln_messages.go index aed6a5ea..b2bffe84 100644 --- a/cln_plugin/cln_messages.go +++ b/cln_plugin/cln_messages.go @@ -115,3 +115,8 @@ type LogNotification struct { Level string `json:"level"` Message string `json:"message"` } + +type CustomMessageRequest struct { + PeerId string `json:"peer_id"` + Payload string `json:"payload"` +} diff --git a/cln_plugin/cln_plugin.go b/cln_plugin/cln_plugin.go index ba10f1f4..506a77ba 100644 --- a/cln_plugin/cln_plugin.go +++ b/cln_plugin/cln_plugin.go @@ -149,6 +149,30 @@ func (c *ClnPlugin) htlcListenServer() { } } +// Listens to responses to custommsg requests from the grpc server. +func (c *ClnPlugin) custommsgListenServer() { + for { + select { + case <-c.done: + return + default: + id, result := c.server.ReceiveCustomMessageResponse() + + // The server may return nil if it is stopped. + if result == nil { + continue + } + + serid, _ := json.Marshal(&id) + c.sendToCln(&Response{ + Id: serid, + JsonRpc: SpecVersion, + Result: result, + }) + } + } +} + // processes a single message from cln. Sends the message to the appropriate // handler. func (c *ClnPlugin) processMsg(msg []byte) { @@ -277,6 +301,7 @@ func (c *ClnPlugin) handleGetManifest(request *Request) { }, Dynamic: true, Hooks: []Hook{ + {Name: "custommsg"}, {Name: "htlc_accepted"}, {Name: "openchannel"}, {Name: "openchannel2"}, @@ -406,8 +431,9 @@ func (c *ClnPlugin) handleInit(request *Request) { return } - // Listen for htlc responses from the grpc server. + // Listen for htlc and custommsg responses from the grpc server. go c.htlcListenServer() + go c.custommsgListenServer() // Let cln know the plugin is initialized. c.sendToCln(&Response{ @@ -466,6 +492,25 @@ func (c *ClnPlugin) handleSetChannelAcceptScript(request *Request) { }) } +func (c *ClnPlugin) handleCustomMsg(request *Request) { + var custommsg CustomMessageRequest + err := json.Unmarshal(request.Params, &custommsg) + if err != nil { + c.sendError( + request.Id, + ParseError, + fmt.Sprintf( + "Failed to unmarshal custommsg params:%s [%s]", + err.Error(), + request.Params, + ), + ) + return + } + + c.server.SendCustomMessage(idToString(request.Id), &custommsg) +} + func unmarshalOpenChannel(request *Request) (r json.RawMessage, err error) { switch request.Method { case "openchannel": diff --git a/cln_plugin/proto/cln_plugin.pb.go b/cln_plugin/proto/cln_plugin.pb.go index 72675d6e..5f3755fd 100644 --- a/cln_plugin/proto/cln_plugin.pb.go +++ b/cln_plugin/proto/cln_plugin.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v3.21.12 +// protoc v4.23.4 // source: cln_plugin.proto package proto @@ -549,6 +549,99 @@ func (x *HtlcResolve) GetPaymentKey() string { return "" } +type CustomMessageRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CustomMessageRequest) Reset() { + *x = CustomMessageRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cln_plugin_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CustomMessageRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CustomMessageRequest) ProtoMessage() {} + +func (x *CustomMessageRequest) ProtoReflect() protoreflect.Message { + mi := &file_cln_plugin_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CustomMessageRequest.ProtoReflect.Descriptor instead. +func (*CustomMessageRequest) Descriptor() ([]byte, []int) { + return file_cln_plugin_proto_rawDescGZIP(), []int{7} +} + +type CustomMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PeerId string `protobuf:"bytes,1,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"` + Payload string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *CustomMessage) Reset() { + *x = CustomMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_cln_plugin_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CustomMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CustomMessage) ProtoMessage() {} + +func (x *CustomMessage) ProtoReflect() protoreflect.Message { + mi := &file_cln_plugin_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CustomMessage.ProtoReflect.Descriptor instead. +func (*CustomMessage) Descriptor() ([]byte, []int) { + return file_cln_plugin_proto_rawDescGZIP(), []int{8} +} + +func (x *CustomMessage) GetPeerId() string { + if x != nil { + return x.PeerId + } + return "" +} + +func (x *CustomMessage) GetPayload() string { + if x != nil { + return x.Payload + } + return "" +} + var File_cln_plugin_proto protoreflect.FileDescriptor var file_cln_plugin_proto_rawDesc = []byte{ @@ -618,14 +711,23 @@ var file_cln_plugin_proto_rawDesc = []byte{ 0x75, 0x72, 0x65, 0x22, 0x2e, 0x0a, 0x0b, 0x48, 0x74, 0x6c, 0x63, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x70, 0x61, 0x79, 0x6d, 0x65, 0x6e, 0x74, - 0x4b, 0x65, 0x79, 0x32, 0x3d, 0x0a, 0x09, 0x43, 0x6c, 0x6e, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, - 0x12, 0x30, 0x0a, 0x0a, 0x48, 0x74, 0x6c, 0x63, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0f, - 0x2e, 0x48, 0x74, 0x6c, 0x63, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x1a, - 0x0d, 0x2e, 0x48, 0x74, 0x6c, 0x63, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x28, 0x01, - 0x30, 0x01, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x62, 0x72, 0x65, 0x65, 0x7a, 0x2f, 0x6c, 0x73, 0x70, 0x64, 0x2f, 0x63, 0x6c, 0x6e, 0x5f, - 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x4b, 0x65, 0x79, 0x22, 0x16, 0x0a, 0x14, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x42, 0x0a, 0x0d, 0x43, + 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x17, 0x0a, 0x07, + 0x70, 0x65, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, + 0x65, 0x65, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x32, + 0x79, 0x0a, 0x09, 0x43, 0x6c, 0x6e, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x12, 0x30, 0x0a, 0x0a, + 0x48, 0x74, 0x6c, 0x63, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0f, 0x2e, 0x48, 0x74, 0x6c, + 0x63, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x0d, 0x2e, 0x48, 0x74, + 0x6c, 0x63, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x65, 0x64, 0x28, 0x01, 0x30, 0x01, 0x12, 0x3a, + 0x0a, 0x0f, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x4d, 0x73, 0x67, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x15, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0e, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, + 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x30, 0x01, 0x42, 0x28, 0x5a, 0x26, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x72, 0x65, 0x65, 0x7a, 0x2f, 0x6c, + 0x73, 0x70, 0x64, 0x2f, 0x63, 0x6c, 0x6e, 0x5f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -640,15 +742,17 @@ func file_cln_plugin_proto_rawDescGZIP() []byte { return file_cln_plugin_proto_rawDescData } -var file_cln_plugin_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_cln_plugin_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_cln_plugin_proto_goTypes = []interface{}{ - (*HtlcAccepted)(nil), // 0: HtlcAccepted - (*Onion)(nil), // 1: Onion - (*Htlc)(nil), // 2: Htlc - (*HtlcResolution)(nil), // 3: HtlcResolution - (*HtlcContinue)(nil), // 4: HtlcContinue - (*HtlcFail)(nil), // 5: HtlcFail - (*HtlcResolve)(nil), // 6: HtlcResolve + (*HtlcAccepted)(nil), // 0: HtlcAccepted + (*Onion)(nil), // 1: Onion + (*Htlc)(nil), // 2: Htlc + (*HtlcResolution)(nil), // 3: HtlcResolution + (*HtlcContinue)(nil), // 4: HtlcContinue + (*HtlcFail)(nil), // 5: HtlcFail + (*HtlcResolve)(nil), // 6: HtlcResolve + (*CustomMessageRequest)(nil), // 7: CustomMessageRequest + (*CustomMessage)(nil), // 8: CustomMessage } var file_cln_plugin_proto_depIdxs = []int32{ 1, // 0: HtlcAccepted.onion:type_name -> Onion @@ -657,9 +761,11 @@ var file_cln_plugin_proto_depIdxs = []int32{ 4, // 3: HtlcResolution.continue:type_name -> HtlcContinue 6, // 4: HtlcResolution.resolve:type_name -> HtlcResolve 3, // 5: ClnPlugin.HtlcStream:input_type -> HtlcResolution - 0, // 6: ClnPlugin.HtlcStream:output_type -> HtlcAccepted - 6, // [6:7] is the sub-list for method output_type - 5, // [5:6] is the sub-list for method input_type + 7, // 6: ClnPlugin.CustomMsgStream:input_type -> CustomMessageRequest + 0, // 7: ClnPlugin.HtlcStream:output_type -> HtlcAccepted + 8, // 8: ClnPlugin.CustomMsgStream:output_type -> CustomMessage + 7, // [7:9] is the sub-list for method output_type + 5, // [5:7] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name 5, // [5:5] is the sub-list for extension extendee 0, // [0:5] is the sub-list for field type_name @@ -755,6 +861,30 @@ func file_cln_plugin_proto_init() { return nil } } + file_cln_plugin_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CustomMessageRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cln_plugin_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CustomMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_cln_plugin_proto_msgTypes[3].OneofWrappers = []interface{}{ (*HtlcResolution_Fail)(nil), @@ -772,7 +902,7 @@ func file_cln_plugin_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cln_plugin_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/cln_plugin/proto/cln_plugin.proto b/cln_plugin/proto/cln_plugin.proto index be350594..584f60fc 100644 --- a/cln_plugin/proto/cln_plugin.proto +++ b/cln_plugin/proto/cln_plugin.proto @@ -3,6 +3,7 @@ option go_package="github.com/breez/lspd/cln_plugin/proto"; service ClnPlugin { rpc HtlcStream(stream HtlcResolution) returns (stream HtlcAccepted); + rpc CustomMsgStream(CustomMessageRequest) returns (stream CustomMessage); } message HtlcAccepted { @@ -54,3 +55,9 @@ message HtlcFail { message HtlcResolve { string payment_key = 1; } + +message CustomMessageRequest {} +message CustomMessage { + string peer_id = 1; + string payload = 2; +} diff --git a/cln_plugin/proto/cln_plugin_grpc.pb.go b/cln_plugin/proto/cln_plugin_grpc.pb.go index 1f8ccd29..745de5f7 100644 --- a/cln_plugin/proto/cln_plugin_grpc.pb.go +++ b/cln_plugin/proto/cln_plugin_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.12 +// - protoc v4.23.4 // source: cln_plugin.proto package proto @@ -23,6 +23,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ClnPluginClient interface { HtlcStream(ctx context.Context, opts ...grpc.CallOption) (ClnPlugin_HtlcStreamClient, error) + CustomMsgStream(ctx context.Context, in *CustomMessageRequest, opts ...grpc.CallOption) (ClnPlugin_CustomMsgStreamClient, error) } type clnPluginClient struct { @@ -64,11 +65,44 @@ func (x *clnPluginHtlcStreamClient) Recv() (*HtlcAccepted, error) { return m, nil } +func (c *clnPluginClient) CustomMsgStream(ctx context.Context, in *CustomMessageRequest, opts ...grpc.CallOption) (ClnPlugin_CustomMsgStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &ClnPlugin_ServiceDesc.Streams[1], "/ClnPlugin/CustomMsgStream", opts...) + if err != nil { + return nil, err + } + x := &clnPluginCustomMsgStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ClnPlugin_CustomMsgStreamClient interface { + Recv() (*CustomMessage, error) + grpc.ClientStream +} + +type clnPluginCustomMsgStreamClient struct { + grpc.ClientStream +} + +func (x *clnPluginCustomMsgStreamClient) Recv() (*CustomMessage, error) { + m := new(CustomMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // ClnPluginServer is the server API for ClnPlugin service. // All implementations must embed UnimplementedClnPluginServer // for forward compatibility type ClnPluginServer interface { HtlcStream(ClnPlugin_HtlcStreamServer) error + CustomMsgStream(*CustomMessageRequest, ClnPlugin_CustomMsgStreamServer) error mustEmbedUnimplementedClnPluginServer() } @@ -79,6 +113,9 @@ type UnimplementedClnPluginServer struct { func (UnimplementedClnPluginServer) HtlcStream(ClnPlugin_HtlcStreamServer) error { return status.Errorf(codes.Unimplemented, "method HtlcStream not implemented") } +func (UnimplementedClnPluginServer) CustomMsgStream(*CustomMessageRequest, ClnPlugin_CustomMsgStreamServer) error { + return status.Errorf(codes.Unimplemented, "method CustomMsgStream not implemented") +} func (UnimplementedClnPluginServer) mustEmbedUnimplementedClnPluginServer() {} // UnsafeClnPluginServer may be embedded to opt out of forward compatibility for this service. @@ -118,6 +155,27 @@ func (x *clnPluginHtlcStreamServer) Recv() (*HtlcResolution, error) { return m, nil } +func _ClnPlugin_CustomMsgStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(CustomMessageRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ClnPluginServer).CustomMsgStream(m, &clnPluginCustomMsgStreamServer{stream}) +} + +type ClnPlugin_CustomMsgStreamServer interface { + Send(*CustomMessage) error + grpc.ServerStream +} + +type clnPluginCustomMsgStreamServer struct { + grpc.ServerStream +} + +func (x *clnPluginCustomMsgStreamServer) Send(m *CustomMessage) error { + return x.ServerStream.SendMsg(m) +} + // ClnPlugin_ServiceDesc is the grpc.ServiceDesc for ClnPlugin service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -132,6 +190,11 @@ var ClnPlugin_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "CustomMsgStream", + Handler: _ClnPlugin_CustomMsgStream_Handler, + ServerStreams: true, + }, }, Metadata: "cln_plugin.proto", } diff --git a/cln_plugin/server.go b/cln_plugin/server.go index 8b8160f6..ba8b7b4a 100644 --- a/cln_plugin/server.go +++ b/cln_plugin/server.go @@ -25,20 +25,37 @@ type htlcResultMsg struct { result interface{} } +// Internal custommsg message meant for the sendQueue. +type custommsgMsg struct { + id string + custommsg *CustomMessageRequest + timeout time.Time +} + +// Internal custommsg result message meant for the recvQueue. +type custommsgResultMsg struct { + id string + result interface{} +} + type server struct { proto.ClnPluginServer - listenAddress string - subscriberTimeout time.Duration - grpcServer *grpc.Server - mtx sync.Mutex - started chan struct{} - done chan struct{} - completed chan struct{} - startError chan error - htlcnewSubscriber chan struct{} - htlcStream proto.ClnPlugin_HtlcStreamServer - htlcSendQueue chan *htlcAcceptedMsg - htlcRecvQueue chan *htlcResultMsg + listenAddress string + subscriberTimeout time.Duration + grpcServer *grpc.Server + mtx sync.Mutex + started chan struct{} + done chan struct{} + completed chan struct{} + startError chan error + htlcnewSubscriber chan struct{} + htlcStream proto.ClnPlugin_HtlcStreamServer + htlcSendQueue chan *htlcAcceptedMsg + htlcRecvQueue chan *htlcResultMsg + custommsgNewSubscriber chan struct{} + custommsgStream proto.ClnPlugin_CustomMsgStreamServer + custommsgSendQueue chan *custommsgMsg + custommsgRecvQueue chan *custommsgResultMsg } // Creates a new grpc server @@ -48,13 +65,15 @@ func NewServer(listenAddress string, subscriberTimeout time.Duration) *server { listenAddress: listenAddress, subscriberTimeout: subscriberTimeout, // The send queue exists to buffer messages until a subscriber is active. - htlcSendQueue: make(chan *htlcAcceptedMsg, 10000), + htlcSendQueue: make(chan *htlcAcceptedMsg, 10000), + custommsgSendQueue: make(chan *custommsgMsg, 10000), // The receive queue exists mainly to allow returning timeouts to the // cln plugin. If there is no subscriber active within the subscriber // timeout period these results can be put directly on the receive queue. - htlcRecvQueue: make(chan *htlcResultMsg, 10000), - started: make(chan struct{}), - startError: make(chan error, 1), + htlcRecvQueue: make(chan *htlcResultMsg, 10000), + custommsgRecvQueue: make(chan *custommsgResultMsg, 10000), + started: make(chan struct{}), + startError: make(chan error, 1), } } @@ -79,6 +98,7 @@ func (s *server) Start() error { s.done = make(chan struct{}) s.completed = make(chan struct{}) s.htlcnewSubscriber = make(chan struct{}) + s.custommsgNewSubscriber = make(chan struct{}) s.grpcServer = grpc.NewServer( grpc.KeepaliveParams(keepalive.ServerParameters{ Time: time.Duration(1) * time.Second, @@ -94,6 +114,7 @@ func (s *server) Start() error { log.Printf("Server starting to listen on %s.", s.listenAddress) go s.listenHtlcRequests() go s.listenHtlcResponses() + go s.listenCustomMsgRequests() close(s.started) err = s.grpcServer.Serve(lis) close(s.completed) @@ -306,13 +327,13 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution { s.mtx.Unlock() if stream == nil { - log.Printf("Got no subscribers for receive. Waiting for subscriber.") + log.Printf("Got no subscribers for htlc receive. Waiting for subscriber.") select { case <-s.done: - log.Printf("Done signalled, stopping receive.") + log.Printf("Done signalled, stopping htlc receive.") return nil case <-ns: - log.Printf("New subscription available for receive, continue receive.") + log.Printf("New subscription available for htlc receive, continue receive.") continue } } @@ -393,6 +414,147 @@ func (s *server) mapHtlcResult(outcome interface{}) interface{} { return s.defaultResult() } +// Grpc method that is called when a new client subscribes. There can only be +// one subscriber active at a time. If there is an error receiving or sending +// from or to the subscriber, the subscription is closed. +func (s *server) CustomMsgStream( + _ *proto.CustomMessageRequest, + stream proto.ClnPlugin_CustomMsgStreamServer, +) error { + + s.mtx.Lock() + if s.custommsgStream == nil { + log.Printf("Got a new custommsg stream subscription request.") + } else { + s.mtx.Unlock() + log.Printf("Got a custommsg stream subscription request, but " + + "subscription was already active.") + return fmt.Errorf("already subscribed") + } + + s.custommsgStream = stream + + // Notify listeners that a new subscriber is active. Replace the chan with + // a new one immediately in case this subscriber is dropped later. + close(s.custommsgNewSubscriber) + s.custommsgNewSubscriber = make(chan struct{}) + s.mtx.Unlock() + + <-stream.Context().Done() + log.Printf( + "CustomMsgStream context is done. Return: %v", + stream.Context().Err(), + ) + + // Remove the subscriber. + s.mtx.Lock() + s.custommsgStream = nil + s.mtx.Unlock() + + return stream.Context().Err() +} + +// Enqueues a htlc_accepted message for send to the grpc client. +func (s *server) SendCustomMessage(id string, c *CustomMessageRequest) { + s.custommsgSendQueue <- &custommsgMsg{ + id: id, + custommsg: c, + timeout: time.Now().Add(s.subscriberTimeout), + } +} + +// Receives the next custommsg response message from the grpc client. Returns id +// and message. Blocks until a message is available. Returns a nil message if +// the server is done. This function effectively waits until a subscriber is +// active and has sent a message. +func (s *server) ReceiveCustomMessageResponse() (string, interface{}) { + select { + case <-s.done: + return "", nil + case msg := <-s.custommsgRecvQueue: + return msg.id, msg.result + } +} + +// Listens to sendQueue for custommsg requests from cln. The message will be +// held until a subscriber is active, or the subscriber timeout expires. The +// messages are sent to the grpc client in fifo order. +func (s *server) listenCustomMsgRequests() { + for { + select { + case <-s.done: + log.Printf("listenCustomMsgRequests received done. Stop listening.") + return + case msg := <-s.custommsgSendQueue: + s.handleCustomMsg(msg) + } + } +} + +// Attempts to send a custommsg message to the grpc client. The message will +// be held until a subscriber is active, or the subscriber timeout expires. +func (s *server) handleCustomMsg(msg *custommsgMsg) { + for { + s.mtx.Lock() + stream := s.custommsgStream + ns := s.custommsgNewSubscriber + s.mtx.Unlock() + + // If there is no active subscription, wait until there is a new + // subscriber, or the message times out. + if stream == nil { + select { + case <-s.done: + log.Printf("handleCustomMsg received server done. Stop processing.") + return + case <-ns: + log.Printf("got a new subscriber. continue handleCustomMsg.") + continue + case <-time.After(time.Until(msg.timeout)): + log.Printf( + "WARNING: custommsg with id '%s' timed out after '%v' waiting "+ + "for grpc subscriber: %+v", + msg.id, + s.subscriberTimeout, + msg.custommsg, + ) + + // If the subscriber timeout expires while holding the custommsg + // we ignore the message by sending the default result + // (continue) to cln. + s.custommsgRecvQueue <- &custommsgResultMsg{ + id: msg.id, + result: s.defaultResult(), + } + + return + } + } + + // There is a subscriber. Attempt to send the custommsg message. + err := stream.Send(&proto.CustomMessage{ + PeerId: msg.custommsg.PeerId, + Payload: msg.custommsg.Payload, + }) + + // If there is no error, we're done, mark the message as handled. + if err == nil { + s.custommsgRecvQueue <- &custommsgResultMsg{ + id: msg.id, + result: s.defaultResult(), + } + return + } + + // If we end up here, there was an error sending the message to the + // grpc client. + // TODO: If the Send errors, but the context is not done, this will + // currently retry immediately. Check whether the context is really + // done on an error! + log.Printf("Error sending custommsg message to subscriber. Retrying: %v", err) + } +} + // Returns a result: continue message. func (s *server) defaultResult() interface{} { return map[string]interface{}{ From 4778906c6be01bec9a5e604feac57a56be39a7b6 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 11 Aug 2023 08:49:57 +0200 Subject: [PATCH 3/9] lsps0: lsps0 server implementation --- lightning/custom_msg_client.go | 12 ++ lsps0/codes/code.go | 14 ++ lsps0/jsonrpc/jsonrpc.go | 30 ++++ lsps0/protocol_server.go | 55 +++++++ lsps0/server.go | 273 +++++++++++++++++++++++++++++++++ lsps0/server_test.go | 243 +++++++++++++++++++++++++++++ lsps0/status/status.go | 62 ++++++++ 7 files changed, 689 insertions(+) create mode 100644 lightning/custom_msg_client.go create mode 100644 lsps0/codes/code.go create mode 100644 lsps0/jsonrpc/jsonrpc.go create mode 100644 lsps0/protocol_server.go create mode 100644 lsps0/server.go create mode 100644 lsps0/server_test.go create mode 100644 lsps0/status/status.go diff --git a/lightning/custom_msg_client.go b/lightning/custom_msg_client.go new file mode 100644 index 00000000..c34ccfa3 --- /dev/null +++ b/lightning/custom_msg_client.go @@ -0,0 +1,12 @@ +package lightning + +type CustomMessage struct { + PeerId string + Type uint32 + Data []byte +} + +type CustomMsgClient interface { + Recv() (*CustomMessage, error) + Send(*CustomMessage) error +} diff --git a/lsps0/codes/code.go b/lsps0/codes/code.go new file mode 100644 index 00000000..46e0797f --- /dev/null +++ b/lsps0/codes/code.go @@ -0,0 +1,14 @@ +package codes + +type Code int32 + +const ( + OK Code = 0 + Canceled Code = 1 + Unknown Code = 2 + ParseError Code = -32700 + InvalidRequest Code = -32600 + MethodNotFound Code = -32601 + InvalidParams Code = -32602 + InternalError Code = -32603 +) diff --git a/lsps0/jsonrpc/jsonrpc.go b/lsps0/jsonrpc/jsonrpc.go new file mode 100644 index 00000000..0812cb42 --- /dev/null +++ b/lsps0/jsonrpc/jsonrpc.go @@ -0,0 +1,30 @@ +package jsonrpc + +import "encoding/json" + +var Version = "2.0" + +type Request struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Id string `json:"id"` + Params json.RawMessage `json:"params"` +} + +type Response struct { + JsonRpc string `json:"jsonrpc"` + Id string `json:"id"` + Result json.RawMessage `json:"result"` +} + +type Error struct { + JsonRpc string `json:"jsonrpc"` + Id *string `json:"id"` + Error ErrorBody `json:"error"` +} + +type ErrorBody struct { + Code int32 `json:"code"` + Message string `json:"message"` + Data json.RawMessage `json:"data,omitempty"` +} diff --git a/lsps0/protocol_server.go b/lsps0/protocol_server.go new file mode 100644 index 00000000..b23e428c --- /dev/null +++ b/lsps0/protocol_server.go @@ -0,0 +1,55 @@ +package lsps0 + +import "context" + +type ProtocolServer interface { + ListProtocols( + ctx context.Context, + req *ListProtocolsRequest, + ) (*ListProtocolsResponse, error) +} +type protocolServer struct { + protocols []uint32 +} + +type ListProtocolsRequest struct{} +type ListProtocolsResponse struct { + Protocols []uint32 `json:"protocols"` +} + +func NewProtocolServer(supportedProtocols []uint32) ProtocolServer { + return &protocolServer{ + protocols: supportedProtocols, + } +} + +func (s *protocolServer) ListProtocols( + ctx context.Context, + req *ListProtocolsRequest, +) (*ListProtocolsResponse, error) { + return &ListProtocolsResponse{ + Protocols: s.protocols, + }, nil +} + +func RegisterProtocolServer(s ServiceRegistrar, p ProtocolServer) { + s.RegisterService( + &ServiceDesc{ + ServiceName: "lsps0", + HandlerType: (*ProtocolServer)(nil), + Methods: []MethodDesc{ + { + MethodName: "lsps0.list_protocols", + Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(ListProtocolsRequest) + if err := dec(in); err != nil { + return nil, err + } + return srv.(ProtocolServer).ListProtocols(ctx, in) + }, + }, + }, + }, + p, + ) +} diff --git a/lsps0/server.go b/lsps0/server.go new file mode 100644 index 00000000..85c01a8b --- /dev/null +++ b/lsps0/server.go @@ -0,0 +1,273 @@ +package lsps0 + +import ( + "context" + "encoding/json" + "errors" + "log" + "sync" + "time" + + "github.com/breez/lspd/lightning" + "github.com/breez/lspd/lsps0/codes" + "github.com/breez/lspd/lsps0/jsonrpc" + "github.com/breez/lspd/lsps0/status" + "golang.org/x/exp/slices" +) + +var ErrAlreadyServing = errors.New("lsps0: already serving") +var ErrServerStopped = errors.New("lsps0: the server has been stopped") +var Lsps0MessageType uint32 = 37913 +var BadMessageFormatError string = "bad message format" +var InternalError string = "internal error" + +// ServiceDesc and is constructed from it for internal purposes. +type serviceInfo struct { + // Contains the implementation for the methods in this service. + serviceImpl interface{} + methods map[string]*MethodDesc +} + +type methodInfo struct { + service *serviceInfo + method *MethodDesc +} + +type Server struct { + mu sync.Mutex + serve bool + services map[string]*serviceInfo + methods map[string]*methodInfo +} + +func NewServer() *Server { + return &Server{ + services: make(map[string]*serviceInfo), + methods: make(map[string]*methodInfo), + } +} + +func (s *Server) Serve(lis lightning.CustomMsgClient) error { + s.mu.Lock() + if s.serve { + return ErrAlreadyServing + } + s.serve = true + s.mu.Unlock() + + defer func() { + s.mu.Lock() + s.serve = false + s.mu.Unlock() + }() + + for { + msg, err := lis.Recv() + if err != nil { + if err == context.Canceled { + log.Printf("lsps0: lis got canceled, stopping.") + return err + } + + log.Printf("lsps0 Serve(): Recv() err != nil: %v", err) + <-time.After(time.Second) + continue + } + + // Ignore any message that is not an lsps0 message + if msg.Type != Lsps0MessageType { + continue + } + + // Make sure there are no 0 bytes + if slices.Contains(msg.Data, 0x00) { + log.Printf("UNUSUAL: Got custom message containing 0 bytes from peer '%s'.", msg.PeerId) + go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError)) + continue + } + + req := new(jsonrpc.Request) + err = json.Unmarshal(msg.Data, req) + if err != nil { + log.Printf("UNUSUAL: Failed to unmarshal custom message from peer '%s': %v", msg.PeerId, err) + go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError)) + continue + } + + if req == nil { + log.Printf("UNUSUAL: req == nil after unmarshal custom message from peer '%s': %v", msg.PeerId, err) + go sendError(lis, msg, nil, status.New(codes.ParseError, BadMessageFormatError)) + continue + } + + if req.JsonRpc != jsonrpc.Version { + log.Printf("UNUSUAL: jsonrpc version is '%s' in custom message from peer '%s': %v", req.JsonRpc, msg.PeerId, err) + go sendError(lis, msg, req, status.Newf(codes.InvalidRequest, "Expected jsonrpc %s, found %s", jsonrpc.Version, req.JsonRpc)) + continue + } + + m, ok := s.methods[req.Method] + if !ok { + log.Printf("UNUSUAL: peer '%s' requested method '%s', but it does not exist.", msg.PeerId, req.Method) + go sendError(lis, msg, req, status.New(codes.MethodNotFound, "method not found")) + continue + } + + df := func(v interface{}) error { + if err := json.Unmarshal(req.Params, v); err != nil { + return status.Newf(codes.InvalidParams, "invalid params").Err() + } + + return nil + } + + // NOTE: The handler is being called synchonously. There's an option to + // do this in a goroutine instead. Also, there's the option to put the + // goroutine in the method desc for specific methods instead. + r, err := m.method.Handler(m.service.serviceImpl, context.TODO(), df) + if err != nil { + s, ok := status.FromError(err) + if !ok { + log.Printf("Internal error when processing custom message '%s' from peer '%s': %v", string(msg.Data), msg.PeerId, err) + s = status.New(codes.InternalError, InternalError) + } + + go sendError(lis, msg, req, s) + continue + } + + go sendResponse(lis, msg, req, r) + } +} + +func sendResponse( + lis lightning.CustomMsgClient, + in *lightning.CustomMessage, + req *jsonrpc.Request, + params interface{}, +) { + rd, err := json.Marshal(params) + if err != nil { + log.Printf("Failed to mashal response params '%+v'", params) + sendError(lis, in, req, status.New(codes.InternalError, InternalError)) + return + } + + resp := &jsonrpc.Response{ + JsonRpc: jsonrpc.Version, + Id: req.Id, + Result: rd, + } + res, err := json.Marshal(resp) + if err != nil { + log.Printf("Failed to mashal response '%+v'", resp) + sendError(lis, in, req, status.New(codes.InternalError, InternalError)) + return + } + + msg := &lightning.CustomMessage{ + PeerId: in.PeerId, + Type: Lsps0MessageType, + Data: res, + } + + err = lis.Send(msg) + if err != nil { + log.Printf("Failed to send response message '%s' to request '%s' to peer '%s': %v", string(msg.Data), string(in.Data), msg.PeerId, err) + return + } +} + +func sendError( + lis lightning.CustomMsgClient, + in *lightning.CustomMessage, + req *jsonrpc.Request, + status *status.Status, +) { + var id *string + if req != nil && req.Id != "" { + id = &req.Id + } + resp := &jsonrpc.Error{ + JsonRpc: jsonrpc.Version, + Id: id, + Error: jsonrpc.ErrorBody{ + Code: int32(status.Code), + Message: status.Message, + Data: nil, + }, + } + + res, err := json.Marshal(resp) + if err != nil { + log.Printf("Failed to mashal response '%+v'", resp) + return + } + + msg := &lightning.CustomMessage{ + PeerId: in.PeerId, + Type: Lsps0MessageType, + Data: res, + } + + err = lis.Send(msg) + if err != nil { + log.Printf("Failed to send message '%s' to peer '%s': %v", string(msg.Data), msg.PeerId, err) + return + } +} + +func (s *Server) RegisterService(desc *ServiceDesc, impl interface{}) { + s.mu.Lock() + defer s.mu.Unlock() + log.Printf("RegisterService(%q)", desc.ServiceName) + if s.serve { + log.Fatalf("lsps0: Server.RegisterService after Server.Serve for %q", desc.ServiceName) + } + if _, ok := s.services[desc.ServiceName]; ok { + log.Fatalf("lsps0: Server.RegisterService found duplicate service registration for %q", desc.ServiceName) + } + info := &serviceInfo{ + serviceImpl: impl, + methods: make(map[string]*MethodDesc), + } + for i := range desc.Methods { + d := &desc.Methods[i] + if _, ok := s.methods[d.MethodName]; ok { + log.Fatalf("lsps0: Server.RegisterService found duplicate method registration for %q", d.MethodName) + } + info.methods[d.MethodName] = d + s.methods[d.MethodName] = &methodInfo{ + service: info, + method: d, + } + } + s.services[desc.ServiceName] = info +} + +type ServiceDesc struct { + ServiceName string + // The pointer to the service interface. Used to check whether the user + // provided implementation satisfies the interface requirements. + HandlerType interface{} + Methods []MethodDesc +} + +type MethodDesc struct { + MethodName string + Handler methodHandler +} + +type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) + +// ServiceRegistrar wraps a single method that supports service registration. It +// enables users to pass concrete types other than grpc.Server to the service +// registration methods exported by the IDL generated code. +type ServiceRegistrar interface { + // RegisterService registers a service and its implementation to the + // concrete type implementing this interface. It may not be called + // once the server has started serving. + // desc describes the service and its methods and handlers. impl is the + // service implementation which is passed to the method handlers. + RegisterService(desc *ServiceDesc, impl interface{}) +} diff --git a/lsps0/server_test.go b/lsps0/server_test.go new file mode 100644 index 00000000..f8672d69 --- /dev/null +++ b/lsps0/server_test.go @@ -0,0 +1,243 @@ +package lsps0 + +import ( + "encoding/json" + "testing" + + "github.com/breez/lspd/lightning" + "github.com/breez/lspd/lsps0/jsonrpc" + "github.com/stretchr/testify/assert" +) + +type MockLightningImpl struct { + lightning.CustomMsgClient + req chan *lightning.CustomMessage + err error + resp chan *lightning.CustomMessage +} + +func newMock(err error) *MockLightningImpl { + return &MockLightningImpl{ + req: make(chan *lightning.CustomMessage, 100), + err: err, + resp: make(chan *lightning.CustomMessage, 100), + } +} + +func (m *MockLightningImpl) Recv() (*lightning.CustomMessage, error) { + msg := <-m.req + return msg, m.err +} + +func (m *MockLightningImpl) Send(c *lightning.CustomMessage) error { + m.resp <- c + return nil +} + +func TestSuccess(t *testing.T) { + srv := NewServer() + pSrv := &protocolServer{ + protocols: []uint32{1, 2}, + } + RegisterProtocolServer(srv, pSrv) + rawMsg := `{ + "method": "lsps0.list_protocols", + "jsonrpc": "2.0", + "id": "example#3cad6a54d302edba4c9ade2f7ffac098", + "params": {} + }` + mock := newMock(nil) + mock.req <- &lightning.CustomMessage{ + PeerId: "AAA", + Type: 37913, + Data: []byte(rawMsg), + } + + go srv.Serve(mock) + + resp := <-mock.resp + + r := new(jsonrpc.Response) + err := json.Unmarshal(resp.Data, r) + assert.NoError(t, err) + + assert.Equal(t, uint32(37913), resp.Type) + assert.Equal(t, "AAA", resp.PeerId) + assert.Equal(t, "example#3cad6a54d302edba4c9ade2f7ffac098", r.Id) + assert.Equal(t, "2.0", r.JsonRpc) + + result := new(ListProtocolsResponse) + err = json.Unmarshal(r.Result, result) + assert.NoError(t, err) + + assert.Equal(t, []uint32{1, 2}, result.Protocols) +} + +func TestInvalidRequest(t *testing.T) { + srv := NewServer() + pSrv := &protocolServer{ + protocols: []uint32{1, 2}, + } + RegisterProtocolServer(srv, pSrv) + rawMsg := `{ + "method": "lsps0.list_protocols", + "jsonrpc": "2.0", + "id": "example#3cad6a54d302edba4c9ade2f7ffac098", + "params": {} + ` + mock := newMock(nil) + mock.req <- &lightning.CustomMessage{ + PeerId: "AAA", + Type: 37913, + Data: []byte(rawMsg), + } + go srv.Serve(mock) + + resp := <-mock.resp + + r := new(jsonrpc.Error) + err := json.Unmarshal(resp.Data, r) + assert.NoError(t, err) + + assert.Equal(t, uint32(37913), resp.Type) + assert.Equal(t, "AAA", resp.PeerId) + assert.Nil(t, r.Id) + assert.Equal(t, "2.0", r.JsonRpc) + assert.Equal(t, r.Error.Code, int32(-32700)) + assert.Equal(t, r.Error.Message, "bad message format") +} + +func TestInvalidJsonRpcVersion(t *testing.T) { + srv := NewServer() + pSrv := &protocolServer{ + protocols: []uint32{1, 2}, + } + RegisterProtocolServer(srv, pSrv) + rawMsg := `{ + "method": "lsps0.list_protocols", + "jsonrpc": "1.0", + "id": "example#3cad6a54d302edba4c9ade2f7ffac098", + "params": {} + }` + mock := newMock(nil) + mock.req <- &lightning.CustomMessage{ + PeerId: "AAA", + Type: 37913, + Data: []byte(rawMsg), + } + go srv.Serve(mock) + + resp := <-mock.resp + + r := new(jsonrpc.Error) + err := json.Unmarshal(resp.Data, r) + assert.NoError(t, err) + + assert.Equal(t, uint32(37913), resp.Type) + assert.Equal(t, "AAA", resp.PeerId) + assert.Equal(t, "example#3cad6a54d302edba4c9ade2f7ffac098", *r.Id) + assert.Equal(t, "2.0", r.JsonRpc) + assert.Equal(t, r.Error.Code, int32(-32600)) + assert.Equal(t, r.Error.Message, "Expected jsonrpc 2.0, found 1.0") +} + +func TestInvalidJsonRpcVersionNoId(t *testing.T) { + srv := NewServer() + pSrv := &protocolServer{ + protocols: []uint32{1, 2}, + } + RegisterProtocolServer(srv, pSrv) + rawMsg := `{ + "method": "lsps0.list_protocols", + "jsonrpc": "1.0", + "params": {} + }` + mock := newMock(nil) + mock.req <- &lightning.CustomMessage{ + PeerId: "AAA", + Type: 37913, + Data: []byte(rawMsg), + } + go srv.Serve(mock) + + resp := <-mock.resp + + r := new(jsonrpc.Error) + err := json.Unmarshal(resp.Data, r) + assert.NoError(t, err) + + assert.Equal(t, uint32(37913), resp.Type) + assert.Equal(t, "AAA", resp.PeerId) + assert.Nil(t, r.Id) + assert.Equal(t, "2.0", r.JsonRpc) + assert.Equal(t, r.Error.Code, int32(-32600)) + assert.Equal(t, r.Error.Message, "Expected jsonrpc 2.0, found 1.0") +} + +func TestUnknownMethod(t *testing.T) { + srv := NewServer() + pSrv := &protocolServer{ + protocols: []uint32{1, 2}, + } + RegisterProtocolServer(srv, pSrv) + rawMsg := `{ + "method": "lsps0.unknown", + "jsonrpc": "2.0", + "id": "example#3cad6a54d302edba4c9ade2f7ffac098", + "params": {} + }` + mock := newMock(nil) + mock.req <- &lightning.CustomMessage{ + PeerId: "AAA", + Type: 37913, + Data: []byte(rawMsg), + } + go srv.Serve(mock) + + resp := <-mock.resp + + r := new(jsonrpc.Error) + err := json.Unmarshal(resp.Data, r) + assert.NoError(t, err) + + assert.Equal(t, uint32(37913), resp.Type) + assert.Equal(t, "AAA", resp.PeerId) + assert.Equal(t, "example#3cad6a54d302edba4c9ade2f7ffac098", *r.Id) + assert.Equal(t, "2.0", r.JsonRpc) + assert.Equal(t, r.Error.Code, int32(-32601)) + assert.Equal(t, r.Error.Message, "method not found") +} + +func TestInvalidParams(t *testing.T) { + srv := NewServer() + pSrv := &protocolServer{ + protocols: []uint32{1, 2}, + } + RegisterProtocolServer(srv, pSrv) + rawMsg := `{ + "method": "lsps0.list_protocols", + "jsonrpc": "2.0", + "id": "example#3cad6a54d302edba4c9ade2f7ffac098", + "params": [] + }` + mock := newMock(nil) + mock.req <- &lightning.CustomMessage{ + PeerId: "AAA", + Type: 37913, + Data: []byte(rawMsg), + } + go srv.Serve(mock) + + resp := <-mock.resp + + r := new(jsonrpc.Error) + err := json.Unmarshal(resp.Data, r) + assert.NoError(t, err) + + assert.Equal(t, uint32(37913), resp.Type) + assert.Equal(t, "AAA", resp.PeerId) + assert.Equal(t, "example#3cad6a54d302edba4c9ade2f7ffac098", *r.Id) + assert.Equal(t, "2.0", r.JsonRpc) + assert.Equal(t, r.Error.Code, int32(-32602)) + assert.Equal(t, r.Error.Message, "invalid params") +} diff --git a/lsps0/status/status.go b/lsps0/status/status.go new file mode 100644 index 00000000..8292c91f --- /dev/null +++ b/lsps0/status/status.go @@ -0,0 +1,62 @@ +package status + +import ( + "fmt" + + "github.com/breez/lspd/lsps0/codes" +) + +type Status struct { + Code codes.Code + Message string +} + +func New(c codes.Code, msg string) *Status { + return &Status{Code: c, Message: msg} +} + +func Newf(c codes.Code, format string, a ...interface{}) *Status { + return New(c, fmt.Sprintf(format, a...)) +} + +func FromError(err error) (s *Status, ok bool) { + if err == nil { + return nil, true + } + if se, ok := err.(interface { + Lsps0Status() *Status + }); ok { + return se.Lsps0Status(), true + } + return New(codes.Unknown, err.Error()), false +} + +// Convert is a convenience function which removes the need to handle the +// boolean return value from FromError. +func Convert(err error) *Status { + s, _ := FromError(err) + return s +} + +func (s *Status) Err() error { + if s.Code == codes.OK { + return nil + } + return &Error{s: s} +} + +func (s *Status) String() string { + return fmt.Sprintf("lsps0 error: code = %d desc = %s", int32(s.Code), s.Message) +} + +type Error struct { + s *Status +} + +func (e *Error) Error() string { + return e.s.String() +} + +func (e *Error) Lsps0Status() *Status { + return e.s +} From 52903bd71b444938fed273c00645df1c7106d511 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 11 Aug 2023 11:47:06 +0200 Subject: [PATCH 4/9] lsps0: implement cln custommsg client --- cln/custom_msg_client.go | 175 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 cln/custom_msg_client.go diff --git a/cln/custom_msg_client.go b/cln/custom_msg_client.go new file mode 100644 index 00000000..79121f4a --- /dev/null +++ b/cln/custom_msg_client.go @@ -0,0 +1,175 @@ +package cln + +import ( + "context" + "encoding/binary" + "encoding/hex" + "log" + "sync" + "time" + + "github.com/breez/lspd/cln_plugin/proto" + "github.com/breez/lspd/config" + "github.com/breez/lspd/lightning" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" +) + +type CustomMsgClient struct { + lightning.CustomMsgClient + pluginAddress string + client *ClnClient + pluginClient proto.ClnPluginClient + initWg sync.WaitGroup + stopRequested bool + ctx context.Context + cancel context.CancelFunc + recvQueue chan *lightning.CustomMessage +} + +func NewCustomMsgClient(conf *config.ClnConfig, client *ClnClient) *CustomMsgClient { + c := &CustomMsgClient{ + pluginAddress: conf.PluginAddress, + client: client, + recvQueue: make(chan *lightning.CustomMessage, 10000), + } + + c.initWg.Add(1) + return c +} + +func (c *CustomMsgClient) Start() error { + ctx, cancel := context.WithCancel(context.Background()) + log.Printf("Dialing cln plugin on '%s'", c.pluginAddress) + conn, err := grpc.DialContext( + ctx, + c.pluginAddress, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(10) * time.Second, + Timeout: time.Duration(10) * time.Second, + }), + ) + if err != nil { + log.Printf("grpc.Dial error: %v", err) + cancel() + return err + } + + c.pluginClient = proto.NewClnPluginClient(conn) + c.ctx = ctx + c.cancel = cancel + c.stopRequested = false + return c.listen() +} + +func (i *CustomMsgClient) WaitStarted() { + i.initWg.Wait() +} + +func (i *CustomMsgClient) listen() error { + inited := false + + defer func() { + if !inited { + i.initWg.Done() + } + log.Printf("CLN custom msg listen(): stopping.") + }() + + for { + if i.ctx.Err() != nil { + return i.ctx.Err() + } + + log.Printf("Connecting CLN msg stream.") + msgClient, err := i.pluginClient.CustomMsgStream(i.ctx, &proto.CustomMessageRequest{}) + if err != nil { + log.Printf("pluginClient.CustomMsgStream(): %v", err) + <-time.After(time.Second) + continue + } + + for { + if i.ctx.Err() != nil { + return i.ctx.Err() + } + + if !inited { + inited = true + i.initWg.Done() + } + + // Stop receiving if stop if requested. + if i.stopRequested { + return nil + } + + request, err := msgClient.Recv() + if err != nil { + // If it is just the error result of the context cancellation + // the we exit silently. + status, ok := status.FromError(err) + if ok && status.Code() == codes.Canceled { + log.Printf("Got code canceled. Break.") + break + } + + // Otherwise it an unexpected error, we log. + log.Printf("unexpected error in interceptor.Recv() %v", err) + break + } + + payload, err := hex.DecodeString(request.Payload) + if err != nil { + log.Printf("Error hex decoding cln custom msg payload from peer '%s': %v", request.PeerId, err) + continue + } + + if len(payload) < 3 { + log.Printf("UNUSUAL: Custom msg payload from peer '%s' is too small", request.PeerId) + continue + } + + t := binary.BigEndian.Uint16(payload) + payload = payload[2:] + i.recvQueue <- &lightning.CustomMessage{ + PeerId: request.PeerId, + Type: uint32(t), + Data: payload, + } + } + + <-time.After(time.Second) + } +} + +func (c *CustomMsgClient) Recv() (*lightning.CustomMessage, error) { + select { + case msg := <-c.recvQueue: + return msg, nil + case <-c.ctx.Done(): + return nil, c.ctx.Err() + } +} + +func (c *CustomMsgClient) Send(msg *lightning.CustomMessage) error { + var t [2]byte + binary.BigEndian.PutUint16(t[:], uint16(msg.Type)) + + m := hex.EncodeToString(t[:]) + hex.EncodeToString(msg.Data) + _, err := c.client.client.SendCustomMessage(msg.PeerId, m) + return err +} + +func (i *CustomMsgClient) Stop() error { + // Setting stopRequested to true will make the interceptor stop receiving. + i.stopRequested = true + + // Close the grpc connection. + i.cancel() + return nil +} From 090079d90ce43ae38dedfebc0dcf25cf55ddd89c Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 11 Aug 2023 14:20:29 +0200 Subject: [PATCH 5/9] lsps0: hook up lsps0 server --- main.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/main.go b/main.go index 29657387..00e065c2 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/breez/lspd/config" "github.com/breez/lspd/interceptor" "github.com/breez/lspd/lnd" + "github.com/breez/lspd/lsps0" "github.com/breez/lspd/mempool" "github.com/breez/lspd/notifications" "github.com/breez/lspd/postgresql" @@ -110,6 +111,15 @@ func main() { if err != nil { log.Fatalf("failed to initialize CLN interceptor: %v", err) } + + msgClient := cln.NewCustomMsgClient(node.Cln, client) + go msgClient.Start() + msgServer := lsps0.NewServer() + protocolServer := lsps0.NewProtocolServer([]uint32{2}) + lsps0.RegisterProtocolServer(msgServer, protocolServer) + msgClient.WaitStarted() + defer msgClient.Stop() + go msgServer.Serve(msgClient) } if htlcInterceptor == nil { From 69b0505af7d57bdc917c69f5ab0120158f7f41a4 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 11 Aug 2023 14:20:45 +0200 Subject: [PATCH 6/9] lsps0: add integration test --- .github/workflows/integration_tests.yaml | 32 ++++++++ go.mod | 2 +- itest/breez_client.go | 1 + itest/cln_breez_client.go | 100 +++++++++++++++++++---- itest/lnd_breez_client.go | 5 ++ itest/lspd_test.go | 4 + itest/lsps0_protocol_version_test.go | 45 ++++++++++ 7 files changed, 171 insertions(+), 18 deletions(-) create mode 100644 itest/lsps0_protocol_version_test.go diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml index 66677eb1..1fc4ca0a 100644 --- a/.github/workflows/integration_tests.yaml +++ b/.github/workflows/integration_tests.yaml @@ -124,3 +124,35 @@ jobs: CLIENT_REF: ${{ env.CLIENT_REF }} GO_VERSION: ${{ env.GO_VERSION }} CLN_VERSION: ${{ env.CLN_VERSION }} + + run-lsps2-test: + runs-on: ubuntu-22.04 + needs: + - setup-itest + - setup-bitcoin-core + - setup-cln + - build-lspd + name: test ${{ matrix.implementation }} ${{ matrix.test }} + strategy: + max-parallel: 6 + matrix: + test: [ + testLsps0GetProtocolVersions + ] + implementation: [ + CLN + ] + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Run and Process Test State + uses: ./.github/actions/test-lspd + with: + TESTRE: "TestLspd/${{ matrix.implementation }}-lspd:_${{ matrix.test }}" + artifact-name: TestLspd-${{ matrix.implementation }}-lspd_${{ matrix.test }} + bitcoin-version: ${{ env.BITCOIN_VERSION }} + LSP_REF: ${{ env.LSP_REF }} + CLIENT_REF: ${{ env.CLIENT_REF }} + GO_VERSION: ${{ env.GO_VERSION }} + CLN_VERSION: ${{ env.CLN_VERSION }} diff --git a/go.mod b/go.mod index 4525e3eb..9fe25854 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/aws/aws-sdk-go v1.34.0 - github.com/breez/lntest v0.0.26 + github.com/breez/lntest v0.0.27 github.com/btcsuite/btcd v0.23.5-0.20230228185050-38331963bddd github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 diff --git a/itest/breez_client.go b/itest/breez_client.go index 80f1fa85..f117b16b 100644 --- a/itest/breez_client.go +++ b/itest/breez_client.go @@ -21,6 +21,7 @@ type BreezClient interface { Stop() error SetHtlcAcceptor(totalMsat uint64) ResetHtlcAcceptor() + ReceiveCustomMessage() *lntest.CustomMsgRequest } type generateInvoicesRequest struct { diff --git a/itest/cln_breez_client.go b/itest/cln_breez_client.go index ee2ad666..9436179d 100644 --- a/itest/cln_breez_client.go +++ b/itest/cln_breez_client.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "encoding/binary" "encoding/hex" "fmt" "io" @@ -51,16 +52,18 @@ python %s ` type clnBreezClient struct { - name string - scriptDir string - pluginFilePath string - htlcAcceptorAddress string - htlcAcceptor func(*proto.HtlcAccepted) *proto.HtlcResolution - htlcAcceptorCancel context.CancelFunc - harness *lntest.TestHarness - isInitialized bool - node *lntest.ClnNode - mtx sync.Mutex + name string + scriptDir string + pluginFilePath string + pluginAddress string + htlcAcceptor func(*proto.HtlcAccepted) *proto.HtlcResolution + htlcAcceptorCancel context.CancelFunc + customMsgCancel context.CancelFunc + customMsgQueue chan *lntest.CustomMsgRequest + harness *lntest.TestHarness + isInitialized bool + node *lntest.ClnNode + mtx sync.Mutex } func newClnBreezClient(h *lntest.TestHarness, m *lntest.Miner, name string) BreezClient { @@ -89,12 +92,12 @@ func newClnBreezClient(h *lntest.TestHarness, m *lntest.Miner, name string) Bree ) return &clnBreezClient{ - name: name, - harness: h, - node: node, - scriptDir: scriptDir, - pluginFilePath: pluginFilePath, - htlcAcceptorAddress: htlcAcceptorAddress, + name: name, + harness: h, + node: node, + scriptDir: scriptDir, + pluginFilePath: pluginFilePath, + pluginAddress: htlcAcceptorAddress, } } @@ -121,6 +124,8 @@ func (c *clnBreezClient) Start() { c.node.Start() c.startHtlcAcceptor() + c.customMsgQueue = make(chan *lntest.CustomMsgRequest, 100) + c.startCustomMsgListener() } func (c *clnBreezClient) ResetHtlcAcceptor() { @@ -202,6 +207,67 @@ func (c *clnBreezClient) SetHtlcAcceptor(totalMsat uint64) { } } +func (c *clnBreezClient) startCustomMsgListener() { + ctx, cancel := context.WithCancel(c.harness.Ctx) + c.customMsgCancel = cancel + + go func() { + for { + if ctx.Err() != nil { + return + } + + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + } + + conn, err := grpc.DialContext( + ctx, + c.pluginAddress, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(10) * time.Second, + Timeout: time.Duration(10) * time.Second, + }), + ) + if err != nil { + log.Printf("%s: Dial htlc acceptor error: %v", c.name, err) + continue + } + + client := proto.NewClnPluginClient(conn) + listener, err := client.CustomMsgStream(ctx, &proto.CustomMessageRequest{}) + if err != nil { + log.Printf("%s: client.CustomMsgStream() error: %v", c.name, err) + break + } + for { + msg, err := listener.Recv() + if err != nil { + log.Printf("%s: listener.Recv() error: %v", c.name, err) + break + } + + payload, err := hex.DecodeString(msg.Payload) + lntest.CheckError(c.harness.T, err) + + c.customMsgQueue <- &lntest.CustomMsgRequest{ + PeerId: msg.PeerId, + Type: uint32(binary.BigEndian.Uint16(payload)), + Data: payload[2:], + } + } + } + }() +} + +func (c *clnBreezClient) ReceiveCustomMessage() *lntest.CustomMsgRequest { + msg := <-c.customMsgQueue + return msg +} + func (c *clnBreezClient) startHtlcAcceptor() { ctx, cancel := context.WithCancel(c.harness.Ctx) c.htlcAcceptorCancel = cancel @@ -220,7 +286,7 @@ func (c *clnBreezClient) startHtlcAcceptor() { conn, err := grpc.DialContext( ctx, - c.htlcAcceptorAddress, + c.pluginAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Duration(10) * time.Second, diff --git a/itest/lnd_breez_client.go b/itest/lnd_breez_client.go index c95febd8..c86be474 100644 --- a/itest/lnd_breez_client.go +++ b/itest/lnd_breez_client.go @@ -86,6 +86,11 @@ func (c *lndBreezClient) SetHtlcAcceptor(totalMsat uint64) { // No need for a htlc acceptor in the LND breez client } +func (c *lndBreezClient) ReceiveCustomMessage() *lntest.CustomMsgRequest { + // TODO: Not implemented. + return nil +} + func (c *lndBreezClient) startChannelAcceptor(ctx context.Context) error { client, err := c.node.LightningClient().ChannelAcceptor(ctx) if err != nil { diff --git a/itest/lspd_test.go b/itest/lspd_test.go index 098d68a1..39ad0657 100644 --- a/itest/lspd_test.go +++ b/itest/lspd_test.go @@ -163,4 +163,8 @@ var allTestCases = []*testCase{ name: "testOfflineNotificationZeroConfChannel", test: testOfflineNotificationZeroConfChannel, }, + { + name: "testLsps0GetProtocolVersions", + test: testLsps0GetProtocolVersions, + }, } diff --git a/itest/lsps0_protocol_version_test.go b/itest/lsps0_protocol_version_test.go new file mode 100644 index 00000000..d0d332d3 --- /dev/null +++ b/itest/lsps0_protocol_version_test.go @@ -0,0 +1,45 @@ +package itest + +import ( + "encoding/hex" + "encoding/json" + + "github.com/breez/lntest" + "github.com/breez/lspd/lsps0" + "github.com/stretchr/testify/assert" +) + +func testLsps0GetProtocolVersions(p *testParams) { + p.BreezClient().Node().ConnectPeer(p.Lsp().LightningNode()) + + rawMsg := `{ + "method": "lsps0.list_protocols", + "jsonrpc": "2.0", + "id": "example#3cad6a54d302edba4c9ade2f7ffac098", + "params": {} + }` + p.BreezClient().Node().SendCustomMessage(&lntest.CustomMsgRequest{ + PeerId: hex.EncodeToString(p.Lsp().NodeId()), + Type: lsps0.Lsps0MessageType, + Data: []byte(rawMsg), + }) + + resp := p.BreezClient().ReceiveCustomMessage() + assert.Equal(p.t, uint32(37913), resp.Type) + + content := make(map[string]interface{}) + err := json.Unmarshal(resp.Data[:], &content) + lntest.CheckError(p.t, err) + + assert.Equal(p.t, "2.0", content["jsonrpc"]) + assert.Equal(p.t, "example#3cad6a54d302edba4c9ade2f7ffac098", content["id"]) + + content2 := make(map[string]json.RawMessage) + err = json.Unmarshal(resp.Data[:], &content2) + lntest.CheckError(p.t, err) + + result := make(map[string][]int) + err = json.Unmarshal(content2["result"], &result) + lntest.CheckError(p.t, err) + assert.Equal(p.t, []int{2}, result["protocols"]) +} From d4687c5e2a1d5482b93af28719fd2cc59f1659b1 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Mon, 14 Aug 2023 09:30:07 +0200 Subject: [PATCH 7/9] lsps0: set cln feature bit --- cln_plugin/cln_plugin.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cln_plugin/cln_plugin.go b/cln_plugin/cln_plugin.go index 506a77ba..75002cae 100644 --- a/cln_plugin/cln_plugin.go +++ b/cln_plugin/cln_plugin.go @@ -25,6 +25,7 @@ const ( var ( DefaultSubscriberTimeout = "1m" DefaultChannelAcceptorScript = "" + LspsFeatureBit = "0200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" ) const ( @@ -299,7 +300,7 @@ func (c *ClnPlugin) handleGetManifest(request *Request) { Description: "Set the startlark channel acceptor script", }, }, - Dynamic: true, + Dynamic: false, Hooks: []Hook{ {Name: "custommsg"}, {Name: "htlc_accepted"}, @@ -310,6 +311,9 @@ func (c *ClnPlugin) handleGetManifest(request *Request) { Subscriptions: []string{ "shutdown", }, + FeatureBits: &FeatureBits{ + Node: &LspsFeatureBit, + }, }, }) } From 90903d84518793e35d6dbd294ae0b11cfd637e6b Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Thu, 17 Aug 2023 09:37:54 +0200 Subject: [PATCH 8/9] lsps0: handle requests in a goroutine --- lsps0/server.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/lsps0/server.go b/lsps0/server.go index 85c01a8b..f7d71da3 100644 --- a/lsps0/server.go +++ b/lsps0/server.go @@ -20,6 +20,7 @@ var ErrServerStopped = errors.New("lsps0: the server has been stopped") var Lsps0MessageType uint32 = 37913 var BadMessageFormatError string = "bad message format" var InternalError string = "internal error" +var MaxSimultaneousRequests = 25 // ServiceDesc and is constructed from it for internal purposes. type serviceInfo struct { @@ -61,6 +62,7 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error { s.mu.Unlock() }() + guard := make(chan struct{}, MaxSimultaneousRequests) for { msg, err := lis.Recv() if err != nil { @@ -113,6 +115,9 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error { continue } + // Deserialization step of the request params. This function is called + // by method handlers of service implementations to deserialize the + // typed request object. df := func(v interface{}) error { if err := json.Unmarshal(req.Params, v); err != nil { return status.Newf(codes.InvalidParams, "invalid params").Err() @@ -121,9 +126,19 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error { return nil } - // NOTE: The handler is being called synchonously. There's an option to - // do this in a goroutine instead. Also, there's the option to put the - // goroutine in the method desc for specific methods instead. + // Will block if the guard queue is already filled to ensure + // MaxSimultaneousRequests is not exceeded. + guard <- struct{}{} + + // NOTE: The handler is being called asynchonously. This may cause the + // order of messages handled to be different from the order in which + // they were received. + go func() { + // Releases a queued item in the guard, to release a spot for + // another simultaneous request. + defer func() { <-guard }() + + // Call the method handler for the requested method. r, err := m.method.Handler(m.service.serviceImpl, context.TODO(), df) if err != nil { s, ok := status.FromError(err) @@ -132,11 +147,12 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error { s = status.New(codes.InternalError, InternalError) } - go sendError(lis, msg, req, s) - continue + sendError(lis, msg, req, s) + return } - go sendResponse(lis, msg, req, r) + sendResponse(lis, msg, req, r) + }() } } From cc26cc8aca3cb13cb674cdeebeda13dbd065618b Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Thu, 17 Aug 2023 09:38:43 +0200 Subject: [PATCH 9/9] lsps0: check service impl satisfies handler type. --- lsps0/server.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/lsps0/server.go b/lsps0/server.go index f7d71da3..c964ed2e 100644 --- a/lsps0/server.go +++ b/lsps0/server.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "log" + "reflect" "sync" "time" @@ -139,17 +140,17 @@ func (s *Server) Serve(lis lightning.CustomMsgClient) error { defer func() { <-guard }() // Call the method handler for the requested method. - r, err := m.method.Handler(m.service.serviceImpl, context.TODO(), df) - if err != nil { - s, ok := status.FromError(err) - if !ok { - log.Printf("Internal error when processing custom message '%s' from peer '%s': %v", string(msg.Data), msg.PeerId, err) - s = status.New(codes.InternalError, InternalError) - } + r, err := m.method.Handler(m.service.serviceImpl, context.TODO(), df) + if err != nil { + s, ok := status.FromError(err) + if !ok { + log.Printf("Internal error when processing custom message '%s' from peer '%s': %v", string(msg.Data), msg.PeerId, err) + s = status.New(codes.InternalError, InternalError) + } sendError(lis, msg, req, s) return - } + } sendResponse(lis, msg, req, r) }() @@ -234,6 +235,16 @@ func sendError( } func (s *Server) RegisterService(desc *ServiceDesc, impl interface{}) { + if impl == nil { + log.Fatalf("lsps0: Server.RegisterService got nil service implementation") + } + + ht := reflect.TypeOf(desc.HandlerType).Elem() + st := reflect.TypeOf(impl) + if !st.Implements(ht) { + log.Fatalf("lsps0: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) + } + s.mu.Lock() defer s.mu.Unlock() log.Printf("RegisterService(%q)", desc.ServiceName)