Source code for synthesizAR.instruments.sdo

"""
Class for the SDO/AIA instrument. Holds information about the cadence and
spatial and spectroscopic resolution.
"""
import warnings
import pkg_resources

import numpy as np
import asdf
import zarr
import astropy.units as u
from aiapy.response import Channel
from aiapy.psf import filter_mesh_parameters
from scipy.interpolate import interp1d, interpn

from synthesizAR.instruments import InstrumentBase
from synthesizAR.util.decorators import return_quantity_as_tuple

__all__ = ['InstrumentSDOAIA', 'aia_kernel_quick']

_TEMPERATURE_RESPONSE_FILE = pkg_resources.resource_filename(
    'synthesizAR', 'instruments/data/aia_temperature_response.asdf')
with asdf.open(_TEMPERATURE_RESPONSE_FILE, 'r', copy_arrays=True) as af:
    _TEMPERATURE_RESPONSE = af.tree


[docs] class InstrumentSDOAIA(InstrumentBase): """ Instrument object for the Atmospheric Imaging Assembly on the Solar Dynamics Observatory Parameters ---------- observing_time : `tuple` start and end of observing time observer : `~astropy.coordinates.SkyCoord` Examples -------- """ name = 'SDO_AIA' def __init__(self, observing_time, observer, **kwargs): self.channels = [ Channel(94*u.angstrom), Channel(131*u.angstrom), Channel(171*u.angstrom), Channel(193*u.angstrom), Channel(211*u.angstrom), Channel(335*u.angstrom), ] # Add the Gaussian width for the PSF convolution psf_params = filter_mesh_parameters(use_preflightcore=True) for c in self.channels: psf_width = psf_params[c.channel]['width'] c.psf_width = u.Quantity([psf_width, psf_width]) super().__init__(observing_time, observer, **kwargs) @property def cadence(self) -> u.s: return 12.0 * u.s @property def resolution(self) -> u.arcsec / u.pix: return [0.600698, 0.600698] * u.arcsec/u.pixel @property def observatory(self): return 'SDO' @property def detector(self): return 'AIA' @property def telescope(self): return 'SDO/AIA'
[docs] def get_instrument_name(self, channel): return f'{self.detector}_{channel.telescope_number}'
[docs] @staticmethod @return_quantity_as_tuple def calculate_intensity_kernel(loop, channel, **kwargs): em_model = kwargs.get('emission_model', None) if em_model: # Full intensity calculation using CHIANTI and the # wavelength response functions n = loop.density T = loop.electron_temperature Tn_flat = np.stack((T.value.flatten(), n.value.flatten()), axis=1) kernel = np.zeros(T.shape) # Get the group for this channel root = zarr.open(em_model.emissivity_table_filename, mode='r') grp = root[f'SDO_AIA/{channel.name}'] for ion in em_model: if ion.ion_name not in grp: warnings.warn(f'Not including contribution from {ion.ion_name}') continue ds = grp[ion.ion_name] em_ion = u.Quantity(ds, ds.attrs['unit']) # Interpolate wavelength-convolved emissivity to loop n,T em_flat = interpn( (em_model.temperature.to(T.unit).value, em_model.density.to(n.unit).value), em_ion.value, Tn_flat, method='linear', fill_value=None, bounds_error=False, ) em_ion_interp = np.reshape(em_flat, T.shape) em_ion_interp = u.Quantity(np.where(em_ion_interp < 0., 0., em_ion_interp), em_ion.unit) ionization_fraction = loop.get_ionization_fraction(ion) tmp = ion.abundance*0.83/(4*np.pi*u.steradian)*ionization_fraction*n*em_ion_interp if not hasattr(kernel, 'unit'): kernel = kernel*tmp.unit kernel += tmp else: # Use tabulated temperature respone functions kernel = aia_kernel_quick(channel.name, loop.electron_temperature, loop.density) return kernel
[docs] def convolve_emissivities(self, channel, emission_model, **kwargs): """ Compute product between wavelength response for `channel` and emissivity for all ions in an emission model. """ em_convolved = {} r = channel.wavelength_response(**kwargs) * channel.plate_scale f_interp = interp1d(channel.wavelength, r, bounds_error=False, fill_value=0.0) for ion in emission_model: wavelength, emissivity = emission_model.get_emissivity(ion) # TODO: need to figure out a better way to propagate missing emissivities if wavelength is None or emissivity is None: em_convolved[ion.ion_name] = None else: em_convolved[ion.ion_name] = np.dot(emissivity, f_interp(wavelength)) * r.unit return em_convolved
[docs] def observe(self, skeleton, save_directory=None, channels=None, **kwargs): em_model = kwargs.get('emission_model') if em_model: # TODO: skip if the file already exists? # If using an emission model, we want to first convolve the wavelength-dependent # emissivities with the wavelength response functions and store them in the # emissivity table channels = self.channels if channels is None else channels # NOTE: Don't open with 'w' because we want to preserve the emissivity table root = zarr.open(store=em_model.emissivity_table_filename, mode='a') if self.name not in root: grp = root.create_group(self.name) else: grp = root[self.name] # Get options for wavelength response include_crosstalk = kwargs.pop('include_crosstalk', True) obstime = self.observer.obstime if kwargs.pop('include_degradation', False) else None include_eve_correction = kwargs.pop('include_eve_correction', False) for channel in channels: em_convolved = self.convolve_emissivities( channel, em_model, include_crosstalk=include_crosstalk, obstime=obstime, include_eve_correction=include_eve_correction, ) if channel.name in grp: chan_grp = grp[channel.name] else: chan_grp = grp.create_group(channel.name) for k in em_convolved: # NOTE: update dataset even when it already exists if k in chan_grp: ds = chan_grp[k] ds[:, :] = em_convolved[k].value else: ds = chan_grp.create_dataset(k, data=em_convolved[k].value) ds.attrs['unit'] = em_convolved[k].unit.to_string() return super().observe(skeleton, save_directory=save_directory, channels=channels, **kwargs)
[docs] @u.quantity_input def aia_kernel_quick(channel, temperature: u.K, density: u.cm**(-3)) -> u.Unit('ct pix-1 s-1 cm-1'): r""" Calculate AIA intensity kernel for a given channel Compute the integrand of the AIA intensity integral, .. math:: p_c = \int\mathrm{d}h\,K_c(T_e)n_e^2 by interpolating the tabulated response curve to ``temperature`` and multiplying by the square of ``density``. Parameters ---------- channel : `str` Name of the AIA channel temperature : `~astropy.units.Quantity` density : `astropy.units.Quantity` """ T, K = _TEMPERATURE_RESPONSE['temperature'], _TEMPERATURE_RESPONSE[channel] return np.interp(temperature, T, K) * density**2