import os
import collections
import numpy as np
from . import utils
__all__ = ["EazyParam", "TranslateFile", "read_param_file"]
[docs]
def read_param_file(param_file=None, verbose=True):
"""
Load a param file and add default parameters if any missing
"""
param = EazyParam(param_file, verbose=True)
if param_file is not None:
# Read defaults
defaults = EazyParam(None, verbose=False)
for k in defaults.param_names:
if k not in param.param_names:
param[k] = defaults[k]
if verbose:
print(f"Parameter default: {k} = {defaults[k]}")
return param
[docs]
class EazyParam(object):
def __init__(self, PARAM_FILE=None, verbose=True):
"""
Read an Eazy zphot.param file.
Example:
>>> if os.path.exists('zphot.param'):
... params = EazyParam(PARAM_FILE='zphot.param')
... print(params['Z_STEP'])
Defaults are in `eazy/data/zphot.param.default <https://github.com/gbrammer/eazy-py/blob/master/eazy/data/zphot.param.default>`_
Parameters
----------
param_file : str
Name of parameter file. If None, then will get
`data/zphot.param.default` from within the module.
Attributes
----------
params : `collections.OrderedDict`
Parameter dictionary. Don't modify this directly but rather use
`__getitem__` and `__setitem__` methods.
param_names
formats : list
List indicating if parameters are interpreted as string ('s') or
scalar ('f') values.
"""
if PARAM_FILE is None:
PARAM_FILE = os.path.join(
os.path.dirname(__file__), "data/zphot.param.default"
)
if verbose:
print("Read default param file: " + PARAM_FILE)
self.filename = PARAM_FILE
self.param_path = os.path.dirname(PARAM_FILE)
f = open(PARAM_FILE, "r")
self.lines = f.readlines()
f.close()
self.params = collections.OrderedDict()
self.formats = collections.OrderedDict()
self._process_params()
@property
def param_names(self):
"""
Keywords of the `params` dictionary
"""
return list(self.params.keys())
def _process_params(self):
"""
Process parameter dictionary
"""
params = collections.OrderedDict()
formats = collections.OrderedDict()
# self.param_names = []
for line in self.lines:
if not line.strip().startswith("#"):
lsplit = line.split()
if lsplit.__len__() >= 2:
params[lsplit[0]] = lsplit[1]
# self.param_names.append(lsplit[0])
try:
flt = float(lsplit[1])
formats[lsplit[0]] = "f"
params[lsplit[0]] = flt
except:
formats[lsplit[0]] = "s"
self.params = params
self.formats = formats
@property
def to_mJy(self):
"""
Return catalog conversion factor to mJy based on ``PRIOR_ABZP``.
"""
return 10 ** (-0.4 * (self.params["PRIOR_ABZP"] - 23.9)) / 1000.0
[docs]
def write(self, file=None):
"""
Write to an ascii file
"""
if file == None:
print("No output file specified...")
else:
fp = open(file, "w")
for param in self.param_names:
fp.write("{0:25s} {1}\n".format(param, self.params[param]))
fp.close()
def __getitem__(self, param_name):
"""
Get item from ``params`` dict and return None if parameter not found.
"""
if param_name.upper() not in self.param_names:
print(f"Parameter {param_name} not found. Check `param_names` attribute.")
return None
else:
return self.params[param_name.upper()]
def __setitem__(self, param_name, value):
"""
Set item in ``params`` dict.
"""
self.params[param_name.upper()] = value
[docs]
def verify_params(self):
"""
Some checks on the parameters
"""
assert self["Z_MAX"] > self["Z_MIN"]
test_paths = ["./", utils.DATA_PATH, os.path.join(utils.DATA_PATH, "filters")]
for k in ["TEMPLATES_FILE", "TEMP_ERR_FILE", "CATALOG_FILE", "FILTERS_RES"]:
if isinstance(self[k], str):
filename = self[k]
file_found = False
for path in test_paths:
if os.path.exists(os.path.join(path, filename)):
file_found = True
break
if not file_found:
raise FileNotFoundError(
f"{k} ({self[k]}) not found in {test_paths}"
)
assert int(self["ARRAY_NBITS"]) in [32, 64]
# Positive
for k in [
"TEMP_ERR_A2",
"SYS_ERR",
"IGM_SCALE_TAU",
"MW_EBV",
"OMEGA_M",
"OMEGA_L",
]:
if self[k] < 0:
raise ValueError(f"{k} ({self[k]}) must be >= 0")
# Positive nonzero
for k in ["Z_STEP", "H0", "RF_PADDING"]:
if self[k] < 0:
raise ValueError(f"{k} ({self[k]}) must be > 0")
@property
def kwargs(self):
"""
Dictionary with lower-case parameter names for passing as ``**kwargs``
"""
kws = collections.OrderedDict()
for k in self.param_names:
kws[k.lower()] = self.params[k]
return kws
@property
def igm_kwargs(self):
"""
Keywords for initializing the IGM attenuation model
"""
igm_kwargs = {
"scale_tau": 1.0,
"add_cgm": self.__getitem__("ADD_CGM") in utils.TRUE_VALUES,
}
if "IGM_SCALE_TAU" in self.params:
igm_kwargs["scale_tau"] = self.params["IGM_SCALE_TAU"]
igm_kwargs["sigmoid_params"] = (
self.params["SIGMOID_PARAM1"],
self.params["SIGMOID_PARAM2"],
self.params["SIGMOID_PARAM3"],
)
return igm_kwargs
[docs]
class TranslateFile:
def __init__(self, file="zphot.translate"):
"""
File for translating catalog columns to associate bandbasses to them
The `file` has format
.. code-block::
flux_irac_ch1 F18
err_irac_ch1 E18
...
or a CSV table with format
.. code-block::
column, trans, error
flux_irac_ch1, F18
err_irac_ch1, E18, 1.0
...
where `flux_irac_ch1` is a column in the catalog table corresponding
to the IRAC 3.6 µm flux density. ``F18`` indicates that this is a
*flux density* column and is associated with filter number 18 in the
`~eazy.params.filters.FilterFile`.
``E18`` indicates an uncertainty column, and filters must have both
flux density and uncertainty columns to be considered.
The original catalog could have had column names ``F18`` and ``E18``
and not needed a translate file but it is generally preferable to have
more descriptive column names that aren't necessarily tied to a
particular `eazy` filter file.
Note, similarly, that columns like `F{N}` and `E{N}` are treated as
these types of flux and uncertainty columns. If they correspond to
something else, they should be "translated" to avoid confusion
"""
from astropy.table import Table
self.file = file
self.trans = collections.OrderedDict()
self.error = collections.OrderedDict()
if hasattr(file, "colnames"):
tr = file
self.file = "input_table.translate"
if "error" not in tr.colnames:
tr["error"] = 1.0
if tr.colnames != ["column", "trans", "error"]:
msg = f"table translate_file file must have columns"
msg += f" 'column', 'trans' [, 'error']. The file {file}"
msg += f" has columns {tr.colnames}."
raise ValueError(msg)
for i, k in enumerate(tr["column"]):
self.trans[k] = tr["trans"][i]
self.error[k] = tr["error"][i]
elif file.endswith("csv"):
tr = Table.read(file)
if "error" not in tr.colnames:
tr["error"] = 1.0
if tr.colnames != ["column", "trans", "error"]:
msg = f"csv translate_file file must have columns"
msg += f" 'column', 'trans' [, 'error']. The file {file}"
msg += f" has columns {tr.colnames}."
raise ValueError(msg)
for i, k in enumerate(tr["column"]):
self.trans[k] = tr["trans"][i]
self.error[k] = tr["error"][i]
else:
lines = open(file).readlines()
for line in lines:
spl = line.split()
if (line.strip() == "") | (len(spl) < 2):
continue
key = spl[0]
self.trans[key] = spl[1]
if len(spl) == 3:
self.error[key] = float(spl[2])
else:
self.error[key] = 1.0
[docs]
def change_error(self, filter=88, value=1.0e8):
"""
Modify uncertainties based on error scaling factors in translate file
"""
if isinstance(filter, str):
if "f_" in filter:
err_filt = filter.replace("f_", "e_")
else:
err_filt = "e" + filter
if err_filt in self.error:
self.error[err_filt] = value
return True
if isinstance(filter, int):
for key in self.trans.keys():
if self.trans[key] == "E{0:0d}".format(filter):
self.error[key] = value
return True
print("Filter {0} not found in list.".format(str(filter)))
[docs]
def write(self, file=None, show_ones=False):
"""
Write to an ascii file
"""
lines = []
for key in self.error:
line = "{0} {1}".format(key, self.trans[key])
if self.trans[key].startswith("E") & ((self.error[key] != 1.0) | show_ones):
line += " {0:.1f}".format(self.error[key])
lines.append(line + "\n")
if file is None:
file = self.file
if file:
fp = open(file, "w")
fp.writelines(lines)
fp.close()
else:
for line in lines:
print(line[:-1])
[docs]
def to_csv(self):
"""
Generate CSV string
"""
rows = "column,trans,error\n"
for k in self.error:
rows += f"{k},{self.trans[k]},{self.error[k]}\n"
return rows