# -*- coding: utf-8 -*-
"""Extensions for MNE-Python's Forward operator.
Authors: Susanna Aro <susanna.aro@aalto.fi>
Marijn van Vliet <w.m.vanvliet@gmail.com>
"""
from copy import deepcopy
import numpy as np
from mne import Forward, SourceSpaces, channel_type
from mne._fiff.pick import _picks_to_idx
from mne.bem import _fit_sphere
from mne.forward import convert_forward_solution
from mne.io.constants import FIFF
from mne.transforms import (
Transform,
_cart_to_sph,
_ensure_trans,
apply_trans,
invert_transform,
read_trans,
)
from mne.utils import logger, verbose
from scipy.spatial import cKDTree
# from mne.externals.six import string_types
from six import string_types
from .utils import _find_indices_1d, get_morph_src_mapping
[docs]
@verbose
def select_vertices_in_sensor_range(
inst, dist, info=None, picks=None, trans=None, indices=False, verbose=None
):
"""Find vertices within given distance to a sensor.
Parameters
----------
inst : instance of Forward | instance of SourceSpaces
The object to select vertices from.
dist : float
The minimum distance between a vertex and the nearest sensor. All
vertices for which the distance to the nearest sensor exceeds this
limit are discarded.
info : instance of Info | None
The info structure that contains information about the channels. Only
needs to be specified if the object to select vertices from does is
an instance of SourceSpaces.
picks : array-like of int | None
Indices of sensors to include in the search for the nearest sensor. If
``None``, the default, only MEG channels are used.
trans : str | instance of Transform | None
Either the full path to the head<->MRI transform ``*-trans.fif`` file
produced during coregistration, or the Transformation itself. If trans
is None, an identity matrix is assumed. Only needed when ``inst`` is a
source space in MRI coordinates.
indices: False | True
If ``True``, return vertex indices instead of vertex numbers. Defaults
to ``False``.
verbose : bool | str | int | None
If not None, override default verbose level (see :func:`mne.verbose`
and :ref:`Logging documentation <tut_logging>` for more).
Returns
-------
vertices : pair of lists | list of int
Either a list of vertex numbers for the left and right hemisphere (if
``indices==False``) or a single list with vertex indices.
See Also
--------
restrict_forward_to_vertices : restrict Forward to the given vertices
restrict_src_to_vertices : restrict SourceSpaces to the given vertices
"""
if isinstance(inst, Forward):
info = inst["info"]
src = inst["src"]
elif isinstance(inst, SourceSpaces):
src = inst
if info is None:
raise ValueError(
"You need to specify an Info object with "
"information about the channels."
)
# Load the head<->MRI transform if necessary
if src[0]["coord_frame"] == FIFF.FIFFV_COORD_MRI:
if trans is None:
raise ValueError(
"Source space is in MRI coordinates, but no "
"head<->MRI transform was given. Please specify "
"the full path to the appropriate *-trans.fif "
'file as the "trans" parameter.'
)
if isinstance(trans, string_types):
trans = read_trans(trans, return_all=True)
for trans in trans: # we got at least 1
try:
trans = _ensure_trans(trans, "head", "mri")
except Exception:
pass
else:
break
else:
raise ValueError("No head->MRI transform.")
src_trans = invert_transform(_ensure_trans(trans, "head", "mri"))
print("Transform!")
else:
src_trans = Transform("head", "head") # Identity transform
dev_to_head = _ensure_trans(info["dev_head_t"], "meg", "head")
if picks is None:
try:
picks = _picks_to_idx(info, "meg")
except ValueError:
picks = []
if len(picks) > 0:
logger.info("Using MEG channels")
else:
logger.info("Using EEG channels")
picks = _picks_to_idx(info, "eeg")
src_pos = np.vstack(
[apply_trans(src_trans, s["rr"][s["inuse"].astype("bool")]) for s in src]
)
sensor_pos = []
for ch in picks:
# MEG channels are in device coordinates, translate them to head
if channel_type(info, ch) in ["mag", "grad"]:
sensor_pos.append(apply_trans(dev_to_head, info["chs"][ch]["loc"][:3]))
else:
sensor_pos.append(info["chs"][ch]["loc"][:3])
sensor_pos = np.array(sensor_pos)
# Find vertices that are within range of a sensor. We use a KD-tree for
# speed.
logger.info("Finding vertices within sensor range...")
tree = cKDTree(sensor_pos)
distances, _ = tree.query(src_pos, distance_upper_bound=dist)
# Vertices out of range are flagged as np.inf
src_sel = np.isfinite(distances)
logger.info("[done]")
if indices:
return np.flatnonzero(src_sel)
else:
n_lh_verts = src[0]["nuse"]
lh_sel, rh_sel = src_sel[:n_lh_verts], src_sel[n_lh_verts:]
vert_lh = src[0]["vertno"][lh_sel]
vert_rh = src[1]["vertno"][rh_sel]
return [vert_lh, vert_rh]
[docs]
@verbose
def restrict_forward_to_vertices(
fwd, vertno_or_idx, check_vertno=True, copy=True, verbose=None
):
"""Restrict the forward model to the given vertices.
.. note :: The order of the vertices in ``vertno_or_idx`` does not matter.
Forward objects will always have the vertices ordered by vertex
number. This also means this function cannot be used to re-order
the rows of the leadfield matrix.
Parameters
----------
fwd : instance of Forward
The forward operator to restrict the vertices of.
vertno_or_idx : tuple of lists (vertno_lh, vertno_rh) | list of int
Either, for each hemisphere, the vertex numbers to keep. Or a single
list of vertex indices to keep. All other vertices are discarded.
check_vertno : bool
Whether to check that all requested vertices are present in the forward
solution and raise an IndexError if this is not the case. Defaults to
True. If all vertices are guaranteed to be present, you can disable
this check for avoid unnecessary computation.
copy : bool
Whether to operate in place (``False``) to on a copy (``True``, the
default).
verbose : bool | str | int | None
If not None, override default verbose level (see :func:`mne.verbose`
and :ref:`Logging documentation <tut_logging>` for more).
Returns
-------
fwd_out : instance of Forward
The restricted forward operator.
See Also
--------
select_vertices_sens_distance : Find the vertices within the given sensor
distance.
"""
if copy:
fwd_out = deepcopy(fwd)
else:
fwd_out = fwd
lh_vertno, rh_vertno = [src["vertno"] for src in fwd["src"]]
if isinstance(vertno_or_idx[0], int):
logger.info("Interpreting given vertno_or_idx as vertex indices.")
vertno_or_idx = np.asarray(vertno_or_idx)
# Make sure the vertices are in sequential order
fwd_idx = np.sort(vertno_or_idx)
n_vert_lh = len(lh_vertno)
sel_lh_idx = vertno_or_idx[fwd_idx < n_vert_lh]
sel_rh_idx = vertno_or_idx[fwd_idx >= n_vert_lh] - n_vert_lh
sel_lh_vertno = lh_vertno[sel_lh_idx]
sel_rh_vertno = rh_vertno[sel_rh_idx]
else:
logger.info("Interpreting given vertno_or_idx as vertex numbers.")
# Make sure vertno_or_idx is sorted
vertno_or_idx = [np.sort(v) for v in vertno_or_idx]
sel_lh_vertno, sel_rh_vertno = vertno_or_idx
src_lh_idx = _find_indices_1d(lh_vertno, sel_lh_vertno, check_vertno)
src_rh_idx = _find_indices_1d(rh_vertno, sel_rh_vertno, check_vertno)
fwd_idx = np.hstack((src_lh_idx, src_rh_idx + len(lh_vertno)))
logger.info(
"Restricting forward solution to %d out of %d vertices."
% (len(fwd_idx), len(lh_vertno) + len(rh_vertno))
)
n_orient = fwd["sol"]["ncol"] // fwd["nsource"]
n_orig_orient = fwd["_orig_sol"].shape[1] // fwd["nsource"]
fwd_out["source_rr"] = fwd["source_rr"][fwd_idx]
fwd_out["nsource"] = len(fwd_idx)
def _reshape_select(X, dim3, sel):
"""Make matrix X 3D and select along the second dimension."""
dim1 = X.shape[0]
X = X.reshape(dim1, -1, dim3)
X = X[:, sel, :]
return X.reshape(dim1, -1)
fwd_out["source_nn"] = _reshape_select(fwd["source_nn"].T, n_orient, fwd_idx).T
fwd_out["sol"]["data"] = _reshape_select(fwd["sol"]["data"], n_orient, fwd_idx)
fwd_out["sol"]["ncol"] = fwd_out["sol"]["data"].shape[1]
if "sol_grad" in fwd and fwd["sol_grad"] is not None:
fwd_out["sol_grad"] = _reshape_select(fwd["sol_grad"], n_orient, fwd_idx)
if "_orig_sol" in fwd:
fwd_out["_orig_sol"] = _reshape_select(fwd["_orig_sol"], n_orig_orient, fwd_idx)
if "_orig_sol_grad" in fwd and fwd["_orig_sol_grad"] is not None:
fwd_out["_orig_sol_grad"] = _reshape_select(
fwd["_orig_sol_grad"], n_orig_orient, fwd_idx
)
# Restrict the SourceSpaces inside the forward operator
fwd_out["src"] = restrict_src_to_vertices(
fwd_out["src"],
[sel_lh_vertno, sel_rh_vertno],
check_vertno=False,
verbose=False,
)
return fwd_out
[docs]
@verbose
def restrict_src_to_vertices(
src, vertno_or_idx, check_vertno=True, copy=True, verbose=None
):
"""Restrict a source space to the given vertices.
.. note :: The order of the vertices in ``vertno_or_idx`` does not matter.
SourceSpaces objects will always have the vertices ordered by
vertex number.
Parameters
----------
src: instance of SourceSpaces
The source space to be restricted.
vertno_or_idx : tuple of lists (vertno_lh, vertno_rh) | list of int
Either, for each hemisphere, the vertex numbers to keep. Or a single
list of vertex indices to keep. All other vertices are discarded.
check_vertno : bool
Whether to check that all requested vertices are present in the
SourceSpaces and raise an IndexError if this is not the case. Defaults
to True. If all vertices are guaranteed to be present, you can disable
this check for avoid unnecessary computation.
copy : bool
Whether to operate in place (``False``) to on a copy (``True``, the
default).
verbose : bool | str | int | None
If not None, override default verbose level (see :func:`mne.verbose`
and :ref:`Logging documentation <tut_logging>` for more).
Returns
-------
src_out : instance of SourceSpaces
The restricted source space.
"""
if copy:
src_out = deepcopy(src)
else:
src_out = src
if vertno_or_idx:
if isinstance(vertno_or_idx[0], int):
logger.info("Interpreting given vertno_or_idx as vertex indices.")
vertno_or_idx = np.asarray(vertno_or_idx)
n_vert_lh = src[0]["nuse"]
ind_lh = vertno_or_idx[vertno_or_idx < n_vert_lh]
ind_rh = vertno_or_idx[vertno_or_idx >= n_vert_lh] - n_vert_lh
vert_no_lh = src[0]["vertno"][ind_lh]
vert_no_rh = src[1]["vertno"][ind_rh]
else:
logger.info("Interpreting given vertno_or_idx as vertex numbers.")
vert_no_lh, vert_no_rh = vertno_or_idx
if check_vertno:
if not (
np.all(np.in1d(vert_no_lh, src[0]["vertno"]))
and np.all(np.in1d(vert_no_rh, src[1]["vertno"]))
):
raise ValueError(
"One or more vertices were not present in" " SourceSpaces."
)
else:
# Empty list
vert_no_lh, vert_no_rh = [], []
logger.info(
"Restricting source space to %d out of %d vertices."
% (len(vert_no_lh) + len(vert_no_rh), src[0]["nuse"] + src[1]["nuse"])
)
for hemi, verts in zip(src_out, (vert_no_lh, vert_no_rh)):
# Ensure vertices are in sequential order
verts = np.sort(verts)
# Restrict the source space
hemi["vertno"] = verts
hemi["nuse"] = len(verts)
hemi["inuse"] = hemi["inuse"].copy()
hemi["inuse"].fill(0)
if hemi["nuse"] > 0: # Don't use empty array as index
hemi["inuse"][verts] = 1
hemi["use_tris"] = np.array([[]], int)
hemi["nuse_tri"] = np.array([0])
return src_out
[docs]
@verbose
def restrict_forward_to_sensor_range(fwd, dist, picks=None, verbose=None):
"""Restrict forward operator to sources within given distance to a sensor.
For each vertex defined in the source space, finds the nearest sensor and
discards the vertex if the distance to this sensor the given
distance.
Parameters
----------
fwd : instance of Forward
The forward operator to restrict the vertices of.
dist : float
The minimum distance between a vertex and the nearest sensor (in
meters). All vertices for which the distance to the nearest sensor
exceeds this limit are discarded.
picks : array-like of int | None
Indices of sensors to include in the search for the nearest sensor. If
None, the default, meg channels are used.
verbose : bool | str | int | None
If not None, override default verbose level (see :func:`mne.verbose`
and :ref:`Logging documentation <tut_logging>` for more).
Returns
-------
fwd_out : instance of Forward
A copy of the forward operator, restricted to the given sensor range
See Also
--------
restrict_fwd_to_stc : Restrict the forward operator to the vertices defined
in a source estimate object.
restrict_fwd_to_label : Restrict the forward operator to specific labels.
"""
vertno = select_vertices_in_sensor_range(fwd, dist, picks, verbose=verbose)
return restrict_forward_to_vertices(fwd, vertno, verbose=verbose)
def _make_radial_coord_system(points, origin):
"""Compute a radial coordinate system at the given points.
For each point X, a set of three unit vectors is computed that point along
the axes of a radial coordinate system. The first axis of the coordinate
system is in the direction of the line between X and the origin point. The
second and third axes are perpendicular to the first axis.
Parameters
----------
points : ndarray, shape (n_points, 3)
For each point, the XYZ carthesian coordinates.
origin : (x, y, z)
A tuple (or other array-like) containing the XYZ carthesian coordinates
of the point of origin. This can for example be the center of a sphere
fitted through the points.
Returns
-------
radial : ndarray, shape (n_points, 3)
For each point X, a unit vector pointing in the radial direction, i.e.,
the direction of the line between X and the origin point. This is the
first axis of the coordinate system.
tan1 : ndarray, shape (n_points, 3)
For each point, a unit vector perpendicular to both ``radial`` and
``tan2``. This is the second axis of the coordinate system.
tan2 : ndarray, shape (n_points, 3)
For each point, a unit vector perpendicular to both ``radial`` and
``tan1``. This is the third axis of the coordinate system.
"""
radial = points - origin
radial /= np.linalg.norm(radial, axis=1)[:, np.newaxis]
theta = _cart_to_sph(radial)[:, 1]
# Compute tangential directions
tan1 = np.vstack((-np.sin(theta), np.cos(theta), np.zeros(len(points)))).T
tan2 = np.cross(radial, tan1)
return radial, tan1, tan2
def _plot_coord_system(points, dim1, dim2, dim3, scale=0.001, n_ori=3):
"""Plot the results of _make_radial_coord_system.
Usage:
>>> _, origin = _fit_sphere(fwd['source_rr'])
... rad, tan1, tan2 = _make_radial_coord_system(fwd['source_rr'], origin)
... _plot_coord_system(fwd['source_rr'], rad, tan1, tan2)
Use ``scale`` to control the size of the arrows.
"""
from mayavi import mlab
f = mlab.figure(size=(600, 600))
red, blue, black = (1, 0, 0), (0, 0, 1), (0, 0, 0)
if n_ori == 3:
mlab.quiver3d(
points[:, 0],
points[:, 1],
points[:, 2],
dim1[:, 0],
dim1[:, 1],
dim1[:, 2],
scale_factor=scale,
color=red,
)
if n_ori > 1:
mlab.quiver3d(
points[:, 0],
points[:, 1],
points[:, 2],
dim2[:, 0],
dim2[:, 1],
dim2[:, 2],
scale_factor=scale,
color=blue,
)
mlab.quiver3d(
points[:, 0],
points[:, 1],
points[:, 2],
dim3[:, 0],
dim3[:, 1],
dim3[:, 2],
scale_factor=scale,
color=black,
)
return f
[docs]
def forward_to_tangential(fwd, center=None):
"""Convert a free orientation forward solution to a tangential one.
Places two source dipoles at each vertex that are oriented tangentially to
a sphere with its origin at the center of the brain. Recomputes the forward
model according to the new dipoles.
Parameters
----------
fwd : instance of Forward
The forward solution to convert.
center : tuple of float (x, y, z) | None
The carthesian coordinates of the center of the brain. By default, a
sphere is fitted through all the points in the source space.
Returns
-------
fwd_out : instance of Forward
The tangential forward solution.
"""
if fwd["source_ori"] != FIFF.FIFFV_MNE_FREE_ORI:
raise ValueError("Forward solution needs to have free orientation.")
n_sources, n_channels = fwd["nsource"], fwd["nchan"]
if fwd["sol"]["ncol"] // n_sources == 2:
raise ValueError(
"Forward solution already seems to be in tangential " "orientation."
)
# Compute two dipole directions tangential to a sphere that has its origin
# in the center of the brain.
if center is None:
_, center = _fit_sphere(fwd["source_rr"], disp=False)
_, tan1, tan2 = _make_radial_coord_system(fwd["source_rr"], center)
# Make sure the forward solution is in head orientation for this
fwd_out = convert_forward_solution(fwd, surf_ori=False, copy=True)
G = fwd_out["sol"]["data"].reshape(n_channels, n_sources, 3)
# Compute the forward solution for the new dipoles
Phi = np.einsum("ijk,ljk->ijl", G, [tan1, tan2])
fwd_out["sol"]["data"] = Phi.reshape(n_channels, 2 * n_sources)
fwd_out["sol"]["ncol"] = 2 * n_sources
# Store the source orientations
fwd_out["source_nn"] = np.stack((tan1, tan2), axis=1).reshape(-1, 3)
# Mark the orientation as free for now. In the future we should add a
# new constant to indicate "tangential" orientations.
fwd_out["source_ori"] = FIFF.FIFFV_MNE_FREE_ORI
return fwd_out
[docs]
def select_shared_vertices(insts, ref_src=None, subjects_dir=None):
"""Select the vertices that are present in each of the given objects.
Produces a list of vertices which are present in each of the given objects.
Objects can either be instances of SourceSpaces or Forward.
If the given source spaces are from different subjects, each vertex number
will not necessarily refer to the same vertex in each source space. In this
case, supply the source space that will be use as a reference point as the
``ref_src`` parameter. All source spaces will be morphed to the reference
source space to determine corresponding vertices between subjects.
Parameters
----------
insts : list of instance of (SourceSpaces | Forward)
The objects to select the vertices from. Each object can have a
different number of vertices defined.
ref_src : instance of SourceSpaces | None
The source space to use as reference point to determine corresponding
vertices between subjects. If ``None`` (the default), vertex numbers
are assumed to correspond to the same vertex in all source spaces.
subjects_dir : str | None
Path to SUBJECTS_DIR if it is not set in the environment. Only needed
if ``ref_src`` is specified.
Returns
-------
vertices : two lists | list of tuple of lists
Two lists with the selected vertex numbers in each hemisphere. If
``ref_subject`` is specified, for each object, two lists with the
selected vertex numbers in each hemisphere.
"""
src_spaces = []
for inst in insts:
if isinstance(inst, SourceSpaces):
src_spaces.append(inst)
elif isinstance(inst, Forward):
src_spaces.append(inst["src"])
else:
raise ValueError(
"Given instances must either be of type "
"SourceSpaces or Forward, not %s." % type(inst)
)
if ref_src is not None:
# Map the vertex numbers to the reference source space and vice-versa
ref_to_subj = list()
subj_to_ref = list()
for src in src_spaces:
mappings = get_morph_src_mapping(ref_src, src, subjects_dir=subjects_dir)
ref_to_subj.append(mappings[0])
subj_to_ref.append(mappings[1])
vert_lh = ref_src[0]["vertno"]
vert_rh = ref_src[1]["vertno"]
else:
vert_lh = src_spaces[0][0]["vertno"]
vert_rh = src_spaces[0][1]["vertno"]
# Drop any vertices missing from one of the source spaces from the list
for i, src in enumerate(src_spaces):
subj_vert_lh = src[0]["vertno"]
subj_vert_rh = src[1]["vertno"]
if ref_src is not None:
# Map vertex numbers to reference source space
subj_vert_lh = [subj_to_ref[i][0][v] for v in subj_vert_lh]
subj_vert_rh = [subj_to_ref[i][1][v] for v in subj_vert_rh]
vert_lh = np.intersect1d(vert_lh, subj_vert_lh)
vert_rh = np.intersect1d(vert_rh, subj_vert_rh)
if ref_src is not None:
# Map vertex numbers from reference source space to each source space
verts_lh = [
np.array([ref_to_subj[i][0][v] for v in vert_lh])
for i in range(len(src_spaces))
]
verts_rh = [
np.array([ref_to_subj[i][1][v] for v in vert_rh])
for i in range(len(src_spaces))
]
return list(zip(verts_lh, verts_rh))
else:
return [vert_lh, vert_rh]