import numpy as np
import os
from astropy.table import Table
from . import utils
__all__ = ["FilterDefinition", "FilterFile", "ParamFilter"]
VEGA_FILE = os.path.join(utils.path_to_eazy_data(),
'alpha_lyr_stis_008.fits')
VEGA = Table.read(VEGA_FILE)
for c in VEGA.colnames:
VEGA[c] = VEGA[c].astype(float)
[docs]
class FilterDefinition:
def __init__(self, name=None, wave=None, throughput=None, bp=None, photon_counter=True):
"""
Bandpass object
Parameters
----------
name : str
Label name
wave : array
Wavelength array, in `astropy.units.Angstrom`.
throughput : array
Throughput, arbitrary normalization
bp : optional, `pysynphot.obsbandpass` object
`pysynphot` filter bandpass
photon_counter : bool
Filter is associated with a photon-counting detector (e.g. CCDs).
Often FIR devices are energy-counting, e.g., MIPS, Herschel.
"""
self.name = name
self.wave = wave
self._throughput = throughput
self.Aflux = 1.
self.photon_counter = photon_counter
if isinstance(name, str):
# Set energy-counting filters
ir_filts = ['mips/24','mips/70','mips/160',
'scuba2/450','scuba2/850',
'herschel/pacs/70', 'herschel/pacs/100',
'herschel/pacs/160', 'herschel/spire/200',
'herschel/spire/350', 'herschel/spire/500']
if name.split()[0] in ir_filts:
self.photon_counter = False
# pysynphot Bandpass
if bp is not None:
self.wave = np.cast[np.double](bp.wave)
self._throughput = np.cast[np.double](bp.throughput)
self.name = bp.name
# Set throughput accounting for photon_counter
self.set_throughput()
[docs]
def set_throughput(self):
"""
Set throughput accounting for `photon_counter` attribute
"""
if self._throughput is None:
self.throughput = None
self.norm = 1.
else:
if self.photon_counter:
self.throughput = self._throughput
else:
_wfact = self.wave/np.mean(self.wave)
self.throughput = self._throughput/_wfact
self.norm = np.trapz(self.throughput/self.wave, self.wave)
def __repr__(self):
return self.name.__repr__()
def __str__(self):
return self.name.__str__()
[docs]
def get_extinction(self, EBV=0, Rv=3.1):
"""
Extinction factor
"""
import astropy.units as u
f99 = utils.GalacticExtinction(EBV=EBV, Rv=Rv)
self.Alambda = f99(self.wave)
self.Aflux = 10**(-0.4*self.Alambda)
[docs]
def extinction_correction(self, EBV, Rv=3.1, mag=True, source_lam=None, source_flux=None):
"""
Get the MW extinction correction within the filter.
Optionally supply a source spectrum.
"""
import astropy.units as u
try:
import grizli.utils_c
interp = grizli.utils_c.interp.interp_conserve_c
except ImportError:
interp = utils.interp_conserve
if self.wave is None:
print('Filter not defined.')
return False
if source_flux is None:
src = self.throughput*0.+1
else:
src = interp(self.wave, source_lam, source_flux, left=0, right=0)
if (self.wave.min() < 910) | (self.wave.max() > 6.e4):
Alambda = 0.
else:
f99 = utils.GalacticExtinction(EBV=EBV, Rv=Rv)
Alambda = f99(self.wave)
src_red = np.trapz(self.throughput*src*10**(-0.4*Alambda)/self.wave,
self.wave)
src_nored = np.trapz(self.throughput*src/self.wave, self.wave)
delta = src_red/src_nored
if mag:
return 2.5*np.log10(delta)
else:
return 1./delta
@property
def ABVega(self):
"""
Compute AB-Vega conversion
"""
from astropy.constants import c
import astropy.units as u
try:
import grizli.utils_c
interp = grizli.utils_c.interp.interp_conserve_c
except ImportError:
interp = utils.interp_conserve
# Union of throughput and Vega spectrum arrays
full_x = np.hstack([self.wave, VEGA['WAVELENGTH']])
full_x = full_x[np.argsort(full_x)]
# Vega spectrum, units of f-lambda flux density, cgs
# Interpolate to wavelength grid, no extrapolation
vega_full = interp(full_x, VEGA['WAVELENGTH'], VEGA['FLUX'],
left=0, right=0)
thru_full = interp(full_x, self.wave, self.throughput,
left=0, right=0)
# AB = 0, same units
absp = 3631*1e-23*c.to(u.m/u.s).value*1.e10/full_x**2
# Integrate over the bandpass, flam dlam
num = np.trapz(vega_full*thru_full, full_x)
den = np.trapz(absp*thru_full, full_x)
return -2.5*np.log10(num/den)
@property
def pivot(self):
"""
Pivot wavelength
http://pysynphot.readthedocs.io/en/latest/properties.html
"""
integrator = np.trapz
num = integrator(self.wave, self.wave*self.throughput)
den = integrator(self.wave, self.throughput/self.wave)
pivot = np.sqrt(num/den)
return pivot
@property
def equivwidth(self):
"""
Filter equivalent width
http://pysynphot.readthedocs.io/en/latest/properties.html
"""
return np.trapz(self.throughput, self.wave)
@property
def rectwidth(self):
"""
Filter rectangular width
http://pysynphot.readthedocs.io/en/latest/properties.html
"""
rect = self.equivwidth / self.throughput.max()
return rect
@property
def ctw95(self):
"""
95% cumulative throughput width
http://www.stsci.edu/hst/acs/analysis/bandwidths/#keywords
"""
dl = np.diff(self.wave)
filt = np.cumsum((self.wave*self.throughput)[1:]*dl)
ctw95 = np.interp([0.025, 0.975], filt/filt.max(), self.wave[1:])
return np.diff(ctw95)[0]
[docs]
def for_filter_file(self, row_str='{i:6} {wave:.5e} {thru:.5e}'):
"""
Return a string that can be put in the EAZY filter file
"""
header = '{0} {1} lambda_c= {2:.4e} AB-Vega= {3:.3f} w95={4:.1f}'
N = len(self.wave)
lines = [header.format(N, self.name.split('lambda_c')[0],
self.pivot, self.ABVega, self.ctw95)]
lines += [row_str.format(i=i+1, wave=w, thru=t)
for i, (w, t) in enumerate(zip(self.wave, self.throughput))]
return '\n'.join(lines)
[docs]
class FilterFile:
def __init__(self, file='FILTER.RES.latest', path='./'):
"""
Read a EAZY filter file.
.. plot::
:include-source:
import matplotlib.pyplot as plt
from eazy.filters import FilterFile
res = FilterFile(path=None)
print(len(res.filters))
bp = res[205]
print(bp)
fig, ax = plt.subplots(1,1,figsize=(6,4))
ax.plot(bp.wave, bp.throughput, label=bp.name.split()[0])
ax.set_xlabel('wavelength, Angstroms')
ax.set_ylabel('throughput')
ax.legend()
ax.grid()
fig.tight_layout(pad=0.5)
"""
if path is None:
file_path = os.path.join(os.getenv('EAZYCODE'), 'filters', file)
else:
file_path = os.path.join(path, file)
with open(file_path, 'r') as fp:
lines = fp.readlines()
self.filename = file_path
filters = []
wave = []
trans = []
header = ''
for line in lines:
if 'lambda_c' in line:
if len(wave) > 0:
# Make filter from lines already read in
new_filter = FilterDefinition(name=header,
wave=np.cast[float](wave),
throughput=np.cast[float](trans))
# new_filter.name = header
# new_filter.wave = np.cast[float](wave)
# new_filter.throughput = np.cast[float](trans)
filters.append(new_filter)
# Initialize filter
header = ' '.join(line.split()[1:])
wave = []
trans = []
else:
lspl = np.cast[float](line.split())
wave.append(lspl[1])
trans.append(lspl[2])
# last one
# new_filter = FilterDefinition()
# new_filter.name = header
# new_filter.wave = np.cast[float](wave)
# new_filter.throughput = np.cast[float](trans)
new_filter = FilterDefinition(name=header,
wave=np.cast[float](wave),
throughput=np.cast[float](trans))
filters.append(new_filter)
self.filters = filters
@property
def NFILT(self):
"""
Number of filters in the list
"""
return len(self.filters)
def __getitem__(self, i1):
"""
Return unit-indexed filter, e.g., 161 = 2mass-j
"""
return self.filters[i1-1]
[docs]
def names(self, verbose=True):
"""
Print the filter names.
"""
if verbose:
for i in range(len(self.filters)):
print('{0:5d} {1}'.format(i+1, self.filters[i].name))
else:
string_list = ['{0:5d} {1}\n'.format(i+1, self.filters[i].name) for i in range(len(self.filters))]
return string_list
[docs]
def write(self, file='xxx.res', verbose=True):
"""
Dump the filter information to a filter file.
"""
fp = open(file,'w')
for filter in self.filters:
fp.write('{0:6d} {1}\n'.format(len(filter.wave), filter.name))
for i in range(len(filter.wave)):
fp.write('{0:6d} {1:.5e} {2:.5e}\n'.format(i+1, filter.wave[i], filter.throughput[i]))
fp.close()
string_list = self.names(verbose=False)
fp = open(file+'.info', 'w')
fp.writelines(string_list)
fp.close()
if verbose:
print('Wrote <{0}[.info]>'.format(file))
[docs]
def search(self, search_string, case=False, verbose=True):
"""
Search filter names for ``search_string``. If ``case`` is True, then
match case.
"""
import re
if not case:
search_string = search_string.upper()
matched = []
for i in range(len(self.filters)):
filt_name = self.filters[i].name
if not case:
filt_name = filt_name.upper()
if re.search(search_string, filt_name) is not None:
if verbose:
print('{0:5d} {1}'.format(i+1, self.filters[i].name))
matched.append(i)
return np.array(matched)
[docs]
class ParamFilter(FilterDefinition):
def __init__(self, line='# Filter #20, RES#78: COSMOS/SUBARU_filter_B.txt - lambda_c=4458.276253'):
self.lambda_c = float(line.split('lambda_c=')[1])
self.name = line.split()[4]
self.fnumber = int(line.split('RES#')[1].split(':')[0])
self.cnumber = int(line.split('Filter #')[1].split(',')[0])