Skip to content

Commit

Permalink
Provide a way to pin a mirror to a zone (line#1062)
Browse files Browse the repository at this point in the history
Motivation:

I propose running the mirror in a zone close to the git server or only
in zones that are allowed access.

Modifications:

- Add `zone` as an optional property to mirror configurations.
- Add `GET /api/v1/mirror/config` API to provide zone-related
configurations.
  - The API is used to select a pinned zone on the mirror form.
- Add `zonePinned` flag to `MirroringServicePluginConfig` to enable
zone-pinned mirroring.
  - The option is disabled by default.
- `DefaultMirroringServicePlugin.target()` returns
`PluginTarget.ZONE_LEADER_ONLY` if `zonePinned == true`
- Fixed `MirrorSchedulingService` to only run a pinned zone if
`zonePinned == true`
- Add `ZoneConfig` to replace `zone: string` configuration.
  - `allZones` is newly added to specify the list of zone names.
- Breaking) Make `Plugin.target()` take `CentralDogmaConfig` as an
argument.

Result:

You can now specify a zone where you want to perform mirroring.
  • Loading branch information
ikhoon authored Dec 5, 2024
1 parent bebcfb7 commit 3a261ba
Show file tree
Hide file tree
Showing 57 changed files with 985 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class MirrorDto {
@Nullable
private final String gitignore;
private final String credentialId;
@Nullable
private final String zone;

@JsonCreator
public MirrorDto(@JsonProperty("id") String id,
Expand All @@ -62,7 +64,8 @@ public MirrorDto(@JsonProperty("id") String id,
@JsonProperty("remotePath") String remotePath,
@JsonProperty("remoteBranch") String remoteBranch,
@JsonProperty("gitignore") @Nullable String gitignore,
@JsonProperty("credentialId") String credentialId) {
@JsonProperty("credentialId") String credentialId,
@JsonProperty("zone") @Nullable String zone) {
this.id = requireNonNull(id, "id");
this.enabled = firstNonNull(enabled, true);
this.projectName = requireNonNull(projectName, "projectName");
Expand All @@ -76,6 +79,7 @@ public MirrorDto(@JsonProperty("id") String id,
this.remoteBranch = requireNonNull(remoteBranch, "remoteBranch");
this.gitignore = gitignore;
this.credentialId = requireNonNull(credentialId, "credentialId");
this.zone = zone;
}

@JsonProperty("id")
Expand Down Expand Up @@ -145,6 +149,12 @@ public String credentialId() {
return credentialId;
}

@Nullable
@JsonProperty("zone")
public String zone() {
return zone;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -166,13 +176,14 @@ public boolean equals(Object o) {
remotePath.equals(mirrorDto.remotePath) &&
remoteBranch.equals(mirrorDto.remoteBranch) &&
Objects.equals(gitignore, mirrorDto.gitignore) &&
credentialId.equals(mirrorDto.credentialId);
credentialId.equals(mirrorDto.credentialId) &&
Objects.equals(zone, mirrorDto.zone);
}

@Override
public int hashCode() {
return Objects.hash(id, projectName, schedule, direction, localRepo, localPath, remoteScheme,
remoteUrl, remotePath, remoteBranch, gitignore, credentialId, enabled);
remoteUrl, remotePath, remoteBranch, gitignore, credentialId, enabled, zone);
}

@Override
Expand All @@ -191,6 +202,7 @@ public String toString() {
.add("remotePath", remotePath)
.add("gitignore", gitignore)
.add("credentialId", credentialId)
.add("zone", zone)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void shouldNotifyMirrorEvents() {
final Mirror mirror = new AbstractMirror("my-mirror-1", true, EVERY_SECOND,
MirrorDirection.REMOTE_TO_LOCAL,
Credential.FALLBACK, r, "/",
URI.create("unused://uri"), "/", "", null) {
URI.create("unused://uri"), "/", "", null, null) {
@Override
protected MirrorResult mirrorLocalToRemote(File workDir, int maxNumFiles, long maxNumBytes,
Instant triggeredTime) {
Expand All @@ -114,7 +114,7 @@ protected MirrorResult mirrorRemoteToLocal(File workDir, CommandExecutor executo
when(mr.mirrors()).thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mirror)));

final MirrorSchedulingService service = new MirrorSchedulingService(
temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1);
temporaryFolder, pm, new SimpleMeterRegistry(), 1, 1, 1, null);
final CommandExecutor executor = mock(CommandExecutor.class);
service.start(executor);

Expand Down
2 changes: 2 additions & 0 deletions it/mirror/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ dependencies {
testImplementation libs.jsch
testImplementation libs.mina.sshd.core
testImplementation libs.mina.sshd.git
testImplementation libs.zookeeper
testImplementation libs.dropwizard.metrics.core
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class GitMirrorIntegrationTest {
static final CentralDogmaExtension dogma = new CentralDogmaExtension() {
@Override
protected void configure(CentralDogmaBuilder builder) {
builder.pluginConfigs(new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES));
builder.pluginConfigs(new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES,
false));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class LocalToRemoteGitMirrorTest {
static final CentralDogmaExtension dogma = new CentralDogmaExtension() {
@Override
protected void configure(CentralDogmaBuilder builder) {
builder.pluginConfigs(new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES));
builder.pluginConfigs(
new MirroringServicePluginConfig(true, 1, MAX_NUM_FILES, MAX_NUM_BYTES, false));
}
};

Expand Down Expand Up @@ -371,7 +372,7 @@ private void pushMirrorSettings(String localRepo, @Nullable String localPath, @N
.push().join();
} catch (CompletionException e) {
if (e.getCause() instanceof RedundantChangeException) {
// The same content can be pushed several times.
// The same content can be pushed several times.
} else {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@

class MirrorRunnerTest {

private static final String FOO_PROJ = "foo";
private static final String BAR_REPO = "bar";
private static final String PRIVATE_KEY_FILE = "ecdsa_256.openssh";
private static final String TEST_MIRROR_ID = "test-mirror";
static final String FOO_PROJ = "foo";
static final String BAR_REPO = "bar";
static final String PRIVATE_KEY_FILE = "ecdsa_256.openssh";
static final String TEST_MIRROR_ID = "test-mirror";

@RegisterExtension
static final CentralDogmaExtension dogma = new CentralDogmaExtension() {
Expand Down Expand Up @@ -159,10 +159,11 @@ private static MirrorDto newMirror() {
"/",
"main",
null,
PRIVATE_KEY_FILE);
PRIVATE_KEY_FILE,
null);
}

private static PublicKeyCredential getCredential() throws Exception {
static PublicKeyCredential getCredential() throws Exception {
final String publicKeyFile = "ecdsa_256.openssh.pub";

final byte[] privateKeyBytes =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.it.mirror.git;

import static com.google.common.base.MoreObjects.firstNonNull;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.centraldogma.server.mirror.MirrorListener;
import com.linecorp.centraldogma.server.mirror.MirrorResult;
import com.linecorp.centraldogma.server.mirror.MirrorTask;

public class TestZoneAwareMirrorListener implements MirrorListener {

private static final Logger logger = LoggerFactory.getLogger(TestZoneAwareMirrorListener.class);

static final Map<String, Integer> startCount = new ConcurrentHashMap<>();
static final Map<String, List<MirrorResult>> completions = new ConcurrentHashMap<>();
static final Map<String, List<Throwable>> errors = new ConcurrentHashMap<>();

static void reset() {
startCount.clear();
completions.clear();
errors.clear();
}

private static String key(MirrorTask task) {
return firstNonNull(task.currentZone(), "default");
}

@Override
public void onStart(MirrorTask mirror) {
logger.debug("onStart: {}", mirror);
startCount.merge(key(mirror), 1, Integer::sum);
}

@Override
public void onComplete(MirrorTask mirror, MirrorResult result) {
logger.debug("onComplete: {} -> {}", mirror, result);
final List<MirrorResult> results = new ArrayList<>();
results.add(result);
completions.merge(key(mirror), results, (oldValue, newValue) -> {
oldValue.addAll(newValue);
return oldValue;
});
}

@Override
public void onError(MirrorTask mirror, Throwable cause) {
logger.debug("onError: {}", mirror, cause);
final List<Throwable> exceptions = new ArrayList<>();
exceptions.add(cause);
errors.merge(key(mirror), exceptions, (oldValue, newValue) -> {
oldValue.addAll(newValue);
return oldValue;
});
}
}
Loading

0 comments on commit 3a261ba

Please sign in to comment.