Skip to content

Commit

Permalink
fix retention data stream check
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Henderson committed Oct 11, 2022
1 parent 6cdf286 commit b047063
Showing 1 changed file with 40 additions and 26 deletions.
66 changes: 40 additions & 26 deletions retention.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from error import send_notification
NOTIFICATION = False


def get_retention_policy(client_config):
"""Get retention policy
Expand All @@ -21,11 +22,12 @@ def get_retention_policy(client_config):
if "retention" in client_config['policy']:
index_retention_policies = client_config['policy']['retention']
else:
index_retention_policies = { "global": 3660 }
index_retention_policies = {"global": 3660}
else:
index_retention_policies = { "global": 3660 }
index_retention_policies = {"global": 3660}
return index_retention_policies


def delete_old_indices(client_config, index, index_retention_policies):
"""Deletes indices past retention policy
Expand All @@ -36,11 +38,13 @@ def delete_old_indices(client_config, index, index_retention_policies):
"""
elastic_connection = es.build_es_connection(client_config)
newest_record = ""
newest_record = es.get_newest_document_date_in_index(client_config, index, elastic_connection)
newest_record = es.get_newest_document_date_in_index(
client_config, index, elastic_connection)
# make sure newest record is not empty
if newest_record != "":
# Get the index specific retention policy
policy = es.check_index_retention_policy(index, index_retention_policies)
policy = es.check_index_retention_policy(
index, index_retention_policies)
# Get policy retention days from specific policy
policy_days = index_retention_policies[policy]
# Get current datetime
Expand All @@ -52,14 +56,17 @@ def delete_old_indices(client_config, index, index_retention_policies):
index_group = es.get_index_group(index)
if days_ago >= policy_days:
# Delete old index
print(f"Deleting index {index} due to age of {days_ago}" \
f" vs policy limit of {policy_days}")
print(f"Deleting index {index} due to age of {days_ago}"
f" vs policy limit of {policy_days}")

index_group = es.get_index_group(index)
data_stream_info = elastic_connection.indices.get_data_stream(name=index_group)
number_of_indices_in_ds =0
number_of_indices_in_ds = len(data_stream_info['data_streams'][0]['indices'])
#active_ds_index = data_stream_info['data_streams'][0]['generation']
try:
data_stream_info = elastic_connection.indices.get_data_stream(
name=index_group)
number_of_indices_in_ds = len(
data_stream_info['data_streams'][0]['indices'])
except:
number_of_indices_in_ds = 0
if number_of_indices_in_ds == 1:
elastic_connection.indices.delete_data_stream(name=index_group)
else:
Expand All @@ -68,8 +75,10 @@ def delete_old_indices(client_config, index, index_retention_policies):
if success is False:
settings = load_settings()
message = f"Retention operation failed for client {client_config['client_name']}."
message = message + f"\nTried deleting index {index} due to age of "
message = message + f"{days_ago} vs policy limit of {policy_days}"
message = message + \
f"\nTried deleting index {index} due to age of "
message = message + \
f"{days_ago} vs policy limit of {policy_days}"

send_notification(
client_config,
Expand All @@ -81,6 +90,7 @@ def delete_old_indices(client_config, index, index_retention_policies):
)
elastic_connection.close()


def apply_retention_to_old_indices(indices, index_retention_policies, client_config):
"""Apply retention to indices older than policy
Expand All @@ -91,15 +101,17 @@ def apply_retention_to_old_indices(indices, index_retention_policies, client_con
"""
elastic_connection = es.build_es_connection(client_config)
with ThreadPoolExecutor(
max_workers=es.get_lowest_data_node_thread_count(client_config)
) as executor:
max_workers=es.get_lowest_data_node_thread_count(client_config)
) as executor:
for index in indices:
index = str(index['index'])
# Only proceed if index is not a special index
if not es.check_special_index(index):
executor.submit(delete_old_indices, client_config, index, index_retention_policies)
executor.submit(delete_old_indices, client_config,
index, index_retention_policies)
elastic_connection.close()


def apply_retention_policies(manual_client=""):
"""Apply retention policies
Expand All @@ -125,10 +137,11 @@ def apply_retention_policies(manual_client=""):
while retry_count >= 0 and success == 0:
# Check cluster health - Expect Yellow to continue
if es.check_cluster_health_status(
client_config, settings['retention']['health_check_level']
):
client_config, settings['retention']['health_check_level']
):
# Grab the client's retention policies
index_retention_policies = get_retention_policy(client_config)
index_retention_policies = get_retention_policy(
client_config)
# Next, get information on all current indices in cluster
indices = es.es_get_indices(client_config)
# Get the list of indices that are older than the retention policy
Expand All @@ -140,9 +153,9 @@ def apply_retention_policies(manual_client=""):
success = 1
else:
if retry_count > 0:
print("Retention operation failed for " + client_name + \
". Cluster health does not meet level: " + \
settings['retention']['health_check_level'])
print("Retention operation failed for " + client_name +
". Cluster health does not meet level: " +
settings['retention']['health_check_level'])
else:
message = "Retention operation failed.\n\n" + \
"It is also possible that connections are " + \
Expand All @@ -165,17 +178,18 @@ def apply_retention_policies(manual_client=""):
if success == 0:
# Decrese retry count by one before trying while statement again
retry_count = retry_count - 1
print("Retry attempts left for retention " + \
"operation set to " + str(retry_count) + \
" sleeping for " + str(sleep_time) + " seconds")
print("Retry attempts left for retention " +
"operation set to " + str(retry_count) +
" sleeping for " + str(sleep_time) + " seconds")
time.sleep(sleep_time)


if __name__ == "__main__":
import argparse
from argparse import RawTextHelpFormatter
parser = argparse.ArgumentParser(
description='Used to manually run accounting against a ' + \
'specific client (Example - retention.py --client ha)',
description='Used to manually run accounting against a ' +
'specific client (Example - retention.py --client ha)',
formatter_class=RawTextHelpFormatter
)
parser.add_argument(
Expand Down

0 comments on commit b047063

Please sign in to comment.