-
-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add logic to process actors and posts #43
Conversation
Go test coverage: 0.3% for commit 1c2da88
|
@devanbenz would appreciate some feedback here. Not sure I am done though as I may take another pass at it |
34bc4c3
to
a341667
Compare
I have other code locally, but I think I can push that all to a separate PR and pause additional work here so we can get this one approved+merged first |
cmd/federation.go
Outdated
logger.Fatal("failed starting to consumer", err) | ||
} | ||
processActors(logger, conn, q) | ||
processPosts(logger, conn, q) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These probably need to be moved somewhere else eventually
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could probably encapsulate a lot of the queue logic into the actual queue.go file and have like a single function called Run() or RunRabbitMq() that can do all the work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that might make sense. I've been trying to keep things separate where possible (eg not have business logic in the queue code but rather have that somewhere else), but I also don't want to over-engineer this either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😦 That doesn't work. I took a stab at refactoring to have it run inside the queue. The problem is I need the database there also. So now I have to pass the database to the queue and that feels icky. I think I need to create another object and pass that object the db, logger, and queue (like we do the http server).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I managed to rework things to move these commands into the queue namespace. Not sure that makes sense as a final resting place, but probably good for now
cmd/federation.go
Outdated
} | ||
} | ||
|
||
func processPosts(logger *log.Log, conn db.Database, q queue.Queue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can maybe be made more generic using some of the code from #46
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which reminds me... @devanbenz don't delete your branch just yet 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good :D
false, // no wait | ||
nil, // arguments | ||
queueData.QueueName, // queue name | ||
"", // consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When this is blank, it gets auto-assigned. Might be ok? But might also be a better idea for us to think this through some more
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could set the consumer to a unique id with the routing key included in the string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe. Like I said. Need to think it through. Probably, should create a separate issue to track it after this PR gets merged
nil, // arguments | ||
queueData.QueueName, // queue name | ||
"", // consumer | ||
false, // auto-ack |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The manual ack logic I have right now has issues. If you Nack a message, it just shoves it back in the queue and you reattempt to process it next time it's turn comes up. Need to investigate this more as it just creates an endless loop in local dev. I suspect there is a counter on the message somewhere that you can use to see how many times you've seen it. Also, a "retry after X period of time" would be nice and be able to apply an exponential backoff strategy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should really implement a dead letter exchange. Let me know what you think? It would be beneficial for issues like this.
if err != nil { | ||
err = message.Acknowledger.Nack(message.DeliveryTag, false, true) | ||
if err != nil { | ||
panic(err) // I know this isn't good. Will need to fix it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's really not good considering the docker image is set to "restart: always" meaning we still get stuck in an endless loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets think about a way to create a worker pool for this and that way workers can die without causing the entire application to crash. I have a very basic worker pool library that I created a little while ago: https://pkg.go.dev/github.com/we-are-discussing-rest/pool-party I can modify it to have logic to kill off workers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or at least we can still implement a dead letter exchange but add something like an array of servers that it failed to send to so that way we can retry again for the instances where it failed only
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah but this is internal communication. Any failures would be due to bugs in ours or the API server code
} | ||
err = message.Acknowledger.Ack(message.DeliveryTag, false) | ||
if err != nil { | ||
panic(err) // I know this isn't good. Will need to fix it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is maybe not as bad considering we really shouldn't run into errors trying to ack a message, but...
Find(*interface{}, ...interface{}) error | ||
} | ||
|
||
type RepositoryImpl struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up going with a generic implementation as the only thing that was different in each repository was the the type reference. This just led to a bunch of boilerplate copy-paste code that made my skin crawl 😱
506c17c
to
1e1a47d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments
} | ||
|
||
type PostgresDB struct { | ||
*sql.DB | ||
*gorm.DB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we going to be doing any SQL queries that are more advanced than a few SELECTS or INSERTS? I am not a big fan of incorporating gorm I think the base SQL driver has great functionality. Like to get your opinion on this though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not expect to. An ORM should be the better way to go (I know Go devs don't typically like dependencies, but writing your own SQL is not a good idea unless you really understand how databases work and have AND use proper indexes)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I heard someone else complain about Gorm recently. Is there another ORM that we should use? I'm generally opposed to not using an ORM as I'm of the personal opinion that too often developers think they should use SQL because they are developers but really they shouldn't because using SQL is more than just syntax, you need to understand the specific database internals and structure of the tables to write proper SQL. An ORM "saves" you from this as ORMs are typically built to manage the structure of the database with the understanding of the specific database (MySQL / Postgres / etc...) you are working with. (also, ORM makes it easier to swap out to a different DB in the future as you avoid using syntax that is specific to one database)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've heard very good things about Ent. That being said I'm pretty sure SQLx works with any relational datastore and allows for schema-first querying.
cmd/federation.go
Outdated
logger.Fatal("failed starting to consumer", err) | ||
} | ||
processActors(logger, conn, q) | ||
processPosts(logger, conn, q) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could probably encapsulate a lot of the queue logic into the actual queue.go file and have like a single function called Run() or RunRabbitMq() that can do all the work.
if err != nil { | ||
err = message.Acknowledger.Nack(message.DeliveryTag, false, true) | ||
if err != nil { | ||
panic(err) // I know this isn't good. Will need to fix it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets think about a way to create a worker pool for this and that way workers can die without causing the entire application to crash. I have a very basic worker pool library that I created a little while ago: https://pkg.go.dev/github.com/we-are-discussing-rest/pool-party I can modify it to have logic to kill off workers
false, // no wait | ||
nil, // arguments | ||
queueData.QueueName, // queue name | ||
"", // consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could set the consumer to a unique id with the routing key included in the string?
if err != nil { | ||
err = message.Acknowledger.Nack(message.DeliveryTag, false, true) | ||
if err != nil { | ||
panic(err) // I know this isn't good. Will need to fix it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or at least we can still implement a dead letter exchange but add something like an array of servers that it failed to send to so that way we can retry again for the instances where it failed only
632b8a3
to
5ae0fb7
Compare
5ae0fb7
to
a1d9848
Compare
a1d9848
to
7206f75
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code looks good we just need to think about whether to keep using gorm or another thing and how to handle some of the errors within the messages.
nil, // arguments | ||
queueData.QueueName, // queue name | ||
"", // consumer | ||
false, // auto-ack |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should really implement a dead letter exchange. Let me know what you think? It would be beneficial for issues like this.
This PR adds a lot of things. For one, it introduces handling of users, communities, and posts at a basic level from the backend via queue (RabbitMQ). It also adds some basic activitypub support (in terms of request URL handling, not of pushing messages out). It, unfortunately, has grown quite large and really needs to get reviewed+merged so other work can continue. If necessary, I an attempt to break it up into smaller PRs. There are some other minor little commits that I've snuck in as well (namely, adding air for hot-reload support, json-schema stuff, switching to Gorm)