Source code for scarlet.source

import autograd.numpy as np
from functools import partial
import logging

from . import initialization as init
from . import operator
from .bbox import Box, overlapped_slices
from .component import Component, CombinedComponent, FactorizedComponent
from .constraint import CenterOnConstraint, PositivityConstraint
from .morphology import (
    ImageMorphology,
    PointSourceMorphology,
    StarletMorphology,
    ExtendedSourceMorphology,
)
from .parameter import Parameter, relative_step
from .spectrum import TabulatedSpectrum

logger = logging.getLogger("scarlet.source")


[docs]class NullSource(Component): """Source that does nothing""" def __init__(self, model_frame): """ Initialize a NullSource Parameters ---------- model_frame: `~scarlet.Frame` The frame of the model """ super().__init__(model_frame)
[docs] def get_model(self, *parameters, frame=None): """Get the model for this component. Parameters ---------- parameters: tuple of optimimzation parameters frame: `~scarlet.frame.Frame` Frame to project the model into. If `frame` is `None` then the model contained in `bbox` is returned. Returns ------- model: array (Channels, Height, Width) image of the model """ model = np.zeros(self.frame.shape) # project the model into frame (if necessary) if frame is not None: model = self.model_to_box(frame.bbox, model) return model
[docs]class RandomSource(FactorizedComponent): """Sources with uniform random morphology and sed. For cases with no well-defined spatial shape, this source initializes a uniform random field and (optionally) matches the SED to match a given observation. """ def __init__(self, model_frame, observations=None): """Source intialized as random field. Parameters ---------- model_frame: `~scarlet.Frame` The frame of the model observations: instance or list of `~scarlet.Observation` Observation to initialize the SED of the source """ C, Ny, Nx = model_frame.bbox.shape image = np.random.rand(Ny, Nx) morphology = ImageMorphology(model_frame, image) if observations is None: spectrum = np.random.rand(C) else: spectrum = init.get_best_fit_spectrum(image[None], observations)[0] # default is step=1e-2, using larger steps here becaus SED is probably uncertain spectrum = Parameter( spectrum, name="spectrum", step=partial(relative_step, factor=1e-1), constraint=PositivityConstraint(), ) spectrum = TabulatedSpectrum(model_frame, spectrum) super().__init__(model_frame, spectrum, morphology)
[docs]class PointSource(FactorizedComponent): """Point-Source model Point sources modeled as `model_frame.psfs`, centered at `sky_coord`. Their SEDs are taken from `observations` at the center pixel. """ def __init__(self, model_frame, sky_coord, observations): """Source intialized with a single pixel Parameters ---------- model_frame: `~scarlet.Frame` The frame of the model sky_coord: tuple Center of the source observations: instance or list of `~scarlet.Observation` Observation(s) to initialize this source """ if not hasattr(observations, "__iter__"): observations = (observations,) center = model_frame.get_pixel(sky_coord) morphology = PointSourceMorphology(model_frame, center) # get spectrum from peak pixel, correct for PSF spectra = init.get_pixel_spectrum(sky_coord, observations, correct_psf=True) spectrum = np.concatenate(spectra, axis=0) noise_rms = np.concatenate( [np.array(np.mean(obs.noise_rms, axis=(1, 2))) for obs in observations] ).reshape(-1) spectrum = TabulatedSpectrum(model_frame, spectrum, min_step=noise_rms) super().__init__(model_frame, spectrum, morphology) # retain center as attribute self.center = morphology.center
[docs]class CompactExtendedSource(FactorizedComponent): def __init__( self, model_frame, sky_coord, observations, shifting=False, resizing=True, boxsize=None, ): """Compact extended source model The model is initialized from `observations` with a point-source morphology and a spectrum from its peak pixel. During optimization it enforces positivitiy for spectrum and morphology, as well as monotonicity of the morphology. Parameters ---------- model_frame: `~scarlet.Frame` The frame of the model sky_coord: tuple Center of the source observations: instance or list of `~scarlet.observation.Observation` Observation(s) to initialize this source. shifting: `bool` Whether or not a subpixel shift is added as optimization parameter resizing : bool Whether or not to change the size of the source box. boxsize: int or None Spatial size of the source box """ if not hasattr(observations, "__iter__"): observations = (observations,) # initialize morphology from model_frame psf assert model_frame.psf is not None morph, bbox = self.init_morph(model_frame, sky_coord, boxsize=boxsize) center = model_frame.get_pixel(sky_coord) morphology = ExtendedSourceMorphology( model_frame, center, morph, bbox=bbox, monotonic="angle", symmetric=False, min_grad=0, shifting=shifting, resizing=resizing, ) # get spectrum from peak pixel, correct for PSF spectra = init.get_pixel_spectrum(sky_coord, observations, correct_psf=True) spectrum = np.concatenate(spectra, axis=0) spectrum /= morph.sum() noise_rms = np.concatenate( [np.array(np.mean(obs.noise_rms, axis=(1, 2))) for obs in observations] ).reshape(-1) spectrum = TabulatedSpectrum(model_frame, spectrum, min_step=noise_rms) # set up model with its parameters super().__init__(model_frame, spectrum, morphology) # retain center as attribute self.center = morphology.center
[docs] @staticmethod def init_morph(frame, sky_coord, boxsize=None): """Initialize a source just like `init_extended_morphology`, but with the morphology of a point source. Parameters ---------- frame: `~scarlet.Frame` The model frame sky_coord: tuple Center of the source boxsize: int or None Size of morph box Returns ------- morph, bbox """ # position in frame coordinates center = frame.get_pixel(sky_coord) center_index = np.round(center).astype("int") # morphology initialized as a point source morph_ = frame.psf.get_model().mean(axis=0) origin = ( center_index[0] - (morph_.shape[0] // 2), center_index[1] - (morph_.shape[1] // 2), ) bbox_ = Box(morph_.shape, origin=origin) if boxsize is None: size = max(morph_.shape) boxsize = init.get_minimal_boxsize(size) # adjust box size to conform with extended sources morph = np.zeros((boxsize, boxsize)) origin = ( center_index[0] - (morph.shape[0] // 2), center_index[1] - (morph.shape[1] // 2), ) bbox = Box(morph.shape, origin=origin) slices = overlapped_slices(bbox, bbox_) morph[slices[0]] = morph_[slices[1]] # apply max normalization morph /= morph.max() return morph, bbox
[docs]class SingleExtendedSource(FactorizedComponent): def __init__( self, model_frame, sky_coord, observations, thresh=1.0, shifting=False, resizing=True, boxsize=None, ): """Extended source model The model is initialized from `observations` with a symmetric and monotonic profile and a spectrum from its peak pixel. During optimization it enforces positivitiy for spectrum and morphology, as well as monotonicity of the morphology. Parameters ---------- model_frame: `~scarlet.Frame` The frame of the model sky_coord: tuple Center of the source observations: instance or list of `~scarlet.observation.Observation` Observation(s) to initialize this source. thresh: `float` Multiple of the backround RMS used as a flux cutoff for morphology initialization. compact: `bool` Initialize with the shape of a point source shifting: `bool` Whether or not a subpixel shift is added as optimization parameter resizing : bool Whether or not to change the size of the source box. boxsize: int or None Spatial size of the source box """ if not hasattr(observations, "__iter__"): observations = (observations,) # get center pixel spectrum # this is from convolved image: weighs higher emission *and* narrow PSF spectra = init.get_pixel_spectrum(sky_coord, observations) # initialize morphology # compute optimal SNR coadd for detection image, std = init.build_initialization_image(observations, spectra=spectra) # make monotonic morphology, trimmed to box with pixels above std morph, bbox = self.init_morph( model_frame, sky_coord, image, std, thresh=thresh, symmetric=True, monotonic="flat", min_grad=0, boxsize=boxsize, ) center = model_frame.get_pixel(sky_coord) morphology = ExtendedSourceMorphology( model_frame, center, morph, bbox=bbox, monotonic="angle", symmetric=False, min_grad=0, shifting=shifting, resizing=resizing, ) # find best-fit spectra for morph from init coadd # assumes img only has that source in region of the box detect_all, std_all = init.build_initialization_image(observations) box_3D = Box((model_frame.C,)) @ bbox boxed_detect = box_3D.extract_from(detect_all) spectrum = init.get_best_fit_spectrum((morph,), boxed_detect) noise_rms = np.concatenate( [np.array(np.mean(obs.noise_rms, axis=(1, 2))) for obs in observations] ).reshape(-1) spectrum = TabulatedSpectrum(model_frame, spectrum, min_step=noise_rms) # set up model with its parameters super().__init__(model_frame, spectrum, morphology) # retain center as attribute self.center = morphology.center
[docs] @staticmethod def init_morph( frame, sky_coord, detect, detect_std, thresh=1, symmetric=True, monotonic="flat", min_grad=0, boxsize=None, ): """Initialize the source that is symmetric and monotonic See `ExtendedSource` for a description of the parameters Returns ------- morph, bbox """ # position in frame coordinates center = frame.get_pixel(sky_coord) center_index = np.round(center).astype("int") # Copy detect if reused for other sources im = detect.copy() # Apply the necessary constraints if symmetric: im = operator.prox_uncentered_symmetry( im, 0, center=center_index, algorithm="sdss", # *1 is to artificially pass a variable that is not coadd ) if monotonic: if monotonic is True: monotonic = "angle" # use finite thresh to remove flat bridges prox_monotonic = operator.prox_weighted_monotonic( im.shape, neighbor_weight=monotonic, center=center_index, min_gradient=min_grad, ) im = prox_monotonic(im, 0).reshape(im.shape) # truncate morph at thresh * bg_rms threshold = detect_std * thresh morph, bbox = init.trim_morphology( center_index, im, bg_thresh=threshold, boxsize=boxsize ) # normalize to unity at peak pixel for the imposed normalization if morph.sum() > 0: morph /= morph.max() else: msg = f"No flux in morphology model for source at {sky_coord}" logger.warning(msg) morph = CenterOnConstraint(tiny=1)(morph, 0) # for very noise inits, there is only 1 or few pixels in the center: # pad morph with the shape of the PSF if frame.psf is not None: psf_morph, _ = CompactExtendedSource.init_morph( frame, sky_coord, boxsize=max(bbox.shape) ) morph = np.maximum(morph, psf_morph) return morph, bbox
[docs]class StarletSource(FactorizedComponent): """Source with a starlet morphology model Sources are initialized as `~scarlet.ExtendedSource`, and the morphology is then transformed into starlet coefficients. """ def __init__( self, model_frame, sky_coord=None, observations=None, spectrum=None, thresh=1.0, starlet_thresh=5e-3, boxsize=None, ): """Extended source intialized to match a set of observations Parameters ---------- model_frame: `~scarlet.Frame` The frame of the model sky_coord: tuple Center of the source observations: instance or list of `~scarlet.observation.Observation` Observation(s) to initialize this source. thresh: `float` Multiple of the backround RMS used as a flux cutoff for morphology initialization. spectrum: `numpy.ndarray` or `scarlet.Parameter` Initial spectrum, otherwise given by `ExtendedSource` initialization starlet_thresh: `float` Multiple of the backround RMS used as a flux cutoff for starlet threshold (usually between 5 and 3). boxsize: int or None Spatial size of the source box """ if sky_coord is None: source = RandomSource( model_frame, ) else: source = ExtendedSource( model_frame, sky_coord, observations, thresh=thresh, boxsize=boxsize ) source = StarletSource.from_source(source, starlet_thresh=starlet_thresh) if spectrum is not None: if isinstance(spectrum, Parameter): assert spectrum.name == "spectrum" else: noise_rms = np.concatenate( [np.array(np.mean(obs.noise_rms, axis=(1, 2))) for obs in observations] ).reshape(-1) spectrum = TabulatedSpectrum(model_frame, spectrum, min_step=noise_rms) source.children[0] = spectrum # still need to init *this* source super().__init__(source.frame, *source.children)
[docs] @classmethod def from_source(cls, source, starlet_thresh=5e-3): assert isinstance(source, FactorizedComponent) frame = source.frame spectrum, morphology = source.children morph = morphology.get_model() bbox = morphology.bbox # transform to starlets morphology = StarletMorphology( frame, morph, bbox=bbox, threshold=starlet_thresh ) # this trick gets us the proper class while call init on the base class obj = cls.__new__(cls) super(StarletSource, obj).__init__(frame, spectrum, morphology) return obj
[docs]class MultiExtendedSource(CombinedComponent): """Extended source with multiple components layered vertically Uses `~scarlet.ExtendedSource` to define the overall morphology, then erodes the outer footprint until it reaches the specified size percentile. For the narrower footprint, it evaluates the mean value at the perimeter and sets the inside to the perimeter value, creating a flat distribution inside. The subsequent component(s) is/are set to the difference between the flattened and the overall morphology. The SED for all components is calculated as the best fit of the multi-component morphology to the multi-channel image in the region of the source. """ def __init__( self, model_frame, sky_coord, observations, K=2, flux_percentiles=None, thresh=1.0, shifting=False, resizing=True, boxsize=None, ): """Create multi-component extended source. Parameters ---------- model_frame: `~scarlet.Frame` The frame of the model sky_coord: tuple Center of the source observations: instance or list of `~scarlet.observation.Observation` Observation(s) to initialize this source. K: int Number of stacked components flux_percentiles: list Flux percentile of each component as the transition point between components. If pixel value is below the first precentile, it becomes part of the outermost component. If it is above, the percentile value will be subtracted and the remainder attributed to the next component. If `flux_percentiles` is `None` then `flux_percentiles=[25,]`. thresh: `float` Multiple of the backround RMS used as a flux cutoff for morphology initialization. shifting: `bool` Whether or not a subpixel shift is added as optimization parameter resizing : bool Whether or not to change the size of the source box. boxsize: int or None Spatial size of the source box """ if flux_percentiles is None: flux_percentiles = (25,) assert K == len(flux_percentiles) + 1 # initialize from observation if not hasattr(observations, "__iter__"): observations = (observations,) # start off with regular ExtendedSource source = ExtendedSource( model_frame, sky_coord, observations, thresh=thresh, boxsize=boxsize ) _, morphology = source.children morphs, boxes = self.init_morphs(morphology, flux_percentiles) # find best-fit spectra for each of morph from the observations # assumes observations only have that one source in region of the box detect_all, std_all = init.build_initialization_image(observations) box_3D = Box((model_frame.C,)) @ boxes[0] boxed_detect = box_3D.extract_from(detect_all) spectra = init.get_best_fit_spectrum(morphs, boxed_detect) noise_rms = np.concatenate( [np.array(np.mean(obs.noise_rms, axis=(1, 2))) for obs in observations] ).reshape(-1) # create one component for each spectrum and morphology components = [] center = model_frame.get_pixel(sky_coord) for k in range(K): spectrum = TabulatedSpectrum( model_frame, spectra[k], min_step=noise_rms / 10 ) morphology = ExtendedSourceMorphology( model_frame, center, morphs[k], bbox=boxes[k], monotonic="angle", symmetric=False, min_grad=0, shifting=shifting, resizing=resizing, ) self.center = morphology.center component = FactorizedComponent(model_frame, spectrum, morphology) components.append(component) super().__init__(components)
[docs] @staticmethod def init_morphs(morphology, flux_percentiles): morph = morphology.get_model() bbox = morphology.bbox # create a list of components from base morph by layering them on top of # each other so that they sum up to morph K = len(flux_percentiles) + 1 Ny, Nx = morph.shape morphs = np.zeros((K, Ny, Nx), dtype=morph.dtype) morphs[0, :, :] = morph[:, :] max_flux = morph.max() percentiles_ = np.sort(flux_percentiles) last_thresh = 0 for k in range(1, K): perc = percentiles_[k - 1] flux_thresh = perc * max_flux / 100 mask_ = morph > flux_thresh morphs[k - 1][mask_] = flux_thresh - last_thresh morphs[k][mask_] = morph[mask_] - flux_thresh last_thresh = flux_thresh # renormalize morphs: initially Smax for k in range(K): if np.all(morphs[k] <= 0): msg = f"Zero or negative morphology for component {k} at {sky_coord}" logger.warning(msg) morphs[k] /= morphs[k].max() # avoid using the same box for multiple components boxes = tuple(bbox.copy() for k in range(K)) return morphs, boxes
[docs]def append_docs_from(other_func): def doc(func): func.__doc__ = func.__doc__ + "\n\n" + other_func.__doc__ return func return doc
# factory two switch between single and multi-ExtendedSource
[docs]@append_docs_from(MultiExtendedSource.__init__) def ExtendedSource( model_frame, sky_coord, observations, K=1, flux_percentiles=None, thresh=1.0, compact=False, shifting=False, resizing=True, boxsize=None, ): """Create extended sources with either a single component or multiple components. If `K== 1`, a single instance of `SingleExtendedSource` is returned, otherwise and instance of `MultiExtendedSource` is returned. """ if compact: return CompactExtendedSource( model_frame, sky_coord, observations, shifting=shifting, resizing=resizing, boxsize=boxsize, ) if K == 1: return SingleExtendedSource( model_frame, sky_coord, observations, thresh=thresh, shifting=shifting, resizing=resizing, boxsize=boxsize, ) else: return MultiExtendedSource( model_frame, sky_coord, observations, K=K, flux_percentiles=flux_percentiles, thresh=thresh, shifting=shifting, resizing=resizing, boxsize=boxsize, )