diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/resource/GatewayWebAppResource.java b/gateway-ha/src/main/java/io/trino/gateway/ha/resource/GatewayWebAppResource.java index bceafdd4b..15e5f9689 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/resource/GatewayWebAppResource.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/resource/GatewayWebAppResource.java @@ -453,7 +453,7 @@ public Response getRoutingRules() @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @Path("/updateRoutingRules") - public synchronized Response updateRoutingRules(RoutingRule routingRule) + public Response updateRoutingRules(RoutingRule routingRule) throws IOException { List routingRulesList = routingRulesManager.updateRoutingRule(routingRule); diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRulesManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRulesManager.java index 816fd3ef1..ffc7b7f32 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRulesManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingRulesManager.java @@ -23,8 +23,11 @@ import io.trino.gateway.ha.domain.RoutingRule; import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.file.Files; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; @@ -60,7 +63,7 @@ public List getRoutingRules() } } - public List updateRoutingRule(RoutingRule routingRule) + public synchronized List updateRoutingRule(RoutingRule routingRule) throws IOException { ImmutableList.Builder updatedRoutingRulesBuilder = ImmutableList.builder(); @@ -86,7 +89,11 @@ public List updateRoutingRule(RoutingRule routingRule) yamlContent.append(yamlWriter.writeValueAsString(rule)); updatedRoutingRulesBuilder.add(rule); } - Files.writeString(Paths.get(rulesConfigPath), yamlContent.toString(), UTF_8); + try (FileChannel fileChannel = FileChannel.open(Paths.get(rulesConfigPath), StandardOpenOption.WRITE, StandardOpenOption.READ); + FileLock lock = fileChannel.lock()) { + Files.writeString(Paths.get(rulesConfigPath), yamlContent.toString(), UTF_8); + lock.release(); + } } catch (IOException e) { throw new IOException("Failed to parse or update routing rules configuration form path : " + rulesConfigPath, e); diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingRulesManager.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingRulesManager.java index 4578c49b1..aa2915c95 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingRulesManager.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingRulesManager.java @@ -20,8 +20,9 @@ import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -42,7 +43,13 @@ void testGetRoutingRules() List result = routingRulesManager.getRoutingRules(); assertThat(result).hasSize(2); - assertThat(result.getFirst()).isEqualTo(new RoutingRule("airflow", "if query from airflow, route to etl group", null, Collections.singletonList("result.put(\"routingGroup\", \"etl\")"), "request.getHeader(\"X-Trino-Source\") == \"airflow\" && (request.getHeader(\"X-Trino-Client-Tags\") == null || request.getHeader(\"X-Trino-Client-Tags\").isEmpty())")); + assertThat(result.getFirst()).isEqualTo( + new RoutingRule( + "airflow", + "if query from airflow, route to etl group", + null, + List.of("result.put(\"routingGroup\", \"etl\")"), + "request.getHeader(\"X-Trino-Source\") == \"airflow\" && (request.getHeader(\"X-Trino-Client-Tags\") == null || request.getHeader(\"X-Trino-Client-Tags\").isEmpty())")); } @Test @@ -95,4 +102,48 @@ void testUpdateRoutingRulesNoSuchFileException() assertThatThrownBy(() -> routingRulesManager.updateRoutingRule(routingRules)).hasRootCauseInstanceOf(NoSuchFileException.class); } + + @Test + void testConcurrentUpdateRoutingRule() + throws IOException + { + HaGatewayConfiguration configuration = new HaGatewayConfiguration(); + RoutingRulesConfiguration routingRulesConfiguration = new RoutingRulesConfiguration(); + String rulesConfigPath = "src/test/resources/rules/routing_rules_concurrent.yml"; + routingRulesConfiguration.setRulesConfigPath(rulesConfigPath); + configuration.setRoutingRules(routingRulesConfiguration); + RoutingRulesManager routingRulesManager = new RoutingRulesManager(configuration); + + RoutingRule routingRule1 = new RoutingRule("airflow", "if query from airflow, route to etl group", 0, List.of("result.put(\"routingGroup\", \"etl\")"), "request.getHeader(\"X-Trino-Source\") == \"airflow\""); + RoutingRule routingRule2 = new RoutingRule("airflow", "if query from airflow, route to adhoc group", 0, List.of("result.put(\"routingGroup\", \"adhoc\")"), "request.getHeader(\"X-Trino-Source\") == \"datagrip\""); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + + executorService.submit(() -> + { + try { + routingRulesManager.updateRoutingRule(routingRule1); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }); + + executorService.submit(() -> + { + try { + routingRulesManager.updateRoutingRule(routingRule2); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }); + + executorService.shutdown(); + List updatedRoutingRules = routingRulesManager.getRoutingRules(); + assertThat(updatedRoutingRules).hasSize(1); + assertThat(updatedRoutingRules.getFirst().condition()).isEqualTo("request.getHeader(\"X-Trino-Source\") == \"datagrip\""); + assertThat(updatedRoutingRules.getFirst().actions().getFirst()).isEqualTo("result.put(\"routingGroup\", \"adhoc\")"); + assertThat(updatedRoutingRules.getFirst().description()).isEqualTo("if query from airflow, route to adhoc group"); + } } diff --git a/gateway-ha/src/test/resources/rules/routing_rules_concurrent.yml b/gateway-ha/src/test/resources/rules/routing_rules_concurrent.yml new file mode 100644 index 000000000..f492dfeaa --- /dev/null +++ b/gateway-ha/src/test/resources/rules/routing_rules_concurrent.yml @@ -0,0 +1,7 @@ +--- +name: "airflow" +description: "if query from airflow, route to adhoc group" +priority: 0 +actions: +- "result.put(\"routingGroup\", \"adhoc\")" +condition: "request.getHeader(\"X-Trino-Source\") == \"datagrip\""