Source code for mne_rsa.rsa

# encoding: utf-8
"""Methods to compute representational similarity analysis (RSA).

Authors
-------
Marijn van Vliet <marijn.vanvliet@aalto.fi>
Egor Eremin <egor.eremin@aalto.fi>
"""

import numpy as np
from joblib import Parallel, delayed
from scipy import stats

from .folds import _match_order, create_folds
from .rdm import _ensure_condensed, _n_items_from_rdm, compute_rdm, compute_rdm_cv
from .searchlight import searchlight

try:
    # Version 1.8.0 and up
    from scipy.stats._stats_py import _kendall_dis
except ImportError:
    from scipy.stats.stats import _kendall_dis


def _kendall_tau_a(x, y):
    """Compute Kendall's Tau metric, A-variant.

    Taken from scipy.stats.kendalltau and modified to be the tau-a variant.
    """
    x = np.asarray(x).ravel()
    y = np.asarray(y).ravel()

    if x.size != y.size:
        raise ValueError(
            "All inputs to `kendalltau` must be of the same size,"
            " found x-size %s and y-size %s" % (x.size, y.size)
        )
    elif not x.size or not y.size:
        return np.nan  # Return NaN if arrays are empty

    def count_rank_tie(ranks):
        cnt = np.bincount(ranks).astype("int64", copy=False)
        cnt = cnt[cnt > 1]
        return (
            (cnt * (cnt - 1) // 2).sum(),
            (cnt * (cnt - 1.0) * (cnt - 2)).sum(),
            (cnt * (cnt - 1.0) * (2 * cnt + 5)).sum(),
        )

    size = x.size
    perm = np.argsort(y)  # sort on y and convert y to dense ranks
    x, y = x[perm], y[perm]
    y = np.r_[True, y[1:] != y[:-1]].cumsum(dtype="intp")

    # Perform stable sort on x and convert x to dense ranks.
    perm = np.argsort(x, kind="mergesort")
    x, y = x[perm], y[perm]
    x = np.r_[True, x[1:] != x[:-1]].cumsum(dtype="intp")

    dis = _kendall_dis(x, y)  # discordant pairs

    obs = np.r_[True, (x[1:] != x[:-1]) | (y[1:] != y[:-1]), True]
    cnt = np.diff(np.nonzero(obs)[0]).astype("int64", copy=False)

    ntie = (cnt * (cnt - 1) // 2).sum()  # joint ties
    xtie, x0, x1 = count_rank_tie(x)  # ties in x, stats
    ytie, y0, y1 = count_rank_tie(y)  # ties in y, stats

    tot = (size * (size - 1)) // 2

    if xtie == tot or ytie == tot:
        return np.nan

    # Note that tot = con + dis + (xtie - ntie) + (ytie - ntie) + ntie
    #               = con + dis + xtie + ytie - ntie
    con_minus_dis = tot - xtie - ytie + ntie - 2 * dis
    tau = con_minus_dis / tot
    # Limit range to fix computational errors.
    tau = min(1.0, max(-1.0, tau))

    return tau


def _consolidate_masks(masks):
    """Compute the union of multiple masks."""
    if type(masks[0]) is slice:
        mask = slice(None)
    else:
        mask = masks[0]
        for m in masks[1:]:
            mask &= m
    return mask


def _partial_correlation(rdm_data, rdm_model, masks=None, type="pearson"):
    """Compute partial Pearson/Spearman correlation."""
    if len(rdm_model) == 1:
        raise ValueError(
            "Need more than one model RDM to use partial correlation as metric."
        )
    if type not in ["pearson", "spearman"]:
        raise ValueError("Correlation type must be either 'pearson' or 'spearman'")

    if masks is not None:
        mask = _consolidate_masks(masks)
        rdm_model = [rdm[mask] for rdm in rdm_model]
        rdm_data = rdm_data[mask]

    X = np.vstack([rdm_data] + rdm_model).T
    if type == "spearman":
        X = np.apply_along_axis(stats.rankdata, 0, X)
    X = X - X.mean(axis=0)
    cov_X_inv = np.linalg.pinv(X.T @ X)
    norm = np.sqrt(np.outer(np.diag(cov_X_inv), np.diag(cov_X_inv)))
    R_partial = cov_X_inv / norm
    return -R_partial[0, 1:]


[docs] def rsa_gen(rdm_data_gen, rdm_model, metric="spearman", ignore_nan=False): """Generate RSA values between data and model RDMs. Will yield RSA scores for each data RDM. Parameters ---------- rdm_data_gen : generator of ndarray, shape (n_items, n_items) The generator for data RDMs rdm_model : ndarray, shape (n_items, n_items) | list of ndarray The model RDM, or list of model RDMs. metric : str The RSA metric to use to compare the RDMs. Valid options are: * 'spearman' for Spearman's correlation (the default) * 'pearson' for Pearson's correlation * 'kendall-tau-a' for Kendall's Tau (alpha variant) * 'partial' for partial Pearson correlations * 'partial-spearman' for partial Spearman correlations * 'regression' for linear regression weights Defaults to 'spearman'. ignore_nan : bool Whether to treat NaN's as missing values and ignore them when computing the distance metric. Defaults to ``False``. .. versionadded:: 0.8 Yields ------ rsa_val : float | ndarray, shape (len(rdm_model),) For each data RDM, the representational similarity with the model RDM. When multiple model RDMs are specified, this will be a 1D array of similarities, comparing the data RDM with each model RDM. See Also -------- rsa """ if isinstance(rdm_model, list): return_array = True rdm_model = [_ensure_condensed(rdm, "rdm_model") for rdm in rdm_model] else: return_array = False rdm_model = [_ensure_condensed(rdm_model, "rdm_model")] if ignore_nan: masks = [~np.isnan(rdm) for rdm in rdm_model] else: masks = [slice(None)] * len(rdm_model) # Precompute ranks for Spearman if metric == "spearman": rdm_model = [stats.rankdata(rdm) for rdm in rdm_model] for rdm_data in rdm_data_gen: rdm_data = _ensure_condensed(rdm_data, "rdm_data") if ignore_nan: data_mask = ~np.isnan(rdm_data) masks = [m & data_mask for m in masks] rsa_vals = _rsa_single_rdm(rdm_data, rdm_model, metric, masks, ignore_nan) if return_array: yield np.asarray(rsa_vals) else: yield rsa_vals[0]
def _rsa_single_rdm(rdm_data, rdm_model, metric, masks, ignore_nan): """Compute RSA between a single data RDM and one or more model RDMs.""" if metric == "spearman": if not ignore_nan: rdm_data = stats.rankdata(rdm_data) rsa_vals = [ np.corrcoef(rdm_data, rdm_model_[mask])[0, 1] for rdm_model_, mask in zip(rdm_model, masks) ] else: rsa_vals = [ np.corrcoef( stats.rankdata(rdm_data[mask]), stats.rankdata(rdm_model_[mask]) )[0, 1] for rdm_model_, mask in zip(rdm_model, masks) ] elif metric == "pearson": rsa_vals = [ stats.pearsonr(rdm_data[mask], rdm_model_[mask])[0] for rdm_model_, mask in zip(rdm_model, masks) ] elif metric == "kendall-tau-a": rsa_vals = [ _kendall_tau_a(rdm_data[mask], rdm_model_[mask]) for rdm_model_, mask in zip(rdm_model, masks) ] elif metric == "partial": rsa_vals = _partial_correlation(rdm_data, rdm_model, masks) elif metric == "partial-spearman": rsa_vals = _partial_correlation(rdm_data, rdm_model, masks, type="spearman") elif metric == "regression": mask = _consolidate_masks(masks) rdm_model = [rdm[mask] for rdm in rdm_model] rdm_data = rdm_data[mask] X = np.atleast_2d(np.array(rdm_model)).T X = X - X.mean(axis=0) y = rdm_data - rdm_data.mean() rsa_vals = np.linalg.lstsq(X, y, rcond=None)[0] else: raise ValueError( "Invalid RSA metric, must be one of: 'spearman', " "'pearson', 'partial', 'partial-spearman', " "'regression' or 'kendall-tau-a'." ) return rsa_vals
[docs] def rsa( rdm_data, rdm_model, metric="spearman", ignore_nan=False, n_data_rdms=None, n_jobs=1, verbose=False, ): """Perform RSA between data and model RDMs. Parameters ---------- rdm_data : ndarray, shape (n_items, n_items) | list | generator The data RDM (or list/generator of data RDMs). rdm_model : ndarray, shape (n_items, n_items) | list of ndarray The model RDM (or list of model RDMs). metric : str The RSA metric to use to compare the RDMs. Valid options are: * 'spearman' for Spearman's correlation (the default) * 'pearson' for Pearson's correlation * 'kendall-tau-a' for Kendall's Tau (alpha variant) * 'partial' for partial Pearson correlations * 'partial-spearman' for partial Spearman correlations * 'regression' for linear regression weights Defaults to 'spearman'. ignore_nan : bool Whether to treat NaN's as missing values and ignore them when computing the distance metric. Defaults to ``False``. .. versionadded:: 0.8 n_data_rdms : int | None The number of data RDMs. This is useful when displaying a progress bar, so an estimate can be made of the computation time remaining. This information is available if ``rdm_data`` is an array or a list, but if it is a generator, this information is not available and you may want to set it explicitly. n_jobs : int The number of processes (=number of CPU cores) to use. Specify -1 to use all available cores. Defaults to 1. verbose : bool Whether to display a progress bar. In order for this to work, you need the tqdm python module installed. Defaults to False. Returns ------- rsa_val : float | ndarray, shape (len(rdm_data), len(rdm_model)) Depending on whether one or more data and model RDMs were specified, a single similarity value or a 2D array of similarity values for each data RDM versus each model RDM. See Also -------- rsa_gen """ return_array = False if isinstance(rdm_data, list) or hasattr(rdm_data, "__next__"): return_array = True else: rdm_data = [rdm_data] if verbose: from tqdm import tqdm if n_data_rdms is not None: total = n_data_rdms elif hasattr(rdm_data, "__len__"): total = len(rdm_data) else: total = None rdm_data = tqdm(rdm_data, total=total, unit="RDM") if n_jobs == 1: rsa_vals = list(rsa_gen(rdm_data, rdm_model, metric, ignore_nan)) else: def process_single_rdm(rdm): return next(rsa_gen([rdm], rdm_model, metric, ignore_nan)) rsa_vals = Parallel(n_jobs)( delayed(process_single_rdm)(rdm) for rdm in rdm_data ) if return_array: return np.asarray(rsa_vals) else: return rsa_vals[0]
[docs] def rsa_array( X, rdm_model, patches=None, data_rdm_metric="correlation", data_rdm_params=dict(), rsa_metric="spearman", ignore_nan=False, y=None, labels_X=None, labels_rdm_model=None, n_folds=1, n_jobs=1, verbose=False, ): """Perform RSA on an array of data, possibly in a searchlight pattern. Parameters ---------- X : ndarray, shape (n_items, n_series, n_times) An array containing the data. rdm_model : ndarray, shape (n, n) | (n * (n - 1) // 2,) | list of ndarray The model RDM, see :func:`compute_rdm`. For efficiency, you can give it in condensed form, meaning only the upper triangle of the matrix as a vector. See :func:`scipy.spatial.distance.squareform`. To perform RSA against multiple models at the same time, supply a list of model RDMs. Use :func:`compute_rdm` to compute RDMs. patches : generator of tuples | None Searchlight patches as generated by :class:`searchlight`. If ``None``, no searchlight is used. Defaults to ``None``. data_rdm_metric : str The metric to use to compute the data RDMs. This can be any metric supported by the scipy.distance.pdist function. Defaults to 'correlation'. data_rdm_params : dict Extra arguments for the distance metric used to compute the RDMs. Refer to :mod:`scipy.spatial.distance` for a list of all other metrics and their arguments. Defaults to an empty dictionary. rsa_metric : str The RSA metric to use to compare the RDMs. Valid options are: * 'spearman' for Spearman's correlation (the default) * 'pearson' for Pearson's correlation * 'kendall-tau-a' for Kendall's Tau (alpha variant) * 'partial' for partial Pearson correlations * 'partial-spearman' for partial Spearman correlations * 'regression' for linear regression weights Defaults to 'spearman'. ignore_nan : bool Whether to treat NaN's as missing values and ignore them when computing the distance metric. Defaults to ``False``. .. versionadded:: 0.8 y : ndarray of int, shape (n_items,) | None Deprecated, use ``labels_X`` and ``labels_rdm_model`` instead. For each item, a number indicating the class to which the item belongs. Defaults to ``None``, in which case ``labels_X`` is used. labels_X : list | None For each element in ``X`` (=the first dimension), a label that identifies the item to which it corresponds. This is used in combination with ``labels_rdm_model`` to align the data and model RDMs before comparing them. Multiple elements in ``X`` may correspond to the same item, in which case they should have the same label and will either be averaged when computing the data RDM (``n_folds=1``) or used for cross-validation (``n_folds>1``). Labels may be of any python type that can be compared with ``==`` (int, float, string, tuple, etc). By default (``None``), the integers ``0:len(X)`` are used as labels. .. versionadded:: 0.10 labels_rdm_model: list | None For each row in ``rdm_model``, a label that identifies the item to which it corresponds. This is used in combination with ``labels_X`` to align the data and model RDMs before comparing them. Each row should have a unique label. Labels may be of any python type that can be compared with ``==`` (int, float, string, tuple, etc). By default (``None``), the integers ``0:n_rows`` are used as labels. .. versionadded:: 0.10 n_folds : int | sklearn.model_selection.BaseCrollValidator | None Number of cross-validation folds to use when computing the distance metric. Folds are created based on the ``y`` parameter. Specify ``None`` to use the maximum number of folds possible, given the data. Alternatively, you can pass a Scikit-Learn cross validator object (e.g. ``sklearn.model_selection.KFold``) to assert fine-grained control over how folds are created. Defaults to 1 (no cross-validation). n_jobs : int The number of processes (=number of CPU cores) to use. Specify -1 to use all available cores. Defaults to 1. verbose : bool Whether to display a progress bar. In order for this to work, you need the tqdm python module installed. Defaults to False. Returns ------- rsa_vals : ndarray, shape ([n_model_rdms,] [n_series,] [n_times,]) The RSA value for each searchlight patch. When ``spatial_radius`` is set to ``None``, there will only be no ``n_series`` dimension. When ``temporal_radius`` is set to ``None``, there will be no time dimension. When multiple models have been supplied, the first dimension will contain RSA results for each model. See Also -------- searchlight compute_rdm rdm_array """ if patches is None: patches = searchlight(X.shape) # one big searchlight patch if isinstance(rdm_model, list): rdm_model = [_ensure_condensed(rdm, "rdm_model") for rdm in rdm_model] else: rdm_model = [_ensure_condensed(rdm_model, "rdm_model")] # Align data with model RDM given the provided labels. if labels_X is None and y is not None: labels_X = y y = _match_order( len(X), _n_items_from_rdm(rdm_model[0]), labels_X, labels_rdm_model ) # Create folds for cross-validated RDM metrics. X = create_folds(X, y, n_folds) # The data is now folds x items x n_series x n_times. if ignore_nan: masks = [~np.isnan(rdm) for rdm in rdm_model] else: masks = [slice(None)] * len(rdm_model) # Precompute ranks for Spearman if rsa_metric == "spearman": rdm_model = [stats.rankdata(rdm) for rdm in rdm_model] if verbose: from tqdm import tqdm shape = getattr(patches, "shape", (-1,)) patches = tqdm(patches, unit="patch") try: setattr(patches, "shape", shape) except AttributeError: pass def rsa_single_patch(patch): """Compute RSA for a single searchlight patch.""" if len(X) == 1: # check number of folds # no cross-validation rdm_data = compute_rdm(X[0][patch], data_rdm_metric, **data_rdm_params) else: # with cross-validation rdm_data = compute_rdm_cv( X[(slice(None),) + patch], data_rdm_metric, **data_rdm_params ) if ignore_nan: data_mask = ~np.isnan(rdm_data) patch_masks = [m & data_mask for m in masks] else: patch_masks = masks return _rsa_single_rdm(rdm_data, rdm_model, rsa_metric, patch_masks, ignore_nan) # Call RSA multiple times in parallel for each searchlight patch. data = Parallel(n_jobs=n_jobs)( delayed(rsa_single_patch)(patch) for patch in patches ) # Figure out the desired dimensions of the resulting array. dims = getattr(patches, "shape", (-1,)) if len(rdm_model) == 1: return np.array(data).reshape(dims) else: dims = dims + (len(rdm_model),) data = np.array(data).reshape(dims) return np.rollaxis(data, axis=-1) # put n_models dim first