Skip to content

Commit

Permalink
Merge pull request #92 from devanbenz/feat-77-comment
Browse files Browse the repository at this point in the history
feat/add-comment
  • Loading branch information
lazyguru authored Jun 6, 2024
2 parents d12bb7a + 279f410 commit 1072ff2
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 55 deletions.
33 changes: 33 additions & 0 deletions docs/json-schema/comment.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"$id": "https://sublinks.org/comment.schema.json",
"title": "Comment",
"description": "A comment in a Sublinks Post attributable to an author.",
"type": "object",
"properties": {
"id": {
"description": "The unique identifier for the comment.",
"type": "string"
},
"post_id": {
"description": "The unique identifier of Post comment is for.",
"type": "string"
},
"content": {
"description": "The content of the comment.",
"type": "string"
},
"author_id": {
"description": "The unique identifier for the actor that is the author of the comment. I.E. discuss.online/u/lazyguru",
"type": "string"
},
"published": {
"description": "The date and time the comment was published.",
"type": "string"
},
"nsfw": {
"description": "Whether the comment has sensitive (NSFW) content or not.",
"type": "boolean"
}
},
"required": ["id", "post", "content", "author_id", "published"]
}
5 changes: 5 additions & 0 deletions internal/activitypub/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ type Link struct {
Type string `json:"type"` // "Link" | "Image"
Href string `json:"href"` // "https://enterprise.lemmy.ml/pictrs/image/eOtYb9iEiB.png"
}

type Language struct {
Identifier string `json:"identifier"` // "fr",
Name string `json:"name"` // "Français"
}
60 changes: 60 additions & 0 deletions internal/activitypub/note.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package activitypub

import (
"fmt"
"sublinks/sublinks-federation/internal/model"
"time"
)

type Note struct {
Context *Context `json:"@context,omitempty"`
Id string `json:"id"`
Type string `json:"type"`
AttributedTo string `json:"attributedTo"`
To []string `json:"to"`
Cc []string `json:"cc"`
Audience string `json:"audience"`
InReplyTo string `json:"inReplyTo"`
Content string `json:"content"`
MediaType string `json:"mediaType"`
Source Source `json:"source,omitempty"`
Tag []Tag `json:"tag,omitempty"`
Distinguished bool `json:"distinguished,omitempty"`
Language Language `json:"language,omitempty"`
Published time.Time `json:"published"`
Updated time.Time `json:"updated"`
}

type Tag struct {
Href string `json:"href"`
Type string `json:"type"`
Name string `json:"name"`
}

func NewNote(commentUrl string, fromUser string, postUrl string, commentBody string, published time.Time) *Note {
return &Note{
Id: commentUrl,
Type: "Note",
AttributedTo: fromUser,
To: []string{"https://www.w3.org/ns/activitystreams#Public"},
Cc: []string{fromUser, commentUrl},
Audience: commentUrl,
InReplyTo: postUrl,
Content: commentBody,
MediaType: "text/html",
Source: Source{
Content: fmt.Sprintf("This is a comment on %s post", postUrl),
MediaType: "text/markdown",
},
Language: Language{
Identifier: "en",
Name: "English",
},
Distinguished: false,
Published: published,
}
}

func ConvertCommentToNote(c *model.Comment) *Note {
return NewNote(c.UrlStub, c.Author, c.Post, c.Content, c.Published)
}
5 changes: 0 additions & 5 deletions internal/activitypub/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ import (
"time"
)

type Language struct {
Identifier string `json:"identifier"` // "fr",
Name string `json:"name"` // "Français"
}

type Page struct {
Context *Context `json:"@context,omitempty"`
Id string `json:"id"`
Expand Down
34 changes: 34 additions & 0 deletions internal/http/comment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package http

import (
"encoding/json"
"fmt"
"net/http"
"sublinks/sublinks-federation/internal/activitypub"
"sublinks/sublinks-federation/internal/model"

"github.com/gorilla/mux"
)

func (server *Server) SetupCommentRoutes() {
server.Router.HandleFunc("/comment/{commentId}", server.getCommentHandler).Methods("GET")
}

func (server *Server) getCommentHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
comment := model.Comment{UrlStub: vars["commentId"]}
err := server.Database.Find(&comment)
if err != nil {
server.Logger.Error(fmt.Sprintf("Error reading comment: %+v %s", comment, err), err)
return
}
commentLd := activitypub.ConvertCommentToNote(&comment)
commentLd.Context = activitypub.GetContext()
w.WriteHeader(http.StatusOK)
w.Header().Add("content-type", "application/activity+json")
content, _ := json.MarshalIndent(commentLd, "", " ")
_, err = w.Write(content)
if err != nil {
server.Logger.Error("Error writing response", err)
}
}
13 changes: 13 additions & 0 deletions internal/model/comment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package model

import "time"

type Comment struct {
Id string `json:"id" gorm:"primary_key"`
UrlStub string `json:"url_stub"`
Post string `json:"post_id"`
Author string `json:"author_id"`
Nsfw bool `json:"nsfw"`
Published time.Time `json:"published"`
Content string `json:"content"`
}
67 changes: 43 additions & 24 deletions internal/queue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@ package queue

import (
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"sublinks/sublinks-federation/internal/worker"
)

