Skip to content

Commit

Permalink
[Backport 2.x] Backport remote publication stats (#16699)
Browse files Browse the repository at this point in the history
* Separating remote download and publication stats (#16682)

Signed-off-by: Himshikha Gupta <[email protected]>
  • Loading branch information
himshikha authored Nov 25, 2024
1 parent 8492199 commit 1b2c543
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 172 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a flag in QueryShardContext to differentiate inner hit query ([#16600](https://github.com/opensearch-project/OpenSearch/pull/16600))
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}
} catch (Exception e) {
if (applyFullState) {
remoteClusterStateService.fullDownloadFailed();
remoteClusterStateService.fullIncomingPublicationFailed();
} else {
remoteClusterStateService.diffDownloadFailed();
remoteClusterStateService.diffIncomingPublicationFailed();
}
throw e;
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
public class RemoteDownloadStats extends PersistedStateStats {
static final String CHECKSUM_VALIDATION_FAILED_COUNT = "checksum_validation_failed_count";
private AtomicLong checksumValidationFailedCount = new AtomicLong(0);
public static final String INCOMING_PUBLICATION_FAILED_COUNT = "incoming_publication_failed_count";
private AtomicLong incomingPublicationFailedCount = new AtomicLong(0);

public RemoteDownloadStats(String statsName) {
super(statsName);
addToExtendedFields(CHECKSUM_VALIDATION_FAILED_COUNT, checksumValidationFailedCount);
addToExtendedFields(INCOMING_PUBLICATION_FAILED_COUNT, incomingPublicationFailedCount);
}

public void checksumValidationFailedCount() {
Expand All @@ -33,4 +36,12 @@ public void checksumValidationFailedCount() {
public long getChecksumValidationFailedCount() {
return checksumValidationFailedCount.get();
}

public void incomingPublicationFailedCount() {
incomingPublicationFailedCount.incrementAndGet();
}

public long getIncomingPublicationFailedCount() {
return incomingPublicationFailedCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ public long getStateFullDownloadValidationFailed() {
return remoteFullDownloadStats.getChecksumValidationFailedCount();
}

public void stateDiffIncomingPublicationFailed() {
remoteDiffDownloadStats.incomingPublicationFailedCount();
}

public void stateFullIncomingPublicationFailed() {
remoteFullDownloadStats.incomingPublicationFailedCount();
}

public PersistedStateStats getUploadStats() {
return remoteUploadStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterStateDiffManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteDownloadStats;
import org.opensearch.node.Node;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -76,7 +77,10 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -180,8 +184,8 @@ public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest()
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
verify(remoteClusterStateService, times(0)).fullDownloadFailed();
verify(remoteClusterStateService, times(1)).diffDownloadFailed();
verify(remoteClusterStateService, times(0)).fullIncomingPublicationFailed();
verify(remoteClusterStateService, times(1)).diffIncomingPublicationFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

Expand All @@ -207,8 +211,8 @@ public void testHandleIncomingRemotePublishRequestWhenTermMismatch() {
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
verify(remoteClusterStateService, times(0)).fullDownloadFailed();
verify(remoteClusterStateService, times(1)).diffDownloadFailed();
verify(remoteClusterStateService, times(0)).fullIncomingPublicationFailed();
verify(remoteClusterStateService, times(1)).diffIncomingPublicationFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

Expand All @@ -234,8 +238,8 @@ public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() {
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
verify(remoteClusterStateService, times(1)).diffDownloadFailed();
verify(remoteClusterStateService, times(0)).fullDownloadFailed();
verify(remoteClusterStateService, times(1)).diffIncomingPublicationFailed();
verify(remoteClusterStateService, times(0)).fullIncomingPublicationFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

Expand Down Expand Up @@ -263,20 +267,20 @@ public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOExcept

public void testDownloadRemotePersistedFullStateFailedStats() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
PersistedStateStats remoteFullDownloadStats = new PersistedStateStats("dummy_full_stats");
PersistedStateStats remoteDiffDownloadStats = new PersistedStateStats("dummy_diff_stats");
RemoteDownloadStats remoteFullDownloadStats = new RemoteDownloadStats("dummy_full_stats");
RemoteDownloadStats remoteDiffDownloadStats = new RemoteDownloadStats("dummy_diff_stats");
when(remoteClusterStateService.getFullDownloadStats()).thenReturn(remoteFullDownloadStats);
when(remoteClusterStateService.getDiffDownloadStats()).thenReturn(remoteDiffDownloadStats);

doAnswer((i) -> {
remoteFullDownloadStats.stateFailed();
remoteFullDownloadStats.incomingPublicationFailedCount();
return null;
}).when(remoteClusterStateService).fullDownloadFailed();
}).when(remoteClusterStateService).fullIncomingPublicationFailed();

doAnswer((i) -> {
remoteDiffDownloadStats.stateFailed();
remoteDiffDownloadStats.incomingPublicationFailedCount();
return null;
}).when(remoteClusterStateService).diffDownloadFailed();
}).when(remoteClusterStateService).diffIncomingPublicationFailed();

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
Expand All @@ -294,13 +298,49 @@ public void testDownloadRemotePersistedFullStateFailedStats() throws IOException
handler.setCurrentPublishRequestToSelf(publishRequest);

assertThrows(IllegalStateException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getFailedCount());
assertEquals(0, remoteClusterStateService.getFullDownloadStats().getFailedCount());
assertEquals(1, ((RemoteDownloadStats) remoteClusterStateService.getDiffDownloadStats()).getIncomingPublicationFailedCount());
assertEquals(0, ((RemoteDownloadStats) remoteClusterStateService.getFullDownloadStats()).getIncomingPublicationFailedCount());
}

public void testDownloadRemotePersistedFullStateFailedStatsManifestExists() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
RemoteDownloadStats remoteFullDownloadStats = new RemoteDownloadStats("dummy_full_stats");
when(remoteClusterStateService.getFullDownloadStats()).thenReturn(remoteFullDownloadStats);

doAnswer((i) -> {
remoteFullDownloadStats.incomingPublicationFailedCount();
return null;
}).when(remoteClusterStateService).fullIncomingPublicationFailed();

ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest(
new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid").build()
).build();
when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest);

doThrow(new RuntimeException("test exception")).when(remoteClusterStateService)
.getClusterStateForManifest(anyString(), any(), any(), anyBoolean());
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
secondNode,
TERM,
VERSION,
CLUSTER_NAME,
CLUSTER_UUID,
MANIFEST_FILE
);
ClusterState clusterState = buildClusterState(TERM, VERSION);
PublishRequest publishRequest = new PublishRequest(clusterState);
handler.setCurrentPublishRequestToSelf(publishRequest);

assertThrows(RuntimeException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(1, ((RemoteDownloadStats) remoteClusterStateService.getFullDownloadStats()).getIncomingPublicationFailedCount());
}

public void testDownloadRemotePersistedDiffStateFailedStats() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
PersistedStateStats remoteDiffDownloadStats = new PersistedStateStats("dummy_stats");
RemoteDownloadStats remoteDiffDownloadStats = new RemoteDownloadStats("dummy_stats");
when(remoteClusterStateService.getDiffDownloadStats()).thenReturn(remoteDiffDownloadStats);

ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest(
Expand All @@ -309,9 +349,9 @@ public void testDownloadRemotePersistedDiffStateFailedStats() throws IOException
when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest);

doAnswer((i) -> {
remoteDiffDownloadStats.stateFailed();
remoteDiffDownloadStats.incomingPublicationFailedCount();
return null;
}).when(remoteClusterStateService).diffDownloadFailed();
}).when(remoteClusterStateService).diffIncomingPublicationFailed();

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
Expand All @@ -333,7 +373,7 @@ public void testDownloadRemotePersistedDiffStateFailedStats() throws IOException
handler.setCurrentPublishRequestToSelf(publishRequest);

assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getFailedCount());
assertEquals(1, ((RemoteDownloadStats) remoteClusterStateService.getDiffDownloadStats()).getIncomingPublicationFailedCount());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,9 @@ public void testGetClusterStateForManifest_ExcludeEphemeral() throws IOException
eq(false)

);
assertNotNull(remoteClusterStateService.getFullDownloadStats());
assertEquals(1, remoteClusterStateService.getFullDownloadStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getFullDownloadStats().getFailedCount());
}

