import numpy as np
import logging
from .bbox import Box
from .cache import Cache
from .renderer import NullRenderer, ConvolutionRenderer
logger = logging.getLogger("scarlet.initialisation")


def get_pixel_spectrum(
sky_coord, observations, correct_psf=False, models=None, concat=True
):
"""Get the spectrum at `sky_coord` in `observation`.
Yields the spectrum of a single-pixel source with flux 1 in every channel,
concatenated for all observations.
If `correct_psf`, it homogenizes the PSFs of the observations, which yields the
correct spectrum for a flux=1 point source.
    If `models` is set, it reads off the value of the model at `sky_coord` and returns
    the spectrum for that model.
Parameters
----------
sky_coord: tuple
Position in the observation
observations: instance or list of `~scarlet.Observation`
Observation to extract SED from.
correct_psf: bool
        Whether PSF shape variations in the observations should be corrected.
models: instance or list of arrays
Rendered models for this source in every observation
concat: bool
Whether spectra from multiple observations are flattened
Returns
-------
    spectrum: `~numpy.array`
        Concatenated spectrum if `concat` is True, otherwise a list of per-observation spectra.
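
    Examples
    --------
    A minimal sketch, assuming ``obs`` is a `~scarlet.Observation` matched to the
    model frame and ``ra_dec`` a position covered by it::

        spectrum = get_pixel_spectrum(ra_dec, obs, correct_psf=True)
        # one flux value per channel of `obs`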
"""
if models is not None:
assert correct_psf is False
if not hasattr(observations, "__iter__"):
observations = (observations,)
models = (models,)
else:
if models is not None:
assert len(models) == len(observations)
else:
models = (None,) * len(observations)
spectra = []
for obs, model in zip(observations, models):
pixel = obs.get_pixel(sky_coord)
index = np.round(pixel).astype("int")
spectrum = obs.data[:, index[0], index[1]].copy()
if correct_psf and obs.psf is not None:
# correct spectrum for PSF-induced change in peak pixel intensity
psf_model = obs.psf.get_model()
psf_peak = psf_model.max(axis=(1, 2))
spectrum /= psf_peak
elif model is not None:
model_value = model[:, index[0], index[1]].copy()
spectrum /= model_value
spectra.append(spectrum)
if np.any(spectrum <= 0):
# If the flux in all channels is <=0,
# the new sed will be filled with NaN values,
# which will cause the code to crash later
msg = f"Zero or negative spectrum {spectrum} at {sky_coord}"
if np.all(spectrum <= 0):
logger.warning(msg)
else:
logger.info(msg)
if concat:
spectra = np.concatenate(spectra).reshape(-1)
return spectra


def get_psf_spectrum(sky_coord, observations, compute_snr=False, concat=True):
"""Get spectrum for a point source at `sky_coord` in `observation`
Equivalent to point source photometry for isolated sources. For extended source,
this will underestimate the actual source flux in every channel. In case of crowding,
the resulting photometry is likely contaminated by neighbors.
    Returns the spectrum of a PSF-homogenized source of flux 1 in every channel,
concatenated for all observations.
Parameters
----------
sky_coord: tuple
Position in the observation
observations: instance or list of `~scarlet.Observation`
Observation to extract the spectrum from.
compute_snr: bool
        Whether to compute the SNR of a PSF at `sky_coord`
concat: bool
Whether spectra from multiple observations are flattened
Returns
-------
    spectrum: `~numpy.array`
        Concatenated spectrum if `concat` is True, otherwise a list of per-observation spectra.
    snr: float
        Point-source SNR at `sky_coord`, only returned if `compute_snr` is True.
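
    Examples
    --------
    A minimal sketch, assuming ``obs`` is a matched `~scarlet.Observation` and
    ``ra_dec`` a position covered by it::

        spectrum, snr = get_psf_spectrum(ra_dec, obs, compute_snr=True)
        # `spectrum` has one entry per channel, `snr` is the point-source S/N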
"""
if not hasattr(observations, "__iter__"):
observations = (observations,)
spectra = []
if compute_snr:
snr_num, snr_denom = [], []
for i, obs in enumerate(observations):
pixel = obs.get_pixel(sky_coord)
index = np.round(pixel).astype("int")
psf = obs.psf.get_model()
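        # shift the PSF's (channel, y, x) bounding box onto the target pixel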
bbox = obs.psf.bbox + (0, *index)
img = bbox.extract_from(obs.data)
noise_rms = obs.noise_rms
# masked array doesn't survive extract_from
noise = bbox.extract_from(noise_rms)
noise_mask = bbox.extract_from(noise_rms.mask)
spectra.append([])
# apply mask: needs to be done separately in each channel
for c in range(obs.C):
# outside of observation or masked pixels have noise_mask = 0
mask = ~(noise_mask[c])
psf_ = psf[c, mask]
img_ = img[c, mask]
# amplitude of img when projected onto psf
# i.e. factor to multiply psf with to get img (if img looked like psf)
img_psf = img_ @ psf_
spectrum = img_psf / (psf_ @ psf_)
spectra[i].append(spectrum)
if compute_snr:
noise_ = noise[c, mask]
snr_num.append(img_psf)
snr_denom.append((psf_ * noise_ ** 2) @ psf_)
spectra[i] = np.array(spectra[i])
if np.any(spectra[i] <= 0):
# If the flux in all channels is <=0,
# the new sed will be filled with NaN values,
# which will cause the code to crash later
msg = f"Zero or negative spectrum {spectra[i]} at {sky_coord}"
if np.all(spectra[i] <= 0):
logger.warning(msg)
else:
logger.info(msg)
if concat:
spectra = np.concatenate(spectra).reshape(-1)
if compute_snr:
snr = np.sum(snr_num) / np.sqrt(np.sum(snr_denom))
return spectra, snr
return spectra


def get_minimal_boxsize(size, min_size=21, increment=10):
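    """Return the smallest size from the sequence `min_size`, `min_size + increment`, ...
    that is at least as large as `size`.
    """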
boxsize = min_size
while boxsize < size:
boxsize += increment # keep box sizes quite small
return boxsize


def trim_morphology(center_index, morph, bg_thresh=0, boxsize=None):
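    """Trim the morphology to a square box centered on `center_index`.

    Pixels at or below `bg_thresh` are set to zero. Unless `boxsize` is given,
    the box size is chosen by `get_minimal_boxsize` to cover the above-threshold
    footprint around the center.

    Returns the trimmed morphology and its `~scarlet.bbox.Box`.
    """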
# trim morph to pixels above threshold
mask = morph > bg_thresh
morph[~mask] = 0
bbox = Box.from_data(morph, min_value=0)
# find fitting bbox
if bbox.contains(center_index):
size = 2 * max(
(
center_index[0] - bbox.start[-2],
bbox.stop[0] - center_index[-2],
center_index[1] - bbox.start[-1],
bbox.stop[1] - center_index[-1],
)
)
else:
size = 0
# define new box and cut morphology accordingly
if boxsize is None:
boxsize = get_minimal_boxsize(size)
bottom = center_index[0] - boxsize // 2
top = center_index[0] + boxsize // 2 + 1
left = center_index[1] - boxsize // 2
right = center_index[1] + boxsize // 2 + 1
bbox = Box.from_bounds((bottom, top), (left, right))
morph = bbox.extract_from(morph)
return morph, bbox


def build_initialization_image(observations, spectra=None):
"""Build a spectrum-weighted image from all observations.
Parameters
----------
observations: list of `~scarlet.observation.Observation`
Every observation with a suitable renderer will contribute to the initialization image,
according to its noise level.
spectra: list of arrays
        For every observation: the source spectrum to optimize for. If not set, a flat
        spectrum is assumed.
Returns
-------
image: array
image created by weighting all channels by spectrum
std: array
the effective noise standard deviation of `image`
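
    Examples
    --------
    A minimal sketch, assuming ``observations`` is a list of matched
    `~scarlet.Observation` and ``spectra`` a per-observation list of channel
    spectra (or None for a flat spectrum)::

        image, std = build_initialization_image(observations, spectra=spectra)
        detected = image > 3 * std  # e.g. simple thresholding of the coadd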
"""
    if not hasattr(observations, "__iter__"):
        observations = (observations,)
        spectra = (spectra,)
    if spectra is None:
        # assume a flat spectrum for every observation
        spectra = (None,) * len(observations)
    assert len(observations) == len(spectra)
model_frame = observations[0].model_frame
# check if detection images are stored in obs[0]
# storing in an obs avoids using the cache (see issue 256)
if not hasattr(observations[0], "_detect"):
# if not, map every obs and variance onto the model frame
detect, var = [], []
for i, obs in enumerate(observations):
# only works on unrotated simple frames
if not isinstance(obs.renderer, (NullRenderer, ConvolutionRenderer)):
continue
detect_ = np.zeros(model_frame.shape, dtype=model_frame.dtype)
var_ = np.zeros(model_frame.shape, dtype=model_frame.dtype)
data_slice, model_slice = obs.renderer.slices
obs.renderer.map_channels(detect_)[model_slice] += obs.data[data_slice]
obs.renderer.map_channels(var_)[model_slice] += (
obs.noise_rms[data_slice]
) ** 2
detect.append(detect_)
var.append(var_)
detect = np.array(detect) # L x C x Ny x Nx
var = np.array(var) # L x C x Ny x Nx
observations[0]._detect = (detect, var)
detect, var = observations[0]._detect
# spectrum SNR weighted combination of all observations
spectrum = []
for i, obs in enumerate(observations):
if not isinstance(obs.renderer, (NullRenderer, ConvolutionRenderer)):
continue
spectrum_ = np.zeros(model_frame.C)
if spectra[i] is not None:
obs.renderer.map_channels(spectrum_)[:] = spectra[i]
else:
obs.renderer.map_channels(spectrum_)[:] = 1
spectrum.append(spectrum_)
spectrum = np.stack(spectrum, axis=0)[:, :, None, None] # L x C x Ny x Nx
weight = np.zeros(var.shape)
sel = var > 0
weight[sel] = 1 / var[sel]
weight *= spectrum
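    # inverse-variance weighting in every channel, scaled by the source spectrum:
    # detect = sum_c s_c/var_c * detect_c, which has variance sum_c s_c^2/var_c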
detect = (weight * detect).sum(axis=(0, 1))
var = (spectrum * weight).sum(axis=(0, 1))
return detect, np.sqrt(var)


def init_all_sources(
frame,
centers,
observations,
thresh=1,
max_components=1,
min_components=1,
min_snr=50,
shifting=False,
resizing=True,
boxsize=None,
fallback=True,
silent=False,
set_spectra=True,
):
"""Initialize all sources in a blend
Seeks to initialize sources at the sky positions of `centers` with multiple
components of type `ExtendedSource`. If each component has sufficient SNR, the
model will be kept, otherwise one component is removed and the source reinitialized.
If a source cannot be initialized, its index is returned in `skipped`.
See `~init_source` for a description of the arguments
Parameters
----------
centers : list of tuples
`(y, x)` center location for each source in sky coordinates
silent: bool
        If set to True, will prevent exceptions from being thrown and register the
        source index in a list of skipped sources.
set_spectra: bool
If set to True, will solve for the best spectra of all sources given the
observations. See `set_spectra_to_match` for details.
Returns
-------
sources: list
        List of initialized sources, where each source derives from the
`~scarlet.Component` class.
skipped: list
        This list contains the indices of sources that failed to initialize (only
        populated if `silent` is True).
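
    Examples
    --------
    A minimal sketch, assuming ``model_frame`` and ``observations`` are a matched
    `~scarlet.Frame` and list of `~scarlet.Observation`, and ``centers`` is a list
    of `(y, x)` sky positions from a detection catalog::

        sources, skipped = init_all_sources(
            model_frame, centers, observations, max_components=2, silent=True
        )
        # with silent=True, every center ends up either in `sources` or in `skipped`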
"""
if not hasattr(observations, "__iter__"):
observations = (observations,)
# Only deblend sources that can be initialized
sources = []
skipped = []
for k, center in enumerate(centers):
try:
source = init_source(
frame,
center,
observations,
thresh=thresh,
max_components=max_components,
min_components=min_components,
min_snr=min_snr,
shifting=shifting,
resizing=resizing,
boxsize=boxsize,
fallback=fallback,
)
sources.append(source)
except Exception as e:
msg = f"Failed to initialize source {k}"
logger.warning(msg)
if silent:
skipped.append(k)
else:
raise e
if set_spectra:
set_spectra_to_match(sources, observations)
return sources, skipped


def init_source(
frame,
center,
observations,
thresh=1,
max_components=1,
min_components=1,
min_snr=50,
shifting=False,
resizing=True,
boxsize=None,
fallback=True,
):
"""Initialize a Source
The user can specify the number of desired components
for the modeled source. If scarlet cannot initialize a
model with the desired number of components it continues
to attempt initialization of one fewer component until
it finds a model that can be initialized.
    It is possible that scarlet will be unable to initialize a
    source with the desired number of components: for example,
    a two-component source might have degenerate components,
    a single-component source might not have enough signal in
    the joint coadd (all bands combined into a single
    signal-to-noise-weighted image for initialization),
    and a truly spurious detection will not have
    enough signal to initialize as a point source.
If all of the models fail, including a `CompactExtendedSource` model,
then this source is skipped.
Parameters
----------
frame : `~scarlet.Frame`
The model frame for the scene
    center : `tuple` of `float`
`(y, x)` location for the center of the source.
observations : instance or list of `~scarlet.Observation`
The `Observation` that contains the images, weights, and PSF
used to generate the model.
thresh : `float`
        Multiple of the background RMS used as a flux cutoff for morphology initialization.
max_components : int
The maximum number of components in a source.
If `fallback` is `True` then when
a source fails to initialize with `max_components` it
will continue to subtract one from the number of components
until it reaches zero (which fits a `CompactExtendedSource`).
If a point source cannot be fit then the source is skipped.
min_components : int
The minimum number of components in a source.
        Only relevant for `fallback=True`.
min_snr: float
        Minimum SNR per component of a multi-component source.
        If `fallback=True`, every component needs to have at least `min_snr`,
        otherwise the number of components is reduced. As a last resort, the
        source is initialized with a point-source morphology.
shifting : bool
Whether or not to fit the position of a source.
This is an expensive operation and is typically only used when
a source is on the edge of the detector.
resizing : bool
Whether or not to change the size of the source box.
boxsize: int or None
Spatial size of the source box
fallback : bool
Whether to reduce the number of components
if the model cannot be initialized with `max_components`.
        `fallback=False` is unlikely to be used in production
but can be useful for troubleshooting when an error can cause
a particular source class to fail every time.
"""
from .source import ExtendedSource
if not hasattr(observations, "__iter__"):
observations = (observations,)
if fallback:
_, psf_snr = get_psf_spectrum(center, observations, compute_snr=True)
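        # cap the number of components so that each one can reach roughly min_snr,
        # but never go below min_components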
max_components = np.min(
[
max_components,
np.max([min_components, np.floor(psf_snr / min_snr).astype("int")]),
]
)
while max_components >= 0:
try:
if max_components > 0:
source = ExtendedSource(
frame,
center,
observations,
thresh=thresh,
shifting=shifting,
resizing=resizing,
boxsize=boxsize,
K=max_components,
)
else:
source = ExtendedSource(
frame,
center,
observations,
shifting=shifting,
resizing=resizing,
boxsize=boxsize,
compact=True,
)
# test if parameters are fine, otherwise throw ArithmeticError
source.check_parameters()
except ArithmeticError as e:
if fallback:
msg = f"Could not initialize source at {center} with {max_components} components: {e}"
logger.info(msg)
max_components -= 1
continue
else:
raise e
return source


def set_spectra_to_match(sources, observations):
"""Sets the spectra of any `FactorizedComponent` to match the Observations.
Computes the best-fit amplitude of the rendered model of the sources in every
channel of every observation as a linear inverse problem.
Parameters
----------
sources: list of sources
Only `FactorizedComponent` or `CombinedComponent` will be considered
observations: `scarlet.Observation` or list thereof
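
    Notes
    -----
    For every channel of every observation, the spectrum amplitudes are the
    weighted least-squares solution::

        spectrum = (M^T Sigma^-1 M)^-1 M^T Sigma^-1 * im

    where `M` collects the rendered component models, `Sigma^-1` the per-pixel
    weights of the observation, and `im` the channel image. Components with
    identical models are fit only once and share the resulting spectrum.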
"""
from .component import FactorizedComponent, CombinedComponent
if not hasattr(observations, "__iter__"):
observations = (observations,)
model_frame = observations[0].model_frame
# extract multi-channel model for every non-degenerate component
parameters = []
update_of = []
models = []
for i, src in enumerate(sources):
if isinstance(src, CombinedComponent):
components = src.children
else:
components = (src,)
for j, c in enumerate(components):
            # returns None if c doesn't have a parameter called "spectrum"
            p = c.get_parameter("spectrum")
parameters.append(p)
# correct for different flux in channels to have flat-spectrum component
if p is not None and not p.fixed:
p[:] = 1
model = c.get_model(frame=model_frame)
# check for models with identical initializations, see #282
# if duplicate: remove morph[k] from linear fit, but keep track of parameters[k]
# to set spectrum later: update_of: component index -> updated spectrum index
K_ = len(models)
update_of.append(K_)
for l in range(K_):
if np.allclose(model, models[l]):
update_of[-1] = l
message = f"Source {i}, Component {j} has a model identical to another component.\n"
message += "This is likely not intended, and the source/component should be deleted. "
message += "Spectra will be identical."
logger.warning(message)
if update_of[-1] == K_:
models.append(model)
models = np.array(models)
K = len(parameters)
K_ = len(models)
for obs in observations:
# independent channels, no mixing
# solve the linear inverse problem of the amplitudes in every channel
# given all the rendered morphologies
# spectrum = (M^T Sigma^-1 M)^-1 M^T Sigma^-1 * im
C = obs.C
images = obs.data
weights = obs.weights
morphs = np.stack([obs.render(model) for model in models], axis=0)
spectra = np.zeros((K_, C))
for c in range(C):
im = images[c].reshape(-1)
w = weights[c].reshape(-1)
m = morphs[:, c, :, :].reshape(K_, -1)
mw = m * w[None, :]
# check if all components have nonzero flux in c.
# because of convolutions, flux can be outside the box,
# so we need to compare weighted flux with unweighted flux,
# which is the same (up to a constant) for constant weights.
# so we check if *most* of the flux is from pixels with non-zero weight
nonzero = np.sum(mw, axis=1) / np.sum(m, axis=1) / np.mean(w) > 0.1
nonzero = np.flatnonzero(nonzero)
if len(nonzero) == K_:
covar = np.linalg.inv(mw @ m.T)
spectra[:, c] = covar @ m @ (im * w)
else:
covar = np.linalg.inv(mw[nonzero] @ m[nonzero].T)
spectra[nonzero, c] = covar @ m[nonzero] @ (im * w)
# update the parameters with the best-fit spectrum solution
for k, p in enumerate(parameters):
if p is not None and not p.fixed:
l = update_of[k]
obs.renderer.map_channels(p)[:] = spectra[l]
# enforce constraints
for p in parameters:
if p is not None and p.constraint is not None:
p[:] = p.constraint(p, 0)