Skip to content

Commit

Permalink
Improve the excel script
Browse files Browse the repository at this point in the history
The excel script now takes parameters for which column to find the
ATT&CK ID/name in, and which column to start output from. I used this to
produce a diff of the MapEx spreadsheet for VERIS.
  • Loading branch information
mehaase committed Sep 27, 2024
1 parent db0541b commit dd3ab3a
Showing 1 changed file with 77 additions and 42 deletions.
119 changes: 77 additions & 42 deletions src/attack_sync/generate_mapping_excel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import re
from argparse import ArgumentParser
import string
from argparse import ArgumentParser, BooleanOptionalAction
from difflib import SequenceMatcher
from pathlib import Path
from typing import Dict
Expand Down Expand Up @@ -62,14 +63,39 @@ def parse_args():
type=str,
help="Path to input mapping spreadsheet (xlsx).",
)
parser.add_argument(
"attack_id_column",
type=str,
help="The column letter in the spreadsheet that contains ATT&CK object IDs.",
)
parser.add_argument(
"attack_name_column",
type=str,
help="The column letter in the spreadsheet that contains ATT&CK object names.",
)
parser.add_argument(
"output_column",
type=str,
help="The column letter where ATT&CK sync output should begin.",
)
parser.add_argument(
"mapping_out",
type=str,
help="Path to save output mapping spreadsheet (xlsx).",
)
parser.add_argument(
"--column-headers",
action=BooleanOptionalAction,
default=True,
help="Indicates whether the source spreadsheet has column headers or not.",
)
return parser.parse_args()


def column_letter_to_number(letter):
return string.ascii_uppercase.index(letter)


def main():
args = parse_args()

Expand All @@ -85,11 +111,13 @@ def main():
logger.info(f"Loading: {original_mapping_file}")
mapping_workbook = load_workbook(filename=original_mapping_file)
mapping_sheet = mapping_workbook.worksheets[0]
initial_cols = 5
cols_per_change = 2
start_col = column_letter_to_number(args.output_column)
attack_id_col = column_letter_to_number(args.attack_id_column)
attack_name_col = column_letter_to_number(args.attack_name_column)
cols_per_change = 2 # each change has two columns: change type and change value
num_changes = 5 # there are 5 hard coded checks for changes below
added_cols = cols_per_change * num_changes
total_cols = initial_cols + added_cols
total_cols = start_col + added_cols

add_style = InlineFont(b=True, color="339933")
del_style = InlineFont(strike=True, color="cc0000")
Expand All @@ -102,23 +130,25 @@ def get_tid_from_ext_refs(external_refs):
return external_id
raise Exception(f"No technique ID found in externals refs: {external_refs}")

def write_change(change_type, change_value):
nonlocal start_col
row[start_col].value = change_type
row[start_col + 1].value = change_value
start_col += cols_per_change
def write_change(row, column, change_type, change_value):
row[column].value = change_type
row[column + 1].value = change_value
# Return the number of columns written -- hardcoded at 2 for now
return 2

logger.info("Providing context for ATT&CK Techniques that have changed")
for row in mapping_sheet.iter_rows(max_col=total_cols):
control_id = row[0].value
technique_id = row[3].value
start_col = 5

# The first row needs column headers
if control_id == "Control ID":
for _ in range(5):
write_change("Change Type", "Change Value")
continue

row_iterator = mapping_sheet.iter_rows(max_col=total_cols)

# If there are column headers, then add new headers for the ATT&CK Sync columns.
if args.column_headers:
row = next(row_iterator)
for column in range(start_col, total_cols, cols_per_change):
write_change(row, column, "Change Type", "Change Value")

for row in row_iterator:
technique_id = row[attack_id_col].value
current_col = start_col

if technique_id in changed_techniques:
changed_technique = changed_techniques[technique_id]
Expand All @@ -137,10 +167,10 @@ def write_change(change_type, change_value):
revocation = f'Superseded by "{revoked_by_tid} {revoked_by["name"]}" - {revoked_by["description"]}'
else:
revocation = "No superseding technique."
write_change("Revoked By", revocation)
current_col += write_change(row, current_col, "Revoked By", revocation)
# Strike out the technique ID/name columns
row[4].font = strike_font
row[5].font = strike_font
row[attack_id_col].font = strike_font
row[attack_name_col].font = strike_font

detailed_diff = json.loads(changed_technique.get("detailed_diff", "{}"))
if detailed_diff:
Expand Down Expand Up @@ -181,7 +211,9 @@ def write_change(change_type, change_value):
cell_contents.append(new)

if stix_field == "description":
write_change(
current_col += write_change(
row,
current_col,
"Modified Description",
CellRichText(cell_contents),
)
Expand All @@ -192,15 +224,19 @@ def write_change(change_type, change_value):
new_value = ""
for new_mitigation in changelog_mitigations["new"]:
new_value += f"{new_mitigation}\n"
write_change(
current_col += write_change(
row,
current_col,
"New Mitigations",
CellRichText([TextBlock(add_style, new_value.strip())]),
)
if changelog_mitigations["dropped"]:
new_value = ""
for dropped_mitigation in changelog_mitigations["dropped"]:
new_value += f"{dropped_mitigation}\n"
write_change(
current_col += write_change(
row,
current_col,
"Dropped Mitigations",
CellRichText([TextBlock(del_style, new_value.strip())]),
)
Expand All @@ -210,15 +246,19 @@ def write_change(change_type, change_value):
new_value = ""
for new_detection in changelog_detections["new"]:
new_value += f"{new_detection}\n"
write_change(
current_col += write_change(
row,
current_col,
"New Detections",
CellRichText([TextBlock(add_style, new_value.strip())]),
)
if changelog_detections["dropped"]:
new_value = ""
for dropped_detections in changelog_detections["dropped"]:
new_value += f"{dropped_detections}\n"
write_change(
current_col += write_change(
row,
current_col,
"Dropped Detections",
CellRichText([TextBlock(del_style, new_value.strip())]),
)
Expand All @@ -229,28 +269,23 @@ def write_change(change_type, change_value):
if technique["CHANGE_TYPE"] == "additions"
}
for tid, addition in sorted(additions.items()):
mapping_sheet.append(
(
"",
"",
"",
tid,
addition["name"],
"New Technique",
addition["description"],
)
new_row = [""] * total_cols
new_row[attack_id_col] = tid
new_row[attack_name_col] = addition["name"]
new_row[start_col] = "New Technique"
new_row[start_col + 1] = CellRichText(
[TextBlock(add_style, addition["description"])]
)
mapping_sheet.append(new_row)

logger.info("Widening new columns...")
for i in range(0, num_changes):
col_idx = initial_cols + 1 + i * cols_per_change
for col_idx in range(start_col + 1, total_cols + 1, cols_per_change):
mapping_sheet.column_dimensions[get_column_letter(col_idx)].width = 18
for j in range(1, cols_per_change):
mapping_sheet.column_dimensions[get_column_letter(col_idx + j)].width = 100
mapping_sheet.column_dimensions[get_column_letter(col_idx + 1)].width = 100

logger.info("Wrapping text for new columns...")
for row in mapping_sheet.rows:
for col_idx in range(initial_cols + 1, total_cols):
for col_idx in range(start_col, total_cols):
row[col_idx].alignment = Alignment(wrap_text=True)

logger.info(f"Saving file: {final_workbook}")
Expand Down

0 comments on commit dd3ab3a

Please sign in to comment.