Skip to content

Commit

Permalink
Make tests pass with -race
Browse files Browse the repository at this point in the history
  • Loading branch information
begelundmuller committed Oct 3, 2023
1 parent 682eb0c commit af03bcc
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 11 deletions.
12 changes: 9 additions & 3 deletions runtime/connection_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,20 @@ func (c *connectionCache) get(ctx context.Context, instanceID, driver string, co
var err error
select {
case <-conn.ready:
err = conn.err
case <-ctx.Done():
err = ctx.Err() // Will always be non-nil, ensuring releaseConn is called
}

// Lock again for accessing conn
c.lock.Lock()
defer c.lock.Unlock()

if err == nil {
err = conn.err
}

if err != nil {
c.lock.Lock()
c.releaseConn(key, conn)
c.lock.Unlock()
return nil, nil, err
}

Expand Down
19 changes: 12 additions & 7 deletions runtime/connection_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,18 @@ func TestConnectionCacheMultipleConfigs(t *testing.T) {
func TestConnectionCacheParallelCalls(t *testing.T) {
ctx := context.Background()

c := newConnectionCache(10, zap.NewNop(), newTestRuntimeWithInst(t), activity.NewNoopClient())
defer c.Close()

m := &mockDriver{}
drivers.Register("mock_driver", m)
defer func() {
delete(drivers.Drivers, "mock_driver")
}()

rt := newTestRuntimeWithInst(t)
defer rt.Close()

c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient())
defer c.Close()

var wg sync.WaitGroup
wg.Add(10)
// open 10 connections and verify no error
Expand All @@ -224,16 +227,18 @@ func TestConnectionCacheParallelCalls(t *testing.T) {
func TestConnectionCacheBlockingCalls(t *testing.T) {
ctx := context.Background()

rt := newTestRuntimeWithInst(t)
c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient())
defer c.Close()

m := &mockDriver{}
drivers.Register("mock_driver", m)
defer func() {
delete(drivers.Drivers, "mock_driver")
}()

rt := newTestRuntimeWithInst(t)
defer rt.Close()

c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient())
defer c.Close()

var wg sync.WaitGroup
wg.Add(12)
// open 1 slow connection
Expand Down
3 changes: 3 additions & 0 deletions runtime/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,9 @@ func TestWatch(t *testing.T) {
require.NoError(t, err)
}

// Make sure there's time for the watcher to start
awaitIdle()

testruntime.PutFiles(t, rt, id, map[string]string{
"/data/foo.csv": `a,b,c,d,e
1,2,3,4,5
Expand Down
1 change: 1 addition & 0 deletions runtime/drivers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Instance struct {
// ResolveVariables returns the final resolved variables
func (i *Instance) ResolveVariables() map[string]string {
r := make(map[string]string, len(i.ProjectVariables))

// set ProjectVariables first i.e. Project defaults
for k, v := range i.ProjectVariables {
r[k] = v
Expand Down
4 changes: 4 additions & 0 deletions runtime/reconcilers/project_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ func (r *ProjectParserReconciler) reconcileProjectConfig(ctx context.Context, pa
return err
}

// Shallow clone for editing
tmp := *inst
inst = &tmp

conns := make([]*runtimev1.Connector, 0, len(parser.RillYAML.Connectors))
for _, c := range parser.RillYAML.Connectors {
conns = append(conns, &runtimev1.Connector{
Expand Down
7 changes: 6 additions & 1 deletion runtime/testruntime/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,14 @@ func RequireReconcileState(t testing.TB, rt *runtime.Runtime, id string, lenReso
}
}

var names []string
for _, r := range rs {
names = append(names, fmt.Sprintf("%s/%s", r.Meta.Name.Kind, r.Meta.Name.Name))
}

require.Equal(t, lenParseErrs, len(parseErrs), "parse errors: %s", strings.Join(parseErrs, "\n"))
require.Equal(t, lenReconcileErrs, len(reconcileErrs), "reconcile errors: %s", strings.Join(reconcileErrs, "\n"))
require.Equal(t, lenResources, len(rs), "resources")
require.Equal(t, lenResources, len(rs), "resources: %s", strings.Join(names, "\n"))
}

func RequireResource(t testing.TB, rt *runtime.Runtime, id string, a *runtimev1.Resource) {
Expand Down

0 comments on commit af03bcc

Please sign in to comment.