from __future__ import print_function
import os
import sys
import numpy as np
import nibabel as nib
import tempfile
import pandas as pd
import re
try: # run as a package if installed
from pcntoolkit import configs
except ImportError:
pass
path = os.path.abspath(os.path.dirname(__file__))
path = os.path.dirname(path) # parent directory
if path not in sys.path:
sys.path.append(path)
del path
import configs
CIFTI_MAPPINGS = ('dconn', 'dtseries', 'pconn', 'ptseries', 'dscalar',
'dlabel', 'pscalar', 'pdconn', 'dpconn',
'pconnseries', 'pconnscalar')
CIFTI_VOL_ATLAS = 'Atlas_ROIs.2.nii.gz'
PICKLE_PROTOCOL = configs.PICKLE_PROTOCOL
# ------------------------
# general utility routines
# ------------------------
[docs]def predictive_interval(s2_forward,
cov_forward,
multiplicator):
"""
Calculates a predictive interval for the forward model
"""
# calculates a predictive interval
PI = np.zeros(len(cov_forward))
for i, xdot in enumerate(cov_forward):
s = np.sqrt(s2_forward[i])
PI[i] = multiplicator*s
return PI
[docs]def create_mask(data_array, mask, verbose=False):
"""
Create a mask from a data array or a nifti file
Basic usage::
create_mask(data_array, mask, verbose)
:param data_array: numpy array containing the data to write out
:param mask: nifti image containing a mask for the image
:param verbose: verbose output
"""
# create a (volumetric) mask either from an input nifti or the nifti itself
if mask is not None:
if verbose:
print('Loading ROI mask ...')
maskvol = load_nifti(mask, vol=True)
maskvol = maskvol != 0
else:
if len(data_array.shape) < 4:
dim = data_array.shape[0:3] + (1,)
else:
dim = data_array.shape[0:3] + (data_array.shape[3],)
if verbose:
print('Generating mask automatically ...')
if dim[3] == 1:
maskvol = data_array[:, :, :] != 0
else:
maskvol = data_array[:, :, :, 0] != 0
return maskvol
[docs]def vol2vec(dat, mask, verbose=False):
"""
Vectorise a 3d image
Basic usage::
vol2vec(dat, mask, verbose)
:param dat: numpy array containing the data to write out
:param mask: nifti image containing a mask for the image
:param verbose: verbose output
"""
# vectorise a 3d image
if len(dat.shape) < 4:
dim = dat.shape[0:3] + (1,)
else:
dim = dat.shape[0:3] + (dat.shape[3],)
# mask = create_mask(dat, mask=mask, verbose=verbose)
if mask is None:
mask = create_mask(dat, mask=mask, verbose=verbose)
# mask the image
maskid = np.where(mask.ravel())[0]
dat = np.reshape(dat, (np.prod(dim[0:3]), dim[3]))
dat = dat[maskid, :]
# convert to 1-d array if the file only contains one volume
if dim[3] == 1:
dat = dat.ravel()
return dat
[docs]def file_type(filename):
"""
Determine the file type of a file
Basic usage::
file_type(filename)
:param filename: name of the file to check
"""
# routine to determine filetype
if filename.endswith(('.dtseries.nii', '.dscalar.nii', '.dlabel.nii')):
ftype = 'cifti'
elif filename.endswith(('.nii.gz', '.nii', '.img', '.hdr')):
ftype = 'nifti'
elif filename.endswith(('.txt', '.csv', '.tsv', '.asc')):
ftype = 'text'
elif filename.endswith(('.pkl')):
ftype = 'binary'
else:
raise ValueError("I don't know what to do with " + filename)
return ftype
[docs]def file_extension(filename):
"""
Determine the file extension of a file (e.g. .nii.gz)
Basic usage::
file_extension(filename)
:param filename: name of the file to check
"""
# routine to get the full file extension (e.g. .nii.gz, not just .gz)
parts = filename.split(os.extsep)
if parts[-1] == 'gz':
if parts[-2] == 'nii' or parts[-2] == 'img' or parts[-2] == 'hdr':
ext = parts[-2] + '.' + parts[-1]
else:
ext = parts[-1]
elif parts[-1] == 'nii':
if parts[-2] in CIFTI_MAPPINGS:
ext = parts[-2] + '.' + parts[-1]
else:
ext = parts[-1]
else:
ext = parts[-1]
ext = '.' + ext
return ext
[docs]def file_stem(filename):
"""
Determine the file stem of a file (e.g. /path/to/file.nii.gz -> file)
Basic usage::
file_stem(filename)
:param filename: name of the file to check
"""
idx = filename.find(file_extension(filename))
stm = filename[0:idx]
return stm
# --------------
# nifti routines
# --------------
[docs]def load_nifti(datafile, mask=None, vol=False, verbose=False):
"""
Load a nifti file into a numpy array
Basic usage::
load_nifti(datafile, mask, vol, verbose)
:param datafile: name of the file to load
:param mask: nifti image containing a mask for the image
:param vol: whether to load the image as a volume
:param verbose: verbose output
"""
if verbose:
print('Loading nifti: ' + datafile + ' ...')
img = nib.load(datafile)
dat = img.get_data()
if mask is not None:
mask = load_nifti(mask, vol=True)
if not vol:
dat = vol2vec(dat, mask)
return dat
[docs]def save_nifti(data, filename, examplenii, mask, dtype=None):
'''
Write output to nifti
Basic usage::
save_nifti(data, filename mask, dtype)
:param data: numpy array containing the data to write out
:param filename: where to store it
:param examplenii: nifti to copy the geometry and data type from
:mask: nifti image containing a mask for the image
:param dtype: data type for the output image (if different from the image)
'''
# load mask
if isinstance(mask, str):
mask = load_nifti(mask, vol=True)
mask = mask != 0
# load example image
ex_img = nib.load(examplenii)
ex_img.shape
dim = ex_img.shape[0:3]
if len(data.shape) < 2:
nvol = 1
data = data[:, np.newaxis]
else:
nvol = int(data.shape[1])
# write data
array_data = np.zeros((np.prod(dim), nvol))
array_data[mask.flatten(), :] = data
array_data = np.reshape(array_data, dim+(nvol,))
hdr = ex_img.header
if dtype is not None:
hdr.set_data_dtype(dtype)
array_data = array_data.astype(dtype)
array_img = nib.Nifti1Image(array_data, ex_img.affine, hdr)
nib.save(array_img, filename)
# --------------
# cifti routines
# --------------
[docs]def load_cifti(filename, vol=False, mask=None, rmtmp=True):
"""
Load a cifti file into a numpy array
Basic usage::
load_cifti(filename, vol, mask, rmtmp)
:param filename: name of the file to load
:param vol: whether to load the image as a volume
:param mask: nifti image containing a mask for the image
:param rmtmp: whether to remove temporary files
"""
# parse the name
dnam, fnam = os.path.split(filename)
fpref = file_stem(fnam)
outstem = os.path.join(tempfile.gettempdir(),
str(os.getpid()) + "-" + fpref)
# extract surface data from the cifti file
print("Extracting cifti surface data to ", outstem, '-*.func.gii', sep="")
giinamel = outstem + '-left.func.gii'
giinamer = outstem + '-right.func.gii'
os.system('wb_command -cifti-separate ' + filename +
' COLUMN -metric CORTEX_LEFT ' + giinamel)
os.system('wb_command -cifti-separate ' + filename +
' COLUMN -metric CORTEX_RIGHT ' + giinamer)
# load the surface data
giil = nib.load(giinamel)
giir = nib.load(giinamer)
Nimg = len(giil.darrays)
Nvert = len(giil.darrays[0].data)
if Nimg == 1:
out = np.concatenate((giil.darrays[0].data, giir.darrays[0].data),
axis=0)
else:
Gl = np.zeros((Nvert, Nimg))
Gr = np.zeros((Nvert, Nimg))
for i in range(0, Nimg):
Gl[:, i] = giil.darrays[i].data
Gr[:, i] = giir.darrays[i].data
out = np.concatenate((Gl, Gr), axis=0)
if rmtmp:
# clean up temporary files
os.remove(giinamel)
os.remove(giinamer)
if vol:
niiname = outstem + '-vol.nii'
print("Extracting cifti volume data to ", niiname, sep="")
os.system('wb_command -cifti-separate ' + filename +
' COLUMN -volume-all ' + niiname)
vol = load_nifti(niiname, vol=True)
volmask = create_mask(vol)
out = np.concatenate((out, vol2vec(vol, volmask)), axis=0)
if rmtmp:
os.remove(niiname)
return out
[docs]def save_cifti(data, filename, example, mask=None, vol=True, volatlas=None):
"""
Save a cifti file from a numpy array
Basic usage::
save_cifti(data, filename, example, mask, vol, volatlas)
:param data: numpy array containing the data to write out
:param filename: where to store it
:param example: example file to copy the geometry from
:param mask: nifti image containing a mask for the image
:param vol: whether to load the image as a volume
:param volatlas: atlas to use for the volume
"""
# do some sanity checks
if data.dtype == 'float32' or \
data.dtype == 'float' or \
data.dtype == 'float64':
data = data.astype('float32') # force 32 bit output
dtype = 'NIFTI_TYPE_FLOAT32'
else:
raise ValueError('Only float data types currently handled')
if len(data.shape) == 1:
Nimg = 1
data = data[:, np.newaxis]
else:
Nimg = data.shape[1]
# get the base filename
dnam, fnam = os.path.split(filename)
fstem = file_stem(fnam)
# Split the template
estem = os.path.join(tempfile.gettempdir(), str(os.getpid()) + "-" + fstem)
giiexnamel = estem + '-left.func.gii'
giiexnamer = estem + '-right.func.gii'
os.system('wb_command -cifti-separate ' + example +
' COLUMN -metric CORTEX_LEFT ' + giiexnamel)
os.system('wb_command -cifti-separate ' + example +
' COLUMN -metric CORTEX_RIGHT ' + giiexnamer)
# write left hemisphere
giiexl = nib.load(giiexnamel)
Nvertl = len(giiexl.darrays[0].data)
garraysl = []
for i in range(0, Nimg):
garraysl.append(
nib.gifti.gifti.GiftiDataArray(data=data[0:Nvertl, i],
datatype=dtype))
giil = nib.gifti.gifti.GiftiImage(darrays=garraysl)
fnamel = fstem + '-left.func.gii'
nib.save(giil, fnamel)
# write right hemisphere
giiexr = nib.load(giiexnamer)
Nvertr = len(giiexr.darrays[0].data)
garraysr = []
for i in range(0, Nimg):
garraysr.append(
nib.gifti.gifti.GiftiDataArray(data=data[Nvertl:Nvertl+Nvertr, i],
datatype=dtype))
giir = nib.gifti.gifti.GiftiImage(darrays=garraysr)
fnamer = fstem + '-right.func.gii'
nib.save(giir, fnamer)
tmpfiles = [fnamer, fnamel, giiexnamel, giiexnamer]
# process volumetric data
if vol:
niiexname = estem + '-vol.nii'
os.system('wb_command -cifti-separate ' + example +
' COLUMN -volume-all ' + niiexname)
niivol = load_nifti(niiexname, vol=True)
if mask is None:
mask = create_mask(niivol)
if volatlas is None:
volatlas = CIFTI_VOL_ATLAS
fnamev = fstem + '-vol.nii'
save_nifti(data[Nvertr+Nvertl:, :], fnamev, niiexname, mask)
tmpfiles.extend([fnamev, niiexname])
# write cifti
fname = fstem + '.dtseries.nii'
os.system('wb_command -cifti-create-dense-timeseries ' + fname +
' -volume ' + fnamev + ' ' + volatlas +
' -left-metric ' + fnamel + ' -right-metric ' + fnamer)
# clean up
for f in tmpfiles:
os.remove(f)
# --------------
# ascii routines
# --------------
[docs]def load_pd(filename):
"""
Load a csv file into a pandas dataframe
Basic usage::
load_pd(filename)
:param filename: name of the file to load
"""
# based on pandas
x = pd.read_csv(filename,
sep=' ',
header=None)
return x
[docs]def save_pd(data, filename):
"""
Save a pandas dataframe to a csv file
Basic usage::
save_pd(data, filename)
:param data: pandas dataframe containing the data to write out
:param filename: where to store it
"""
# based on pandas
data.to_csv(filename,
index=None,
header=None,
sep=' ',
na_rep='NaN')
[docs]def load_ascii(filename):
"""
Load an ascii file into a numpy array
Basic usage::
load_ascii(filename)
:param filename: name of the file to load
"""
# based on pandas
x = np.loadtxt(filename)
return x
[docs]def save_ascii(data, filename):
"""
Save a numpy array to an ascii file
Basic usage::
save_ascii(data, filename)
:param data: numpy array containing the data to write out
:param filename: where to store it
"""
# based on pandas
np.savetxt(filename, data)
# ----------------
# generic routines
# ----------------
[docs]def save(data, filename, example=None, mask=None, text=False, dtype=None):
"""
Save a numpy array to a file
Basic usage::
save(data, filename, example, mask, text, dtype)
:param data: numpy array containing the data to write out
:param filename: where to store it
:param example: example file to copy the geometry from
:param mask: nifti image containing a mask for the image
:param text: whether to write out a text file
:param dtype: data type for the output image (if different from the image)
"""
if file_type(filename) == 'cifti':
save_cifti(data.T, filename, example, vol=True)
elif file_type(filename) == 'nifti':
save_nifti(data.T, filename, example, mask, dtype=dtype)
elif text or file_type(filename) == 'text':
save_ascii(data, filename)
elif file_type(filename) == 'binary':
data = pd.DataFrame(data)
data.to_pickle(filename, protocol=PICKLE_PROTOCOL)
[docs]def load(filename, mask=None, text=False, vol=True):
"""
Load a numpy array from a file
Basic usage::
load(filename, mask, text, vol)
:param filename: name of the file to load
:param mask: nifti image containing a mask for the image
:param text: whether to write out a text file
:param vol: whether to load the image as a volume
"""
if file_type(filename) == 'cifti':
x = load_cifti(filename, vol=vol)
elif file_type(filename) == 'nifti':
x = load_nifti(filename, mask, vol=vol)
elif text or file_type(filename) == 'text':
x = load_ascii(filename)
elif file_type(filename) == 'binary':
x = pd.read_pickle(filename)
x = x.to_numpy()
return x
# -------------------
# sorting routines for batched in normative parallel
# -------------------
[docs]def tryint(s):
"""
Try to convert a string to an integer
Basic usage::
tryint(s)
:param s: string to convert
"""
try:
return int(s)
except ValueError:
return s
[docs]def alphanum_key(s):
"""
Turn a string into a list of numbers
Basic usage::
alphanum_key(s)
:param s: string to convert
"""
return [tryint(c) for c in re.split('([0-9]+)', s)]
[docs]def sort_nicely(l):
"""
Sort a list of strings in a natural way
Basic usage::
sort_nicely(l)
:param l: list of strings to sort
"""
return sorted(l, key=alphanum_key)