"""
Container for fieldlines in three-dimensional magnetic skeleton
"""
from functools import cached_property
import numpy as np
from scipy.interpolate import splev, splprep, interp1d
import astropy.units as u
from astropy.coordinates import SkyCoord
import asdf
import zarr
from synthesizAR import Loop
from synthesizAR.visualize import plot_fieldlines
__all__ = ['Skeleton']
[docs]
class Skeleton(object):
"""
Construct magnetic field skeleton fieldlines
Parameters
----------
loops : `list`
List of `Loop` objects
Examples
--------
>>> import synthesizAR
>>> import astropy.units as u
>>> loop = synthesizAR.Loop('loop', SkyCoord(x=[1,4]*u.Mm, y=[2,5]*u.Mm, z=[3,6]*u.Mm,frame='heliographic_stonyhurst', representation_type='cartesian'), [1e2,1e3] * u.G)
>>> field = synthesizAR.Skeleton([loop,])
"""
def __init__(self, loops):
self.loops = loops
[docs]
@classmethod
def from_coordinates(cls, coordinates, field_strengths=None, **kwargs):
"""
Construct `Skeleton` from list of coordinates and field strengths
Parameters
----------
coordinates : `list`
List of `~astropy.coordinates.SkyCoord` loop coordinates
field_strengths : `list`
List of `~astropy.units.Quantity` scalar magnetic field strength along the loop
"""
loops = []
if field_strengths is None:
field_strengths = len(coordinates) * [None]
for i, (coord, fs) in enumerate(zip(coordinates, field_strengths)):
loops.append(Loop(f'loop{i:06d}', coord, field_strength=fs, **kwargs))
return cls(loops)
def __repr__(self):
return f'''synthesizAR Skeleton Object
------------------------
Number of loops: {len(self.loops)}'''
[docs]
def to_asdf(self, filename):
"""
Serialize this instance of `Skeleton` to an ASDF file
"""
tree = {}
for l in self.loops:
tree[l.name] = {
'field_strength': l.field_strength,
'coordinate': l.coordinate,
'cross_sectional_area': l.cross_sectional_area,
'model_results_filename': l.model_results_filename,
}
with asdf.AsdfFile(tree) as asdf_file:
asdf_file.write_to(filename)
[docs]
@classmethod
def from_asdf(cls, filename):
"""
Restore a `Skeleton` instance from an ASDF file
Examples
--------
>>> import synthesizAR
>>> restored_field = synthesizAR.Skeleton.from_asdf('/path/to/skeleton.asdf') # doctest: +SKIP
"""
exclude_keys = ['asdf_library', 'history']
loops = []
with asdf.open(filename, mode='r', copy_arrays=True) as af:
for k in af.keys():
if k in exclude_keys:
continue
model_results_filename = af.tree[k].get('model_results_filename', None)
cross_sectional_area = af.tree[k].get('cross_sectional_area', None)
loops.append(Loop(
k,
SkyCoord(af.tree[k]['coordinate']),
af.tree[k]['field_strength'],
cross_sectional_area=cross_sectional_area,
model_results_filename=model_results_filename,
))
return cls(loops)
[docs]
@u.quantity_input
def refine_loops(self, delta_s: u.cm, **kwargs):
"""
Interpolate loop coordinates and field strengths to a specified spatial resolution
and return a new `Skeleton` object.
This can be important in order to ensure that an adequate number of points are used
to represent each fieldline when binning intensities onto the instrument grid.
"""
new_loops = []
for l in self.loops:
_l = self.refine_loop(l, delta_s, **kwargs)
new_loops.append(_l)
return Skeleton(new_loops)
[docs]
@staticmethod
@u.quantity_input
def refine_loop(loop, delta_s: u.cm, **kwargs):
evkwargs = kwargs.get('evkwargs', {})
prepkwargs = kwargs.get('prepkwargs', {})
try:
tck, _ = splprep(loop.coordinate.cartesian.xyz.value,
u=loop.field_aligned_coordinate_norm, **prepkwargs)
new_s = np.arange(0, loop.length.to(u.Mm).value, delta_s.to(u.Mm).value) * u.Mm
x, y, z = splev((new_s/loop.length).decompose(), tck, **evkwargs)
except (ValueError, TypeError) as e:
raise Exception(f'Failed to refine {loop.name}') from e
unit = loop.coordinate.cartesian.xyz.unit
new_coord = SkyCoord(x=x*unit,
y=y*unit,
z=z*unit,
frame=loop.coordinate.frame,
representation_type=loop.coordinate.representation_type)
f_B = interp1d(loop.field_aligned_coordinate.to(u.Mm), loop.field_strength)
new_field_strength = f_B(new_s.to(u.Mm)) * loop.field_strength.unit
f_A = interp1d(loop.field_aligned_coordinate.to(u.Mm), loop.cross_sectional_area)
new_area = f_A(new_s.to(u.Mm)) * loop.cross_sectional_area.unit
return Loop(loop.name,
new_coord,
field_strength=new_field_strength,
cross_sectional_area=new_area,
model_results_filename=loop.model_results_filename)
@property
def all_coordinates(self):
"""
Coordinates for all loops in the skeleton.
.. note:: This should be treated as a collection of points and NOT a
continuous structure.
"""
return SkyCoord([l.coordinate for l in self.loops],
frame=self.loops[0].coordinate.frame,
representation_type=self.loops[0].coordinate.representation_type)
@property
def all_coordinates_centers(self):
"""
Coordinates for all grid cell centers of all loops in the skeleton
.. note:: This should be treated as a collection of points and NOT a
continuous structure.
"""
return SkyCoord([l.coordinate_center for l in self.loops],
frame=self.loops[0].coordinate_center.frame,
representation_type=self.loops[0].coordinate_center.representation_type)
@cached_property
def all_widths(self) -> u.cm:
"""
Widths for all loops concatenated together
"""
return np.concatenate([l.field_aligned_coordinate_width for l in self.loops])
@cached_property
def all_cross_sectional_areas(self) -> u.cm**2:
"""
Cross-sectional areas for all loops concatenated together.
.. note:: These are the cross-sectional areas evaluated at the center of the loop.
"""
return np.concatenate([l.cross_sectional_area_center for l in self.loops])
[docs]
def peek(self, **kwargs):
"""
Plot loop coordinates on the solar disk.
See Also
--------
synthesizAR.visualize.plot_fieldlines
"""
plot_fieldlines(*[_.coordinate for _ in self.loops], **kwargs)
@staticmethod
def _load_loop_simulation(loop, root=None, interface=None):
# Load in parameters from interface
(time, electron_temperature, ion_temperature,
density, velocity) = interface.load_results(loop)
# If no Zarr file is passed, set the quantites as attributes on the loops
if root is None:
loop._time = time
loop._electron_temperature = electron_temperature
loop._ion_temperature = ion_temperature
loop._density = density
loop._velocity = velocity
loop._simulation_type = interface.name
else:
# Write to file
grp = root.create_group(loop.name)
grp.attrs['simulation_type'] = interface.name
# time
dset_time = grp.create_dataset('time', data=time.value)
dset_time.attrs['unit'] = time.unit.to_string()
# NOTE: Set the chunk size such that accessing all entries for a given timestep
# is the most efficient pattern.
chunks = (None,) + loop.field_aligned_coordinate_center.shape
# electron temperature
dset_electron_temperature = grp.create_dataset(
'electron_temperature', data=electron_temperature.value, chunks=chunks)
dset_electron_temperature.attrs['unit'] = electron_temperature.unit.to_string()
# ion temperature
dset_ion_temperature = grp.create_dataset(
'ion_temperature', data=ion_temperature.value, chunks=chunks)
dset_ion_temperature.attrs['unit'] = ion_temperature.unit.to_string()
# number density
dset_density = grp.create_dataset('density', data=density.value, chunks=chunks)
dset_density.attrs['unit'] = density.unit.to_string()
# field-aligned velocity
dset_velocity = grp.create_dataset('velocity', data=velocity.value, chunks=chunks)
dset_velocity.attrs['unit'] = velocity.unit.to_string()
dset_velocity.attrs['note'] = 'Velocity in the field-aligned direction'
[docs]
def load_loop_simulations(self, interface, filename=None, **kwargs):
"""
Load in loop parameters from hydrodynamic results.
"""
if filename is None:
root = None
else:
root = zarr.open(store=filename, mode='w', **kwargs)
for loop in self.loops:
loop.model_results_filename = filename
try:
import distributed
client = distributed.get_client()
except (ImportError, ValueError):
for l in self.loops:
self._load_loop_simulation(l, root=root, interface=interface)
else:
status = client.map(
self._load_loop_simulation,
self.loops,
root=root,
interface=interface,
)
return status
[docs]
def load_ionization_fractions(self, emission_model, interface=None, **kwargs):
"""
Load the ionization fractions for each ion in the emission model.
Parameters
----------
emission_model : `synthesizAR.atomic.EmissionModel`
interface : optional
A model interface. Only necessary if loading the ionization fractions
from the model
If the model interface provides a method for loading the population fraction
from the model, use that to get the population fractions. Otherwise, compute
the ion population fractions in equilibrium. This should be done after
calling `load_loop_simulations`.
"""
from fiasco import Element
from synthesizAR.atomic import equilibrium_ionization
root = zarr.open(store=self.loops[0].model_results_filename, mode='a', **kwargs)
# Check if we can load from the model
FROM_MODEL = False
if interface is not None and hasattr(interface, 'load_ionization_fraction'):
frac = interface.load_ionization_fraction(self.loops[0], emission_model[0])
# Some models may optionally output the ionization fractions such that
# they will have this method, but it may not return anything
if frac is not None:
FROM_MODEL = True
# Get the unique elements from all of our ions
element_names = list(set([ion.element_name for ion in emission_model]))
for el_name in element_names:
el = Element(el_name, emission_model.temperature)
ions = [i for i in emission_model if i.element_name == el.element_name]
for loop in self.loops:
chunks = (None,) + loop.field_aligned_coordinate_center.shape
if not FROM_MODEL:
frac_el = equilibrium_ionization(el, loop.electron_temperature)
if 'ionization_fraction' in root[loop.name]:
grp = root[f'{loop.name}/ionization_fraction']
else:
grp = root[loop.name].create_group('ionization_fraction')
for ion in ions:
if FROM_MODEL:
frac = interface.load_ionization_fraction(loop, ion)
desc = f'{ion.ion_name} ionization fraction computed by {interface.name}'
else:
frac = frac_el[:, :, ion.charge_state]
desc = f'{ion.ion_name} ionization fraction in equilibrium.'
dset = grp.create_dataset(f'{ion.ion_name}', data=frac, chunks=chunks)
dset.attrs['unit'] = ''
dset.attrs['description'] = desc