From 8ab3ec333814a99812aea0bcdc652385754e6742 Mon Sep 17 00:00:00 2001 From: Solomon Himelbloom <7608183+TechSolomon@users.noreply.github.com> Date: Mon, 1 Apr 2024 19:25:53 -0800 Subject: [PATCH] Code freeze for first minor release (#39) * nit: minor code format * refactor: remove duplicate function contents * Updated configuration flow of control (#38) * feat: draft variable packet size experiment * docs: new section for project replicability * docs: markdown diagram (system control flow) + blockquote highlight * refactor: utility to insights + diode collection script * feat: module install & output binary * feat: begin subscribe and publish commands * feat: basic payload retrieval from standard input * refactor: prepare for complete message detection * feat: filter output from incoming payload * refactor: stream demo into encapsulator & republisher * refactor: option to delimit message & remove write new line * feat: draft message flow w/ client-server location * feat: mock MQTT connection prior to testing * feat: recieve message from given source location * refactor: next steps for outbound message flow * docs: text on links for architecture diagram * remove: start/end delimiter + message case * refactor: entry point for main application, build information, & testing * feat: split input & output metadata via project settings * fix: repackaged JSON object contents + diode metadata * docs: update main program directory name --- .gitignore | 3 + Makefile | 14 ++- README.md | 13 +-- config/settings.yaml | 33 ++++--- diode_test.go | 185 ------------------------------------ go.mod | 3 +- go.sum | 2 - diode.go => main.go | 116 +++++++++++----------- utility/application.go | 119 ++++++++++++++++++----- utility/content.go | 96 ------------------- utility/republisher.go | 151 +++++++++++++++++++++++++++++ utility/republisher_test.go | 21 ++++ utility/stream.go | 184 ----------------------------------- 13 files changed, 359 insertions(+), 581 deletions(-) delete mode 100644 diode_test.go rename diode.go => main.go (61%) delete mode 100644 utility/content.go create mode 100644 utility/republisher.go create mode 100644 utility/republisher_test.go delete mode 100644 utility/stream.go diff --git a/.gitignore b/.gitignore index afaf226..ab72383 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,6 @@ go.work # Input source → experimental results *.txt + +# Output binary +diode diff --git a/Makefile b/Makefile index c142eb8..4eccf1e 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,19 @@ +BIN_NAME=diode +BIN_VERSION=0.1.0 +BIN_DATE=$(shell date +%FT%T%z) + all: build build: - go build -o diode -ldflags="-X main.SemVer=0.0.9" diode.go + go build -o ${BIN_NAME} -ldflags="-X 'main.SemVer=${BIN_VERSION}' -X 'main.BuildInfo=${BIN_DATE}'" test: go test -v ./... -run: - go run diode.go +run: build + ./${BIN_NAME} --help + +clean: + go clean + rm ${BIN_NAME} diff --git a/README.md b/README.md index b83d793..822790f 100644 --- a/README.md +++ b/README.md @@ -23,30 +23,27 @@ Scripts for verifying TCP passthrough functionality. ```zsh . ├── config -├── diode.go -├── diode_test.go ├── docker-compose.yaml ├── Dockerfile ├── docs ├── go.mod ├── go.sum ├── insights +├── main.go ├── Makefile ├── README.md ├── sample └── utility -5 directories, 8 files +5 directories, 7 files ``` -#### Architecture Diagram (WIP) +#### Architecture Diagram ```mermaid graph LR - A("Publish (MQTT)") --> B(TCP Client) - B --> C(Data Diode) - C --> D(TCP Server) - D --> E("Subscribe (MQTT)") + A("Subscribe (MQTT)") -->|TCP Client|B(Data Diode) -->|TCP Server|C("Publish (MQTT)") + ``` > [!NOTE] diff --git a/config/settings.yaml b/config/settings.yaml index 932fda2..4360841 100644 --- a/config/settings.yaml +++ b/config/settings.yaml @@ -1,13 +1,20 @@ -# Data Diode Settings -input: - ip: "localhost" - port: 49152 - timeout: 60 # seconds -output: - ip: "localhost" - port: 13337 - tls: false -broker: - server: "localhost" - port: 1883 - topic: "diode/telemetry" +# Project Settings +diode: + input: + ip: "localhost" + port: 49152 + timeout: 60 # seconds + output: + ip: "localhost" + port: 13337 + tls: false +mqtt: + inside: + server: "localhost" + port: 1883 + topic: "#" + outside: + server: "localhost" + port: 1883 + prefix: "diode" + # Avoid namespace collisions diff --git a/diode_test.go b/diode_test.go deleted file mode 100644 index 4fd0df2..0000000 --- a/diode_test.go +++ /dev/null @@ -1,185 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "net" - "os" - "os/exec" - "path/filepath" - "strings" - "testing" - - insights "github.com/acep-uaf/data-diode/insights" - "github.com/acep-uaf/data-diode/utility" -) - -var ( - BackupConfiguration = "config/B4-0144-355112.json" - SystemSettings = "config/settings.yaml" - ProjectDocumentation = "docs/SOP.md" - FileChecksum = "477076c6fd8cf48ff2d0159b22bada27588c6fa84918d1c4fc20cd9ddd291dbd" - SampleMessage = "Hello, world." - InterfaceSize = 1024 - InterfaceProtocol = "tcp" - InterfaceAddress = "localhost:13337" -) - -type TCP struct { - ClientTargetIP string - ClientTargetPort int - ServerTargetIP string - ServerPort int - ServerSocketTimeout int -} - -func TestAPI(t *testing.T) { - jsonFile, err := os.Open(BackupConfiguration) - - schema := "CAMIO.2024.1.0" - version := filepath.Base(jsonFile.Name()) - - // FIXME: Cross reference the JSON contents, schema version, & configuration file? - fmt.Println(version, schema) - - if err != nil { - t.Errorf("[?] %s via %s", err, jsonFile.Name()) - } -} - -func TestCLI(t *testing.T) { - binary := exec.Command("go", "build", "-o", "diode") - buildErr := binary.Run() - if buildErr != nil { - t.Fatalf("[!] Failed to build CLI binary: %v", buildErr) - } - - cmd := exec.Command("./diode") - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - err := cmd.Run() - if err != nil { - t.Fatalf("[!] Failed to execute CLI command: %v", err) - } - - expectation := "diode: try 'diode --help' for more information" - reality := strings.TrimSpace(stdout.String()) - if reality != expectation { - t.Errorf("[?] Expected output: %q, but got: %q", expectation, reality) - } - - if stderr.Len() > 0 { - t.Errorf("[?] Unexpected error output: %q", stderr.String()) - } -} - -func TestConfiguration(t *testing.T) { - _, err := os.Stat(SystemSettings) - if os.IsNotExist(err) { - t.Errorf("[!] config.yaml does not exist") - } -} - -func TestFileContents(t *testing.T) { - got := fmt.Sprintf("%x", insights.Checksum()) - want := FileChecksum - - if got != want { - t.Errorf(">> got %q, want %q", got, want) - } -} - -func TestBinaryContents(t *testing.T) { - // TODO: Implement the following: - // - Craft a text message containing binary data + checksum. - // - Ensure transmission across data diode without corrupted information. - // - Check for uuenconding and base64 encoding / delimiters. - - sample := []byte(SampleMessage) - - if len(sample) == 0 { - t.Errorf("[!] No binary contents...") - } -} - -func TestEchoMessage(t *testing.T) { - go func() { - listener, err := net.Listen(InterfaceProtocol, InterfaceAddress) - if err != nil { - t.Errorf("[!] Failed to start TCP server: %v", err) - } - defer listener.Close() - - conn, err := listener.Accept() - if err != nil { - t.Errorf("[!] Failed to accept connection: %v", err) - } - defer conn.Close() - - buf := make([]byte, InterfaceSize) - n, err := conn.Read(buf) - if err != nil { - t.Errorf("[!] Failed to read message: %v", err) - } - - _, err = conn.Write(buf[:n]) - if err != nil { - t.Errorf("[!] Failed to write message: %v", err) - } - }() - - // TODO: Mock the TCP client/server to simulate the transmission of data. - - conn, err := net.Dial(InterfaceProtocol, InterfaceAddress) - if err != nil { - t.Fatalf("[!] Failed to connect to TCP server: %v", err) - } - defer conn.Close() - - message := SampleMessage - _, err = conn.Write([]byte(message)) - if err != nil { - t.Fatalf("[!] Failed to send message: %v", err) - } - - buf := make([]byte, len(message)) - n, err := conn.Read(buf) - if err != nil { - t.Fatalf("[!] Failed to read echoed message: %v", err) - } - - match := string(buf[:n]) - if match != message { - t.Errorf("[!] Echoed message does not match original message: got %q, want %q", match, message) - } -} - -func TestRepublishContents(t *testing.T) { - location := ProjectDocumentation - broker := "localhost" - topic := "test/message" - port := 1883 - - // TODO: Mock the MQTT connection. - - utility.RepublishContents(location, broker, topic, port) - - if len(location) == 0 { - t.Errorf("[!] No location specified...") - } - - if len(broker) == 0 { - t.Errorf("[!] No broker specified...") - } - - if len(topic) == 0 { - t.Errorf("[!] No topic specified...") - } - - if port == 0 { - t.Errorf("[!] No port specified...") - } -} - diff --git a/go.mod b/go.mod index ef6ea9b..144eeaa 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,9 @@ module github.com/acep-uaf/data-diode -go 1.22.1 +go 1.22 require ( github.com/eclipse/paho.mqtt.golang v1.4.3 - github.com/google/uuid v1.6.0 github.com/olekukonko/tablewriter v0.0.5 github.com/urfave/cli/v2 v2.27.1 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 599af3b..13d501e 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lV github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= -github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= -github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= diff --git a/diode.go b/main.go similarity index 61% rename from diode.go rename to main.go index daa5d4f..2278c9c 100644 --- a/diode.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "fmt" "log" "os" - "time" analysis "github.com/acep-uaf/data-diode/insights" utility "github.com/acep-uaf/data-diode/utility" @@ -21,57 +20,35 @@ import ( var ( SemVer string + BuildInfo string ConfigSettings = "config/settings.yaml" InputTextFile = "docs/example.txt" ) type Configuration struct { - Input struct { - IP string - Port int - Timeout int + Diode struct { + Input struct { + IP string + Port int + Timeout int + } + Output struct { + IP string + Port int + TLS bool + } } - Output struct { - IP string - Port int - TLS bool - } - Broker struct { - Server string - Port int - Topic string - } -} - -func exampleContents(location string) { - sample := utility.ReadLineContent(location) - utility.PrintFileContent(sample) - utility.OutputStatistics(sample) -} - -func republishContents(location string, mqttBrokerIP string, mqttBrokerTopic string, mqttBrokerPort int) { - fileContent := utility.ReadLineContent(location) - - fmt.Println(">> Server: ", mqttBrokerIP) - fmt.Println(">> Topic: ", mqttBrokerTopic) - fmt.Println(">> Port: ", mqttBrokerPort) - - start := time.Now() - - for i := 1; i <= len(fileContent.Lines); i++ { - utility.Observability(mqttBrokerIP, mqttBrokerPort, mqttBrokerTopic, fileContent.Lines[i]) - } - - t := time.Now() - - elapsed := t.Sub(start) - - if len(fileContent.Lines) == 0 { - fmt.Println(">> No message content sent.") - } else if len(fileContent.Lines) == 1 { - fmt.Println(">> Sent message from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed) - } else { - fmt.Println(">> Sent ", len(fileContent.Lines), " messages from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed) + MQTT struct { + Inside struct { + Server string + Port int + Topic string + } + Outside struct { + Server string + Port int + Prefix string + } } } @@ -90,14 +67,21 @@ func main() { // Configuration Settings - diodeInputSideIP := config.Input.IP - diodePassthroughPort := config.Input.Port - targetServerIP := config.Output.IP - targetServerPort := config.Output.Port + diodeInputSideIP := config.Diode.Input.IP + diodePassthroughPort := config.Diode.Input.Port + clientLocation := fmt.Sprintf("%s:%d", diodeInputSideIP, diodePassthroughPort) + + targetServerIP := config.Diode.Output.IP + targetServerPort := config.Diode.Output.Port + serverLocation := fmt.Sprintf("%s:%d", targetServerIP, targetServerPort) - mqttBrokerIP := config.Broker.Server - mqttBrokerPort := config.Broker.Port - mqttBrokerTopic := config.Broker.Topic + subBrokerIP := config.MQTT.Inside.Server + subBrokerPort := config.MQTT.Inside.Port + subBrokerTopic := config.MQTT.Inside.Topic + + pubBrokerIP := config.MQTT.Outside.Server + pubBrokerPort := config.MQTT.Outside.Port + pubBrokerTopic := config.MQTT.Outside.Prefix app := &cli.App{ Name: "diode", @@ -137,7 +121,7 @@ func main() { Usage: "Testing state synchronization via diode I/O", Action: func(tCtx *cli.Context) error { fmt.Println("----- TEST -----") - utility.RepublishContents(InputTextFile, mqttBrokerIP, mqttBrokerTopic, mqttBrokerPort) + analysis.Pong() return nil }, }, @@ -157,17 +141,24 @@ func main() { Usage: "System benchmark analysis + report performance metrics", Action: func(bCtx *cli.Context) error { fmt.Println("----- BENCHMARKS -----") - analysis.Pong() return nil }, }, { - Name: "mqtt", - Aliases: []string{"m"}, - Usage: "MQTT → TCP stream demo", - Action: func(mCtx *cli.Context) error { - fmt.Println("----- MQTT -----") - utility.Subscription(mqttBrokerIP, mqttBrokerPort, mqttBrokerTopic, targetServerIP, targetServerPort) + Name: "mqtt-subscribe", + Aliases: []string{"ms"}, + Usage: "Recieve payload, encapsulate message, & stream to diode", + Action: func(msCtx *cli.Context) error { + utility.InboundMessageFlow(subBrokerIP, subBrokerPort, subBrokerTopic, clientLocation) + return nil + }, + }, + { + Name: "mqtt-publish", + Aliases: []string{"mp"}, + Usage: "Detect complete message, decode, & republish the payload", + Action: func(mpCtx *cli.Context) error { + utility.OutboundMessageFlow(pubBrokerIP, pubBrokerPort, pubBrokerTopic, serverLocation) return nil }, }, @@ -176,7 +167,8 @@ func main() { Aliases: []string{"v"}, Usage: "Print the version of the diode CLI", Action: func(vCtx *cli.Context) error { - fmt.Println(">> diode version " + SemVer) + fmt.Println(">> diode version:", SemVer) + fmt.Println(">> build information: ", BuildInfo) return nil }, }, diff --git a/utility/application.go b/utility/application.go index 95ceb92..598afde 100644 --- a/utility/application.go +++ b/utility/application.go @@ -2,7 +2,7 @@ package utility import ( "fmt" - "math/rand" + "log" "net" "time" ) @@ -10,16 +10,44 @@ import ( const ( ACKNOWLEDGEMENT = "OK\r\n" CONN_TYPE = "tcp" - MAX_ATTEMPTS = 42 + MAX_ATTEMPTS = 2 CHUNK_SIZE = 1460 // ? Characters SAMPLE = 10240 // 10 Kbytes ) +func SendMessage(input string, client string) { + conn, err := net.Dial(CONN_TYPE, client) + if err != nil { + log.Fatalf(">> [!] Error connecting to diode client: %v", err) + } + defer conn.Close() + + for index := 0; index < len(input); index += CHUNK_SIZE { + chunk := input[index:min(index+CHUNK_SIZE, len(input))] + + _, err := conn.Write([]byte(chunk)) + if err != nil { + log.Fatalf(">> [!] Error sending data: %v", err) + } + + response := make([]byte, len(ACKNOWLEDGEMENT)) + _, err = conn.Read(response) + if err != nil { + log.Fatalf(">> [!] Error receiving ACK: %v", err) + } + + if string(response) != ACKNOWLEDGEMENT { + log.Fatalf(">> [?] Invalid ACK received.") + } + + fmt.Println(chunk) + } +} + func StartPlaceholderClient(host string, port int) { - upperBound := rand.Intn(1) + 1 - for i := 1; i <= upperBound; i++ { - fmt.Printf(">> [%d of %d] Dialing host %s on port %d via %s...\n", i, upperBound, host, port, CONN_TYPE) + for i := 1; i <= MAX_ATTEMPTS; i++ { + fmt.Printf(">> [%d of %d] Dialing host %s on port %d via %s...\n", i, MAX_ATTEMPTS, host, port, CONN_TYPE) } conn, err := net.Dial(CONN_TYPE, fmt.Sprintf("%s:%d", host, port)) @@ -32,6 +60,20 @@ func StartPlaceholderClient(host string, port int) { message := "The quick brown fox jumps over the lazy dog.\n" + ProcessMessage(message, conn) + + buffer := make([]byte, SAMPLE) + + bytesRead, err := conn.Read(buffer) + if err != nil { + fmt.Println(">> [!] Error reading response: ", err.Error()) + return + } + + fmt.Printf(">> Server response: %s\n", string(buffer[:bytesRead])) +} + +func ProcessMessage(message string, conn net.Conn) bool { for try := 1; try <= MAX_ATTEMPTS; try++ { if len(message) > CHUNK_SIZE { index := 0 @@ -39,28 +81,22 @@ func StartPlaceholderClient(host string, port int) { for index < len(message) { chunk := message[index : index+CHUNK_SIZE] - // I. Send Chunk - _, err := conn.Write([]byte(chunk)) if err != nil { fmt.Println(">> [!] Error sending data: ", err) - return + return true } - // II. Wait for ACK - response := make([]byte, 4) _, err = conn.Read(response) if err != nil { fmt.Println(">> [!] Error receiving ACK: ", err) - return + return true } - // III. Diode Response - if string(response) != ACKNOWLEDGEMENT { fmt.Println(">> [?] Invalid ACK received.") - return + return true } fmt.Printf(">> Successfully sent message to diode: %s\n", chunk) @@ -71,18 +107,7 @@ func StartPlaceholderClient(host string, port int) { time.Sleep(1 * time.Second) } - - _, err = conn.Write([]byte(message)) - - buffer := make([]byte, SAMPLE) - - bytesRead, err := conn.Read(buffer) - if err != nil { - fmt.Println(">> [!] Error reading response: ", err.Error()) - return - } - - fmt.Printf(">> Server response: %s\n", string(buffer[:bytesRead])) + return false } func StartPlaceholderServer(host string, port int) { @@ -118,6 +143,7 @@ func StartPlaceholderServer(host string, port int) { fmt.Println(">> Connection closed by client.") return } + return } fmt.Printf(">> Received data: %s\n", string(data[:bytesRead])) @@ -131,3 +157,44 @@ func StartPlaceholderServer(host string, port int) { }(conn) } } + +func RecieveMessage(destination string, messages chan<- string) error { + server, err := net.Listen("tcp", destination) + if err != nil { + fmt.Println(">> [!] Error connecting to diode: ", err) + return err + } + defer server.Close() + + for { + conn, err := server.Accept() + if err != nil { + fmt.Println(">> [!] Error accepting connection: ", err) + continue + } + + go func(conn net.Conn) { + message, err := connectionHandler(conn) + if err != nil { + fmt.Println(">> [!] Error handling connection: ", err) + return + } + + messages <- message + }(conn) + } +} + +func connectionHandler(conn net.Conn) (string, error) { + defer conn.Close() + + buffer := make([]byte, SAMPLE) + + bytesRead, err := conn.Read(buffer) + if err != nil { + fmt.Println(">> [!] Error reading data: ", err) + return "", err + } + + return string(buffer[:bytesRead]), nil +} diff --git a/utility/content.go b/utility/content.go deleted file mode 100644 index 79f9a08..0000000 --- a/utility/content.go +++ /dev/null @@ -1,96 +0,0 @@ -package utility - -import ( - "bufio" - "fmt" - "log" - "os" - "time" -) - -type FileContent struct { - Lines map[int]string -} - -type Readability struct { - Words int - Characters int - Paragraphs int - Sentences int -} - -func ReadLineContent(location string) FileContent { - file, err := os.Open(location) - if err != nil { - log.Fatal(err) - } - defer file.Close() - - lines := make(map[int]string) - - scanner := bufio.NewScanner(file) - - lineNumber := 1 - - for scanner.Scan() { - lineContent := scanner.Text() - lines[lineNumber] = lineContent - lineNumber++ - } - - if err := scanner.Err(); err != nil { - log.Fatal(err) - } - - return FileContent{Lines: lines} -} - -func ExampleContents(location string) { - sample := ReadLineContent(location) - PrintFileContent(sample) - OutputStatistics(sample) -} - -func RepublishContents(location string, mqttBrokerIP string, mqttBrokerTopic string, mqttBrokerPort int) error { - if _, err := os.Stat(location); os.IsNotExist(err) { - fmt.Println(">> File not found: ", location) - return err - } - - fileContent := ReadLineContent(location) - - fmt.Println(">> Server: ", mqttBrokerIP) - fmt.Println(">> Topic: ", mqttBrokerTopic) - fmt.Println(">> Port: ", mqttBrokerPort) - - start := time.Now() - - for i := 1; i <= len(fileContent.Lines); i++ { - Observability(mqttBrokerIP, mqttBrokerPort, mqttBrokerTopic, fileContent.Lines[i]) - } - - t := time.Now() - - elapsed := t.Sub(start) - - if len(fileContent.Lines) == 0 { - fmt.Println(">> No message content sent.") - } else if len(fileContent.Lines) == 1 { - fmt.Println(">> Sent message from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed) - } else { - fmt.Println(">> Sent ", len(fileContent.Lines), " messages from ", location, " to topic: ", mqttBrokerTopic, " in ", elapsed) - } - - return nil -} - -func OutputStatistics(content FileContent) { - // ? Contextual information about the file content. - fmt.Println(">> Number of lines: ", len(content.Lines)) -} - -func PrintFileContent(content FileContent) { - for i := 1; i <= len(content.Lines); i++ { - fmt.Println(">> ", content.Lines[i]) - } -} diff --git a/utility/republisher.go b/utility/republisher.go new file mode 100644 index 0000000..c2189c7 --- /dev/null +++ b/utility/republisher.go @@ -0,0 +1,151 @@ +package utility + +import ( + "crypto/md5" + "encoding/base64" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type InputDiodeMessage struct { + Time int `json:"time"` + Topic string `json:"topic"` + B64Payload string `json:"b64payload"` +} + +type OutputDiodeMessage struct { + Time int `json:"time"` + Topic string `json:"topic"` + B64Payload string `json:"b64payload"` + Payload string `json:"payload"` + Length int `json:"length"` + Checksum string `json:"checksum"` +} + +func InboundMessageFlow(server string, port int, topic string, arrival string) { + location := fmt.Sprintf("tcp://%s:%d", server, port) + opts := mqtt.NewClientOptions().AddBroker(location).SetClientID("in_rec_msg") + + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Println(">> [!] Failed to connect to the broker: ", token.Error()) + } + + handleMessage := func(client mqtt.Client, msg mqtt.Message) { + contents := DetectContents(string(msg.Payload()), msg.Topic()) + SendMessage(contents, arrival) + } + + // Subscription (Topic) + if token := client.Subscribe(topic, 0, handleMessage); token.Wait() && token.Error() != nil { + if token.Error() != nil { + fmt.Println(">> [!] Error subscribing to the topic: ", token.Error()) + } + } + + // Client Shutdown (SIGINT) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func OutboundMessageFlow(server string, port int, topic string, destination string) { + messages := make(chan string) + go func() { + err := RecieveMessage(destination, messages) + if err != nil { + fmt.Println(err) + return + } + }() + + for message := range messages { + repackaged := RepackageContents(message, topic) + PublishPayload(server, port, topic, repackaged) + } +} + +func DetectContents(message string, topic string) string { + complete := InputDiodeMessage{ + Time: int(MakeTimestamp()), + Topic: topic, + B64Payload: EncapsulatePayload(message), + } + + jsonPackage, err := json.Marshal(complete) + if err != nil { + log.Fatalf(">> [!] Error marshalling the incoming message: %v", err) + } + + return string(jsonPackage) +} + +func RepackageContents(message string, topic string) string { + var intermediary OutputDiodeMessage + err := json.Unmarshal([]byte(message), &intermediary) + if err != nil { + log.Fatalf(">> [!] Error unmarshalling the message: %v", err) + } + + // Diode Metadata + intermediary.Time = int(MakeTimestamp()) + intermediary.Topic = topic + intermediary.Payload = UnencapsulatePayload(intermediary.B64Payload) + intermediary.Length = len(intermediary.Payload) + intermediary.Checksum = Verification(intermediary.Payload) + + // Process Contents + jsonIntermediary, err := json.Marshal(intermediary) + if err != nil { + log.Fatalf(">> [!] Error marshalling the outgoing message: %v", err) + } + + fmt.Println(string(jsonIntermediary)) + + return string(intermediary.Payload) +} + +func EncapsulatePayload(message string) string { + encoded := base64.StdEncoding.EncodeToString([]byte(message)) + return encoded +} + +func UnencapsulatePayload(message string) string { + // TODO: Test case(s) for various message lengths and content. + + decoded, err := base64.StdEncoding.DecodeString(message) + if err != nil { + fmt.Println(">> [!] Error decoding the message: ", err) + } + return string(decoded) +} + +func PublishPayload(server string, port int, topic string, message string) { + location := fmt.Sprintf("tcp://%s:%d", server, port) + opts := mqtt.NewClientOptions().AddBroker(location).SetClientID("out_rev_string") + + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Println(">> [!] Failed to connect to the broker: ", token.Error()) + } + + if token := client.Publish(topic, 0, false, message); token.Wait() && token.Error() != nil { + fmt.Println(">> [!] Error publishing the message: ", token.Error()) + } +} + +func MakeTimestamp() int64 { + return time.Now().UnixMilli() +} + +func Verification(data string) string { + hash := md5.New() + hash.Write([]byte(data)) + return fmt.Sprintf("%x", hash.Sum(nil)) +} diff --git a/utility/republisher_test.go b/utility/republisher_test.go new file mode 100644 index 0000000..01e7efd --- /dev/null +++ b/utility/republisher_test.go @@ -0,0 +1,21 @@ +package utility + +import "testing" + +func TestSubscribe(t *testing.T) { + got := "pub" + want := "sub" + + if got != want { + t.Errorf("got %q, want %q", got, want) + } +} + +func TestPublish(t *testing.T) { + got := "sub" + want := "pub" + + if got != want { + t.Errorf("got %q, want %q", got, want) + } +} diff --git a/utility/stream.go b/utility/stream.go deleted file mode 100644 index 11d41de..0000000 --- a/utility/stream.go +++ /dev/null @@ -1,184 +0,0 @@ -package utility - -import ( - "crypto/md5" - "encoding/json" - "fmt" - "log" - "net" - "os" - "os/signal" - "sync" - "time" - - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/google/uuid" -) - -type Message struct { - Index int - Topic string - Payload string - Checksum string - UUID string - Timestamp time.Time -} - -var ( - counterMutex sync.Mutex - messageCounter int -) - -func Craft(topic, payload string) Message { - counterMutex.Lock() - defer counterMutex.Unlock() - - uuid := uuid.New().String() - - // TODO: Independent of the topic, the message counter should be incremented? - messageCounter++ - - return Message{ - Index: messageCounter, - Topic: topic, - Payload: payload, - Checksum: Verification(payload), - UUID: uuid, - Timestamp: time.Now(), - } -} - -func Observability(server string, port int, topic string, message string) error { - broker := fmt.Sprintf("tcp://%s:%d", server, port) - clientID := "go_mqtt_client" - - opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientID) - client := mqtt.NewClient(opts) - - if token := client.Connect(); token.Wait() && token.Error() != nil { - return fmt.Errorf(">> Failed to connect to the broker: %v", token.Error()) - } - - defer client.Disconnect(250) // ms - - sample := Craft(topic, message) - - jsonPackage, err := json.Marshal(sample) - if err != nil { - return fmt.Errorf(">> Failed to marshal the message: %v", err) - } - - token := client.Publish(topic, 0, false, jsonPackage) - token.Wait() - if token.Error() != nil { - return fmt.Errorf(">> Failed to publish the message: %v", token.Error()) - } - - return nil -} - -func Republisher(server string, port int, topic string, message string) { - fmt.Println(">> MQTT") - fmt.Println(">> Broker: ", server) - fmt.Println(">> Port: ", port) - - // Source: https://github.com/eclipse/paho.mqtt.golang/blob/master/cmd/simple/main.go - var example mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { - fmt.Printf(">> Topic: %s\n", msg.Topic()) - fmt.Printf(">> Message: %s\n", msg.Payload()) - } - - mqtt.DEBUG = log.New(os.Stdout, "", 0) - mqtt.ERROR = log.New(os.Stdout, "", 0) - - // Initial Connection - opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", server, port)) - opts.SetKeepAlive(2 * time.Second) - opts.SetDefaultPublishHandler(example) - opts.SetPingTimeout(1 * time.Second) - - // Create and start a client using the above ClientOptions - client := mqtt.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) - } - - // Subscribe to a topic - if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil { - fmt.Println(token.Error()) - os.Exit(1) - } - - // Publish to a topic - token := client.Publish(topic, 0, false, message) - token.Wait() - - time.Sleep(6 * time.Second) - - // Disconnect from the broker - if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil { - fmt.Println(token.Error()) - os.Exit(1) - } - - client.Disconnect(250) - - time.Sleep(1 * time.Second) - -} - -func Subscription(server string, port int, topic string, host string, destination int) { - fmt.Println(">> Example Broker Activity") - fmt.Println(">> Broker: ", server) - fmt.Println(">> Port: ", port) - - // MQTT Broker / Client - url := fmt.Sprintf("tcp://%s:%d", server, port) - opts := mqtt.NewClientOptions().AddBroker(url) - client := mqtt.NewClient(opts) - - if token := client.Connect(); token.Wait() && token.Error() != nil { - fmt.Println(">> [!] Failed to connect to the broker: ", token.Error()) - } - - // Callback Function (Incoming Messages) - handleMessage := func(client mqtt.Client, msg mqtt.Message) { - fmt.Printf(">> Received message on topic: '%s': %s\n", msg.Topic(), msg.Payload()) - - // Connection Establishment (Target Host) - conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, destination)) - if err != nil { - fmt.Println(">> [!] Error connecting to the target host: ", err) - return - } - defer conn.Close() - - // Data Transmission - _, err = conn.Write(msg.Payload()) - if err != nil { - fmt.Println(">> [!] Error writing to the target host: ", err) - return - } - } - - // Subscription (Topic) - if token := client.Subscribe(topic, 0, handleMessage); token.Wait() && token.Error() != nil { - if token.Error() != nil { - fmt.Println(">> [!] Error subscribing to the topic: ", token.Error()) - } - } - - // Client Shutdown (SIGINT) - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - <-c - - client.Unsubscribe(topic) - client.Disconnect(250) // ms -} - -func Verification(data string) string { - hash := md5.New() - hash.Write([]byte(data)) - return fmt.Sprintf("%x", hash.Sum(nil)) -}