diff --git a/make_synthetic_test_dataset.py b/make_synthetic_test_dataset.py
index 9b43f36..fb4c272 100644
--- a/make_synthetic_test_dataset.py
+++ b/make_synthetic_test_dataset.py
@@ -2,6 +2,7 @@
 import os
 from pathlib import Path
 from contextlib import redirect_stdout
+import hashlib

 import numpy as np

@@ -62,7 +63,11 @@ def generate_instances_from_mist_models(instance_count: int, label: str, verbose
     # pylint: disable=too-many-locals, too-many-statements, invalid-name
     generated_counter = 0
     usable_counter = 0
-    set_id = label.replace("trainset", "")
+    set_id = ''.join(filter(str.isdigit, label))
+
+    # Don't use the built-in hash() function; it's not consistent across processes!!!
+    seed = int.from_bytes(hashlib.shake_128(label.encode("utf8")).digest(8))
+    rng = np.random.default_rng(seed)

     mission = Mission.get_instance("TESS")

@@ -74,10 +79,10 @@ def generate_instances_from_mist_models(instance_count: int, label: str, verbose
     while usable_counter < instance_count:
         while True: # imitate "loop and a half" / do ... until logic
             # Get a list of initial masses at a random metallicity & age to choose our stars from
-            feh = np.random.choice(feh_values)
+            feh = rng.choice(feh_values)
             ages = _mist_isochones.list_ages(feh, min_phase, max_phase)
             while True:
-                age = np.random.choice(ages) * u.dex(u.yr)
+                age = rng.choice(ages) * u.dex(u.yr)
                 init_masses = _mist_isochones.list_initial_masses(feh, age.value,
                                                                   min_phase, max_phase,
                                                                   min_mass_value, max_mass_value)
@@ -89,11 +94,11 @@ def generate_instances_from_mist_models(instance_count: int, label: str, verbose
                 probs = np.power(init_masses, -2.35) # Salpeter IMF
                 probs *= np.tanh(0.31 * init_masses + .18) # Wells & Prsa Primary multiplicity frac
                 probs = np.divide(probs, np.sum(probs)) # Scaled to get a pmf() == 1
-                init_MA = np.random.choice(init_masses, p=probs) * u.solMass
+                init_MA = rng.choice(init_masses, p=probs) * u.solMass

                 init_MB_mask = (init_masses >= min_mass_value) & (init_masses < init_MA.value)
                 if any(init_MB_mask):
-                    init_MB = np.random.choice(init_masses[init_MB_mask]) * u.solMass
+                    init_MB = rng.choice(init_masses[init_MB_mask]) * u.solMass
                 else:
                     init_MB = init_MA

@@ -115,12 +120,12 @@ def generate_instances_from_mist_models(instance_count: int, label: str, verbose
                 # . 3(RA+RB) / 2(1-e) (Wells & Prsa) (assuming e==0 for now)
                 # . max(5*RA, 5*RB) (based on JKTEBOP recommendation for rA <= 0.2, rB <= 0.2)
                 a_min = max(3/2*(RA+RB), 5*RA, 5*RB)
-                per_min = orbital.orbital_period(MA, MB, a_min)
+                per_min = orbital.orbital_period(MA, MB, a_min).to(u.d).value

                 # We generate period, inc, and omega (argument of periastron) from uniform distributions
-                per = np.random.uniform(low=per_min.to(u.d).value, high=25) * u.d
-                inc = np.random.uniform(low=50., high=90.00001) * u.deg
-                omega = np.random.uniform(low=0., high=360.) * u.deg
+                per = rng.uniform(low=per_min, high=max(per_min, 25)) * u.d
+                inc = rng.uniform(low=50., high=90.00001) * u.deg
+                omega = rng.uniform(low=0., high=360.) * u.deg
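+                # Flooring high at per_min keeps the period draw valid even when the minimum
+                # separation implies a period longer than the usual 25 d upper limit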

                 # Eccentricity from uniform distribution, subject to a maximum value which depends on
                 # orbital period/seperation (again, based on Wells & Prsa; Moe & Di Stefano)
@@ -129,11 +134,11 @@ def generate_instances_from_mist_models(instance_count: int, label: str, verbose
                     ecc = 0
                 else:
                     e_max = max(min(1-(per.value/2)**(-2/3), 1-(1.5*(RA+RB)/a).value), 0)
-                    ecc = np.random.uniform(low=0, high=e_max)
+                    ecc = rng.uniform(low=0, high=e_max)

                 # We're once more predicting L3 as JKTEBOP is being updated to support
                 # negative L3 input values (so it's now fully trainable)
-                L3 = np.random.normal(0., 0.1)
+                L3 = rng.normal(0., 0.1)
                 L3 = 0 # continue to override this as L3 doesn't train well

                 # Now we can calculate other params which we need to decide whether to use this
@@ -165,7 +170,7 @@ def generate_instances_from_mist_models(instance_count: int, label: str, verbose
                 # Now we have to decide an appropriate Gaussian noise SNR to apply.
                 # Randomly choose an apparent mag in the TESS photometric range then derive
                 # the SNR (based on a linear regression fit of Álvarez et al. (2024) Table 2).
-                apparent_mag = np.random.uniform(6, 18)
+                apparent_mag = rng.uniform(6, 18)
                 snr = np.add(np.multiply(apparent_mag, -2.32), 59.4)

                 yield {
@@ -225,35 +230,28 @@ def generate_instances_from_mist_models(instance_count: int, label: str, verbose
 # which generates random plausible dEB systems based on MIST stellar models.
 # ------------------------------------------------------------------------------
 if __name__ == "__main__":
-    with redirect_stdout(Tee(open(dataset_dir / "trainset.log",
-                              "w",
-                              encoding="utf8"))):
-        datasets.generate_dataset_csvs(instance_count=DATASET_SIZE,
-                                       file_count=10,
-                                       output_dir=dataset_dir,
-                                       generator_func=generate_instances_from_mist_models,
-                                       file_pattern="trainset{0:03d}.csv",
-                                       verbose=True,
-                                       simulate=False)
-
-    plots.plot_trainset_histograms(dataset_dir, dataset_dir / "synth-histogram-full.png", cols=4)
-    plots.plot_trainset_histograms(dataset_dir, dataset_dir / "synth-histogram-main.eps", cols=2,
-                                   params=["rA_plus_rB", "k", "J", "inc", "ecosw", "esinw"])
-
-    with redirect_stdout(Tee(open(dataset_dir/"dataset.log",
-                              "a" if RESUME else "w",
-                              encoding="utf8"))):
-        datasets.make_dataset_files(trainset_files=sorted(dataset_dir.glob("trainset*.csv")),
-                                    output_dir=dataset_dir,
-                                    valid_ratio=0.,
-                                    test_ratio=1.,
-                                    resume=RESUME,
-                                    max_workers=5,
-                                    verbose=True,
-                                    simulate=False)
-
-    # Simple diagnostic plot of the mags feature of randomly sampled instances.
-    dataset_files = sorted(dataset_dir.glob("**/*.tfrecord"))
-    ids, _, _, _ = deb_example.read_dataset(dataset_files)
-    fig = plots.plot_dataset_instance_mags_features(dataset_files, np.random.choice(ids, 30))
-    fig.savefig(dataset_dir / "sample.pdf")
+
+    with redirect_stdout(Tee(open(dataset_dir/"dataset.log", "w", encoding="utf8"))):
+        datasets.make_dataset(instance_count=DATASET_SIZE,
+                              file_count=10,
+                              output_dir=dataset_dir,
+                              generator_func=generate_instances_from_mist_models,
+                              file_prefix="trainset",
+                              valid_ratio=0.,
+                              test_ratio=1.,
+                              max_workers=5,
+                              save_param_csvs=True,
+                              verbose=True,
+                              simulate=False)
+
+    # TODO: Update plot_trainset_histograms so that we can change name of the csv/dataset files
+    # Histograms are generated from the CSV files (as they cover params not in the dataset)
+    plots.plot_trainset_histograms(dataset_dir, dataset_dir/"synth-histogram-full.png", cols=4)
+    plots.plot_trainset_histograms(dataset_dir, dataset_dir/"synth-histogram-main.eps", cols=2,
+                                   params=["rA_plus_rB", "k", "J", "inc", "ecosw", "esinw"])
+
+    # Simple diagnostic plot of the mags feature of a small sample of the instances.
+    dataset_files = sorted(dataset_dir.glob("**/*.tfrecord"))
+    ids, _, _, _ = deb_example.read_dataset(dataset_files)
+    fig = plots.plot_dataset_instance_mags_features(dataset_files, ids[:30])
+    fig.savefig(dataset_dir / "sample.pdf")
diff --git a/make_training_dataset.py b/make_training_dataset.py
index 0caa6c1..2895eac 100644
--- a/make_training_dataset.py
+++ b/make_training_dataset.py
@@ -2,6 +2,7 @@
 import os
 from pathlib import Path
 from contextlib import redirect_stdout
+import hashlib

 import numpy as np

@@ -34,7 +35,6 @@
 # - it's a convenient break in the process
 DATASET_SIZE = 250000
-RESUME = False

 dataset_dir = Path(f"./datasets/formal-training-dataset-{DATASET_SIZE // 1000}k/")
 dataset_dir.mkdir(parents=True, exist_ok=True)

@@ -51,19 +51,23 @@ def generate_instances_from_distributions(instance_count: int, label: str, verbo
     # pylint: disable=too-many-locals, invalid-name
     generated_counter = 0
     usable_counter = 0
-    set_id = label.replace("trainset", "")
+    set_id = ''.join(filter(str.isdigit, label))
+
+    # Don't use the built-in hash() function; it's not consistent across processes!!!
+    seed = int.from_bytes(hashlib.shake_128(label.encode("utf8")).digest(8))
+    rng = np.random.default_rng(seed)

     while usable_counter < instance_count:
         while True: # imitate "loop and a half" / "repeat ... until" logic
             # These are the "label" params for which we have defined distributions
-            rA_plus_rB = np.random.uniform(low=0.001, high=0.45001)
-            k = np.random.normal(loc=0.8, scale=0.4)
-            inc = np.random.uniform(low=50., high=90.00001) * u.deg
-            J = np.random.normal(loc=0.8, scale=0.4)
+            rA_plus_rB = rng.uniform(low=0.001, high=0.45001)
+            k = rng.normal(loc=0.8, scale=0.4)
+            inc = rng.uniform(low=50., high=90.00001) * u.deg
+            J = rng.normal(loc=0.8, scale=0.4)

             # We need a version of JKTEBOP which supports negative L3 input values
             # (not so for version 43) in order to train a model to predict L3.
-            L3 = np.random.normal(0., 0.1)
+            L3 = rng.normal(0., 0.1)
             L3 = 0 # continue to override until revised JKTEBOP released

             # The qphot mass ratio value (MB/MA) affects the lightcurve via the ellipsoidal effect
@@ -71,12 +75,12 @@ def generate_instances_from_distributions(instance_count: int, label: str, verbo
             # a value from other params. We're using the k-q relations of Demircan & Kahraman (1991)
             # Both <1.66 M_sun (k=q^0.935), both >1.66 M_sun (k=q^0.542), MB-low/MA-high (k=q^0.724)
             # and approx' single rule is k = q^0.715 which we use here (tests find this works best).
-            qphot = np.random.normal(loc=k**1.4, scale=0.3) if k > 0 else 0
+            qphot = rng.normal(loc=k**1.4, scale=0.3) if k > 0 else 0

             # We generate ecc and omega (argument of periastron) from appropriate distributions.
             # They're not used directly as labels, but they make up ecosw and esinw which are.
-            ecc = np.abs(np.random.normal(loc=0.0, scale=0.2))
-            omega = np.random.uniform(low=0., high=360.) * u.deg
+            ecc = np.abs(rng.normal(loc=0.0, scale=0.2))
+            omega = rng.uniform(low=0., high=360.) * u.deg

             # Now we can calculate the derived values, sufficient to check we've a usable system
             inc_rad = inc.to(u.rad).value
@@ -132,29 +136,20 @@ def generate_instances_from_distributions(instance_count: int, label: str, verbo
 # samples parameter distributions over JKTEBOP's usable range.
 # ------------------------------------------------------------------------------
 if __name__ == "__main__":
-    with redirect_stdout(Tee(open(dataset_dir / "trainset.log",
-                              "w",
-                              encoding="utf8"))):
-        datasets.generate_dataset_csvs(instance_count=DATASET_SIZE,
-                                       file_count=DATASET_SIZE // 10000,
-                                       output_dir=dataset_dir,
-                                       generator_func=generate_instances_from_distributions,
-                                       file_pattern="trainset{0:03d}.csv",
-                                       verbose=True,
-                                       simulate=False)
-
-    plots.plot_trainset_histograms(dataset_dir, dataset_dir / "train-histogram-full.png", cols=3)
-    plots.plot_trainset_histograms(dataset_dir, dataset_dir / "train-histogram-main.eps", cols=2,
-                                   params=["rA_plus_rB", "k", "J", "inc", "ecosw", "esinw"])
-
-    with redirect_stdout(Tee(open(dataset_dir / "dataset.log",
-                              "a" if RESUME else "w",
-                              encoding="utf8"))):
-        datasets.make_dataset_files(trainset_files=sorted(dataset_dir.glob("trainset*.csv")),
-                                    output_dir=dataset_dir,
-                                    valid_ratio=0.2,
-                                    test_ratio=0,
-                                    resume=RESUME,
-                                    max_workers=5,
-                                    verbose=True,
-                                    simulate=False)
+
+    with redirect_stdout(Tee(open(dataset_dir/"dataset.log", "w", encoding="utf8"))):
+        datasets.make_dataset(instance_count=DATASET_SIZE,
+                              file_count=DATASET_SIZE // 10000,
+                              output_dir=dataset_dir,
+                              generator_func=generate_instances_from_distributions,
+                              file_prefix="trainset",
+                              valid_ratio=0.2,
+                              test_ratio=0,
+                              max_workers=5,
+                              save_param_csvs=True,
+                              verbose=True,
+                              simulate=False)
+
+    plots.plot_trainset_histograms(dataset_dir, dataset_dir/"train-histogram-full.png", cols=3)
+    plots.plot_trainset_histograms(dataset_dir, dataset_dir/"train-histogram-main.eps", cols=2,
+                                   params=["rA_plus_rB", "k", "J", "inc", "ecosw", "esinw"])
diff --git a/traininglib/datasets.py b/traininglib/datasets.py
index 029e349..03ff40a 100644
--- a/traininglib/datasets.py
+++ b/traininglib/datasets.py
@@ -4,14 +4,14 @@
 # Use ucase variable names where they match the equivalent symbol and pylint can't find units alias
 # pylint: disable=invalid-name, no-member, too-many-arguments, too-many-locals

-from typing import Callable, Generator, Iterator
+from typing import Callable, Generator
 from pathlib import Path
 from timeit import default_timer
 from datetime import timedelta
-import random
 import traceback
 from multiprocessing import Pool
 import csv
+import hashlib

 import numpy as np
 from scipy.interpolate import interp1d
@@ -53,272 +53,214 @@
     "bP": (100, r"$b_{prim}$")
 }

-def generate_dataset_csvs(instance_count: int,
-                          file_count: int,
-                          output_dir: Path,
-                          generator_func: Callable[[int, str, bool], Generator[dict[str, any], any, any]],
-                          seed: float=42,
-                          file_pattern: str="dataset{0:03d}.csv",
-                          verbose: bool=False,
-                          simulate: bool=False) -> None:
-    """
-    Writes trainset csv file(s) with instances created by the chosen generator_func.
+def make_dataset(instance_count: int,
+                 file_count: int,
+                 output_dir: Path,
+                 generator_func: Callable[[int, str, bool], Generator[dict[str, any], any, any]],
+                 valid_ratio: float=0.,
+                 test_ratio: float=0,
+                 file_prefix: str="dataset",
+                 max_workers: int=1,
+                 save_param_csvs: bool=True,
+                 verbose: bool=False,
+                 simulate: bool=False) -> None:
+    """
+    This is a convenience wrapper for make_dataset_file() which handles parallel running by
+    splitting a request into multiple files and calling make_dataset_file() for each within a pool.
+
     :instance_count: the number of training instances to create
     :file_count: the number of files to spread them over
     :output_dir: the directory to write the files to
     :generator_func: the function to call to generate the required number of systems which must have
     arguments (instance_count: int, file_stem: str, verbose: bool) and return a Generator[dict[str]]
-    :seed: random seed to ensure repeatability
-    :file_pattern: naming pattern for the csv files. Must have a single integer format placeholder.
+    :valid_ratio: proportion of rows to be written to the validation files
+    :test_ratio: proportion of rows to be written to the testing files
+    :file_prefix: naming prefix for each of the dataset files
+    :max_workers: maximum number of files to process concurrently
+    :save_param_csvs: whether to save csv files with full params in addition to the dataset files.
     :verbose: whether to print verbose progress/diagnostic messages
     :simulate: whether to simulate the process, skipping only file/directory actions
     """
-    if not simulate:
-        output_dir.mkdir(parents=True, exist_ok=True)
+    if valid_ratio + test_ratio > 1:
+        raise ValueError("valid_ratio + test_ratio > 1")
+
+    train_ratio = 1 - valid_ratio - test_ratio

     if verbose:
         print(f"""
-Generate & write dEB system parameters CSVs for required number of instances
---------------------------------------------------------=-------------------
+Generate & write dEB system dataset for required number of instances
+--------------------------------------------------------------------
 The number of instances to generate: {instance_count:,} across {file_count} file(s)
+The dataset files to be written within: {output_dir}
 The instance generator function is: {generator_func.__name__}
-The parameter CSVs will be written to: {output_dir}
-The parameter CSVs will be named: {file_pattern}
-The random seed to use for selections: {seed}\n""")
+Training : Validation : Test ratio is: {train_ratio:.2f} : {valid_ratio:.2f} : {test_ratio:.2f}
+The maximum concurrent workers: {max_workers}
+File names will be prefixed with: {file_prefix}\n""")
         if simulate:
             print("Simulate requested so no files will be written.\n")
+        elif save_param_csvs:
+            print("Parameter CSV files will be saved to accompany dataset files.\n")

     if verbose:
         start_time = default_timer()

-    # This is reasonably quick so there's no need to use a process pool here.
-    # Also, a pool may mess with the repeatability of the psuedo random behaviour.
-    np.random.seed(seed)
-    for ix, file_inst_count in enumerate(_calculate_file_splits(instance_count, file_count), 0):
-        out_file = output_dir / file_pattern.format(ix)
-        psets = [*generator_func(file_inst_count, out_file.stem, verbose)]
-        if verbose:
-            print(f"{'Simulated s' if simulate else 'S'}aving {len(psets)} inst(s) to {out_file}")
-        if not simulate:
-            write_param_sets_to_csv(out_file, psets)
-
-    if verbose:
-        print(f"\nFinished. The time taken was {timedelta(0, round(default_timer()-start_time))}.")
-
-
-def make_dataset_files(trainset_files: Iterator[Path],
-                       output_dir: Path,
-                       valid_ratio: float=0.,
-                       test_ratio: float=0,
-                       interp_kind: str="cubic",
-                       resume: bool=False,
-                       max_workers: int=1,
-                       seed: float=42,
-                       verbose: bool=True,
-                       simulate: bool=True):
-    """
-    Will make the an equivalent set of TensorFlow dataset (tfrecord) files from
-    the input trainset csv files. Up to three dataset files, one each for
-    training, validation and testing, will be created for each input file in
-    equivalently named subdirectories of the output_dir argument. The files'
-    contents are selected randomly from the input in the proportions indicated
-    by the valid_ratio and test_ratio arguments. Each output row will include
-    a freshly generated phase folded and model lightcurve & additional features,
-    and the training labels taken from the input.
-
-    :trainset_files: the input trainset csv files
-    :output_dir: the parent directory for the dataset files, if None the same as the input
-    :valid_ratio: proportion of rows to be written to the validation files
-    :test_ratio: proportion of rows to be written to the testing files
-    :interp_kind: the kind of interpolation used to reduce the the lightcurve models
-    :resume: whether we are to attempt to resume from a previous "make"
-    :max_workers: maximum number of files to process concurrently
-    :seed: the seed ensures random selection of subsets are repeatable
-    :verbose: whether to print verbose progress/diagnostic messages
-    :simulate: whether to simulate the process, skipping only file/directory actions
-    """
-    start_time = default_timer()
-    trainset_files = sorted(trainset_files)
-    file_count = len(trainset_files)
-    train_ratio = 1 - valid_ratio - test_ratio
-    if verbose:
-        print(f"""
-Build training datasets from testset csvs.
-------------------------------------------
-The number of input trainset files is: {file_count}
-Output dataset directory: {output_dir}
-Resume previous job is set to: {'on' if resume else 'off'}
-Training : Validation : Test ratio is: {train_ratio:.2f} : {valid_ratio:.2f} : {test_ratio:.2f}
-The model interpolation kind is: {interp_kind}
-The maximum concurrent workers: {max_workers}
-The random seed to use for selections: {seed}\n""")
-        if simulate:
-            print("Simulate requested so no files will be written.\n")
+    if not simulate:
+        output_dir.mkdir(parents=True, exist_ok=True)

     # args for each make_dataset_file call as required by process_pool starmap
+    file_inst_counts = list(_calculate_file_splits(instance_count, file_count))
     iter_params = (
-        (f, output_dir, valid_ratio, test_ratio, interp_kind, resume, seed, verbose, simulate)
-        for f in trainset_files
+        # pylint: disable=line-too-long
+        (inst_count, file_ix, output_dir, generator_func, valid_ratio, test_ratio, file_prefix, save_param_csvs, verbose, simulate)
+        for (file_ix, inst_count) in enumerate(file_inst_counts)
     )

     max_workers = min(file_count, max_workers or 1)
     if max_workers <= 1:
-        # We could use a pool of 1, but keep execution on the interactive proc
+        # We could use a pool of 1, but it's useful to keep execution on the interactive proc
         for params in iter_params:
             make_dataset_file(*params)
     else:
         with Pool(max_workers) as pool:
             pool.starmap(make_dataset_file, iter_params)

-    print(f"\nFinished making the dataset for {file_count} trainset file(s) to {output_dir}")
-    print(f"The time taken was {timedelta(0, round(default_timer()-start_time))}.")
+    if verbose:
+        print(f"\nFinished making a dataset of {file_count} file(s) within {output_dir}")
+        print(f"The time taken was {timedelta(0, round(default_timer()-start_time))}.\n")
+
+
+def make_dataset_file(inst_count: int,
+                      file_ix: int,
+                      output_dir: Path,
+                      generator_func: Callable[[int, str, bool], Generator[dict[str, any], any, any]],
+                      valid_ratio: float=0.,
+                      test_ratio: float=0,
+                      file_prefix: str="dataset",
+                      save_param_csvs: bool=True,
+                      verbose: bool=False,
+                      simulate: bool=False):
+    """
+    Will make a TensorFlow dataset (tfrecord) file and, optionally, an accompanying parameter csv
+    file. The dataset file is actually split into up to three subfiles, one each in training,
+    validation and testing subdirectories, with the ratios of instances in each dictated by the
+    valid_ratio and test_ratio arguments (with train_ratio implied).
+
+    TODO: Optionally, handles switching the LC and params if it's found that the eclipse at phase
+    zero is not the deeper (deferred to make_dataset_file2).

-def make_dataset_file(trainset_file: Path,
-                      output_dir: Path=None,
-                      valid_ratio: float=0.,
-                      test_ratio: float=0.,
-                      interp_kind: str="cubic",
-                      resume: bool=False,
-                      seed: float=42,
-                      verbose: bool=True,
-                      simulate: bool=False) -> None:
-    """
-    Will make the an equivalent set of TensorFlow dataset (tfrecord) files from
-    the input trainset csv file. Up to three output files, one each for
-    training, validation and testing, will be created in equivalently named
-    subdirectories of the output_dir argument. The contents of the three files
-    will be selected randomly from the input in the proportions indicated
-    by the valid_ratio and test_ratio arguments. Each output row will include
-    a freshly generated phase folded and model lightcurve & additional features,
-    and the training labels taken from the input.
-
-    :trainset_file: the input trainset csv file
-    :output_dir: the parent directory for the dataset files, if None the same as the input
-    :valid_ratio: proportion of rows to be written to the validation file
-    :test_ratio: proportion of rows to be written to the testing file
-    :interp_kind: the kind of interpolation used to sample the the lightcurve model
-    :resume: whether we are to attempt to resume from a previous "make"
-    :seed: the seed ensures random selection of subsets are repeatable
+    Adds Gaussian noise to the LC/mags feature if a "snr" param is specified.
+
+    :inst_count: the number of training instances to create
+    :file_ix: the index number of this file. Is appended to file_prefix to make the file stem.
+    :output_dir: the directory to write the files to
+    :generator_func: the function to call to generate the required number of systems which must have
+    arguments (instance_count: int, file_stem: str, verbose: bool) and return a Generator[dict[str]]
+    :valid_ratio: proportion of rows to be written to the validation files
+    :test_ratio: proportion of rows to be written to the testing files
+    :file_prefix: naming prefix for each of the dataset files
+    :save_param_csvs: whether to save csv files with full params in addition to the dataset files.
     :verbose: whether to print verbose progress/diagnostic messages
     :simulate: whether to simulate the process, skipping only file/directory actions
     """
-    # pylint: disable=too-many-statements, too-many-branches
-    label = trainset_file.stem
-    output_dir = trainset_file.parent if output_dir is None else output_dir
-    this_seed = f"{trainset_file.name}/{seed}"
-    noise_generator = np.random.default_rng(abs(hash(this_seed)))
-
-    # Work out the subsets & files now, before generating, to see if we can skip on resume
-    subsets = ["training", "validation", "testing"]
-    subset_values = [(1 - valid_ratio - test_ratio), valid_ratio, test_ratio]
-    subset_files = [output_dir / subset / f"{trainset_file.stem}.tfrecord" for subset in subsets]
-    if resume and all(subset_files[i].is_file() == (subset_values[i] > 0) for i in range(3)):
-        if verbose:
-            print(f"{label}: Resume is on and all expected output files exist. Skipping.")
-        return
-
-    # Current approach builds the full set of outputs in memory, shuffles an index before selecting
-    # contiguous index subset blocks & saving the indexed data rows to files. A more efficient algo
-    # would be to create & shuffle the index and open the subset output files in advance, then write
-    # the rows as we go. However, for that to work we need to know the total row count in advance.
-    rows = []
-    for counter, params in enumerate(read_param_sets_from_csv(trainset_file), 1):
-        params.setdefault("gravA", 0.)
-        params.setdefault("gravB", 0.)
-
-        # Large negative values force JKTEBOP task2 to calculate the reflection coefficients
-        params.setdefault("reflA", -100)
-        params.setdefault("reflB", -100)
-
-        try:
-            # model_data's shape is (2, rows) with phase in [0, :] and mags in [1, :]
-            model_data = jktebop.generate_model_light_curve("trainset_", **params)
-            if np.isnan(np.min(model_data["delta_mag"])):
-                # Checking for a Heisenbug where a model is somehow assigned NaN for at least 1 mag
-                # value, subsequently causing training to fail. Adding mitigation/error reporting
-                # seems to stop it happening despite the same source params, args and seed being
-                # used. I'll leave this in place to report if it ever happens again. I think the
-                # issue was caused by passing "bad" params to JKTEBOP which is why it's not repeated
-                print(f"{label}[{params['id']}]: Replacing NaN/Inf in processed LC.")
-                np.nan_to_num(x=model_data["delta_mag"], copy=False)
-
-            # Optionally, add Gaussian flux noise based on the instance's SNR
-            snr = params.get("snr", None)
-            if snr:
-                # We apply the noise to fluxes, so revert delta mags to normalized flux
-                # and base the noise sigma on the instance's SNR and mean flux
-                fluxes = np.power(10, np.divide(model_data["delta_mag"], -2.5))
-                noise_sigma = np.divide(np.mean(fluxes), np.power(10, np.divide(snr, 10)))
-                if noise_sigma:
-                    noise = noise_generator.normal(0., scale=noise_sigma, size=len(fluxes))
-                    model_data["delta_mag"] = np.multiply(-2.5, np.log10(fluxes + noise))
-
-            # We store mags_features for various supported bins values
-            interpolator = interp1d(model_data["phase"], model_data["delta_mag"], kind=interp_kind)
-            mags_features = {}
-            for mag_name, mags_bins in deb_example.stored_mags_features.items():
-                # Ensure we don't waste a bin repeating the value for phase 0.0 & 1.0
-                new_phases = np.linspace(0., 1., mags_bins + 1)[:-1]
-                bin_model_data = np.array([new_phases, interpolator(new_phases)], dtype=np.double)
-                mags_features[mag_name] = bin_model_data[1]
-
-            # These are the extra features which may be used for predictions alongside the LC.
-            extra_features = {
-                "phiS": orbital.secondary_eclipse_phase(params["ecosw"], params["ecc"]),
-                "dS_over_dP": orbital.ratio_of_eclipse_duration(params["esinw"]),
-            }
-
-            rows.append(deb_example.serialize(params["id"], params, mags_features, extra_features))
-
-            if verbose and counter % 100 == 0:
-                print(f"{label}: Processed {counter} instances.")
-
-        except Exception as exc: # pylint: disable=broad-exception-caught
-            traceback.print_exc(exc)
-            print(f"{label}: Skipping instance {counter} which caused exc: {params}")
-
-    rows_total = len(rows)
+    # pylint: disable=too-many-branches, too-many-statements
+    interp_kind = "cubic"
+    file_stem = f"{file_prefix}{file_ix:03d}"
+
+    # Don't use the built-in hash() function; it's not consistent across processes!!!
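+    # hashlib digests are deterministic for a given input, so every run/process derives the same
+    # seed from this file's file_stem while different files still get different seeds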
+    seed = int.from_bytes(hashlib.shake_128(file_stem.encode("utf8")).digest(8))
+    rng = np.random.default_rng(seed)
+
+    ds_filename = f"{file_stem}.tfrecord"
+    if save_param_csvs and not simulate:
+        csv_file = output_dir / f"{file_stem}.csv"
+        csv_file.unlink(missing_ok=True)
+    else:
+        csv_file = None
+
+    ds_writers = [None] * 3
+    ds_subsets = ["training", "validation", "testing"]
+    try:
+        # Set up the dataset files (deleting existing) and decide the distribution of instances
+        inst_file_ixs = [1] * round(valid_ratio*inst_count) + [2] * round(test_ratio*inst_count)
+        inst_file_ixs = [0] * (inst_count-len(inst_file_ixs)) + inst_file_ixs
+        for subset_ix, ds_subset in enumerate(ds_subsets):
+            if not simulate:
+                writer = output_dir / ds_subset / ds_filename
+                if subset_ix in inst_file_ixs:
+                    writer.parent.mkdir(exist_ok=True)
+                    ds_writers[subset_ix] = tf.io.TFRecordWriter(f"{writer}",
+                                                                 deb_example.ds_options)
+                else:
+                    writer.unlink(missing_ok=True)
+        rng.shuffle(inst_file_ixs)
+
+        for inst_ix, params in enumerate(generator_func(inst_count, file_stem, verbose)):
+
+            try:
+                # model_data's shape is (2, rows) with phase in [0, :] and mags in [1, :]
+                model_data = jktebop.generate_model_light_curve("trainset_", **params)
+                if np.isnan(np.min(model_data["delta_mag"])):
+                    if verbose:
+                        print(f"{file_stem}[{params['id']}]: Replacing NaN/Inf in processed LC.")
+                    np.nan_to_num(x=model_data["delta_mag"], copy=False)
+
+                # Optionally, add Gaussian flux noise based on the instance's SNR
+                snr = params.get("snr", None)
+                if snr:
+                    # We apply the noise to fluxes, so revert delta mags to normalized flux
+                    # and base the noise sigma on the instance's SNR and mean flux
+                    fluxes = np.power(10, np.divide(model_data["delta_mag"], -2.5))
+                    noise_sigma = np.divide(np.mean(fluxes), np.power(10, np.divide(snr, 10)))
+                    if noise_sigma:
+                        noise = rng.normal(0., scale=noise_sigma, size=len(fluxes))
+                        model_data["delta_mag"] = np.multiply(-2.5, np.log10(fluxes + noise))
+
+                # We store mags_features for various supported bins values
+                mags_features = {}
+                interp = interp1d(model_data["phase"], model_data["delta_mag"], kind=interp_kind)
+                for mag_name, mags_bins in deb_example.stored_mags_features.items():
+                    # Ensure we don't waste a bin repeating the value for phase 0.0 & 1.0
+                    new_phases = np.linspace(0., 1., mags_bins + 1)[:-1]
+                    bin_model_data = np.array([new_phases, interp(new_phases)], dtype=np.double)
+                    mags_features[mag_name] = bin_model_data[1]
+
+                # These are the extra features which may be used for predictions alongside the LC.
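+                # phiS is the phase of the secondary eclipse and dS_over_dP the ratio of the
+                # secondary to primary eclipse durations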
+                extra_features = {
+                    "phiS": orbital.secondary_eclipse_phase(params["ecosw"], params["ecc"]),
+                    "dS_over_dP": orbital.ratio_of_eclipse_duration(params["esinw"]),
+                }
+
+                # Write to the appropriate dataset file, based on the inst/file indices
+                row = deb_example.serialize(params["id"], params, mags_features, extra_features)
+                ds_writers[inst_file_ixs[inst_ix]].write(row)
+
+                if csv_file:
+                    write_param_sets_to_csv(csv_file, [params], append=True)
+
+            except Exception as exc: # pylint: disable=broad-exception-caught
+                traceback.print_exc(exc)
+                inst_file_ixs[inst_ix] = None
+                print(f"{file_stem}: Skipping instance {inst_ix} which caused exc: {params}")
+
+    finally:
+        for writer in ds_writers:
+            if writer and isinstance(writer, tf.io.TFRecordWriter):
+                writer.close()
+
     if verbose:
-        print(f"{label}: Finished processing {rows_total} instances.")
-
-    # Turn the subset ratios into counts now we know total number of rows
-    subset_values[2] = round(subset_values[2] * rows_total)
-    subset_values[1] = round(subset_values[1] * rows_total)
-    subset_values[0] = rows_total - sum(subset_values[1:]) # ensure training has all that's left
-
-    # Set up a shuffled index. We use an index rather than shuffling the rows
-    # directly, which is just as quick, so we can undo the suffle when saving.
-    row_indices = np.arange(rows_total)
-    random.Random(this_seed).shuffle(row_indices)
-    subset_slice_start = 0
-    for subset, subset_file, subset_count in zip(subsets, subset_files, subset_values):
-        msg = None
-        short_name = f"{subset_file.parent.name}/{subset_file.name}"
-        if simulate:
-            msg = f"{label}: Simulated saving {subset_count} {subset} instance(s) to {short_name}"
-        elif subset_count > 0:
-            # (Over)write the subset file. Rows are selected as a contiguous block of the shuffled
-            # indices. which are then re-sorted so the rows is written in the original order.
-            subset_slice = slice(subset_slice_start, subset_slice_start + subset_count)
-            subset_file.parent.mkdir(parents=True, exist_ok=True)
-            with tf.io.TFRecordWriter(f"{subset_file}", deb_example.ds_options) as ds:
-                for sorted_ix in sorted(row_indices[subset_slice]):
-                    ds.write(rows[sorted_ix])
-            subset_slice_start = subset_slice.stop
-            msg = f"{label}: Saved {subset_count} {subset} instance(s) to {short_name}"
-        else:
-            # Delete the existing file which may be left from previous run or we
-            # will be left with too many rows, and duplicates, over the subsets.
-            subset_file.unlink(missing_ok=True)
-        if verbose and msg:
-            print(msg)
+        for subset_ix, subset in enumerate(ds_subsets):
+            saved_count = inst_file_ixs.count(subset_ix)
+            if saved_count:
+                print("Simulated saving" if simulate else "Saved",
+                      f"{saved_count} {subset} instance(s) to",
+                      f"{output_dir.name}/{subset}/{ds_filename}")


 def write_param_sets_to_csv(file_name: Path,
                             param_sets: list[dict],
-                            field_names: list[any] = None) -> None:
+                            field_names: list[any] = None,
+                            append: bool=False) -> None:
     """
     Writes the list of parameter set dictionaries to a csv file.

@@ -326,6 +268,7 @@ def write_param_sets_to_csv(file_name: Path,
     :param_sets: the list of dictionaries to write out.
     :field_names: the list of fields to write, in the required order. If None, the
     field_names will be read from the first item in param_sets
+    :append: whether to append to the file (if it already exists) rather than overwrite it
     """
     # This data is saved in an intermediate form as we've yet to
     # generate the actual light-curves. We use csv, as this is
@@ -333,12 +276,13 @@
     # tensorflow data API for subsequent processing.
     if field_names is None:
         field_names = param_sets[0].keys()
-    with open(file_name, mode="w", encoding="UTF8", newline='') as f:
+    with open(file_name, mode="a+" if append else "w", encoding="UTF8", newline='') as f:
         dw = csv.DictWriter(f, field_names, quotechar="'", quoting=csv.QUOTE_NONNUMERIC)
-        dw.writeheader()
+        if not append or not f.tell():
+            dw.writeheader()
         dw.writerows(param_sets)