Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic to process actors and posts #43

Merged
merged 10 commits into from
Mar 16, 2024
Merged

Conversation

lazyguru
Copy link
Member

@lazyguru lazyguru commented Feb 22, 2024

This PR adds a lot of things. For one, it introduces handling of users, communities, and posts at a basic level from the backend via queue (RabbitMQ). It also adds some basic activitypub support (in terms of request URL handling, not of pushing messages out). It, unfortunately, has grown quite large and really needs to get reviewed+merged so other work can continue. If necessary, I an attempt to break it up into smaller PRs. There are some other minor little commits that I've snuck in as well (namely, adding air for hot-reload support, json-schema stuff, switching to Gorm)

Copy link

github-actions bot commented Feb 22, 2024

Go test coverage: 0.3% for commit 1c2da88
⚠️ 9 of 10 packages have zero coverage.
  • sublinks/sublinks-federation
  • sublinks/sublinks-federation/cmd
  • sublinks/sublinks-federation/internal/activitypub
  • sublinks/sublinks-federation/internal/db
  • sublinks/sublinks-federation/internal/http
  • sublinks/sublinks-federation/internal/log
  • sublinks/sublinks-federation/internal/queue
  • sublinks/sublinks-federation/internal/repository
  • sublinks/sublinks-federation/internal/worker
View coverage for all packages
# Package Name                                      | Coverage
- sublinks/sublinks-federation                      |     0.0%
- sublinks/sublinks-federation/cmd                  |     0.0%
- sublinks/sublinks-federation/internal/activitypub |     0.0%
- sublinks/sublinks-federation/internal/db          |     0.0%
- sublinks/sublinks-federation/internal/http        |     0.0%
+ sublinks/sublinks-federation/internal/lemmy       |     2.7%
- sublinks/sublinks-federation/internal/log         |     0.0%
- sublinks/sublinks-federation/internal/queue       |     0.0%
- sublinks/sublinks-federation/internal/repository  |     0.0%
- sublinks/sublinks-federation/internal/worker      |     0.0%

@lazyguru
Copy link
Member Author

@devanbenz would appreciate some feedback here. Not sure I am done though as I may take another pass at it

@lazyguru lazyguru self-assigned this Feb 22, 2024
@lazyguru lazyguru force-pushed the add-logic-to-process-actors branch from 34bc4c3 to a341667 Compare February 23, 2024 07:25
@lazyguru lazyguru marked this pull request as ready for review March 4, 2024 05:00
@lazyguru lazyguru requested a review from devanbenz March 4, 2024 05:01
@lazyguru
Copy link
Member Author

lazyguru commented Mar 4, 2024

I have other code locally, but I think I can push that all to a separate PR and pause additional work here so we can get this one approved+merged first

logger.Fatal("failed starting to consumer", err)
}
processActors(logger, conn, q)
processPosts(logger, conn, q)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These probably need to be moved somewhere else eventually

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably encapsulate a lot of the queue logic into the actual queue.go file and have like a single function called Run() or RunRabbitMq() that can do all the work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that might make sense. I've been trying to keep things separate where possible (eg not have business logic in the queue code but rather have that somewhere else), but I also don't want to over-engineer this either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😦 That doesn't work. I took a stab at refactoring to have it run inside the queue. The problem is I need the database there also. So now I have to pass the database to the queue and that feels icky. I think I need to create another object and pass that object the db, logger, and queue (like we do the http server).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I managed to rework things to move these commands into the queue namespace. Not sure that makes sense as a final resting place, but probably good for now

}
}

func processPosts(logger *log.Log, conn db.Database, q queue.Queue) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can maybe be made more generic using some of the code from #46

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which reminds me... @devanbenz don't delete your branch just yet 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good :D

false, // no wait
nil, // arguments
queueData.QueueName, // queue name
"", // consumer
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this is blank, it gets auto-assigned. Might be ok? But might also be a better idea for us to think this through some more

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could set the consumer to a unique id with the routing key included in the string?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. Like I said. Need to think it through. Probably, should create a separate issue to track it after this PR gets merged

nil, // arguments
queueData.QueueName, // queue name
"", // consumer
false, // auto-ack
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The manual ack logic I have right now has issues. If you Nack a message, it just shoves it back in the queue and you reattempt to process it next time it's turn comes up. Need to investigate this more as it just creates an endless loop in local dev. I suspect there is a counter on the message somewhere that you can use to see how many times you've seen it. Also, a "retry after X period of time" would be nice and be able to apply an exponential backoff strategy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really implement a dead letter exchange. Let me know what you think? It would be beneficial for issues like this.