public void testGetClusterStateFromManifest_CodecV1() throws IOException {
Expand Down Expand Up @@ -1296,6 +1299,9 @@ public void testGetClusterStateUsingDiff() throws IOException {
diffManifest.getClusterStateCustomDeleted().forEach(clusterStateCustomName -> {
assertFalse(updatedClusterState.customs().containsKey(clusterStateCustomName));
});
assertNotNull(remoteClusterStateService.getDiffDownloadStats());
assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getDiffDownloadStats().getFailedCount());
}

public void testReadClusterStateInParallel_TimedOut() throws IOException {
Expand Down Expand Up @@ -3421,6 +3427,9 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMisma
true
);
assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateFullDownloadValidationFailed());
assertNotNull(remoteClusterStateService.getFullDownloadStats());
assertEquals(0, remoteClusterStateService.getFullDownloadStats().getSuccessCount());
assertEquals(1, remoteClusterStateService.getFullDownloadStats().getFailedCount());
}

public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatch() throws IOException {
Expand Down Expand Up @@ -3717,6 +3726,9 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio
eq(false)
);
assertEquals(1, remoteClusterStateService.getRemoteStateStats().getStateDiffDownloadValidationFailed());
assertNotNull(remoteClusterStateService.getDiffDownloadStats());
assertEquals(0, remoteClusterStateService.getDiffDownloadStats().getSuccessCount());
assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getFailedCount());
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
Expand Down

0 comments on commit 1b2c543

Please sign in to comment.