From d8c04ce2524e7b80a9a3812796494b43690bb882 Mon Sep 17 00:00:00 2001
From: Fernando Pereira
Date: Thu, 14 Nov 2024 14:44:01 +0000
Subject: [PATCH] CoreNeuron Rebalancer (#206)

## Context

CoreNeuron runs, in particular those using multi-cycle, may end up distributing cells suboptimally. We want to mitigate that by introducing a post-processing step which distributes the CoreNeuron input files evenly across ~~ranks~~ machines, based on their size.

## Scope

Added `rebalance-corenrn-data.py` and `rebalance-stats.py` to `neurodamus/tools`.

CLI:
```
neurodamus/tools $ ./rebalance-corenrn-data.py -h
usage: rebalance-corenrn-data.py [-h] [--ranks-per-machine RANKS_PER_MACHINE]
                                 [--max-entries MAX_ENTRIES]
                                 [--output-file OUTPUT_FILE] [-v] [--histogram]
                                 input_file n_machines

Redistribute CoreNeuron dat files, optimizing for a given number of machines

positional arguments:
  input_file            Path to the CoreNeuron input file, typically files.dat
  n_machines            The number of target machines running the simulation

options:
  -h, --help            show this help message and exit
  --ranks-per-machine RANKS_PER_MACHINE
                        Number of ranks per machine (default: 40)
  --max-entries MAX_ENTRIES
                        Consider only the first N entries of the input file
  --output-file OUTPUT_FILE
                        The rebalanced output file path
  -v, --verbose         Enable verbose output for debugging.
  --histogram           Additionally display the histogram of the machine
                        accumulated sizes
```

## Testing

Using `/gpfs/bbp.cscs.ch/data/scratch/proj134/home/king/BBPP134-917/o_MultiCycle_Support/output/2557873/coreneuron_input/files.dat` for testing.

**Example**
```
leite@bbpv2 ~/dev/neurodamus/neurodamus-py/tools (leite/corenrn-rebalancer %)$ python rebalance-corenrn-data.py /gpfs/bbp.cscs.ch/data/scratch/proj134/home/king/BBPP134-917/o_MultiCycle_Support/output/2557873/coreneuron_input/files.dat 10 --max-entries=1000
INFO :: Reading from input file: /gpfs/bbp.cscs.ch/data/scratch/proj134/home/king/BBPP134-917/o_MultiCycle_Support/output/2557873/coreneuron_input/files.dat
WARNING :: files.dat (line 2): reduced number of entries: 1000
INFO :: Distributing files into 10 buckets...
INFO :: Processing 1000 entries
         0 [  0%]
        20 [  2%]
...
       980 [ 98%]
INFO :: Writing out data from 10 buckets to file: rebalanced-files.dat
INFO :: DONE
```

## Review
* [ ] PR description is complete
* [ ] Coding style (imports, function length, new functions, classes or files) is good
* [ ] Unit/Scientific test added
* [ ] Updated Readme, in-code, developer documentation
---
 tools/rebalance-corenrn-data.py | 202 ++++++++++++++++++++++++++++++++
 tools/rebalance-stats.py        |  83 +++++++++++++
 tools/toolbox.py                |  40 +++++++
 3 files changed, 325 insertions(+)
 create mode 100755 tools/rebalance-corenrn-data.py
 create mode 100755 tools/rebalance-stats.py
 create mode 100644 tools/toolbox.py

diff --git a/tools/rebalance-corenrn-data.py b/tools/rebalance-corenrn-data.py
new file mode 100755
index 00000000..29d11a31
--- /dev/null
+++ b/tools/rebalance-corenrn-data.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python3
+
+"""
+A post-processing script to redistribute CoreNeuron input files
+more evenly across machines, based on their file sizes.
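+
+Example (illustrative invocation):
+    ./rebalance-corenrn-data.py coreneuron_input/files.dat 10 --output-file rebalanced-files.dat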
+
+Blue Brain Project - EPFL, 2024
+
+"""
+
+import argparse
+import heapq
+import itertools
+import logging
+import os
+import sys
+
+from toolbox import get_dat_entry_size as get_entry_size
+from toolbox import show_histogram, with_progress
+
+DEFAULT_OUTPUT_FILE = "rebalanced-files.dat"
+CORENRN_SKIP_MARK = "-1"
+DEFAULT_RANKS_PER_MACHINE = 40
+
+
+def distribute_dat_to_bucket(dat_entry, size, buckets, bucket_sizes):
+    """
+    Distribute a single file into the bucket with the least total size.
+    """
+    # Pop the bucket with the smallest accumulated size
+    smallest_size, smallest_bucket_index = heapq.heappop(bucket_sizes)
+    # Assign the file to this bucket
+    buckets[smallest_bucket_index].append(dat_entry)
+    # Update the bucket size in the heap
+    new_size = smallest_size + size
+    heapq.heappush(bucket_sizes, (new_size, smallest_bucket_index))
+
+
+def redistribute_files_dat(files_dat_file, n_buckets, max_entries=None, show_stats=False):
+    """
+    Read each entry from the dat file and distribute the entries into buckets.
+    """
+    base_dir = os.path.dirname(files_dat_file)
+    metadata = {}
+
+    logging.debug("Reading distribution file: %s", files_dat_file)
+    with open(files_dat_file, "r") as file:
+        # read header
+        metadata["version"] = file.readline().strip()
+        n_entries = int(file.readline())
+
+        metadata["n_files"] = max_entries or n_entries
+
+        # read all dat entries
+        dat_entries = file.readlines()
+
+    if (n_files := int(metadata["n_files"])) < len(dat_entries):
+        logging.warning("files.dat: processing reduced number of entries: %d", n_files)
+        dat_entries = dat_entries[:n_files]
+
+    logging.info("Distributing files into %d buckets...", n_buckets)
+
+    if len(dat_entries) < n_buckets:
+        raise RuntimeError("Too few entries for the selected number of machines. Specify fewer machines")
+
+    # Initialize empty buckets
+    buckets = [[] for _ in range(n_buckets)]
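+
+    # The distribution below is a greedy, LPT-style heuristic: entries are sorted by
+    # size (largest first) and each one goes to the machine with the smallest
+    # accumulated size so far.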
+
+    # Create a heap to keep track of bucket sizes. Each entry is (bucket_size, bucket_index)
+    bucket_heap = [(0, i) for i in range(n_buckets)]
+    heapq.heapify(bucket_heap)  # Turn the list into a heap
+
+    dat_entries = [entry.strip() for entry in dat_entries]
+    entry_sizes = [(entry, get_entry_size(base_dir, entry)) for entry in with_progress(dat_entries)]
+    # Sort entries from largest to smallest so the greedy placement balances better
+    entry_sizes = sorted(entry_sizes, key=lambda e: e[1], reverse=True)
+
+    for dat_entry, size in entry_sizes:
+        try:
+            distribute_dat_to_bucket(dat_entry, size, buckets, bucket_heap)
+        except Exception as e:
+            raise RuntimeError(f"Error processing dat entry {dat_entry}") from e
+
+    if show_stats:
+        logging.info("Top 10 machines by accumulated size")
+        for size, mach_i in heapq.nlargest(10, bucket_heap):
+            print(f"    Machine {mach_i}: {size/(1024*1024):.1f} MiB")
+
+        mach_sizes = [bucket[0] for bucket in bucket_heap]
+        show_histogram(mach_sizes)
+
+    return buckets, metadata
+
+
+def write_dat_file(buckets, infos: dict, ranks_per_machine, output_file=DEFAULT_OUTPUT_FILE):
+    """
+    Write the new files.dat from the per-machine buckets.
+    """
+    DEFAULT_LINE = f"{CORENRN_SKIP_MARK}\n" * ranks_per_machine
+    logging.info("Writing out data from %d buckets to file: %s", len(buckets), output_file)
+
+    # CoreNeuron assigns entries to ranks round-robin, so we have to transpose the entries.
+    # When a machine's sequence is exhausted, pad with "-1" to keep the ranks in sync.
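+    # Worked example (illustrative): with ranks_per_machine=2 and buckets [[a, b, c], [d, e]]
+    # the lines written are a, b, d, e, c, -1, -1, -1. CoreNeuron hands line i to rank
+    # i % n_ranks, so machine 0 (ranks 0-1) loads a, b, c and machine 1 (ranks 2-3) loads d, e.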
", + description="Redistribute CoreNeuron dat files, optimizing for a given number of Machines", + ) + parser.add_argument( + "input_file", + type=str, + help="Path to the CoreNeuron input file, typically files.dat", + ) + parser.add_argument( + "n_machines", type=int, help="The number of target machines running the simulation" + ) + parser.add_argument( + "--ranks-per-machine", + type=int, + default=DEFAULT_RANKS_PER_MACHINE, + help=f"Number of ranks per machine (default: {DEFAULT_RANKS_PER_MACHINE})", + ) + parser.add_argument( + "--max-entries", + type=int, + default=None, + help="Consider only the first N entries of the input file", + ) + parser.add_argument( + "--output-file", + default=DEFAULT_OUTPUT_FILE, + help="The rebalanced output file path", + ) + # Optional argument for verbose output + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable verbose output for debugging.", + ) + parser.add_argument( + "--histogram", + action="store_true", + help="Additionally display the histogram of the machine accumulated sizes", + ) + + args = parser.parse_args() + + logging_level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig(level=logging_level, format="%(levelname)s :: %(message)s") + + if args.histogram: + try: + import numpy as _ # noqa + except ImportError: + logging.error("Numpy is required to compute histograms") + return 1 + + if not os.path.isfile(args.input_file): + logging.error("Input file could not be found!") + return 1 + else: + logging.info(f"Reading from input file: {args.input_file}") + + # Do the redistribution + buckets, infos = redistribute_files_dat( + args.input_file, args.n_machines, args.max_entries, args.histogram + ) + + # Create a new files.dat according to the new buckets + write_dat_file(buckets, infos, args.ranks_per_machine, args.output_file) + + logging.info("DONE") + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/rebalance-stats.py b/tools/rebalance-stats.py new file mode 100755 index 00000000..cdb987d5 --- /dev/null +++ b/tools/rebalance-stats.py @@ -0,0 +1,83 @@ +#!/bin/env python3 + +""" +A script to inspect and show statistics of the load imposed by a given +CORENEURON input dir when loaded Round-Robin in a cluster. 
+
+Blue Brain Project - EPFL, 2024
+
+"""
+
+import argparse
+import os
+from toolbox import get_dat_entry_size, show_histogram, with_progress
+
+CORENRN_SKIP_MARK = "-1"
+DEFAULT_RANKS_PER_MACHINE = 40
+
+
+def files_dat_load_ranks(input_file, n_machines, ranks_per_machine, base_dir):
+    """From a files.dat, compute the total amount of data to be loaded by each rank"""
+    print(f"Reading from input file: {input_file}")
+    base_dir = base_dir or os.path.dirname(input_file)
+    n_ranks = n_machines * ranks_per_machine
+    ranks_size = [0.0] * n_ranks
+
+    with open(input_file, "r") as file:
+        next(file)  # header: version
+        next(file)  # header: n_values
+        for i, line in enumerate(with_progress(file.readlines())):
+            if line.startswith(CORENRN_SKIP_MARK):
+                continue
+            size = get_dat_entry_size(base_dir, line.strip())
+            ranks_size[i % n_ranks] += size
+
+    return ranks_size
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Show per-rank and per-machine load statistics of a CoreNeuron files.dat"
+    )
+    parser.add_argument(
+        "input_file",
+        type=str,
+        help="Path to the CoreNeuron input file, typically 'files.dat'",
+    )
+    parser.add_argument(
+        "n_machines", type=int, help="Number of target machines running the simulation"
+    )
+    parser.add_argument(
+        "--ranks-per-machine",
+        type=int,
+        default=DEFAULT_RANKS_PER_MACHINE,
+        help=f"Number of ranks per machine (default: {DEFAULT_RANKS_PER_MACHINE})",
+    )
+    parser.add_argument(
+        "--dat-path",
+        type=str,
+        default=None,
+        required=False,
+        help="The base directory of the dat files (defaults to the input file's directory)",
+    )
+    args = parser.parse_args()
+
+    ranks_size = files_dat_load_ranks(
+        args.input_file, args.n_machines, args.ranks_per_machine, args.dat_path
+    )
+
+    # Collect per-machine stats
+    machine_size = [0.0] * args.n_machines
+    for rank_i, size in enumerate(ranks_size):
+        machine_size[rank_i // args.ranks_per_machine] += size
+
+    print("Machine histogram")
+    show_histogram(machine_size)
+
+    print("Rank histogram")
+    show_histogram(ranks_size)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/toolbox.py b/tools/toolbox.py
new file mode 100644
index 00000000..71e7f8db
--- /dev/null
+++ b/tools/toolbox.py
@@ -0,0 +1,40 @@
+# Blue Brain Project - EPFL, 2024
+"""A library of functions shared across tools."""
+
+import math
+import os
+import sys
+
+PROGRESS_STEPS = 50
+DEFAULT_HISTOGRAM_NBINS = 40
+
+
+def show_histogram(buckets, n_bins=DEFAULT_HISTOGRAM_NBINS):
+    """A simple histogram CLI visualizer"""
+    import numpy  # optional dependency, only needed for histograms
+    MiB = float(1024 * 1024)
+    freq, bins = numpy.histogram(buckets, bins=n_bins)
+    bin_start = bins[0]
+    for count, bin_end in zip(freq, bins[1:]):
+        if count:
+            print(f"  [{bin_start/MiB:5.0f} - {bin_end/MiB:5.0f}]: {count:d}")
+        bin_start = bin_end
+
+
+def get_dat_entry_size(base_dir, dat_entry):
+    """Obtain the file size of a dat entry"""
+    dat_file = f"{dat_entry}_2.dat"
+    file_path = os.path.join(base_dir, dat_file)
+    return os.path.getsize(file_path)
+
+
+def with_progress(elements):
+    """A quick and easy generator for displaying progress while iterating"""
+    total_elems = len(elements)
+    report_every = max(1, math.ceil(total_elems / PROGRESS_STEPS))
+    print(f"Processing {total_elems} entries")
+    for i, elem in enumerate(elements):
+        if i % report_every == 0:
+            print(f"{i:10} [{i*100/total_elems:3.0f}%]", file=sys.stderr)
+        yield elem
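
As context for the patch above, here is a minimal, self-contained sketch of the balancing strategy that `distribute_dat_to_bucket` implements (largest entry first, always into the currently least-loaded machine). This is not part of the patch, and the entry names and sizes below are made up for illustration:

```python
import heapq


def balance(entry_sizes, n_machines):
    """Greedy LPT-style placement: largest entries first, each into the lightest machine."""
    buckets = [[] for _ in range(n_machines)]
    heap = [(0, i) for i in range(n_machines)]  # (accumulated_size, machine_index)
    heapq.heapify(heap)
    for name, size in sorted(entry_sizes, key=lambda e: e[1], reverse=True):
        total, machine = heapq.heappop(heap)  # currently least-loaded machine
        buckets[machine].append(name)
        heapq.heappush(heap, (total + size, machine))
    return buckets


# Made-up entry sizes (in MiB) just to show the balancing effect
entries = [("1001_2.dat", 120), ("1002_2.dat", 80), ("1003_2.dat", 60),
           ("1004_2.dat", 50), ("1005_2.dat", 40), ("1006_2.dat", 30)]
print(balance(entries, 2))
# -> [['1001_2.dat', '1004_2.dat', '1006_2.dat'], ['1002_2.dat', '1003_2.dat', '1005_2.dat']]
# i.e. 200 MiB vs 180 MiB, instead of 220 vs 160 with a plain round-robin split
```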