Skip to content

Commit

Permalink
feat: complete daemon client/server test
Browse files Browse the repository at this point in the history
  • Loading branch information
nedpals committed Feb 1, 2024
1 parent b95d642 commit 7d21380
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 86 deletions.
47 changes: 27 additions & 20 deletions server/daemon/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *Client) SetId(id int) {

func (c *Client) processIdField() jsonrpc2.CallOption {
// TODO: if !handshake { return nil }
if c.processId <= 0 {
if c.processId < 0 {
return nil
}
return jsonrpc2.ExtraField("processId", c.processId)
Expand Down Expand Up @@ -99,31 +99,38 @@ func (c *Client) tryReconnect(reason error) error {
return c.Connect()
}

func (c *Client) SetConn(conn net.Conn) {
c.tcpConn = conn
}

func (c *Client) Connect() error {
if c.context == nil {
c.context = context.Background()
}

conn, err := net.Dial("tcp", c.addr)
if err != nil {
if err, ok := err.(*net.OpError); ok {
if strings.HasSuffix(err.Err.Error(), "connection refused") {
return c.tryReconnect(err)
if c.tcpConn == nil {
conn, err := net.Dial("tcp", c.addr)
if err != nil {
if err, ok := err.(*net.OpError); ok {
if strings.HasSuffix(err.Err.Error(), "connection refused") {
return c.tryReconnect(err)
}
}
return err
}
return err
}

if err := conn.(*net.TCPConn).SetKeepAlive(true); err != nil {
return err
}
if err := conn.(*net.TCPConn).SetKeepAlive(true); err != nil {
return err
}

if err := conn.(*net.TCPConn).SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
if err := conn.(*net.TCPConn).SetKeepAlivePeriod(10 * time.Second); err != nil {
return err
}

c.SetConn(conn)
}

c.connState = ConnectedState
c.tcpConn = conn
c.connRetries = 0

c.rpcConn = jsonrpc2.NewConn(
Expand Down Expand Up @@ -227,23 +234,23 @@ func (c *Client) Collect(errCode int, command, workingDir, errMsg string) (int,
}

func (c *Client) ResolveDocument(filepath string, content string) error {
return c.Notify(types.ResolveDocumentMethod, types.DocumentPayload{
return c.Call(types.ResolveDocumentMethod, types.DocumentPayload{
DocumentIdentifier: types.DocumentIdentifier{Filepath: filepath},
Content: content,
})
}, nil)
}

func (c *Client) UpdateDocument(filepath string, content string) error {
return c.Notify(types.UpdateDocumentMethod, types.DocumentPayload{
return c.Call(types.UpdateDocumentMethod, types.DocumentPayload{
DocumentIdentifier: types.DocumentIdentifier{Filepath: filepath},
Content: content,
})
}, nil)
}

func (c *Client) DeleteDocument(filepath string) error {
return c.Notify(types.DeleteDocumentMethod, types.DocumentIdentifier{
return c.Call(types.DeleteDocumentMethod, types.DocumentIdentifier{
Filepath: filepath,
})
}, nil)
}

func (c *Client) Handshake() error {
Expand Down
110 changes: 85 additions & 25 deletions server/daemon/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/fs"
"os"
Expand Down Expand Up @@ -53,40 +54,35 @@ func (d *Server) FS() *helpers.SharedFS {
return d.engine.FS.FSs[0].(*helpers.SharedFS)
}

func (d *Server) getProcessId(r *jsonrpc2.Request) int {
func (d *Server) getProcessId(r *jsonrpc2.Request) (int, error) {
for _, req := range r.ExtraFields {
if req.Name != "processId" {
continue
} else if procId, ok := req.Value.(json.Number); ok {
if num, err := procId.Int64(); err == nil {
return int(num)
} else {
return -1
}
} else {
return -2
}
procId := req.Value.(json.Number)
num, err := procId.Int64()
if err != nil {
break
}
return int(num), nil
}
return -1
return -1, errors.New("processId not found")
}

func (d *Server) checkProcessConnection(r *jsonrpc2.Request) *jsonrpc2.Error {
procId := d.getProcessId(r)

if _, found := d.connectedClients[procId]; !found {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidRequest,
Message: "Process not connected yet.",
}
} else if procId == -2 {
procId, err := d.getProcessId(r)
if err != nil {
fmt.Println(err.Error())
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidRequest,
Message: "Invalid process ID",
Message: "Process ID not found",
}
} else if procId == -1 {
}

if _, found := d.connectedClients[procId]; !found {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidRequest,
Message: "Process ID not found",
Message: "Process not connected yet.",
}
}

Expand Down Expand Up @@ -129,7 +125,14 @@ func (d *Server) Handle(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Reque
d.notifyErrors(ctx, info.ProcessId)
}
case types.ShutdownMethod:
procId := d.getProcessId(r)
procId, err := d.getProcessId(r)
if err != nil {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: err.Error(),
})
return
}

delete(d.connectedClients, procId)
fmt.Printf("> disconnected: {process_id: %d}\n", procId)
case types.CollectMethod:
Expand All @@ -156,7 +159,13 @@ func (d *Server) Handle(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Reque
})
}
case types.PingMethod:
procId := d.getProcessId(r)
procId, err := d.getProcessId(r)
if err != nil {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: err.Error(),
})
return
}
fmt.Printf("> ping from %d\n", procId)
c.Reply(ctx, r.ID, "pong!")
case types.ResolveDocumentMethod:
Expand All @@ -168,6 +177,13 @@ func (d *Server) Handle(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Reque
return
}

if len(payloadStr.Filepath) == 0 {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: "Filepath is empty",
})
return
}

