forked from bolt-observer/agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection_commando.go
154 lines (122 loc) · 2.99 KB
/
connection_commando.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package lightning
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"time"
"github.com/bolt-observer/agent/lnsocket"
)
// ClnCommandoConnection struct.
type ClnCommandoConnection struct {
ln *lnsocket.LN
addr string
rune string
ClnConnection
}
// Compile time check for the interface.
var _ ClnConnectionAPI = &ClnCommandoConnection{}
// NewCommandoConnection creates a new CLN connection.
func NewCommandoConnection(addr string, rune string, timeout time.Duration) *ClnCommandoConnection {
ret := &ClnCommandoConnection{}
ret.addr = addr
ret.rune = rune
ret.ln = lnsocket.NewLN(timeout)
return ret
}
// Call calls serviceMethod with args and fills reply with response.
func (l *ClnCommandoConnection) Call(ctx context.Context, serviceMethod string, args any, reply any, timeout time.Duration) error {
err := l.initConnection()
if err != nil {
return err
}
params := convertArgs(args)
reader, err := l.ln.NewCommandoReader(ctx, l.rune, serviceMethod, params, timeout)
if err != nil {
return err
}
// To get meaningful error messages
data, err := parseResp(reader)
if err != nil {
return err
}
err = json.Unmarshal(data.Result, &reply)
if err != nil {
return err
}
return nil
}
// StreamResponse is meant for streaming responses it calls serviceMethod with args and returns an io.Reader.
func (l *ClnCommandoConnection) StreamResponse(ctx context.Context, serviceMethod string, args any) (io.Reader, error) {
err := l.initConnection()
if err != nil {
return nil, err
}
params := convertArgs(args)
reader, err := l.ln.NewCommandoReader(ctx, l.rune, serviceMethod, params, 8*24*time.Hour)
if err != nil {
return nil, err
}
return reader, nil
}
// Cleanup does the cleanup.
func (l *ClnCommandoConnection) Cleanup() {
if l.ln != nil {
l.ln.Disconnect()
}
}
func (l *ClnCommandoConnection) initConnection() error {
// Check if connection is still usable
err := l.ln.Ping()
if err == nil {
return nil
}
l.ln.Disconnect()
err = l.ln.Connect(l.addr)
if err != nil {
return err
}
err = l.ln.Handshake()
if err != nil {
return err
}
return nil
}
func parseResp(reader io.Reader) (ClnSuccessResp, error) {
// Need to buffer the response (to parse it twice)
data, err := io.ReadAll(reader)
if err != nil {
return ClnSuccessResp{}, err
}
r := bytes.NewReader(data)
var errResp ClnErrorResp
err = json.NewDecoder(r).Decode(&errResp)
if err != nil {
return ClnSuccessResp{}, err
}
if errResp.Error.Code != 0 {
return ClnSuccessResp{}, fmt.Errorf("invalid response: %s", errResp.Error.Message)
}
var successResp ClnSuccessResp
r = bytes.NewReader(data)
err = json.NewDecoder(r).Decode(&successResp)
if err != nil {
return ClnSuccessResp{}, err
}
return successResp, nil
}
func convertArgs(param any) string {
const Empty = "[]"
if param == nil {
return Empty
}
sb := new(strings.Builder)
enc := json.NewEncoder(sb)
err := enc.Encode(param)
if err != nil {
return Empty
}
return strings.TrimRight(sb.String(), "\r\n\t")
}