import os
import warnings
from collections import OrderedDict
import numpy as np
import astropy.units as u
from astropy.utils.exceptions import AstropyWarning, AstropyUserWarning
from . import utils
__all__ = ["TemplateError", "Template", "Redden", "ModifiedBlackBody",
"read_templates_file", "load_phoenix_stars",
"bspline_templates", "gaussian_templates"]
[docs]
class TemplateError(object):
def __init__(self, file='templates/TEMPLATE_ERROR.eazy_v1.0', arrays=None, filter_wavelengths=[5500.], scale=1.):
"""
Template error function with spline interpolation at arbitrary redshift.
Parameters
----------
file : str
File containing the template error function definition
(columns of wavelength in Angstroms and the TEF).
arrays : optional, (wave, TEF)
Set from arrays rather than reading from ``file``.
filter_wavelengths : list
List of filter pivot wavelengths (observed-frame Angstroms).
scale : float
Scale factor multiplied to TEF array, e.g., the ``TEMP_ERR_A2``
parameter.
Attributes
----------
te_x, te_y : arrays
The input wavelength and TEF arrays.
min_wavelength, min_wavelength : float
Min/max of the wavelengths in ``te_x``.
clip_lo, clip_hi : float
Extrapolation limits to use if redshifted filters fall outside
defined ``te_x`` array
"""
self.file = file
if arrays is None:
self.te_x, self.te_y = np.loadtxt(file, unpack=True)
else:
self.te_x, self.te_y = arrays
self.scale = scale
self.filter_wavelengths = filter_wavelengths
self._set_limits()
self._init_spline()
def _set_limits(self):
"""
Limits to control extrapolation
"""
nonzero = self.te_y > 0
self.min_wavelength = self.te_x[nonzero].min()
self.max_wavelength = self.te_x[nonzero].max()
self.clip_lo = self.te_y[nonzero][0]
self.clip_hi = self.te_y[nonzero][-1]
def _init_spline(self):
"""
Initialize the CubicSpline interpolator
"""
from scipy import interpolate
self._spline = interpolate.CubicSpline(self.te_x, self.te_y)
[docs]
def interpolate(self, filter_wavelength=5500., z=1.):
"""
``filter_wavelength`` is observed wavelength of photometric filters.
But these sample the *rest* wavelength of the template error function
at lam/(1+z)
"""
return self._spline(filter_wavelength/(1+z))*self.scale
[docs]
def __call__(self, z, limits=None):
"""
Interpolate TEF arrays at a specific redshift
Parameters
----------
z : float
Redshift
limits : None, (float, float)
Extrapolation limits. If not specified, get from
``clip_lo`` and ``clip_hi`` attributes.
"""
lcz = np.atleast_1d(self.filter_wavelengths)/(1+z)
tef_z = self._spline(np.atleast_1d(self.filter_wavelengths)/(1+z))
if limits is None:
limits = [self.clip_lo, self.clip_hi]
clip_lo = (lcz < self.min_wavelength)
tef_z[clip_lo] = limits[0]
clip_hi = (lcz > self.max_wavelength)
tef_z[clip_hi] = limits[1]
return tef_z*self.scale
[docs]
class Redden(object):
def __init__(self, model=None, Av=0., **kwargs):
"""
Wrapper function for `dust_attenuation` and `dust_extinction`
reddening laws
.. plot::
:include-source:
import numpy as np
import matplotlib.pyplot as plt
from eazy.templates import Redden
fig, ax = plt.subplots(1,1,figsize=(6,4))
wave = np.arange(1200, 2.e4)
for model in ['calzetti00', 'mw', 'smc', 'reddy15']:
redfunc = Redden(model=model, Av=1.0)
ax.plot(wave, redfunc(wave), label=model)
ax.plot(wave, wave*0+10**(-0.4), color='k',
label=r'$A_\lambda = 1$', linestyle=':')
ax.legend()
ax.loglog()
ax.set_xticks([2000, 5000, 1.e4])
ax.set_xticklabels([0.2, 0.5, 1.0])
ax.grid()
ax.set_xlabel('wavelength, microns')
ax.set_ylabel('Attenuation / extinction (Av=1 mag)')
fig.tight_layout(pad=0.5)
Parameters
----------
model : `extinction`/`attenuation` object or str
Allowable string arguments:
- 'smc': `dust_extinction.averages.G03_SMCBar`
- 'lmc': `dust_extinction.averages.G03_LMCAvg`
- 'mw','f99': `dust_extinction.parameter_averages.F99`
- 'calzetti00', 'c00': `dust_attenuation.averages.C00`
- 'wg00': `dust_attenuation.radiative_transfer.WG00`
- 'kc13': Calzetti with modified slope and dust bump from
Kriek & Conroy (2013)
- 'reddy15': Reddy et al. (2015)
Av : float
Selective extinction/attenuation (passed as `tau_V` for ``WG00``)
"""
allowed = ['smc', 'lmc', 'mw', 'f99', 'c00', 'calzetti00', 'wg00',
'kc13','reddy15','zafar15']
if isinstance(model, str):
self.model_name = model
if model in ['smc']:
from dust_extinction.averages import G03_SMCBar
self.model = G03_SMCBar()
elif model in ['lmc']:
from dust_extinction.averages import G03_LMCAvg
self.model = G03_LMCAvg()
elif model in ['mw','f99']:
from dust_extinction.parameter_averages import F99
self.model = F99()
elif model in ['calzetti00', 'c00']:
from dust_attenuation.averages import C00
self.model = C00(Av=Av)
elif model.lower() in ['kc13']:
from eazy.sps import KC13
self.model = KC13(Av=Av, **kwargs)
elif model.lower() in ['reddy15']:
from eazy.sps import Reddy15
self.model = Reddy15(Av=Av, **kwargs)
elif model.lower() in ['zafar15']:
from eazy.sps import Zafar15
self.model = Zafar15(Av=Av)
elif model in ['wg00']:
from dust_attenuation.radiative_transfer import WG00
if 'tau_V' in kwargs:
self.model = WG00(**kwargs)
else:
self.model = WG00(tau_V=Av, **kwargs)
else:
msg = "Requested model ('{model}') not in {allowed}."
raise IOError(msg.format(model=model, allowed=allowed))
else:
self.model = model
self.model_name = 'Unknown'
for k in ['Av', 'tau_V']:
if hasattr(model, k):
Av = getattr(model, k)
break
self.Av = Av
@property
def ebv(self):
"""
E(B-V) for models that have ``Rv``
"""
if hasattr(self.model, 'Rv'):
return self.Av/self.model.Rv
else:
print('Warning: Rv not defined for model: ' + self.__repr__())
return 0.
def __repr__(self):
msg = '<Redden {0}, Av/tau_V={1}>'
return msg.format(self.model.__repr__(), self.Av)
[docs]
def __call__(self, wave, left=0, right=1., **kwargs):
"""
Return reddening factor.
Parameters
----------
wave : array (NW)
Wavelength array. If has no units, assume
`~astropy.units.Angstrom`.
left, right : float
Extrapolation at short/long wavelengths
Returns
-------
ext : array (NW)
Extinction / attenuation as a function of wavelength
"""
if not hasattr(wave, 'unit'):
xu = wave*u.Angstrom
else:
if wave.unit is None:
xu.unit = u.Angstrom
else:
xu = wave
if 'Av' in kwargs:
self.Av = kwargs['Av']
if 'tau_V' in kwargs:
self.Av = kwargs['tau_V']
for k in kwargs:
if hasattr(self.model, k):
setattr(self.model, k, kwargs[k])
ext = np.atleast_1d(np.ones_like(xu.value))
if hasattr(self.model, 'x_range'):
if hasattr(self.model, 'extinguish'):
# dust_extinction has x_range in 1/micron
xblue = (1./xu.to(u.micron)).value > self.model.x_range[1]
xred = (1./xu.to(u.micron)).value < self.model.x_range[0]
else:
# dust_attenuation has x_range in micron
xblue = (xu.to(u.micron)).value < self.model.x_range[0]
xred = (xu.to(u.micron)).value > self.model.x_range[1]
ext[xblue] = left
ext[xred] = right
xr = (~xblue) & (~xred)
else:
xr = np.isfinite(wave)
if (self.model is None) | (self.Av <= 0):
# Don't do anything
pass
elif hasattr(self.model, 'extinguish'):
# extinction
ext[xr] = self.model.extinguish(xu[xr], Av=self.Av)
elif hasattr(self.model, 'attenuate'):
# attenuation
if hasattr(self.model, 'tau_V'):
# WG00
self.model.tau_V = self.Av
else:
self.model.Av = self.Av
ext[xr] = self.model.attenuate(xu[xr])
else:
msg = ('Dust model must have either `attenuate` or `extinguish`' +
' method.')
raise IOError(msg)
if hasattr(wave, '__len__'):
return ext
elif ext.size == 1:
return ext[0]
else:
return ext
[docs]
def read_templates_file(templates_file=None, as_dict=False, **kwargs):
"""
Read templates listed in ``templates_file``.
Parameters
----------
templates_file : str
Filename of the ascii file containing the templates list. Has format
like
.. code::
1 templates/fsps_full/tweak_fsps_QSF_12_v3_001.dat 1.0
2 templates/fsps_full/tweak_fsps_QSF_12_v3_002.dat 1.0
...
N {path} {scale}
where ``scale`` is the factor needed to scale the template wavelength
array to units of Angstroms.
as_dict : bool
Return dictionary rather than a list (e.g., for `grizli`).
kwargs : dict
Extra keyword arguments are passed to `~eazy.templates.Template`
with ``file`` and ``to_angstrom`` keywords set automatically.
Returns
-------
templates : list
List of `eazy.templates.Template` objects (`dict` if ``as_dict``)
"""
lines = open(templates_file).readlines()
templates = []
for line in lines:
if line.strip().startswith('#'):
continue
lspl = line.split()
template_file = lspl[1]
if len(lspl) > 2:
to_angstrom = float(lspl[2])
else:
to_angstrom = 1.
templ = Template(file=template_file, to_angstrom=to_angstrom,
**kwargs)
templates.append(templ)
if as_dict:
tdict = OrderedDict()
for t in templates:
tdict[t.name] = t
return tdict
else:
return templates
[docs]
class Template():
def __init__(self, file=None, name=None, arrays=None, sp=None, meta={}, to_angstrom=1., velocity_smooth=0, norm_filter=None, resample_wave=None, fits_column='flux', redfunc=Redden(), redshifts=[0], verbose=True, flux_unit=(u.L_sun/u.Angstrom), **kwargs):
"""
Template object.
Can optionally specify a 2D flux array with the first
dimension indicating the template for the nearest redshift in the
corresponding ``redshifts`` list. See When integrating the
filter fluxes with ``integrate_filter``, the template index with the
redshift nearest to the specified redshift will be used.
Parameters
----------
file : str
Filename of ascii or FITS template
arrays : (array, array)
Tuple of ``wave``, ``flux`` arrays. Here ``flux`` assumed to
have units f-lambda.
sp : object
Object with ``wave``, ``flux`` attributes, e.g., from
``prospector``. Here ``flux`` is assumed to have units of f-nu.
to_angstrom : float
Scale factor such that ``wave * to_angstrom`` has units of
`astropy.units.Angstrom`
velocity_smooth : float
Velocity smooothing in km/s, applied if > 0
resample_wave : array
Grid to resample the template wavelengths read from the input
fits_column : str
Column name of the flux column if arrays read from a ``file``
redfunc : `eazy.templates.Redden`
Object to apply additional reddening.
redshifts : array-like
Redshift grid for redshift-dependent templates
flux_unit : `astropy.units.core.Unit`
Units of ``flux`` array.
Attributes
----------
wave : array
wavelength in `astropy.units.Angstrom`, dimensions ``[NWAVE]``.
flux : array
Flux density f-lambda, can have redshift dependence, dimensions
``[NZ, NWAVE]``.
name : str
Label name
meta : dict
Metadata
redfunc : `eazy.templates.Redden`, optional
Object for applying dust reddening.
"""
import copy
from astropy.table import Table
import astropy.units as u
self.wave = None
self.flux = None
self.flux_unit = flux_unit
self.name = 'None'
self.meta = copy.deepcopy(meta)
self.velocity_smooth = velocity_smooth
if name is None:
if file is not None:
self.name = os.path.basename(file)
else:
self.name = name
self.orig_table = None
if sp is not None:
# Prospector
self.wave = np.cast[float](sp.wave)
self.flux = np.cast[float](sp.flux)
# already fnu
self.flux *= utils.CLIGHT*1.e10 / self.wave**2
elif file is not None:
# Read from a file
if file.split('.')[-1] in ['fits','csv','ecsv']:
tab = Table.read(file)
self.wave = tab['wave'].data.astype(float)
if fits_column not in tab.colnames:
msg = (f"'{fits_column}' not in {file}; " +
f"available columns are {tab.colnames}.")
raise ValueError(msg)
self.flux = tab[fits_column].data.astype(float)
self.orig_table = tab
if hasattr(tab[fits_column], 'unit'):
if tab[fits_column].unit is not None:
self.flux_unit = tab[fits_column].unit
# Transpose because FITS tables stored like NWAVE, NZ
if self.flux.ndim == 2:
self.flux = self.flux.T
for k in tab.meta:
self.meta[k] = tab.meta[k]
else:
_arr = np.loadtxt(file, unpack=True)
self.wave, self.flux = _arr[0], _arr[1]
elif arrays is not None:
self.wave, self.flux = arrays[0]*1., arrays[1]*1.
if arrays[0].shape[0] != np.atleast_2d(arrays[1]).shape[1]:
raise ValueError("Array dimensions don't match: "+
f'arrays[0]: {arrays[0].shape}, '+
f'arrays[1]: {arrays[1].shape}, ')
if hasattr(self.flux, 'unit'):
self.flux_unit = self.flux.unit
if hasattr(self.flux, 'value'):
self.flux = self.flux.value
#self.set_fnu()
else:
raise TypeError('Must specify either `sp`, `file` or `arrays`')
if self.flux.ndim == 1:
# For redshift dependence
self.flux = np.atleast_2d(self.flux)
self.redshifts = np.zeros(1)
self.NZ, self.NWAVE = self.flux.shape
else:
self.NZ, self.NWAVE = self.flux.shape
if 'NZ' in self.meta:
redshifts = [self.meta[f'Z{j}']
for j in range(self.meta['NZ'])]
if len(redshifts) != self.NZ:
msg = (f'redshifts ({len(redshifts)})'
f' doesn\'t match flux dimension ({self.NZ})!')
raise ValueError(msg)
self.redshifts = np.array(redshifts)
# if verbose:
# print(f'Redshift dependent! (NZ={self.NZ})')
# Handle optional units
if hasattr(self.wave, 'unit'):
if self.wave.unit is not None:
self.wave = self.wave.to(u.Angstrom).value
else:
self.wave = self.wave.data
else:
self.wave *= to_angstrom
flam_unit = u.erg/u.second/u.cm**2/u.Angstrom
if hasattr(self.flux, 'unit'):
if self.flux.unit is not None:
equiv = u.equivalencies.spectral_density(self.wave*u.Angstrom)
flam = self.flux.to(flam_unit, equivalencies=equiv)
self.flux = flam.value
else:
self.flux = self.flux.data
# Smoothing
if velocity_smooth > 0:
self.smooth_velocity(velocity_smooth, in_place=True)
# Resampling
self.resample(resample_wave, in_place=True)
#self.set_fnu()
# Reddening function
self.redfunc = redfunc
_red = self.redden # test to break at init if fails
def __repr__(self):
if self.name is None:
return self.__class__
else:
return '{0}: {1}'.format(self.__class__, self.name)
[docs]
def absorbed_energy(self, i=0):
diff = self.flux[i,:]*(1-self.redden)*(self.redden > 0)
absorbed = np.trapz(diff, self.wave)
return absorbed
# if self.NZ == 1:
# return absorbed[0]
# else:
# return absorbed
@property
def redden(self):
"""
Return multiplicative scaling from `self.redfunc`, which is expected
to return attenuation in magnitudes.
"""
if self.redfunc is not None:
red = self.redfunc(self.wave*u.Angstrom)
else:
red = 1.
return red
@property
def shape(self):
"""
Shape of flux attribute
"""
return self.flux.shape
[docs]
def flux_flam(self, iz=0, z=None, redshift_type='nearest'):
"""
Get redshift-dependent template in units of f-lambda
Parameters
----------
iz : int
Index of template to retrieve
z : float, None
If specified, get the redshift index with
`~eazy.templates.Template.zindex`.
redshift_type : 'nearest', 'interp'
See `~eazy.templates.Template.zindex`.
Returns
-------
flam : array
Template flux density in units of f-lambda, including any
reddening specified in the ``redden`` attribute.
"""
if z is not None:
if redshift_type == 'interp':
iz, frac = self.zindex(z=z, redshift_type=redshift_type)
if frac == 1:
flam = self.flux[iz,:]
else:
flam = frac*self.flux[iz,:]
flam += (1-frac)*self.flux[iz+1,:]
else:
iz = self.zindex(z=z, redshift_type=redshift_type)
flam = self.flux[iz,:]
else:
flam = self.flux[iz,:]
return flam * self.redden
[docs]
def flux_fnu(self, iz=0, z=None, redshift_type='nearest'):
"""
Get redshift-dependent template in units of f-nu
Parameters
----------
iz : int
Index of template to retrieve
z : float, None
If specified, get the redshift index with
`~eazy.templates.Template.zindex`.
redshift_type : str
See `~eazy.templates.Template.zindex`.
Returns
-------
fnu : array
Template flux density in units of f-nu, including any
reddening specified in the ``redden`` attribute.
"""
flam = self.flux_flam(iz=iz, z=z, redshift_type=redshift_type)
return (flam * self.wave**2 / (utils.CLIGHT*1.e10))
[docs]
def set_fnu(self):
"""
Deprecated. ``flux_fnu`` is now a more cmoplicated function.
"""
print('Deprecated. ``flux_fnu`` is now a function.')
pass
[docs]
def smooth_velocity(self, velocity_smooth, in_place=True, raise_error=False, smoothspec_kwargs={'fftsmooth':True}):
"""
Smooth template in velocity using ``sedpy`` (formerly in ``astro-prospector``)
Parameters
----------
velocity_smooth: float
Velocity smoothing *sigma*, in km/s.
in_place : bool
Set internal ``flux`` array to the smoothed array. If False, then
return a new `~eazy.templates.Template` object.
raise_error : bool
If ``from sedpy.smoothing import smooth_vel`` fails,
raise an exception or die quietly.
"""
try:
from sedpy.smoothing import smoothspec
except:
if raise_error:
raise ImportError("Couldn't import `sedpy.smoothing")
else:
return None
if velocity_smooth <= 0:
if in_place:
return True
else:
return self
sm_flux = np.array([smoothspec(self.wave,
self.flux[i,:],
outwave=self.wave,
resolution=velocity_smooth,
smoothtype='vel',
**smoothspec_kwargs
)
for i in range(self.NZ)])
sm_flux[~np.isfinite(sm_flux)] = 0.
if in_place:
self.flux_orig = self.flux*1
self.velocity_smooth = velocity_smooth
self.flux = sm_flux
return True
else:
return Template(arrays=(self.wave, sm_flux),
name=self.name, meta=self.meta,
redshifts=self.redshifts)
[docs]
def to_observed_frame(self, z=0, scalar=1., extra_sigma=0, lsf_func=None, to_air=False, wavelengths=None, smoothspec_kwargs={'fftsmooth':False}, include_igm=True, clip_wavelengths=[4500,9400], as_template=True):
"""
Smooth and resample to observed-frame wavelengths, including an
optional Line Spread Function (LSF)
Note that the smoothing is performed with
`sedpy.smoothing.smoothspec <https://prospect.readthedocs.io/en/latest/api/utils_api.html>`_,
which doesn't integrate precisely over "pixels" for spectral
resolutions that are similar to or less than the target smoothing
factor.
Parameters
----------
z : float
Target redshift. Note that only the wavelength array is shifted
by ``(1+z)``. The flux densities optionally include IGM
absorption (and dust from the ``redfunc`` attribute) but don't
include the ``fl_obs = fl_rest / (1+z)`` scaling.
scalar : float, array
Scalar value or array with same dimensions as ``wave`` and
``flux`` attributes
extra_sigma : float
Extra velocity dispersion (sigma, km/s) to add in quadrature with
the MUSE LSF
lsf_func : 'Bacon', function
Line Spread Function (LSF). If ``'Bacon'``, then use the "UDF-10"
MUSE LSF from `Bacon et al. 2017
<https://ui.adsabs.harvard.edu/abs/2017A%26A...608A...1B>`_ (Eq.
8). Can also be a ``function`` that takes an argument of
wavelength in Angstroms and returns the LSF sigma, in Angstroms.
If neither of these, then only `extra_sigma` will be applied.
to_air : bool
Apply vacuum-to-air conversion with `mpdaf.obj.vactoair <https://mpdaf.readthedocs.io/en/latest/api/mpdaf.obj.vactoair.html>`_
wavelengths : array, None
Optional wavelength grid (observed frame) of the target output
(e.g., MUSE) spectrum
smoothspec_kwargs : dict
Extra keyword arguments to pass to the `sedpy`/`prospector` smoothing
function `sedpy.smoothing.smoothspec <https://prospect.readthedocs.io/en/latest/api/utils_api.html>`_.
When testing with very high resolution templates around a specific
wavelength, ``smoothspec_kwargs = {'fftsmooth':True}`` did not
always work as expected, so be careful with this option (which is
much faster).
include_igm : bool
Include IGM absorption at indicated redshift
clip_wavelengths : [float, float]
Trim the full observed-frame wavelength array before convolving.
The defaults bracket the nominal MUSE range.
Returns
-------
tobs : `~eazy.template.Template` or array-like
Smoothed and resampled `~eazy.template.Template` object or flux array,
depending on ``is_array``
"""
from astropy.stats import gaussian_sigma_to_fwhm
from sedpy.smoothing import smoothspec
wobs = self.wave*(1+z)
if include_igm:
igmz = self.igm_absorption(z, pow=include_igm)
else:
igmz = 1.
if to_air:
try:
from mpdaf.obj import vactoair
wobs = vactoair(wobs)
except ImportError:
msg = ("`to_air` requested but `from mpdaf.obj import " +
"vactoair` failed")
warnings.warn(msg, AstropyUserWarning)
if clip_wavelengths is not None:
clip = wobs >= clip_wavelengths[0]
clip &= wobs <= clip_wavelengths[1]
if clip.sum() == 0:
raise ValueError('No template wavelengths found in the '+
f'clipping range {clip_wavelengths} '+
'Angstroms')
else:
clip = wobs > 0
if wavelengths is not None:
clip &= wobs > 0.95*wavelengths.min()
clip &= wobs < 1.05*wavelengths.max()
if lsf_func in ['Bacon']:
# UDF-10 LSF from Bacon et al. 2017
bacon_lsf_fwhm = lambda w: 5.866e-8 * w**2 - 9.187e-4*w + 6.04
sig_ang = bacon_lsf_fwhm(wobs[clip]) / gaussian_sigma_to_fwhm
lsf_sigma = sig_ang/wobs[clip]*3.e5
lsf_func_name = 'MUSE-LSF'
elif hasattr(lsf_func, '__call__'):
lsf_sigma = lsf_func(wobs[clip])/wobs[clip]*3.e5
lsf_func_name = 'user'
else:
lsf_sigma = 0.
lsf_func_name = None
# Quadrature sum of LSF and extra velocities
vel_sigma = np.sqrt(lsf_sigma**2 + extra_sigma**2)
# In Angstroms
smooth_lambda = vel_sigma / 3.e5 * wobs[clip]
_templ_flux = self.flux_flam(z=z)
if np.allclose(smooth_lambda, 0.):
flux_smooth = (_templ_flux*igmz*scalar)[clip]
else:
flux_smooth = smoothspec(wobs[clip],
(_templ_flux*igmz*scalar)[clip],
resolution=smooth_lambda,
smoothtype='lsf',
**smoothspec_kwargs,
)
newname = self.name + f' z={z:.3f}'
if lsf_func_name is not None:
newname += ' + ' + lsf_func_name
if extra_sigma > 0:
newname += ' + {extra_sigma:.1f} km/s'
tobs = Template(arrays=(wobs[clip], flux_smooth),
name=newname,
resample_wave=wavelengths,
redshifts=[z],
)
return tobs
[docs]
def resample(self, new_wave, z=0, in_place=True, return_array=False, interp_func=None):
"""
Resample the template to a new wavelength grid
Parameters
----------
new_wave : array
New wavelength array, can have units.
z : float
Redshift internal wavelength before resampling.
(z=0 yields no shift).
in_place : bool
Set internal ``wave`` and ``flux`` arrays to the resampled
values
return_array : bool
Return the resampled ``flux`` array if true, else return a new
`~eazy.templates.Template` object.
interp_func : None
Interpolation function. If nothing specified, tries to use
`grizli.utils_c.interp.interp_conserve_c` and falls back to
`eazy.utils.interp_conserve`.
"""
import astropy.units as u
breakme = False
if isinstance(new_wave, str):
if new_wave == 'None':
breakme = True
elif not os.path.exists(new_wave):
msg = 'WARNING: new_wave={0} could not be found'
print(msg.format(new_wave))
breakme = True
else:
new_wave = np.loadtxt(new_wave)
elif new_wave is None:
breakme = True
if breakme:
if in_place:
return False
else:
return self
if hasattr(new_wave, 'unit'):
new_wave = new_wave.to(u.Angstrom).value
if interp_func is None:
try:
from grizli.utils_c import interp
interp_func = interp.interp_conserve_c
except:
interp_func = utils.interp_conserve
new_flux = [interp_func(new_wave, self.wave*(1+z), self.flux[i,:])
for i in range(self.NZ)]
new_flux = np.array(new_flux)
if in_place:
self.wave = new_wave*1
self.flux = new_flux
return True
else:
if return_array:
return new_flux
else:
return Template(arrays=(new_wave, new_flux),
name=self.name, meta=self.meta,
redshifts=self.redshifts)
[docs]
def zindex(self, z=0., redshift_type='nearest'):
"""
Get the redshift index of a multi-dimensional template array
Parameters
----------
z : float
Redshift to retrieve
redshift_type : 'nearest', 'interp', 'floor'
Interpolation type:
- 'nearest': nearest step in the template redshift grid
- 'interp': Returns index below ``z`` and interpolation fraction
- 'floor': last index where redshift grid < ``z``.
Returns
-------
iz : int
Array index, i.e, ``self.flux[iz,:]``
frac : float, optional
Fraction for interpolation, if ``redshift_type == 'interp'``.
"""
zint = np.interp(z, self.redshifts, np.arange(self.NZ),
left=0, right=self.NZ-1)
if redshift_type == 'nearest':
iz = np.round(zint).astype(int)
elif redshift_type == 'interp':
iz = int(np.floor(zint))
if z < self.redshifts[0]:
frac = 1.
elif iz < self.NZ-1:
frac = 1. - ( (z - self.redshifts[iz]) /
np.diff(self.redshifts)[iz] )
else:
frac = 1.
return iz, frac
elif redshift_type == 'floor':
iz = int(np.floor(zint))
else:
raise ValueError(f"redshift_type ({redshift_type}) must be " +
"'nearest', 'interp', or 'floor'.")
return iz
[docs]
def zscale(self, z, scalar=1, include_igm=True, **kwargs):
"""Redshift the template and multiply by a scalar.
Parameters
----------
z : float
Redshift to use.
scalar : float or array
Multiplicative factor. Additional factor of 1./(1+z) is implicit.
include_igm : bool
Include Inoue (2014) IGM absorption (also can be passed as
``apply_igm`` in ``kwargs``.)
Returns
-------
ztemp : `~eazy.templates.Template`
Redshifted and scaled spectrum.
"""
if 'apply_igm' in kwargs:
include_igm = kwargs['apply_igm']
if include_igm:
igmz = self.igm_absorption(z, pow=include_igm)
else:
igmz = 1.
return Template(arrays=(self.wave*(1+z),
self.flux_flam(z=z)*scalar/(1+z)*igmz),
name=f'{self.name} z={z}')
[docs]
def integrate_filter(self, filt, flam=False, scale=1., z=0, include_igm=False, redshift_type='nearest', iz=None):
"""
Integrate the template through a `FilterDefinition` filter object.
.. note:: The `grizli` interpolation function
`grizli.utils_c.interp.interp_conserve_c` will be used if
available.
Parameters
----------
filt : `~eazy.filters.FilterDefinition` object or a list of them
Filter(s) to interpolate
flam : bool
Return integrated fluxes in f-lambda, rather than f-nu
scale : float, array
Scale factor applied to template before integrating. If an
array is specified, it must have the same size as the template
``wave`` array.
z : float
Redshift the template before integrating through the filter
include_igm : bool
Include IGM absorption
redshift_type : str
See `~eazy.templates.Template.zindex`.
iz : int
Evaluate for a specific index of the ``flux`` array rather than
calculating with ``zindex``
Returns
-------
fnu : float or array
Template integrated through one or more filters from ``filt``. By
defaults has units of fnu
.. note:: The interpolated fluxes *do not* include factors of
(1+z) from the redshifted templates.
"""
try:
import grizli.utils_c
interp = grizli.utils_c.interp.interp_conserve_c
except ImportError:
interp = utils.interp_conserve
if hasattr(filt, '__len__'):
filts = filt
single = False
else:
filts = [filt]
single = True
if include_igm > 0:
igmz = self.igm_absorption(z, pow=include_igm)
else:
igmz = 1.
# Fnu flux density, with IGM and scaling
if iz is None:
fnu = self.flux_fnu(z=z, redshift_type=redshift_type)
else:
fnu = self.flux_fnu(iz=iz)
fnu *= scale*igmz
fluxes = []
for filt_i in filts:
templ_filt = interp(filt_i.wave.astype(float),
self.wave.astype(float)*(1+z),
fnu.astype(float), left=0, right=0)
# f_nu/lam dlam == f_nu d (ln nu)
integrator = np.trapz
temp_int = integrator(filt_i.throughput*templ_filt/filt_i.wave,
filt_i.wave)
temp_int /= filt_i.norm
if flam:
temp_int *= utils.CLIGHT*1.e10/(filt_i.pivot/(1+z))**2
fluxes.append(temp_int)
if single:
return fluxes[0]
else:
return np.array(fluxes)
[docs]
def igm_absorption(self, z, scale_tau=1., pow=1):
"""
Compute IGM absorption with `eazy.igm.Inoue14`.
Parameters
----------
z : float
Redshift to use for IGM absorption factors
scale_tau : float
Scale factor multiplied to of IGM ``tau`` values
pow : float
Scale the absorption strength as ``eazy.igm.Inoue14()**pow``.
"""
try:
from . import igm as igm_module
except:
from eazy import igm as igm_module
igm = igm_module.Inoue14(scale_tau=scale_tau)
igmz = self.wave*0.+1
lyman = self.wave < 1300
igmz[lyman] = igm.full_IGM(z, (self.wave*(1+z))[lyman])**pow
return igmz
[docs]
def integrate_filter_list(self, filters, include_igm=True, norm_index=None, **kwargs):
"""
Integrate template through all filters
filters: list of `~eazy.filters.Filter` objects
[rewritten as simple wrapper]
"""
fluxes = self.integrate_filter(filters, include_igm=include_igm,
**kwargs)
if isinstance(norm_index, int):
fluxes /= fluxes[norm_index]
return fluxes
[docs]
def to_table(self, formats={'wave':'.5e', 'flux':'.5e'}, with_units=False, flatten=True):
"""
Return template as an `astropy.table.Table`.
Parameters
----------
formats : dict
Set ``format`` attributes of table columns
with_units : bool
Set ``unit`` attributes of table columns
flatten : bool
If no redshift dependence (``NZ==0``), columns are 1D arrays.
Returns
-------
tab : `astropy.table.Table`
Output table
"""
from astropy.table import Table
import astropy.units as u
import copy
tab = Table()
tab['wave'] = self.wave
tab['flux'] = self.flux.T
if with_units:
tab['wave'].unit = u.Angstrom
tab['flux'].unit = self.flux_unit
for c in tab.colnames:
if c in formats:
tab[c].format = formats[c]
tab.meta = copy.deepcopy(self.meta)
if self.NZ > 1:
tab.meta['NZ'] = self.NZ
for j in range(self.NZ):
tab.meta[f'Z{j}'] = self.redshifts[j]
else:
if flatten:
tab['flux'] = self.flux[0,:]
return tab
[docs]
class ModifiedBlackBody(object):
def __init__(self, Td=47, beta=1.6, q=2.34, alpha=-0.75):
"""
Modified black body: nu**beta * BB(nu, Td) + FIR-radio correlation
Parameters
----------
Td : float
Dust temperature
beta : float
Slope parameter
q : float
FIR-radio normalization
alpha : float
Radio spectral slope: ``fnu = (nu/1.4e9)**alpha``
"""
self.Td = Td
self.q = q
self.beta = beta
self.alpha = alpha
@property
def bb(self):
"""
`astropy.modeling.models.BlackBody` function with ``self.Td`` dust temperature
"""
from astropy.modeling.models import BlackBody
return BlackBody(temperature=self.Td*u.K)
[docs]
def __call__(self, wave, q=None):
"""
Return modified BlackBody (fnu) as a function of wavelength
Parameters
----------
wave : array
Wavelength array. If no ``unit`` attribute, assume
`astropy.units.micron`.
q : float
Parameter of FIR-radio correlation. If not specified, use
internal ``self.q``.
"""
from astropy.constants import L_sun, h, k_B, c
if not hasattr(wave, 'unit'):
mu = wave*u.micron
else:
mu = wave
nu = (c/mu).to(u.Hz).value
mbb = (self.bb(nu)*nu**self.beta).value
lim = (mu > 40*u.micron) & (mu < 120*u.micron)
lir = np.trapz(mbb[lim][::-1], nu[lim][::-1])
if q is None:
q = self.q
radio = 10**(np.log10(lir/3.75e12)-q)
radio *= (nu/1.4e9)**self.alpha
return (mbb+radio)
def __repr__(self):
label = r'$T_\mathrm{{{{d}}}}$={0:.0f}, $\beta$={1:.1f}'
return label.format(self.Td, self.beta)
PHOENIX_LOGG_FULL = [3.0, 3.5, 4.0, 4.5, 5.0, 5.5]
PHOENIX_LOGG = [4.0, 4.5, 5.0, 5.5]
PHOENIX_TEFF_FULL = [400.0, 420.0, 450.0, 500.0, 550.0, 600.0, 650.0, 700.0, 750.0, 800.0, 850.0, 900.0, 950.0, 1000.0, 1050.0, 1100.0, 1150.0, 1200.0, 1250.0, 1300.0, 1350.0, 1400.0, 1450.0, 1500.0, 1550.0, 1600.0, 1650.0, 1700.0, 1750.0, 1800.0, 1850.0, 1900.0, 1950.0, 2000.0, 2100.0, 2200.0, 2300.0, 2400.0, 2500.0, 2600.0, 2700.0, 2800.0, 2900.0, 3000.0, 3100.0, 3200.0, 3300.0, 3400.0, 3500.0, 3600.0, 3700.0, 3800.0, 3900.0, 4000.0, 4100.0, 4200.0, 4300.0, 4400.0, 4500.0, 4600.0, 4700.0, 4800.0, 4900.0, 5000.0]
PHOENIX_TEFF = [400., 420., 450., 500., 550., 600., 650., 700., 750.,
800., 850., 900., 950., 1000., 1050., 1100., 1150., 1200.,
1300., 1400., 1500., 1600., 1700., 1800., 1900., 2000., 2100.,
2200., 2300., 2400., 2500., 2600., 2700., 2800., 2900., 3000.,
3100., 3200., 3300., 3400., 3500., 3600., 3700., 3800., 3900., 4000.,
4200., 4400., 4600., 4800., 5000., 5500., 5500, 6000., 6500., 7000.]
PHOENIX_ZMET_FULL = [-2.5, -2.0, -1.5, -1.0, -0.5, -0., 0.5]
PHOENIX_ZMET = [-1.0, -0.5, -0.]
[docs]
def load_phoenix_stars(logg_list=PHOENIX_LOGG, teff_list=PHOENIX_TEFF, zmet_list=PHOENIX_ZMET, add_carbon_star=True, file='bt-settl_t400-7000_g4.5.fits', sonora_dwarfs=True, lowz_dwarfs=False, lowz_kwargs={}):
"""
Load Phoenix stellar templates
Parameters
----------
logg_list : list
List of logg values of the PHOENIX grid
teff_list : list
List of Teff values of the PHOENIX grid
zmet_list : list
List of ZMET metallicities of the PHOENIX grid
add_carbon_star : bool
Include ``templates/stars/carbon_star.txt`` template
file : str
PHOENIX grid file, available at
https://s3.amazonaws.com/grizli/CONF/bt-settl_t400-7000_g4.5.fits
sonora_dwarfs : bool
Include Sonora dwarf models (Marley et al. 2018), see
`~eazy.templates.load_sonora_stars`
lowz_dwarfs : bool
Include LOWZ dwarf models (Meisner et al. 2021), see
`~eazy.templates.load_LOWZ_templates`
lowz_kwargs : dict
Keyword arguments for `~eazy.templates.load_LOWZ_templates`
Returns
-------
stars : list
List of `~eazy.templates.Template` objects of the stellar models
"""
try:
from urllib.request import urlretrieve
except:
from urllib import urlretrieve
from astropy.table import Table
import astropy.io.fits as pyfits
paths = ['/tmp', './templates/', './']
hdu = None
for path in paths:
templ_path = os.path.join(path, file)
if os.path.exists(templ_path):
print(f'phoenix_templates: {templ_path}')
hdu = pyfits.open(templ_path)
break
if hdu is None:
#url = 'https://s3.amazonaws.com/grizli/CONF'
url = 'https://erda.ku.dk/vgrid/Gabriel%20Brammer/CONF/'
print('Fetch {0}/{1}'.format(url, file))
#os.system('wget -O /tmp/{1} {0}/{1}'.format(url, file))
res = urlretrieve('{0}/{1}'.format(url, file),
filename=templ_path)
hdu = pyfits.open(templ_path)
tab = Table.read(hdu[1])
tstars = []
N = tab['flux'].shape[1]
for i in range(N):
teff = tab.meta['TEFF{0:03d}'.format(i)]
logg = tab.meta['LOGG{0:03d}'.format(i)]
if 'ZMET{0:03d}'.format(i) in tab.meta:
met = tab.meta['ZMET{0:03d}'.format(i)]
else:
met = 0.
if (logg not in logg_list) | (teff not in teff_list) | (met not in zmet_list):
#print('Skip {0} {1}'.format(logg, teff))
continue
label = 'bt-settl_t{0:05.0f}_g{1:3.1f}_m{2:.1f}'.format(teff, logg, met)
arrays = (tab['wave'], tab['flux'][:, i])
tstars.append(Template(arrays=arrays, name=label, redfunc=None))
cfile = 'templates/stars/carbon_star.txt'
if add_carbon_star & os.path.exists(cfile):
sp = Table.read(cfile, format='ascii.commented_header')
if add_carbon_star > 1:
import scipy.ndimage as nd
cflux = nd.gaussian_filter(sp['flux'], add_carbon_star)
else:
cflux = sp['flux']
tstars.append(Template(arrays=(sp['wave'], cflux),
name='carbon-lancon2002'))
if sonora_dwarfs:
tstars += load_sonora_stars()
if lowz_dwarfs:
tstars += load_LOWZ_templates(**lowz_kwargs)
return tstars
def load_sonora_stars():
"""
Load Sonora brown dwarf models: Marley et al. (2018,
https://zenodo.org/record/1309035)
Get the templates from https://github.com/gbrammer/eazy-py/pull/33
"""
import glob
paths = ['/tmp', './templates/', './']
found = False
for path in paths:
templ_path = os.path.join(path, 'sonora')
if os.path.exists(templ_path):
print(f'sonora_stars: {templ_path}')
found = True
break
if not found:
return []
files = glob.glob(os.path.join(templ_path, 'sonora*fits'))
files.sort()
stars = []
for file in files:
name = ' ' + os.path.basename(file).split('.fits')[0]
stars.append(Template(file=file, name=name))
return stars
def download_LOWZ_templates(verbose=True):
"""
Download LOWZ brown dwarf templates from Meisner et al. (2021)
"""
import tarfile
from astropy.utils.data import download_file
from astropy.table import Table
module_path = os.path.dirname(__file__)
data_path = os.path.join(module_path, 'data/templates/lowz')
csv_file = os.path.join(data_path, 'LOWZ_models_index.csv')
if verbose:
print(f'LOWZ (Meisner et al.) templates in {data_path}')
if os.path.exists(data_path):
return data_path, csv_file
os.makedirs(data_path)
# models.tar.gz
remote_url = 'https://dataverse.harvard.edu/api/access/datafile/4571308'
if verbose:
print(f"Download LOWZ models.tar.gz")
file_path = download_file(remote_url, cache=True)
with tarfile.open(file_path, 'r:gz') as fp:
fp.extractall(path=data_path)
if not os.path.exists(csv_file):
if verbose:
print(f"Download LOWZ_models_index.csv")
csv = Table.read('https://dataverse.harvard.edu/api/access/datafile/4570758',
format='ascii.tab')
csv.write(csv_file, overwrite=True, format='csv')
return data_path, csv_file
def load_LOWZ_templates(metallicity=[-2.5, 1.0], logg=[3.5, 5.25], teff=[500, 1600], ctoo=[0.1, 0.55, 0.85], logkzz=[-1.,2.,10.], verbose=True, **kwargs):
"""
Read LOWZ templates (Meisner et al. 2021)
Parameters
----------
metallicity : (float, float)
Range of metallicities from [-2.5, 1.0], inclusive of limits
logg : (float, float)
Range of log g from [2.5, 5.25], inclusive of limits
teff : (float, float)
Range of effective temperature from [500, 1600], inclusive of limits
ctoo : list
Discrete values of the CTOO grid [0.1, 0.55, 0.85]
logkzz : list
Discrete values of the LOGKZZ grid [-1, 2, 10]
Returns
-------
csv : `~astropy.table.Table`
Summary table with the parameter selections
stars : list
List of `~eazy.templates.Template` objects
"""
from tqdm import tqdm
from astropy.table import Table
data_path, csv_file = download_LOWZ_templates(verbose=verbose)
csv = Table.read(csv_file)
for c in list(csv.colnames):
csv.rename_column(c, c.lower())
subset = csv['metallicity'] >= metallicity[0]
subset &= csv['metallicity'] <= metallicity[1]
subset &= csv['logg'] >= logg[0]
subset &= csv['logg'] <= logg[1]
subset &= csv['teff'] >= teff[0]
subset &= csv['teff'] <= teff[1]
subset &= np.in1d(csv['ctoo'], ctoo)
subset &= np.in1d(csv['logkzz'], logkzz)
if verbose:
print(f'Load {subset.sum()} LOWZ BD templates')
name_str = 'LOWZ_t{teff:04.0f}_g{logg:.1f}_m{metallicity:.1f}'
name_str += '_ctoo{ctoo:.2f}_kz{logkzz:.0f}'
stars = []
for row in tqdm(csv[subset]):
file_path = os.path.join(data_path, 'models', row['filename'])
name = name_str.format(**row)
stars.append(Template(file=file_path, name=name, to_angstrom=1.e4))
return csv[subset], stars
def param_table(templates):
"""
Try to generate parameters for a list of templates from their
metadata
(TBD)
"""
pass
[docs]
def bspline_templates(wave, degree=3, df=6, get_matrix=True, log=False, clip=1.e-4, minmax=None):
"""
B-spline basis functions, modeled after `patsy.splines <https://patsy.readthedocs.io/en/latest/spline-regression.html>`_
"""
from collections import OrderedDict
from scipy.interpolate import splev
order = degree+1
n_inner_knots = df - order
inner_knots = np.linspace(0, 1, n_inner_knots + 2)[1:-1]
norm_knots = np.concatenate(([0, 1] * order,
inner_knots))
norm_knots.sort()
if log:
xspl = np.log(wave)
else:
xspl = wave*1
if minmax is None:
mi = xspl.min()
ma = xspl.max()
else:
mi, ma = minmax
width = ma-mi
all_knots = norm_knots*width+mi
n_bases = len(all_knots) - (degree + 1)
basis = np.empty((xspl.shape[0], n_bases), dtype=float)
coefs = np.identity(n_bases)
basis = splev(xspl, (all_knots, coefs, degree))
for i in range(n_bases):
out_of_range = (xspl < mi) | (xspl > ma)
basis[i][out_of_range] = 0
wave_peak = np.round(wave[np.argmax(basis, axis=1)])
maxval = np.max(basis, axis=1)
for i in range(n_bases):
basis[i][basis[i] < clip*maxval[i]] = 0
if get_matrix:
return np.vstack(basis).T
temp = OrderedDict()
for i in range(n_bases):
key = 'bspl {0} {1:.0f}'.format(i, wave_peak[i])
temp[key] = Template(arrays=(wave*1., basis[i]), name=key,
meta={'wave_peak':wave_peak[i]})
#temp[key].name = key
#temp[key].wave_peak = wave_peak[i]
temp.knots = all_knots
temp.degree = degree
temp.xspl = xspl
return temp
[docs]
def gaussian_templates(wave, centers=[], widths=[], norm=False):
"""
Make Gaussian "templates" for the template correction
"""
_x = np.array([1/np.sqrt(2*np.pi*w**2)**norm*np.exp(-(wave-c)**2/2/w**2)
for c, w in zip(centers, widths)])
return _x.T