Source code for synthesizAR.util.util

"""
Some basic tools/utilities needed for active region construction. These functions are generally
peripheral to the actual physics.
"""
from collections import namedtuple
import warnings

import numpy as np
from scipy.interpolate import RegularGridInterpolator
import astropy.units as u
from astropy.coordinates import SkyCoord
import astropy.constants as const
import sunpy.coordinates
import sunpy.sun.constants as sun_const

import synthesizAR

__all__ = [
    'SpatialPair',
    'los_velocity',
    'coord_in_fov',
    'find_minimum_fov',
    'is_visible',
    'from_pfsspack',
    'from_pfsspy',
    'change_obstime',
    'change_obstime_frame',
]


SpatialPair = namedtuple('SpatialPair', 'x y z')


[docs] @u.quantity_input def los_velocity(v_xyz: u.cm/u.s, observer): """ Compute the LOS velocity for some observing angle. The sign of the result is consistent with the convention that LOS velocity is :math:`>0` away from the observer, i.e. red shifts are positive, blue shifts are negative Parameters ---------- v_xyz : `~astropy.units.Quantity` Cartesian velocity components in the Heliographic Stonyhurst coordinate system, with shape ``(3,...)`` observer : `~astropy.coordinates.SkyCoord` Heliographic Stonyhurst observer coordinate """ # NOTE: transform from HEEQ to HCC with respect to the instrument observer Phi_0 = observer.lon.to(u.radian) B_0 = observer.lat.to(u.radian) v_los = v_xyz[2]*np.sin(B_0) + v_xyz[0]*np.cos(B_0)*np.cos(Phi_0) + v_xyz[1]*np.cos(B_0)*np.sin(Phi_0) return -v_los
[docs] def coord_in_fov(coord, width, height, center=None, bottom_left_corner=None): # NOTE: this does not work for frames other than HPC if center is None and bottom_left_corner is None: raise ValueError('Must specify either center or bottom left corner') if bottom_left_corner is None: bottom_left_corner = SkyCoord(Tx=center.Tx-width/2, Ty=center.Ty-height/2, frame=center.frame) coord = coord.transform_to(bottom_left_corner.frame) top_right_corner = SkyCoord(Tx=bottom_left_corner.Tx+width, Ty=bottom_left_corner.Ty+height, frame=bottom_left_corner.frame) in_x = np.logical_and(coord.Tx > bottom_left_corner.Tx, coord.Tx < top_right_corner.Tx) in_y = np.logical_and(coord.Ty > bottom_left_corner.Ty, coord.Ty < top_right_corner.Ty) return np.logical_and(in_x, in_y)
[docs] def find_minimum_fov(coordinates, padding=None): """ Given an HPC coordinate, find the coordinates of the corners of the FOV that includes all of the coordinates. """ if padding is None: padding = [0, 0] * u.arcsec Tx = coordinates.Tx Ty = coordinates.Ty bottom_left_corner = SkyCoord( Tx=Tx.min() - padding[0], Ty=Ty.min() - padding[1], frame=coordinates.frame ) delta_x = Tx.max() + padding[0] - bottom_left_corner.Tx delta_y = Ty.max() + padding[1] - bottom_left_corner.Ty # Compute right corner after the fact to account for rounding in bin numbers # NOTE: this is the coordinate of the top right corner of the top right corner pixel, NOT # the coordinate at the center of the pixel! top_right_corner = SkyCoord( Tx=bottom_left_corner.Tx + delta_x, Ty=bottom_left_corner.Ty + delta_y, frame=coordinates.frame ) return bottom_left_corner, top_right_corner
[docs] def is_visible(coords, observer): """ Create mask of coordinates not blocked by the solar disk. Parameters ---------- coords : `~astropy.coordinates.SkyCoord` Helioprojective oordinates of the object(s) of interest observer : `~astropy.coordinates.SkyCoord` Heliographic-Stonyhurst Location of the observer """ theta_x = coords.Tx theta_y = coords.Ty distance = coords.distance rsun_obs = ((sun_const.radius / (observer.radius - sun_const.radius)).decompose() * u.radian).to(u.arcsec) off_disk = np.sqrt(theta_x**2 + theta_y**2) > rsun_obs in_front_of_disk = distance - observer.radius < 0. return np.any(np.stack([off_disk, in_front_of_disk], axis=1), axis=1)
[docs] def from_pfsspack(pfss_fieldlines): """ Convert fieldline coordinates output from the SSW package `pfss <http://www.lmsal.com/~derosa/pfsspack/>`_ into `~astropy.coordinates.SkyCoord` objects. Parameters ---------- pfss_fieldlines : `~numpy.recarray` Structure produced by reading pfss output with `~scipy.io.readsav` Returns ------- fieldlines : `list` Each entry is a `tuple` containing a `~astropy.coordinates.SkyCoord` object and a `~astropy.units.Quantity` object listing the coordinates and field strength along the loop. """ # Fieldline coordinates num_fieldlines = pfss_fieldlines['ptr'].shape[0] # Use HGC frame if possible try: frame = sunpy.coordinates.HeliographicCarrington( obstime=sunpy.time.parse_time(pfss_fieldlines['now'].decode('utf-8'))) except ValueError: warnings.warn('Assuming HGS frame because no date available for HGC frame') frame = sunpy.coordinates.HeliographicStonyhurst() fieldlines = [] for i in range(num_fieldlines): # NOTE: For an unknown reason, there are a number of invalid points for each line output # by pfss n_valid = pfss_fieldlines['nstep'][i] lon = (pfss_fieldlines['ptph'][i, :] * u.radian).to(u.deg)[:n_valid] lat = 90 * u.deg - (pfss_fieldlines['ptth'][i, :] * u.radian).to(u.deg)[:n_valid] radius = ((pfss_fieldlines['ptr'][i, :]) * const.R_sun.to(u.cm))[:n_valid] coord = SkyCoord(lon=lon, lat=lat, radius=radius, frame=frame) fieldlines.append(coord) # Magnetic field strengths lon_grid = (pfss_fieldlines['phi'] * u.radian - np.pi * u.radian).to(u.deg).value lat_grid = (np.pi / 2. * u.radian - pfss_fieldlines['theta'] * u.radian).to(u.deg).value radius_grid = pfss_fieldlines['rix'] * const.R_sun.to(u.cm).value B_radius = pfss_fieldlines['br'] B_lat = pfss_fieldlines['bth'] B_lon = pfss_fieldlines['bph'] # Create interpolators B_radius_interpolator = RegularGridInterpolator((radius_grid, lat_grid, lon_grid), B_radius, bounds_error=False, fill_value=None) B_lat_interpolator = RegularGridInterpolator((radius_grid, lat_grid, lon_grid), B_lat, bounds_error=False, fill_value=None) B_lon_interpolator = RegularGridInterpolator((radius_grid, lat_grid, lon_grid), B_lon, bounds_error=False, fill_value=None) # Interpolate values through each line field_strengths = [] for f in fieldlines: points = np.stack([f.spherical.distance.to(u.cm).value, f.spherical.lat.to(u.deg).value, f.spherical.lon.to(u.deg).value], axis=1) b_r = B_radius_interpolator(points) b_lat = B_lat_interpolator(points) b_lon = B_lon_interpolator(points) field_strengths.append(np.sqrt(b_r**2 + b_lat**2 + b_lon**2) * u.Gauss) return [(l, b) for l, b in zip(fieldlines, field_strengths)]
[docs] def change_obstime(coord, obstime): """ Change the obstime of a coordinate, including its observer. """ new_observer = coord.observer.replicate(obstime=obstime) return SkyCoord(coord.replicate(observer=new_observer, obstime=obstime))
[docs] def change_obstime_frame(frame, obstime): """ Change the obstime of a coordinate frame, including its observer. """ new_observer = frame.observer.replicate(obstime=obstime) return frame.replicate_without_data(observer=new_observer, obstime=obstime)
[docs] @u.quantity_input def from_pfsspy(fieldlines, n_min=0, obstime=None, length_min=20*u.Mm, length_max=3e3*u.Mm, name_template=None, cross_sectional_area=None): """ Convert a `pfsspy.fieldline.FieldLines` structure into a list of `~synthesizAR.Loop` objects. Parameters ---------- fieldlines: `pfsspy.fieldline.FieldLines` n_min: `int`, optional The minimum number of points required to keep a traced fieldline. This is often useful when trying to filter out very small or erroneous fieldlines. obstime: `astropy.time.Time`, optional The desired obstime of the coordinates. Use this if the coordinates need to be at a different obstime than that of the Carrington map they were traced from. length_min : `astropy.units.Quantity`, optional Minimum allowed loop length. All loops with length below this are excluded. length_max : `astropy.units.Quantity`, optional Maximum allowed loop length. All loops with length above this are excluded. name_template: `str`, optional Name template to use when building loops. Defaults to 'loop_{:06d}' cross_sectional_area: `astropy.units.Quantity`, optional The cross-sectional area to assign to each loop. """ from synthesizAR import log if name_template is None: name_template = 'loop_{:06d}' loops = [] for i, f in enumerate(fieldlines): if f.coords.shape[0] <= n_min: log.debug(f'Dropping {f} as it has less than {n_min} points.') continue # NOTE: There are some lines along which we cannot find # the field strength. try: if np.isnan(f.b_along_fline).all(): log.debug(f'Dropping {f} as field strength along strand is all NaN.') continue except IndexError: # TODO: remember why this exception exists. continue b = np.sqrt((f.b_along_fline**2).sum(axis=1)) * u.G # NOTE: redefine the coordinate at a new obstime. This is useful because the # Carrington map that the coordinates were derived from has a single time for # the entire Carrington rotation, but this is often not the time at which we # need the coordinates defined. If deriving coordinates for a relatively small # (e.g. AR-sized) patch on the Sun, this time should roughly correspond to the time # at which the center of that patch crossed the central meridian. if obstime is not None: coord = change_obstime(f.coords, obstime) else: coord = f.coords # Construct the loop here to easily filter on loop length and interpolate # NaNs from the field strength. loop = synthesizAR.Loop(name_template.format(i), coord, field_strength=b, cross_sectional_area=cross_sectional_area) if loop.length < length_min or loop.length > length_max: log.debug(f'Dropping {loop} as it has length {loop.length} outside of the allowed range.') continue if np.any(np.isnan(loop.field_strength)): # There are often NaN values that show up in the interpolated field strengths. # Interpolate over these. b = loop.field_strength s = loop.field_aligned_coordinate nans = np.isnan(b) b[nans] = np.interp(s[nans].value, s[~nans].value, b[~nans].value) * b.unit loop.field_strength = b loops.append(loop) return loops