"""
Some basic tools/utilities needed for active region construction. These functions are generally
peripheral to the actual physics.
"""
import astropy.constants as const
import astropy.units as u
import numpy as np
import sunpy.coordinates
import warnings
from astropy.coordinates import SkyCoord
from scipy.interpolate import RegularGridInterpolator
import synthesizAR
__all__ = [
'los_velocity',
'coord_in_fov',
'find_minimum_fov',
'from_pfsspack',
'from_pfsspy',
'change_obstime',
'change_obstime_frame',
'power_law_transform',
]
[docs]
@u.quantity_input
def los_velocity(v_xyz: u.cm/u.s, observer):
"""
Compute the LOS velocity for some observing angle. The sign of the result
is consistent with the convention that LOS velocity is :math:`>0` away from
the observer, i.e. red shifts are positive, blue shifts are negative
Parameters
----------
v_xyz : `~astropy.units.Quantity`
Cartesian velocity components in the Heliographic Stonyhurst coordinate
system, with shape ``(3,...)``
observer : `~astropy.coordinates.SkyCoord`
Heliographic Stonyhurst observer coordinate
"""
# NOTE: transform from HEEQ to HCC with respect to the instrument observer
Phi_0 = observer.lon.to(u.radian)
B_0 = observer.lat.to(u.radian)
v_los = v_xyz[2]*np.sin(B_0) + v_xyz[0]*np.cos(B_0)*np.cos(Phi_0) + v_xyz[1]*np.cos(B_0)*np.sin(Phi_0)
return -v_los
[docs]
def coord_in_fov(coord, width, height, center=None, bottom_left_corner=None):
# NOTE: this does not work for frames other than HPC
if center is None and bottom_left_corner is None:
raise ValueError('Must specify either center or bottom left corner')
if bottom_left_corner is None:
bottom_left_corner = SkyCoord(Tx=center.Tx-width/2,
Ty=center.Ty-height/2,
frame=center.frame)
coord = coord.transform_to(bottom_left_corner.frame)
top_right_corner = SkyCoord(Tx=bottom_left_corner.Tx+width,
Ty=bottom_left_corner.Ty+height,
frame=bottom_left_corner.frame)
in_x = np.logical_and(coord.Tx > bottom_left_corner.Tx, coord.Tx < top_right_corner.Tx)
in_y = np.logical_and(coord.Ty > bottom_left_corner.Ty, coord.Ty < top_right_corner.Ty)
return np.logical_and(in_x, in_y)
[docs]
def find_minimum_fov(coordinates):
"""
Given an HPC coordinate, find the coordinates of the corners of the
FOV that includes all of the coordinates.
"""
Tx = coordinates.Tx
Ty = coordinates.Ty
bottom_left_corner = SkyCoord(
Tx=Tx.min(),
Ty=Ty.min(),
frame=coordinates.frame
)
delta_x = Tx.max() - bottom_left_corner.Tx
delta_y = Ty.max() - bottom_left_corner.Ty
# Compute right corner after the fact to account for rounding in bin numbers
# NOTE: this is the coordinate of the top right corner of the top right corner pixel, NOT
# the coordinate at the center of the pixel!
top_right_corner = SkyCoord(
Tx=bottom_left_corner.Tx + delta_x,
Ty=bottom_left_corner.Ty + delta_y,
frame=coordinates.frame
)
return bottom_left_corner, top_right_corner
[docs]
def from_pfsspack(pfss_fieldlines):
"""
Convert fieldline coordinates output from the SSW package `pfss <http://www.lmsal.com/~derosa/pfsspack/>`_
into `~astropy.coordinates.SkyCoord` objects.
Parameters
----------
pfss_fieldlines : `~numpy.recarray`
Structure produced by reading pfss output with `~scipy.io.readsav`
Returns
-------
fieldlines : `list`
Each entry is a `tuple` containing a `~astropy.coordinates.SkyCoord` object and a
`~astropy.units.Quantity` object listing the coordinates and field strength along the loop.
"""
# Fieldline coordinates
num_fieldlines = pfss_fieldlines['ptr'].shape[0]
# Use HGC frame if possible
try:
frame = sunpy.coordinates.HeliographicCarrington(
obstime=sunpy.time.parse_time(pfss_fieldlines['now'].decode('utf-8')))
except ValueError:
warnings.warn('Assuming HGS frame because no date available for HGC frame')
frame = sunpy.coordinates.HeliographicStonyhurst()
fieldlines = []
for i in range(num_fieldlines):
# NOTE: For an unknown reason, there are a number of invalid points for each line output
# by pfss
n_valid = pfss_fieldlines['nstep'][i]
lon = (pfss_fieldlines['ptph'][i, :] * u.radian).to(u.deg)[:n_valid]
lat = 90 * u.deg - (pfss_fieldlines['ptth'][i, :] * u.radian).to(u.deg)[:n_valid]
radius = ((pfss_fieldlines['ptr'][i, :]) * const.R_sun.to(u.cm))[:n_valid]
coord = SkyCoord(lon=lon, lat=lat, radius=radius, frame=frame)
fieldlines.append(coord)
# Magnetic field strengths
lon_grid = (pfss_fieldlines['phi'] * u.radian - np.pi * u.radian).to(u.deg).value
lat_grid = (np.pi / 2. * u.radian - pfss_fieldlines['theta'] * u.radian).to(u.deg).value
radius_grid = pfss_fieldlines['rix'] * const.R_sun.to(u.cm).value
B_radius = pfss_fieldlines['br']
B_lat = pfss_fieldlines['bth']
B_lon = pfss_fieldlines['bph']
# Create interpolators
B_radius_interpolator = RegularGridInterpolator((radius_grid, lat_grid, lon_grid), B_radius,
bounds_error=False, fill_value=None)
B_lat_interpolator = RegularGridInterpolator((radius_grid, lat_grid, lon_grid), B_lat,
bounds_error=False, fill_value=None)
B_lon_interpolator = RegularGridInterpolator((radius_grid, lat_grid, lon_grid), B_lon,
bounds_error=False, fill_value=None)
# Interpolate values through each line
field_strengths = []
for f in fieldlines:
points = np.stack([f.spherical.distance.to(u.cm).value,
f.spherical.lat.to(u.deg).value,
f.spherical.lon.to(u.deg).value], axis=1)
b_r = B_radius_interpolator(points)
b_lat = B_lat_interpolator(points)
b_lon = B_lon_interpolator(points)
field_strengths.append(np.sqrt(b_r**2 + b_lat**2 + b_lon**2) * u.Gauss)
return [(l, b) for l, b in zip(fieldlines, field_strengths)]
[docs]
def change_obstime(coord, obstime):
"""
Change the obstime of a coordinate, including its observer.
"""
new_observer = coord.observer.replicate(obstime=obstime)
return SkyCoord(coord.replicate(observer=new_observer, obstime=obstime))
[docs]
def change_obstime_frame(frame, obstime):
"""
Change the obstime of a coordinate frame, including its observer.
"""
new_observer = frame.observer.replicate(obstime=obstime)
return frame.replicate_without_data(observer=new_observer, obstime=obstime)
[docs]
@u.quantity_input
def from_pfsspy(fieldlines,
n_min=0,
obstime=None,
length_min=20*u.Mm,
length_max=3e3*u.Mm,
name_template=None,
cross_sectional_area=None):
"""
Convert a `pfsspy.fieldline.FieldLines` structure into a list of `~synthesizAR.Strand` objects.
Parameters
----------
fieldlines: `pfsspy.fieldline.FieldLines`
n_min: `int`, optional
The minimum number of points required to keep a traced fieldline. This is often useful when
trying to filter out very small or erroneous fieldlines.
obstime: `astropy.time.Time`, optional
The desired obstime of the coordinates. Use this if the coordinates need to be at a
different obstime than that of the Carrington map they were traced from.
length_min : `astropy.units.Quantity`, optional
Minimum allowed loop length. All strands with length below this are excluded.
length_max : `astropy.units.Quantity`, optional
Maximum allowed loop length. All strands with length above this are excluded.
name_template: `str`, optional
Name template to use when building strands. Defaults to 'strand_{:06d}'
cross_sectional_area: `astropy.units.Quantity`, optional
The cross-sectional area to assign to each loop.
"""
from synthesizAR import log
if name_template is None:
name_template = 'strand_{:06d}'
strands = []
for i, f in enumerate(fieldlines):
if f.coords.shape[0] <= n_min:
log.debug(f'Dropping {f} as it has less than {n_min} points.')
continue
# NOTE: There are some lines along which we cannot find
# the field strength.
try:
if np.isnan(f.b_along_fline).all():
log.debug(f'Dropping {f} as field strength along strand is all NaN.')
continue
except IndexError:
# TODO: remember why this exception exists.
continue
b = np.sqrt((f.b_along_fline**2).sum(axis=1)) * u.G
# NOTE: redefine the coordinate at a new obstime. This is useful because the
# Carrington map that the coordinates were derived from has a single time for
# the entire Carrington rotation, but this is often not the time at which we
# need the coordinates defined. If deriving coordinates for a relatively small
# (e.g. AR-sized) patch on the Sun, this time should roughly correspond to the time
# at which the center of that patch crossed the central meridian.
if obstime is not None:
coord = change_obstime(f.coords, obstime)
else:
coord = f.coords
# Construct the loop here to easily filter on loop length and interpolate
# NaNs from the field strength.
strand = synthesizAR.Strand(name_template.format(i),
coord,
field_strength=b,
cross_sectional_area=cross_sectional_area)
if strand.length < length_min or strand.length > length_max:
log.debug(f'Dropping {strand} as it has length {strand.length} outside of the allowed range.')
continue
if np.any(np.isnan(strand.field_strength)):
# There are often NaN values that show up in the interpolated field strengths.
# Interpolate over these.
b = strand.field_strength
s = strand.field_aligned_coordinate
nans = np.isnan(b)
b[nans] = np.interp(s[nans].value, s[~nans].value, b[~nans].value) * b.unit
strand.field_strength = b
strands.append(strand)
return strands