diff --git a/runtime/connection_cache.go b/runtime/connection_cache.go index 3369509d0ed..a4587ccd3ce 100644 --- a/runtime/connection_cache.go +++ b/runtime/connection_cache.go @@ -197,14 +197,20 @@ func (c *connectionCache) get(ctx context.Context, instanceID, driver string, co var err error select { case <-conn.ready: - err = conn.err case <-ctx.Done(): err = ctx.Err() // Will always be non-nil, ensuring releaseConn is called } + + // Lock again for accessing conn + c.lock.Lock() + defer c.lock.Unlock() + + if err == nil { + err = conn.err + } + if err != nil { - c.lock.Lock() c.releaseConn(key, conn) - c.lock.Unlock() return nil, nil, err } diff --git a/runtime/connection_cache_test.go b/runtime/connection_cache_test.go index 8132b0bfda6..eef58fa5904 100644 --- a/runtime/connection_cache_test.go +++ b/runtime/connection_cache_test.go @@ -195,15 +195,18 @@ func TestConnectionCacheMultipleConfigs(t *testing.T) { func TestConnectionCacheParallelCalls(t *testing.T) { ctx := context.Background() - c := newConnectionCache(10, zap.NewNop(), newTestRuntimeWithInst(t), activity.NewNoopClient()) - defer c.Close() - m := &mockDriver{} drivers.Register("mock_driver", m) defer func() { delete(drivers.Drivers, "mock_driver") }() + rt := newTestRuntimeWithInst(t) + defer rt.Close() + + c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient()) + defer c.Close() + var wg sync.WaitGroup wg.Add(10) // open 10 connections and verify no error @@ -224,16 +227,18 @@ func TestConnectionCacheParallelCalls(t *testing.T) { func TestConnectionCacheBlockingCalls(t *testing.T) { ctx := context.Background() - rt := newTestRuntimeWithInst(t) - c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient()) - defer c.Close() - m := &mockDriver{} drivers.Register("mock_driver", m) defer func() { delete(drivers.Drivers, "mock_driver") }() + rt := newTestRuntimeWithInst(t) + defer rt.Close() + + c := newConnectionCache(10, zap.NewNop(), rt, activity.NewNoopClient()) + defer c.Close() + var wg sync.WaitGroup wg.Add(12) // open 1 slow connection diff --git a/runtime/controller_test.go b/runtime/controller_test.go index b1a13057d6a..74a27818095 100644 --- a/runtime/controller_test.go +++ b/runtime/controller_test.go @@ -1020,6 +1020,9 @@ func TestWatch(t *testing.T) { require.NoError(t, err) } + // Make sure there's time for the watcher to start + awaitIdle() + testruntime.PutFiles(t, rt, id, map[string]string{ "/data/foo.csv": `a,b,c,d,e 1,2,3,4,5 diff --git a/runtime/drivers/registry.go b/runtime/drivers/registry.go index a3fc6f4a586..86a52df1928 100644 --- a/runtime/drivers/registry.go +++ b/runtime/drivers/registry.go @@ -61,6 +61,7 @@ type Instance struct { // ResolveVariables returns the final resolved variables func (i *Instance) ResolveVariables() map[string]string { r := make(map[string]string, len(i.ProjectVariables)) + // set ProjectVariables first i.e. Project defaults for k, v := range i.ProjectVariables { r[k] = v diff --git a/runtime/reconcilers/project_parser.go b/runtime/reconcilers/project_parser.go index 71e04f8fb31..00f7219e94a 100644 --- a/runtime/reconcilers/project_parser.go +++ b/runtime/reconcilers/project_parser.go @@ -286,6 +286,10 @@ func (r *ProjectParserReconciler) reconcileProjectConfig(ctx context.Context, pa return err } + // Shallow clone for editing + tmp := *inst + inst = &tmp + conns := make([]*runtimev1.Connector, 0, len(parser.RillYAML.Connectors)) for _, c := range parser.RillYAML.Connectors { conns = append(conns, &runtimev1.Connector{ diff --git a/runtime/testruntime/reconcile.go b/runtime/testruntime/reconcile.go index 9f2e4e1d635..22c935dd07d 100644 --- a/runtime/testruntime/reconcile.go +++ b/runtime/testruntime/reconcile.go @@ -111,9 +111,14 @@ func RequireReconcileState(t testing.TB, rt *runtime.Runtime, id string, lenReso } } + var names []string + for _, r := range rs { + names = append(names, fmt.Sprintf("%s/%s", r.Meta.Name.Kind, r.Meta.Name.Name)) + } + require.Equal(t, lenParseErrs, len(parseErrs), "parse errors: %s", strings.Join(parseErrs, "\n")) require.Equal(t, lenReconcileErrs, len(reconcileErrs), "reconcile errors: %s", strings.Join(reconcileErrs, "\n")) - require.Equal(t, lenResources, len(rs), "resources") + require.Equal(t, lenResources, len(rs), "resources: %s", strings.Join(names, "\n")) } func RequireResource(t testing.TB, rt *runtime.Runtime, id string, a *runtimev1.Resource) {