Source code for synthesizAR.atomic.emission_models

"""
Various models for calculating emission from multiple ions
"""
import asdf
import astropy.units as u
import fiasco
import numpy as np
import pathlib
import zarr

from fiasco.util.exceptions import MissingDatasetException
from functools import cache
from scipy.integrate import trapezoid
from scipy.interpolate import interp1d

__all__ = ['EmissionModel']


[docs] class EmissionModel(fiasco.IonCollection): """ Model for what atomic data is used to calculate emissivity. This model calculates line and continuum emissivity as a function of temperature, density (for line emission), and wavelength. In the case of line emission, emissivity is calculated once and then saved to a table to avoid repeating expensive level populations calculations. Parameters ---------- density: `~astropy.units.Quantity` Array of densities over which line emissivity is computed args: `~fiasco.Ion` All remaining positional arguments are the ions comprising the emission model. These can also be elements or collections. emissivity_table_filename: `str` or pathlike, optional Path to Zarr store for saving the emissivity calculations. If this file already exists, care should be taken to ensure that all input parameters are the same as the model that originally generated this file. If not specified, the emissivity table will be saved to the current directory using the `id` of this object. kwargs : `dict`, optional Additional keyword arguments to be passed to `fiasco.Ion.level_populations`. """ @u.quantity_input def __init__(self, density: u.cm**(-3), *args, emissivity_table_filename=None, **kwargs): super().__init__(*args) self.density = density self.emissivity_table_filename = emissivity_table_filename self._level_pops_kwargs = kwargs def __getitem__(self, value): if isinstance(value, str): # Index by ion roman name. Not calling iterator because this would be circular return {i.ion_name_roman: i for i in self._ion_list}[value] else: return super().__getitem__(value) @property def emissivity_table_filename(self): return self._emissivity_table_filename @emissivity_table_filename.setter def emissivity_table_filename(self, value): if value is None: value = pathlib.Path.cwd() / f'emissivity_table_{id(self)}.zarr' self._emissivity_table_filename = pathlib.Path(value)
[docs] def to_asdf(self, filename): """ Serialize an `EmissionModel` to an ASDF file """ tree = { 'temperature': self.temperature, 'density': self.density, 'emissivity_table_filename': self.emissivity_table_filename.as_posix(), 'level_pops_kwargs': self._level_pops_kwargs, 'ions': [ion.ion_name for ion in self], 'ion_kwargs': [ion._instance_kwargs for ion in self] } with asdf.AsdfFile(tree) as asdf_file: asdf_file.write_to(filename)
[docs] @classmethod def from_asdf(cls, filename): """ Restore `EmissionModel` instance from an ASDF file """ with asdf.open(filename, mode='r', memmap=False, lazy_load=False) as af: temperature = af.tree['temperature'] density = af.tree['density'] emissivity_table_filename = af.tree['emissivity_table_filename'] level_pops_kwargs = af.tree['level_pops_kwargs'] ions = af.tree['ions'] ion_kwargs = af.tree['ion_kwargs'] ions = [fiasco.Ion(ion, temperature, **kwargs) for ion, kwargs in zip(ions, ion_kwargs)] em_model = cls( density, *ions, emissivity_table_filename=emissivity_table_filename, **level_pops_kwargs ) return em_model
[docs] def build_emissivity_table(self): """ Save line and continuum emissivity to a Zarr file. Calculate line and continuum emissivity as a function of wavelength, temperature, and density for each ion in the emission model to a Zarr file. It is not necessary to call this function as emissivities will be automatically calculated and stored as needed, but it may be convenient in some instances. """ for ion in self: self.log.debug(f'Calculating emissivity for {ion.ion_name}.') _ = self.get_line_emissivity(ion) _ = self.get_continuum_emissivity(ion)
def _get_quantity_from_emissivity_table(self, ion, path, slice=None): "Convenience method for retrieving a quantity for a given ion from the Zarr file" slice = np.s_[...] if slice is None else slice root = zarr.open(store=self.emissivity_table_filename, mode='r') ds = root[f'{ion.ion_name}/{path}'] if (unit:=ds.attrs.get('unit')) is not None: return u.Quantity(ds[slice], unit) else: return ds[slice] def _calculate_line_emissivity(self, ion): # NOTE: Purposefully not using the contribution_function or emissivity methods on # fiasco.Ion because (i) ionization fraction may be loop dependent, (ii) don't want # to assume any abundance at this stage so that we can change it later without having # to recalculate the level populations, and (iii) we want to exclude the hc/lambda # factor. try: pop = ion.level_populations(self.density, **self._level_pops_kwargs) except MissingDatasetException: # NOTE: populations not available for every ion self.log.warning(f'Cannot compute level populations for {ion.ion_name}') wavelength = [0]*u.AA emissivity = u.Quantity(np.zeros(self.temperature.shape+self.density.shape+(1,)), 'cm3 ph s-1') else: upper_level = ion.transitions.upper_level[ion.transitions.is_bound_bound] wavelength = ion.transitions.wavelength[ion.transitions.is_bound_bound] label = ion.transitions.label[ion.transitions.is_bound_bound] A = ion.transitions.A[ion.transitions.is_bound_bound] i_upper = fiasco.util.vectorize_where(np.arange(1, ion.n_levels+1), upper_level) emissivity = pop[:, :, i_upper] * A * u.photon emissivity = emissivity[:, :, np.argsort(wavelength)] label = label[np.argsort(wavelength)] wavelength = np.sort(wavelength) # This is the factor of n_H/n_e * 1/n_e which replaces 0.83 / n_e nH_ne2 = np.outer(ion.proton_electron_ratio, 1/self.density)[..., np.newaxis] emissivity *= ion.abundance * nH_ne2 return wavelength, label, emissivity
[docs] def get_line_emissivity(self, ion, transition=None): r""" Get bound-bound emissivity for all lines of a particular ion. .. note:: This first searches the Zarr store at ``emissivity_table_filename`` and if no emissivity is found, it is calculated. As such, the first time this is run for a given ion it may be slow, but will cache the result on subsequent calls. In this case, the emissivity, as a function of density :math:`n` and temperature :math:`T`, for a transition :math:`ij` is defined as, .. math:: G_{ij}(n,T) = \frac{n_H}{n_e}\frac{1}{n_e}\mathrm{Ab}_X N_j(n,T) A_{ij} where :math:`N_j` is the level population of level :math:`j` for a given ion of element :math:`X`. Note that this has units of :math:`\mathrm{s}^{-1}`. Note that this is effectively the contribution function without the ionization fraction. The goal of this function is to compute, per ion and as a function of wavelength, every quantity that is dependent on the atomic physics. The ionization fraction is not included here because it can, in principle, be time-dependent and thus must be calculated later using the results of the field-aligned model. Parameters ---------- ion: `fiasco.Ion` Ion instance for which to compute bound-bound line emission. transition: `str`, optional Optionally return only a single transition. The reason for having this is it is faster to read out the emissivity for just one transition rather than reading out the whole array and then slicing. """ root = zarr.open(store=self.emissivity_table_filename, mode='a') if root.get(f'{ion.ion_name}/line') is None: wavelength, label, emissivity = self._calculate_line_emissivity(ion) grp = root.create_group(f'{ion.ion_name}/line') ds = grp.create_array('wavelength', data=wavelength.value) ds.attrs['unit'] = wavelength.unit.to_string() ds = grp.create_array('label', data=label) ds = grp.create_array('emissivity', data=emissivity.value) ds.attrs['unit'] = emissivity.unit.to_string() if transition is not None: label = self._get_quantity_from_emissivity_table(ion, 'line/label') idx = np.s_[:,:] + np.where(label==transition) else: idx = np.s_[:,:,:] wavelength = self._get_quantity_from_emissivity_table(ion, 'line/wavelength', slice=idx[-1]) emissivity = self._get_quantity_from_emissivity_table(ion, 'line/emissivity', slice=idx) return wavelength, emissivity
def _calculate_continuum_emissivity(self, ion): wavelength = np.arange(1,1000,0.1) * u.AA ph_per_erg = u.photon / wavelength.to('erg', equivalencies=u.equivalencies.spectral()) # NOTE: It may seem silly to be reimplementing the existing free-free and free-bound # methods on the collection object, but this is necessary to avoid multiplying by the # ionization fraction. try: ff = ion.free_free(wavelength) except MissingDatasetException: self.log.warning(f'Cannot compute free-free emission for {ion.ion_name}') ff = u.Quantity(np.zeros(self.temperature.shape+wavelength.shape), 'erg cm3 s-1 Angstrom-1') try: fb = ion.free_bound(wavelength) except MissingDatasetException: self.log.warning(f'Cannot compute free-bound emission for {ion.ion_name}') fb = u.Quantity(np.zeros(self.temperature.shape+wavelength.shape), 'erg cm3 s-1 Angstrom-1') emissivity = ion.abundance*ion.proton_electron_ratio[..., np.newaxis]*(ff + fb)*ph_per_erg return wavelength, emissivity
[docs] def get_continuum_emissivity(self, ion): r""" Get continuum (free-free and free-bound) emissivity of a particular ion. The continuum emissivity in this case is given by, .. math:: C(T,\lambda) = \frac{n_H}{n_e}\mathrm{Ab}_X(C_{ff}(T,\lambda) + C_{fb}(T,\lambda))\frac{\lambda}{hc} where :math:`C_{ff},C_{fb}` are the free-free and free-bound emissivities of the ion as computed by `fiasco.Ion.free_free` and `fiasco.Ion.free_bound`, respectively. Parameters ---------- ion: `fiasco.Ion` Ion for which to calculate the continuum emissivity """ root = zarr.open(store=self.emissivity_table_filename, mode='a') if root.get(f'{ion.ion_name}/continuum') is None: wavelength, emissivity = self._calculate_continuum_emissivity(ion) grp = root.create_group(f'{ion.ion_name}/continuum') ds = grp.create_array('wavelength', data=wavelength.value) ds.attrs['unit'] = wavelength.unit.to_string() ds = grp.create_array('emissivity', data=emissivity.value) ds.attrs['unit'] = emissivity.unit.to_string() wavelength = self._get_quantity_from_emissivity_table(ion, 'continuum/wavelength') emissivity = self._get_quantity_from_emissivity_table(ion, 'continuum/emissivity') return wavelength, emissivity
[docs] @cache @u.quantity_input def calculate_narrowband_emissivity( self, ion, channel, ) -> u.Unit('cm5 DN sr s-1 pix-1'): r""" Calculate ion emissivity integrated over an instrument wavelength response function. Compute product between wavelength response :math:`R_c` for ``channel`` (:math:`c`) and emissivity for an ion of ionization stage :math:`k` from element :math:`X` in this emission model, .. math:: \epsilon_{c,X,k} = \sum_{\{ij\}_{X,k}}\epsilon_{ij}R_c(\lambda_{ij}) + \int\mathrm{d}\lambda\,R_c(\lambda)C(T,\lambda) This emissivity includes bound-bound, free-free, and free-bound emission as a function of temperature and density. .. note:: This explicitly does not include the temperature-dependent ionization fraction for each ion. The reason for excluding this here is that this quantity can, in principle, be time-dependent and thus must be evaluated for each strand. Parameters ---------- ion : `~fiasco.Ion` channel : Compatible with `sunkit_instruments.response.abstractions.AbstractChannel` Returns ------- emissivity: `~astropy.units.Quantity` Total emissivity as a function of temperature and density. """ wavelength = channel.wavelength response = channel.wavelength_response() f_interp = interp1d(wavelength, response, bounds_error=False, fill_value=0.0) # Bound-bound line emissivity transition_wavelengths, line_emissivity = self.get_line_emissivity(ion) response_interp = f_interp(transition_wavelengths.to_value(wavelength.unit)) * response.unit emissivity = np.dot(line_emissivity, response_interp) # Continuum emissivity continuum_wavelength, continuum_emissivity = self.get_continuum_emissivity(ion) response_interp = f_interp(continuum_wavelength.to_value(wavelength.unit)) * response.unit integrand = continuum_emissivity * response_interp em_continuum = trapezoid(y=integrand.value, x=continuum_wavelength.value, axis=-1) * integrand.unit*continuum_wavelength.unit emissivity += em_continuum[:, np.newaxis] return emissivity