Source code for scarlet.observation

import autograd.numpy as np

from . import interpolation
from .bbox import Box, overlapped_slices
from .frame import Frame
from .renderer import Renderer, NullRenderer, ConvolutionRenderer, ResolutionRenderer


[docs]class Observation(Frame): """Data and metadata for a single set of observations Attributes ---------- data: array 3D data cube (channels, Ny, Nx) of the image in each band. weights: array or tensor Weight for each pixel in `data`. If a set of masks exists for the observations then then any masked pixels should have their `weight` set to zero. """ def __init__(self, data, channels, psf=None, weights=None, wcs=None, padding=10): """Create an Observation Parameters --------- data: array or tensor 3D data cube (Channel, Height, Width) of the image in each band. psf: `scarlet.PSF` or its arguments PSF in each channel. Can be 3D cube of data stacked in channel direction. weights: array or tensor Weight for each pixel in `data`. If a set of masks exists for the observations then then any masked pixels should have their `weight` set to zero. wcs: TBD World Coordinate System associated with the data. channels: list of hashable elements Names/identifiers of spectral channels padding: int Number of pixels to pad each side with, in addition to half the width of the PSF, for FFTs. This is needed to prevent artifacts from the FFT. """ super().__init__( data.shape, wcs=wcs, psf=psf, channels=channels, dtype=data.dtype ) self.data = data if weights is not None: self.weights = weights else: self.weights = np.ones(data.shape, dtype=data.dtype) assert ( self.weights.shape == self.data.shape ), "Weights needs to have same shape as data"
[docs] def match(self, model_frame, renderer=None): """Match the frame of the model to the frame of this observation. The method sets up the mappings in spectral and spatial coordinates, which includes a spatial selection, computing PSF difference kernels and filter transformations. Parameters --------- model_frame: a `scarlet.Frame` instance The frame of `Blend` to match renderer: a `scarlet.Renderer` instance The transformation from model to observation Returns ------- None """ self.model_frame = model_frame # check dtype consistency if self.dtype != model_frame.dtype: self.dtype = model_frame.dtype self.data = self.data.astype(model_frame.dtype) if type(self.weights) is np.ndarray: self.weights = self.weights.astype(model_frame.dtype) # choose the renderer if renderer is None: if self.psf is model_frame.psf: self.renderer = NullRenderer(self, model_frame) else: assert self.psf is not None and model_frame.psf is not None if self.wcs is model_frame.wcs: # same or None wcs: ConvolutionRenderer self.renderer = ConvolutionRenderer( self, model_frame, convolution_type="fft" ) else: # if wcs shows changes in resolution or orientation: # use ResolutionRenderer assert self.wcs is not None and model_frame.wcs is not None angle, h = interpolation.get_angles(self.wcs, model_frame.wcs) same_res = abs(h - 1) < np.finfo(float).eps same_rot = (np.abs(angle[1]) ** 2) < np.finfo(float).eps if same_res and same_rot: self.renderer = ConvolutionRenderer( self, model_frame, convolution_type="fft" ) else: self.renderer = ResolutionRenderer(self, model_frame) else: assert isinstance(renderer, Renderer) self.renderer = renderer return self
@property def noise_rms(self): if not hasattr(self, "_noise_rms"): import numpy.ma as ma self._noise_rms = 1 / np.sqrt(ma.masked_equal(self.weights, 0)) ma.set_fill_value(self._noise_rms, np.inf) return self._noise_rms @property def parameters(self): # data is immutable, but renderer might be parameterized return self.renderer.parameters
[docs] def render(self, model, *parameters): """Convolve a model to the observation frame Parameters ---------- model: array The hyperspectral model parameters: tuple of optimization parameters Returns ------- model_: array `model` mapped into the observation frame """ return self.renderer(model, *parameters)
[docs] def get_log_likelihood(self, model, *parameters, noise_factor=0): """Computes the log-Likelihood of a given model wrt to the observation Parameters ---------- model: array The model from `Blend` parameters: tuple of optimization parameters Returns ------- logL: float """ model_ = self.render(model, *parameters) data_ = self.data weights_ = self.weights # noise injection to soften the gradient if noise_factor > 0: noise = np.random.normal(loc=0, scale=self.noise_rms) data_ = data_ + noise weights_ = weights_ / (noise_factor + 1) return -self.log_norm - np.sum(weights_ * (model_ - data_) ** 2) / 2
@property def log_norm(self): if not hasattr(self, "_log_norm"): # normalization of the single-pixel likelihood: # 1 / [(2pi)^1/2 (sigma^2)^1/2] # with inverse variance weights: sigma^2 = 1/weight # full likelihood is sum over all data samples: pixel in data D = np.prod(self.data.shape) - self.noise_rms.mask.sum() self._log_norm = D / 2 * np.log(2 * np.pi) # noise_rms has 0s masked, but log still throws warning... with np.errstate(divide="ignore"): self._log_norm += np.log(self.noise_rms).sum() return self._log_norm def _to_frame(self, frame, data=None): """Project this observation into another frame Note: the frame must have the same sampling and rotation, but can differ in the shape and origin of the `Frame`. This method is a convenience function for now but should not be considered supported and could be removed in a later version """ frame_slices, observation_slices = overlapped_slices(frame.bbox, self.bbox) if data is None: data = self.data if hasattr(frame, "dtype"): dtype = frame.dtype else: dtype = data.dtype result = np.zeros(frame.shape, dtype=dtype) result[frame_slices] = data[observation_slices] return result