if err := d.FS().WriteFile(payloadStr.Filepath, []byte(payloadStr.Content)); err != nil {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: err.Error(),
Expand All @@ -176,6 +192,7 @@ func (d *Server) Handle(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Reque
}

fmt.Printf("> resolved document: %s (len: %d)\n", payloadStr.Filepath, len(payloadStr.Content))
c.Reply(ctx, r.ID, "ok")
case types.UpdateDocumentMethod:
var payloadStr types.DocumentPayload
if err := json.Unmarshal(*r.Params, &payloadStr); err != nil {
Expand All @@ -185,10 +202,39 @@ func (d *Server) Handle(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Reque
return
}

if len(payloadStr.Filepath) == 0 {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: "Filepath is empty",
})
return
}

// check if the file exists
if file, err := d.FS().Open(payloadStr.Filepath); errors.Is(err, fs.ErrNotExist) {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: "File does not exist",
})
return
} else if err != nil {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: err.Error(),
})
return
} else {
file.Close()
}

// IDEA: create a dependency tree wherein errors will be removed
// once the file is updated
d.FS().WriteFile(payloadStr.Filepath, []byte(payloadStr.Content))
if err := d.FS().WriteFile(payloadStr.Filepath, []byte(payloadStr.Content)); err != nil {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: err.Error(),
})
return
}

fmt.Printf("> updated document: %s (len: %d)\n", payloadStr.Filepath, len(payloadStr.Content))
c.Reply(ctx, r.ID, "ok")
case types.DeleteDocumentMethod:
var payload types.DocumentIdentifier
if err := json.Unmarshal(*r.Params, &payload); err != nil {
Expand All @@ -198,9 +244,23 @@ func (d *Server) Handle(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Reque
return
}

if len(payload.Filepath) == 0 {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: "Filepath is empty",
})
return
}

// TODO: use dependency tree
d.FS().Remove(payload.Filepath)
if err := d.FS().Remove(payload.Filepath); err != nil {
c.ReplyWithError(ctx, r.ID, &jsonrpc2.Error{
Message: err.Error(),
})
return
}

fmt.Printf("> removed document: %s\n", payload.Filepath)
c.Reply(ctx, r.ID, "ok")
case types.NearestNodeMethod:
var payload types.NearestNodePayload
if err := json.Unmarshal(*r.Params, &payload); err != nil {
Expand Down
Loading

0 comments on commit 7d21380

Please sign in to comment.