Skip to content

Add ability to crop data 2D data using f1l,f1r or xmin, xmax #27

@github-actions

Description

@github-actions

# TODO: Add ability to crop data 2D data using f1l,f1r or xmin, xmax

"""
Functions for reading data from raw Bruker NMR files.

All functions return a dictionary containing data with appropriate keys.
"""

import os
from xml.etree import ElementTree

import numpy as np
from nmrglue.fileio import bruker
from scipy import signal


def get_1d_data(exp_path, proc_num=1, include_md=False):
    """
    Get 1D NMR data from raw Bruker files. Returns a dictionary bundling the data.

    exp_path: Top-level experiment folder containing the 2D NMR data.
    proc_num: Process number of data to be plotted (default = 1).
    include_md: If true, include procs and acqus dictionaries in the output bundle.
    """

    pdata_path = os.path.join(exp_path, "pdata", str(proc_num))
    metadata, data = bruker.read_pdata(pdata_path)

    # Determine x axis values
    offset_freq = metadata["acqus"]["O1"]
    basic_field = metadata["acqus"]["BF1"]
    nucleus = metadata["acqus"]["NUC1"]

    pdata_size = metadata["procs"]["SI"]
    spectral_ref_freq = metadata["procs"]["SF"]
    spectral_width_hz = metadata["procs"]["SW_p"]

    spectral_ref = (spectral_ref_freq - basic_field) * 1e6
    true_center = offset_freq - spectral_ref
    x_min = true_center - spectral_width_hz / 2
    x_max = true_center + spectral_width_hz / 2
    x_vals_hz = np.linspace(x_max, x_min, num=int(pdata_size))
    x_vals_ppm = x_vals_hz / spectral_ref_freq

    bundle = {
        "data_type": "1d",
        "nucleus": nucleus,
        "x_vals_hz": x_vals_hz,
        "x_vals_ppm": x_vals_ppm,
        "y_data": data,
    }

    if include_md:
        bundle.update(metadata)

    return bundle


def get_data_from_folder(dir_path, exp_nums):
    """
    Function to read in 1D NMR data from a folder of raw Bruker files.

    dir_path: Data directory containing 1D-NMR experiment folders.
    exp_nums: List of experiment numbers (e.g., [1, 5, 6, 10]).

    TODO: Add normalization option to get_data_from_folder
    TODO: Check data dimensionality in get_data_from_folder
    """

    # If exp_nums is not provided, use all experiment-numbered folders in dir_path
    if exp_nums is None:
        exp_nums = [
            item
            for item in os.listdir(dir_path)
            if os.path.isdir(os.path.join(dir_path, item)) and item.isnumeric()
        ]

    bundle = {}
    for exp_num in exp_nums:
        exp_path = os.path.join(dir_path, str(exp_num))
        bundle[f"exp_{exp_num}"] = get_1d_data(exp_path)

    # If all experiments have the same set of x vals, specify add a key to the bundle
    # with that set
    all_x_vals_ppm = set([exp_bundle["x_vals_ppm"] for _, exp_bundle in bundle.items()])
    if len(all_x_vals_ppm) == 1:
        bundle["x_vals_ppm"] = next(iter(bundle))["x_vals_ppm"]

    return bundle


def get_2d_data(exp_path, proc_num=1):
    """
    Get 2D NMR data from raw Bruker files.

    exp_path: Top-level experiment folder containing the 2D NMR data.
    proc_num: Process number of data to be plotted (default = 1).
    """

    pdata_path = os.path.join(exp_path, "pdata", str(proc_num))
    metadata, data = bruker.read_pdata(pdata_path)

    bundle = {"data_type": "2d", "z_data": data}

    # Determine values for x and y axes
    for dim in ["x", "y"]:
        if dim == "x":
            acqus = "acqus"
            procs = "procs"
        else:
            acqus = "acqu2s"
            procs = "proc2s"

        offset_freq = metadata[acqus]["O1"]
        basic_field = metadata[acqus]["BF1"]
        nucleus = metadata[acqus]["NUC1"]

        pdata_size = metadata[procs]["SI"]
        spectral_ref_freq = metadata[procs]["SF"]
        spectral_width_hz = metadata[procs]["SW_p"]

        spectral_ref = (spectral_ref_freq - basic_field) * 1e6
        center = offset_freq - spectral_ref
        min_hz = center - spectral_width_hz / 2
        max_hz = center + spectral_width_hz / 2
        vals_hz = np.linspace(max_hz, min_hz, num=int(pdata_size))

        bundle[f"{dim}_nucleus"] = nucleus
        bundle[f"{dim}_vals_hz"] = vals_hz
        bundle[f"{dim}_vals_ppm"] = vals_hz / spectral_ref_freq

    # TODO: Add ability to crop data 2D data using f1l,f1r or xmin, xmax

    return bundle


