From 3a261ba4ec9685b27d5287b239503df4bce767e5 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Thu, 5 Dec 2024 17:05:48 +0900 Subject: [PATCH] Provide a way to pin a mirror to a zone (#1062) Motivation: I propose running the mirror in a zone close to the git server or only in zones that are allowed access. Modifications: - Add `zone` as an optional property to mirror configurations. - Add `GET /api/v1/mirror/config` API to provide zone-related configurations. - The API is used to select a pinned zone on the mirror form. - Add `zonePinned` flag to `MirroringServicePluginConfig` to enable zone-pinned mirroring. - The option is disabled by default. - `DefaultMirroringServicePlugin.target()` returns `PluginTarget.ZONE_LEADER_ONLY` if `zonePinned == true` - Fixed `MirrorSchedulingService` to only run a pinned zone if `zonePinned == true` - Add `ZoneConfig` to replace `zone: string` configuration. - `allZones` is newly added to specify the list of zone names. - Breaking) Make `Plugin.target()` take `CentralDogmaConfig` as an argument. Result: You can now specify a zone where you want to perform mirroring. --- .../internal/api/v1/MirrorDto.java | 18 +- .../listener/CustomMirrorListenerTest.java | 4 +- it/mirror/build.gradle | 2 + .../mirror/git/GitMirrorIntegrationTest.java | 3 +- .../git/LocalToRemoteGitMirrorTest.java | 5 +- .../it/mirror/git/MirrorRunnerTest.java | 13 +- .../git/TestZoneAwareMirrorListener.java | 78 ++++++ .../it/mirror/git/ZoneAwareMirrorTest.java | 249 ++++++++++++++++++ ....centraldogma.server.mirror.MirrorListener | 1 + .../it/zoneleader/ZoneLeaderPluginTest.java | 13 +- .../internal/mirror/AbstractGitMirror.java | 4 +- .../internal/mirror/DefaultGitMirror.java | 4 +- .../internal/mirror/GitMirrorProvider.java | 4 +- .../server/internal/mirror/SshGitMirror.java | 5 +- .../DefaultMetaRepositoryWithMirrorTest.java | 7 +- .../mirror/MirrorSchedulingServiceTest.java | 4 +- .../MirroringAndCredentialServiceV1Test.java | 9 +- .../internal/mirror/MirroringTaskTest.java | 8 +- .../internal/mirror/MirroringTestUtils.java | 4 +- .../centraldogma/server/CentralDogma.java | 60 ++--- .../server/CentralDogmaBuilder.java | 21 +- .../server/CentralDogmaConfig.java | 12 +- .../centraldogma/server/PluginGroup.java | 49 +++- .../centraldogma/server/ZoneConfig.java | 94 +++++++ .../internal/api/MirroringServiceV1.java | 51 +++- .../internal/mirror/AbstractMirror.java | 13 +- .../internal/mirror/CentralDogmaMirror.java | 4 +- .../mirror/DefaultMirroringServicePlugin.java | 42 ++- .../server/internal/mirror/MirrorRunner.java | 19 +- .../mirror/MirrorSchedulingService.java | 44 +++- .../storage/PurgeSchedulingServicePlugin.java | 4 +- .../CentralDogmaMirrorProvider.java | 2 +- .../repository/DefaultMetaRepository.java | 18 +- .../storage/repository/MirrorConfig.java | 15 +- .../centraldogma/server/mirror/Mirror.java | 6 + .../server/mirror/MirrorContext.java | 14 +- .../server/mirror/MirrorResult.java | 21 +- .../server/mirror/MirrorTask.java | 18 +- .../mirror/MirroringServicePluginConfig.java | 18 +- .../server/plugin/AllReplicasPlugin.java | 7 +- .../centraldogma/server/plugin/Plugin.java | 2 +- .../storage/repository/MetaRepository.java | 4 + .../centraldogma/server/PluginGroupTest.java | 14 +- .../mirror/CentralDogmaMirrorTest.java | 4 +- .../plugin/NoopPluginForAllReplicas.java | 4 +- .../server/plugin/NoopPluginForLeader.java | 4 +- site/src/sphinx/mirroring.rst | 12 +- site/src/sphinx/setup-configuration.rst | 20 +- .../CentralDogmaReplicationExtension.java | 9 +- .../internal/CentralDogmaRuleDelegate.java | 6 +- webapp/src/dogma/features/api/apiSlice.ts | 18 +- .../project/settings/mirrors/MirrorDto.ts | 1 + .../project/settings/mirrors/MirrorForm.tsx | 57 +++- .../project/settings/mirrors/MirrorView.tsx | 9 + .../webapp/ShiroCentralDogmaTestServer.java | 5 + .../xds/internal/ControlPlanePlugin.java | 5 +- .../XdsKubernetesEndpointFetchingPlugin.java | 13 +- 57 files changed, 985 insertions(+), 169 deletions(-) create mode 100644 it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/TestZoneAwareMirrorListener.java create mode 100644 it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/ZoneAwareMirrorTest.java create mode 100644 server/src/main/java/com/linecorp/centraldogma/server/ZoneConfig.java diff --git a/common/src/main/java/com/linecorp/centraldogma/internal/api/v1/MirrorDto.java b/common/src/main/java/com/linecorp/centraldogma/internal/api/v1/MirrorDto.java index 098a990874..11d22d7423 100644 --- a/common/src/main/java/com/linecorp/centraldogma/internal/api/v1/MirrorDto.java +++ b/common/src/main/java/com/linecorp/centraldogma/internal/api/v1/MirrorDto.java @@ -48,6 +48,8 @@ public final class MirrorDto { @Nullable private final String gitignore; private final String credentialId; + @Nullable + private final String zone; @JsonCreator public MirrorDto(@JsonProperty("id") String id, @@ -62,7 +64,8 @@ public MirrorDto(@JsonProperty("id") String id, @JsonProperty("remotePath") String remotePath, @JsonProperty("remoteBranch") String remoteBranch, @JsonProperty("gitignore") @Nullable String gitignore, - @JsonProperty("credentialId") String credentialId) { + @JsonProperty("credentialId") String credentialId, + @JsonProperty("zone") @Nullable String zone) { this.id = requireNonNull(id, "id"); this.enabled = firstNonNull(enabled, true); this.projectName = requireNonNull(projectName, "projectName"); @@ -76,6 +79,7 @@ public MirrorDto(@JsonProperty("id") String id, this.remoteBranch = requireNonNull(remoteBranch, "remoteBranch"); this.gitignore = gitignore; this.credentialId = requireNonNull(credentialId, "credentialId"); + this.zone = zone; } @JsonProperty("id") @@ -145,6 +149,12 @@ public String credentialId() { return credentialId; } + @Nullable + @JsonProperty("zone") + public String zone() { + return zone; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -166,13 +176,14 @@ public boolean equals(Object o) { remotePath.equals(mirrorDto.remotePath) && remoteBranch.equals(mirrorDto.remoteBranch) && Objects.equals(gitignore, mirrorDto.gitignore) && - credentialId.equals(mirrorDto.credentialId); + credentialId.equals(mirrorDto.credentialId) && + Objects.equals(zone, mirrorDto.zone); } @Override public int hashCode() { return Objects.hash(id, projectName, schedule, direction, localRepo, localPath, remoteScheme, - remoteUrl, remotePath, remoteBranch, gitignore, credentialId, enabled); + remoteUrl, remotePath, remoteBranch, gitignore, credentialId, enabled, zone); } @Override @@ -191,6 +202,7 @@ public String toString() { .add("remotePath", remotePath) .add("gitignore", gitignore) .add("credentialId", credentialId) + .add("zone", zone) .toString(); } } diff --git a/it/mirror-listener/src/test/java/com/linecorp/centraldogma/it/mirror/listener/CustomMirrorListenerTest.java b/it/mirror-listener/src/test/java/com/linecorp/centraldogma/it/mirror/listener/CustomMirrorListenerTest.java index c896f1371d..bf992db847 100644 --- a/it/mirror-listener/src/test/java/com/linecorp/centraldogma/it/mirror/listener/CustomMirrorListenerTest.java +++ b/it/mirror-listener/src/test/java/com/linecorp/centraldogma/it/mirror/listener/CustomMirrorListenerTest.java @@ -89,7 +89,7 @@ void shouldNotifyMirrorEvents() { final Mirror mirror = new AbstractMirror("my-mirror-1", true, EVERY_SECOND, MirrorDirection.REMOTE_TO_LOCAL, Credential.FALLBACK, r, "/", - URI.create("unused://uri"), "/", "", null) { + URI.create("unused://uri"), "/", "", null, null) { @Override protected MirrorResult mirrorLocalToRemote(File workDir, int maxNumFiles, long maxNumBytes, Instant triggeredTime) { @@ -114,7 +114,7 @@ protected MirrorResult mirrorRemoteToLocal(File workDir, CommandExecutor executo when(mr.mirrors()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mirror))); final MirrorSchedulingService service = new MirrorSchedulingService( - temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1); + temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1, null); final CommandExecutor executor = mock(CommandExecutor.class); service.start(executor); diff --git a/it/mirror/build.gradle b/it/mirror/build.gradle index f333c214a4..2926988efd 100644 --- a/it/mirror/build.gradle +++ b/it/mirror/build.gradle @@ -7,4 +7,6 @@ dependencies { testImplementation libs.jsch testImplementation libs.mina.sshd.core testImplementation libs.mina.sshd.git + testImplementation libs.zookeeper + testImplementation libs.dropwizard.metrics.core } diff --git a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/GitMirrorIntegrationTest.java b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/GitMirrorIntegrationTest.java index 5131b16020..6db28ccdf4 100644 --- a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/GitMirrorIntegrationTest.java +++ b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/GitMirrorIntegrationTest.java @@ -88,7 +88,8 @@ class GitMirrorIntegrationTest { static final CentralDogmaExtension dogma = new CentralDogmaExtension() { @Override protected void configure(CentralDogmaBuilder builder) { - builder.pluginConfigs(new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES)); + builder.pluginConfigs(new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES, + false)); } }; diff --git a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/LocalToRemoteGitMirrorTest.java b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/LocalToRemoteGitMirrorTest.java index aaff0392f1..f5c48fa557 100644 --- a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/LocalToRemoteGitMirrorTest.java +++ b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/LocalToRemoteGitMirrorTest.java @@ -81,7 +81,8 @@ class LocalToRemoteGitMirrorTest { static final CentralDogmaExtension dogma = new CentralDogmaExtension() { @Override protected void configure(CentralDogmaBuilder builder) { - builder.pluginConfigs(new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES)); + builder.pluginConfigs( + new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES, false)); } }; @@ -371,7 +372,7 @@ private void pushMirrorSettings(String localRepo, @Nullable String localPath, @N .push().join(); } catch (CompletionException e) { if (e.getCause() instanceof RedundantChangeException) { - // The same content can be pushed several times. + // The same content can be pushed several times. } else { throw e; } diff --git a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/MirrorRunnerTest.java b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/MirrorRunnerTest.java index 0bb74fe02b..d33ce5c203 100644 --- a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/MirrorRunnerTest.java +++ b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/MirrorRunnerTest.java @@ -49,10 +49,10 @@ class MirrorRunnerTest { - private static final String FOO_PROJ = "foo"; - private static final String BAR_REPO = "bar"; - private static final String PRIVATE_KEY_FILE = "ecdsa_256.openssh"; - private static final String TEST_MIRROR_ID = "test-mirror"; + static final String FOO_PROJ = "foo"; + static final String BAR_REPO = "bar"; + static final String PRIVATE_KEY_FILE = "ecdsa_256.openssh"; + static final String TEST_MIRROR_ID = "test-mirror"; @RegisterExtension static final CentralDogmaExtension dogma = new CentralDogmaExtension() { @@ -159,10 +159,11 @@ private static MirrorDto newMirror() { "/", "main", null, - PRIVATE_KEY_FILE); + PRIVATE_KEY_FILE, + null); } - private static PublicKeyCredential getCredential() throws Exception { + static PublicKeyCredential getCredential() throws Exception { final String publicKeyFile = "ecdsa_256.openssh.pub"; final byte[] privateKeyBytes = diff --git a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/TestZoneAwareMirrorListener.java b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/TestZoneAwareMirrorListener.java new file mode 100644 index 0000000000..caf9e3f937 --- /dev/null +++ b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/TestZoneAwareMirrorListener.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.it.mirror.git; + +import static com.google.common.base.MoreObjects.firstNonNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.centraldogma.server.mirror.MirrorListener; +import com.linecorp.centraldogma.server.mirror.MirrorResult; +import com.linecorp.centraldogma.server.mirror.MirrorTask; + +public class TestZoneAwareMirrorListener implements MirrorListener { + + private static final Logger logger = LoggerFactory.getLogger(TestZoneAwareMirrorListener.class); + + static final Map startCount = new ConcurrentHashMap<>(); + static final Map> completions = new ConcurrentHashMap<>(); + static final Map> errors = new ConcurrentHashMap<>(); + + static void reset() { + startCount.clear(); + completions.clear(); + errors.clear(); + } + + private static String key(MirrorTask task) { + return firstNonNull(task.currentZone(), "default"); + } + + @Override + public void onStart(MirrorTask mirror) { + logger.debug("onStart: {}", mirror); + startCount.merge(key(mirror), 1, Integer::sum); + } + + @Override + public void onComplete(MirrorTask mirror, MirrorResult result) { + logger.debug("onComplete: {} -> {}", mirror, result); + final List results = new ArrayList<>(); + results.add(result); + completions.merge(key(mirror), results, (oldValue, newValue) -> { + oldValue.addAll(newValue); + return oldValue; + }); + } + + @Override + public void onError(MirrorTask mirror, Throwable cause) { + logger.debug("onError: {}", mirror, cause); + final List exceptions = new ArrayList<>(); + exceptions.add(cause); + errors.merge(key(mirror), exceptions, (oldValue, newValue) -> { + oldValue.addAll(newValue); + return oldValue; + }); + } +} diff --git a/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/ZoneAwareMirrorTest.java b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/ZoneAwareMirrorTest.java new file mode 100644 index 0000000000..f1bd77cbba --- /dev/null +++ b/it/mirror/src/test/java/com/linecorp/centraldogma/it/mirror/git/ZoneAwareMirrorTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.it.mirror.git; + +import static com.linecorp.centraldogma.it.mirror.git.MirrorRunnerTest.BAR_REPO; +import static com.linecorp.centraldogma.it.mirror.git.MirrorRunnerTest.FOO_PROJ; +import static com.linecorp.centraldogma.it.mirror.git.MirrorRunnerTest.PRIVATE_KEY_FILE; +import static com.linecorp.centraldogma.it.mirror.git.MirrorRunnerTest.TEST_MIRROR_ID; +import static com.linecorp.centraldogma.it.mirror.git.MirrorRunnerTest.getCredential; +import static com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil.PASSWORD; +import static com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil.USERNAME; +import static com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil.getAccessToken; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowableOfType; +import static org.awaitility.Awaitility.await; + +import java.net.URI; +import java.util.List; + +import javax.annotation.Nullable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; + +import com.linecorp.armeria.client.BlockingWebClient; +import com.linecorp.armeria.client.InvalidHttpResponseException; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ResponseEntity; +import com.linecorp.armeria.common.auth.AuthToken; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.armeria.ArmeriaCentralDogmaBuilder; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.MirrorException; +import com.linecorp.centraldogma.internal.Jackson; +import com.linecorp.centraldogma.internal.api.v1.MirrorDto; +import com.linecorp.centraldogma.internal.api.v1.PushResultDto; +import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.server.ZoneConfig; +import com.linecorp.centraldogma.server.internal.credential.PublicKeyCredential; +import com.linecorp.centraldogma.server.internal.storage.repository.MirrorConfig; +import com.linecorp.centraldogma.server.mirror.MirrorDirection; +import com.linecorp.centraldogma.server.mirror.MirrorResult; +import com.linecorp.centraldogma.server.mirror.MirroringServicePluginConfig; +import com.linecorp.centraldogma.testing.internal.CentralDogmaReplicationExtension; +import com.linecorp.centraldogma.testing.internal.CentralDogmaRuleDelegate; +import com.linecorp.centraldogma.testing.internal.auth.TestAuthProviderFactory; + +class ZoneAwareMirrorTest { + + private static final List ZONES = ImmutableList.of("zone1", "zone2", "zone3"); + + @RegisterExtension + CentralDogmaReplicationExtension cluster = new CentralDogmaReplicationExtension(3) { + @Override + protected void configureEach(int serverId, CentralDogmaBuilder builder) { + builder.authProviderFactory(new TestAuthProviderFactory()); + builder.administrators(USERNAME); + builder.zone(new ZoneConfig(ZONES.get(serverId - 1), ZONES)); + builder.pluginConfigs(new MirroringServicePluginConfig(true, null, null, null, true)); + } + + @Override + protected boolean runForEachTest() { + return true; + } + }; + + private static int serverPort; + private static String accessToken; + + @BeforeEach + void setUp() throws Exception { + final CentralDogmaRuleDelegate server1 = cluster.servers().get(0); + serverPort = server1.serverAddress().getPort(); + accessToken = getAccessToken( + WebClient.of("http://127.0.0.1:" + serverPort), + USERNAME, PASSWORD); + + final CentralDogma client = + new ArmeriaCentralDogmaBuilder() + .host("127.0.0.1", serverPort) + .accessToken(accessToken) + .build(); + client.createProject(FOO_PROJ).join(); + for (String zone : ZONES) { + client.createRepository(FOO_PROJ, BAR_REPO + '-' + zone).join(); + } + client.createRepository(FOO_PROJ, "bar-default").join(); + client.createRepository(FOO_PROJ, "bar-unknown-zone").join(); + TestZoneAwareMirrorListener.reset(); + } + + @FieldSource("ZONES") + @ParameterizedTest + void shouldRunMirrorTaskOnPinnedZone(String zone) throws Exception { + createMirror(zone); + + await().untilAsserted(() -> { + // Wait for three mirror tasks to run to ensure that all tasks are running in the same zone. + assertThat(TestZoneAwareMirrorListener.startCount.get(zone)).isGreaterThanOrEqualTo(3); + }); + await().untilAsserted(() -> { + final List results = TestZoneAwareMirrorListener.completions.get(zone); + assertThat(results).hasSizeGreaterThan(3); + // Make sure that the mirror was executed in the specified zone. + assertThat(results).allSatisfy(result -> { + assertThat(result.zone()).isEqualTo(zone); + assertThat(result.repoName()).isEqualTo(BAR_REPO + '-' + zone); + }); + }); + assertThat(TestZoneAwareMirrorListener.errors.get(zone)).isNullOrEmpty(); + } + + @Test + void shouldRunUnpinnedMirrorTaskOnDefaultZone() throws Exception { + createMirror(null); + // The default zone is the first zone in the list. + final String defaultZone = ZONES.get(0); + await().untilAsserted(() -> { + // Wait for 3 mirror tasks to be run to verify all jobs are executed in the same zone. + assertThat(TestZoneAwareMirrorListener.startCount.get(defaultZone)).isGreaterThanOrEqualTo(3); + }); + await().untilAsserted(() -> { + final List results = TestZoneAwareMirrorListener.completions.get(defaultZone); + assertThat(results).hasSizeGreaterThan(3); + // Make sure that the mirror was executed in the specified zone. + assertThat(results).allSatisfy(mirrorResult -> { + assertThat(mirrorResult.zone()).isNull(); + assertThat(mirrorResult.repoName()).isEqualTo("bar-default"); + }); + }); + } + + @Test + void shouldRejectUnknownZone() throws Exception { + final String unknownZone = "unknown-zone"; + final InvalidHttpResponseException invalidResponseException = + catchThrowableOfType(InvalidHttpResponseException.class, () -> createMirror(unknownZone)); + assertThat(invalidResponseException.response().status()).isEqualTo(HttpStatus.BAD_REQUEST); + assertThat(invalidResponseException.response().contentUtf8()) + .contains("The zone 'unknown-zone' is not in the zone configuration"); + } + + @Test + void shouldWarnUnknownZoneForScheduledJob() throws Exception { + final CentralDogma client = cluster.servers().get(0).client(); + final CentralDogmaRepository repo = client.forRepo(FOO_PROJ, "meta"); + final String mirrorId = TEST_MIRROR_ID + "-unknown-zone"; + final String unknownZone = "unknown-zone"; + final MirrorConfig mirrorConfig = + new MirrorConfig(mirrorId, + true, + "0/1 * * * * ?", + MirrorDirection.REMOTE_TO_LOCAL, + "bar-unknown-zone", + "/", + URI.create( + "git+ssh://github.com/line/centraldogma-authtest.git/#main"), + null, + "foo", + unknownZone); + final Change change = + Change.ofJsonUpsert("/mirrors/" + mirrorId + ".json", Jackson.writeValueAsString(mirrorConfig)); + repo.commit("Add a mirror having an invalid zone", change) + .push().join(); + + await().untilAsserted(() -> { + // Wait for 3 mirror tasks to be run to verify all jobs are executed in the same zone. + assertThat(TestZoneAwareMirrorListener.startCount.get(unknownZone)).isGreaterThanOrEqualTo(1); + }); + await().untilAsserted(() -> { + final List results = TestZoneAwareMirrorListener.completions.get(unknownZone); + assertThat(results).isNullOrEmpty(); + final List causes = TestZoneAwareMirrorListener.errors.get(unknownZone); + + // Make sure that the mirror was executed in the specified zone. + assertThat(causes).allSatisfy(cause -> { + assertThat(cause).isInstanceOf(MirrorException.class) + .hasMessage("The mirror is pinned to an unknown zone: unknown-zone " + + "(valid zones: " + ZONES + ')'); + }); + }); + } + + private static void createMirror(String zone) throws Exception { + final BlockingWebClient client = WebClient.builder("http://127.0.0.1:" + serverPort) + .auth(AuthToken.ofOAuth2(accessToken)) + .build() + .blocking(); + + final PublicKeyCredential credential = getCredential(); + ResponseEntity response = + client.prepare() + .post("/api/v1/projects/{proj}/credentials") + .pathParam("proj", FOO_PROJ) + .contentJson(credential) + .asJson(PushResultDto.class) + .execute(); + assertThat(response.status()).isEqualTo(HttpStatus.CREATED); + + final MirrorDto newMirror = newMirror(zone); + response = client.prepare() + .post("/api/v1/projects/{proj}/mirrors") + .pathParam("proj", FOO_PROJ) + .contentJson(newMirror) + .asJson(PushResultDto.class) + .execute(); + assertThat(response.status()).isEqualTo(HttpStatus.CREATED); + } + + private static MirrorDto newMirror(@Nullable String zone) { + return new MirrorDto(TEST_MIRROR_ID + '-' + (zone == null ? "default" : zone), + true, + FOO_PROJ, + "0/1 * * * * ?", + "REMOTE_TO_LOCAL", + BAR_REPO + '-' + (zone == null ? "default" : zone), + "/", + "git+ssh", + "github.com/line/centraldogma-authtest.git", + "/", + "main", + null, + PRIVATE_KEY_FILE, + zone); + } +} diff --git a/it/mirror/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.mirror.MirrorListener b/it/mirror/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.mirror.MirrorListener index bb05c0ff8c..a698aaa28f 100644 --- a/it/mirror/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.mirror.MirrorListener +++ b/it/mirror/src/test/resources/META-INF/services/com.linecorp.centraldogma.server.mirror.MirrorListener @@ -1 +1,2 @@ com.linecorp.centraldogma.it.mirror.git.TestMirrorRunnerListener +com.linecorp.centraldogma.it.mirror.git.TestZoneAwareMirrorListener diff --git a/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java index a8374bfceb..7489c0ca81 100644 --- a/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java +++ b/it/zone-leader-plugin/src/test/java/com/linecorp/centraldogma/it/zoneleader/ZoneLeaderPluginTest.java @@ -27,8 +27,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import com.google.common.collect.ImmutableList; + import com.linecorp.armeria.common.util.UnmodifiableFuture; import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.server.CentralDogmaConfig; +import com.linecorp.centraldogma.server.ZoneConfig; import com.linecorp.centraldogma.server.plugin.Plugin; import com.linecorp.centraldogma.server.plugin.PluginContext; import com.linecorp.centraldogma.server.plugin.PluginTarget; @@ -38,17 +42,18 @@ class ZoneLeaderPluginTest { private static final List plugins = new ArrayList<>(); private static final int NUM_REPLICAS = 9; + private static final List zones = ImmutableList.of("zone1", "zone2", "zone3"); @RegisterExtension static CentralDogmaReplicationExtension cluster = new CentralDogmaReplicationExtension(NUM_REPLICAS) { @Override protected void configureEach(int serverId, CentralDogmaBuilder builder) { if (serverId <= 3) { - builder.zone("zone1"); + builder.zone(new ZoneConfig("zone1", zones)); } else if (serverId <= 6) { - builder.zone("zone2"); + builder.zone(new ZoneConfig("zone2", zones)); } else { - builder.zone("zone3"); + builder.zone(new ZoneConfig("zone3", zones)); } final ZoneLeaderTestPlugin plugin = new ZoneLeaderTestPlugin(serverId); plugins.add(plugin); @@ -114,7 +119,7 @@ private ZoneLeaderTestPlugin(int serverId) { } @Override - public PluginTarget target() { + public PluginTarget target(CentralDogmaConfig config) { return PluginTarget.ZONE_LEADER_ONLY; } diff --git a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractGitMirror.java b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractGitMirror.java index 225339517a..bff561b3d5 100644 --- a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractGitMirror.java +++ b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractGitMirror.java @@ -125,9 +125,9 @@ abstract class AbstractGitMirror extends AbstractMirror { AbstractGitMirror(String id, boolean enabled, @Nullable Cron schedule, MirrorDirection direction, Credential credential, Repository localRepo, String localPath, URI remoteRepoUri, String remotePath, String remoteBranch, - @Nullable String gitignore) { + @Nullable String gitignore, @Nullable String zone) { super(id, enabled, schedule, direction, credential, localRepo, localPath, remoteRepoUri, remotePath, - remoteBranch, gitignore); + remoteBranch, gitignore, zone); if (gitignore != null) { ignoreNode = new IgnoreNode(); diff --git a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultGitMirror.java b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultGitMirror.java index 6687d3cc9c..e92a893453 100644 --- a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultGitMirror.java +++ b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultGitMirror.java @@ -47,9 +47,9 @@ final class DefaultGitMirror extends AbstractGitMirror { DefaultGitMirror(String id, boolean enabled, @Nullable Cron schedule, MirrorDirection direction, Credential credential, Repository localRepo, String localPath, URI remoteRepoUri, String remotePath, String remoteBranch, - @Nullable String gitignore) { + @Nullable String gitignore, @Nullable String zone) { super(id, enabled, schedule, direction, credential, localRepo, localPath, remoteRepoUri, remotePath, - remoteBranch, gitignore); + remoteBranch, gitignore, zone); } @Override diff --git a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/GitMirrorProvider.java b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/GitMirrorProvider.java index dd6a2baee8..51711b9fca 100644 --- a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/GitMirrorProvider.java +++ b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/GitMirrorProvider.java @@ -49,7 +49,7 @@ public Mirror newMirror(MirrorContext context) { context.direction(), context.credential(), context.localRepo(), context.localPath(), repositoryUri.uri(), repositoryUri.path(), repositoryUri.branch(), - context.gitignore()); + context.gitignore(), context.zone()); } case SCHEME_GIT_HTTP: case SCHEME_GIT_HTTPS: @@ -60,7 +60,7 @@ public Mirror newMirror(MirrorContext context) { context.direction(), context.credential(), context.localRepo(), context.localPath(), repositoryUri.uri(), repositoryUri.path(), repositoryUri.branch(), - context.gitignore()); + context.gitignore(), context.zone()); } } diff --git a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/SshGitMirror.java b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/SshGitMirror.java index d842d255a7..9421e304ca 100644 --- a/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/SshGitMirror.java +++ b/server-mirror-git/src/main/java/com/linecorp/centraldogma/server/internal/mirror/SshGitMirror.java @@ -85,10 +85,9 @@ final class SshGitMirror extends AbstractGitMirror { SshGitMirror(String id, boolean enabled, @Nullable Cron schedule, MirrorDirection direction, Credential credential, Repository localRepo, String localPath, URI remoteRepoUri, String remotePath, String remoteBranch, - @Nullable String gitignore) { + @Nullable String gitignore, @Nullable String zone) { super(id, enabled, schedule, direction, credential, localRepo, localPath, remoteRepoUri, remotePath, - remoteBranch, - gitignore); + remoteBranch, gitignore, zone); } @Override diff --git a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMetaRepositoryWithMirrorTest.java b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMetaRepositoryWithMirrorTest.java index bdb586ae6d..fa13c8bba6 100644 --- a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMetaRepositoryWithMirrorTest.java +++ b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMetaRepositoryWithMirrorTest.java @@ -171,9 +171,10 @@ void testMirror(boolean useRawApi) { } else { final List mirrors = ImmutableList.of( new MirrorDto("foo", true, project.name(), DEFAULT_SCHEDULE, "LOCAL_TO_REMOTE", "foo", - "/mirrors/foo", "git+ssh", "foo.com/foo.git", "", "", null, "alice"), + "/mirrors/foo", "git+ssh", "foo.com/foo.git", "", "", null, "alice", null), new MirrorDto("bar", true, project.name(), "0 */10 * * * ?", "REMOTE_TO_LOCAL", "bar", - "", "git+ssh", "bar.com/bar.git", "/some-path", "develop", null, "bob")); + "", "git+ssh", "bar.com/bar.git", "/some-path", "develop", null, "bob", + null)); for (Credential credential : CREDENTIALS) { final Command command = metaRepo.createPushCommand(credential, Author.SYSTEM, false).join(); @@ -181,7 +182,7 @@ void testMirror(boolean useRawApi) { } for (MirrorDto mirror : mirrors) { final Command command = - metaRepo.createPushCommand(mirror, Author.SYSTEM, false).join(); + metaRepo.createPushCommand(mirror, Author.SYSTEM, null, false).join(); pmExtension.executor().execute(command).join(); } } diff --git a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingServiceTest.java b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingServiceTest.java index 05423c90c1..4cfd1327e9 100644 --- a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingServiceTest.java +++ b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingServiceTest.java @@ -75,7 +75,7 @@ void mirroringTaskShouldNeverBeRejected() { final Mirror mirror = new AbstractMirror("my-mirror-1", true, EVERY_SECOND, MirrorDirection.REMOTE_TO_LOCAL, Credential.FALLBACK, r, "/", - URI.create("unused://uri"), "/", "", null) { + URI.create("unused://uri"), "/", "", null, null) { @Override protected MirrorResult mirrorLocalToRemote(File workDir, int maxNumFiles, long maxNumBytes, Instant triggeredTime) { @@ -96,7 +96,7 @@ protected MirrorResult mirrorRemoteToLocal(File workDir, CommandExecutor executo when(mr.mirrors()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mirror))); final MirrorSchedulingService service = new MirrorSchedulingService( - temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1); + temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1, null); final CommandExecutor executor = mock(CommandExecutor.class); service.start(executor); diff --git a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringAndCredentialServiceV1Test.java b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringAndCredentialServiceV1Test.java index e6e905e459..9467d2c8f5 100644 --- a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringAndCredentialServiceV1Test.java +++ b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringAndCredentialServiceV1Test.java @@ -132,7 +132,8 @@ private void rejectInvalidRepositoryUri() { "/remote-path/1", "mirror-branch", ".my-env0\n.my-env1", - "public-key-credential"); + "public-key-credential", + null); final AggregatedHttpResponse response = userClient.prepare() .post("/api/v1/projects/{proj}/mirrors") @@ -306,7 +307,8 @@ private void updateMirror() { "/updated/remote-path/", "updated-mirror-branch", ".updated-env", - "access-token-credential"); + "access-token-credential", + null); final ResponseEntity updateResponse = userClient.prepare() .put("/api/v1/projects/{proj}/mirrors/{id}") @@ -376,6 +378,7 @@ private static MirrorDto newMirror(String id) { "/remote-path/" + id + '/', "mirror-branch", ".my-env0\n.my-env1", - "public-key-credential"); + "public-key-credential", + null); } } diff --git a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java index 273b6eac2b..8403171b23 100644 --- a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java +++ b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java @@ -51,9 +51,9 @@ void testSuccessMetrics() { Mirror mirror = newMirror("git://a.com/b.git", DefaultGitMirror.class, "foo", "bar"); mirror = spy(mirror); doReturn(new MirrorResult(mirror.id(), "foo", "bar", MirrorStatus.SUCCESS, "", Instant.now(), - Instant.now())) + Instant.now(), null)) .when(mirror).mirror(any(), any(), anyInt(), anyLong(), any()); - final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), true); + final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), null, true); new InstrumentedMirroringJob(mirrorTask, meterRegistry).run(null, null, 0, 0L); assertThat(MoreMeters.measureAll(meterRegistry)) .contains(entry("mirroring.result#count{direction=LOCAL_TO_REMOTE,localPath=/," + @@ -67,7 +67,7 @@ void testFailureMetrics() { mirror = spy(mirror); final RuntimeException e = new RuntimeException(); doThrow(e).when(mirror).mirror(any(), any(), anyInt(), anyLong(), any()); - final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), true); + final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), null, true); final InstrumentedMirroringJob task = new InstrumentedMirroringJob(mirrorTask, meterRegistry); assertThatThrownBy(() -> task.run(null, null, 0, 0L)) .isSameAs(e); @@ -86,7 +86,7 @@ void testTimerMetrics() { Thread.sleep(1000); return null; }).when(mirror).mirror(any(), any(), anyInt(), anyLong(), any()); - final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), true); + final MirrorTask mirrorTask = new MirrorTask(mirror, User.SYSTEM, Instant.now(), null, true); new InstrumentedMirroringJob(mirrorTask, meterRegistry).run(null, null, 0, 0L); assertThat(MoreMeters.measureAll(meterRegistry)) .hasEntrySatisfying( diff --git a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTestUtils.java b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTestUtils.java index 2df3911b32..f3cf3478d8 100644 --- a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTestUtils.java +++ b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTestUtils.java @@ -59,7 +59,7 @@ static T newMirror(String remoteUri, Cron schedule, final Mirror mirror = new GitMirrorProvider().newMirror( new MirrorContext("mirror-id", true, schedule, MirrorDirection.LOCAL_TO_REMOTE, - credential, repository, "/", URI.create(remoteUri), null)); + credential, repository, "/", URI.create(remoteUri), null, null)); assertThat(mirror).isInstanceOf(mirrorType); assertThat(mirror.direction()).isEqualTo(MirrorDirection.LOCAL_TO_REMOTE); @@ -76,7 +76,7 @@ static void assertMirrorNull(String remoteUri) { final Credential credential = mock(Credential.class); final Mirror mirror = new GitMirrorProvider().newMirror( new MirrorContext("mirror-id", true, EVERY_MINUTE, MirrorDirection.LOCAL_TO_REMOTE, - credential, mock(Repository.class), "/", URI.create(remoteUri), null)); + credential, mock(Repository.class), "/", URI.create(remoteUri), null, null)); assertThat(mirror).isNull(); } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java index 02e23070b4..35ea167fb3 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -226,6 +227,7 @@ public static CentralDogma forConfig(File configFile) throws IOException { private final AtomicInteger numPendingStopRequests = new AtomicInteger(); + private final Map pluginGroups; @Nullable private final PluginGroup pluginsForAllReplicas; @Nullable @@ -258,14 +260,12 @@ public static CentralDogma forConfig(File configFile) throws IOException { CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry, List plugins) { this.cfg = requireNonNull(cfg, "cfg"); - pluginsForAllReplicas = PluginGroup.loadPlugins( - CentralDogma.class.getClassLoader(), PluginTarget.ALL_REPLICAS, cfg, plugins); - pluginsForLeaderOnly = PluginGroup.loadPlugins( - CentralDogma.class.getClassLoader(), PluginTarget.LEADER_ONLY, cfg, plugins); - pluginsForZoneLeaderOnly = PluginGroup.loadPlugins( - CentralDogma.class.getClassLoader(), PluginTarget.ZONE_LEADER_ONLY, cfg, plugins); + pluginGroups = PluginGroup.loadPlugins(CentralDogma.class.getClassLoader(), cfg, plugins); + pluginsForAllReplicas = pluginGroups.get(PluginTarget.ALL_REPLICAS); + pluginsForLeaderOnly = pluginGroups.get(PluginTarget.LEADER_ONLY); + pluginsForZoneLeaderOnly = pluginGroups.get(PluginTarget.ZONE_LEADER_ONLY); if (pluginsForZoneLeaderOnly != null) { - checkState(!isNullOrEmpty(cfg.zone()), + checkState(cfg.zone() != null, "zone must be specified when zone leader plugins are enabled."); } startStop = new CentralDogmaStartStop(pluginsForAllReplicas); @@ -320,14 +320,19 @@ public ProjectManager projectManager() { * Returns the {@link MirroringService} of the server. * * @return the {@link MirroringService} if the server is started and mirroring is enabled. - * {@link Optional#empty()} otherwise. + * {@code null} otherwise. */ - public Optional mirroringService() { - if (pluginsForLeaderOnly == null) { - return Optional.empty(); - } - return pluginsForLeaderOnly.findFirstPlugin(DefaultMirroringServicePlugin.class) - .map(DefaultMirroringServicePlugin::mirroringService); + @Nullable + public MirroringService mirroringService() { + return pluginGroups.values() + .stream() + .map(group -> { + return group.findFirstPlugin(DefaultMirroringServicePlugin.class); + }) + .filter(Objects::nonNull) + .findFirst() + .map(DefaultMirroringServicePlugin::mirroringService) + .orElse(null); } /** @@ -336,20 +341,8 @@ public Optional mirroringService() { * @param target the {@link PluginTarget} of the {@link Plugin}s to be returned */ public List plugins(PluginTarget target) { - switch (requireNonNull(target, "target")) { - case LEADER_ONLY: - return pluginsForLeaderOnly != null ? ImmutableList.copyOf(pluginsForLeaderOnly.plugins()) - : ImmutableList.of(); - case ALL_REPLICAS: - return pluginsForAllReplicas != null ? ImmutableList.copyOf(pluginsForAllReplicas.plugins()) - : ImmutableList.of(); - case ZONE_LEADER_ONLY: - return pluginsForZoneLeaderOnly != null ? - ImmutableList.copyOf(pluginsForZoneLeaderOnly.plugins()) : ImmutableList.of(); - default: - // Should not reach here. - throw new Error("Unknown plugin target: " + target); - } + requireNonNull(target, "target"); + return pluginGroups.get(target).plugins(); } /** @@ -498,7 +491,8 @@ private CommandExecutor startCommandExecutor( Consumer onReleaseZoneLeadership = null; // TODO(ikhoon): Deduplicate if (pluginsForZoneLeaderOnly != null) { - final String zone = cfg.zone(); + assert cfg.zone() != null; + final String zone = cfg.zone().currentZone(); onTakeZoneLeadership = exec -> { logger.info("Starting plugins on the {} zone leader replica ..", zone); pluginsForZoneLeaderOnly @@ -772,6 +766,10 @@ private CommandExecutor newZooKeeperCommandExecutor( final File dataDir = cfg.dataDir(); new File(dataDir, "replica_id").delete(); + String zone = null; + if (config().zone() != null) { + zone = config().zone().currentZone(); + } // TODO(trustin): Provide a way to restart/reload the replicator // so that we can recover from ZooKeeper maintenance automatically. return new ZooKeeperCommandExecutor( @@ -779,7 +777,7 @@ private CommandExecutor newZooKeeperCommandExecutor( new StandaloneCommandExecutor(pm, repositoryWorker, serverStatusManager, sessionManager, /* onTakeLeadership */ null, /* onReleaseLeadership */ null, /* onTakeZoneLeadership */ null, /* onReleaseZoneLeadership */ null), - meterRegistry, pm, config().writeQuotaPerRepository(), config().zone(), + meterRegistry, pm, config().writeQuotaPerRepository(), zone, onTakeLeadership, onReleaseLeadership, onTakeZoneLeadership, onReleaseZoneLeadership); } @@ -872,7 +870,7 @@ private void configureHttpApi(ServerBuilder sb, if (GIT_MIRROR_ENABLED) { mirrorRunner = new MirrorRunner(projectApiManager, executor, cfg, meterRegistry); apiV1ServiceBuilder.annotatedService(new MirroringServiceV1(projectApiManager, executor, - mirrorRunner)); + mirrorRunner, cfg)); } apiV1ServiceBuilder.annotatedService() diff --git a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java index 27401e1158..238dbc8df8 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaBuilder.java @@ -137,7 +137,7 @@ public final class CentralDogmaBuilder { @Nullable private ManagementConfig managementConfig; @Nullable - private String zone; + private ZoneConfig zoneConfig; /** * Creates a new builder with the specified data directory. @@ -535,6 +535,13 @@ public CentralDogmaBuilder pluginConfigs(PluginConfig... pluginConfigs) { return this; } + /** + * Returns the {@link PluginConfig}s that have been added. + */ + public List pluginConfigs() { + return pluginConfigs; + } + /** * Adds the {@link Plugin}s. */ @@ -562,11 +569,11 @@ public CentralDogmaBuilder management(ManagementConfig managementConfig) { } /** - * Specifies the zone of the server. + * Specifies the {@link ZoneConfig} of the server. */ - public CentralDogmaBuilder zone(String zone) { - requireNonNull(zone, "zone"); - this.zone = zone; + public CentralDogmaBuilder zone(ZoneConfig zoneConfig) { + requireNonNull(zoneConfig, "zoneConfig"); + this.zoneConfig = zoneConfig; return this; } @@ -601,8 +608,8 @@ private CentralDogmaConfig buildConfig() { requestTimeoutMillis, idleTimeoutMillis, maxFrameLength, numRepositoryWorkers, repositoryCacheSpec, maxRemovedRepositoryAgeMillis, gracefulShutdownTimeout, - webAppEnabled, webAppTitle,replicationConfig, + webAppEnabled, webAppTitle, replicationConfig, null, accessLogFormat, authCfg, quotaConfig, - corsConfig, pluginConfigs, managementConfig, zone); + corsConfig, pluginConfigs, managementConfig, zoneConfig); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java index 0a3c76ea6d..0b3894d45f 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogmaConfig.java @@ -271,7 +271,7 @@ public static CentralDogmaConfig load(String json) throws JsonMappingException, private final ManagementConfig managementConfig; @Nullable - private final String zone; + private final ZoneConfig zoneConfig; CentralDogmaConfig( @JsonProperty(value = "dataDir", required = true) File dataDir, @@ -300,7 +300,7 @@ public static CentralDogmaConfig load(String json) throws JsonMappingException, @JsonProperty("cors") @Nullable CorsConfig corsConfig, @JsonProperty("pluginConfigs") @Nullable List pluginConfigs, @JsonProperty("management") @Nullable ManagementConfig managementConfig, - @JsonProperty("zone") @Nullable String zone) { + @JsonProperty("zone") @Nullable ZoneConfig zoneConfig) { this.dataDir = requireNonNull(dataDir, "dataDir"); this.ports = ImmutableList.copyOf(requireNonNull(ports, "ports")); @@ -349,7 +349,7 @@ public static CentralDogmaConfig load(String json) throws JsonMappingException, pluginConfigMap = this.pluginConfigs.stream().collect( toImmutableMap(PluginConfig::getClass, Function.identity())); this.managementConfig = managementConfig; - this.zone = convertValue(zone, "zone"); + this.zoneConfig = zoneConfig; } /** @@ -589,13 +589,13 @@ public ManagementConfig managementConfig() { } /** - * Returns the zone of the server. + * Returns the zone information of the server. * Note that the zone must be specified to use the {@link PluginTarget#ZONE_LEADER_ONLY} target. */ @Nullable @JsonProperty("zone") - public String zone() { - return zone; + public ZoneConfig zone() { + return zoneConfig; } @Override diff --git a/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java b/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java index 0962d26732..a68c12d0d3 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/PluginGroup.java @@ -16,10 +16,12 @@ package com.linecorp.centraldogma.server; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.util.Objects.requireNonNull; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.ServiceLoader; import java.util.concurrent.CompletableFuture; @@ -27,12 +29,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -63,30 +67,28 @@ final class PluginGroup { * * @param target the {@link PluginTarget} which would be loaded */ + @VisibleForTesting @Nullable static PluginGroup loadPlugins(PluginTarget target, CentralDogmaConfig config) { - return loadPlugins(PluginGroup.class.getClassLoader(), target, config, ImmutableList.of()); + return loadPlugins(PluginGroup.class.getClassLoader(), config, ImmutableList.of()).get(target); } /** * Returns a new {@link PluginGroup} which holds the {@link Plugin}s loaded from the classpath. - * {@code null} is returned if there is no {@link Plugin} whose target equals to the specified + * An empty map is returned if there is no {@link Plugin} whose target equals to the specified * {@code target}. * * @param classLoader which is used to load the {@link Plugin}s - * @param target the {@link PluginTarget} which would be loaded */ - @Nullable - static PluginGroup loadPlugins(ClassLoader classLoader, PluginTarget target, CentralDogmaConfig config, - List plugins) { + static Map loadPlugins(ClassLoader classLoader, CentralDogmaConfig config, + List plugins) { requireNonNull(classLoader, "classLoader"); - requireNonNull(target, "target"); requireNonNull(config, "config"); final ServiceLoader loader = ServiceLoader.load(Plugin.class, classLoader); final ImmutableMap.Builder, Plugin> allPlugins = new ImmutableMap.Builder<>(); for (Plugin plugin : Iterables.concat(plugins, loader)) { - if (target == plugin.target() && plugin.isEnabled(config)) { + if (plugin.isEnabled(config)) { allPlugins.put(plugin.configType(), plugin); } } @@ -94,11 +96,31 @@ static PluginGroup loadPlugins(ClassLoader classLoader, PluginTarget target, Cen // IllegalArgumentException is thrown if there are duplicate keys. final Map, Plugin> pluginMap = allPlugins.build(); if (pluginMap.isEmpty()) { - return null; + return ImmutableMap.of(); } - return new PluginGroup(pluginMap.values(), Executors.newSingleThreadExecutor(new DefaultThreadFactory( - "plugins-for-" + target.name().toLowerCase().replace("_", "-"), true))); + final Map pluginGroups = + pluginMap.values() + .stream() + .collect(Collectors.groupingBy(plugin -> plugin.target(config))) + .entrySet() + .stream() + .collect(toImmutableMap(Entry::getKey, e -> { + final PluginTarget target = e.getKey(); + final List targetPlugins = e.getValue(); + final String poolName = + "plugins-for-" + target.name().toLowerCase().replace("_", "-"); + return new PluginGroup(targetPlugins, + Executors.newSingleThreadExecutor( + new DefaultThreadFactory(poolName, true))); + })); + + pluginGroups.forEach((target, group) -> { + logger.debug("Loaded plugins for target {}: {}", target, + group.plugins().stream().map(plugin -> plugin.getClass().getName()) + .collect(toImmutableList())); + }); + return pluginGroups; } private final List plugins; @@ -119,9 +141,10 @@ List plugins() { /** * Returns the first {@link Plugin} of the specified {@code clazz} as wrapped by an {@link Optional}. */ - Optional findFirstPlugin(Class clazz) { + @Nullable + T findFirstPlugin(Class clazz) { requireNonNull(clazz, "clazz"); - return plugins.stream().filter(clazz::isInstance).map(clazz::cast).findFirst(); + return plugins.stream().filter(clazz::isInstance).map(clazz::cast).findFirst().orElse(null); } /** diff --git a/server/src/main/java/com/linecorp/centraldogma/server/ZoneConfig.java b/server/src/main/java/com/linecorp/centraldogma/server/ZoneConfig.java new file mode 100644 index 0000000000..96bde8aca9 --- /dev/null +++ b/server/src/main/java/com/linecorp/centraldogma/server/ZoneConfig.java @@ -0,0 +1,94 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + */ + +package com.linecorp.centraldogma.server; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.linecorp.centraldogma.server.CentralDogmaConfig.convertValue; +import static java.util.Objects.requireNonNull; + +import java.util.List; +import java.util.Objects; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects; + +/** + * A configuration class for the zone. + */ +public final class ZoneConfig { + + private final String currentZone; + private final List allZones; + + /** + * Creates a new instance. + */ + @JsonCreator + public ZoneConfig(@JsonProperty("currentZone") String currentZone, + @JsonProperty("allZones") List allZones) { + requireNonNull(currentZone, "currentZone"); + requireNonNull(allZones, "allZones"); + this.currentZone = convertValue(currentZone, "zone.currentZone"); + this.allZones = allZones; + checkArgument(allZones.contains(currentZone), "The current zone: %s, (expected: one of %s)", + currentZone, allZones); + } + + /** + * Returns the current zone. + */ + @JsonProperty("currentZone") + public String currentZone() { + return currentZone; + } + + /** + * Returns all zones. + */ + @JsonProperty("allZones") + public List allZones() { + return allZones; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ZoneConfig)) { + return false; + } + final ZoneConfig that = (ZoneConfig) o; + return currentZone.equals(that.currentZone) && + allZones.equals(that.allZones); + } + + @Override + public int hashCode() { + return Objects.hash(currentZone, allZones); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("currentZone", currentZone) + .add("allZones", allZones) + .toString(); + } +} diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/MirroringServiceV1.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/MirroringServiceV1.java index 9599f50e14..d6b28e4b25 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/MirroringServiceV1.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/MirroringServiceV1.java @@ -18,14 +18,19 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.linecorp.centraldogma.server.internal.mirror.DefaultMirroringServicePlugin.mirrorConfig; import static com.linecorp.centraldogma.server.internal.storage.repository.DefaultMetaRepository.mirrorFile; import java.net.URI; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + import com.cronutils.model.Cron; +import com.google.common.collect.ImmutableMap; import com.linecorp.armeria.server.annotation.ConsumesJson; import com.linecorp.armeria.server.annotation.Delete; @@ -42,6 +47,8 @@ import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.internal.api.v1.MirrorDto; import com.linecorp.centraldogma.internal.api.v1.PushResultDto; +import com.linecorp.centraldogma.server.CentralDogmaConfig; +import com.linecorp.centraldogma.server.ZoneConfig; import com.linecorp.centraldogma.server.command.Command; import com.linecorp.centraldogma.server.command.CommandExecutor; import com.linecorp.centraldogma.server.command.CommitResult; @@ -52,6 +59,7 @@ import com.linecorp.centraldogma.server.metadata.User; import com.linecorp.centraldogma.server.mirror.Mirror; import com.linecorp.centraldogma.server.mirror.MirrorResult; +import com.linecorp.centraldogma.server.mirror.MirroringServicePluginConfig; import com.linecorp.centraldogma.server.storage.project.Project; import com.linecorp.centraldogma.server.storage.repository.MetaRepository; @@ -67,12 +75,29 @@ public class MirroringServiceV1 extends AbstractService { private final ProjectApiManager projectApiManager; private final MirrorRunner mirrorRunner; + private final Map mirrorZoneConfig; + @Nullable + private final ZoneConfig zoneConfig; public MirroringServiceV1(ProjectApiManager projectApiManager, CommandExecutor executor, - MirrorRunner mirrorRunner) { + MirrorRunner mirrorRunner, CentralDogmaConfig config) { super(executor); this.projectApiManager = projectApiManager; this.mirrorRunner = mirrorRunner; + zoneConfig = config.zone(); + mirrorZoneConfig = mirrorZoneConfig(config); + } + + private static Map mirrorZoneConfig(CentralDogmaConfig config) { + final MirroringServicePluginConfig mirrorConfig = mirrorConfig(config); + final ImmutableMap.Builder builder = ImmutableMap.builderWithExpectedSize(2); + final boolean zonePinned = mirrorConfig != null && mirrorConfig.zonePinned(); + builder.put("zonePinned", zonePinned); + final ZoneConfig zone = config.zone(); + if (zone != null) { + builder.put("zone", zone); + } + return builder.build(); } /** @@ -154,11 +179,12 @@ public CompletableFuture deleteMirror(@Param String projectName, private CompletableFuture createOrUpdate(String projectName, MirrorDto newMirror, Author author, boolean update) { - return metaRepo(projectName).createPushCommand(newMirror, author, update).thenCompose(command -> { - return executor().execute(command).thenApply(result -> { - return new PushResultDto(result.revision(), command.timestamp()); - }); - }); + return metaRepo(projectName) + .createPushCommand(newMirror, author, zoneConfig, update).thenCompose(command -> { + return executor().execute(command).thenApply(result -> { + return new PushResultDto(result.revision(), command.timestamp()); + }); + }); } /** @@ -175,6 +201,17 @@ public CompletableFuture runMirror(@Param String projectName, @Par return mirrorRunner.run(projectName, mirrorId, user); } + /** + * GET /mirror/config + * + *

Returns the configuration of the mirroring service. + */ + @Get("/mirror/config") + public Map config() { + // TODO(ikhoon): Add more configurations if necessary. + return mirrorZoneConfig; + } + private static MirrorDto convertToMirrorDto(String projectName, Mirror mirror) { final URI remoteRepoUri = mirror.remoteRepoUri(); final Cron schedule = mirror.schedule(); @@ -190,7 +227,7 @@ private static MirrorDto convertToMirrorDto(String projectName, Mirror mirror) { mirror.remotePath(), mirror.remoteBranch(), mirror.gitignore(), - mirror.credential().id()); + mirror.credential().id(), mirror.zone()); } private MetaRepository metaRepo(String projectName) { diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractMirror.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractMirror.java index 8f5135c3c4..e46fef9d32 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractMirror.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/AbstractMirror.java @@ -64,6 +64,8 @@ public abstract class AbstractMirror implements Mirror { @Nullable private final String gitignore; @Nullable + private final String zone; + @Nullable private final Cron schedule; @Nullable private final ExecutionTime executionTime; @@ -72,7 +74,7 @@ public abstract class AbstractMirror implements Mirror { protected AbstractMirror(String id, boolean enabled, @Nullable Cron schedule, MirrorDirection direction, Credential credential, Repository localRepo, String localPath, URI remoteRepoUri, String remotePath, String remoteBranch, - @Nullable String gitignore) { + @Nullable String gitignore, @Nullable String zone) { this.id = requireNonNull(id, "id"); this.enabled = enabled; this.direction = requireNonNull(direction, "direction"); @@ -83,6 +85,7 @@ protected AbstractMirror(String id, boolean enabled, @Nullable Cron schedule, Mi this.remotePath = normalizePath(requireNonNull(remotePath, "remotePath")); this.remoteBranch = requireNonNull(remoteBranch, "remoteBranch"); this.gitignore = gitignore; + this.zone = zone; if (schedule != null) { this.schedule = requireNonNull(schedule, "schedule"); @@ -174,6 +177,12 @@ public final boolean enabled() { return enabled; } + @Nullable + @Override + public String zone() { + return zone; + } + @Override public final MirrorResult mirror(File workDir, CommandExecutor executor, int maxNumFiles, long maxNumBytes, Instant triggeredTime) { @@ -212,7 +221,7 @@ protected abstract MirrorResult mirrorRemoteToLocal( protected final MirrorResult newMirrorResult(MirrorStatus mirrorStatus, @Nullable String description, Instant triggeredTime) { return new MirrorResult(id, localRepo.parent().name(), localRepo.name(), mirrorStatus, description, - triggeredTime, Instant.now()); + triggeredTime, Instant.now(), zone); } @Override diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirror.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirror.java index 73392970f3..a66ab9837a 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirror.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirror.java @@ -40,10 +40,10 @@ public final class CentralDogmaMirror extends AbstractMirror { public CentralDogmaMirror(String id, boolean enabled, Cron schedule, MirrorDirection direction, Credential credential, Repository localRepo, String localPath, URI remoteRepoUri, String remoteProject, String remoteRepo, String remotePath, - @Nullable String gitignore) { + @Nullable String gitignore, @Nullable String zone) { // Central Dogma has no notion of 'branch', so we just pass an empty string as a placeholder. super(id, enabled, schedule, direction, credential, localRepo, localPath, remoteRepoUri, remotePath, - "", gitignore); + "", gitignore, zone); this.remoteProject = requireNonNull(remoteProject, "remoteProject"); this.remoteRepo = requireNonNull(remoteRepo, "remoteRepo"); diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringServicePlugin.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringServicePlugin.java index fc71360940..b3fbbe7b64 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringServicePlugin.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringServicePlugin.java @@ -26,6 +26,7 @@ import com.google.common.base.MoreObjects; import com.linecorp.centraldogma.server.CentralDogmaConfig; +import com.linecorp.centraldogma.server.ZoneConfig; import com.linecorp.centraldogma.server.mirror.MirroringServicePluginConfig; import com.linecorp.centraldogma.server.plugin.Plugin; import com.linecorp.centraldogma.server.plugin.PluginContext; @@ -33,12 +34,31 @@ public final class DefaultMirroringServicePlugin implements Plugin { + @Nullable + public static MirroringServicePluginConfig mirrorConfig(CentralDogmaConfig config) { + return (MirroringServicePluginConfig) config.pluginConfigMap().get(MirroringServicePluginConfig.class); + } + @Nullable private volatile MirrorSchedulingService mirroringService; + @Nullable + private PluginTarget pluginTarget; + @Override - public PluginTarget target() { - return PluginTarget.LEADER_ONLY; + public PluginTarget target(CentralDogmaConfig config) { + requireNonNull(config, "config"); + if (pluginTarget != null) { + return pluginTarget; + } + + final MirroringServicePluginConfig mirrorConfig = mirrorConfig(config); + if (mirrorConfig != null && mirrorConfig.zonePinned()) { + pluginTarget = PluginTarget.ZONE_LEADER_ONLY; + } else { + pluginTarget = PluginTarget.LEADER_ONLY; + } + return pluginTarget; } @Override @@ -48,27 +68,34 @@ public synchronized CompletionStage start(PluginContext context) { MirrorSchedulingService mirroringService = this.mirroringService; if (mirroringService == null) { final CentralDogmaConfig cfg = context.config(); - final MirroringServicePluginConfig mirroringServicePluginConfig = - (MirroringServicePluginConfig) cfg.pluginConfigMap().get(configType()); + final MirroringServicePluginConfig mirroringServicePluginConfig = mirrorConfig(cfg); final int numThreads; final int maxNumFilesPerMirror; final long maxNumBytesPerMirror; + final ZoneConfig zoneConfig; if (mirroringServicePluginConfig != null) { numThreads = mirroringServicePluginConfig.numMirroringThreads(); maxNumFilesPerMirror = mirroringServicePluginConfig.maxNumFilesPerMirror(); maxNumBytesPerMirror = mirroringServicePluginConfig.maxNumBytesPerMirror(); + if (mirroringServicePluginConfig.zonePinned()) { + zoneConfig = cfg.zone(); + assert zoneConfig != null : "zonePinned is enabled but no zone configuration found"; + } else { + zoneConfig = null; + } } else { numThreads = MirroringServicePluginConfig.INSTANCE.numMirroringThreads(); maxNumFilesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumFilesPerMirror(); maxNumBytesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumBytesPerMirror(); + zoneConfig = null; } mirroringService = new MirrorSchedulingService(new File(cfg.dataDir(), "_mirrors"), context.projectManager(), context.meterRegistry(), numThreads, maxNumFilesPerMirror, - maxNumBytesPerMirror); + maxNumBytesPerMirror, zoneConfig); this.mirroringService = mirroringService; } mirroringService.start(context.commandExecutor()); @@ -97,8 +124,9 @@ public MirrorSchedulingService mirroringService() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("configType", configType()) - .add("target", target()) + .omitNullValues() + .add("configType", configType().getName()) + .add("target", pluginTarget) .toString(); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorRunner.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorRunner.java index 16a197620f..8e0079ceeb 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorRunner.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorRunner.java @@ -29,6 +29,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + import com.google.common.base.MoreObjects; import com.linecorp.armeria.common.util.SafeCloseable; @@ -56,6 +58,8 @@ public final class MirrorRunner implements SafeCloseable { private final ExecutorService worker; private final Map> inflightRequests = new ConcurrentHashMap<>(); + @Nullable + private final String currentZone; public MirrorRunner(ProjectApiManager projectApiManager, CommandExecutor commandExecutor, CentralDogmaConfig cfg, MeterRegistry meterRegistry) { @@ -70,6 +74,11 @@ public MirrorRunner(ProjectApiManager projectApiManager, CommandExecutor command mirrorConfig = MirroringServicePluginConfig.INSTANCE; } this.mirrorConfig = mirrorConfig; + if (cfg.zone() != null) { + currentZone = cfg.zone().currentZone(); + } else { + currentZone = null; + } final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 0, mirrorConfig.numMirroringThreads(), @@ -91,10 +100,16 @@ private CompletableFuture run(MirrorKey mirrorKey, User user) { final CompletableFuture future = metaRepo(mirrorKey.projectName).mirror(mirrorKey.mirrorId).thenApplyAsync(mirror -> { if (!mirror.enabled()) { - throw new MirrorException("The mirror is disabled: " + mirrorKey); + throw new MirrorException("The mirror is disabled: " + + mirrorKey.projectName + '/' + mirrorKey.mirrorId); } - final MirrorTask mirrorTask = new MirrorTask(mirror, user, Instant.now(), true); + final String zone = mirror.zone(); + if (zone != null && !zone.equals(currentZone)) { + throw new MirrorException("The mirror is not in the current zone: " + currentZone); + } + final MirrorTask mirrorTask = new MirrorTask(mirror, user, Instant.now(), + currentZone, false); final MirrorListener listener = MirrorSchedulingService.mirrorListener; listener.onStart(mirrorTask); try { diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingService.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingService.java index deff2d239a..4dc69f7432 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingService.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirrorSchedulingService.java @@ -52,6 +52,7 @@ import com.linecorp.centraldogma.common.MirrorException; import com.linecorp.centraldogma.server.MirroringService; +import com.linecorp.centraldogma.server.ZoneConfig; import com.linecorp.centraldogma.server.command.CommandExecutor; import com.linecorp.centraldogma.server.metadata.User; import com.linecorp.centraldogma.server.mirror.Mirror; @@ -91,6 +92,10 @@ public final class MirrorSchedulingService implements MirroringService { private final int numThreads; private final int maxNumFilesPerMirror; private final long maxNumBytesPerMirror; + @Nullable + private final ZoneConfig zoneConfig; + @Nullable + private final String currentZone; private volatile CommandExecutor commandExecutor; private volatile ListeningScheduledExecutorService scheduler; @@ -101,7 +106,8 @@ public final class MirrorSchedulingService implements MirroringService { @VisibleForTesting public MirrorSchedulingService(File workDir, ProjectManager projectManager, MeterRegistry meterRegistry, - int numThreads, int maxNumFilesPerMirror, long maxNumBytesPerMirror) { + int numThreads, int maxNumFilesPerMirror, long maxNumBytesPerMirror, + @Nullable ZoneConfig zoneConfig) { this.workDir = requireNonNull(workDir, "workDir"); this.projectManager = requireNonNull(projectManager, "projectManager"); @@ -115,6 +121,12 @@ public MirrorSchedulingService(File workDir, ProjectManager projectManager, Mete this.numThreads = numThreads; this.maxNumFilesPerMirror = maxNumFilesPerMirror; this.maxNumBytesPerMirror = maxNumBytesPerMirror; + this.zoneConfig = zoneConfig; + if (zoneConfig != null) { + currentZone = zoneConfig.currentZone(); + } else { + currentZone = null; + } } public boolean isStarted() { @@ -152,7 +164,7 @@ public synchronized void start(CommandExecutor commandExecutor) { })); final ListenableScheduledFuture future = scheduler.scheduleWithFixedDelay( - this::schedulePendingMirrors, + this::scheduleMirrors, TICK.getSeconds(), TICK.getSeconds(), TimeUnit.SECONDS); Futures.addCallback(future, new FutureCallback() { @@ -181,7 +193,7 @@ public synchronized void stop() { } } - private void schedulePendingMirrors() { + private void scheduleMirrors() { final ZonedDateTime now = ZonedDateTime.now(); if (lastExecutionTime == null) { lastExecutionTime = now.minus(TICK); @@ -209,9 +221,31 @@ private void schedulePendingMirrors() { if (m.schedule() == null) { continue; } + if (zoneConfig != null) { + String pinnedZone = m.zone(); + if (pinnedZone == null) { + // Use the first zone if the mirror does not specify a zone. + pinnedZone = zoneConfig.allZones().get(0); + } + if (!pinnedZone.equals(currentZone)) { + // Skip the mirror if it is pinned to a different zone. + if (!zoneConfig.allZones().contains(pinnedZone)) { + // The mirror is pinned to an invalid zone. + final MirrorTask invalidMirror = + new MirrorTask(m, User.SYSTEM, Instant.now(), + pinnedZone, true); + mirrorListener.onStart(invalidMirror); + mirrorListener.onError(invalidMirror, new MirrorException( + "The mirror is pinned to an unknown zone: " + pinnedZone + + " (valid zones: " + zoneConfig.allZones() + ')')); + } + continue; + } + } try { if (m.nextExecutionTime(currentLastExecutionTime).compareTo(now) < 0) { - runAsync(new MirrorTask(m, User.SYSTEM, Instant.now(), true)); + runAsync(new MirrorTask(m, User.SYSTEM, Instant.now(), + currentZone, true)); } } catch (Exception e) { logger.warn("Unexpected exception while mirroring: {}", m, e); @@ -231,7 +265,7 @@ public CompletableFuture mirror() { () -> projectManager.list().values().forEach(p -> { try { p.metaRepo().mirrors().get(5, TimeUnit.SECONDS) - .forEach(m -> run(new MirrorTask(m, User.SYSTEM, Instant.now(), false))); + .forEach(m -> run(new MirrorTask(m, User.SYSTEM, Instant.now(), currentZone, false))); } catch (InterruptedException | TimeoutException | ExecutionException e) { throw new IllegalStateException( "Failed to load mirror list with in 5 seconds. project: " + p.name(), e); diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/PurgeSchedulingServicePlugin.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/PurgeSchedulingServicePlugin.java index b117dc2a39..4f38430225 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/PurgeSchedulingServicePlugin.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/PurgeSchedulingServicePlugin.java @@ -36,7 +36,7 @@ public final class PurgeSchedulingServicePlugin implements Plugin { private volatile PurgeSchedulingService purgeSchedulingService; @Override - public PluginTarget target() { + public PluginTarget target(CentralDogmaConfig config) { return PluginTarget.LEADER_ONLY; } @@ -87,7 +87,7 @@ public PurgeSchedulingService scheduledPurgingService() { public String toString() { return MoreObjects.toStringHelper(this) .omitNullValues() - .add("target", target()) + .add("target", PluginTarget.LEADER_ONLY) .add("purgeSchedulingService", purgeSchedulingService) .toString(); } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/CentralDogmaMirrorProvider.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/CentralDogmaMirrorProvider.java index b23961a8c1..f27c2383a8 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/CentralDogmaMirrorProvider.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/CentralDogmaMirrorProvider.java @@ -62,6 +62,6 @@ public Mirror newMirror(MirrorContext context) { return new CentralDogmaMirror(context.id(), context.enabled(), context.schedule(), context.direction(), context.credential(), context.localRepo(), context.localPath(), repositoryUri.uri(), remoteProject, remoteRepo, remotePath, - context.gitignore()); + context.gitignore(), context.zone()); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/DefaultMetaRepository.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/DefaultMetaRepository.java index 6480792931..a1c838e2b5 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/DefaultMetaRepository.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/DefaultMetaRepository.java @@ -25,6 +25,8 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; +import javax.annotation.Nullable; + import com.cronutils.model.Cron; import com.cronutils.model.field.CronField; import com.cronutils.model.field.CronFieldName; @@ -44,6 +46,7 @@ import com.linecorp.centraldogma.common.Revision; import com.linecorp.centraldogma.internal.Jackson; import com.linecorp.centraldogma.internal.api.v1.MirrorDto; +import com.linecorp.centraldogma.server.ZoneConfig; import com.linecorp.centraldogma.server.command.Command; import com.linecorp.centraldogma.server.command.CommitResult; import com.linecorp.centraldogma.server.credential.Credential; @@ -237,8 +240,9 @@ private CompletableFuture>> find(String filePattern) { @Override public CompletableFuture> createPushCommand(MirrorDto mirrorDto, Author author, + @Nullable ZoneConfig zoneConfig, boolean update) { - validateMirror(mirrorDto); + validateMirror(mirrorDto, zoneConfig); if (update) { final String summary = "Update the mirror '" + mirrorDto.id() + '\''; return mirror(mirrorDto.id()).thenApply(mirror -> { @@ -290,7 +294,7 @@ private Command newCommand(Credential credential, Author author, S change); } - private static void validateMirror(MirrorDto mirror) { + private static void validateMirror(MirrorDto mirror, @Nullable ZoneConfig zoneConfig) { checkArgument(!Strings.isNullOrEmpty(mirror.id()), "Mirror ID is empty"); final String scheduleString = mirror.schedule(); if (scheduleString != null) { @@ -299,6 +303,13 @@ private static void validateMirror(MirrorDto mirror) { checkArgument(!secondField.getExpression().asString().contains("*"), "The second field of the schedule must be specified. (seconds: *, expected: 0-59)"); } + + final String zone = mirror.zone(); + if (zone != null) { + checkArgument(zoneConfig != null, "Zone configuration is missing"); + checkArgument(zoneConfig.allZones().contains(zone), + "The zone '%s' is not in the zone configuration: %s", zone, zoneConfig); + } } private static MirrorConfig converterToMirrorConfig(MirrorDto mirrorDto) { @@ -315,6 +326,7 @@ private static MirrorConfig converterToMirrorConfig(MirrorDto mirrorDto) { mirrorDto.localPath(), URI.create(remoteUri), mirrorDto.gitignore(), - mirrorDto.credentialId()); + mirrorDto.credentialId(), + mirrorDto.zone()); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/MirrorConfig.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/MirrorConfig.java index 58a7ef94e9..b811f9b666 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/MirrorConfig.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/storage/repository/MirrorConfig.java @@ -81,6 +81,8 @@ public final class MirrorConfig { private final String credentialId; @Nullable private final Cron schedule; + @Nullable + private final String zone; @JsonCreator public MirrorConfig(@JsonProperty("id") String id, @@ -91,7 +93,8 @@ public MirrorConfig(@JsonProperty("id") String id, @JsonProperty("localPath") @Nullable String localPath, @JsonProperty(value = "remoteUri", required = true) URI remoteUri, @JsonProperty("gitignore") @Nullable Object gitignore, - @JsonProperty("credentialId") String credentialId) { + @JsonProperty("credentialId") String credentialId, + @JsonProperty("zone") @Nullable String zone) { this.id = requireNonNull(id, "id"); this.enabled = firstNonNull(enabled, true); if (schedule != null) { @@ -122,6 +125,7 @@ public MirrorConfig(@JsonProperty("id") String id, this.gitignore = null; } this.credentialId = requireNonNull(credentialId, "credentialId"); + this.zone = zone; } @Nullable @@ -132,7 +136,7 @@ Mirror toMirror(Project parent, Iterable credentials) { final MirrorContext mirrorContext = new MirrorContext( id, enabled, schedule, direction, findCredential(credentials, credentialId), - parent.repos().get(localRepo), localPath, remoteUri, gitignore); + parent.repos().get(localRepo), localPath, remoteUri, gitignore, zone); for (MirrorProvider mirrorProvider : MIRROR_PROVIDERS) { final Mirror mirror = mirrorProvider.newMirror(mirrorContext); if (mirror != null) { @@ -209,6 +213,12 @@ public String schedule() { } } + @Nullable + @JsonProperty("zone") + public String zone() { + return zone; + } + @Override public String toString() { return MoreObjects.toStringHelper(this).omitNullValues() @@ -220,6 +230,7 @@ public String toString() { .add("gitignore", gitignore) .add("credentialId", credentialId) .add("schedule", schedule) + .add("zone", zone) .toString(); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/mirror/Mirror.java b/server/src/main/java/com/linecorp/centraldogma/server/mirror/Mirror.java index d4644f7366..61720ef2cf 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/mirror/Mirror.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/mirror/Mirror.java @@ -100,6 +100,12 @@ public interface Mirror { */ boolean enabled(); + /** + * Returns the zone where this {@link Mirror} is pinned to. + */ + @Nullable + String zone(); + /** * Performs the mirroring task. * diff --git a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorContext.java b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorContext.java index 56dae01833..0390b56666 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorContext.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorContext.java @@ -44,13 +44,15 @@ public final class MirrorContext { private final URI remoteUri; @Nullable private final String gitignore; + @Nullable + private final String zone; /** * Creates a new instance. */ public MirrorContext(String id, boolean enabled, @Nullable Cron schedule, MirrorDirection direction, Credential credential, Repository localRepo, String localPath, URI remoteUri, - @Nullable String gitignore) { + @Nullable String gitignore, @Nullable String zone) { this.id = requireNonNull(id, "id"); this.enabled = enabled; this.schedule = schedule; @@ -60,6 +62,7 @@ public MirrorContext(String id, boolean enabled, @Nullable Cron schedule, Mirror this.localPath = requireNonNull(localPath, "localPath"); this.remoteUri = requireNonNull(remoteUri, "remoteUri"); this.gitignore = gitignore; + this.zone = zone; } /** @@ -128,6 +131,14 @@ public String gitignore() { return gitignore; } + /** + * Returns the zone where this mirror is pinned. + */ + @Nullable + public String zone() { + return zone; + } + @Override public String toString() { return MoreObjects.toStringHelper(this).omitNullValues() @@ -140,6 +151,7 @@ public String toString() { .add("localPath", localPath) .add("remoteUri", remoteUri) .add("gitignore", gitignore) + .add("zone", zone) .toString(); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorResult.java b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorResult.java index 429bb2baed..7342af9398 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorResult.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorResult.java @@ -42,6 +42,8 @@ public final class MirrorResult { private final String description; private final Instant triggeredTime; private final Instant completedTime; + @Nullable + private final String zone; /** * Creates a new instance. @@ -53,7 +55,8 @@ public MirrorResult(@JsonProperty("mirrorId") String mirrorId, @JsonProperty("mirrorStatus") MirrorStatus mirrorStatus, @JsonProperty("description") @Nullable String description, @JsonProperty("triggeredTime") Instant triggeredTime, - @JsonProperty("completedTime") Instant completedTime) { + @JsonProperty("completedTime") Instant completedTime, + @JsonProperty("zone") @Nullable String zone) { this.mirrorId = requireNonNull(mirrorId, "mirrorId"); this.projectName = requireNonNull(projectName, "projectName"); this.repoName = requireNonNull(repoName, "repoName"); @@ -61,6 +64,7 @@ public MirrorResult(@JsonProperty("mirrorId") String mirrorId, this.description = description; this.triggeredTime = requireNonNull(triggeredTime, "triggeredTime"); this.completedTime = requireNonNull(completedTime, "completedTime"); + this.zone = zone; } /** @@ -120,6 +124,15 @@ public Instant completedTime() { return completedTime; } + /** + * Returns the zone where the mirroring operation was performed. + */ + @Nullable + @JsonProperty("zone") + public String zone() { + return zone; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -135,13 +148,14 @@ public boolean equals(Object o) { mirrorStatus == that.mirrorStatus && Objects.equals(description, that.description) && triggeredTime.equals(that.triggeredTime) && - completedTime.equals(that.completedTime); + completedTime.equals(that.completedTime) && + Objects.equals(zone, that.zone); } @Override public int hashCode() { return Objects.hash(mirrorId, projectName, repoName, mirrorStatus, description, - triggeredTime, completedTime); + triggeredTime, completedTime, zone); } @Override @@ -155,6 +169,7 @@ public String toString() { .add("description", description) .add("triggeredTime", triggeredTime) .add("completedTime", completedTime) + .add("zone", zone) .toString(); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorTask.java b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorTask.java index d2884e15f3..70233da338 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorTask.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirrorTask.java @@ -19,8 +19,11 @@ import java.time.Instant; import java.util.Objects; +import javax.annotation.Nullable; + import com.google.common.base.MoreObjects; +import com.linecorp.centraldogma.server.ZoneConfig; import com.linecorp.centraldogma.server.metadata.User; import com.linecorp.centraldogma.server.storage.project.Project; @@ -32,15 +35,19 @@ public final class MirrorTask { private final Mirror mirror; private final User triggeredBy; private final Instant triggeredTime; + @Nullable + private final String currentZone; private final boolean scheduled; /** * Creates a new instance. */ - public MirrorTask(Mirror mirror, User triggeredBy, Instant triggeredTime, boolean scheduled) { + public MirrorTask(Mirror mirror, User triggeredBy, Instant triggeredTime, @Nullable String currentZone, + boolean scheduled) { this.mirror = mirror; this.triggeredTime = triggeredTime; this.triggeredBy = triggeredBy; + this.currentZone = currentZone; this.scheduled = scheduled; } @@ -72,6 +79,15 @@ public Instant triggeredTime() { return triggeredTime; } + /** + * Returns the current zone where the {@link Mirror} is running. + * This value is {@code null} if the {@link ZoneConfig} is not available. + */ + @Nullable + public String currentZone() { + return currentZone; + } + /** * Returns whether the {@link Mirror} is triggered by a scheduler. */ diff --git a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirroringServicePluginConfig.java b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirroringServicePluginConfig.java index b76b4c7f7b..3aa4afde0b 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirroringServicePluginConfig.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/mirror/MirroringServicePluginConfig.java @@ -32,7 +32,7 @@ public final class MirroringServicePluginConfig extends AbstractPluginConfig { public static final MirroringServicePluginConfig INSTANCE = - new MirroringServicePluginConfig(true, null, null, null); + new MirroringServicePluginConfig(true, null, null, null, false); static final int DEFAULT_NUM_MIRRORING_THREADS = 16; static final int DEFAULT_MAX_NUM_FILES_PER_MIRROR = 8192; @@ -41,12 +41,13 @@ public final class MirroringServicePluginConfig extends AbstractPluginConfig { private final int numMirroringThreads; private final int maxNumFilesPerMirror; private final long maxNumBytesPerMirror; + private final boolean zonePinned; /** * Creates a new instance. */ public MirroringServicePluginConfig(boolean enabled) { - this(enabled, null, null, null); + this(enabled, null, null, null, false); } /** @@ -57,7 +58,8 @@ public MirroringServicePluginConfig( @JsonProperty("enabled") @Nullable Boolean enabled, @JsonProperty("numMirroringThreads") @Nullable Integer numMirroringThreads, @JsonProperty("maxNumFilesPerMirror") @Nullable Integer maxNumFilesPerMirror, - @JsonProperty("maxNumBytesPerMirror") @Nullable Long maxNumBytesPerMirror) { + @JsonProperty("maxNumBytesPerMirror") @Nullable Long maxNumBytesPerMirror, + @JsonProperty("zonePinned") boolean zonePinned) { super(enabled); this.numMirroringThreads = firstNonNull(numMirroringThreads, DEFAULT_NUM_MIRRORING_THREADS); checkArgument(this.numMirroringThreads > 0, @@ -68,6 +70,7 @@ public MirroringServicePluginConfig( this.maxNumBytesPerMirror = firstNonNull(maxNumBytesPerMirror, DEFAULT_MAX_NUM_BYTES_PER_MIRROR); checkArgument(this.maxNumBytesPerMirror > 0, "maxNumBytesPerMirror: %s (expected: > 0)", this.maxNumBytesPerMirror); + this.zonePinned = zonePinned; } /** @@ -94,12 +97,21 @@ public long maxNumBytesPerMirror() { return maxNumBytesPerMirror; } + /** + * Returns whether a {@link Mirror} is pinned to a specific zone. + */ + @JsonProperty("zonePinned") + public boolean zonePinned() { + return zonePinned; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("numMirroringThreads", numMirroringThreads) .add("maxNumFilesPerMirror", maxNumFilesPerMirror) .add("maxNumBytesPerMirror", maxNumBytesPerMirror) + .add("zonePinned", zonePinned) .toString(); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/plugin/AllReplicasPlugin.java b/server/src/main/java/com/linecorp/centraldogma/server/plugin/AllReplicasPlugin.java index 7c4ecf1a20..2f0605ef4c 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/plugin/AllReplicasPlugin.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/plugin/AllReplicasPlugin.java @@ -15,8 +15,11 @@ */ package com.linecorp.centraldogma.server.plugin; +import com.linecorp.centraldogma.server.CentralDogmaConfig; + /** - * A Base class for {@link Plugin} whose {@link #target()} is {@link PluginTarget#ALL_REPLICAS}. + * A Base class for {@link Plugin} whose {@link #target(CentralDogmaConfig)} is + * {@link PluginTarget#ALL_REPLICAS}. */ public abstract class AllReplicasPlugin implements Plugin { @@ -26,7 +29,7 @@ public abstract class AllReplicasPlugin implements Plugin { public void init(PluginInitContext pluginInitContext) {} @Override - public final PluginTarget target() { + public final PluginTarget target(CentralDogmaConfig config) { return PluginTarget.ALL_REPLICAS; } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/plugin/Plugin.java b/server/src/main/java/com/linecorp/centraldogma/server/plugin/Plugin.java index f6306e2918..f8bb613be2 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/plugin/Plugin.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/plugin/Plugin.java @@ -27,7 +27,7 @@ public interface Plugin { /** * Returns the {@link PluginTarget} which specifies the replicas that this {@link Plugin} is applied to. */ - PluginTarget target(); + PluginTarget target(CentralDogmaConfig config); /** * Invoked when this {@link Plugin} is supposed to be started. diff --git a/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/MetaRepository.java b/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/MetaRepository.java index 49526fdc68..f82864a8d5 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/MetaRepository.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/storage/repository/MetaRepository.java @@ -19,8 +19,11 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import javax.annotation.Nullable; + import com.linecorp.centraldogma.common.Author; import com.linecorp.centraldogma.internal.api.v1.MirrorDto; +import com.linecorp.centraldogma.server.ZoneConfig; import com.linecorp.centraldogma.server.command.Command; import com.linecorp.centraldogma.server.command.CommitResult; import com.linecorp.centraldogma.server.credential.Credential; @@ -63,6 +66,7 @@ default CompletableFuture> mirrors() { * Create a push {@link Command} for the {@link MirrorDto}. */ CompletableFuture> createPushCommand(MirrorDto mirrorDto, Author author, + @Nullable ZoneConfig zoneConfig, boolean update); /** diff --git a/server/src/test/java/com/linecorp/centraldogma/server/PluginGroupTest.java b/server/src/test/java/com/linecorp/centraldogma/server/PluginGroupTest.java index 5602b614f7..b63ed4ce63 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/PluginGroupTest.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/PluginGroupTest.java @@ -41,7 +41,7 @@ void confirmPluginsForAllReplicasLoaded() { final CentralDogmaConfig cfg = mock(CentralDogmaConfig.class); final PluginGroup group = PluginGroup.loadPlugins(PluginTarget.ALL_REPLICAS, cfg); assertThat(group).isNotNull(); - confirmPluginStartStop(group.findFirstPlugin(NoopPluginForAllReplicas.class).orElse(null)); + confirmPluginStartStop(group.findFirstPlugin(NoopPluginForAllReplicas.class)); } @Test @@ -49,7 +49,7 @@ void confirmPluginsForLeaderLoaded() { final CentralDogmaConfig cfg = mock(CentralDogmaConfig.class); final PluginGroup group = PluginGroup.loadPlugins(PluginTarget.LEADER_ONLY, cfg); assertThat(group).isNotNull(); - confirmPluginStartStop(group.findFirstPlugin(NoopPluginForLeader.class).orElse(null)); + confirmPluginStartStop(group.findFirstPlugin(NoopPluginForLeader.class)); } @Test @@ -58,19 +58,19 @@ void confirmDefaultMirroringServiceLoadedDependingOnConfig() { when(cfg.pluginConfigMap()).thenReturn(ImmutableMap.of()); final PluginGroup group1 = PluginGroup.loadPlugins(PluginTarget.LEADER_ONLY, cfg); assertThat(group1).isNotNull(); - assertThat(group1.findFirstPlugin(DefaultMirroringServicePlugin.class)).isPresent(); + assertThat(group1.findFirstPlugin(DefaultMirroringServicePlugin.class)).isNotNull(); when(cfg.pluginConfigMap()).thenReturn(ImmutableMap.of( MirroringServicePluginConfig.class, new MirroringServicePluginConfig(true))); final PluginGroup group2 = PluginGroup.loadPlugins(PluginTarget.LEADER_ONLY, cfg); assertThat(group2).isNotNull(); - assertThat(group2.findFirstPlugin(DefaultMirroringServicePlugin.class)).isPresent(); + assertThat(group2.findFirstPlugin(DefaultMirroringServicePlugin.class)).isNotNull(); when(cfg.pluginConfigMap()).thenReturn(ImmutableMap.of( MirroringServicePluginConfig.class, new MirroringServicePluginConfig(false))); final PluginGroup group3 = PluginGroup.loadPlugins(PluginTarget.LEADER_ONLY, cfg); assertThat(group3).isNotNull(); - assertThat(group3.findFirstPlugin(DefaultMirroringServicePlugin.class)).isNotPresent(); + assertThat(group3.findFirstPlugin(DefaultMirroringServicePlugin.class)).isNull(); } /** @@ -83,12 +83,12 @@ void confirmScheduledPurgingServiceLoadedDependingOnConfig() { when(cfg.maxRemovedRepositoryAgeMillis()).thenReturn(1L); final PluginGroup group1 = PluginGroup.loadPlugins(PluginTarget.LEADER_ONLY, cfg); assertThat(group1).isNotNull(); - assertThat(group1.findFirstPlugin(PurgeSchedulingServicePlugin.class)).isPresent(); + assertThat(group1.findFirstPlugin(PurgeSchedulingServicePlugin.class)).isNotNull(); when(cfg.maxRemovedRepositoryAgeMillis()).thenReturn(0L); final PluginGroup group2 = PluginGroup.loadPlugins(PluginTarget.LEADER_ONLY, cfg); assertThat(group2).isNotNull(); - assertThat(group2.findFirstPlugin(PurgeSchedulingServicePlugin.class)).isNotPresent(); + assertThat(group2.findFirstPlugin(PurgeSchedulingServicePlugin.class)).isNull(); } private static void confirmPluginStartStop(@Nullable AbstractNoopPlugin plugin) { diff --git a/server/src/test/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirrorTest.java b/server/src/test/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirrorTest.java index d47fbf0215..da64562f60 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirrorTest.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/internal/mirror/CentralDogmaMirrorTest.java @@ -123,7 +123,7 @@ static T newMirror(String remoteUri, Cron schedule, final Mirror mirror = new CentralDogmaMirrorProvider().newMirror( new MirrorContext(mirrorId, true, schedule, MirrorDirection.LOCAL_TO_REMOTE, - credential, repository, "/", URI.create(remoteUri), null)); + credential, repository, "/", URI.create(remoteUri), null, null)); assertThat(mirror).isInstanceOf(mirrorType); assertThat(mirror.id()).isEqualTo(mirrorId); @@ -141,7 +141,7 @@ static void assertMirrorNull(String remoteUri) { final Credential credential = mock(Credential.class); final Mirror mirror = new CentralDogmaMirrorProvider().newMirror( new MirrorContext("mirror-id", true, EVERY_MINUTE, MirrorDirection.LOCAL_TO_REMOTE, - credential, mock(Repository.class), "/", URI.create(remoteUri), null)); + credential, mock(Repository.class), "/", URI.create(remoteUri), null, null)); assertThat(mirror).isNull(); } } diff --git a/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForAllReplicas.java b/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForAllReplicas.java index d9c624e70c..d4809bacff 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForAllReplicas.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForAllReplicas.java @@ -15,9 +15,11 @@ */ package com.linecorp.centraldogma.server.plugin; +import com.linecorp.centraldogma.server.CentralDogmaConfig; + public class NoopPluginForAllReplicas extends AbstractNoopPlugin { @Override - public PluginTarget target() { + public PluginTarget target(CentralDogmaConfig config) { return PluginTarget.ALL_REPLICAS; } diff --git a/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForLeader.java b/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForLeader.java index f8d207a0b0..f1055a2365 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForLeader.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/plugin/NoopPluginForLeader.java @@ -15,9 +15,11 @@ */ package com.linecorp.centraldogma.server.plugin; +import com.linecorp.centraldogma.server.CentralDogmaConfig; + public class NoopPluginForLeader extends AbstractNoopPlugin { @Override - public PluginTarget target() { + public PluginTarget target(CentralDogmaConfig config) { return PluginTarget.LEADER_ONLY; } diff --git a/site/src/sphinx/mirroring.rst b/site/src/sphinx/mirroring.rst index a611e86c8d..ee3c70a473 100644 --- a/site/src/sphinx/mirroring.rst +++ b/site/src/sphinx/mirroring.rst @@ -57,7 +57,8 @@ Setting up a mirroring task "gitignore": [ "/credential.txt", "private_dir" - ] + ], + "zone": "zone1" } - ``id`` (string) @@ -117,6 +118,15 @@ Setting up a mirroring task of strings where each line represents a single pattern. The file pattern expressed in gitignore is relative to the path of ``remoteUri``. +- ``zone`` (string, optional) + + - the zone where the mirroring task is executed. + + - If unspecified: + + - a mirroring task is executed in the first zone of ``zone.allZones`` configuration. + - if ``zone.allZones`` is not configured, a mirroring task is executed in the leader replica. + Setting up a credential ^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/site/src/sphinx/setup-configuration.rst b/site/src/sphinx/setup-configuration.rst index 53a1ef209a..50cd47e79c 100644 --- a/site/src/sphinx/setup-configuration.rst +++ b/site/src/sphinx/setup-configuration.rst @@ -263,16 +263,24 @@ Core properties - the path of the management service. If not specified, the management service is mounted at ``/internal/management``. -- ``zone`` (string) +- ``zone`` - - the zone name of the server. If not specified, ``PluginTarget.ZONE_LEADER_ONLY`` can't be used. + - the zone information of the server. If not specified, ``PluginTarget.ZONE_LEADER_ONLY`` can't be used. - - If the value starts with ``env:``, the environment variable is used as the zone name. + - ``currentZone`` (string) + + - the current zone name. If the value starts with ``env:``, the environment variable is used as the zone name. For example, if the value is ``env:ZONE_NAME``, the environment variable named ``ZONE_NAME`` is used as the zone name. - You can also dynamically load a zone name by implementing :api:`com.linecorp.centraldogma.server.ConfigValueConverter`. + - ``allZones`` (string array) + + - the list of zone names. + + - the current zone name must be included in the list of zone names. + .. _replication: Configuring replication @@ -528,6 +536,7 @@ with ``pluginConfigs`` property in ``dogma.json`` as follows. "numMirroringThreads": null, "maxNumFilesPerMirror": null, "maxNumBytesPerMirror": null, + "zonePinned": false } ] } @@ -555,6 +564,11 @@ properties that can be configured: this, Central Dogma will reject to mirror the Git repository. If ``null``, the default value of '33554432 bytes' (32 MiB) is used. +- ``zonePinned`` (boolean) + + - whether the mirroring plugin is pinned to a specific zone. If ``true``, a mirroring task will be executed + only in the specified zone. If ``false``, the plugin will be executed in the leader replica. + For more information about mirroring, refer to :ref:`mirroring`. .. _hiding_sensitive_property_values: diff --git a/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java b/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java index 76d52fcff7..8da60014f4 100644 --- a/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java +++ b/testing-internal/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaReplicationExtension.java @@ -116,10 +116,17 @@ protected void configure(CentralDogmaBuilder builder) { builder.port(new InetSocketAddress(NetUtil.LOCALHOST4, dogmaPort), SessionProtocol.HTTP) .administrators(TestAuthMessageUtil.USERNAME) .authProviderFactory(factory) - .pluginConfigs(new MirroringServicePluginConfig(false)) .gracefulShutdownTimeout(new GracefulShutdownTimeout(0, 0)) .replication(new ZooKeeperReplicationConfig(serverId, zooKeeperServers)); configureEach(serverId, builder); + final boolean isMirrorConfigured = + builder.pluginConfigs() + .stream() + .anyMatch(pluginCfg -> pluginCfg instanceof MirroringServicePluginConfig); + if (!isMirrorConfigured) { + // Disable the mirroring service when it is not explicitly configured. + builder.pluginConfigs(new MirroringServicePluginConfig(false)); + } } @Override diff --git a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java index 30c76f034c..081ada1822 100644 --- a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java +++ b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java @@ -16,6 +16,8 @@ package com.linecorp.centraldogma.testing.internal; +import static com.google.common.base.Preconditions.checkState; + import java.io.File; import java.io.IOError; import java.net.InetSocketAddress; @@ -217,7 +219,9 @@ public ProjectManager projectManager() { * @throws IllegalStateException if Central Dogma did not start yet */ public final MirroringService mirroringService() { - return dogma().mirroringService().get(); + final MirroringService mirroringService = dogma().mirroringService(); + checkState(mirroringService != null, "Mirroring service not available"); + return mirroringService; } /** diff --git a/webapp/src/dogma/features/api/apiSlice.ts b/webapp/src/dogma/features/api/apiSlice.ts index 56ad6328fe..5942899c53 100644 --- a/webapp/src/dogma/features/api/apiSlice.ts +++ b/webapp/src/dogma/features/api/apiSlice.ts @@ -75,6 +75,16 @@ export type TitleDto = { hostname: string; }; +export type ZoneDto = { + currentZone: string; + allZones: string[]; +}; + +export type MirrorConfig = { + zonePinned: boolean; + zone: ZoneDto; +}; + export const apiSlice = createApi({ reducerPath: 'api', baseQuery: fetchBaseQuery({ @@ -361,6 +371,12 @@ export const apiSlice = createApi({ }), invalidatesTags: ['Metadata'], }), + getMirrorConfig: builder.query({ + query: () => ({ + url: `/api/v1/mirror/config`, + method: 'GET', + }), + }), getCredentials: builder.query({ query: (projectName) => `/api/v1/projects/${projectName}/credentials`, providesTags: ['Metadata'], @@ -396,7 +412,6 @@ export const apiSlice = createApi({ }), getTitle: builder.query({ query: () => ({ - baseUrl: '', url: `/title`, method: 'GET', }), @@ -446,6 +461,7 @@ export const { useUpdateMirrorMutation, useDeleteMirrorMutation, useRunMirrorMutation, + useGetMirrorConfigQuery, // Credential useGetCredentialsQuery, useGetCredentialQuery, diff --git a/webapp/src/dogma/features/project/settings/mirrors/MirrorDto.ts b/webapp/src/dogma/features/project/settings/mirrors/MirrorDto.ts index cea47b7040..fcd325b6ad 100644 --- a/webapp/src/dogma/features/project/settings/mirrors/MirrorDto.ts +++ b/webapp/src/dogma/features/project/settings/mirrors/MirrorDto.ts @@ -12,4 +12,5 @@ export interface MirrorDto { gitignore?: string; credentialId: string; enabled: boolean; + zone?: string; } diff --git a/webapp/src/dogma/features/project/settings/mirrors/MirrorForm.tsx b/webapp/src/dogma/features/project/settings/mirrors/MirrorForm.tsx index 78623d16c2..30125e03d2 100644 --- a/webapp/src/dogma/features/project/settings/mirrors/MirrorForm.tsx +++ b/webapp/src/dogma/features/project/settings/mirrors/MirrorForm.tsx @@ -44,7 +44,7 @@ import { ExternalLinkIcon } from '@chakra-ui/icons'; import { GoArrowBoth, GoArrowDown, GoArrowUp, GoKey, GoRepo } from 'react-icons/go'; import { Select } from 'chakra-react-select'; import { IoBanSharp } from 'react-icons/io5'; -import { useGetCredentialsQuery, useGetReposQuery } from 'dogma/features/api/apiSlice'; +import { useGetCredentialsQuery, useGetMirrorConfigQuery, useGetReposQuery } from 'dogma/features/api/apiSlice'; import React, { useMemo, useState } from 'react'; import FieldErrorMessage from 'dogma/common/components/form/FieldErrorMessage'; import { RepoDto } from 'dogma/features/repo/RepoDto'; @@ -52,6 +52,7 @@ import { MirrorDto } from 'dogma/features/project/settings/mirrors/MirrorDto'; import { CredentialDto } from 'dogma/features/project/settings/credentials/CredentialDto'; import { FiBox } from 'react-icons/fi'; import cronstrue from 'cronstrue'; +import { CiLocationOn } from 'react-icons/ci'; interface MirrorFormProps { projectName: string; @@ -81,11 +82,14 @@ const MirrorForm = ({ projectName, defaultValue, onSubmit, isWaitingResponse }: setValue, control, watch, - } = useForm(); + } = useForm({ + defaultValues: defaultValue, + }); const isNew = defaultValue.id === ''; const { data: repos } = useGetReposQuery(projectName); const { data: credentials } = useGetCredentialsQuery(projectName); + const { data: zoneConfig } = useGetMirrorConfigQuery(); const [isScheduleEnabled, setScheduleEnabled] = useState(defaultValue.schedule != null); const schedule = watch('schedule'); @@ -104,6 +108,13 @@ const MirrorForm = ({ projectName, defaultValue, onSubmit, isWaitingResponse }: label: credential.id, })); + const zoneOptions: OptionType[] = (zoneConfig?.zonePinned ? zoneConfig.zone.allZones : []).map( + (zone: string) => ({ + value: zone, + label: zone, + }), + ); + useMemo(() => { // `defaultValue` property is not working when using `react-select` with `react-hook-form`. So we have to // set the value manually. https://stackoverflow.com/a/66723262/1736581 @@ -112,6 +123,7 @@ const MirrorForm = ({ projectName, defaultValue, onSubmit, isWaitingResponse }: setValue('remoteScheme', defaultValue.remoteScheme); setValue('credentialId', defaultValue.credentialId); setValue('direction', defaultValue.direction); + setValue('zone', defaultValue.zone); } }, [ isNew, @@ -128,6 +140,9 @@ const MirrorForm = ({ projectName, defaultValue, onSubmit, isWaitingResponse }: const defaultCredential: OptionType = defaultValue.credentialId ? { value: defaultValue.credentialId, label: defaultValue.credentialId } : null; + const defaultZone: OptionType = defaultValue.zone + ? { value: defaultValue.zone, label: defaultValue.zone } + : null; return (
+ {zoneConfig?.zonePinned && ( + <> + + + + + ( +