Skip to content
Snippets Groups Projects
imaging.py 7.03 KiB
"""
Defines classes and functions to manage reading and writing files associated
with absorption imaging.
"""
from __future__ import annotations
import numpy as np
import numpy.linalg as la
import lmfit
import pathlib
import toml
import PIL.Image
import lib.plotdefs as pd

DEF_DX = 3.45 # pixel size for Flir Grasshopper [um]

def _fresh_filename(path: pathlib.Path, overwrite: bool=False) -> pathlib.Path:
    if overwrite:
        return path
    _path = path
    while _path.is_file():
        print(f"WARNING: found existinf file {_path}")
        _path = _path.with_stem(_path.stem + "_")
        print(f"Write instead to {_path}")
    return _path

class AbsorptionData:
    def __init__(self, name: str, arrays: dict[str, np.ndarray],
            config: dict[str, ...], comments: str=None,
            results: dict[str, ...]=None):
        self.name = name
        self.arrays = arrays
        self.config = config
        self.comments = str() if comments is None else comments
        self.results = results

    def save(self, target: pathlib.Path=None, overwrite: bool=False,
            printflag: bool=True):
        T = pathlib.Path(self.name) if target is None else target
        if printflag: print(f"Saving data to {T}:")
        if not T.is_dir():
            if printflag: print(f"  :: mkdir {T}")
            T.mkdir(parents=True, exist_ok=True)

        arrays_file = _fresh_filename(T.joinpath("arrays.npz"), overwrite)
        if printflag: print("  save arrays")
        np.savez_compressed(arrays_file, **self.arrays)

        config_file = _fresh_filename(T.joinpath("config.toml"), overwrite)
        if printflag: print("  save camera config")
        with config_file.open('w') as outfile:
            toml.dump(self.config, outfile)

        comments_file = _fresh_filename(T.joinpath("comments.txt"), overwrite)
        if printflag: print("  save comments")
        comments_file.write_text(self.comments)

        if self.results is not None:
            results_file = _fresh_filename(
                T.joinpath("results.toml"), overwrite)
            if printflag: print("  save results")
            with results_file.open('w') as outfile:
                toml.dump(self.results, outfile)
        if printflag: print("  Done.")
        return self

    @staticmethod
    def load(target: pathlib.Path, name: str=None, printflag: bool=True):
        name = target.stem if name is None else name
        if printflag: print(f"Loading data from {target}:")
        if not target.is_dir():
            raise Exception(f"Target {target} does not exist")

        arrays_file = target.joinpath("arrays.npz")
        if printflag: print("  load arrays")
        if not arrays_file.is_file():
            raise Exception(f"Arrays file {arrays_file} does not exist")
        arrays = np.load(arrays_file)

        config_file = target.joinpath("config.toml")
        if printflag: print("  load camera config")
        if not config_file.is_file():
            raise Exception(f"Config file {config_file} does not exist")
        config = toml.load(config_file)

        comments_file = target.joinpath("comments.txt")
        if printflag and comments_file.is_file(): print("  load comments")
        comments = comments_file.read_text() \
                if comments_file.is_file() else str()

        results_file = target.joinpath("results.toml")
        if printflag and results_file.is_file(): print("  load results")
        results = results_file.read_text() \
                if results_file.is_file() else None
        if printflag: print("  Done.")
        return AbsorptionData(name, arrays, config, comments, results)

    def render_arrays(self, target: pathlib.Path=None, dx=None,
            printflag: bool=True):
        T = pathlib.Path(self.name) if target is None else target
        T = T.joinpath("images")
        _dx = self.config.get("bin_size", 1) * (DEF_DX if dx is None else dx)
        if printflag: print(f"Render images to {T}:")
        if not T.is_dir():
            if printflag: print(f"  :: mkdir {T}")
            T.mkdir(parents=True, exist_ok=True)
        for label, array in self.arrays.items():
            if printflag: print(f"  render {label}")
            if label == "od":
                H, W = array.shape
                (pd.Plotter()
                    .imshow(array,
                        cmap=pd.colormaps["vibrant"], vmin=-0.1, vmax=0.15,
                        extent=[0, W * _dx, H * _dx, 0]
                    )
                    .colorbar()
                    #.set_clim(0, array.max())
                    .set_xlabel("[$\\mu$m]")
                    .set_ylabel("[$\\mu$m]")
                    .savefig(T.joinpath(label + ".png"))
                    .close()
                )
            else:
                PIL.Image.fromarray(array).save(T.joinpath(label + ".png"))
        if printflag: print("  Done.")
        return self

    def compute_results(self, dA=(2*3.45)**2, printflag: bool=True):
        _dA = self.config.get("bin_size", 1)**2 \
                * (DEF_DX**2 if dA is None else dA)
        if printflag: print("Compute results")
        OD = compute_od(
            self.arrays["shadow"], self.arrays["bright"], self.arrays["dark"])
        self.arrays["od"] = OD
        sx, sy = fit_gaussian(OD, _dA)
        self.results = dict() if self.results is None else self.results
        self.results.update({"sx": sx, "sy": sy})
        return self

def compute_od(shadow: np.ndarray, bright: np.ndarray, dark: np.ndarray):
    with np.errstate(divide="ignore", invalid="ignore"):
        T = (sh - da) / (br - da)
        H, W = T.shape
        OD = -np.log(T)
    OD[~np.isfinite(OD)] = 0.0
    return OD

def _fit_gaussian(A, dA):
    """
    Assumes uniform, square dA.
    """
    imax, jmax = np.where(A == A.max())
    i0, j0 = imax.mean(), jmax.mean()
    x = np.sqrt(dA) * (np.arange(A.shape[1]) - j0)
    y = np.sqrt(dA) * (np.arange(A.shape[0]) - i0)
    X, Y = np.meshgrid(x, y)
    Z = (-np.log(A / A.max())).flatten()
    M = np.array([X.flatten()**2, Y.flatten()**2]).T
    Sx, Sy = la.solve(M.T @ M, M.T @ Z)
    return np.sqrt(1 / Sx), np.sqrt(1 / Sy)

def fit_gaussian(data, dA):
    imax, jmax = np.where(data == data.max())
    i0, j0 = imax.mean(), jmax.mean()
    x0 = np.sqrt(dA) * j0
    x = np.sqrt(dA) * np.arange(data.shape[1])
    y0 = np.sqrt(dA) * i0
    y = np.sqrt(dA) * np.arange(data.shape[0])
    X, Y = np.meshgrid(x, y)

    def model(params: lmfit.Parameters):
        A = params["A"]
        x0 = params["x0"]
        sx = params["sx"]
        y0 = params["y0"]
        sy = params["sy"]
        return A * np.exp(-((X - x0) / sx)**2 - ((Y - y0) / sy)**2)

    def residual(params: lmfit.Parameters):
        m = model(params)
        return ((data - m)**2).flatten()

    params = lmfit.Parameters()
    params.add("A", value=data.max())
    params.add("x0", value=x0)
    params.add("sx", value=1)
    params.add("y0", value=y0)
    params.add("sy", value=1)
    fit = lmfit.minimize(residual, params)
    if not fit.success:
        raise Exception("Error in Gaussian fit")
    return float(fit.params["sx"]), float(fit.params["sy"])