"""
Tools for making FSPS templates
"""
import os
from collections import OrderedDict
import numpy as np
import astropy.units as u
from astropy.cosmology import WMAP9
FLAM_CGS = u.erg/u.second/u.cm**2/u.Angstrom
LINE_CGS = 1.e-17*u.erg/u.second/u.cm**2
try:
from dust_attenuation.baseclasses import BaseAttAvModel
except:
BaseAttAvModel = object
from astropy.modeling import Parameter
import astropy.units as u
try:
from fsps import StellarPopulation
except:
# Broken, but imports
StellarPopulation = object
from . import utils
from . import templates
DEFAULT_LABEL = 'fsps_tau{tau:3.1f}_logz{logzsol:4.2f}_tage{tage:4.2f}_av{Av:4.2f}'
WG00_DEFAULTS = dict(geometry='shell', dust_type='mw',
dust_distribution='homogeneous')
__all__ = ["Zafar15", "ExtinctionModel", "SMC", "Reddy15", "KC13",
"ParameterizedWG00", "ExtendedFsps", "fsps_line_info",
"wuyts_line_Av"]
class ArrayExtCurve(BaseAttAvModel):
"""
Alam interpolated from arrays
"""
name = 'Array'
#bump_ampl = 1.
Rv = 2.21 # err 0.22
xarray = np.arange(0.09, 2.2, 0.01)
yarray = xarray*0.+1
left=None
right=None
def Alam(self, mu):
"""
klam, eq. 1
"""
Alam = np.interp(mu, self.xarray, self.yarray,
left=self.left, right=self.right)
return Alam
def evaluate(self, x, Av):
if not hasattr(x, 'unit'):
xin = np.atleast_1d(x)*u.micron
else:
xin = np.atleast_1d(x)
mu = xin.to(u.micron).value
alam = self.Alam(mu) #*self.Rv
return np.maximum(alam*Av, 0.)
[docs]
class Zafar15(BaseAttAvModel):
"""
Quasar extinction curve from Zafar et al. (2015)
https://ui.adsabs.harvard.edu/abs/2015A%26A...584A.100Z/abstract
"""
name = 'Zafar+15'
#bump_ampl = 1.
Rv = 2.21 # err 0.22
[docs]
@staticmethod
def Alam(mu, Rv):
"""
klam, eq. 1
"""
x = 1/mu
# My fit
coeffs = np.array([0.05694421, 0.57778243, -0.12417444])
Alam = np.polyval(coeffs, x)*2.21/Rv
# Only above x > 5.90
fuv = x > 5.90
if fuv.sum() > 0:
Afuv = 1/Rv*(-4.678+2.355*x + 0.622*(x-5.90)**2) + 1.
Alam[fuv] = Afuv[fuv]
return Alam
[docs]
def evaluate(self, x, Av):
if not hasattr(x, 'unit'):
xin = np.atleast_1d(x)*u.micron
else:
xin = np.atleast_1d(x)
mu = xin.to(u.micron).value
alam = self.Alam(mu, self.Rv) #*self.Rv
# Rv = Av/EBV
# EBV=Av/Rv
# Ax = Alam/Av
#
# klam = Alam/EBV
# Alam = klam*EBV = klam*Av/Rv
return np.maximum(alam*Av, 0.)
[docs]
class ExtinctionModel(BaseAttAvModel):
"""
Modify `dust_extinction.averages.G03_SMCBar` to work as Att
"""
#from dust_extinction.averages import G03_SMCBar
#SMCBar = G03_SMCBar()
curve_type = 'smc'
init_curve = None
#@property
def _curve_model(self):
if self.init_curve == self.curve_type:
return 0
if self.curve_type.upper() == 'SMC':
from dust_extinction.averages import G03_SMCBar as curve
elif self.curve_type.upper() == 'LMC':
from dust_extinction.averages import G03_LMCAvg as curve
elif self.curve_type.upper() in ['MW','F99']:
from dust_extinction.parameter_averages import F99 as curve
else:
raise ValueError(f'curve_type {self.curve_type} not recognized')
self.curve = curve()
self.init_curve = self.curve_type
[docs]
def evaluate(self, x, Av):
self._curve_model()
if not hasattr(x, 'unit'):
xin = np.atleast_1d(x)*u.Angstrom
else:
xin = np.atleast_1d(x)
xinv = 1./xin.to(u.micron)
curve = self.curve
xr = [x for x in curve.x_range]
xr[0] *= 1.001
xr[1] *= 0.999
print('xxx', xr)
if 'Rv' in curve.param_names:
klam = curve.evaluate(1/np.clip(xinv,
xr[0]/u.micron, xr[1]/u.micron),
Rv=curve.Rv)
else:
klam = curve.evaluate(1/np.clip(xinv,
xr[0]/u.micron, xr[1]/u.micron))
return klam*Av
[docs]
class SMC(BaseAttAvModel):
"""
Modify `dust_extinction.averages.G03_SMCBar` to work as Att
"""
from dust_extinction.averages import G03_SMCBar
SMCBar = G03_SMCBar()
bump_ampl = Parameter(description="Amplitude of UV bump",
default=0., min=0., max=10.)
bump_gamma = 0.04
bump_x0 = 0.2175
[docs]
def uv_bump(self, mu, bump_ampl):
"""
Drude profile for computing the UV bump.
Parameters
----------
x: np array (float)
expects wavelengths in [micron]
x0: float
Central wavelength of the UV bump (in microns).
gamma: float
Width (FWHM) of the UV bump (in microns).
ampl: float
Amplitude of the UV bump.
Returns
-------
np array (float)
lorentzian-like Drude profile
Raises
------
ValueError
Input x values outside of defined range
"""
return bump_ampl * (mu**2 * self.bump_gamma**2 /
((mu**2 - self.bump_x0**2)**2 +
mu**2 * self.bump_gamma**2))
[docs]
def evaluate(self, x, Av, bump_ampl):
if not hasattr(x, 'unit'):
xin = np.atleast_1d(x)*u.Angstrom
else:
xin = np.atleast_1d(x)
xinv = 1./xin.to(u.micron)
klam = self.SMCBar.evaluate(1/np.clip(xinv,
0.301/u.micron, 9.99/u.micron))
if bump_ampl > 0:
klam += self.uv_bump(xin.to(u.micron).value, bump_ampl)
return klam*Av
[docs]
class Reddy15(BaseAttAvModel):
"""
Attenuation curve from Reddy et al. (2015)
With optional UV bump
https://ui.adsabs.harvard.edu/abs/2015ApJ...806..259R/abstract
"""
name = 'Reddy+15'
#bump_ampl = 1.
bump_ampl = Parameter(description="Amplitude of UV bump",
default=2., min=0., max=10.)
bump_gamma = 0.04
bump_x0 = 0.2175
Rv = 2.505
@staticmethod
def _left(mu):
"""
klam, mu < 0.6 micron
"""
return -5.726 + 4.004/mu - 0.525/mu**2 + 0.029/mu**3 + 2.505
@staticmethod
def _right(mu):
"""
klam, mu > 0.6 micron
"""
return -2.672 - 0.010/mu + 1.532/mu**2 - 0.412/mu**3 + 2.505
@property
def koffset(self):
"""
Force smooth transition at 0.6 micron
"""
return self._left(0.6) - self._right(0.6)
[docs]
def evaluate(self, x, Av, bump_ampl):
if not hasattr(x, 'unit'):
xin = np.atleast_1d(x)*u.Angstrom
else:
xin = np.atleast_1d(x)
mu = xin.to(u.micron).value
left = mu < 0.6
klam = mu*0.
# Reddy Eq. 8
kleft = self._left(mu)
kright = self._right(mu)
klam[left] = self._left(mu[left])
klam[~left] = self._right(mu[~left]) + self.koffset
# Rv = Av/EBV
# EBV=Av/Rv
# klam = Alam/EBV
# Alam = klam*EBV = klam*Av/Rv
return np.maximum((klam + self.uv_bump(mu, bump_ampl))*Av/self.Rv, 0.)
[docs]
def uv_bump(self, mu, bump_ampl):
"""
Drude profile for computing the UV bump.
Parameters
----------
x: np array (float)
expects wavelengths in [micron]
x0: float
Central wavelength of the UV bump (in microns).
gamma: float
Width (FWHM) of the UV bump (in microns).
ampl: float
Amplitude of the UV bump.
Returns
-------
np array (float)
lorentzian-like Drude profile
Raises
------
ValueError
Input x values outside of defined range
"""
return bump_ampl * (mu**2 * self.bump_gamma**2 /
((mu**2 - self.bump_x0**2)**2 +
mu**2 * self.bump_gamma**2))
[docs]
class KC13(BaseAttAvModel):
"""
Kriek & Conroy (2013) attenuation model, extends Noll 2009 with UV bump
amplitude correlated with the slope, delta.
Slightly different from KC13 since the N09 model uses Leitherer (2002)
below 1500 Angstroms.
"""
name = 'Kriek+Conroy2013'
delta = Parameter(description="delta: slope of the power law",
default=0., min=-3., max=3.)
#extra_bump = 1.
extra_params = {'extra_bump':1., 'beta':-3.2, 'extra_uv':-0.4}
# Big range for use with FSPS
x_range = [0.9e-4, 2.e8]
def _init_N09(self):
from dust_attenuation import averages, shapes, radiative_transfer
# Allow extrapolation
#shapes.x_range_N09 = [0.9e-4, 2.e8]
#averages.x_range_C00 = [0.9e-4, 2.e8]
#averages.x_range_L02 = [0.9e-4, 0.18]
shapes.C00.x_range = self.x_range
shapes.N09.x_range = self.x_range
if self.x_range[0] < 0.18:
shapes.L02.x_range = [self.x_range[0], 0.18]
else:
shapes.L02.x_range = [0.097, 0.18]
self.N09 = shapes.N09()
[docs]
def evaluate(self, x, Av, delta):
import dust_attenuation
if not hasattr(self, 'N09'):
self._init_N09()
#Av = np.polyval(self.coeffs['Av'], tau_V)
x0 = 0.2175
gamma = 0.0350
ampl = (0.85 - 1.9*delta)*self.extra_params['extra_bump']
if not hasattr(x, 'unit'):
xin = np.atleast_1d(x)*u.Angstrom
else:
xin = x
wred = np.array([2.199e4])*u.Angstrom
if self.N09.param_names[0] == 'x0':
Alam = self.N09.evaluate(xin, x0, gamma, ampl, delta, Av)
Ared = self.N09.evaluate(wred, x0, gamma, ampl, delta, Av)[0]
else:
Alam = self.N09.evaluate(xin, Av, x0, gamma, ampl, delta)
Ared = self.N09.evaluate(wred, Av, x0, gamma, ampl, delta)[0]
# Extrapolate with beta slope
red = xin > wred[0]
if red.sum() > 0:
Alam[red] = Ared*(xin[red]/wred[0])**self.extra_params['beta']
blue = xin < 1500*u.Angstrom
if blue.sum() > 0:
plblue = np.ones(len(xin))
wb = xin[blue].to(u.Angstrom).value/1500
plblue[blue] = wb**self.extra_params['extra_uv']
Alam *= plblue
return Alam
[docs]
class ParameterizedWG00(BaseAttAvModel):
coeffs = {'Av': np.array([-0.001, 0.026, 0.643, -0.016]),
'x0': np.array([ 3.067e-19, -7.401e-18, 6.421e-17, -2.370e-16,
3.132e-16, 2.175e-01]),
'gamma': np.array([ 2.101e-06, -4.135e-05, 2.719e-04,
-7.178e-04, 3.376e-04, 4.270e-02]),
'ampl': np.array([-1.906e-03, 4.374e-02, -3.501e-01,
1.228e+00, -2.151e+00, 8.880e+00]),
'slope': np.array([-4.084e-05, 9.984e-04, -8.893e-03,
3.670e-02, -7.325e-02, 5.891e-02])}
# Turn off bump
include_bump = 0.25
wg00_coeffs = {'geometry': 'shell',
'dust_type': 'mw',
'dust_distribution': 'homogeneous'}
name = 'ParameterizedWG00'
# def __init__(self, Av=1.0, **kwargs):
# """
# Version of the N09 curves fit to the WG00 curves up to tauV=10
# """
# from dust_attenuation import averages, shapes, radiative_transfer
#
# # Allow extrapolation
# shapes.x_range_N09 = [0.01, 1000]
# averages.x_range_C00 = [0.01, 1000]
# averages.x_range_L02 = [0.01, 0.18]
#
# self.N09 = shapes.N09()
def _init_N09(self):
from dust_attenuation import averages, shapes, radiative_transfer
# Allow extrapolation
shapes.x_range_N09 = [0.009, 2.e8]
averages.x_range_C00 = [0.009, 2.e8]
averages.x_range_L02 = [0.009, 0.18]
self.N09 = shapes.N09()
[docs]
def get_tau(self, Av):
"""
Get the WG00 tau_V for a given Av
"""
tau_grid = np.arange(0, 10, 0.01)
av_grid = np.polyval(self.coeffs['Av'], tau_grid)
return np.interp(Av, av_grid, tau_grid, left=0., right=tau_grid[-1])
[docs]
def evaluate(self, x, Av):
import dust_attenuation
if not hasattr(self, 'N09'):
self._init_N09()
tau_V = self.get_tau(Av)
#Av = np.polyval(self.coeffs['Av'], tau_V)
x0 = np.polyval(self.coeffs['x0'], tau_V)
gamma = np.polyval(self.coeffs['gamma'], tau_V)
if self.include_bump:
ampl = np.polyval(self.coeffs['ampl'], tau_V)*self.include_bump
else:
ampl = 0.
slope = np.polyval(self.coeffs['slope'], tau_V)
if not hasattr(x, 'unit'):
xin = np.atleast_1d(x)*u.Angstrom
else:
xin = x
if self.N09.param_names[0] == 'x0':
return self.N09.evaluate(xin, x0, gamma, ampl, slope, Av)
else:
return self.N09.evaluate(xin, Av, x0, gamma, ampl, slope)
[docs]
def fsps_line_info(wlimits=None):
"""
Read FSPS line list
"""
try:
info_file = os.path.join(os.getenv('SPS_HOME'), 'data/emlines_info.dat')
with open(info_file, 'r') as f:
lines = f.readlines()
except:
return [], []
waves = np.array([float(l.split(',')[0]) for l in lines])
names = np.array([l.strip().split(',')[1].replace(' ','') for l in lines])
if wlimits is not None:
clip = (waves > wlimits[0]) & (waves < wlimits[1])
waves = waves[clip]
names = names[clip]
return waves, names
DEFAULT_LINES = fsps_line_info(wlimits=[1200, 1.9e4])[0]
BOUNDS = {}
BOUNDS['tage'] = [0.03, 12, 0.05]
BOUNDS['tau'] = [0.03, 2, 0.05]
BOUNDS['zred'] = [0.0, 13, 1.e-4]
BOUNDS['Av'] = [0.0, 15, 0.05]
BOUNDS['gas_logu'] = [-4, 0, 0.05]
BOUNDS['gas_logz'] = [-2, 0.3, 0.05]
BOUNDS['logzsol'] = [-2, 0.3, 0.05]
BOUNDS['sigma_smooth'] = [100, 500, 0.05]
[docs]
def wuyts_line_Av(Acont):
"""
Wuyts prescription for extra extinction towards nebular emission
"""
return Acont + 0.9*Acont - 0.15*Acont**2
[docs]
class ExtendedFsps(StellarPopulation):
"""
Extended functionality for the `fsps.StellarPopulation` object
"""
lognorm_center = 0.
lognorm_logwidth = 0.05
is_lognorm_sfh = False
lognorm_fburst = -30
cosmology = WMAP9
scale_lyman_series = 0.1
scale_lines = OrderedDict()
line_av_func = None
# Smoothing parameters for fitting
lsf_func = None
FFT_SMOOTH = False
#_meta_bands = ['v']
@property
def izmet(self):
"""
Get zmet index for nearest ``self.zlegend`` value to ``loggzsol``.
"""
NZ = len(self.zlegend)
logzsol = self.params['logzsol']
zi = np.interp(logzsol, np.log10(self.zlegend/0.019), np.arange(NZ))
return np.clip(np.cast[int](np.round(zi)), 0, NZ-1)
@property
def fsps_ages(self):
"""
(linear) ages of the FSPS SSP age grid, Gyr
"""
if hasattr(self, '_fsps_ages'):
return self._fsps_ages
_ = self.get_spectrum()
fsps_ages = 10**(self.log_age-9)
self._fsps_ages = fsps_ages
return fsps_ages
[docs]
def set_lognormal_sfh(self, min_sigma=3, verbose=False, **kwargs):
"""
Set lognormal tabular SFH
"""
try:
from grizli.utils_c.interp import interp_conserve_c as interp_func
except:
interp_func = utils.interp_conserve
if 'lognorm_center' in kwargs:
self.lognorm_center = kwargs['lognorm_center']
if 'lognorm_logwidth' in kwargs:
self.lognorm_logwidth = kwargs['lognorm_logwidth']
if self.is_lognorm_sfh:
self.params['sfh'] = 3
if verbose:
msg = 'lognormal SFH ({0}, {1}) [sfh3={2}]'
print(msg.format(self.lognorm_center, self.lognorm_logwidth,
self.is_lognorm_sfh))
xages = np.logspace(np.log10(self.fsps_ages[0]),
np.log10(self.fsps_ages[-1]), 2048)
mu = self.lognorm_center#*np.log(10)
# sfh = 1./t*exp(-(log(t)-mu)**2/2/sig**2)
logn_sfh = 10**(-(np.log10(xages)-mu)**2/2/self.lognorm_logwidth**2)
logn_sfh *= 1./xages
# Normalize
logn_sfh *= 1.e-9/(self.lognorm_logwidth*np.sqrt(2*np.pi*np.log(10)))
self.set_tabular_sfh(xages, logn_sfh)
self._lognorm_sfh = (xages, logn_sfh)
[docs]
def lognormal_integral(self, tage=0.1, **kwargs):
"""
Integral of lognormal SFH up to t=tage
"""
from scipy.special import erfc
mu = self.lognorm_center*np.log(10)
sig = self.lognorm_logwidth*np.sqrt(np.log(10))
cdf = 0.5*erfc(-(np.log(tage)-mu)/sig/np.sqrt(2))
return cdf
def _set_extend_attrs(self, line_sigma=50, lya_sigma=200, **kwargs):
"""
Set attributes on `fsps.StellarPopulation` object used by `narrow_lines`.
sigma : line width (FWHM/2.35), km/s.
lya_sigma : width for Lyman-alpha
Sets `emline_dlam`, `emline_sigma` attributes.
"""
# Line widths, native FSPS and new
wave, line = self.get_spectrum(tage=1., peraa=True)
dlam = np.diff(wave)
self.emline_dlam = [np.interp(w, wave[1:], dlam)
for w in self.emline_wavelengths] # Angstrom
self.emline_sigma = [line_sigma for w in self.emline_wavelengths] #kms
# Separate Ly-alpha
lya_ix = np.argmin(np.abs(self.emline_wavelengths - 1216.8))
self.emline_sigma[lya_ix] = lya_sigma
# Line EWs computed in `narrow_emission_lines`
self.emline_eqw = [-1e10 for w in self.emline_wavelengths]
# Emission line names
waves, names = fsps_line_info()
if np.allclose(self.emline_wavelengths, waves, 0.5):
self.emline_names = names
else:
self.emline_names = ['?'] * len(self.emline_wavelengths)
for w, n in zip(waves, names):
dl = np.abs(self.emline_wavelengths - w)
if dl.min() < 0.5:
self.emline_names[np.argmin(dl)] = n
for l in self.emline_names:
self.scale_lines[l] = 1.
# Precomputed arrays for WG00 reddening defined between 0.1..3 um
self.wg00lim = (self.wavelengths > 1000) & (self.wavelengths < 3.e4)
self.wg00red = (self.wavelengths > 1000)*1.
self.exec_params = None
self.narrow = None
[docs]
def narrow_emission_lines(self, tage=0.1, emwave=DEFAULT_LINES, line_sigma=100, oversample=5, clip_sigma=10, verbose=False, get_eqw=True, scale_lyman_series=None, scale_lines={}, force_recompute=False, use_sigma_smooth=True, lorentz=False, **kwargs):
"""
Replace broad FSPS lines with specified line widths
tage : age in Gyr of FSPS model
FSPS sigma: line width in A in FSPS models
emwave : (approx) wavelength of line to replace
line_sigma : line width in km/s of new line
oversample : factor by which to sample the Gaussian profiles
clip_sigma : sigmas from line center to use for the line
scale_lyman_series : scaling to apply to Lyman-series emission lines
scale_lines : scaling to apply to other emission lines, by name
Returns: `dict` with keys
wave_full, flux_full, line_full = wave and flux with fine lines
wave, flux_line, flux_clean = original model + removed lines
ymin, ymax = range of new line useful for plotting
"""
if not hasattr(self, 'emline_dlam'):
self._set_extend_attrs(line_sigma=line_sigma, **kwargs)
self.params['add_neb_emission'] = True
if scale_lyman_series is None:
scale_lyman_series = self.scale_lyman_series
else:
self.scale_lyman_series = scale_lyman_series
if scale_lines is None:
scale_lines = self.scale_lines
else:
for k in scale_lines:
if k in self.scale_lines:
self.scale_lines[k] = scale_lines[k]
else:
print(f'Line "{k}" not found in `self.scale_lines`')
# Avoid recomputing if all parameters are the same (i.e., change Av)
call_params = np.hstack([self.param_floats(params=None), emwave,
list(self.scale_lines.values()),
[tage, oversample, clip_sigma, scale_lyman_series]])
try:
is_close = np.allclose(call_params, self.exec_params)
except:
is_close = False
if is_close & (not force_recompute):
if verbose:
print('use stored')
return self.narrow
self.exec_params = call_params
wave, line = self.get_spectrum(tage=tage, peraa=True)
line_ix = [np.argmin(np.abs(self.emline_wavelengths - w))
for w in emwave]
line_lum = [self.emline_luminosity[i] for i in line_ix]
line_wave = [self.emline_wavelengths[i] for i in line_ix]
fsps_sigma = [np.sqrt((2*self.emline_dlam[i])**2 +
(self.params['sigma_smooth']/3.e5*self.emline_wavelengths[i])**2)
for i in line_ix]
if line_sigma < 0:
lines_sigma = [-line_sigma for ix in line_ix]
elif (self.params['sigma_smooth'] > 0) & (use_sigma_smooth):
lines_sigma = [self.params['sigma_smooth'] for ix in line_ix]
else:
lines_sigma = [self.emline_sigma[ix] for ix in line_ix]
line_dlam = [sig/3.e5*lwave
for sig, lwave in zip(lines_sigma, line_wave)]
clean = line*1
wlimits = [np.min(emwave), np.max(emwave)]
wlimits = [2./3*wlimits[0], 4.3*wlimits[1]]
wfine = utils.log_zgrid(wlimits, np.min(lines_sigma)/oversample/3.e5)
qfine = wfine < 0
if verbose:
msg = 'Matched line: {0} [{1}], lum={2}'
for i, ix in enumerate(line_ix):
print(msg.format(line_wave[i], ix, line_lum[i]))
#########
# Remove lines from FSPS
# line width seems to be 2*dlam at the line wavelength
for i, ix in enumerate(line_ix):
if self.params['nebemlineinspec']:
gauss = 1/np.sqrt(2*np.pi*fsps_sigma[i]**2)
gauss *= np.exp(-(wave - line_wave[i])**2/2/fsps_sigma[i]**2)
clean -= gauss*line_lum[i]
# indices of fine array where new lines defined
qfine |= np.abs(wfine - line_wave[i]) < clip_sigma*line_dlam[i]
# Linear interpolate cleaned spectrum on fine grid
iclean = np.interp(wfine[qfine], wave, clean)
# Append original and fine sampled arrays
wfull = np.append(wave, wfine[qfine])
cfull = np.append(clean, iclean)
so = np.argsort(wfull)
wfull, uniq = np.unique(wfull, return_index=True)
cfull = cfull[uniq]
gfull = cfull*0.
for i in range(len(line_wave)):
if lorentz:
# astropy.modeling.functional_models.Lorentz1D.html
# gamma is FWHM/2., integral is gamma*pi
gam = 2.35482*line_dlam[i]/2.
gline = gam**2/(gam**2 + (wfull-line_wave[i])**2)
norm = line_lum[i]/(gam*np.pi)
else:
# Gaussian
gline = np.exp(-(wfull - line_wave[i])**2/2/line_dlam[i]**2)
norm = line_lum[i]/np.sqrt(2*np.pi*line_dlam[i]**2)
if self.emline_names[line_ix[i]].startswith('Ly'):
norm *= scale_lyman_series
if self.emline_names[line_ix[i]] in self.scale_lines:
norm *= self.scale_lines[self.emline_names[line_ix[i]]]
gfull += gline*norm
if get_eqw:
clip = np.abs(wfull - line_wave[i]) < clip_sigma*line_dlam[i]
eqw = np.trapz(gline[clip]*norm/cfull[clip], wfull[clip])
self.emline_eqw[line_ix[i]] = eqw
cfull += gfull
# For plot range
ymin = iclean.min()
line_peak = [1/np.sqrt(2*np.pi*dlam**2)*lum for dlam,lum in zip(line_dlam, line_lum)]
ymax = iclean.max()+np.max(line_peak)
data = OrderedDict()
data['wave_full'] = wfull
data['flux_full'] = cfull
data['line_full'] = gfull
data['wave' ] = wave
data['flux_line' ] = line
data['flux_clean'] = clean
data['ymin' ] = ymin
data['ymax' ] = ymax
self.narrow = data
return data
[docs]
def set_fir_template(self, arrays=None, file='templates/magdis/magdis_09.txt', verbose=True, unset=False, scale_pah3=0.5):
"""
Set the far-IR template for reprocessed dust emission
"""
if unset:
if verbose:
print('Unset FIR template attributes')
for attr in ['fir_template', 'fir_name', 'fir_arrays']:
if hasattr(self, attr):
delattr(self, attr)
return True
if os.path.exists(file):
if verbose:
print(f'Set FIR dust template from {file}')
_ = np.loadtxt(file, unpack=True)
wave, flux = _[0], _[1]
self.fir_name = file
elif arrays is not None:
if verbose:
print(f'Set FIR dust template from input arrays')
wave, flux = arrays
self.fir_name = 'user-supplied'
else:
if verbose:
print(f'Set FIR dust template from FSPS (DL07)')
# Set with fsps
self.params['dust1'] = 0
self.params['dust2'] = 1.
self.params['add_dust_emission'] = True
wave, flux = self.get_spectrum(tage=1., peraa=True)
self.params['add_dust_emission'] = False
wave, flux_nodust = self.get_spectrum(tage=1., peraa=True)
flux -= flux_nodust
self.fir_name = 'fsps-dl07'
if scale_pah3 is not None:
if verbose:
print(f'Scale 3.3um PAH: {scale_pah3:.2f}')
ran = np.abs(wave-3.3e4) < 0.5e4
line = np.abs(wave-3.3e4) < 0.3e4
px = np.polyfit(wave[ran & ~line], flux[ran & ~line], 3)
scaled_line = (flux[ran] - np.polyval(px, wave[ran]))*scale_pah3
flux[ran] = np.polyval(px, wave[ran]) + scaled_line
fir_flux = np.interp(self.wavelengths, wave, flux, left=0, right=0)
self.fir_template = fir_flux/np.trapz(fir_flux, self.wavelengths)
self.fir_arrays = arrays
return True
[docs]
def set_dust(self, Av=0., dust_obj_type='WG00x', wg00_kwargs=WG00_DEFAULTS):
"""
Set `dust_obj` attribute
dust_obj_type:
'WG00' = `dust_attenuation.radiative_transfer.WG00`
'C00' = `dust_attenuation.averages.C00`
'WG00x' = `ParameterizedWG00`
'KC13' = Kriek & Conroy (2013) with dust_index parameter
'R15' = Reddy et al. (2015) with dust bump parameter
ir_template: (wave, flux)
Template to use for re-emitted IR light
"""
from dust_attenuation import averages, radiative_transfer
self.params['dust1'] = 0.
self.params['dust2'] = 0.
needs_init = False
if not hasattr(self, 'dust_obj'):
needs_init = True
if hasattr(self, 'dust_obj_type'):
if self.dust_obj_type != dust_obj_type:
needs_init = True
if needs_init:
self.dust_obj_type = dust_obj_type
if dust_obj_type == 'WG00':
self.dust_obj = radiative_transfer.WG00(tau_V=1.0,
**WG00_DEFAULTS)
elif dust_obj_type == 'WG00x':
self.dust_obj = ParameterizedWG00(Av=Av)
elif dust_obj_type == 'C00':
self.dust_obj = averages.C00(Av=Av)
elif dust_obj_type == 'R15':
self.dust_obj = Reddy15(Av=Av, bump_ampl=2.)
elif hasattr(dust_obj_type, 'extinguish'):
self.dust_obj = dust_obj_type
else:
self.dust_obj = KC13(Av=Av)
print('Init dust_obj: {0} {1}'.format(dust_obj_type, self.dust_obj.param_names))
self.Av = Av
if dust_obj_type == 'WG00':
Avs = np.array([0.151, 0.298, 0.44 , 0.574, 0.825, 1.05 , 1.252, 1.428, 1.584, 1.726, 1.853, 1.961, 2.065, 2.154, 2.318, 2.454, 2.573, 2.686, 3.11 , 3.447, 3.758, 4.049, 4.317, 4.59 , 4.868, 5.148])
taus = np.array([ 0.25, 0.5 , 0.75, 1. , 1.5 , 2. , 2.5 , 3. , 3.5 , 4. , 4.5 , 5. , 5.5 , 6. , 7. , 8. , 9. , 10. , 15. , 20. , 25. , 30. , 35. , 40. , 45. , 50. ])
tau_V = np.interp(Av, Avs, taus, left=0.25, right=50)
self.dust_obj.tau_V = tau_V
self.Av = self.dust_obj(5500*u.Angstrom)
elif dust_obj_type == 'KC13':
self.dust_obj.Av = Av
self.dust_obj.delta = self.params['dust_index']
else:
self.dust_obj.Av = Av
[docs]
def redden(self, wave):
if hasattr(self.dust_obj, 'extinguish'):
return self.dust_obj.extinguish(wave, Av=self.Av)
else:
return 10**(-0.4*self.dust_obj(wave))
[docs]
def get_full_spectrum(self, tage=1.0, Av=0., get_template=True, set_all_templates=False, z=None, tie_metallicity=True, **kwargs):
"""
Get full spectrum with reprocessed emission lines and dust emission
dust_fraction: Fraction of the SED that sees the specified Av
"""
# Set the dust model
if Av is None:
Av = self.Av
if 'dust_obj_type' in kwargs:
self.set_dust(Av=Av, dust_obj_type=kwargs['dust_obj_type'])
elif hasattr(self, 'dust_obj'):
self.set_dust(Av=Av, dust_obj_type=self.dust_obj_type)
else:
self.set_dust(Av=Av, dust_obj_type='WG00x')
# Lognormal SFH?
if ('lognorm_center' in kwargs) | ('lognorm_logwidth' in kwargs):
self.set_lognormal_sfh(**kwargs)
if 'lognorm_fburst' in kwargs:
self.lognorm_fburst = kwargs['lognorm_fburst']
# Burst fraction for lognormal SFH
if self.is_lognorm_sfh:
if not hasattr(self, '_lognorm_sfh'):
self.set_lognormal_sfh()
t1 = self.lognormal_integral(tage)
dt = (tage-self._lognorm_sfh[0])
t100 = (dt <= 0.1) & (dt >= 0)
sfhy = self._lognorm_sfh[1]*1.
sfhy += t1*10**self.lognorm_fburst/100e6*t100
self.set_tabular_sfh(self._lognorm_sfh[0], sfhy)
# Set FSPS parameters
for k in kwargs:
if k in self.params.all_params:
self.params[k] = kwargs[k]
if 'zmet' not in kwargs:
self.params['zmet'] = self.izmet
if ('gas_logz' not in kwargs) & tie_metallicity:
self.params['gas_logz'] = self.params['logzsol']
# Run the emission line function
if tage is None:
tage = self.params['tage']
_ = self.narrow_emission_lines(tage=tage, **kwargs)
wave = _['wave_full']
flux = _['flux_full']
lines = _['line_full']
contin = flux - lines
#self.sfr100 = self.sfr_avg(dt=np.minimum(tage, 0.1))
# Apply dust
if self.dust_obj_type == 'WG00':
# To template
red = (wave > 1000)*1.
wlim = (wave > 1000) & (wave < 3.e4)
red[wlim] = self.redden(wave[wlim]*u.Angstrom)
# To lines
red_lines = (self.emline_wavelengths > 1000)*1.
wlim = (self.emline_wavelengths > 1000)
wlim &= (self.emline_wavelengths < 3.e4)
line_wave = self.emline_wavelengths[wlim]*u.Angstrom
red_lines[wlim] = self.redden(line_wave)
else:
red = self.redden(wave*u.Angstrom)
if self.line_av_func is None:
self.Av_line = self.Av*1.
red_lines_full = red
line_wave = self.emline_wavelengths*u.Angstrom
red_lines = self.redden(line_wave)
else:
# Differential reddening towards nebular lines
self.Av_line = self.line_av_func(Av)
self.set_dust(Av=self.Av_line,
dust_obj_type=self.dust_obj_type)
red_lines_full = self.redden(wave*u.Angstrom)
line_wave = self.emline_wavelengths*u.Angstrom
red_lines = self.redden(line_wave)
# Reset for continuum
self.set_dust(Av=Av, dust_obj_type=self.dust_obj_type)
# Apply dust to line luminosities
lred = [llum*lr for llum, lr in
zip(self.emline_luminosity, red_lines)]
self.emline_reddened = np.array(lred)
# Total energy
e0 = np.trapz(flux, wave)
# Energy of reddened template
reddened = contin*red+lines*red_lines_full
e1 = np.trapz(reddened, wave)
self.energy_absorbed = (e0 - e1)
# Add dust emission
if hasattr(self, 'fir_template') & self.params['add_dust_emission']:
dust_em = np.interp(wave, self.wavelengths, self.fir_template)
dust_em *= self.energy_absorbed
else:
dust_em = 0.
meta0 = self.meta
self.templ = self.as_template(wave, reddened+dust_em, meta=meta0)
# Set template attributes
if set_all_templates:
# Original wavelength grid
# owave = self.wavelengths
# owave = owave[self.wg00lim]*u.Angstrom
# self.wg00red[self.wg00lim] = self.redden(owave)
# ofir = self.fir_template*self.energy_absorbed
# fl_orig = _['flux_line']*self.wg00red + ofir
# self.templ_orig = self.as_template(owave, fl_orig, meta=meta0)
# No lines
meta = meta0.copy()
meta['add_neb_emission'] = False
fl_cont = contin*red + dust_em
#ocont = _['flux_clean']*self.wg00red + ofir
self.templ_cont = self.as_template(wave, fl_cont, meta=meta)
#self.templ_cont_orig = self.as_template(owave, ocont, meta=meta)
# No dust
meta = meta0.copy()
meta['add_neb_emission'] = True
meta['Av'] = 0
self.templ_unred = self.as_template(wave, flux, meta=meta)
#self.templ_unred_orig = self.as_template(owave, _['flux_clean'],
# meta=meta)
if get_template:
return self.templ
else:
return self.templ.wave, self.templ.flux
[docs]
def as_template(self, wave, flux, label=DEFAULT_LABEL, meta=None):
"""
Return a `eazy.templates.Template` object with metadata
"""
if meta is None:
meta = self.meta
templ = templates.Template(arrays=(wave, flux), meta=meta,
name=label.format(**meta))
return templ
[docs]
def lognorm_avg_sfr(self, tage=None, dt=0.1):
"""
Analytic average SFR for lognorm SFH
"""
if tage is None:
tage = self.params['tage']
t1 = self.lognormal_integral(tage)
t0 = self.lognormal_integral(np.maximum(tage-dt, 0))
sfr_avg = (t1*(1+10**self.lognorm_fburst)-t0)/(dt*1.e9)
return sfr_avg
@property
def sfr100(self):
"""
SFR averaged over maximum(tage, 100 Myr) from `sfr_avg`
"""
if self.params['sfh'] == 0:
sfr_avg = 0.
elif self.params['sfh'] == 3:
# Try to integrate SFH arrays if attribute set
if self.is_lognorm_sfh:
sfr_avg = self.lognorm_avg_sfr(tage=None, dt=0.1)
elif hasattr(self, '_sfh_tab'):
try:
fwd = self.params['tabsfh_forward']
except:
fwd = 1
if fwd == 1:
age_lb = self.params['tage'] - self._sfh_tab[0]
step = -1
else:
age_lb = self._sfh_tab[0]
step = 1
age100 = (age_lb <= 0.1) & (age_lb >= 0)
if age100.sum() < 2:
sfr_avg = 0.
else:
sfr_avg = np.trapz(self._sfh_tab[1][age100][::step],
age_lb[age100][::step])/0.1
else:
sfr_avg = 0.
else:
sfr_avg = self.sfr_avg(dt=np.minimum(self.params['tage'], 0.1))
return sfr_avg
@property
def sfr10(self):
"""
SFR averaged over last MAXIMUM(tage, 10 Myr) from `sfr_avg`
"""
if self.params['sfh'] == 0:
sfr_avg = 0.
elif self.params['sfh'] == 3:
# Try to integrate SFH arrays if attribute set
if self.is_lognorm_sfh:
sfr_avg = self.lognorm_avg_sfr(tage=None, dt=0.01)
elif hasattr(self, '_sfh_tab'):
try:
fwd = self.params['tabsfh_forward']
except:
fwd = 1
if fwd == 1:
age_lb = self.params['tage'] - self._sfh_tab[0]
step = -1
else:
age_lb = self._sfh_tab[0]
step = 1
age10 = (age_lb < 0.01) & (age_lb >= 0)
if age10.sum() < 2:
sfr_avg = 0.
else:
sfr_avg = np.trapz(self._sfh_tab[1][age10][::step],
age_lb[age10][::step])/0.1
else:
sfr_avg = 0.
else:
sfr_avg = self.sfr_avg(dt=np.minimum(self.params['tage'], 0.01))
return sfr_avg
@property
def meta(self):
"""
Full metadata, including line properties
"""
import fsps
meta = self.param_dict
if self._zcontinuous:
meta['metallicity'] = 10**self.params['logzsol']*0.019
else:
meta['metallicity'] = self.zlegend[self.params['zmet']]
for k in ['log_age','stellar_mass', 'formed_mass', 'log_lbol',
'sfr', 'sfr100', 'dust_obj_type','Av','energy_absorbed',
'fir_name', '_zcontinuous', 'scale_lyman_series',
'lognorm_center', 'lognorm_logwidth', 'is_lognorm_sfh',
'lognorm_fburst']:
if hasattr(self, k):
meta[k] = self.__getattribute__(k)
if hasattr(self, 'emline_names'):
has_red = hasattr(self, 'emline_reddened')
if self.emline_luminosity.ndim == 1:
for i in range(len(self.emline_wavelengths)):
n = self.emline_names[i]
if n in self.scale_lines:
kscl = self.scale_lines[n]
else:
kscl = 1.0
meta[f'scale {n}'] = kscl
meta[f'line {n}'] = self.emline_luminosity[i]*kscl
if has_red:
meta[f'rline {n}'] = self.emline_reddened[i]*kscl
meta[f'eqw {n}'] = self.emline_eqw[i]
meta[f'sigma {n}'] = self.emline_sigma[i]
# Band information
if hasattr(self, '_meta_bands'):
light_ages = self.light_age_band(self._meta_bands, flat=False)
band_flux = self.get_mags(tage=self.params['tage'], zmet=None,
bands=self._meta_bands, units='flam')
band_waves = [fsps.get_filter(b).lambda_eff*u.Angstrom
for b in self._meta_bands]
band_lum = [f*w for f, w in zip(band_flux, band_waves)]
for i, b in enumerate(self._meta_bands):
meta[f'lwage_{b}'] = light_ages[i]
meta[f'lum_{b}'] = band_lum[i].value
try:
meta['libraries'] = ';'.join([s.decode() for s in self.libraries])
except:
try:
meta['libraries'] = ';'.join([s for s in self.libraries])
except:
meta['libraries'] = '[error]'
return meta
@property
def param_dict(self):
"""
`dict` version of `self.params`
"""
d = OrderedDict()
for p in self.params.all_params:
d[p] = self.params[p]
return d
[docs]
def light_age_band(self, bands=['v'], flat=True):
"""
Get light-weighted age of current model
"""
self.params['compute_light_ages'] = True
band_ages = self.get_mags(tage=self.params['tage'], zmet=None,
bands=bands)
self.params['compute_light_ages'] = False
if flat & (band_ages.shape == (1,)):
return band_ages[0]
else:
return band_ages
[docs]
def pset(self, params):
"""
Return a subset dictionary of `self.meta`
"""
d = OrderedDict()
for p in params:
if p in self.meta:
d[p] = self.meta[p]
else:
d[p] = np.nan
return d
[docs]
def param_floats(self, params=None):
"""
Return a list of parameter values. If `params` is None, then use
full list in `self.params.all_params`.
"""
if params is None:
params = self.params.all_params
d = []
for p in params:
d.append(self.params[p]*1)
return np.array(d)
[docs]
def parameter_bounds(self, params, limit_age=False):
"""
Parameter bounds for `scipy.optimize.least_squares`
"""
blo = []
bhi = []
steps = []
for p in params:
if p in BOUNDS:
blo.append(BOUNDS[p][0])
bhi.append(BOUNDS[p][1])
steps.append(BOUNDS[p][2])
else:
blo.append(-np.inf)
bhi.append(np.inf)
steps.append(0.05)
return (blo, bhi), steps
[docs]
def fit_spec(self, wave_obs, flux_obs, err_obs, mask=None, plist=['tage', 'Av', 'gas_logu', 'sigma_smooth'], func_kwargs={'lorentz':False}, verbose=True, bspl_kwargs=None, cheb_kwargs=None, lsq_kwargs={'method':'trf', 'max_nfev':200, 'loss':'huber', 'x_scale':1.0, 'verbose':True}, show=False):
"""
Fit models to observed spectrum
"""
from scipy.optimize import least_squares
import matplotlib.pyplot as plt
import grizli.utils
sys_err = 0.015
if wave_obs is None:
# mpdaf muse spectrum
_ = None # muse spectrum object
spec = _[0].spectra['MUSE_TOT_SKYSUB']
wave_obs = spec.wave.coord()
flux_obs = spec.data.filled(fill_value=np.nan)
err_obs = np.sqrt(spec.var.filled(fill_value=np.nan))
err_obs = np.sqrt(err_obs**2+(sys_err*flux_obs)**2)
mask = np.isfinite(flux_obs+err_obs) & (err_obs > 0)
omask = mask
#mask = omask & (wave_obs/(1+0.0342) > 6520) & (wave_obs/(1+0.0342) < 6780)
mask = omask & (wave_obs/(1+0.0342) > 4800) & (wave_obs/(1+0.0342) < 5050)
theta0 = np.array([self.meta[p] for p in plist])
if bspl_kwargs is not None:
bspl = grizli.utils.bspline_templates(wave_obs, get_matrix=True,
**bspl_kwargs)
elif cheb_kwargs is not None:
bspl = grizli.utils.cheb_templates(wave_obs, get_matrix=True,
**cheb_kwargs)
else:
bspl = None
kwargs = func_kwargs.copy()
for i, p in enumerate(plist):
kwargs[p] = theta0[i]
# Model test
margs = (self, plist, wave_obs, flux_obs, err_obs,
mask, bspl, kwargs, 'model')
flux_model, Anorm, chi2_init = self.objfun_fitspec(theta0, *margs)
if show:
mask &= np.isfinite(flux_model+flux_obs+err_obs) & (err_obs > 0)
plt.close('all')
fig = plt.figure(figsize=(12, 6))
plt.errorbar(wave_obs[mask],
flux_obs[mask],
err_obs[mask],
color='k',
alpha=0.5,
linestyle='None',
marker='.')
plt.plot(wave_obs, flux_model, color='pink', linewidth=2, alpha=0.8)
else:
fig = None
bounds, steps = self.parameter_bounds(plist)
#lsq_kwargs['diff_step'] = np.array(steps)/2.
#lsq_kwargs['diff_step'] = 0.05
lsq_kwargs['diff_step'] = steps
lmargs = (self, plist, wave_obs, flux_obs, err_obs,
mask, bspl, kwargs, 'least_squares verbose')
_res = least_squares(self.objfun_fitspec,
theta0,bounds=bounds,
args=lmargs,
**lsq_kwargs,
)
fit_model, Anorm, chi2_fit = self.objfun_fitspec(_res.x, *margs)
result = {'fit_model':fit_model, 'Anorm':Anorm, 'chi2_fit':chi2_fit,
'least_squares':_res, 'bounds':bounds, 'plist':plist,
'lsq_kwargs':lsq_kwargs, 'bspl':bspl}
return result
[docs]
@staticmethod
def objfun_fitspec(theta, self, plist, wave_obs, flux_obs, err_obs, mask, bspl, kwargs, ret_type):
"""
Objective function for fitting spectra
"""
import scipy.stats
try:
from grizli.utils_c.interp import interp_conserve_c as interp_func
except:
interp_func = utils.interp_conserve
err_scale = 1.
for i, p in enumerate(plist):
if p == 'err_scale':
err_scale = theta[i]
continue
kwargs[p] = theta[i]
templ = self.get_full_spectrum(**kwargs)
flux_model = templ.to_observed_frame(z=self.params['zred'],
lsf_func=self.lsf_func,
clip_wavelengths=None,
wavelengths=wave_obs,
smoothspec_kwargs={'fftsmooth':self.FFT_SMOOTH},
)
# flux_model = templ.resample(wave_obs, z=self.params['zred'],
# in_place=False,
# return_array=True, interp_func=interp_func)
flux_model = np.squeeze(flux_model.flux_flam())
if mask is None:
mask = np.isfinite(flux_model+flux_obs+err_obs) & (err_obs > 0)
if bspl is not None:
_A = (bspl.T*flux_model)
_yx = (flux_obs / err_obs)[mask]
_c = np.linalg.lstsq((_A/err_obs).T[mask,:], _yx, rcond=-1)
Anorm = np.mean(bspl.dot(_c[0]))
flux_model = _A.T.dot(_c[0])
else:
lsq_num = (flux_obs*flux_model/err_obs**2)[mask].sum()
lsq_den = (flux_model**2/err_obs**2)[mask].sum()
Anorm = lsq_num/lsq_den
flux_model *= Anorm
chi = ((flux_model - flux_obs)/err_obs)[mask]
chi2 = (chi**2).sum()
if 'verbose' in ret_type:
print('{0} {1:.4f}'.format(theta, chi2))
if 'model' in ret_type:
return flux_model, Anorm, chi2
elif 'least_squares' in ret_type:
return chi
elif 'logpdf' in ret_type:
#return -chi2/2.
lnp = scipy.stats.norm.logpdf((flux_model-flux_obs)[mask],
loc=0,
scale=(err_obs*flux_model)[mask])
return lnp
else:
return chi2
[docs]
def line_to_obsframe(self, zred=None, cosmology=None, verbose=False, unit=LINE_CGS, target_stellar_mass=None, target_sfr=None, target_lir=None):
"""
Scale factor to convert internal line luminosities (L_sun) to observed frame
If ``target_stellar_mass``, ``target_sfr``, or ``target_lir``
specified, then scale the output to the desired value using the
intrinsic properties. Units are linear ``Msun``, ``Msun/yr``,
and ``Lsun``, respectively.
"""
from astropy.constants import L_sun
if zred == None:
zred = self.params['zred']
if verbose:
msg = 'continuum_to_obsframe: Use params[zred] = {0:.3f}'
print(msg.format(zred))
if cosmology is None:
cosmology = self.cosmology
else:
self.cosmology = cosmology
if zred <= 0:
dL = 1*u.cm
else:
dL = cosmology.luminosity_distance(zred).to(u.cm)
to_cgs = (1*L_sun/(4*np.pi*dL**2)).to(unit)
if target_stellar_mass is not None:
to_cgs *= target_stellar_mass / self.stellar_mass
elif target_sfr is not None:
to_cgs *= target_sfr / self.sfr100
elif target_lir is not None:
to_cgs *= target_lir / self.energy_absorbed
return to_cgs.value
[docs]
def continuum_to_obsframe(self, zred=None, cosmology=None, unit=u.microJansky, verbose=False, target_stellar_mass=None, target_sfr=None, target_lir=None):
"""
Compute a normalization factor to scale input FSPS model flux density
units of (L_sun / Hz) or (L_sun / \AA) to observed-frame `unit`.
If ``target_stellar_mass``, ``target_sfr``, or ``target_lir``
specified, then scale the output to the desired value using the
intrinsic properties. Units are linear ``Msun``, ``Msun/yr``,
and ``Lsun``, respectively.
"""
from astropy.constants import L_sun
if zred == None:
zred = self.params['zred']
if verbose:
msg = 'continuum_to_obsframe: Use params[zred] = {0:.3f}'
print(msg.format(zred))
if cosmology is None:
cosmology = self.cosmology
else:
self.cosmology = cosmology
if zred <= 0:
dL = 1*u.cm
else:
dL = cosmology.luminosity_distance(zred).to(u.cm)
# FSPS L_sun / Hz to observed-frame
try:
# Unit is like f-lambda
_x = (1*unit).to(u.erg/u.second/u.cm**2/u.Angstrom)
is_flam = True
obs_unit = (1*L_sun/u.Angstrom/(4*np.pi*dL**2)).to(unit)/(1+zred)
except:
# Unit is like f-nu
is_flam = False
obs_unit = (1*L_sun/u.Hz/(4*np.pi*dL**2)).to(unit)*(1+zred)
if target_stellar_mass is not None:
obs_unit *= target_stellar_mass / self.stellar_mass
elif target_sfr is not None:
obs_unit *= target_sfr / self.sfr100
elif target_lir is not None:
obs_unit *= target_lir / self.energy_absorbed
return obs_unit.value
[docs]
def fit_phot(self, phot_dict, filters=None, flux_unit=u.microJansky, plist=['tage', 'Av', 'gas_logu', 'sigma_smooth'], func_kwargs={'lorentz':False}, verbose=True, lsq_kwargs={'method':'trf', 'max_nfev':200, 'loss':'huber', 'x_scale':1.0, 'verbose':True}, show=False, TEF=None, photoz_obj=None):
"""
Fit models to observed spectrum
"""
from scipy.optimize import least_squares
import matplotlib.pyplot as plt
import grizli.utils
sys_err = 0.02
flux = phot_dict['fobs']
err = phot_dict['efobs']
if 'flux_unit' in phot_dict:
flux_unit = phot_dict['flux_unit']
x0 = np.array([self.meta[p] for p in plist])
# Are input fluxes f-lambda or f-nu?
try:
_x = (1*flux_unit).to(u.erg/u.second/u.cm**2/u.Angstrom)
is_flam = True
except:
is_flam = False
# Initialize keywords
kwargs = func_kwargs.copy()
for i_p, p in enumerate(plist):
kwargs[p] = x0[i_p]
# Initial model
margs = (self, plist, flux, err, is_flam, filters, TEF, kwargs, 'model')
flux_model, Anorm, chi2_init, templ = self.objfun_fitphot(x0, *margs)
if show:
if hasattr(show, 'plot'):
ax = show
else:
plt.close('all')
fig = plt.figure(figsize=(12, 6))
ax = fig.add_subplot(111)
mask = err > 0
pivot = np.array([f.pivot for f in filters])
so = np.argsort(pivot)
ax.errorbar(pivot[mask]/1.e4, flux[mask], err[mask],
color='k', alpha=0.5, linestyle='None', marker='.')
ax.scatter(pivot[so]/1.e4, flux_model[so], color='pink',
alpha=0.8, zorder=100)
# Parameter bounds
bounds, steps = self.parameter_bounds(plist)
lsq_kwargs['diff_step'] = steps
# Run the optimization
lmargs = (self, plist, flux, err, is_flam, filters, TEF, kwargs, 'least_squares verbose')
_res = least_squares(self.objfun_fitphot, x0, bounds=bounds,
args=lmargs, **lsq_kwargs)
_out = self.objfun_fitphot(_res.x, *margs)
xtempl = _out[3]
xscale = _out[1]
_fit = {}
_fit['fmodel'] = _out[0]
_fit['scale'] = xscale
_fit['chi2'] = _out[2]
_fit['templ'] = xtempl
_fit['plist'] = plist
_fit['theta'] = _res.x
_fit['res'] = _res
# Stellar mass
#fit_model, Anorm, chi2_fit, templ = _phot
# Parameter scaling to observed frame.
# e.g., stellar mass = self.stellar_mass * scale / to_obsframe
z = self.params['zred']
_obsfr = self.continuum_to_obsframe(zred=z, unit=flux_unit)
_fit['to_obsframe'] = _obsfr
scl = _fit['scale']/_fit['to_obsframe']
_fit['log_mass'] = np.log10(self.stellar_mass*scl)
_fit['sfr'] = self.sfr*scl
_fit['sfr10'] = self.sfr10*scl
_fit['sfr100'] = self.sfr100*scl
age_bands = ['i1500','v']
ages = self.light_age_band(bands=age_bands)
for i, b in enumerate(age_bands):
_fit['age_'+b] = ages[i]
if show:
ax.scatter(pivot[so]/1.e4, _fit['fmodel'][so], color='r',
alpha=0.8, zorder=101)
iz = templ.zindex(z)
igm = templ.igm_absorption(z, scale_tau=1.4)
if is_flam:
ax.plot(xtempl.wave*(1+z)/1.e4, xtempl.flux[iz,:]*xscale*igm,
color='r', alpha=0.3, zorder=10000)
else:
ax.plot(xtempl.wave*(1+z)/1.e4, xtempl.flux_fnu(iz)*xscale*igm,
color='r', alpha=0.3, zorder=10000)
return _fit
[docs]
@staticmethod
def objfun_fitphot(theta, self, plist, flux_fnu, err_fnu, is_flam, filters, TEF, kwargs, ret_type):
"""
Objective function for fitting spectra
"""
try:
from grizli.utils_c.interp import interp_conserve_c as interp_func
except:
interp_func = utils.interp_conserve
for i, p in enumerate(plist):
kwargs[p] = theta[i]
templ = self.get_full_spectrum(**kwargs)
model_fnu = templ.integrate_filter_list(filters,
z=self.params['zred'], flam=is_flam,
include_igm=True)
mask = (err_fnu > 0)
full_var = err_fnu**2
if TEF is not None:
tefz = TEF(self.params['zred'])
full_var += (flux_fnu*tefz)**2
lsq_num = (flux_fnu*model_fnu/full_var)[mask].sum()
lsq_den = (model_fnu**2/full_var)[mask].sum()
Anorm = lsq_num/lsq_den
model_fnu *= Anorm
chi = ((model_fnu - flux_fnu)/np.sqrt(full_var))[mask]
chi2 = (chi**2).sum()
if 'verbose' in ret_type:
print('{0} {1:.4f}'.format(theta, (chi**2).sum()))
if 'model' in ret_type:
return model_fnu, Anorm, chi2, templ
elif 'least_squares' in ret_type:
return chi
elif 'logpdf' in ret_type:
return -chi2/2
else:
return chi2
[docs]
def fit_grism(self, mb, plist=['tage', 'zred'], func_kwargs={'lorentz':False}, verbose=True, lsq_kwargs={'method':'trf', 'max_nfev':200, 'loss':'huber', 'x_scale':1.0, 'verbose':True}, show=False, TEF=None, photoz_obj=None, flux_unit=u.erg/u.second/u.cm**2/u.Angstrom):
"""
Fit models to grism spectrum
"""
from scipy.optimize import least_squares
import matplotlib.pyplot as plt
import grizli.utils
x0 = np.array([self.meta[p] for p in plist])
# Initialize keywords
kwargs = func_kwargs.copy()
for i_p, p in enumerate(plist):
kwargs[p] = x0[i_p]
# Initial model
margs = (self, plist, mb, TEF, kwargs, 'model')
tfit, Anorm, chi2_init, templ = self.objfun_fitgrism(x0, *margs)
if show:
_init_fig = mb.oned_figure(tfit=tfit, show_individual_templates=True)
# Parameter bounds
bounds, steps = self.parameter_bounds(plist)
lsq_kwargs['diff_step'] = steps
# Run the optimization
lmargs = (self, plist, mb, TEF, kwargs, 'least_squares verbose')
_res = least_squares(self.objfun_fitgrism, x0, bounds=bounds,
args=lmargs, **lsq_kwargs)
_out = self.objfun_fitgrism(_res.x, *margs)
xtempl = _out[3]
xscale = _out[1]
_fit = {}
_fit['fmodel'] = _out[0]
_fit['scale'] = xscale
_fit['chi2'] = _out[2]
_fit['templ'] = xtempl
_fit['plist'] = plist
_fit['theta'] = _res.x
_fit['res'] = _res
# Stellar mass
#fit_model, Anorm, chi2_fit, templ = _phot
# Parameter scaling to observed frame.
# e.g., stellar mass = self.stellar_mass * scale / to_obsframe
z = self.params['zred']
_obsfr = self.continuum_to_obsframe(zred=z, unit=flux_unit)
_fit['to_obsframe'] = _obsfr
scl = _fit['scale']/_fit['to_obsframe']
_fit['log_mass'] = np.log10(self.stellar_mass*scl)
_fit['sfr'] = self.sfr*scl
_fit['sfr10'] = self.sfr10*scl
_fit['sfr100'] = self.sfr100*scl
age_bands = ['i1500','v']
ages = self.light_age_band(bands=age_bands)
for i, b in enumerate(age_bands):
_fit['age_'+b] = ages[i]
_tfit = _out[0]
if show:
_fit_fig = mb.oned_figure(tfit=_tfit, show_individual_templates=True)
_figs = (_init_fig, _fit_fig)
else:
_figs = None
return _fit, _tfit, _figs
[docs]
@staticmethod
def objfun_fitgrism(theta, self, plist, mb, TEF, kwargs, ret_type):
"""
Objective function for fitting spectra
"""
try:
from grizli.utils_c.interp import interp_conserve_c as interp_func
except:
interp_func = utils.interp_conserve
from grizli import utils
for i, p in enumerate(plist):
kwargs[p] = theta[i]
templ = self.get_full_spectrum(**kwargs)
tsp = utils.SpectrumTemplate(wave=templ.wave, flux=templ.flux_flam())
tx = {'fsps':tsp}
if 'least_squares' in ret_type:
out = mb.xfit_at_z(z=self.params['zred'],
templates=tx, fitter='lstsq',
fit_background=True, get_uncertainties=False,
include_photometry=False, get_residuals=True,
use_cached_templates=False)
chi, _coeffs, _, _ = out
chi2 = (chi**2).sum()
Anorm = _coeffs[-1]
else:
tfit = mb.template_at_z(z=self.params['zred'], templates=tx)
chi2 = tfit['chi2']
Anorm = tfit['cfit']['fsps'][0]
if 'verbose' in ret_type:
print('{0} {1:.4f}'.format(theta, chi2))
if 'model' in ret_type:
return tfit, Anorm, chi2, templ
elif 'least_squares' in ret_type:
return chi
elif 'logpdf' in ret_type:
return -chi2/2
else:
return chi2