Skip to content

Commit

Permalink
Runtime: Refactor conn cache to contain and detect hanging opens/clos…
Browse files Browse the repository at this point in the history
…es (#3666)

* Runtime: Refactor conn cache to contain and detect hanging opens/closes

* Extract connection cache to pkg + use a singleflight

* Add tests

* Make tests pass

* increase test sleeps for clsoe

* Fix various race conditions

* Integrate singleflight with conncache's mutex

* Increase timeouts

* Better comments

* Prevent deadlock when closing while opening

* Address review comments

* Remove redundant var
  • Loading branch information
begelundmuller authored and mindspank committed Dec 18, 2023
1 parent b1582fe commit 77e5bc1
Show file tree
Hide file tree
Showing 9 changed files with 806 additions and 697 deletions.
344 changes: 75 additions & 269 deletions runtime/connection_cache.go

Large diffs are not rendered by default.

417 changes: 0 additions & 417 deletions runtime/connection_cache_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions runtime/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (dr
cfg[strings.ToLower(k)] = v
}
cfg["allow_host_access"] = r.opts.AllowHostAccess
return r.connCache.get(ctx, "", c.Type, cfg, true)
return r.getConnection(ctx, "", c.Type, cfg, true)
}
}
return nil, nil, fmt.Errorf("connector %s doesn't exist", connector)
Expand All @@ -36,7 +36,7 @@ func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector strin
// So we take this moment to make sure the ctx gets checked for cancellation at least every once in a while.
return nil, nil, ctx.Err()
}
return r.connCache.get(ctx, instanceID, driver, cfg, false)
return r.getConnection(ctx, instanceID, driver, cfg, false)
}

func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error) {
Expand Down
2 changes: 1 addition & 1 deletion runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (c *connection) periodicallyCheckConnDurations(d time.Duration) {
c.connTimesMu.Lock()
for connID, connTime := range c.connTimes {
if time.Since(connTime) > maxAcquiredConnDuration {
c.logger.Error("duckdb: a connection has been held for more longer than the maximum allowed duration", zap.Int("conn_id", connID), zap.Duration("duration", time.Since(connTime)))
c.logger.Error("duckdb: a connection has been held for longer than the maximum allowed duration", zap.Int("conn_id", connID), zap.Duration("duration", time.Since(connTime)))
}
}
c.connTimesMu.Unlock()
Expand Down
Loading

0 comments on commit 77e5bc1

Please sign in to comment.