if err != nil {
err = message.Acknowledger.Nack(message.DeliveryTag, false, true)
if err != nil {
panic(err) // I know this isn't good. Will need to fix it
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really not good considering the docker image is set to "restart: always" meaning we still get stuck in an endless loop

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets think about a way to create a worker pool for this and that way workers can die without causing the entire application to crash. I have a very basic worker pool library that I created a little while ago: https://pkg.go.dev/github.com/we-are-discussing-rest/pool-party I can modify it to have logic to kill off workers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or at least we can still implement a dead letter exchange but add something like an array of servers that it failed to send to so that way we can retry again for the instances where it failed only

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but this is internal communication. Any failures would be due to bugs in ours or the API server code

}
err = message.Acknowledger.Ack(message.DeliveryTag, false)
if err != nil {
panic(err) // I know this isn't good. Will need to fix it
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is maybe not as bad considering we really shouldn't run into errors trying to ack a message, but...

Find(*interface{}, ...interface{}) error
}

type RepositoryImpl struct {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up going with a generic implementation as the only thing that was different in each repository was the the type reference. This just led to a bunch of boilerplate copy-paste code that made my skin crawl 😱

@lazyguru lazyguru changed the title WIP: Add logic to process actors Add logic to process actors Mar 4, 2024
@lazyguru lazyguru changed the title Add logic to process actors Add logic to process actors and posts Mar 4, 2024
@lazyguru lazyguru force-pushed the add-logic-to-process-actors branch from 506c17c to 1e1a47d Compare March 6, 2024 22:34
Copy link
Member

@devanbenz devanbenz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments

}

type PostgresDB struct {
*sql.DB
*gorm.DB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to be doing any SQL queries that are more advanced than a few SELECTS or INSERTS? I am not a big fan of incorporating gorm I think the base SQL driver has great functionality. Like to get your opinion on this though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not expect to. An ORM should be the better way to go (I know Go devs don't typically like dependencies, but writing your own SQL is not a good idea unless you really understand how databases work and have AND use proper indexes)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I heard someone else complain about Gorm recently. Is there another ORM that we should use? I'm generally opposed to not using an ORM as I'm of the personal opinion that too often developers think they should use SQL because they are developers but really they shouldn't because using SQL is more than just syntax, you need to understand the specific database internals and structure of the tables to write proper SQL. An ORM "saves" you from this as ORMs are typically built to manage the structure of the database with the understanding of the specific database (MySQL / Postgres / etc...) you are working with. (also, ORM makes it easier to swap out to a different DB in the future as you avoid using syntax that is specific to one database)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've heard very good things about Ent. That being said I'm pretty sure SQLx works with any relational datastore and allows for schema-first querying.

internal/db/migrate.go Show resolved Hide resolved
logger.Fatal("failed starting to consumer", err)
}
processActors(logger, conn, q)
processPosts(logger, conn, q)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably encapsulate a lot of the queue logic into the actual queue.go file and have like a single function called Run() or RunRabbitMq() that can do all the work.

internal/activitypub/person.go Outdated Show resolved Hide resolved
internal/http/server.go Show resolved Hide resolved
if err != nil {
err = message.Acknowledger.Nack(message.DeliveryTag, false, true)
if err != nil {
panic(err) // I know this isn't good. Will need to fix it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets think about a way to create a worker pool for this and that way workers can die without causing the entire application to crash. I have a very basic worker pool library that I created a little while ago: https://pkg.go.dev/github.com/we-are-discussing-rest/pool-party I can modify it to have logic to kill off workers

false, // no wait
nil, // arguments
queueData.QueueName, // queue name
"", // consumer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could set the consumer to a unique id with the routing key included in the string?

if err != nil {
err = message.Acknowledger.Nack(message.DeliveryTag, false, true)
if err != nil {
panic(err) // I know this isn't good. Will need to fix it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or at least we can still implement a dead letter exchange but add something like an array of servers that it failed to send to so that way we can retry again for the instances where it failed only

@lazyguru lazyguru force-pushed the add-logic-to-process-actors branch 2 times, most recently from 632b8a3 to 5ae0fb7 Compare March 7, 2024 04:23
@lazyguru lazyguru force-pushed the add-logic-to-process-actors branch from 5ae0fb7 to a1d9848 Compare March 15, 2024 05:51
@lazyguru lazyguru force-pushed the add-logic-to-process-actors branch from a1d9848 to 7206f75 Compare March 15, 2024 06:48
@lazyguru lazyguru requested review from devanbenz and a team March 15, 2024 06:57
Copy link
Member

@devanbenz devanbenz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code looks good we just need to think about whether to keep using gorm or another thing and how to handle some of the errors within the messages.

nil, // arguments
queueData.QueueName, // queue name
"", // consumer
false, // auto-ack
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really implement a dead letter exchange. Let me know what you think? It would be beneficial for issues like this.

@lazyguru lazyguru merged commit d7814e7 into main Mar 16, 2024
2 checks passed
@lazyguru lazyguru deleted the add-logic-to-process-actors branch March 16, 2024 22:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants