diff --git a/app/(blog)/blog/a-tale-of-two-copies/page.md b/app/(blog)/blog/a-tale-of-two-copies/page.md index eb966c9c2..91defb521 100644 --- a/app/(blog)/blog/a-tale-of-two-copies/page.md +++ b/app/(blog)/blog/a-tale-of-two-copies/page.md @@ -17,7 +17,7 @@ title: A Tale of Two Copies It was the best of times, it was the worst of times. That's when I hit a performance mystery that sent me down a multi-day rabbit hole of adventure. I was writing some code to take some entries, append them into a fixed size in-memory buffer, and then flush that buffer to disk when it was full. The main bit of code looked a little something like this: -``` +```go type Buffer struct { fh *os.File n uint @@ -50,7 +50,7 @@ Even if I'm not relying on my intuition, I find it fun and useful to try to pred Maybe you thought something similar, or maybe something completely different. Or maybe you didn't sign up to do thinking right now and just want me to get on with it. And so, I ran the following benchmark: -``` +```go func BenchmarkBuffer(b *testing.B) { fh := tempFile(b) defer fh.Close() @@ -310,7 +310,7 @@ What was left to try? Sometimes when I have no idea why something is slow, I try writing the same code but in a different way. That may tickle the compiler just right to cause it to change which optimizations it can or can't apply, giving some clues as to what's going on. So in that spirit I changed the benchmark to this: -``` +```go func BenchmarkBuffer(b *testing.B) { // ... setup code diff --git a/app/(blog)/blog/finding-and-tracking-resource-leaks-in-go/page.md b/app/(blog)/blog/finding-and-tracking-resource-leaks-in-go/page.md index 0c6eacd70..2983b3f50 100644 --- a/app/(blog)/blog/finding-and-tracking-resource-leaks-in-go/page.md +++ b/app/(blog)/blog/finding-and-tracking-resource-leaks-in-go/page.md @@ -14,67 +14,389 @@ title: Finding and Tracking Resource Leaks in Go --- -Forgetting to close a file, a connection, or some other resource is a rather common issue in Go. Usually you can spot them with good code review practices, but what if you wanted to automate it and you don't have a suitable linter at hand? - -How do we track and figure out those leaks? - +Forgetting to close a file, a connection, or some other resource is a rather common issue in Go. Usually you can spot them with good code review practices, but what if you wanted to automate it and you don't have a suitable linter at hand? + +How do we track and figure out those leaks? + Fortunately, there's an approach to finding common resource leaks that we’ll explore below. ## Problem: Connection Leak Let's take a simple example that involves a TCP client. Of course, it applies to other protocols, such as GRPC, database, or HTTP. We'll omit the communication implementation because it's irrelevant to the problem. +```go +type Client struct { + conn net.Conn +} + +func Dial(ctx context.Context, address string) (*Client, error) { + conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", address) + if err != nil { + return nil, fmt.Errorf("failed to dial: %w", err) + } + + return &Client{conn: conn}, nil +} + +func (client *Client) Close() error { + return client.conn.Close() +} +``` + It's easy to put the defer in the wrong place or forget to call Close altogether. 
+```go
+func ExampleDial(ctx context.Context) error {
+	source, err := Dial(ctx, "127.0.0.1:1000")
+	if err != nil {
+		return err
+	}
+
+	destination, err := Dial(ctx, "127.0.0.1:1001")
+	if err != nil {
+		return err
+	}
+
+	defer source.Close()
+	defer destination.Close()
+
+	data, err := source.Recv(ctx)
+	if err != nil {
+		return fmt.Errorf("recv failed: %w", err)
+	}
+
+	err = destination.Send(ctx, data)
+	if err != nil {
+		return fmt.Errorf("send failed: %w", err)
+	}
+
+	return nil
+}
+```
+
 Notice if we fail to dial the second client, we have forgotten to close the source connection.

 ## Problem: File Leak

 Another common resource management mistake is a file leak.

+```go
+func ExampleFile(ctx context.Context, fs fs.FS) error {
+	file, err := fs.Open("data.csv")
+	if err != nil {
+		return fmt.Errorf("open failed: %w", err)
+	}
+
+	stat, err := file.Stat()
+	if err != nil {
+		return fmt.Errorf("stat failed: %w", err)
+	}
+
+	fmt.Println(stat.Name())
+
+	_ = file.Close()
+	return nil
+}
+```
+
 ## Tracking Resources

 How do we track and figure out those leaks? One thing we can do is to keep track of every single open file and connection and ensure that everything is closed when the tests finish. We need to build something that keeps a list of all open things and tracks where we started using a resource.

-To figure out where our "leak" comes from, we can use [runtime.Callers](https://pkg.go.dev/runtime#Callers). You can look at the [Frames example](https://pkg.go.dev/runtime#example-Frames) to learn how to use it. Let's call the struct we use to hold this information a `Tag`.
+To figure out where our "leak" comes from, we can use [`runtime.Callers`](https://pkg.go.dev/runtime#Callers). You can look at the [Frames example](https://pkg.go.dev/runtime#example-Frames) to learn how to use it. Let's call the struct we use to hold this information a `Tag`.
+
+```go
+// Tag is used to keep track of things we consider open.
+type Tag struct {
+	owner  *Tracker // we'll explain this below
+	caller [5]uintptr
+}
+
+// newTag creates a new tracking tag.
+func newTag(owner *Tracker, skip int) *Tag {
+	tag := &Tag{owner: owner}
+	runtime.Callers(skip+1, tag.caller[:])
+	return tag
+}
+
+// String converts the caller frames to a string.
+func (tag *Tag) String() string {
+	var s strings.Builder
+	frames := runtime.CallersFrames(tag.caller[:])
+	for {
+		frame, more := frames.Next()
+		if strings.Contains(frame.File, "runtime/") {
+			break
+		}
+		fmt.Fprintf(&s, "%s\n", frame.Function)
+		fmt.Fprintf(&s, "\t%s:%d\n", frame.File, frame.Line)
+		if !more {
+			break
+		}
+	}
+	return s.String()
+}
+
+// Close marks the tag as being properly deallocated.
+func (tag *Tag) Close() {
+	tag.owner.Remove(tag)
+}
+```

 Of course, we need something to keep the list of all open trackers:

+```go
+// Tracker keeps track of all open tags.
+type Tracker struct {
+	mu     sync.Mutex
+	closed bool
+	open   map[*Tag]struct{}
+}
+
+// NewTracker creates an empty tracker.
+func NewTracker() *Tracker {
+	return &Tracker{open: map[*Tag]struct{}{}}
+}
+
+// Create creates a new tag, which needs to be closed.
+func (tracker *Tracker) Create() *Tag {
+	tag := newTag(tracker, 2)
+
+	tracker.mu.Lock()
+	defer tracker.mu.Unlock()
+
+	// We don't want to allow creating a new tag, when we stopped tracking.
+	if tracker.closed {
+		panic("creating a tag after tracker has been closed")
+	}
+	tracker.open[tag] = struct{}{}
+
+	return tag
+}
+
+// Remove stops tracking tag.
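+// It is called by Tag.Close once the resource has been released.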
+func (tracker *Tracker) Remove(tag *Tag) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + delete(tracker.open, tag) +} + +// Close checks that none of the tags are still open. +func (tracker *Tracker) Close() error { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + tracker.closed = true + if len(tracker.open) > 0 { + return errors.New(tracker.openResources()) + } + return nil +} + +// openResources returns a string describing all the open resources. +func (tracker *Tracker) openResources() string { + var s strings.Builder + fmt.Fprintf(&s, "%d open resources\n", len(tracker.open)) + + for tag := range tracker.open { + fmt.Fprintf(&s, "---\n%s\n", tag) + } + + return s.String() +} +``` + Let's look at how it works: -You can test it over at . +```go +func TestTracker(t *testing.T) { + tracker := NewTracker() + defer func() { + if err := tracker.Close(); err != nil { + t.Fatal(err) + } + }() + + tag := tracker.Create() + // if we forget to call Close, then the test fails. + // tag.Close() +} +``` + +You can test it over at https://go.dev/play/p/8AkKrzYVFH5. ## Hooking up the tracker to a `fs.FS` We need to integrate it into the initially problematic code. We can create a wrapper for `fs.FS` that creates a tag for each opened file. +```go +type TrackedFS struct { + tracker *Tracker + fs fs.FS +} + +func TrackFS(fs fs.FS) *TrackedFS { + return &TrackedFS{ + tracker: NewTracker(), + fs: fs, + } +} + +func (fs *TrackedFS) Open(name string) (fs.File, error) { + file, err := fs.fs.Open(name) + if err != nil { + return file, err + } + + tag := fs.tracker.Create() + return &trackedFile{ + File: file, + tag: tag, + }, nil +} + +func (fs *TrackedFS) Close() error { return fs.tracker.Close() } + +type trackedFile struct { + fs.File + tag *Tag +} + +func (file *trackedFile) Close() error { + file.tag.Close() + return file.File.Close() +} +``` + Finally, we can use this wrapper in a test and get some actual issues resolved: -You can play around with it here . +```go +func TestFS(t *testing.T) { + // We'll use `fstest` package here, but you can also replace this with + // `os.DirFS` or similar. + dir := fstest.MapFS{ + "data.csv": &fstest.MapFile{Data: []byte("hello")}, + } + + fs := TrackFS(dir) + defer func() { + if err := fs.Close(); err != nil { + t.Fatal(err) + } + }() + + file, err := fs.Open("data.csv") + if err != nil { + t.Fatal(err) + } + + stat, err := file.Stat() + if err != nil { + t.Fatal(err) + } + + t.Log(stat.Name()) +} +``` + +You can play around with it here https://go.dev/play/p/VTKZUzWukTe. + -## Hooking up the tracker via a Context +## Hooking up the tracker via a `Context` Passing this `tracker` everywhere would be rather cumbersome. However, we can write some helpers to put the tracker inside a `Context`. 
+```go +type trackerKey struct{} + +func WithTracker(ctx context.Context) (*Tracker, context.Context) { + tracker := NewTracker() + return tracker, context.WithValue(ctx, trackerKey{}, tracker) +} + +func TrackerFromContext(ctx context.Context) *Tracker { + value := ctx.Value(trackerKey{}) + return value.(*Tracker) +} +``` + Of course, we need to adjust our `Client` implementation as well: +```go +type Client struct { + conn net.Conn + tag *Tag +} + +func Dial(ctx context.Context, address string) (*Client, error) { + conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", address) + if err != nil { + return nil, fmt.Errorf("failed to dial: %w", err) + } + + tracker := TrackerFromContext(ctx) + return &Client{conn: conn, tag: tracker.Create()}, nil +} + +func (client *Client) Close() error { + client.tag.Close() + return client.conn.Close() +} +``` + To make our testing code even shorter, we can make a tiny helper: +```go +func TestingTracker(ctx context.Context, tb testing.TB) context.Context { + tracker, ctx := WithTracker(ctx) + tb.Cleanup(func() { + if err := tracker.Close(); err != nil { + tb.Fatal(err) + } + }) + return ctx +} +``` + Finally, we can put it all together: -You can see it working over here . +```go +func TestClient(t *testing.T) { + ctx := TestingTracker(context.Background(), t) + + addr := startTestServer(t) + + client, err := Dial(ctx, addr) + if err != nil { + t.Fatal(err) + } + + // if we forget to close, then the test will fail + // client.Close + _ = client +} +``` + +You can see it working over here https://go.dev/play/p/B6qI6xgij1m. ## Making it zero cost for production -Now, all of this `runtime.Callers` calling comes with a high cost. However, we can reduce it by conditionally compiling the code. Luckily we can use tags to only compile it only for testing. I like to use the `race` tag for it because it is added anytime you run your tests with `-race`. +Now, all of this `runtime.Callers` calling comes with a high cost. However, we can reduce it by conditionally compiling the code. Luckily we can use tags to only compile it only for testing. I like to use the `race` tag for it because it is added any time you run your tests with `-race`. + +```go +//go:build race + +package tracker +``` The implementations are left as an exercise for the reader. :) ## Conclusion -This is probably not a final solution for your problem, but hopefully, it is a good starting point. You can add more helpers, maybe track the filename inside a `Tag`, or only print unique caller frames in the test failure. Maybe try implementing this for SQL driver and track each thing separately you can take a peek [at our implementation](https://github.com/storj/private/tree/main/tagsql), if you get stuck. - -May all your resource leaks be discovered.This is a continuation of our series of finding leaks in Golang. In case you missed it, in a previous post we covered [finding leaked goroutines](https://www.storj.io/blog/finding-goroutine-leaks-in-tests). +This is probably not a final solution for your problem, but hopefully, it is a good starting point. You can add more helpers, maybe track the filename inside a `Tag`, or only print unique caller frames in the test failure. Maybe try implementing this for SQL driver and track each thing separately -- you can take a peek [at our implementation](https://github.com/storj/private/tree/main/tagsql), if you get stuck. + +May all your resource leaks be discovered. +This is a continuation of our series of finding leaks in Golang. 
In case you missed it, in a previous post we covered [finding leaked goroutines](https://www.storj.io/blog/finding-goroutine-leaks-in-tests). diff --git a/app/(blog)/blog/finding-goroutine-leaks-in-tests/page.md b/app/(blog)/blog/finding-goroutine-leaks-in-tests/page.md index dfcff542c..ba898f61f 100644 --- a/app/(blog)/blog/finding-goroutine-leaks-in-tests/page.md +++ b/app/(blog)/blog/finding-goroutine-leaks-in-tests/page.md @@ -5,7 +5,7 @@ date: '2022-03-07 00:00:00' heroimage: ./c1245dac8cff160d.jpeg layout: blog metadata: - description: 'Finding Goroutine Leaks in TestsA leaked goroutine at the end of a + description: 'A leaked goroutine at the end of a test can indicate several problems. Let''s first, take a look at the most common ones before tackling an approach to finding them.Problem: DeadlockFirst, we can have a goroutine that is blocked. As an example:func LeakySumSquares(c...' @@ -14,8 +14,6 @@ title: Finding Goroutine Leaks in Tests --- -### Finding Goroutine Leaks in Tests - A leaked goroutine at the end of a test can indicate several problems. Let's first, take a look at the most common ones before tackling an approach to finding them. ### Problem: Deadlock @@ -23,7 +21,7 @@ A leaked goroutine at the end of a test can indicate several problems. Let's fir First, we can have a goroutine that is blocked. As an example: -``` +```go func LeakySumSquares(ctx context.Context, data []int) ( total int, err error) { @@ -57,7 +55,7 @@ In this case, when the context is canceled, the goroutines might end up leaking. Many times different services, connections, or databases have an internal goroutine used for async processing. A leaked goroutine can show such leaks. -``` +```go type Conn struct { messages chan Message @@ -106,7 +104,7 @@ Even if all the goroutines terminate, there can still be order problems with reg Let's take a common case of the problem: -``` +```go type Server struct { log Logger db *sql.DB @@ -176,7 +174,7 @@ To attach the label: -``` +```go func Track(ctx context.Context, t *testing.T, fn func(context.Context)) { label := t.Name() pprof.Do(ctx, pprof.Labels("test", label), fn) @@ -189,7 +187,7 @@ func Track(ctx context.Context, t *testing.T, fn func(context.Context)) { Unfortunately, currently, there's not an easy way to get the goroutines with a given label. But, we can use some of the profiling endpoints to extract the necessary information. Clearly, this is not very efficient. -``` +```go import "github.com/google/pprof/profile" func CheckNoGoroutines(key, value string) error { @@ -269,7 +267,7 @@ func matchesLabel(sample *profile.Sample, key, expectedValue string) bool { And a failing test might look like this: -``` +```go func TestLeaking(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) diff --git a/app/(blog)/blog/go-integration-tests-with-postgres/page.md b/app/(blog)/blog/go-integration-tests-with-postgres/page.md index 9fd09d6a1..e8c0bcc96 100644 --- a/app/(blog)/blog/go-integration-tests-with-postgres/page.md +++ b/app/(blog)/blog/go-integration-tests-with-postgres/page.md @@ -24,30 +24,251 @@ If you have searched a bit on how to set up a clean test environment, you've pro Usually, the first thing you do is set up something to act as the client. With *dockertest* it means creating a *dockertest.Pool*. 
 And we need to set it up in our *TestMain*:

+```go
+var dockerPool *dockertest.Pool
+
+func TestMain(m *testing.M) {
+	var err error
+	dockerPool, err = dockertest.NewPool("")
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+
+	// Set a time for our retries. A lower value probably makes more sense.
+	dockerPool.MaxWait = 120 * time.Second
+	code := m.Run()
+	os.Exit(code)
+}
+```
+
 If we are writing tests, then using a specific helper is going to be very convenient.

+```go
+func TestCreateTable(t *testing.T) {
+	ctx := context.Background()
+	WithDatabase(ctx, t, func(t *testing.T, db *pgx.Conn) {
+		_, err := db.Exec(ctx, `
+			CREATE TABLE accounts ( user_id serial PRIMARY KEY );
+		`)
+		if err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
+func WithDatabase[TB testing.TB](ctx context.Context, tb TB, test func(t TB, db *pgx.Conn)) {
+	// < snip >
+}
+```
+
 This approach creates a docker image and calls *test* callback whenever it's ready. The callback based approach is especially helpful if you need to test with multiple backends such as Cockroach and Postgres. In your own codebase you probably would return the data layer interface rather than *\*pgx.Conn* directly. For example:

+```go
+func TestCreateTable(t *testing.T) {
+	ctx := context.Background()
+	db := NewDatabase(ctx, t)
+	_, err := db.Exec(ctx, `
+		CREATE TABLE accounts ( user_id serial PRIMARY KEY );
+	`)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func NewDatabase(ctx context.Context, tb testing.TB) *pgx.Conn {
+	// create the database resource and connect to it, yielding conn (omitted here)
+	tb.Cleanup(func() {
+		err := conn.Close(ctx)
+		if err != nil {
+			tb.Logf("failed to close db: %v", err)
+		}
+	})
+	return conn
+}
+```
+
 A single table migration isn't indicative of a proper database layer, but it's sufficient for seeing the best-case scenario. Adding more tables didn't seem to affect things that much.

 Let's get back on track and see how you can implement the first approach. It's should be trivial to convert one to the other:

+```go
+func WithDatabase[TB testing.TB](ctx context.Context, tb TB, test func(t TB, db *pgx.Conn)) {
+
+	// First we need to specify the image we wish to use.
+	resource, err := dockerPool.RunWithOptions(&dockertest.RunOptions{
+		Repository: "postgres",
+		Tag:        "15",
+		Env: []string{
+			"POSTGRES_PASSWORD=secret",
+			"POSTGRES_USER=user",
+			"POSTGRES_DB=main",
+			"listen_addresses = '*'",
+		},
+	}, func(config *docker.HostConfig) {
+		// set AutoRemove to true so that stopped container goes away by itself
+		config.AutoRemove = true
+		config.RestartPolicy = docker.RestartPolicy{Name: "no"}
+	})
+	if err != nil {
+		tb.Fatalf("Could not start resource: %s", err)
+	}
+	defer func() {
+		if err := dockerPool.Purge(resource); err != nil {
+			tb.Logf("failed to stop: %v", err)
+		}
+	}()
+
+	// Construct our connection string.
+	hostAndPort := resource.GetHostPort("5432/tcp")
+	databaseConnstr := fmt.Sprintf("postgres://user:secret@%s/main?sslmode=disable", hostAndPort)
+
+	err = resource.Expire(2 * 60) // hard kill the container after 2 minutes, just in case.
+	if err != nil {
+		tb.Fatalf("Unable to set container expiration: %v", err)
+	}
+
+	// Finally, try to connect to the container.
+	// We need to retry, because it might take some time until the container becomes available.
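+	// dockertest retries the callback with exponential backoff, giving up after dockerPool.MaxWait (set in TestMain).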
+ var db *pgx.Conn + err = dockerPool.Retry(func() error { + db, err = pgx.Connect(ctx, databaseConnstr) + if err != nil { + return err + } + return nil + }) + if err != nil { + tb.Fatal("unable to connect to Postgres", err) + } + + defer func() { + err := db.Close(ctx) + if err != nil { + tb.Logf("failed to close db: %v", err) + } + }() + + // Finally call our test code. + test(tb, db) +} +``` + Let's look at the performance: +``` +Environment Test Time +Windows Threadripper 2950X Container 2.86s ± 6% +MacOS M1 Pro Container 1.63s ± 16% +Linux Xeon Gold 6226R Container 2.24s ± 10% +``` + ## Using DATABASE In most cases, creating a new postgres instance per test isn't necessary. It'll be entirely sufficient to have a database per test. If we have SUPERUSER permissions in postgres we can create them dynamically. To contrast with the previous approach, let's use a locally installed Postgres instance. This can be helpful, if you want to run tests against a remote database or want to avoid the container startup time. +```go +var pgaddr = flag.String("database", os.Getenv("DATABASE_URL"), "database address") +``` + Let's rewrite the function to create a new database per test: +```go +func WithDatabase[TB testing.TB](ctx context.Context, tb TB, test func(t TB, db *pgx.Conn)) { + if *pgaddr == "" { + tb.Skip("-database flag not defined") + } + dbaddr := *pgaddr + + // We need to create a unique database name so that our parallel tests don't clash. + var id [8]byte + rand.Read(id[:]) + uniqueName := tb.Name() + "/" + hex.EncodeToString(id[:]) + + // Create the main connection that we use to create the database. + maindb, err := pgx.Connect(ctx, dbaddr) + if err != nil { + tb.Fatalf("Unable to connect to database: %v", err) + } + + // Run the database creation query and defer the database cleanup query. + if err := createDatabase(ctx, maindb, uniqueName); err != nil { + tb.Fatalf("unable to create database: %v", err) + } + defer func() { + if err := dropDatabase(ctx, maindb, uniqueName); err != nil { + tb.Fatalf("unable to drop database: %v", err) + } + }() + + // Modify the connection string to use a different database. + connstr, err := connstrWithDatabase(dbaddr, uniqueName) + if err != nil { + tb.Fatal(err) + } + + // Create a new connection to the database. + db, err := pgx.Connect(ctx, connstr) + if err != nil { + tb.Fatalf("Unable to connect to database: %v", err) + } + defer func() { _ = db.Close(ctx) }() + + // Run our test code. + test(tb, db) +} +``` + Now for the small utility funcs that we used: +```go +// connstrWithDatabase changes the main database in the connection string. +func connstrWithDatabase(connstr, database string) (string, error) { + u, err := url.Parse(connstr) + if err != nil { + return "", fmt.Errorf("invalid connstr: %q", connstr) + } + u.Path = database + return u.String(), nil +} + +// createDatabase creates a new database with the specified name. +func createDatabase(ctx context.Context, db *pgx.Conn, name string) error { + _, err := db.Exec(ctx, `CREATE DATABASE `+sanitizeDatabaseName(name)+`;`) + return err +} + +// dropDatabase drops the specific database. +func dropDatabase(ctx context.Context, db *pgx.Conn, name string) error { + _, err := db.Exec(ctx, `DROP DATABASE `+sanitizeDatabaseName(name)+`;`) + return err +} + + +// sanitizeDatabaseName is ensures that the database name is a valid postgres identifier. 
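+// pgx.Identifier.Sanitize double-quotes the name, so characters such as the "/" used in uniqueName are handled safely.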
+func sanitizeDatabaseName(schema string) string {
+	return pgx.Identifier{schema}.Sanitize()
+}
+```
+
 The performance looks already significantly better:

+```
+Environment                 Test       Time
+Windows Threadripper 2950X  Container  2.86s ± 6%
+Windows Threadripper 2950X  Database   136ms ± 12%
+MacOS M1 Pro                Container  1.63s ± 16%
+MacOS M1 Pro                Database   136ms ± 12%
+Linux Xeon Gold 6226R       Container  2.24s ± 10%
+Linux Xeon Gold 6226R       Database   135ms ± 10%
+```
+
 ## Using SCHEMA

 But, 90ms is still a lot of time per single test. There's one lesser-known approach we discovered in Storj. It's possible to use a [schema](https://www.postgresql.org/docs/current/ddl-schemas.html) to create an isolated namespace that can be dropped together.
@@ -58,16 +279,119 @@ Of course, if you use schemas for other purposes in your system, then this appro
 Now that the disclaimer is out of the way, let's take a look at some code:

+```go
+func WithSchema[TB testing.TB](ctx context.Context, tb TB, test func(t TB, db *pgx.Conn)) {
+	if *pgaddr == "" {
+		tb.Skip("-database flag not defined")
+	}
+	dbaddr := *pgaddr
+
+	// We need to create a unique schema name so that our parallel tests don't clash.
+	var id [8]byte
+	rand.Read(id[:])
+	uniqueName := tb.Name() + "/" + hex.EncodeToString(id[:])
+
+	// Change the connection string to use a specific schema name.
+	connstr, err := connstrWithSchema(dbaddr, uniqueName)
+	if err != nil {
+		tb.Fatal(err)
+	}
+	db, err := pgx.Connect(ctx, connstr)
+	if err != nil {
+		tb.Fatalf("Unable to connect to database: %v", err)
+	}
+	defer func() { _ = db.Close(ctx) }()
+
+	// Surprisingly, it's perfectly fine to connect with a search_path that points
+	// to a schema which doesn't exist yet and only create the schema afterwards.
+	if err := createSchema(ctx, db, uniqueName); err != nil {
+		tb.Fatal(err)
+	}
+	defer func() {
+		if err := dropSchema(ctx, db, uniqueName); err != nil {
+			tb.Fatal(err)
+		}
+	}()
+
+	test(tb, db)
+}
+```
+
 The smaller utilities that make it work:

+```go
+// connstrWithSchema adds the search_path argument to the connection string.
+func connstrWithSchema(connstr, schema string) (string, error) {
+	u, err := url.Parse(connstr)
+	if err != nil {
+		return "", fmt.Errorf("invalid connstr: %q", connstr)
+	}
+	q := u.Query()
+	q.Set("search_path", sanitizeSchemaName(schema))
+	u.RawQuery = q.Encode()
+	return u.String(), nil
+}
+
+// createSchema creates a new schema in the database.
+func createSchema(ctx context.Context, db *pgx.Conn, schema string) error {
+	_, err := db.Exec(ctx, `CREATE SCHEMA IF NOT EXISTS `+sanitizeSchemaName(schema)+`;`)
+	return err
+}
+
+// dropSchema drops the specified schema and associated data.
+func dropSchema(ctx context.Context, db *pgx.Conn, schema string) error {
+	_, err := db.Exec(ctx, `DROP SCHEMA `+sanitizeSchemaName(schema)+` CASCADE;`)
+	return err
+}
+
+// sanitizeSchemaName ensures that the name is a valid postgres identifier.
+func sanitizeSchemaName(schema string) string {
+	return pgx.Identifier{schema}.Sanitize()
+}
+```
+
 After running some benchmarks we can see that we've reached ~20ms:

+```
+Environment                 Test       Time
+Windows Threadripper 2950X  Container  2.86s ± 6%
+Windows Threadripper 2950X  Database   136ms ± 12%
+Windows Threadripper 2950X  Schema     26.7ms ± 3%
+MacOS M1 Pro                Container  1.63s ± 16%
+MacOS M1 Pro                Database   136ms ± 12%
+MacOS M1 Pro                Schema     19.7ms ± 20%
+Linux Xeon Gold 6226R       Container  2.24s ± 10%
+Linux Xeon Gold 6226R       Database   135ms ± 10%
+Linux Xeon Gold 6226R       Schema     29.2ms ± 16%
+```
+
+
 ## Final tweaks

 There's one important flag that you can adjust in Postgres to make it run faster...
of course, this should only be used for testing. It's disabling [fsync](https://www.postgresql.org/docs/current/runtime-config-wal.html). The final results of the comparison look like: +``` +Environment Test fsync Time +Windows Threadripper 2950X Container on 2.86s ± 6% +Windows Threadripper 2950X Container off 2.82s ± 4% +Windows Threadripper 2950X Database on 136ms ± 12% +Windows Threadripper 2950X Database off 105ms ± 30% +Windows Threadripper 2950X Schema on 26.7ms ± 3% +Windows Threadripper 2950X Schema off 20.5ms ± 5% +MacOS M1 Pro Container on 1.63s ± 16% +MacOS M1 Pro Container off 1.64s ± 13% +MacOS M1 Pro Database on 136ms ± 12% +MacOS M1 Pro Database off 105ms ± 30% +MacOS M1 Pro Schema on 19.7ms ± 20% +MacOS M1 Pro Schema off 18.5ms ± 31% +Linux Xeon Gold 6226R Container on 2.24s ± 10% +Linux Xeon Gold 6226R Container off 1.97s ± 10% +Linux Xeon Gold 6226R Database on 135ms ± 10% +Linux Xeon Gold 6226R Database off 74.2ms ± 10% +Linux Xeon Gold 6226R Schema on 29.2ms ± 16% +Linux Xeon Gold 6226R Schema off 15.3ms ± 15% +``` + All the tests were run in a container that didn't have persistent disk mounted. The fsync=off would probably have a bigger impact with an actual disk. So for the conclusion, we looked at three different approaches to creating a clean Postgres environment. The approaches aren't completely equivalent, but use the fastest one that you can. diff --git a/app/(blog)/blog/production-concurrency/page.md b/app/(blog)/blog/production-concurrency/page.md index 59040b951..45df3f44d 100644 --- a/app/(blog)/blog/production-concurrency/page.md +++ b/app/(blog)/blog/production-concurrency/page.md @@ -24,7 +24,7 @@ Before we start, I should mention that many of these recommendations will have c I've seen many times people using concurrency where you should not use it. It should go without saying, don't add concurrency unless you have a good reason. -``` +```go var wg sync.WaitGroup wg.Add(1) @@ -34,7 +34,7 @@ wg.Wait() ❯ -``` +```go serve() ``` The concurrency here is entirely unnecessary, but I've seen this exact code in a repository. System without concurrency is much easier to debug, test and understand. @@ -46,7 +46,7 @@ People also add concurrency because they think it will speed up their program. I A friend of the previous rule is to prefer synchronous API. As mentioned, non-concurrent code is usually shorter and easier to test and debug. -``` +```go server.Start(ctx) server.Stop() server.Wait() @@ -54,7 +54,7 @@ server.Wait() ❯ -``` +```go server.Run(ctx) ``` If you need concurrency when using something, it's relatively easy to make things concurrent. It's much more difficult to do the reverse. @@ -68,7 +68,7 @@ First is -race, which enables the race detector to flag all the observed data ra Second mark your tests with t.Parallel(): -``` +```go func TestServer(t *testing.T) { t.Parallel() // ... @@ -82,7 +82,7 @@ Avoid global variables such as caches, loggers, and databases. For example, it's relatively common for people to use log.Println inside their service, and their testing output ends in the wrong location. -``` +```go func TestAlpha(t *testing.T) { t.Parallel() log.Println("Alpha") @@ -117,7 +117,7 @@ Notice how the "Alpha" and "Beta" are out of place. The code under test should c Similarly, it's relatively common for people to start goroutines without waiting for them to finish. *go* keyword makes starting goroutines very easy; however, it's not apparent that you also must wait for them to stop. 
-``` +```go go ListenHTTP(ctx) go ListenGRPC(ctx) go ListenDebugServer(ctx) @@ -126,7 +126,7 @@ select{} ❯ -``` +```go g, ctx := errgroup.WithContext(ctx) g.Go(func() error { return ListenHTTP(ctx) @@ -148,13 +148,13 @@ Similarly, when you wait for all goroutines to finish, you can detect scenarios The next common issue is not handling context cancellation. It usually won't be a problem in the production system itself. It's more of an annoyance during testing and development. Let's imagine you have a time.Sleep somewhere in your code: -``` +```go time.Sleep(time.Minute) ``` ❯ -``` +```go tick := time.NewTimer(time.Minute) defer tick.Stop() @@ -171,7 +171,7 @@ time.Sleep cannot react to any code, which means when you press Ctrl-C on your k The other scenario where this cancellation comes up is long calculations: -``` +```go for _, f := range files { data, err := os.ReadFile(f) // ... @@ -180,7 +180,7 @@ for _, f := range files { ❯ -``` +```go for _, f := range files { if err := ctx.Err(); err != nil { return err @@ -205,7 +205,7 @@ There are many reasons to not use worker pools: You can replace your worker pools with a goroutine limiter -- something that disallows from creating more than N goroutines. -``` +```go var wg sync.WaitGroup defer wg.Wait() queue := make(chan string, 8) @@ -227,7 +227,7 @@ close(queue) ❯ -``` +```go var wg sync.WaitGroup defer wg.Wait() limiter := make(chan struct{}, 8) @@ -250,7 +250,7 @@ We'll later show how to make a limiter primitive easier to use. Polling another system is rather wasteful of resources. It's usually better to use some channel or signal to message the other side: -``` +```go lastKnown := 0 for { time.Sleep(time.Second) @@ -265,7 +265,7 @@ for { ❯ -``` +```go lastKnown := 0 for newState := range t.updates { if lastKnown != newState { @@ -283,7 +283,7 @@ Polling wastes resources when the update rates are slow. It also responds to cha It's easy to forget an mu.Unlock, wg.Wait or close(ch). If you always defer them, it will be much easier to see when they are missing. -``` +```go for _, item := range items { service.mu.Lock() service.process(item) @@ -293,7 +293,7 @@ for _, item := range items { ❯ -``` +```go for _, item := range items { func() { service.mu.Lock() @@ -310,7 +310,7 @@ Even if your initial code is correct, then code modification can introduce a bug The larger the scope where the locks can be used, the easier it is to make a mistake. -``` +```go type Set[T any] struct { sync.Lock Items []T @@ -319,7 +319,7 @@ type Set[T any] struct { ❯ -``` +```go type Set[T any] struct { mu sync.Lock items []T @@ -330,7 +330,7 @@ type Set[T any] struct { You can make your debugging and stack traces much nicer by adding names to your goroutines: -``` +```go labels := pprof.Labels("server", "grpc") pprof.Do(ctx, labels, func(ctx context.Context) { @@ -364,7 +364,7 @@ If you are surprised that chan and go func() { are so low on the list, we'll sho ### Common Mistake #1: go func() -``` +```go func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { ... // start an async operation @@ -396,7 +396,7 @@ One of the solutions for starting goroutines is to use sync.WaitGroup. 
However, Let's take a look at the first common mistake with sync.WaitGroup: -``` +```go func processConcurrently(item []*Item) { var wg sync.WaitGroup defer wg.Wait() @@ -420,7 +420,7 @@ Here the problem is that the processConcurrently can return before wg.Add is cal The other scenario comes up when people incrementally change code: -``` +```go func processConcurrently(item []*Item) { var wg sync.WaitGroup wg.Add(len(items)) @@ -442,7 +442,7 @@ Notice how we moved the call to wg.Done outside of the process, making it easier To fully fix the code, it should look like this: -``` +```go func processConcurrently(item []*Item) { var wg sync.WaitGroup defer wg.Wait() @@ -462,7 +462,7 @@ func processConcurrently(item []*Item) { If you don't see the following parts when someone is using sync.WaitGroup, then it probably has a subtle error somewhere: -``` +```go var wg sync.WaitGroup defer wg.Wait() ... @@ -476,7 +476,7 @@ for ... { Instead of sync.WaitGroup there's a better alternative that avoids many of these issues: -``` +```go func processConcurrently(item []*Item) error { var g errgroup.Group for _, item := range items { @@ -494,7 +494,7 @@ func processConcurrently(item []*Item) error { errgroup.Group can be used in two ways: -``` +```go // on failure, waits other goroutines // to stop on their own var g errgroup.Group @@ -507,7 +507,7 @@ g.Go(func() error { err := g.Wait() ``` -``` +```go // on failure, cancels other goroutines g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -525,7 +525,7 @@ You can read [golang.org/x/sync/errgroup documentation](https://pkg.go.dev/golan Mutex is definitely a useful primitive, however you should be careful when you use it. I've seen quite often code that looks like: -``` +```go func (cache *Cache) Add(ctx context.Context, key, value string) { cache.mu.Lock() defer cache.mu.Unlock() @@ -542,7 +542,7 @@ You might wonder, what's the problem here. It's appropriately locking and unlock Instead, you can use a chan \*state, which allows you to handle context cancellation properly: -``` +```go type Cache struct { state chan *state } @@ -589,7 +589,7 @@ Either way, you should be able to demonstrate that your use of sync.RWMutex is h Channels are valuable things in the Go language but are also error-prone. There are many ways to write bugs with them: -``` +```go const workerCount = 100 var wg sync.WaitGroup @@ -642,7 +642,7 @@ There are many ways you can write such primitives. The following are merely exam Let's take a basic thing first, sleeping a bit: -``` +```go func Sleep(ctx context.Context, duration time.Duration) error { t := time.NewTimer(duration) defer t.Stop() @@ -658,7 +658,7 @@ func Sleep(ctx context.Context, duration time.Duration) error { Here we need to ensure that we appropriately react to context cancellation so that we don't wait for a long time until we notice that context canceled the operation. Using this call is not much longer than time.Sleep itself: -``` +```go if err := Sleep(ctx, time.Second); err != nil { return err } @@ -668,7 +668,7 @@ if err := Sleep(ctx, time.Second); err != nil { I've found plenty of cases where you must limit the number of goroutines. 
-``` +```go type Limiter struct { limit chan struct{} working sync.WaitGroup @@ -713,7 +713,7 @@ func (lim *Limiter) Wait() { This primitive is used the same way as errgroup.Group: -``` +```go lim := NewLimiter(8) defer lim.Wait() for _, item := range items { @@ -735,7 +735,7 @@ Of course, if your limited goroutines are dependent on each other, then it can i Let's say you want to process a slice concurrently. We can use this limiter to start multiple goroutines with the specified batch sizes: -``` +```go type Parallel struct { Concurrency int BatchSize int @@ -773,7 +773,7 @@ func (p Parallel) Process(ctx context.Context, This primitive allows to hide the "goroutine management" from our domain code: -``` +```go var mu sync.Mutex total := 0 @@ -796,7 +796,7 @@ err := Parallel{ Sometimes for testing, you need to start multiple goroutines and wait for all of them to complete. You can use errgroup for it; however, we can write a utility that makes it shorter: -``` +```go func Concurrently(fns ...func() error) error { var g errgroup.Group for _, fn := range fns { @@ -808,7 +808,7 @@ func Concurrently(fns ...func() error) error { A test can use it this way: -``` +```go err := Concurrently( func() error { if v := cache.Get(123); v != nil { @@ -834,7 +834,7 @@ There are many variations of this. Should the function take ctx as an argument a Sometimes you want different goroutines to wait for one another: -``` +```go type Fence struct { create sync.Once release sync.Once @@ -878,7 +878,7 @@ func (f *Fence) Wait(ctx context.Context) error { When we use it together with Concurrently we can write code that looks like: -``` +```go var loaded Fence var data map[string]int @@ -907,7 +907,7 @@ err := Concurrently( Similarly, we quite often need to protect the state when concurrently modifying it. We've seen how sync.Mutex is sometimes error-prone and doesn't consider context cancellation. Let's write a helper for such a scenario. -``` +```go type Locked[T any] struct { state chan *T } @@ -936,7 +936,7 @@ func (s *Locked[T]) Modify(ctx context.Context, fn func(*T) error) error { Then we can use it like: -``` +```go state := NewLocked(&State{Value: 123}) err := state.Modify(ctx, func(state *State) error { state.Value = 256 @@ -950,8 +950,8 @@ Finally, let's take a scenario where we want to start background goroutines insi Let's first write out the server code, how we would like to use it: -``` -unc (server *Server) Run(ctx context.Context) error { +```go +func (server *Server) Run(ctx context.Context) error { server.pending = NewJobs(ctx) defer server.pending.Wait() @@ -981,7 +981,7 @@ func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { Then let's write the primitive: -``` +```go type Jobs struct { root context.WithContext group errgroup.Group @@ -1015,7 +1015,7 @@ func (jobs *Jobs) Go(requestCtx context.Context, fn func(ctx context.Context)) b Of course, we can add a limiter, to prevent too many background workers to be started: -``` +```go type Jobs struct { root context.WithContext limit chan struct{} @@ -1048,7 +1048,7 @@ func (jobs *Jobs) Go(requestCtx context.Context, fn func(ctx context.Context)) b As a final exercise for the reader, you can try implementing a retry with backoff. The API for such a primitive can look like this: -``` +```go const ( maxRetries = 10 minWait = time.Second/10 @@ -1066,7 +1066,7 @@ if retry.Err() != nil { Alternatively, it can be callback based: -``` +```go err := Retry(ctx, maxRetries, minWait, maxWait, func(ctx context.Context) error { ...