__all__ = (
"hpid2_ra_dec",
"ra_dec2_hpid",
"healbin",
"_hpid2_ra_dec",
"_ra_dec2_hpid",
"_healbin",
"moc2array",
"hp_grow_argsort",
"_hp_grow_mask",
)
import warnings
from functools import lru_cache
import healpy as hp
import numpy as np
from .tree_utils import _build_tree, _xyz_from_ra_dec
def _hpid2_ra_dec(nside, hpids, **kwargs):
"""
Correct for healpy being silly and running dec from 0-180.
Parameters
----------
nside : int
Must be a value of 2^N.
hpids : np.array
Array (or single value) of healpixel IDs.
Returns
-------
ra_ret : float (or np.array)
RA positions of the input healpixel IDs. In radians.
dec_ret : float (or np.array)
Dec positions of the input healpixel IDs. In radians.
"""
lat, lon = hp.pix2ang(nside, hpids, **kwargs)
dec_ret = np.pi / 2.0 - lat
ra_ret = lon
return ra_ret, dec_ret
[docs]
def hpid2_ra_dec(nside, hpids, **kwargs):
"""
Correct for healpy being silly and running dec from 0-180.
Parameters
----------
nside : int
Must be a value of 2^N.
hpids : np.array
Array (or single value) of healpixel IDs.
Returns
-------
raRet : float (or np.array)
RA positions of the input healpixel IDs. In degrees.
decRet : float (or np.array)
Dec positions of the input healpixel IDs. In degrees.
"""
ra, dec = _hpid2_ra_dec(nside, hpids, **kwargs)
return np.degrees(ra), np.degrees(dec)
def _ra_dec2_hpid(nside, ra, dec, **kwargs):
"""
Assign ra,dec points to the correct healpixel.
Parameters
----------
nside : int
Must be a value of 2^N.
ra : np.array
RA values to assign to healpixels. Radians.
dec : np.array
Dec values to assign to healpixels. Radians.
Returns
-------
hpids : np.array
Healpixel IDs for the input positions.
"""
lat = np.pi / 2.0 - dec
hpids = hp.ang2pix(nside, lat, ra, **kwargs)
return hpids
[docs]
def ra_dec2_hpid(nside, ra, dec, **kwargs):
"""
Assign ra,dec points to the correct healpixel.
Parameters
----------
nside : int
Must be a value of 2^N.
ra : np.array
RA values to assign to healpixels. Degrees.
dec : np.array
Dec values to assign to healpixels. Degrees.
Returns
-------
hpids : np.array
Healpixel IDs for the input positions.
"""
return _ra_dec2_hpid(nside, np.radians(ra), np.radians(dec), **kwargs)
def _healbin(ra, dec, values, nside=128, reduce_func=np.mean, dtype=float, fill_val=hp.UNSEEN):
"""
Take arrays of ra's, dec's, and value and bin into healpixels.
Like numpy.hexbin but for bins on a sphere.
Parameters
----------
ra : np.array
RA positions of the data points. Radians.
dec : np.array
Dec positions of the data points. Radians
values : np.array
The values at each ra,dec position.
nside : int
Healpixel nside resolution. Must be a value of 2^N.
reduce_func : function (numpy.mean)
A function that will return a single value given a subset of
`values`.
dtype : dtype ('float')
Data type of the resulting mask
fill_val : float (hp.UNSEEN)
Value to fill in with healpixels that have no value.
Default is the healpy mask value.
Returns
-------
map_vals : np.array
A numpy array that is a valid Healpixel map.
"""
hpids = _ra_dec2_hpid(nside, ra, dec)
order = np.argsort(hpids)
hpids = hpids[order]
values = values[order]
pixids = np.unique(hpids)
left = np.searchsorted(hpids, pixids)
right = np.searchsorted(hpids, pixids, side="right")
map_vals = np.zeros(hp.nside2npix(nside), dtype=dtype) + fill_val
# Wow, I thought histogram would be faster than the loop, but
# this has been faster!
for i, idx in enumerate(pixids):
map_vals[idx] = reduce_func(values[left[i] : right[i]])
# Change any NaNs to fill value
map_vals[np.isnan(map_vals)] = fill_val
return map_vals
[docs]
def healbin(ra, dec, values, nside=128, reduce_func=np.mean, dtype=float, fill_val=hp.UNSEEN):
"""
Take arrays of ra's, dec's, and value and bin into healpixels.
Like numpy.hexbin but for bins on a sphere.
Parameters
----------
ra : np.array
RA positions of the data points. Degrees.
dec : np.array
Dec positions of the data points. Degrees.
values : np.array
The values at each ra,dec position.
nside : int
Healpixel nside resolution. Must be a value of 2^N.
reduce_func : function (numpy.mean)
A function that will return a single value given a subset
of `values`.
dtype : dtype ('float')
Data type of the resulting mask
fill_val : float (hp.UNSEEN)
Value to fill in with healpixels that have no value.
Default is the healpy mask value.
Returns
-------
mapVals : np.array
A numpy array that is a valid Healpixel map.
"""
return _healbin(
np.radians(ra),
np.radians(dec),
values,
nside=nside,
reduce_func=reduce_func,
dtype=dtype,
fill_val=fill_val,
)
[docs]
def moc2array(data, uniq, nside=128, reduce_func=np.sum, density=True, fill_val=0.0):
"""Convert a Multi-Order Coverage Map to a single nside HEALPix
array. Useful for converting maps output by LIGO alerts. Expect
that future versions of healpy or astropy will be able to replace
this functionality. Note that this is a convienence function that
will probably degrade portions of the MOC that are sampled at high
resolution.
Details of HEALPix Mulit-Order Coverage map:
http://ivoa.net/documents/MOC/20190404/PR-MOC-1.1-20190404.pdf
Parameters
----------
data : np.array
Data values for the MOC map
uniq : np.array
The UNIQ values for the MOC map
nside : int (128)
The output map nside
reduce_func : function (np.sum)
The function to use to combine data into single healpixels.
density : bool (True)
If True, multiplies data values by pixel area before applying
reduce_func, and divides the final array by the output pixel
area. Should be True if working on a probability density MOC.
fill_val : float (0.)
Value to fill empty HEALPixels with. Good choices include
0 (default), hp.UNSEEN, and np.nan.
Returns
-------
np.array : HEALpy array of nside. Units should be the same as
the input map as processed by reduce_func.
"""
# NUNIQ packing, from page 12 of
# http://ivoa.net/documents/MOC/20190404/PR-MOC-1.1-20190404.pdf
orders = np.floor(np.log2(uniq / 4) / 2).astype(int)
npixs = (uniq - 4 * 4**orders).astype(int)
nsides = 2**orders
names = ["ra", "dec", "area"]
types = [float] * len(names)
data_points = np.zeros(data.size, dtype=list(zip(names, types)))
for order in np.unique(orders):
good = np.where(orders == order)
ra, dec = _hpid2_ra_dec(nsides[good][0], npixs[good], nest=True)
data_points["ra"][good] = ra
data_points["dec"][good] = dec
data_points["area"][good] = hp.nside2pixarea(nsides[good][0])
if density:
tobin_data = data * data_points["area"]
else:
tobin_data = data
result = _healbin(
data_points["ra"],
data_points["dec"],
tobin_data,
nside=nside,
reduce_func=reduce_func,
fill_val=fill_val,
)
if density:
good = np.where(result != fill_val)
result[good] = result[good] / hp.nside2pixarea(nside)
return result
[docs]
def hp_grow_argsort(in_map, ignore_nan=True):
"""Find the maximum of a healpix map, then orders healpixels
by selecting the maximum bordering the selected area.
Parameters
----------
in_map : np.array
A valid HEALpix array
ignore_nan : bool (True)
If true, ignores values that are NaN
Returns
-------
ordered_hp : int array
The indices that put in_map in the correct order
"""
nside = hp.npix2nside(np.size(in_map))
npix = np.size(in_map)
pix_indx = np.arange(npix)
if ignore_nan:
not_nan_pix = ~np.isnan(in_map)
npix = np.size(in_map[not_nan_pix])
# Check if we have already run with this nside
if hasattr(hp_grow_argsort, "nside"):
nside_match = nside == hp_grow_argsort.nside
else:
nside_match = False
# If we already have neighbors chached, just use it
if nside_match:
neighbors = hp_grow_argsort.neighbors_cache
else:
# Running a new nside, or for the first time, compute
# neighbors and set attributes Make a `bool` area to keep
# track of which pixels still need to be sorted
neighbors = hp.get_all_neighbours(nside, pix_indx).T
hp_grow_argsort.neighbors_cache = neighbors
hp_grow_argsort.nside = nside
valid_neighbors_mask = np.ones(neighbors.shape, dtype=bool)
# Sometimes there can be no neighbors in some directions
valid_neighbors_mask[np.where(neighbors == -1)] = False
ordered_hp = np.zeros(npix, dtype=int)
current_max = np.where(in_map == np.nanmax(in_map))[0].min()
ordered_hp[0] = current_max
# Remove max from valid_neighbors. Can be clever with indexing
# so we don't have to do a brute force search of the entire
# neghbors array to mask it.
# valid_neighbors_mask[np.where(neighbors == current_max)] = False
current_neighbors = neighbors[current_max][valid_neighbors_mask[current_max]]
sub_indx = np.where(neighbors[current_neighbors] == current_max)
valid_neighbors_mask[(current_neighbors[sub_indx[0]], sub_indx[1])] = False
for i in np.arange(1, npix):
current_neighbors = neighbors[ordered_hp[0:i]][valid_neighbors_mask[ordered_hp[0:i]]]
indx = np.where(in_map[current_neighbors] == np.nanmax(in_map[current_neighbors]))[0]
if np.size(indx) == 0:
# We can't connect to any more pixels
warnings.warn("Can not connect to any more pixels.")
return ordered_hp[0:i]
else:
indx = np.min(indx)
current_max = current_neighbors[indx]
ordered_hp[i] = current_max
# current_max is no longer a valid neighbor to consider
neighbors_of_current = neighbors[current_max]
sub_indx = np.where(neighbors[neighbors_of_current] == current_max)
valid_neighbors_mask[(neighbors_of_current[sub_indx[0]], sub_indx[1])] = False
return ordered_hp
def _tree_for_mask(nside, scale=1000):
"""Build a kdTree for HEALpixels"""
ra, dec = _hpid2_ra_dec(nside, np.arange(hp.nside2npix(nside)))
tree = _build_tree(ra, dec, scale=scale)
return tree
@lru_cache(maxsize=10)
def _hp_grow_mask(nside, masked_indx_tuple, grow_size=np.radians(2.0), scale=1000):
"""Grow a HEALpix mask.
Would be nice if healpy.query_disc was vectorized, but here we are.
Parameters
----------
nside : int
HEALpix nside to use
masked_indx_tuple : `tuple`
Indices that are currently masked. Needs to be tuple of ints
so cache can hash it.
scale : `int`
Scale passed to _build_tree to maintain cross-platform repeatability.
Default 1000.
Returns
-------
out_array : `np.array`
The input mask with any masked pixels grown the correct amount
"""
# Set nside and kdTree as attributes so they will be cached for later
if not hasattr(_hp_grow_mask, "nside"):
_hp_grow_mask.nside = nside
_hp_grow_mask.tree = _tree_for_mask(nside, scale=scale)
_hp_grow_mask.scale = scale
if (_hp_grow_mask.nside != nside) | (_hp_grow_mask.scale != scale):
_hp_grow_mask.nside = nside
_hp_grow_mask.tree = _tree_for_mask(nside, scale=scale)
_hp_grow_mask.scale = scale
# Where are we masked
# Technically might be able to reach into the tree object and
# get the points that way and save an _hpid2_ra_dec call.
ra, dec = _hpid2_ra_dec(nside, np.arange(hp.nside2npix(nside))[list(masked_indx_tuple)])
x, y, z = _xyz_from_ra_dec(ra, dec)
# convert angular distance to Euclidian distance
dist = 2.0 * np.sin(grow_size / 2.0)
if scale is not None:
x = np.round(x * scale).astype(int)
y = np.round(y * scale).astype(int)
z = np.round(z * scale).astype(int)
dist = np.round(dist * scale).astype(int)
# If there are lots of points, may want to use query_ball_tree
# instead for speed.
lists_of_neighbors = _hp_grow_mask.tree.query_ball_point(np.vstack([x, y, z]).T, dist)
u_indx = np.unique(np.concatenate(lists_of_neighbors))
return u_indx