Add emission cell temperatures to spherical plot (#69)
Add radiation temperatures at the emission locations of packets in 3D models
to spherical plots
lukeshingles authored May 16, 2023
1 parent 4e54608 commit 4b1ac31
Showing 5 changed files with 122 additions and 29 deletions.
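In outline, the new "temperature" plot variable maps each emitted packet to its emission cell and timestep, joins on the radiation temperature TR from the estimator files, and averages TR weighted by the packet rest-frame energy e_rf within each angular bin (see the plotspherical.py changes below). A toy sketch of that weighted mean, with made-up values:

```python
import polars as pl

# three packets with radiation temperatures [K] and rest-frame energies (made-up values)
df = pl.DataFrame({"em_TR": [9000.0, 8000.0, 6500.0], "e_rf": [1.0, 2.0, 4.0]})

# a ratio of means equals a ratio of sums, so this is the e_rf-weighted mean of em_TR
print(df.select(((pl.col("em_TR") * pl.col("e_rf")).mean() / pl.col("e_rf").mean()).alias("temperature")))
# temperature = 51000 / 7 ≈ 7285.7
```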
57 changes: 39 additions & 18 deletions artistools/estimators/estimators.py
@@ -11,7 +11,6 @@
from collections.abc import Collection
from collections.abc import Iterator
from collections.abc import Sequence
from functools import lru_cache
from functools import partial
from functools import reduce
from pathlib import Path
@@ -128,10 +127,9 @@ def parse_estimfile(
modelpath: Path,
get_ion_values: bool = True,
get_heatingcooling: bool = True,
skip_emptycells: bool = False,
) -> Iterator[tuple[int, int, dict]]: # pylint: disable=unused-argument
"""Generate timestep, modelgridindex, dict from estimator file."""
# itstep = at.get_inputparams(modelpath)['itstep']

with at.zopen(estfilepath) as estimfile:
timestep: int = -1
modelgridindex: int = -1
@@ -143,7 +141,11 @@

if row[0] == "timestep":
# yield the previous block before starting a new one
if timestep >= 0 and modelgridindex >= 0:
if (
timestep >= 0
and modelgridindex >= 0
and not (skip_emptycells and estimblock.get("emptycell", True))
):
yield timestep, modelgridindex, estimblock

timestep = int(row[1])
@@ -231,7 +233,7 @@ def parse_estimfile(
estimblock["cooling_" + coolingtype] = float(value)

# reached the end of file
if timestep >= 0 and modelgridindex >= 0:
if timestep >= 0 and modelgridindex >= 0 and not (skip_emptycells and estimblock.get("emptycell", True)):
yield timestep, modelgridindex, estimblock


@@ -243,6 +245,7 @@ def read_estimators_from_file(
printfilename: bool = False,
get_ion_values: bool = True,
get_heatingcooling: bool = True,
skip_emptycells: bool = False,
) -> dict[tuple[int, int], Any]:
estimators_thisfile = {}
estimfilename = f"estimators_{mpirank:04d}.out"
@@ -258,7 +261,11 @@
print(f"Reading {estfilepath.relative_to(modelpath.parent)} ({filesize:.2f} MiB)")

for fileblock_timestep, fileblock_modelgridindex, file_estimblock in parse_estimfile(
estfilepath, modelpath, get_ion_values=get_ion_values, get_heatingcooling=get_heatingcooling
estfilepath,
modelpath,
get_ion_values=get_ion_values,
get_heatingcooling=get_heatingcooling,
skip_emptycells=skip_emptycells,
):
if arr_velocity_outer is not None:
file_estimblock["velocity_outer"] = arr_velocity_outer[fileblock_modelgridindex]
@@ -269,18 +276,22 @@
return estimators_thisfile


@lru_cache(maxsize=16)
def read_estimators(
modelpath: Union[Path, str],
modelpath: Union[Path, str] = Path(),
modelgridindex: Union[None, int, Sequence[int]] = None,
timestep: Union[None, int, Sequence[int]] = None,
mpirank: Optional[int] = None,
runfolder: Union[None, str, Path] = None,
get_ion_values: bool = True,
get_heatingcooling: bool = True,
skip_emptycells: bool = False,
add_velocity: bool = True,
) -> dict[tuple[int, int], dict]:
"""Read estimator files into a nested dictionary structure.
Speed up the read by retrieving estimators only for particular timesteps or model grid cells.
"""
modelpath = Path(modelpath)
match_modelgridindex: Collection[int]
if modelgridindex is None:
match_modelgridindex = []
@@ -307,20 +318,29 @@

# print(f" matching cells {match_modelgridindex} and timesteps {match_timestep}")

modeldata, _ = at.inputmodel.get_modeldata(modelpath, getheadersonly=True)
if "velocity_outer" in modeldata.columns:
modeldata, _ = at.inputmodel.get_modeldata(modelpath)
arr_velocity_outer = tuple([float(v) for v in modeldata["velocity_outer"].to_numpy()])
else:
arr_velocity_outer = None
arr_velocity_outer = None
if add_velocity:
modeldata, _ = at.inputmodel.get_modeldata(modelpath, getheadersonly=True)
if "velocity_outer" in modeldata.columns:
modeldata, _ = at.inputmodel.get_modeldata(modelpath)
arr_velocity_outer = tuple([float(v) for v in modeldata["velocity_outer"].to_numpy()])

mpiranklist = (
at.get_mpiranklist(modelpath, modelgridindex=match_modelgridindex, only_ranks_withgridcells=True)
if mpirank is None
else [mpirank]
)

mpiranklist = at.get_mpiranklist(modelpath, modelgridindex=match_modelgridindex, only_ranks_withgridcells=True)
runfolders = at.get_runfolders(modelpath, timesteps=match_timestep) if runfolder is None else [Path(runfolder)]

printfilename = len(mpiranklist) < 10

estimators = {}
for folderpath in at.get_runfolders(modelpath, timesteps=match_timestep):
print(f"Reading {len(list(mpiranklist))} estimator files in {folderpath.relative_to(Path(modelpath).parent)}")
for folderpath in runfolders:
if not printfilename:
print(
f"Reading {len(list(mpiranklist))} estimator files in {folderpath.relative_to(Path(modelpath).parent)}"
)

processfile = partial(
read_estimators_from_file,
@@ -330,10 +350,11 @@
get_ion_values=get_ion_values,
get_heatingcooling=get_heatingcooling,
printfilename=printfilename,
skip_emptycells=skip_emptycells,
)

if at.get_config()["num_processes"] > 1:
with multiprocessing.get_context("fork").Pool(processes=at.get_config()["num_processes"]) as pool:
with multiprocessing.get_context("spawn").Pool(processes=at.get_config()["num_processes"]) as pool:
arr_rankestimators = pool.map(processfile, mpiranklist)
pool.close()
pool.join()
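A minimal usage sketch of the updated reader (the model path is a placeholder): with skip_emptycells=True, estimator blocks from empty grid cells are never yielded, and blocks without an emptycell flag are treated as empty per the .get("emptycell", True) default above.

```python
import artistools as at

estimators = at.estimators.read_estimators(
    modelpath="path/to/artismodel",  # placeholder path
    get_ion_values=False,            # skip per-ion values for speed
    get_heatingcooling=False,        # skip heating/cooling rates
    skip_emptycells=True,            # new option from this commit
)

# keys are (timestep, modelgridindex) tuples; "TR" is the radiation temperature
timestep, mgi = next(iter(estimators))
print(timestep, mgi, estimators[(timestep, mgi)].get("TR"))
```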
4 changes: 4 additions & 0 deletions artistools/inputmodel/inputmodel_misc.py
@@ -371,6 +371,10 @@ def get_modeldata(
dfmodel = None
filenameparquet = at.stripallsuffixes(Path(filename)).with_suffix(".txt.parquet")

if filenameparquet.exists() and Path(filename).stat().st_mtime > filenameparquet.stat().st_mtime:
print(f"{filename} has been modified after {filenameparquet}. Deleting out of date parquet file.")
filenameparquet.unlink()

source_textfile_details = {"st_size": filename.stat().st_size, "st_mtime": filename.stat().st_mtime}

if filenameparquet.is_file() and not getheadersonly:
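The get_modeldata change above invalidates the cached parquet file whenever the source text file has a newer modification time. A generic standalone sketch of that staleness check (file paths hypothetical):

```python
from pathlib import Path

def cache_is_stale(textfile: Path, cachefile: Path) -> bool:
    """Return True if cachefile exists but predates its source textfile."""
    return cachefile.is_file() and textfile.stat().st_mtime > cachefile.stat().st_mtime

# usage with hypothetical files:
# if cache_is_stale(Path("model.txt"), Path("model.txt.parquet")):
#     Path("model.txt.parquet").unlink()  # force a rebuild from the text file
```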
3 changes: 1 addition & 2 deletions artistools/misc.py
@@ -4,7 +4,6 @@
import math
import os
from collections import namedtuple
from collections.abc import Collection
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Sequence
@@ -1073,7 +1072,7 @@ def get_runfolder_timesteps(folderpath: Union[Path, str]) -> tuple[int, ...]:

def get_runfolders(
modelpath: Union[Path, str], timestep: Optional[int] = None, timesteps: Optional[Sequence[int]] = None
) -> Collection[Path]:
) -> Sequence[Path]:
"""Get a list of folders containing ARTIS output files from a modelpath, optionally with a timestep restriction.
The folder list may include non-ARTIS folders if a timestep is not specified.
37 changes: 31 additions & 6 deletions artistools/packets/packets.py
@@ -1,7 +1,7 @@
import calendar
import gzip
import math
import multiprocessing as mp
import multiprocessing
from collections.abc import Sequence
from functools import lru_cache
from pathlib import Path
@@ -16,7 +16,7 @@
import artistools as at

# for the parquet files
time_lastschemachange = (2023, 4, 22, 12, 31, 0)
time_parquetschemachange = (2023, 4, 22, 12, 31, 0)

CLIGHT = 2.99792458e10
DAY = 86400
@@ -149,13 +149,18 @@ def add_derived_columns(
return dfpackets

colnames = at.makelist(colnames)
dimensions = at.get_inputparams(modelpath)["n_dimensions"]

def em_modelgridindex(packet) -> Union[int, float]:
assert dimensions == 1

return at.inputmodel.get_mgi_of_velocity_kms(
modelpath, packet.emission_velocity * cm_to_km, mgilist=allnonemptymgilist
)

def emtrue_modelgridindex(packet) -> Union[int, float]:
assert dimensions == 1

return at.inputmodel.get_mgi_of_velocity_kms(
modelpath, packet.true_emission_velocity * cm_to_km, mgilist=allnonemptymgilist
)
@@ -198,7 +203,7 @@ def emtrue_timestep(packet) -> int:
return dfpackets


def add_derived_columns_lazy(dfpackets: pl.LazyFrame) -> pl.LazyFrame:
def add_derived_columns_lazy(dfpackets: pl.LazyFrame, modelmeta: Optional[dict] = None) -> pl.LazyFrame:
# we might as well add everything, since the columns only get calculated when they are actually used

dfpackets = dfpackets.with_columns(
@@ -222,6 +227,26 @@ def add_derived_columns_lazy(dfpackets: pl.LazyFrame) -> pl.LazyFrame:
]
)

if modelmeta is not None and modelmeta["dimensions"] == 3:
t_model_s = modelmeta["t_model_init_days"] * 86400.0
vmax = modelmeta["vmax_cmps"]
vwidth = modelmeta["wid_init"] / t_model_s
dfpackets = dfpackets.with_columns(
[
((pl.col(f"em_pos{ax}") / pl.col("em_time") + vmax) / vwidth).cast(pl.Int32).alias(f"coordpointnum{ax}")
for ax in ["x", "y", "z"]
]
)
dfpackets = dfpackets.with_columns(
[
(
pl.col("coordpointnumz") * modelmeta["ncoordgridy"] * modelmeta["ncoordgridx"]
+ pl.col("coordpointnumy") * modelmeta["ncoordgridx"]
+ pl.col("coordpointnumx")
).alias("em_modelgridindex")
]
)

return dfpackets


@@ -375,7 +400,7 @@ def get_packetsfilepaths(
searchfolders = [Path(modelpath, "packets"), Path(modelpath)]
# in descending priority (based on speed of reading)
suffix_priority = [".out.zst", ".out.lz4", ".out.zst", ".out", ".out.gz", ".out.xz"]
t_lastschemachange = calendar.timegm(time_lastschemachange)
t_lastschemachange = calendar.timegm(time_parquetschemachange)

parquetpacketsfiles = []
parquetrequiredfiles = []
@@ -417,7 +442,7 @@ def get_packetsfilepaths(
break

if len(parquetrequiredfiles) >= 20:
with mp.get_context("spawn").Pool(processes=at.get_config()["num_processes"]) as pool:
with multiprocessing.get_context("spawn").Pool(processes=at.get_config()["num_processes"]) as pool:
convertedparquetpacketsfiles = pool.map(convert_text_to_parquet, parquetrequiredfiles)
pool.close()
pool.join()
@@ -450,7 +475,7 @@ def get_packets_pl(

nprocs_read = len(packetsfiles)
packetsdatasize_gb = nprocs_read * Path(packetsfiles[0]).stat().st_size / 1024 / 1024 / 1024
print(f" data size is {packetsdatasize_gb:.1f} GB (size of {packetsfiles[0].parts[-1]} * {nprocs_read})")
print(f" data size is {packetsdatasize_gb:.1f} GB ({nprocs_read} * size of {packetsfiles[0].parts[-1]})")

pldfpackets = pl.concat(
(pl.scan_parquet(packetsfile) for packetsfile in packetsfiles),
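The new 3D branch in add_derived_columns_lazy converts an emission position into a grid cell index: position divided by emission time gives a velocity (homologous expansion), which is shifted by vmax and divided by the cell width in velocity space to get per-axis cell numbers, combined with x varying fastest. A standalone sketch with assumed example values (not from a real model):

```python
CLIGHT = 2.99792458e10  # speed of light [cm/s]

def em_modelgridindex(em_pos_cm, em_time_s, vmax, ncoord, t_model_s):
    """Map an emission position [cm] and time [s] onto a uniform cubic velocity grid."""
    wid_init = 2.0 * vmax * t_model_s / ncoord  # cell width [cm] at the model snapshot time
    vwidth = wid_init / t_model_s  # cell width in velocity space [cm/s]
    ix, iy, iz = (int((pos / em_time_s + vmax) / vwidth) for pos in em_pos_cm)
    return iz * ncoord * ncoord + iy * ncoord + ix  # x varies fastest, then y, then z

# example: 50^3 grid with vmax = 0.3c, packet emitted at t = 2e5 s
print(em_modelgridindex((1.0e15, -2.0e14, 5.0e14), 2.0e5, 0.3 * CLIGHT, 50, 86400.0))
# -> 78638
```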
50 changes: 47 additions & 3 deletions artistools/plotspherical.py
@@ -93,7 +93,7 @@ def plot_spherical(

solidanglefactor = nphibins * ncosthetabins
aggs = []
dfpackets = at.packets.add_derived_columns_lazy(dfpackets)
dfpackets = at.packets.add_derived_columns_lazy(dfpackets, modelmeta=modelmeta)
if "emvelocityoverc" in plotvars:
aggs.append(
((pl.col("emission_velocity") * pl.col("e_rf")).mean() / pl.col("e_rf").mean() / 29979245800).alias(
@@ -115,6 +115,48 @@
)
)

if "temperature" in plotvars:
timebins = [
*at.get_timestep_times_float(modelpath, loc="start") * 86400.0,
at.get_timestep_times_float(modelpath, loc="end")[-1] * 86400.0,
]

binindex = (
dfpackets.select("em_time")
.lazy()
.collect()
.get_column("em_time")
.cut(bins=list(timebins), category_label="em_timestep", maintain_order=True)
.get_column("em_timestep")
.cast(pl.Int32)
- 1 # subtract 1 because the returned index 0 is the bin below the start of the first supplied bin
)
dfpackets = dfpackets.with_columns([binindex])
dfest_parquetfile = Path(modelpath, "temperatures.parquet.tmp")

if not dfest_parquetfile.is_file():
estimators = at.estimators.read_estimators(
modelpath,
get_ion_values=False,
get_heatingcooling=False,
skip_emptycells=True,
)
pl.DataFrame(
{
"timestep": (tsmgi[0] for tsmgi in estimators),
"modelgridindex": (tsmgi[1] for tsmgi in estimators),
"TR": (estimators[tsmgi].get("TR", -1) for tsmgi in estimators),
},
).filter(pl.col("TR") >= 0).with_columns(pl.col(pl.Int64).cast(pl.Int32)).write_parquet(
dfest_parquetfile, compression="zstd"
)

df_estimators = pl.scan_parquet(dfest_parquetfile).rename(
{"timestep": "em_timestep", "modelgridindex": "em_modelgridindex", "TR": "em_TR"}
)
dfpackets = dfpackets.join(df_estimators, on=["em_timestep", "em_modelgridindex"], how="left")
aggs.append(((pl.col("em_TR") * pl.col("e_rf")).mean() / pl.col("e_rf").mean()).alias("temperature"))

if atomic_number is not None or ion_stage is not None:
dflinelist = at.get_linelist_pldf(modelpath)
if atomic_number is not None:
@@ -199,6 +241,8 @@ def plot_spherical(
colorbartitle = r"Mean line of sight velocity [c]"
elif plotvar == "luminosity":
colorbartitle = r"$I_{e,\Omega}\cdot4\pi/\Omega$ [erg/s]"
elif plotvar == "temperature":
colorbartitle = r"Temperature [K]"
else:
raise AssertionError

@@ -234,9 +278,9 @@ def addargs(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"-plotvars",
default=["luminosity", "emvelocityoverc", "emlosvelocityoverc"],
choices=["luminosity", "emvelocityoverc", "emlosvelocityoverc"],
choices=["luminosity", "emvelocityoverc", "emlosvelocityoverc", "temperature"],
nargs="+",
help="Variable to plot: luminosity, emvelocityoverc, emlosvelocityoverc",
help="Variable to plot: luminosity, emvelocityoverc, emlosvelocityoverc, temperature",
)
parser.add_argument("-elem", type=str, default=None, help="Filter emitted packets by element of last emission")
parser.add_argument(
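A toy sketch (all values made up) of the packet-estimator join used above: the estimator table holds one TR per (timestep, modelgridindex), renamed to match the packet columns, and a left join attaches em_TR to every packet before the energy-weighted mean is taken.

```python
import polars as pl

# estimator table: one radiation temperature per (timestep, cell), renamed to packet columns
df_estimators = pl.DataFrame(
    {"em_timestep": [0, 0, 1], "em_modelgridindex": [3, 7, 3], "em_TR": [9000.0, 8000.0, 6500.0]}
).lazy()

# packets already carry their emission timestep and cell index
dfpackets = pl.DataFrame(
    {"em_timestep": [0, 1, 1], "em_modelgridindex": [7, 3, 3], "e_rf": [1.0, 2.0, 4.0]}
).lazy()

# each packet gets the radiation temperature of its emission cell and timestep
print(dfpackets.join(df_estimators, on=["em_timestep", "em_modelgridindex"], how="left").collect())
```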
