Skip to content

Commit

Permalink
Increment version to 2.9 and refactor (#1061)
Browse files Browse the repository at this point in the history
* Open Upgrade gradle version and remove remaining use of ImmutableOpenMap (#814)

Signed-off-by: Monu Singh <[email protected]>

* Increment version to 2.9 and refactor

Signed-off-by: monusingh-1 <[email protected]>

* remove use of import java.util.*

Signed-off-by: monusingh-1 <[email protected]>

---------

Signed-off-by: Monu Singh <[email protected]>
Signed-off-by: monusingh-1 <[email protected]>
  • Loading branch information
monusingh-1 authored Jul 11, 2023
1 parent b98d384 commit b7ebfe9
Show file tree
Hide file tree
Showing 18 changed files with 62 additions and 69 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.opensearch.gradle.test.RestIntegTestTask
buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.8.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.9.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// e.g. 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService
private suspend fun isResumable(params :IndexReplicationParams): Boolean {
var isResumable = true
val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName).shards()
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName)?.shards()
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), clusterService.state().metadata.clusterUUID(), remoteClient)
shards.forEach {
shards?.forEach {
val followerShardId = it.value.shardId

if (!retentionLeaseHelper.verifyRetentionLeaseExist(ShardId(params.leaderIndex, followerShardId.id), followerShardId)) {
Expand All @@ -146,7 +146,7 @@ class TransportResumeIndexReplicationAction @Inject constructor(transportService

// clean up all retention leases we may have accidentally took while doing verifyRetentionLeaseExist .
// Idempotent Op which does no harm
shards.forEach {
shards?.forEach {
val followerShardId = it.value.shardId
log.debug("Removing lease for $followerShardId.id ")
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.opensearch.indices.IndicesService
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.io.IOException
import java.util.*

class TranportShardsInfoAction @Inject constructor(clusterService: ClusterService,
transportService: TransportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.Settings.readSettingsFromStream
import org.opensearch.core.xcontent.*
import java.io.IOException
import java.util.*
import java.util.Collections


class UpdateIndexReplicationRequest : AcknowledgedRequest<UpdateIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import org.opensearch.rest.action.admin.indices.AliasesNotFoundException
import org.opensearch.tasks.Task
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.util.*
import java.util.Collections

/*
This action allows the replication plugin to update the index metadata(mapping, setting & aliases) on the follower index
Expand Down Expand Up @@ -274,8 +274,8 @@ class TransportUpdateMetadataAction @Inject constructor(
val indexAsArray = arrayOf(concreteIndex)
val aliasMetadata = metadata.findAliases(action, indexAsArray)
val finalAliases: MutableList<String> = ArrayList()
for (curAliases in aliasMetadata.values()) {
for (aliasMeta in curAliases.value) {
for (curAliases in aliasMetadata.values) {
for (aliasMeta in curAliases) {
finalAliases.add(aliasMeta.alias())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.block.ClusterBlockLevel
import org.opensearch.cluster.block.ClusterBlocks
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.index.IndexNotFoundException
import org.opensearch.rest.RestStatus
import java.util.*
import java.util.Collections
import java.util.EnumSet


/* This is our custom index block to prevent changes to follower
Expand All @@ -49,11 +49,11 @@ fun checkIfIndexBlockedWithLevel(clusterService: ClusterService,
clusterBlockLevel: ClusterBlockLevel) {
clusterService.state().routingTable.index(indexName) ?:
throw IndexNotFoundException("Index with name:$indexName doesn't exist")
val writeIndexBlockMap : ImmutableOpenMap<String, Set<ClusterBlock>> = clusterService.state().blocks()
val writeIndexBlockMap : Map<String, Set<ClusterBlock>> = clusterService.state().blocks()
.indices(clusterBlockLevel)
if (!writeIndexBlockMap.containsKey(indexName))
return
val clusterBlocksSet : Set<ClusterBlock> = writeIndexBlockMap.get(indexName)
val clusterBlocksSet : Set<ClusterBlock> = writeIndexBlockMap.getOrDefault(indexName, Collections.emptySet())
if (clusterBlocksSet.contains(INDEX_REPLICATION_BLOCK)
&& clusterBlocksSet.size > 1)
throw ClusterBlockException(clusterBlocksSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import java.io.IOException
import java.util.*
import java.util.function.BiConsumer
import java.util.function.BiFunction
import java.util.Collections

const val KEY_SETTINGS = "settings"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
override fun getRepositoryData(listener: ActionListener<RepositoryData>) {
val clusterState = getLeaderClusterState(false, false)
val shardGenerations = ShardGenerations.builder()
clusterState.metadata.indices.values()
.map { it.value }
clusterState.metadata.indices.values
.map { it }
.forEach { indexMetadata ->
val indexId = IndexId(indexMetadata.index.name, indexMetadata.indexUUID)
for (i in 0 until indexMetadata.numberOfShards) {
Expand All @@ -215,7 +215,7 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
override fun getSnapshotInfo(snapshotId: SnapshotId): SnapshotInfo {
val leaderClusterState = getLeaderClusterState(false, false)
assert(REMOTE_SNAPSHOT_NAME.equals(snapshotId.name), { "SnapshotName differs" })
val indices = leaderClusterState.metadata().indices().keys().map { x -> x.value }
val indices = leaderClusterState.metadata().indices().keys.map { x -> x }
return SnapshotInfo(snapshotId, indices, emptyList(), SnapshotState.SUCCESS, Version.CURRENT)
}

Expand Down Expand Up @@ -244,7 +244,7 @@ class RemoteClusterRepository(private val repositoryMetadata: RepositoryMetadata
builder.remove(REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING.key)

val indexMdBuilder = IndexMetadata.builder(indexMetadata).settings(builder)
indexMetadata.aliases.valuesIt().forEach {
indexMetadata.aliases.values.forEach {
indexMdBuilder.putAlias(it)
}
return indexMdBuilder.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.opensearch.replication.task.index.IndexReplicationExecutor.Companion.
import org.opensearch.action.support.IndicesOptions
import org.opensearch.client.Requests
import org.opensearch.client.node.NodeClient
import org.opensearch.common.Strings
import org.opensearch.core.common.Strings
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.RestChannel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU
val remoteMetadata = getLeaderIndexMetadata(replMetadata.connectionName, replMetadata.leaderContext.resource)
val params = IndexReplicationParams(replMetadata.connectionName, remoteMetadata.index, followerIndexName)
val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName).shards()
val shards = clusterService.state().routingTable.indicesRouting().get(params.followerIndexName)?.shards()
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), followerClusterUUID, remoteClient)
shards.forEach {
shards?.forEach {
val followerShardId = it.value.shardId
log.debug("Removing lease for $followerShardId.id ")
retentionLeaseHelper.attemptRetentionLeaseRemoval(ShardId(params.leaderIndex, followerShardId.id), followerShardId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
val clusterState = clusterService.state()
val persistentTasks = clusterState.metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)

val followerShardIds = clusterService.state().routingTable.indicesRouting().get(followerIndexName).shards()
.map { shard -> shard.value.shardId }
.stream().collect(Collectors.toSet())
val followerShardIds = clusterService.state().routingTable.indicesRouting().get(followerIndexName)?.shards()
?.map { shard -> shard.value.shardId }
?.stream()?.collect(Collectors.toSet()).orEmpty()
val runningShardTasksForIndex = persistentTasks.findTasks(ShardReplicationExecutor.TASK_NAME, Predicate { true }).stream()
.map { task -> task.params as ShardReplicationParams }
.filter {taskParam -> followerShardIds.contains(taskParam.followerShardId) }
Expand Down Expand Up @@ -434,16 +434,16 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
// If we we want to retrieve just the version of settings and alias versions, there are two options
// 1. Include this in GetChanges and communicate it to IndexTask via Metadata
// 2. Add another API to retrieve version of settings & aliases. Persist current version in Metadata
var leaderSettings = settingsResponse.indexToSettings.get(this.leaderIndex.name)
leaderSettings = leaderSettings.filter { k: String? ->
var leaderSettings = settingsResponse.indexToSettings.getOrDefault(this.leaderIndex.name, Settings.EMPTY)
leaderSettings = leaderSettings.filter { k: String ->
!blockListedSettings.contains(k)
}

gsr = GetSettingsRequest().includeDefaults(false).indices(this.followerIndexName)
settingsResponse = client.suspending(client.admin().indices()::getSettings, injectSecurityContext = true)(gsr)
var followerSettings = settingsResponse.indexToSettings.get(this.followerIndexName)
var followerSettings = settingsResponse.indexToSettings.getOrDefault(this.followerIndexName, Settings.EMPTY)

followerSettings = followerSettings.filter { k: String? ->
followerSettings = followerSettings.filter { k: String ->
k != REPLICATED_INDEX_SETTING.key
}

Expand Down Expand Up @@ -516,11 +516,11 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
//Alias
var getAliasesRequest = GetAliasesRequest().indices(this.leaderIndex.name)
var getAliasesRes = remoteClient.suspending(remoteClient.admin().indices()::getAliases, injectSecurityContext = true)(getAliasesRequest)
var leaderAliases = getAliasesRes.aliases.get(this.leaderIndex.name)
var leaderAliases = getAliasesRes.aliases.getOrDefault(this.leaderIndex.name, Collections.emptyList())

getAliasesRequest = GetAliasesRequest().indices(followerIndexName)
getAliasesRes = client.suspending(client.admin().indices()::getAliases, injectSecurityContext = true)(getAliasesRequest)
var followerAliases = getAliasesRes.aliases.get(followerIndexName)
var followerAliases = getAliasesRes.aliases.getOrDefault(followerIndexName, Collections.emptyList())

var request :IndicesAliasesRequest?

Expand Down Expand Up @@ -606,8 +606,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript

try {
//Step 1 : Remove the tasks
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName).shards()
shards.forEach {
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName)?.shards()
shards?.forEach {
persistentTasksService.removeTask(ShardReplicationTask.taskIdForShard(it.value.shardId))
}

Expand Down Expand Up @@ -748,7 +748,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript

suspend fun startNewOrMissingShardTasks(): Map<ShardId, PersistentTask<ShardReplicationParams>> {
assert(clusterService.state().routingTable.hasIndex(followerIndexName)) { "Can't find index $followerIndexName" }
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName).shards()
val shards = clusterService.state().routingTable.indicesRouting().get(followerIndexName)?.shards()
val persistentTasks = clusterService.state().metadata.custom<PersistentTasksCustomMetadata>(PersistentTasksCustomMetadata.TYPE)
val runningShardTasks = persistentTasks.findTasks(ShardReplicationExecutor.TASK_NAME, Predicate { true }).stream()
.map { task -> task as PersistentTask<ShardReplicationParams> }
Expand All @@ -757,14 +757,14 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
{t: PersistentTask<ShardReplicationParams> -> t.params!!.followerShardId},
{t: PersistentTask<ShardReplicationParams> -> t}))

val tasks = shards.map {
val tasks = shards?.map {
it.value.shardId
}.associate { shardId ->
}?.associate { shardId ->
val task = runningShardTasks.getOrElse(shardId) {
startReplicationTask(ShardReplicationParams(leaderAlias, ShardId(leaderIndex, shardId.id), shardId))
}
return@associate shardId to task
}
}.orEmpty()

return tasks
}
Expand Down Expand Up @@ -865,9 +865,9 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
This can happen if there was a badly timed cluster manager node failure.""".trimIndent())
}
} else if (restore.state() == RestoreInProgress.State.FAILURE) {
val failureReason = restore.shards().values().find {
it.value.state() == RestoreInProgress.State.FAILURE
}!!.value.reason()
val failureReason = restore.shards().values.find {
it.state() == RestoreInProgress.State.FAILURE
}!!.reason()
return FailedState(Collections.emptyMap(), failureReason)
} else {
return InitFollowState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.OpenOption
import java.nio.file.Path
import java.util.*
import java.util.function.Supplier
import java.util.LinkedList


class ReplicationTranslogDeletionPolicyTests : OpenSearchTestCase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import org.opensearch.common.settings.Settings
import org.opensearch.test.OpenSearchTestCase
import org.junit.Assert
import org.junit.Assume
import java.util.*
import java.util.concurrent.TimeUnit
import org.opensearch.replication.task.autofollow.AutoFollowExecutor
import org.opensearch.tasks.TaskInfo
import org.junit.Before
import java.util.Locale

@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
Expand Down Expand Up @@ -228,7 +228,7 @@ class SecurityCustomRolesIT: SecurityBase() {
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)[IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
)

settings = Settings.builder()
Expand All @@ -243,7 +243,7 @@ class SecurityCustomRolesIT: SecurityBase() {
"checksum",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName]["index.shard.check_on_startup"]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)["index.shard.check_on_startup"]
)
}, 30L, TimeUnit.SECONDS)
}
Expand Down Expand Up @@ -273,7 +273,7 @@ class SecurityCustomRolesIT: SecurityBase() {
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)[IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
)
settings = Settings.builder()
.put("index.shard.check_on_startup", "checksum")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class SecurityDlsFlsIT: SecurityBase() {
"1",
followerClient.indices()
.getSettings(getSettingsRequest, RequestOptions.DEFAULT)
.indexToSettings[followerIndexName][IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
.indexToSettings.getOrDefault(followerIndexName, Settings.EMPTY)[IndexMetadata.SETTING_NUMBER_OF_REPLICAS]
)
settings = Settings.builder()
.put("index.shard.check_on_startup", "checksum")
Expand Down
Loading

0 comments on commit b7ebfe9

Please sign in to comment.