Source code for analysis.equivalent_width._calc_ew

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module provides generic functions for measuring the pseudo equivalent
width of arbitrary features for a one or more spectra.
"""

from pathlib import Path

import numpy as np
import yaml
from astropy.table import Table, vstack
from tqdm import tqdm

from .. import utils
from ..exceptions import NoCSPData, UnobservedFeature

with open(Path(__file__).parent / 'features.yml') as infile:
    FEATURES = yaml.load(infile, Loader=yaml.FullLoader)


# noinspection PyTypeChecker, PyUnresolvedReferences
def get_peak_wavelength(
        wavelength, flux, lower_bound, upper_bound, behavior='min'):
    """Return wavelength of the maximum flux within given wavelength bounds

    The behavior argument can be used to select the 'min' or 'max' wavelength
    when there are multiple wavelengths having the same peak flux value. The
    default behavior is 'min'.

    Args:
        wavelength (ndarray): An array of wavelength values
        flux       (ndarray): An array of flux values
        lower_bound  (float): Lower wavelength boundary
        upper_bound  (float): Upper wavelength boundary
        behavior       (str): Return the 'min' or 'max' wavelength

    Returns:
        The wavelength for the maximum flux value
        The maximum flux value
    """

    # Make sure the given spectrum spans the given wavelength bounds
    if (min(wavelength) > lower_bound) or (upper_bound > max(wavelength)):
        raise UnobservedFeature('Feature not in spectral wavelength range.')

    # Select the portion of the spectrum within the given bounds
    feature_indices = (lower_bound <= wavelength) & (wavelength <= upper_bound)
    feature_flux = flux[feature_indices]
    feature_wavelength = wavelength[feature_indices]

    peak_indices = np.argwhere(feature_flux == np.max(feature_flux))
    behavior_func = {'min': np.min, 'max': np.max}[behavior]
    return behavior_func(feature_wavelength[peak_indices])


[docs]def get_feature_bounds(wavelength, flux, feature): """Get the start and end wavelengths / flux for a given feature Args: wavelength (ndarray): An array of wavelength values flux (ndarray): An array of flux values feature (row): A dictionary defining feature parameters Returns: The starting wavelength of the feature The ending wavelength of the feature """ feat_start = get_peak_wavelength( wavelength, flux, feature['lower_blue'], feature['upper_blue'], 'min') feat_end = get_peak_wavelength( wavelength, flux, feature['lower_red'], feature['upper_red'], 'max') return feat_start, feat_end
[docs]def fit_continuum_func(wavelength, flux, feat_start, feat_end): """Fit the pseudo continuum for a given feature Args: wavelength (ndarray): An array of wavelength values flux (ndarray): An array of flux values feat_start (float): The starting wavelength of the feature feat_end (float): The ending wavelength of the feature Return: A linear function fit to the flux values bounding a feature """ if not ((feat_start in wavelength) and (feat_end in wavelength)): raise ValueError('Feature bounds not in wavelength array.') blue_flux = flux[wavelength == feat_start] red_flux = flux[wavelength == feat_end] m = (red_flux - blue_flux) / (feat_end - feat_start) b = blue_flux - m * feat_start return lambda wave: m * np.array(wave) + b
# noinspection PyTypeChecker, PyUnresolvedReferences
[docs]def calc_pew(wavelength, flux, feature=None, feat_start=None, feat_end=None): """Generic function for calculating the pseudo equivalent width If ``feat_start`` and ``feat_end`` are given, use them to determine the continuum flux. If they aren't specified by ``feature`` is, estimate the boundaries of the feature. Args: wavelength (ndarray): An array of wavelength values flux (ndarray): An array of flux values feature (dict): A dictionary defining feature parameters feat_start (float): The starting wavelength of the feature feat_end (float): The ending wavelength of the feature Returns: The pseudo equivalent width as a float """ if (feat_start is None) or (feat_end is None): if feature is None: raise ValueError( 'Must specify either feature bounds or feature dict.') feat_start, feat_end = get_feature_bounds(wavelength, flux, feature) # Select the portion of the spectrum within the given bounds indices = (feat_start <= wavelength) & (wavelength <= feat_end) feature_wave = wavelength[indices] feature_flux = flux[indices] # Normalize the spectrum and calculate the EW cont_func = fit_continuum_func(wavelength, flux, feat_start, feat_end) continuum_flux = cont_func(feature_wave) normalized_flux = feature_flux / continuum_flux pew = np.trapz(1 - normalized_flux, feature_wave) return pew, feat_start, feat_end
def create_pew_summary_table(models): """Create an astropy Table to store pew results Args: models (list): A list of sncosmo models Returns: An astropy table """ # First row represents observed data model_names = [f'{m.source.name}' for m in models] model_versions = [f'{m.source.version}' for m in models] out_table = Table( data=[['OBSERVED'] + model_names, [''] + model_versions], names=['model', 'version'], dtype=['U100', 'U100']) return out_table
[docs]def tabulate_pew_spectrum(phase, wave, flux, models=(), fix_boundaries=True): """Tabulate the observed and modeled pew for multiple features Args: phase (float): The phase of the observed spectrum wave (ndarray): An array of wavelength values flux (ndarray): An array of flux values models (list): A list of sncosmo models fix_boundaries (bool): Fix feature boundaries to observed values Returns: An astropy table """ out_table = create_pew_summary_table(models) out_table['phase'] = phase for feat_name, feature in FEATURES.items(): # Calculate pew for observed data try: pew, feat_start, feat_end = calc_pew(wave, flux, feature) pew_data = [[pew, feat_start, feat_end]] except UnobservedFeature: continue # Reset feature boundaries if not fix_boundaries: feat_start, feat_end = None, None # Calculate pew for models for model in models: # Shift time to beginning of explosion t0 = model.source.peakphase('csp_dr3_B') model_pew_results = calc_pew( wave, model.flux(t0 + phase, wave), feature, feat_start, feat_end) pew_data.append(model_pew_results) new_columns = np.transpose(pew_data) out_table[feat_name] = new_columns[0] out_table[feat_name + '_start'] = new_columns[1] out_table[feat_name + '_end'] = new_columns[2] return out_table
[docs]def tabulate_pew_spectra( data_release, models=(), fix_boundaries=True): """Tabulate the pseudo equivalent widths for multiple spectra / features Args: data_release (module): An sndata data release models (list): A list of sncosmo models fix_boundaries (bool): Fix feature boundaries to observed values Returns: A table of equivalent widths over time """ pew_data = [] id_iter = tqdm( data_release.get_available_ids(), desc='Targets', total=len(data_release.get_available_ids())) for obj_id in id_iter: data_table = data_release.get_data_for_id(obj_id) time, wavelength, flux = utils.parse_spectra_table(data_table) try: for model in models: model.set(extebv=utils.get_csp_ebv(obj_id)) # Shift observed time to B-band peak phase = time - utils.get_csp_t0(obj_id) except NoCSPData: continue spectra_iter = tqdm( iterable=zip(phase, wavelength, flux), desc='Spectra', position=1, total=len(phase)) pew_table = vstack([tabulate_pew_spectrum(*s, models, fix_boundaries) for s in spectra_iter]) pew_table['obj_id'] = data_table.meta['obj_id'] pew_data.append(pew_table) return vstack(pew_data)
[docs]def tabulate_peak_model_pew(models): """Tabulate the pew for each feature at time of B band maximum Args: models (list): A list of sncosmo models Returns: A table of equivalent widths for each feature """ out_table = create_pew_summary_table(models) out_table.remove_row(0) # Remove row for observed data for feat_name, feature in FEATURES.items(): pew_data = [] for model in models: t0 = model.source.peakphase('csp_dr3_B') wave = model.source.interpolated_model()[1] wave = wave[(wave > 3000) & (wave < 10000)] model_pew_results = calc_pew(wave, model.flux(t0, wave), feature) pew_data.append(model_pew_results) new_columns = np.transpose(pew_data) out_table[feat_name] = new_columns[0] out_table[feat_name + '_start'] = new_columns[1] out_table[feat_name + '_end'] = new_columns[2] return out_table