
process

aggregate_columns(df, columns, by=['start', 'end', 'exposure'])

Aggregate the specified columns of the DataFrame by intensity-weighted average.

Source code in hdxms_datasets/process.py
@nw.narwhalify
def aggregate_columns(
    df: nw.DataFrame, columns: list[str], by: list[str] = ["start", "end", "exposure"]
) -> nw.DataFrame:
    """
    Aggregate the specified columns of the DataFrame by intensity-weighted average.
    """
    groups = df.group_by(by)
    output = {k: [] for k in by}
    for col in columns:
        output[col] = []
        output[f"{col}_sd"] = []

    for group_key, df_group in groups:
        # group keys follow the order of `by`, so this works for any grouping
        for key, value in zip(by, group_key):
            output[key].append(value)

        for col in columns:
            val = ufloat_stats(df_group[col], df_group["intensity"])
            output[col].append(val.nominal_value)
            output[f"{col}_sd"].append(val.std_dev)

    agg_df = nw.from_dict(output, backend=BACKEND)
    return agg_df
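
A minimal usage sketch, not taken from the source: thanks to @nw.narwhalify a plain pandas DataFrame can be passed directly, and an "intensity" column is required as the weight. Column names and values below are illustrative.

import pandas as pd

from hdxms_datasets.process import aggregate_columns

# Two replicate measurements per peptide/exposure; intensity is the weight.
df = pd.DataFrame(
    {
        "start": [1, 1, 10, 10],
        "end": [9, 9, 20, 20],
        "exposure": [30.0, 30.0, 30.0, 30.0],
        "uptake": [1.2, 1.4, 3.0, 3.2],
        "intensity": [1000.0, 500.0, 800.0, 800.0],
    }
)

# One row per (start, end, exposure) group, with intensity-weighted mean
# and standard deviation columns ("uptake", "uptake_sd").
agg = aggregate_columns(df, columns=["uptake"])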

compute_uptake_metrics(df, exception='raise')

Tries to add derived columns to the DataFrame. Possible columns to add are: max_uptake, uptake, uptake_sd, fd_uptake, fd_uptake_sd, frac_fd_control, frac_fd_control_sd, frac_max_uptake, frac_max_uptake_sd.

Source code in hdxms_datasets/process.py
def compute_uptake_metrics(df: nw.DataFrame, exception: str = "raise") -> nw.DataFrame:
    """
    Tries to add derived columns to the DataFrame.
    Possible columns to add are: max_uptake, uptake, uptake_sd, fd_uptake, fd_uptake_sd,
    frac_fd_control, frac_fd_control_sd, frac_max_uptake, frac_max_uptake_sd.
    """
    all_columns = {
        "max_uptake": hdx_expr.max_uptake,
        "uptake": hdx_expr.uptake,
        "uptake_sd": hdx_expr.uptake_sd,
        "fd_uptake": hdx_expr.fd_uptake,
        "fd_uptake_sd": hdx_expr.fd_uptake_sd,
        "frac_fd_control": hdx_expr.frac_fd_control,
        "frac_fd_control_sd": hdx_expr.frac_fd_control_sd,
        "frac_max_uptake": hdx_expr.frac_max_uptake,
        "frac_max_uptake_sd": hdx_expr.frac_max_uptake_sd,
    }

    for col, expr in all_columns.items():
        if col not in df.columns:
            try:
                df = df.with_columns(expr)
            except Exception as e:
                if exception == "raise":
                    raise
                elif exception == "warn":
                    warnings.warn(f"Failed to add column {col}: {e}")
                elif exception == "ignore":
                    pass
                else:
                    raise ValueError("Invalid exception handling option")

    return df
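
A usage sketch under stated assumptions: `merged` stands for the table returned by merge_peptides below, which guarantees the partially deuterated peptides needed for the uptake metrics. With exception="warn", columns whose source columns are missing are skipped with a warning instead of raising.

import warnings

from hdxms_datasets.process import compute_uptake_metrics

# `merged` is assumed to come from merge_peptides; see below.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    enriched = compute_uptake_metrics(merged, exception="warn")

# Derived columns that could be computed are now present; the rest warned.
print([str(w.message) for w in caught])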

drop_null_columns(df)

Drop columns that are all null from the DataFrame.

Source code in hdxms_datasets/process.py
def drop_null_columns(df: nw.DataFrame) -> nw.DataFrame:
    """Drop columns that are all null from the DataFrame."""
    all_null_columns = [col for col in df.columns if df[col].is_null().all()]
    return df.drop(all_null_columns)
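
A short example, not from the source; the nw.from_native wrapping is shown because this function takes a narwhals DataFrame directly:

import pandas as pd
import narwhals as nw

from hdxms_datasets.process import drop_null_columns

df = nw.from_native(pd.DataFrame({"a": [1, 2], "b": [None, None]}), eager_only=True)

# "b" is entirely null and is dropped; "a" is kept.
print(drop_null_columns(df).columns)  # ['a']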

dynamx_cluster_to_state(cluster_data, nd_exposure=0.0)

Convert DynamX cluster data to state data. The input must contain only a single state.

Source code in hdxms_datasets/process.py
def dynamx_cluster_to_state(cluster_data: nw.DataFrame, nd_exposure: float = 0.0) -> nw.DataFrame:
    """
    Convert DynamX cluster data to state data.
    The input must contain only a single state.
    """

    assert len(cluster_data["state"].unique()) == 1, "Multiple states found in data"

    # determine undeuterated masses per peptide
    nd_data = cluster_data.filter(nw.col("exposure") == nd_exposure)
    nd_peptides: list[tuple[int, int]] = sorted(
        {(start, end) for start, end in zip(nd_data["start"], nd_data["end"])}
    )

    peptides_nd_mass = {}
    for p in nd_peptides:
        start, end = p
        df_nd_peptide = nd_data.filter((nw.col("start") == start) & (nw.col("end") == end))

        masses = df_nd_peptide["z"] * (df_nd_peptide["center"] - PROTON_MASS)
        nd_mass = ufloat_stats(masses, df_nd_peptide["inten"])

        peptides_nd_mass[p] = nd_mass

    groups = cluster_data.group_by(["start", "end", "exposure"])
    unique_columns = [
        "end",
        "exposure",
        "fragment",
        "maxuptake",
        "mhp",
        "modification",
        "protein",
        "sequence",
        "start",
        "state",
        "stop",
    ]
    records = []
    for (start, end, exposure), df_group in groups:
        record = {col: df_group[col][0] for col in unique_columns}

        rt = ufloat_stats(df_group["rt"], df_group["inten"])
        record["rt"] = rt.nominal_value
        record["rt_sd"] = rt.std_dev

        # state data 'center' is mass as if |charge| would be 1
        center = ufloat_stats(
            df_group["z"] * (df_group["center"] - PROTON_MASS) + PROTON_MASS, df_group["inten"]
        )
        record["center"] = center.nominal_value
        record["center_sd"] = center.std_dev

        masses = df_group["z"] * (df_group["center"] - PROTON_MASS)
        exp_mass = ufloat_stats(masses, df_group["inten"])

        if (start, end) in peptides_nd_mass:
            uptake = exp_mass - peptides_nd_mass[(start, end)]
            record["uptake"] = uptake.nominal_value
            record["uptake_sd"] = uptake.std_dev
        else:
            record["uptake"] = None
            record["uptake_sd"] = None

        records.append(record)

    d = records_to_dict(records)
    df = nw.from_dict(d, backend=BACKEND)

    if set(df.columns) == set(STATE_DATA_COLUMN_ORDER):
        df = df[STATE_DATA_COLUMN_ORDER]

    return df
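
A usage sketch, not from the source: the file name is illustrative, and the cluster export is assumed to use DynamX's lower-cased column names (state, exposure, start, end, z, center, inten, rt, ...). Because of the single-state assertion, filter per state before converting.

import pandas as pd
import narwhals as nw

from hdxms_datasets.process import dynamx_cluster_to_state

# Illustrative path; any pandas-readable cluster export works the same way.
cluster = nw.from_native(pd.read_csv("cluster_export.csv"), eager_only=True)

# Convert one state at a time; rows at exposure 0.0 serve as the
# undeuterated reference for uptake.
state_tables = {
    state: dynamx_cluster_to_state(cluster.filter(nw.col("state") == state), nd_exposure=0.0)
    for state in cluster["state"].unique().to_list()
}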

merge_peptides(peptides, base_dir=Path.cwd())

Merge peptide tables from different deuteration types into a single DataFrame.

Source code in hdxms_datasets/process.py
def merge_peptides(peptides: list[Peptides], base_dir: Path = Path.cwd()) -> nw.DataFrame:
    """Merge peptide tables from different deuteration types into a single DataFrame."""
    if not peptides:
        raise ValueError("No peptides provided for merging.")

    peptide_types = {p.deuteration_type for p in peptides}
    if len(peptide_types) != len(peptides):
        raise ValueError(
            "Multiple peptides of the same type found. Please ensure unique deuteration types."
        )

    if DeuterationType.partially_deuterated not in peptide_types:
        raise ValueError("Partially deuterated peptide is required for uptake metrics calculation.")

    loaded_peptides = {
        p.deuteration_type.value: load_peptides(p, base_dir=base_dir) for p in peptides
    }

    merged = merge_peptide_tables(**loaded_peptides, column=None)
    return merged
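
A usage sketch under stated assumptions: pd_peptides and fd_peptides stand for Peptides objects (partially and fully deuterated, respectively) obtained elsewhere, e.g. from a dataset definition; constructing them is outside this module.

from pathlib import Path

from hdxms_datasets.process import merge_peptides

# pd_peptides, fd_peptides: assumed Peptides instances with distinct
# deuteration types; partially deuterated data is mandatory.
merged = merge_peptides([pd_peptides, fd_peptides], base_dir=Path("data"))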

sort_rows(df)

Sorts the DataFrame by state, exposure, start, end, replicate.

Source code in hdxms_datasets/process.py
def sort_rows(df: nw.DataFrame) -> nw.DataFrame:
    """Sorts the DataFrame by state, exposure, start, end, file."""
    all_by = ["state", "exposure", "start", "end", "replicate"]
    by = [col for col in all_by if col in df.columns]
    return df.sort(by=by)
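
A short example, not from the source; note that absent sort keys ("replicate" here) are skipped silently:

import pandas as pd
import narwhals as nw

from hdxms_datasets.process import sort_rows

df = nw.from_native(
    pd.DataFrame({"state": ["B", "A"], "exposure": [10.0, 30.0], "start": [5, 1], "end": [12, 9]}),
    eager_only=True,
)

print(sort_rows(df).to_native())  # sorted by state, then exposure, start, end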

ufloat_stats(array, weights)

Calculate the weighted mean and standard deviation.

Source code in hdxms_datasets/process.py
def ufloat_stats(array, weights) -> Variable:
    """Calculate the weighted mean and standard deviation."""
    weighted_stats = DescrStatsW(array, weights=weights, ddof=0)
    return ufloat(weighted_stats.mean, weighted_stats.std)
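
A worked example with hand-checkable numbers (values are illustrative): the weighted mean is (1*1 + 1*2 + 2*4) / (1 + 1 + 2) = 2.75, and std_dev is the biased (ddof=0) weighted standard deviation.

import numpy as np

from hdxms_datasets.process import ufloat_stats

val = ufloat_stats(np.array([1.0, 2.0, 4.0]), weights=np.array([1.0, 1.0, 2.0]))
print(val.nominal_value, val.std_dev)  # 2.75, ~1.299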