"""
Container for fieldlines in three-dimensional magnetic skeleton
"""
import asdf
import astropy.units as u
import numpy as np
import zarr
from astropy.coordinates import SkyCoord
from functools import cached_property
from scipy.interpolate import interp1d, splev, splprep
from synthesizAR import Strand
from synthesizAR.visualize import plot_fieldlines
__all__ = ['Skeleton']
[docs]
class Skeleton:
"""
Construct magnetic field skeleton fieldlines
Parameters
----------
strands : `list` of `~synthesizAR.Strand` objects
List of objects containing the information about each strand in the magnetic
skeleton.
Examples
--------
>>> import synthesizAR
>>> import astropy.units as u
>>> strand = synthesizAR.Strand('strand', SkyCoord(x=[1,4]*u.Mm, y=[2,5]*u.Mm, z=[3,6]*u.Mm, frame='heliographic_stonyhurst', representation_type='cartesian'), [1e2,1e3] * u.G)
>>> field = synthesizAR.Skeleton([strand,])
"""
def __init__(self, strands):
self.strands = strands
from synthesizAR import log
self.log = log
[docs]
@classmethod
def from_coordinates(cls, coordinates, field_strengths=None, **kwargs):
"""
Construct `Skeleton` from list of coordinates and field strengths
Parameters
----------
coordinates : `list` of `~astropy.coordinates.SkyCoord` objects
Coordinate of each strand
field_strengths : `list` of `~astropy.units.Quantity`
Scalar magnetic field strength along the strand
"""
strands = []
if field_strengths is None:
field_strengths = len(coordinates) * [None]
for i, (coord, fs) in enumerate(zip(coordinates, field_strengths)):
strands.append(Strand(f'strand{i:06d}', coord, field_strength=fs, **kwargs))
return cls(strands)
def __repr__(self):
return f'''synthesizAR Skeleton Object
------------------------
Number of strands: {len(self.strands)}'''
[docs]
def to_asdf(self, filename):
"""
Serialize this instance of `Skeleton` to an ASDF file
"""
tree = {}
for l in self.strands:
tree[l.name] = {
'field_strength': l.field_strength,
'coordinate': l.coordinate,
'cross_sectional_area': l.cross_sectional_area,
}
if l.model_results_filename is not None:
tree[l.name]['model_results_filename'] = l.model_results_filename.as_posix()
with asdf.AsdfFile(tree) as asdf_file:
asdf_file.write_to(filename)
[docs]
@classmethod
def from_asdf(cls, filename):
"""
Restore a `Skeleton` instance from an ASDF file
Examples
--------
>>> import synthesizAR
>>> restored_field = synthesizAR.Skeleton.from_asdf('/path/to/skeleton.asdf') # doctest: +SKIP
"""
exclude_keys = ['asdf_library', 'history']
strands = []
with asdf.open(filename, mode='r', memmap=False, lazy_load=False) as af:
for k in af.keys():
if k in exclude_keys:
continue
model_results_filename = af.tree[k].get('model_results_filename', None)
cross_sectional_area = af.tree[k].get('cross_sectional_area', None)
strand = Strand(k,
SkyCoord(af.tree[k]['coordinate']),
af.tree[k]['field_strength'],
cross_sectional_area=cross_sectional_area,
model_results_filename=model_results_filename)
strands.append(strand)
return cls(strands)
[docs]
@u.quantity_input
def refine_strands(self, delta_s: u.cm, **kwargs):
"""
Interpolate strand coordinates and field strengths to a specified spatial resolution
and return a new `Skeleton` object.
This can be important in order to ensure that an adequate number of points are used
to represent each fieldline when binning intensities onto the instrument grid.
"""
new_strands = []
for l in self.strands:
_l = self.refine_strand(l, delta_s, **kwargs)
new_strands.append(_l)
return Skeleton(new_strands)
[docs]
@staticmethod
@u.quantity_input
def refine_strand(strand, delta_s: u.cm, **kwargs):
evkwargs = kwargs.get('evkwargs', {})
prepkwargs = kwargs.get('prepkwargs', {})
try:
tck, _ = splprep(strand.coordinate.cartesian.xyz.value,
u=strand.field_aligned_coordinate_norm, **prepkwargs)
new_s = np.arange(0, strand.length.to(u.Mm).value, delta_s.to(u.Mm).value) * u.Mm
x, y, z = splev((new_s/strand.length).decompose(), tck, **evkwargs)
except (ValueError, TypeError) as e:
raise Exception(f'Failed to refine {strand.name}') from e
unit = strand.coordinate.cartesian.xyz.unit
new_coord = SkyCoord(x=x*unit,
y=y*unit,
z=z*unit,
frame=strand.coordinate.frame,
representation_type=strand.coordinate.representation_type)
f_B = interp1d(strand.field_aligned_coordinate.to(u.Mm), strand.field_strength)
new_field_strength = f_B(new_s.to(u.Mm)) * strand.field_strength.unit
f_A = interp1d(strand.field_aligned_coordinate.to(u.Mm), strand.cross_sectional_area)
new_area = f_A(new_s.to(u.Mm)) * strand.cross_sectional_area.unit
return Strand(strand.name,
new_coord,
field_strength=new_field_strength,
cross_sectional_area=new_area,
model_results_filename=strand.model_results_filename)
@property
def all_coordinates(self):
"""
Coordinates for all stands in the skeleton.
.. note:: This should be treated as a collection of points and NOT a
continuous structure.
"""
return SkyCoord([l.coordinate for l in self.strands],
frame=self.strands[0].coordinate.frame,
representation_type=self.strands[0].coordinate.representation_type)
@property
def all_coordinates_centers(self):
"""
Coordinates for all grid cell centers of all strands in the skeleton
.. note:: This should be treated as a collection of points and NOT a
continuous structure.
"""
return SkyCoord([l.coordinate_center for l in self.strands],
frame=self.strands[0].coordinate_center.frame,
representation_type=self.strands[0].coordinate_center.representation_type)
@cached_property
def all_widths(self) -> u.cm:
"""
Widths for all strands concatenated together
"""
return np.concatenate([l.field_aligned_coordinate_width for l in self.strands])
@cached_property
def all_cross_sectional_areas(self) -> u.cm**2:
"""
Cross-sectional areas for all strands concatenated together.
.. note:: These are the cross-sectional areas evaluated at the center of the strand.
"""
return np.concatenate([l.cross_sectional_area_center for l in self.strands])
[docs]
@u.quantity_input
def get_chromosphere_mask(self, footpoint_height:u.Mm):
"Returns result of `synthesizAR.Strand.get_chromosphere_mask` for all strands in skeleton."
return np.concatenate([l.get_chromosphere_mask(footpoint_height) for l in self.strands])
[docs]
def peek(self, **kwargs):
"""
Plot strand coordinates on the solar disk.
See Also
--------
synthesizAR.visualize.plot_fieldlines
"""
plot_fieldlines(*[_.coordinate for _ in self.strands], **kwargs)
@staticmethod
def _load_loop_simulation(strand, root=None, interface=None, emission_model=None):
# Load in parameters from interface
results = interface.load_results(strand, emission_model=emission_model)
# If no Zarr file is passed, set the quantites as attributes on the loops
if root is None:
strand._simulation_type = interface.name
for name, quantity in results.items():
setattr(strand, f'_{name}', quantity)
else:
# Write to file
grp = root.create_group(strand.name)
grp.attrs['simulation_type'] = interface.name
# NOTE: Set the chunk size such that accessing all entries for a given timestep
# is the most efficient pattern.
chunks = results['time'].shape + (1,)
for name, quantity in results.items():
dset = grp.create_array(name,
data=quantity.value,
chunks='auto' if name=='time' else chunks)
dset.attrs['unit'] = quantity.unit.to_string()
[docs]
def load_loop_simulations(self, interface, filename=None, parallelize=False, emission_model=None):
"""
Load results from hydrodynamic results.
Parameters
----------
interface : model interface object
Interface to the hydrodynamic simulation from which to load the results
filename : `str` or path-like
Path to `zarr` store to write hydrodynamic results to
parallelize : `bool`
If True and a `distributed.Client` exists, load loop simulations in parallel.
emission_model : `synthesizAR.atomic.EmissionModel`
Emission model that specifies the ions used in the emission modeling process.
This can be optionally specified in order to load the time-dependent ionization
fractions for some models.
"""
if filename is None:
root = None
else:
root = zarr.open(store=filename, mode='w')
for strand in self.strands:
strand.model_results_filename = filename
try:
import distributed
client = distributed.get_client()
except (ImportError, ValueError):
pass
else:
if parallelize:
status = client.map(
self._load_loop_simulation,
self.strands,
root=root,
interface=interface,
emission_model=emission_model,
)
return status
for l in self.strands:
self.log.debug(f'Loading results for strand {l.name}')
self._load_loop_simulation(l, root=root, interface=interface, emission_model=emission_model)