Source code for analysis.lc_colors

#!/usr/bin/env python
# coding: utf-8

"""The ``lc_colors`` module fits tabulates the chi-squared values for colors modeled by
an SNCosmo model. It also tabulates modeled and observed delta color over 15
days.

Function Documentation
----------------------
"""

from copy import deepcopy

import numpy as np
from astropy.table import Table
from tqdm import tqdm

from . import utils
from .regression import fit_gaussian_process
from .regression import predict_c_15, predict_color


[docs]def create_empty_output_table(color_bands, suffixes): """Create an empty astropy table for storing color dependant values Returns an empty table with columns for each color defined by color_bands. Additional columns can be added with the same names but with an added suffix by specifying the ``suffixes`` argument. Args: color_bands (list[Tuple]): List of tuples with two band names suffixes (list[str]): Include additional columns with suffixes Returns: An empty, masked astropy table """ names, dtype = ['obj_id', 'source', 'version'], ['U100', 'U100', 'U100'] for b1, b2 in color_bands: col_name = f"{b1.split('_')[-1]}_{b2.split('_')[-1]}" names += [col_name] + [col_name + s for s in suffixes] dtype += [float for _ in range(len(suffixes) + 1)] return Table(names=names, dtype=dtype, masked=True)
[docs]def get_observed_color_times(data, band1, band2): """Return the time range for which observations overlap in two band passes Args: data (Table): Astropy table with columns 'band' and 'time' band1 (str): The name of a bandpass in data['band'] band2 (str): The name of a bandpass in data['band'] Returns: The start time The end time """ for band_name in (band1, band2): if band_name not in data['band']: raise ValueError(f'No observations for band {band_name}') band1_times = data[data['band'] == band1]['time'] band2_times = data[data['band'] == band2]['time'] # Check that bands have overlapping observations b1_start, b1_end = min(band1_times), max(band1_times) b2_start, b2_end = min(band2_times), max(band2_times) if (b1_start > b2_end) or (b2_start > b1_end): raise ValueError(f'Bands do not overlap: {band1} : {band2}') return max(b1_start, b2_start), min(b1_end, b2_end)
[docs]def calc_color_chisq(data_table, model, band1, band2, interval=1, prange=None): """Calculate the chi-squared for observed and modeled color Assumes time values in the data and model are relative to the same zero point. Args: data_table (Table): Data table from sndata (format_sncosmo=True) model (Model): An sncosmo model band1 (str): band2 (str): interval (int): Spacing between phases when summing chisq prange (tuple): Optional start and end phase for color evolution Returns: - The chi-square value - The degrees of freedom """ gp = fit_gaussian_process(data_table) try: obs_start, obs_end = get_observed_color_times(data_table, band1, band2) except ValueError: # No observations for given bands return np.nan, np.nan # Determine integration bounds phase_start, phase_end = obs_start, obs_end if prange is not None: phase_start, phase_end = prange[0], prange[-1] if (phase_start < obs_start) or (obs_end < phase_end): return np.nan, np.nan phase_range = np.arange(phase_start, phase_end + interval, interval) data_color, data_err = predict_color(gp, phase_range, band1, band2) model_color = model.color(band1, band2, 'ab', phase_range) chisq = np.sum((data_color - model_color) / data_err) return chisq, len(phase_range)
[docs]def tabulate_chisq( data_release, models, colors, interval=1, prange=None, out_path=None): """Tabulate color chi-squared for multiple models Integrate the chi-squared of the color evolution over the phase range ``prange``. The phase range should be specified relative to B-band max. If ``prange`` is not specified, use the largest integration range allowed by the data on a color by color basis. Args: data_release (module): An sndata data release models (list): A list of sncosmo models colors (list): List of tuples with band names interval (int): Spacing between phases when summing chisq prange (tuple): Optional start and end phases out_path (str): Optionally write to path with each iteration """ out_table = create_empty_output_table(colors, suffixes=['_dof']) out_table.meta['prange'] = prange out_table.meta['bands'] = colors for model in tqdm(models, desc='Models'): table_iterator = data_release.iter_data( verbose={'desc': 'Targets', 'position': 1}, # Put pbar on 2nd line filter_func=utils.filter_has_csp_data) for data_table in table_iterator: # Set model values so time values are relative to t0 model = deepcopy(model) obj_id = data_table.meta['obj_id'] z = data_table.meta['z'] t0 = utils.get_csp_t0(obj_id) ebv = utils.get_csp_ebv(obj_id) model.set(t0=t0, z=z, mwebv=ebv) # Shift observed times to be relative to t0 data_table = deepcopy(data_table) data_table['time'] = data_table['time'] - t0 # Populate output table new_row = [ data_table.meta['obj_id'], model.source.name, model.source.version ] mask = [False, False, False] for band1, band2 in colors: chisq, dof = calc_color_chisq( data_table, model, band1, band2, interval, prange) new_row += [chisq, dof] mask += np.isnan([chisq, dof]).tolist() out_table.add_row(new_row, mask=mask) if out_path: out_table.write(out_path, overwrite=True) return out_table
[docs]def tabulate_delta_15( data_release, models, band_combos, out_path=None, t0_band='csp_dr3_B'): """Tabulate observed and modeled delta color over 15 days Args: data_release (module): An sndata data release models (list): A list of sncosmo models band_combos (list): List of tuples with band names out_path (str): Optionally write to path with each iteration t0_band (str): Band to use when setting model t0 to peak Returns: An astropy table """ out_table = create_empty_output_table(band_combos, suffixes=['_err']) data_iter = data_release.iter_data( verbose={'desc': 'Targets', 'position': 0}, filter_func=utils.filter_has_csp_data) for data_table in data_iter: # Convert observation times to phase values obj_id = data_table.meta['obj_id'] t0 = utils.get_csp_t0(obj_id) data_table['time'] = data_table['time'] - t0 gp = fit_gaussian_process(data_table) new_row = [obj_id, 'CSP', 'DR3'] mask = [False for _ in new_row] for band1, band2 in band_combos: try: start_phase, end_phase = get_observed_color_times( data_table, band1, band2) if start_phase > 0 or end_phase < 15: raise ValueError except ValueError: new_row += [np.NAN, np.NAN] mask += [True, True] else: new_row.extend(predict_c_15(gp, band1, band2)) mask += [False, False] out_table.add_row(new_row, mask=mask) if out_path: out_table.write(out_path, overwrite=True) for model in models: new_row = ['model', model.source.name, model.source.version] mask = [False for _ in new_row] model = deepcopy(model) t0 = model.source.peakphase(t0_band) for band1, band2 in band_combos: c0 = model.color(band1, band2, 'ab', t0) c15 = model.color(band1, band2, 'ab', t0 + 15) new_row += [c15 - c0, np.NAN] mask += [False, True] out_table.add_row(new_row, mask=mask) if out_path: out_table.write(out_path, overwrite=True) return out_table