Skip to content
Snippets Groups Projects
imaging.py 9.45 KiB
"""
Defines classes and functions to manage reading and writing files associated
with absorption imaging.
"""
from __future__ import annotations
import numpy as np
import numpy.linalg as la
import lmfit
import pathlib
import toml
import warnings
import PIL.Image
import lib.plotdefs as pd
import lib.dataio as io

H = 6.626070040e-34
C = 2.99792458e+8

DEF_DX = 3.45 # pixel size for Flir Grasshopper [um]

class ImagingData:
    def __init__(self, outdir: pathlib.Path, arrays: dict[str, np.ndarray],
            config: dict[str, ...], comments: str=None,
            results: dict[str, ...]=None):
        self.outdir = outdir
        self.arrays = arrays
        self.config = config
        self.comments = str() if comments is None else comments
        self.results = results

    def save(self, target: pathlib.Path=None, overwrite: bool=False,
            printflag: bool=True):
        T = self.outdir if target is None else target
        if printflag: print(f"[imaging] Saving data to '{T}':")
        if not T.is_dir():
            if printflag: print(f"[imaging] mkdir {T}")
            T.mkdir(parents=True, exist_ok=True)

        arrays_file = io.fresh_filename(T.joinpath("arrays.npz"), overwrite)
        if printflag: print("[imaging] save arrays")
        np.savez_compressed(arrays_file, **self.arrays)

        config_file = io.fresh_filename(T.joinpath("config.toml"), overwrite)
        if printflag: print("[imaging] save camera config")
        with config_file.open('w') as outfile:
            toml.dump(self.config, outfile)

        comments_file = io.fresh_filename(T.joinpath("comments.txt"), overwrite)
        if printflag: print("[imaging] save comments")
        comments_file.write_text(self.comments)

        if self.results is not None:
            results_file = io.fresh_filename(
                T.joinpath("results.toml"), overwrite)
            if printflag: print("[imaging] save results")
            with results_file.open('w') as outfile:
                toml.dump(self.results, outfile)
        #if printflag: print("  Done.")
        return self

    @staticmethod
    def load(target: pathlib.Path, outdir: pathlib.Path=None,
            printflag: bool=True):
        outdir = target if outdir is None else target
        if printflag: print(f"[imaging] Loading data from target '{target}':")
        if not target.is_dir():
            raise Exception(f"Target '{target}' does not exist")

        arrays_file = target.joinpath("arrays.npz")
        if printflag: print("[imaging] load arrays")
        if not arrays_file.is_file():
            raise Exception(f"Arrays file '{arrays_file}' does not exist")
        arrays = dict(np.load(arrays_file))

        config_file = target.joinpath("config.toml")
        if printflag: print("[imaging] load camera config")
        if not config_file.is_file():
            raise Exception(f"Config file '{config_file}' does not exist")
        config = toml.load(config_file)

        comments_file = target.joinpath("comments.txt")
        if printflag and comments_file.is_file(): print("[imaging] load comments")
        comments = comments_file.read_text() \
                if comments_file.is_file() else str()

        results_file = target.joinpath("results.toml")
        if printflag and results_file.is_file(): print("[imaging] load results")
        results = results_file.read_text() \
                if results_file.is_file() else None
        #if printflag: print("  Done.")
        return ImagingData(outdir, arrays, config, comments, results)

    def render_arrays(self, target: pathlib.Path=None, dx=None,
            printflag: bool=True):
        T = self.outdir if target is None else target
        T = T.joinpath("images")
        _dx = self.config.get("bin_size", 1) * (DEF_DX if dx is None else dx)
        if printflag: print(f"[imaging] Render images to {T}:")
        if not T.is_dir():
            if printflag: print(f"[imaging] mkdir {T}")
            T.mkdir(parents=True, exist_ok=True)
        for label, array in self.arrays.items():
            if printflag: print(f"[imaging] render '{label}'")
            if label == "od":
                warnings.filterwarnings("ignore")
                H, W = array.shape
                (pd.Plotter()
                    .imshow(array,
                        cmap=pd.colormaps["vibrant"], vmin=-0.1, vmax=0.15,
                        extent=[0, W * _dx, H * _dx, 0]
                    )
                    .colorbar()
                    #.set_clim(0, array.max())
                    .set_xlabel("[$\\mu$m]")
                    .set_ylabel("[$\\mu$m]")
                    .savefig(T.joinpath(label + ".png"))
                    .close()
                )
                warnings.resetwarnings()
            else:
                PIL.Image.fromarray(array).save(T.joinpath(label + ".png"))
        #if printflag: print("  Done.")
        return self

    def compute_results(self, dA=(2*3.45)**2, printflag: bool=True):
        raise NotImplementedError