type ConsumerQueue struct {
Exchange string
QueueName string
RoutingKeys map[string]worker.Worker
}

func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error {
channelRabbitMQ, err := q.Connection.Channel()
if err != nil {
Expand All @@ -14,14 +22,17 @@ func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error {
if err != nil {
return err
}
err = channelRabbitMQ.QueueBind(
queueData.QueueName, // queue name
queueData.RoutingKey, // routing key
queueData.Exchange, // exchange
false,
nil)
if err != nil {
return err

for routingKey, _ := range queueData.RoutingKeys {
err = channelRabbitMQ.QueueBind(
queueData.QueueName, // queue name
routingKey, // routing key
queueData.Exchange, // exchange
false,
nil)
if err != nil {
return err
}
}

// Subscribing to QueueService1 for getting messages.
Expand All @@ -41,14 +52,7 @@ func (q *RabbitQueue) createConsumer(queueData ConsumerQueue) error {
return nil
}

type ConsumerQueue struct {
Exchange string
QueueName string
RoutingKey string
}

// TODO: Implement a way to either pass a callback function or return messages/chan
func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue, worker worker.Worker) error {
func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue) error {
err := q.createConsumer(queueData)
if err != nil {
return err
Expand All @@ -57,21 +61,36 @@ func (q *RabbitQueue) StartConsumer(queueData ConsumerQueue, worker worker.Worke
if !ok {
return errors.New("consumer not found")
}
go func() {
for message := range messages {
err := worker.Process(message.Body)

errGroup := new(errgroup.Group)
for message := range messages {
errGroup.Go(func() error {
cbWorker, ok := queueData.RoutingKeys[message.RoutingKey]
if !ok {
return errors.New(fmt.Sprintf("%s not implemented as valid routing key", message.RoutingKey))
}

err := cbWorker.Process(message.Body)

if err != nil {
err = message.Acknowledger.Nack(message.DeliveryTag, false, true)
if err != nil {
panic(err) // I know this isn't good. Will need to fix it
return errors.New(fmt.Sprintf("error nack'ing the message: %s", err.Error()))
}
continue
return errors.New(fmt.Sprintf("error processing message body: %s", err.Error()))
}

err = message.Acknowledger.Ack(message.DeliveryTag, false)
if err != nil {
panic(err) // I know this isn't good. Will need to fix it
return errors.New(fmt.Sprintf("error ack'ing the message: %s", err.Error()))
}
}
}()
return nil
})
}

if err := errGroup.Wait(); err != nil {
return err
}

return nil
}
55 changes: 29 additions & 26 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package queue

import (
"os"

"sublinks/sublinks-federation/internal/db"
"sublinks/sublinks-federation/internal/log"
"sublinks/sublinks-federation/internal/repository"
"sublinks/sublinks-federation/internal/worker"

amqp "github.com/rabbitmq/amqp091-go"
"sublinks/sublinks-federation/internal/db"
"sublinks/sublinks-federation/internal/log"
)

type Queue interface {
Connect() error
Run(conn db.Database)
PublishMessage(queueName string, message string) error
StartConsumer(queueData ConsumerQueue, worker worker.Worker) error
StartConsumer(queueData ConsumerQueue) error
Status() map[string]map[string]bool
Close()
}
Expand Down Expand Up @@ -52,42 +51,46 @@ func (q *RabbitQueue) Status() map[string]map[string]bool {

func (q *RabbitQueue) Run(conn db.Database) {
q.processActors(conn)
q.processPosts(conn)
q.processObjects(conn)
}

func (q *RabbitQueue) processActors(conn db.Database) {
actorCQ := ConsumerQueue{
QueueName: "actor_create_queue",
Exchange: "federation",
RoutingKey: "actor.create",
QueueName: "actor_create_queue",
Exchange: "federation",
RoutingKeys: map[string]worker.Worker{
ActorRoutingKey: &worker.ActorWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
},
},
}

aw := worker.ActorWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
}

err := q.StartConsumer(actorCQ, &aw)
err := q.StartConsumer(actorCQ)
if err != nil {
q.logger.Fatal("failed starting actor consumer", err)
}
}

func (q *RabbitQueue) processPosts(conn db.Database) {
postCQ := ConsumerQueue{
QueueName: "post_queue",
Exchange: "federation",
RoutingKey: "post.create",
}

aw := worker.PostWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
func (q *RabbitQueue) processObjects(conn db.Database) {
queue := ConsumerQueue{
QueueName: "object_create_queue",
Exchange: "federation",
RoutingKeys: map[string]worker.Worker{
PostRoutingKey: &worker.PostWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
},
CommentRoutingKey: &worker.CommentWorker{
Logger: q.logger,
Repository: repository.NewRepository(conn),
},
},
}

err := q.StartConsumer(postCQ, &aw)
err := q.StartConsumer(queue)
if err != nil {
q.logger.Fatal("failed starting post consumer", err)
q.logger.Fatal("failed starting object consumer", err)
}
}

Expand Down
7 changes: 7 additions & 0 deletions internal/queue/routing-keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package queue

const (
ActorRoutingKey = "actor.create"
PostRoutingKey = "post.create"
CommentRoutingKey = "comment.create"
)
Loading

0 comments on commit 1072ff2

Please sign in to comment.