import numpy as np
from .cache import Cache
from .wavelet import apply_wavelet_denoising
from . import fft
[docs]def get_filter_coords(filter_values, center=None):
"""Create filter coordinate grid needed for the apply filter function
Parameters
----------
filter_values: array
The 2D array of the filter to apply.
center: tuple
The center (y,x) of the filter. If `center` is `None` then
`filter_values` must have an odd number of rows and columns
and the center will be set to the center of `filter_values`.
Returns
-------
coords: array
The coordinates of the pixels in `filter_values`,
where the coordinates of the `center` pixel are `(0,0)`.
"""
if len(filter_values.shape) != 2:
raise ValueError("`filter_values` must be 2D")
if center is None:
if filter_values.shape[0] % 2 == 0 or filter_values.shape[1] % 2 == 0:
msg = """Ambiguous center of the `filter_values` array,
you must use a `filter_values` array
with an odd number of rows and columns or
calculate `coords` on your own."""
raise ValueError(msg)
center = [filter_values.shape[0] // 2, filter_values.shape[1] // 2]
x = np.arange(filter_values.shape[1])
y = np.arange(filter_values.shape[0])
x, y = np.meshgrid(x, y)
x -= center[1]
y -= center[0]
coords = np.dstack([y, x])
return coords
[docs]def get_filter_bounds(coords):
"""Get the slices in x and y to apply a filter
Parameters
----------
coords: array
The coordinates of the filter,
defined by `get_filter_coords`.
Returns
-------
y_start, y_end, x_start, x_end: int
The start and end of each slice that is passed to `apply_filter`.
"""
z = np.zeros((len(coords),), dtype=int)
# Set the y slices
y_start = np.max([z, coords[:, 0]], axis=0)
y_end = -np.min([z, coords[:, 0]], axis=0)
# Set the x slices
x_start = np.max([z, coords[:, 1]], axis=0)
x_end = -np.min([z, coords[:, 1]], axis=0)
return y_start, y_end, x_start, x_end
[docs]def get_projection_slices(image, shape, yx0=None):
"""Get slices needed to project an image
This method returns the bounding boxes needed to
project `image` into a larger image with `shape`.
The results can be used with
`projection[bb] = image[ibb]`.
Parameters
----------
image: array
2D input image
shape: tuple
Shape of the new image.
yx0: tuple
Location of the lower left corner of the image in
the projection.
If `yx0` is `None` then the image is centered in
the projection.
Returns
-------
bb: tuple
`(yslice, xslice)` of the projected image to place `image`.
ibb: tuple
`(iyslice, ixslice)` of `image` to insert into the projection.
bounds: tuple
`(bottom, top, left, right)` locations of the corners of `image`
in the projection. While this isn't needed for slicing it can be
useful for calculating information about the image before projection.
"""
Ny, Nx = shape
iNy, iNx = image.shape
if yx0 is None:
y0 = iNy // 2
x0 = iNx // 2
yx0 = (-y0, -x0)
bottom, left = yx0
bottom += Ny >> 1
left += Nx >> 1
top = bottom + iNy
yslice = slice(max(0, bottom), min(Ny, top))
iyslice = slice(max(0, -bottom), max(Ny - bottom, -top))
right = left + iNx
xslice = slice(max(0, left), min(Nx, right))
ixslice = slice(max(0, -left), max(Nx - left, -right))
return (yslice, xslice), (iyslice, ixslice), (bottom, top, left, right)
[docs]def project_image(image, shape, yx0=None):
"""Project an image centered in a larger image
The projection pads the image with zeros if
necessary or trims the edges if img is larger
than shape in a given dimension.
Parameters
----------
image: array
2D input image
shape: tuple
Shape of the new image.
yx0: tuple
Location of the lower left corner of the image in
the projection.
If `yx0` is `None` then the image is centered in
the projection.
Returns
-------
result: array
The result of projecting `image`.
"""
result = np.zeros(shape)
bb, ibb, _ = get_projection_slices(image, shape, yx0)
result[bb] = image[ibb]
return result
[docs]def common_projections(img1, img2):
"""Project two images to a common frame
It is assumed that the two images have the same center.
This is mainly used for FFT convolutions of source components,
where the convolution kernel is a different size than the morphology.
Parameters
----------
img1: array
1st 2D image to project
img2: array
2nd 2D image to project
Returns
-------
img1: array
Projection of 1st image
img2: array
Projection of 2nd image
"""
h1, w1 = img1.shape
h2, w2 = img2.shape
shape = (max(h1, h2), max(w1, w2))
return project_image(img1, shape), project_image(img2, shape)
[docs]def bilinear(dx):
"""Bilinear interpolation kernel
Interpolate between neighboring pixels to shift
by a fractional amount.
Parameters
----------
dx: float
Fractional amount that the kernel will be shifted
Returns
-------
result: array
2x2 linear kernel to use nearest neighbor interpolation
window: array
The pixel values for the window containing the kernel
"""
if np.abs(dx) > 1:
raise ValueError("The fractional shift dx must be between -1 and 1")
if dx >= 0:
window = np.arange(2)
y = np.array([1 - dx, dx])
else:
window = np.array([-1, 0])
y = np.array([-dx, 1 + dx])
return y, window
[docs]def cubic_spline(dx, a=1, b=0):
"""Generate a cubix spline centered on `dx`.
Parameters
----------
dx: float
Fractional amount that the kernel will be shifted
a: float
Cubic spline sharpness paremeter
b: float
Cubic spline shape parameter
Returns
-------
result: array
Cubic Spline kernel in a window from floor(dx)-1 to floor(dx) + 3
window: array
The pixel values for the window containing the kernel
"""
if np.abs(dx) > 1:
raise ValueError("The fractional shift dx must be between -1 and 1")
def inner(x):
"""Cubic from 0<=abs(x)<=1
"""
third = (-6 * a - 9 * b + 12) * x ** 3
second = (6 * a + 12 * b - 18) * x ** 2
zero = -2 * b + 6
return (zero + second + third) / 6
def outer(x):
"""Cubic from 1<=abs(x)<=2
"""
third = (-6 * a - b) * x ** 3
second = (30 * a + 6 * b) * x ** 2
first = (-48 * a - 12 * b) * x
zero = 24 * a + 8 * b
return (zero + first + second + third) / 6
window = np.arange(-1, 3) + np.floor(dx)
x = np.abs(dx - window)
result = np.piecewise(
x, [x <= 1, (x > 1) & (x < 2)], [lambda x: inner(x), lambda x: outer(x)]
)
return result, np.array(window).astype(int)
[docs]def catmull_rom(dx):
"""Cubic spline with a=0.5, b=0
See `cubic_spline` for details.
"""
return cubic_spline(dx, a=0.5, b=0)
[docs]def mitchel_netravali(dx):
"""Cubic spline with a=1/3, b=1/3
See `cubic_spline` for details.
"""
ab = 1 / 3
return cubic_spline(dx, a=ab, b=ab)
[docs]def lanczos(dx, a=3):
"""Lanczos kernel
Parameters
----------
dx: float
amount to shift image
a: int
Lanczos window size parameter
Returns
-------
result: array-like
1D Lanczos kernel
"""
if np.abs(dx) > 1:
raise ValueError("The fractional shift dx must be between -1 and 1")
window = np.arange(-a + 1, a + 1) + np.floor(dx)
y = np.sinc(dx - window) * np.sinc((dx - window) / a)
return y, window.astype(int)
[docs]def quintic_spline(dx, dtype=np.float64):
def inner(x):
return 1 + x ** 3 / 12 * (-95 + 138 * x - 55 * x ** 2)
def middle(x):
return (x - 1) * (x - 2) / 24 * (-138 + 348 * x - 249 * x ** 2 + 55 * x ** 3)
def outer(x):
return (x - 2) * (x - 3) ** 2 / 24 * (-54 + 50 * x - 11 * x ** 2)
window = np.arange(-3, 4)
x = np.abs(dx - window)
result = np.piecewise(
x,
[x <= 1, (x > 1) & (x <= 2), (x > 2) & (x <= 3)],
[lambda x: inner(x), lambda x: middle(x), lambda x: outer(x)],
)
return result, window
[docs]def get_separable_kernel(dy, dx, kernel=lanczos, **kwargs):
"""Create a 2D kernel from a 1D kernel separable in x and y
Parameters
----------
dy: float
amount to shift image in x
dx: float
amount to shift image in y
kernel: function
1D kernel that is separable in x and y
kwargs: dict
Keyword arguments for the kernel
Returns
-------
kernel: Tensor
2D separable kernel
x_window: Tensor
The pixel values for the window containing the kernel in the x-direction
y_window: Tensor
The pixel values for the window containing the kernel in the y-direction
"""
kx, x_window = kernel(dx, **kwargs)
ky, y_window = kernel(dy, **kwargs)
kyx = np.outer(ky, kx)
return kyx, y_window, x_window
[docs]def mk_shifter(shape, real=False):
""" Performs shifts in the Fourier domain on Fourier objects
Parameters:
-----------
shape: array
shape of the 2-D array to shift
real: bool
if true, the frequencies are all returned for real transforms (all dimension are half of the shape).
if False, only the last dimension is considered a real transform.
Returns:
--------
result: Fourier
A Fourier object with shifted arrays
"""
# Name of the chached shifts.
name = "mk_shifter"
key = shape[0], shape[1], real
try:
shifters = Cache.check(name, key)
except KeyError:
freq_x = np.fft.rfftfreq(shape[-1])
if real is True:
freq_y = np.fft.rfftfreq(shape[-2])
else:
freq_y = np.fft.fftfreq(shape[-2])
# Shift the signal to recenter it, negative because math is opposite from
# pixel direction
shift_y = -1j * 2 * np.pi * freq_y
shift_x = -1j * 2 * np.pi * freq_x
shifters = (shift_y, shift_x)
Cache.set(name, key, shifters)
return shifters
[docs]def get_affine(wcs):
try:
model_affine = wcs.wcs.pc
except AttributeError:
model_affine = wcs.cd
return model_affine
[docs]def get_pixel_size(model_affine):
""" Extracts the pixel size from a wcs
"""
pix = np.sqrt(
np.abs(model_affine[0, 0])
* np.abs(model_affine[1, 1] - model_affine[0, 1] * model_affine[1, 0])
)
return pix
[docs]def get_angles(frame_wcs, model_wcs):
""" Computes the angles between two WCS
Parameters
----------
frame_wcs: WCS
WCS of the observation's frame
model_WCS:
WCS of the model frame.
"""
model_affine = get_affine(model_wcs)
frame_affine = get_affine(frame_wcs)
model_pix = get_pixel_size(model_affine)
frame_pix = get_pixel_size(frame_affine)
# Pixel scale ratio
h = frame_pix / model_pix
# Vector giving the direction of the x-axis of each frame
self_framevector = np.sum(frame_affine, axis=0)[:2] / frame_pix
model_framevector = np.sum(model_affine, axis=0)[:2] / model_pix
# normalisation
self_framevector /= np.sum(self_framevector ** 2) ** 0.5
model_framevector /= np.sum(model_framevector ** 2) ** 0.5
# sin of the angle between datasets (normalised cross product)
sin_rot = np.cross(self_framevector, model_framevector)
# cos of the angle. (normalised scalar product)
cos_rot = np.dot(self_framevector, model_framevector)
return [cos_rot, sin_rot], h
[docs]def sinc_interp(images, coord_hr, coord_lr, angle=None, padding=3):
"""
Parameters
----------
image: array
image whose pixels are at positions coord_lr
coord_hr: array (2xN)
Coordinates of the high resolution grid
coord_lr: array (2xM)
Coordinates of the low resolution grid
angle: float
rotation angle between coordinate sets coord_hr and coord_lr
padding: int
value of zero padding for fft
Returns
-------
result: interpolated samples at positions coord_hr
"""
y_hr, x_hr = coord_hr
y_lr, x_lr = coord_lr
hy = np.abs(y_lr[1] - y_lr[0])
hx = np.abs(x_lr[1] - x_lr[0])
assert hy != 0
assert hx != 0
if (angle is None) or (1 - angle[0] < np.finfo(float).eps):
result = [
np.dot(
np.dot(
np.sinc((y_lr[np.newaxis, :] - y_hr[:, np.newaxis]) / hy), image.T
),
np.sinc((x_lr[:, np.newaxis] - x_hr[np.newaxis, :]) / hx),
)
for image in images
]
return np.array(result)
cos = angle[0]
sin = angle[1]
fft_shape = fft._get_fft_shape(images, images, padding=padding, axes=[1, 2])
X = fft.Fourier(images)
# Fourier transform
X_fft = X.fft(fft_shape, (-2, -1))
# Shift elementary kernel
shifter_y, shifter_x = mk_shifter(fft_shape)
# Shifts values
shift_y = np.exp(shifter_y[np.newaxis, :] * (-(y_hr[:, np.newaxis]) * cos))
shift_x = np.exp(shifter_x[np.newaxis, :] * (-(y_hr[:, np.newaxis]) * sin))
# Apply shifts
result_fft = X_fft[:, np.newaxis, :, :] * shift_y[np.newaxis, :, :, np.newaxis]
result_fft = result_fft * shift_x[np.newaxis, :, np.newaxis, :]
# Shape of the expected array
result_shape = np.array(
[result_fft.shape[0], result_fft.shape[1], X.image.shape[1], X.image.shape[2]]
)
# Shifts applied in one direction
result_shift = fft.Fourier.from_fft(result_fft, fft_shape, result_shape, [2, 3])
# sinc kernels
shy = np.sinc((y_lr[np.newaxis, :] + x_hr[:, np.newaxis] * sin) / hy)
shx = np.sinc((x_lr[np.newaxis, :] - x_hr[:, np.newaxis] * cos) / hx)
# Sinc kernels in both direction
result_y = (
result_shift.image[:, :, np.newaxis, :, :]
* shy[np.newaxis, np.newaxis, :, :, np.newaxis]
).sum(axis=-2)
result = (result_y * shx[np.newaxis, np.newaxis, :, :]).sum(axis=-1)
return result
[docs]def sinc_interp_inplace(image, h_image, h_target, angle, pad_shape=None):
""" In place interpolation of a cube of images
Performs interpolation from a grid defined by the grid of `image` to a grid spanning the same physical area scaled
by a factor `h` and rotated by `angle` radians. The center for the rotation is the central pixel of the image.
This procedure is advised for odd-sized images only.
Parameters
----------
image: `ndarray`
Cube of images with shape BxNyxNx with B the number of bands and NyxNx, the number of pixels
h_image: `float`
Phisical scale of a pixel in image
h_target: `float`
Physical scale of the target pixel to which to interpolate
angle: float
angle between the grid of image and the target grid where to interpolate
Returns
-------
interp_image: `ndarray`
padded interpolated image
"""
assert len(image.shape) == 3, (
"images should be provided as a cube. If only one image is provided, "
"image should be a cube with image.shape[0] = 1"
)
if pad_shape is not None:
# Padding. This is never explicitelly undone in this function on purpose. Proceed with caution.
image = fft._pad(image, pad_shape, axes=[-2, -1])
ny_lr, nx_lr = image.shape[-2:]
coord_lr = np.array(
[
np.array(range(ny_lr)) - (ny_lr - 1) / 2,
np.array(range(nx_lr)) - (nx_lr - 1) / 2,
]
)
ny_hr, nx_hr = (
np.round(image.shape[-2] * h_image / h_target).astype(int),
np.round(image.shape[-1] * h_image / h_target).astype(int),
)
if (ny_hr % 2) == 0:
ny_hr += 1
if (nx_hr % 2) == 0:
nx_hr += 1
coord_hr = (
np.array(
[
np.array(range(ny_hr)) - (ny_hr - 1) / 2,
np.array(range(nx_hr)) - (nx_hr - 1) / 2,
]
)
/ h_image
* h_target
)
return sinc_interp(image, coord_hr, coord_lr, angle=angle)
[docs]def interpolate_observation(observation, frame, wave_filter=False):
""" Interpolates the images in an observation on the grid described by a frame
and returns the interpolated images.
Paramters
---------
observation: `scarlet.Observation` object
observation with images to interpolate
frame: `scarlet.Frame` object
frame to which to interpolate the observation
wave_filter: `bool`
set to True to wavelet-filter the images before interpolation (avoid correlated noise)
Returns
-------
interp: `numpy.ndarray`
array containing the interpolated images from observation.
"""
# Interpolate low resolution data to high resolution
coord_lr0 = np.array(
(np.arange(observation.shape[1]),
np.arange(observation.shape[2]),)
)
coord_hr = (np.arange(frame.shape[1]),
np.arange(frame.shape[2]))
coord_lr = observation.convert_pixel_to(frame, pixel=coord_lr0.T).T
interp = []
if wave_filter is True:
images = np.array([apply_wavelet_denoising(image) for image in observation.data])
else:
images = observation.data
for image in images:
interp.append(
sinc_interp(image[None, :, :], coord_hr, coord_lr, angle=None)[0].T
)
return np.array(interp)
[docs]def get_common_padding(img1, img2, padding=None):
"""Project two images to a common frame
It is assumed that the two images have the same center.
This is mainly used for FFT convolutions of source components,
where the convolution kernel is a different size than the morphology.
Parameters
----------
img1: array
1st 2D or 3D image to project
img2: array
2nd 2D or 3D image to project
Returns
-------
img1: array
Projection of 1st image
img2: array
Projection of 2nd image
"""
h1, w1 = img1.shape[-2:]
h2, w2 = img2.shape[-2:]
height = h1 + h2
width = w1 + w2
if padding is not None:
height += padding
width += padding
def get_padding(h, w):
bottom = (height - h) // 2
top = height - h - bottom
left = (width - w) // 2
right = width - w - left
return ((bottom, top), (left, right))
return get_padding(h1, w1), get_padding(h2, w2)
[docs]def sinc2D(y, x):
"""
2-D sinc function based on the product of 2 1-D sincs
Parameters
----------
x, y: arrays
Coordinates where to evaluate the 2-D sinc
Returns
-------
result: array
2-D sinc evaluated in x and y
"""
return np.dot(np.sinc(y), np.sinc(x))
[docs]def subsample_function(y, x, f, dNy, dNx=None, dy=None, dx=None):
"""Subsample a function
Given the expected pixel grid of a function, subsample that function
at a grid subdivided in x by `dNx` and y by `dNy`.
"""
# Use the spacing between x values to define the subsampled regions
if dx is None:
dx = x[1] - x[0]
if dy is None:
dy = y[1] - y[0]
if dNx is None:
dNx = dNy
assert dNy % 2 == 0, "dNy must be even, received {0}".format(dNy)
assert dNx % 2 == 0, "dNx must be even, received {0}".format(dNx)
assert np.all(np.isclose(x[1:] - x[:-1], x[1] - x[0])), "x must have equal spacing"
assert np.all(np.isclose(y[1:] - y[:-1], y[1] - y[0])), "y must have equal spacing"
# Create the subsampled interval and use it to sample `f`
_x = np.linspace(x[0] - dx / 2, x[-1] + dx / 2, len(x) * dNx + 1)
_y = np.linspace(y[0] - dy / 2, y[-1] + dy / 2, len(y) * dNy + 1)
return f(_y, _x), _y, _x
[docs]def apply_2D_trapezoid_rule(y, x, f, dNy, dNx=None, dy=None, dx=None):
"""Use the trapezoid rule to integrate over a subsampled function
2D implementation of the trapezoid rule.
See `apply_trapezoid_rule` for a description, with the difference
that `f` is a function `f(y,x)`, where we note the c ++`(y,x)` ordering.
"""
if dy is None:
dy = y[1] - y[0]
if dx is None:
dx = x[1] - x[0]
if dNx is None:
dNx = dNy
z, _y, _x = subsample_function(y, x, f, dNy, dNx, dy, dx)
# Calculate the volume of each sub region
dz = 0.4 * (z[:-1, :-1] + z[1:, :-1] + z[:-1, 1:] + z[1:, 1:])
volumes = dy * dx * dz / dNy / dNx
# Sum up the sub regions around each point to
# give it the same shape as the original `(y,x)`
_dNy = len(_y) // dNy
_dNx = len(_x) // dNx
volumes = np.array(
np.split(np.array(np.split(volumes, _dNx, axis=1)), _dNy, axis=1)
).sum(axis=(2, 3))
return volumes
[docs]def get_psf_size(psf):
""" Measures the size of a psf by computing the size of the area in 3 sigma around the center.
This is an approximate method to estimate the size of the psf for setting the size of the frame,
which does not require a precise measurement.
Parameters
----------
PSF: `scarlet.PSF` object
PSF for whic to compute the size
Returns
-------
sigma3: `float`
radius of the area inside 3 sigma around the center in pixels
"""
# Normalisation by maximum
psf_frame = psf / np.max(psf)
# Pixels in the FWHM set to one, others to 0:
psf_frame[psf_frame > 0.5] = 1
psf_frame[psf_frame <= 0.5] = 0
# Area in the FWHM:
area = np.sum(psf_frame)
# Diameter of this area
d = 2 * (area / np.pi) ** 0.5
# 3-sigma:
sigma3 = 3 * d / (2 * (2 * np.log(2)) ** 0.5)
return sigma3