def get_diff_params(exp_path):
    """
    Extract diffusion experiment parameters from a diff.xml file.

    Returns the following values in a bundle dict:
        little_delta: Length of each gradient pulse in [ms].
        big_delta: Diffusion time, measured between the starting times of the two
                   gradient pulses in [ms].
        diff_coeff_estimate: The estimated diffusion coefficient in [m^2/s].
        gradient_list: List of gradient strengths in [G/cm].
    """

    xml_path = os.path.join(exp_path, "diff.xml")
    tree = ElementTree.parse(xml_path)
    root = tree.getroot()

    little_delta = float(root.find(".//little_delta").text)  # [ms]
    little_delta = little_delta / 1000  # [s]

    big_delta = float(root.find(".//big_delta").text)  # [ms]
    big_delta = big_delta / 1000  # [s]

    diff_coeff_estimate = float(root.find(".//exDiffCoff").text)  # [m2/s]

    x_values_element = root.find(".//xValues/List")
    if x_values_element:
        x_values_list = x_values_element.text.split()
        x_values = [float(value) for value in x_values_list]
        gradient_list = x_values[1::4]  # [G/cm]
    else:
        x_values_list = root.findall(".//X")
        gradient_list = [float(i.attrib["g"]) for i in x_values_list]

    # Convert from [G/cm] to [T/m]?
    gradient_list = [x / 100 for x in gradient_list]  # [T/m]

    bundle = {
        "little_delta": little_delta,
        "big_delta": big_delta,
        "diff_coeff_estimate": diff_coeff_estimate,
        "gradient_list": gradient_list,
    }

    return bundle


def get_pseudo2d_data(exp_path, proc_num=1):
    """
    Get pseudo-2D NMR data from raw Bruker files.

    exp_path: Top-level experiment folder containing the pseudo-2D NMR data.
    proc_num: Process number of data to be plotted (default = 1).
    """

    pdata_path = os.path.join(exp_path, "pdata", str(proc_num))
    metadata, data = bruker.read_pdata(pdata_path)

    offset_freq = metadata["acqus"]["O1"]
    basic_field = metadata["acqus"]["BF1"]
    nucleus = metadata["acqus"]["NUC1"]

    pdata_size = metadata["procs"]["SI"]
    spectral_ref_freq = metadata["procs"]["SF"]
    spectral_width_hz = metadata["procs"]["SW_p"]

    spectral_ref = (spectral_ref_freq - basic_field) * 1e6
    center = offset_freq - spectral_ref
    x_min_hz = center - spectral_width_hz / 2
    x_max_hz = center + spectral_width_hz / 2
    x_vals_hz = np.linspace(x_max_hz, x_min_hz, num=int(pdata_size))
    x_vals_ppm = x_vals_hz / spectral_ref_freq

    bundle = {
        "data_type": "pseudo2d",
        "nucleus": nucleus,
        "x_vals_hz": x_vals_hz,
        "x_vals_ppm": x_vals_ppm,
        "y_data": data,
        "acqus": metadata["acqus"],
        "acqu2s": metadata["acqu2s"],
    }

    return bundle


def get_peak_slice_intensities(
    x_vals_ppm,
    y_data,
    peak_pos=None,
    prominence=None,
):
    """
    Find the intensities of peaks across slices in a pseudo-2d experiment. Peak
    intensities are normalized by the slice with the largest total intensity.

    y_data: 2D intensity data, where each row is a slice/1D spectrum.
    x_vals_ppm: Corresponding ppm values for intensity data.
    peak_pos: Position(s) in ppm to get the intensities for. If None (default), peak
              positions will be found automatically using scipy.signal.find_peaks.
    prominence: Range of prominence values to allow during automatic peak picking.
                Default is [0.001, 1]. Refer to `scipy.signal.find_peaks` for more info.

    TODO: add bundle input to get_peak_slice_intensities
    TODO: allow no xdata input to get_peak_slice_intensities
    TODO: Add normalize option to get_peak_slice_intensities
    """

    # Convert data into nparray if not already
    x_vals_ppm = np.array(x_vals_ppm)
    y_data = np.array(y_data)

    # Choose the slice with the highest total intensity for normalization
    best_slice = max(y_data, key=np.sum)

    # CHECK: Does this normalization block actually do anything since intensities are
    #        renormalized after picking?
    # Normalize best slice
    min_best_slice = min(best_slice)
    best_slice = best_slice - min_best_slice
    max_best_slice = max(best_slice)
    best_slice = best_slice / max_best_slice
    # Normalize y_data by the best slice
    y_data = y_data - min_best_slice
    y_data = y_data / max_best_slice

    if peak_pos is None:
        # Find peaks using find_peaks if peak_pos not provided
        if prominence is None:
            prominence = [0.001, 1]
        peak_idx = signal.find_peaks(best_slice, prominence=prominence)[0]
    else:
        peak_idx = [np.abs(x_vals_ppm - ppm).argmin() for ppm in peak_pos]

    # ppm values of picked peaks, might be slightly different from input peak_pos
    peak_pos = x_vals_ppm[peak_idx]

    # Get intensities of picked peaks in all slices
    # CHECK Is float cast needed here?
    peak_ints = np.array([[float(y_slice[i]) for i in peak_idx] for y_slice in y_data])
    # Normalize by max intensity of each peak
    peak_ints_norm = peak_ints / peak_ints.max(axis=0)

    bundle = {
        "peak_idx": peak_idx,
        "peak_pos_ppm": peak_pos,
        "peak_ints": peak_ints,
        "peak_ints_norm": peak_ints_norm,
    }

    return bundle

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions