Skip to content

Commit

Permalink
Use Filesystem locking to avoid concurrent updates
Browse files Browse the repository at this point in the history
  • Loading branch information
prakhar10 committed Dec 13, 2024
1 parent 2c31fef commit c0bb63d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ public Response getRoutingRules()
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/updateRoutingRules")
public synchronized Response updateRoutingRules(RoutingRule routingRule)
public Response updateRoutingRules(RoutingRule routingRule)
throws IOException
{
List<RoutingRule> routingRulesList = routingRulesManager.updateRoutingRule(routingRule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import io.trino.gateway.ha.domain.RoutingRule;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -60,7 +63,7 @@ public List<RoutingRule> getRoutingRules()
}
}

public List<RoutingRule> updateRoutingRule(RoutingRule routingRule)
public synchronized List<RoutingRule> updateRoutingRule(RoutingRule routingRule)
throws IOException
{
ImmutableList.Builder<RoutingRule> updatedRoutingRulesBuilder = ImmutableList.builder();
Expand All @@ -86,7 +89,11 @@ public List<RoutingRule> updateRoutingRule(RoutingRule routingRule)
yamlContent.append(yamlWriter.writeValueAsString(rule));
updatedRoutingRulesBuilder.add(rule);
}
Files.writeString(Paths.get(rulesConfigPath), yamlContent.toString(), UTF_8);
try (FileChannel fileChannel = FileChannel.open(Paths.get(rulesConfigPath), StandardOpenOption.WRITE, StandardOpenOption.READ);
FileLock lock = fileChannel.lock()) {
Files.writeString(Paths.get(rulesConfigPath), yamlContent.toString(), UTF_8);
lock.release();
}
}
catch (IOException e) {
throw new IOException("Failed to parse or update routing rules configuration form path : " + rulesConfigPath, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -42,7 +43,13 @@ void testGetRoutingRules()
List<RoutingRule> result = routingRulesManager.getRoutingRules();

assertThat(result).hasSize(2);
assertThat(result.getFirst()).isEqualTo(new RoutingRule("airflow", "if query from airflow, route to etl group", null, Collections.singletonList("result.put(\"routingGroup\", \"etl\")"), "request.getHeader(\"X-Trino-Source\") == \"airflow\" && (request.getHeader(\"X-Trino-Client-Tags\") == null || request.getHeader(\"X-Trino-Client-Tags\").isEmpty())"));
assertThat(result.getFirst()).isEqualTo(
new RoutingRule(
"airflow",
"if query from airflow, route to etl group",
null,
List.of("result.put(\"routingGroup\", \"etl\")"),
"request.getHeader(\"X-Trino-Source\") == \"airflow\" && (request.getHeader(\"X-Trino-Client-Tags\") == null || request.getHeader(\"X-Trino-Client-Tags\").isEmpty())"));
}

@Test
Expand Down Expand Up @@ -95,4 +102,48 @@ void testUpdateRoutingRulesNoSuchFileException()

assertThatThrownBy(() -> routingRulesManager.updateRoutingRule(routingRules)).hasRootCauseInstanceOf(NoSuchFileException.class);
}

@Test
void testConcurrentUpdateRoutingRule()
throws IOException
{
HaGatewayConfiguration configuration = new HaGatewayConfiguration();
RoutingRulesConfiguration routingRulesConfiguration = new RoutingRulesConfiguration();
String rulesConfigPath = "src/test/resources/rules/routing_rules_concurrent.yml";
routingRulesConfiguration.setRulesConfigPath(rulesConfigPath);
configuration.setRoutingRules(routingRulesConfiguration);
RoutingRulesManager routingRulesManager = new RoutingRulesManager(configuration);

RoutingRule routingRule1 = new RoutingRule("airflow", "if query from airflow, route to etl group", 0, List.of("result.put(\"routingGroup\", \"etl\")"), "request.getHeader(\"X-Trino-Source\") == \"airflow\"");
RoutingRule routingRule2 = new RoutingRule("airflow", "if query from airflow, route to adhoc group", 0, List.of("result.put(\"routingGroup\", \"adhoc\")"), "request.getHeader(\"X-Trino-Source\") == \"datagrip\"");

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(() ->
{
try {
routingRulesManager.updateRoutingRule(routingRule1);
}
catch (IOException e) {
throw new RuntimeException(e);
}
});

executorService.submit(() ->
{
try {
routingRulesManager.updateRoutingRule(routingRule2);
}
catch (IOException e) {
throw new RuntimeException(e);
}
});

executorService.shutdown();
List<RoutingRule> updatedRoutingRules = routingRulesManager.getRoutingRules();
assertThat(updatedRoutingRules).hasSize(1);
assertThat(updatedRoutingRules.getFirst().condition()).isEqualTo("request.getHeader(\"X-Trino-Source\") == \"datagrip\"");
assertThat(updatedRoutingRules.getFirst().actions().getFirst()).isEqualTo("result.put(\"routingGroup\", \"adhoc\")");
assertThat(updatedRoutingRules.getFirst().description()).isEqualTo("if query from airflow, route to adhoc group");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
name: "airflow"
description: "if query from airflow, route to adhoc group"
priority: 0
actions:
- "result.put(\"routingGroup\", \"adhoc\")"
condition: "request.getHeader(\"X-Trino-Source\") == \"datagrip\""

0 comments on commit c0bb63d

Please sign in to comment.