class AbsorptionData(ImagingData):
    @staticmethod
    def load(target: pathlib.Path, outdir: pathlib.Path=None,
            printflag: bool=True):
        data = ImagingData.load(target, outdir, printflag)
        return AbsorptionData(
            data.outdir, data.arrays, data.config, data.comments, data.results)

    def compute_results(self, dA=(3.45)**2, printflag: bool=True):
        _dA = self.config.get("bin_size", 1)**2 \
                * (DEF_DX**2 if dA is None else dA)
        if printflag: print("[imaging] Compute results")
        OD = compute_od(
            self.arrays["shadow"], self.arrays["bright"], self.arrays["dark"])
        self.arrays["od"] = OD
        sx, sy = fit_gaussian(OD, _dA)
        self.results = dict() if self.results is None else self.results
        self.results.update({"sx": sx, "sy": sy})
        return self

def compute_od(shadow: np.ndarray, bright: np.ndarray, dark: np.ndarray):
    with np.errstate(divide="ignore", invalid="ignore"):
        T = (shadow - dark) / (bright - dark)
        H, W = T.shape
        OD = -np.log(T)
    OD[~np.isfinite(OD)] = 0.0
    return OD

class FluorescenceData(ImagingData):
    @staticmethod
    def load(target: pathlib.Path, outdir: pathlib.Path=None,
            printflag: bool=True):
        data = ImagingData.load(target, outdir, printflag)
        return FluorescenceData(
            data.outdir, data.arrays, data.config, data.comments, data.results)

    def compute_results(self, dA=3.45**2, printflag: bool=True):
        _dA = self.config.get("bin_size", 1)**2 \
                * (DEF_DX**2 if dA is None else dA)
        if printflag: print("[imaging] Compute results")
        for label, array in self.arrays.items():
            N = compute_mot_number(
                array,
                0.76,
                self.config["gain"], # dB
                self.config["exposure_time"] * 1e-6, # s
                (0.0127**2 * np.pi) / (4 * np.pi * 0.125**2),
                50e6 * 2 * np.pi,
                1.0
            )
            sx, sy = _fit_gaussian(array, _dA)
            self.results = dict() if self.results is None else self.results
            self.results.update(
                {
                    f"N_{label}": float(N),
                    f"sx_{label}": float(sx),
                    f"sy_{label}": float(sy)
                }
            )
        return self

def compute_mot_number(image, QE, gain, exposure_time, solid_angle,
        detuning, intensity_parameter):
    if image.max() >= np.iinfo(image.dtype).max:
        print("[imaging] WARNING: image may contain clipping")
    electron_rate = image.astype(np.float64).sum() / 16 # get to 12-bit values
    photon_rate \
            = electron_rate \
            / 10**(gain / 10) \
            / QE \
            / solid_angle \
            / exposure_time
    Y = 29.1e6 * 2 * np.pi # transition linewidth; s^-1
    l = 399e-9 # transition wavelength; m
    I_sat = (Y * H * C * np.pi) / (3 * l**3)
    scatter_rate = lambda D, I: \
            (Y / 2) * I / (1 + 4 * (D / Y)**2 + I)
    N = photon_rate / scatter_rate(detuning, intensity_parameter)
    return N

def _fit_gaussian(A, dA):
    """
    Assumes uniform, square dA.
    """
    imax, jmax = np.where(A == A.max())
    i0, j0 = imax.mean(), jmax.mean()
    x = np.sqrt(dA) * (np.arange(A.shape[1]) - j0)
    y = np.sqrt(dA) * (np.arange(A.shape[0]) - i0)
    X, Y = np.meshgrid(x, y)
    Z = (-np.log(A / A.max())).flatten()
    M = np.array([X.flatten()**2, Y.flatten()**2]).T
    Sx, Sy = la.solve(M.T @ M, M.T @ Z)
    return np.sqrt(1 / Sx), np.sqrt(1 / Sy)

def fit_gaussian(data, dA):
    imax, jmax = np.where(data == data.max())
    i0, j0 = imax.mean(), jmax.mean()
    x0 = np.sqrt(dA) * j0
    x = np.sqrt(dA) * np.arange(data.shape[1])
    y0 = np.sqrt(dA) * i0
    y = np.sqrt(dA) * np.arange(data.shape[0])
    X, Y = np.meshgrid(x, y)

    def model(params: lmfit.Parameters):
        A = params["A"]
        x0 = params["x0"]
        sx = params["sx"]
        y0 = params["y0"]
        sy = params["sy"]
        return A * np.exp(-((X - x0) / sx)**2 - ((Y - y0) / sy)**2)

    def residual(params: lmfit.Parameters):
        m = model(params)
        return ((data - m)**2).flatten()

    params = lmfit.Parameters()
    params.add("A", value=data.max())
    params.add("x0", value=x0)
    params.add("sx", value=1)
    params.add("y0", value=y0)
    params.add("sy", value=1)
    fit = lmfit.minimize(residual, params)
    if not fit.success:
        raise Exception("Error in Gaussian fit")
    return float(fit.params["sx"]), float(fit.params["sy"])