import random
from warnings import warn
from collections.abc import Iterable
from collections import Counter
from copy import copy
from operator import itemgetter
from sys import version_info
from abc import abstractmethod
import numpy as np
if version_info.major > 2:
from pyradigm.base import BaseDataset, CompatibilityException
from pyradigm import MLDataset, ClassificationDataset as ClfDataset, \
RegressionDataset as RegrDataset
else:
    raise NotImplementedError('pyradigm supports only Python 3 or higher! '
                              'Please upgrade to Python 3.')
class BaseMultiDataset(object):
"""
Container data structure to hold and manage multiple MLDataset instances.
Key uses:
    - Uniform processing of individual MLDatasets, e.g. querying the same set of IDs
    - Ensuring correspondence across multiple datasets during cross-validation
"""
def __init__(self,
dataset_class=BaseDataset,
dataset_spec=None,
name='MultiDataset'):
"""
Constructor.
Parameters
----------
dataset_spec : Iterable or None
List of MLDatasets, or absolute paths to serialized MLDatasets.
"""
if issubclass(dataset_class, BaseDataset):
self._dataset_class = dataset_class
else:
raise TypeError('Input class type is not recognized!'
' Must be a child class of pyradigm.BaseDataset')
self.name = name
self._list = list()
self._is_init = False
        # number of modalities (datasets) added to this multi-dataset
self.modality_count = 0
self._ids = set()
self.targets = dict()
self._modalities = dict()
self._labels = dict()
self.feature_names = dict()
self.num_features = list()
        # TODO a more efficient internal representation is possible, as IDs/classes
        #  do not need to be stored redundantly for each dataset
        #  perhaps as different attributes/modalities/feat-sets (of .data)?
if dataset_spec is not None:
            if not isinstance(dataset_spec, Iterable) or len(dataset_spec) < 1:
                raise ValueError('Input must be a non-empty Iterable of datasets '
                                 '(or paths to serialized datasets).')
self._load(dataset_spec)
def _load(self, dataset_spec):
"""Actual loading of datasets"""
for idx, ds in enumerate(dataset_spec):
self.append(ds, idx)
def _get_id(self):
"""Returns an ID for a new dataset that's different from existing ones."""
self.modality_count += 1
return self.modality_count
def append(self, dataset, identifier):
"""
Adds a dataset, if compatible with the existing ones.
Parameters
----------
dataset : pyradigm dataset or compatible
identifier : hashable
String or integer or another hashable to uniquely identify this dataset
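
        Examples
        --------
        Illustrative sketch only; ``ds_thickness`` and ``ds_volume`` are assumed
        to be two compatible pyradigm datasets sharing the same samplet IDs and
        targets::

            multi = MultiDatasetClassify()
            multi.append(ds_thickness, identifier='cortical_thickness')
            multi.append(ds_volume, identifier='subcortical_volume')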
"""
if isinstance(dataset, str):
dataset = self._dataset_class(dataset_path=dataset)
if not isinstance(dataset, self._dataset_class):
raise CompatibilityException('Incompatible dataset. '
'You can only add instances of '
'type {}'.format(self._dataset_class))
        if len(dataset.description) > 0:
identifier = dataset.description
if not self._is_init:
self._ids = set(dataset.samplet_ids)
self.targets = dataset.targets
self._target_sizes = dataset.target_sizes
self.num_samplets = len(self._ids)
self._modalities[identifier] = dataset.data
self.feature_names[identifier] = dataset.feature_names
self.num_features.append(dataset.num_features)
# maintaining a no-data pyradigm Dataset internally to reuse its methods
self._dataset = copy(dataset)
# replacing its data with zeros
self._dataset.data = {id_: np.zeros(1) for id_ in self._ids}
if hasattr(dataset, 'attr'):
self._common_attr = dataset.attr
self._common_attr_dtype = dataset.attr_dtype
else:
self._common_attr = dict()
self._common_attr_dtype = dict()
self._attr = dict()
self._is_init = True
else:
# this also checks for the size (num_samplets)
if set(dataset.samplet_ids) != self._ids:
raise CompatibilityException(
'Differing set of IDs in two datasets.'
' Unable to add this dataset to the MultiDataset.')
if dataset.targets != self.targets:
raise CompatibilityException(
'Targets for some IDs differ in the two datasets.'
' Unable to add this dataset to the MultiDataset.')
if identifier not in self._modalities:
self._modalities[identifier] = dataset.data
self.feature_names[identifier] = dataset.feature_names
self.num_features.append(dataset.num_features)
else:
raise KeyError('{} already exists in MultiDataset'
''.format(identifier))
if hasattr(dataset, 'attr'):
if len(self._common_attr) < 1:
# no attributes were set at all - simple copy sufficient
self._common_attr = dataset.attr.copy()
self._common_attr_dtype = dataset.attr_dtype.copy()
else:
for a_name in dataset.attr:
if a_name not in self._common_attr:
self._common_attr[a_name] = dataset.attr[a_name]
self._common_attr_dtype[a_name] = \
dataset.attr_dtype[a_name]
elif self._common_attr[a_name] != dataset.attr[a_name]:
raise ValueError(
'Values and/or IDs differ for attribute {}. '
'Ensure all datasets have common attributes '
'with the same values'.format(a_name))
# each addition should be counted, if successful
self.modality_count += 1
@property
def samplet_ids(self):
"""List of samplet IDs in the multi-dataset"""
return list(self._ids)
@property
def modality_ids(self):
"""List of identifiers for all modalities/datasets, sorted for reproducibility."""
return sorted(list(self._modalities.keys()))
@abstractmethod
def __str__(self):
"""human readable repr"""
def _common_str(self):
"""basic str() with common elements"""
return "{}:\n\t{} samples, {} modalities, dims: {}" \
"\n\tIdentifiers: {}" \
"\n\tAttributes: {}" \
"".format(self.name, self.num_samplets, self.modality_count,
self.num_features,
', '.join([str(k) for k in self.modality_ids]),
', '.join([str(k) for k in self._common_attr.keys()]))
@abstractmethod
def holdout(self,
train_perc=0.7,
num_rep=50,
return_ids_only=False,
format='MLDataset'):
"""
Builds a generator for train and test sets for cross-validation.
"""
def _get_data(self, id_list, format='MLDataset'):
"""Returns the data, from all modalities, for a given list of IDs"""
format = format.lower()
features = list() # returning a dict would be better if AutoMKL() can handle it
for modality, data in self._modalities.items():
if format in ('ndarray', 'data_matrix'):
# turning dict of arrays into a data matrix
# this is arguably worse, as labels are difficult to pass
subset = np.array(itemgetter(*id_list)(data))
elif format in ('mldataset', 'pyradigm'):
# getting container with fake data
subset = self._dataset.get_subset(id_list)
# injecting actual features
                subset.data = {id_: data[id_] for id_ in id_list}
else:
                raise ValueError('Invalid output format - choose one of: '
                                 'ndarray/data_matrix or mldataset/pyradigm')
features.append(subset)
return features
def __iter__(self):
"""Iterable mechanism"""
for modality, data in self._modalities.items():
yield modality, np.array([np.array(item) for item in data.values()])
def get_subsets(self, subset_list):
"""
Returns the requested subsets of data while iterating over modalities
        If subset_list contains two sets of IDs, e.g. (train, test), each
        iteration yields a tuple of the form:
            (modality, ((train_data, train_targets), (test_data, test_targets)))
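
        An illustrative usage sketch, assuming ``train_ids`` and ``test_ids``
        were obtained from ``holdout(..., return_ids_only=True)``::

            for modality, subsets in multi.get_subsets((train_ids, test_ids)):
                (train_X, train_y), (test_X, test_y) = subsets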
"""
for modality, data in self._modalities.items():
yield modality, ( (np.array(itemgetter(*subset)(data)),
np.array(itemgetter(*subset)(self.targets)))
for subset in subset_list )
@property
def common_attr(self):
"""Attributes common to all subjects/datasets, such as covariates, in this
MultiDataset"""
return self._common_attr
def get_common_attr(self, names, subset, not_found_value=None):
"""Helper to retrieve the requested attributes common to all datasets."""
data, dtypes = list(), list()
for name in names:
if name not in self._common_attr:
raise AttributeError('Attr {} not set for this MultiDataset'
''.format(name))
this_data = np.array([self._common_attr[name].get(sid, not_found_value)
for sid in subset],
dtype=self._common_attr_dtype[name])
data.append(this_data)
dtypes.append(self._common_attr_dtype[name])
return data, dtypes
def set_attr(self, ds_id, attr_name, attr_value):
"""Method to set modality-/dataset-specific attributes"""
if ds_id not in self._modalities:
            raise KeyError('Dataset {} not in this multi-dataset {}'
                           ''.format(ds_id, self.name))
if ds_id not in self._attr:
self._attr[ds_id] = dict()
self._attr[ds_id][attr_name] = attr_value
def get_attr(self, ds_id, attr_name, not_found_value='raise'):
"""Method to retrieve modality-/dataset-specific attributes"""
if ds_id not in self._modalities:
            raise KeyError('Dataset {} not in this multi-dataset {}'
                           ''.format(ds_id, self.name))
try:
return self._attr[ds_id][attr_name]
except KeyError:
msg = 'attribute {} not set for dataset {}'.format(attr_name, ds_id)
            if isinstance(not_found_value, str) and not_found_value.lower() == 'raise':
raise KeyError(msg)
else:
warn(msg)
return not_found_value
class MultiDatasetClassify(BaseMultiDataset):
"""Container class to manage multimodal classification datasets."""
def __init__(self,
dataset_spec=None,
name='MultiDatasetClassify',
subgroup=None):
"""
Constructor.
Parameters
----------
dataset_spec : Iterable or None
List of pyradigms, or absolute paths to serialized pyradigm Datasets.
name : str
human readable name for printing purposes
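
        Examples
        --------
        A minimal sketch; the paths and class names below are hypothetical and
        must point to serialized ClassificationDataset files::

            multi = MultiDatasetClassify(
                dataset_spec=['/tmp/thickness.ClfDataset.pkl',
                              '/tmp/volume.ClfDataset.pkl'],
                subgroup=('CN', 'AD'))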
"""
self._sub_groups = subgroup
if subgroup is None:
super().__init__(dataset_class=ClfDataset,
dataset_spec=dataset_spec,
name=name)
else:
super().__init__(dataset_class=ClfDataset, dataset_spec=None, name=name)
for idx, ds in enumerate(dataset_spec):
self.append_subgroup(ds, idx, subgroup)
    def append_subgroup(self, dataset, identifier, subgroup):
"""Custom add method"""
if isinstance(dataset, str):
dataset = self._dataset_class(dataset_path=dataset)
        target_set = set(dataset.target_set)
        subgroup = set(subgroup) if subgroup is not None else None
        if subgroup is None or subgroup == target_set:
            ds_out = dataset
        elif subgroup < target_set:  # < on sets is a proper-subset check
subgroup = sorted(list(subgroup)) # ensure reproducibility
new_id = '{}({}) {}'.format(dataset.description,
','.join(subgroup),
identifier)
ds_out = dataset.get_class(subgroup)
# overriding the "Subset derived from ... "
ds_out.description = new_id
else:
raise ValueError('One or more classes in {} do not exist in\n{}'
''.format(subgroup, dataset.description))
self.append(ds_out, identifier=identifier)
@property
def target_set(self):
"""Set of targets/classes in this multi-dataset"""
return set(self.targets.values())
@property
def target_sizes(self):
"""
Sizes of targets in this classification dataset.
Useful for summary and to compute chance accuracy.
"""
return Counter(self.targets.values())
def __str__(self):
"""human readable repr"""
string = "{}\n\tClasses n={}, sizes " \
"".format(self._common_str(), len(self._target_sizes))
string += ', '.join(['{}: {}'.format(c, n)
for c, n in self._target_sizes.items()])
return string
def __repr__(self):
return self.__str__()
def __format__(self, format_spec):
return self.__str__()
    def holdout(self,
train_perc=0.7,
num_rep=50,
stratified=True,
return_ids_only=False,
format='MLDataset'):
"""
Builds a holdout generator for train and test sets for cross-validation.
        When stratified=True, all classes are represented equally in the
        training set (capped at the size of the smallest class).
Parameters
----------
train_perc : float
Percentage (0, 1) of samplets from each class to be selected for the
training set. Remaining IDs from each class will be added to test set.
num_rep : int
Number of holdout repetitions
return_ids_only : bool
Whether to return samplet IDs only, or the corresponding Datasets
format : str
Format of the Dataset to be returned when return_ids_only=False
format='MLDataset' returns the full-blown pyradigm data structure, and
format='data_matrix' returns just the feature matrix X in ndarray format
Returns
-------
train, test : tuple
A tuple (in order train, test) of IDs or Datasets
Raises
------
ValueError
            If train_perc is not in the open interval (0, 1),
            or if num_rep is not a finite integer >= 1
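
        Examples
        --------
        A minimal sketch, assuming ``multi`` is an already-populated
        MultiDatasetClassify instance::

            for train_ids, test_ids in multi.holdout(train_perc=0.8,
                                                     num_rep=10,
                                                     return_ids_only=True):
                pass  # train/evaluate using the IDs of this repetition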
"""
if train_perc <= 0.0 or train_perc >= 1.0:
raise ValueError('Train percentage must be > 0.0 and < 1.0')
        if not np.isfinite(num_rep) or int(num_rep) < 1:
            raise ValueError('Number of repetitions must be a finite integer >= 1')
        num_rep = int(num_rep)
ids_in_class = {cid: self._dataset.sample_ids_in_class(cid)
for cid in self._target_sizes.keys()}
sizes_numeric = np.array([len(ids_in_class[cid])
for cid in ids_in_class.keys()])
size_per_class, total_test_count = compute_training_sizes(
train_perc, sizes_numeric, stratified=stratified)
if len(self._target_sizes) != len(size_per_class):
raise ValueError('size spec differs in num elements with class sizes!')
for rep in range(num_rep):
print('rep {}'.format(rep))
train_set = list()
for index, (cls_id, class_size) in enumerate(self._target_sizes.items()):
# shuffling the IDs each time
random.shuffle(ids_in_class[cls_id])
                subset_size = max(0, min(class_size, size_per_class[index]))
                if subset_size < 1 or class_size < 1:
                    warn('No subjects from class {} were selected.'
                         ''.format(cls_id))
                else:
                    subsets_this_class = ids_in_class[cls_id][:subset_size]
                    train_set.extend(subsets_this_class)
# this ensures both are mutually exclusive!
test_set = list(self._ids - set(train_set))
if return_ids_only:
# when only IDs are required, without associated features
# returning tuples to prevent accidental changes
yield tuple(train_set), tuple(test_set)
else:
yield self._get_data(train_set, format), \
self._get_data(test_set, format)
class MultiDatasetRegress(BaseMultiDataset):
"""Container class to manage multimodal regression datasets."""
def __init__(self,
dataset_spec=None,
name='MultiDatasetRegress'):
"""
Constructor.
Parameters
----------
dataset_spec : Iterable or None
List of pyradigms, or absolute paths to serialized pyradigm Datasets.
name : str
human readable name for printing purposes
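
        Examples
        --------
        A minimal sketch; the paths below are hypothetical and must point to
        serialized RegressionDataset files::

            multi = MultiDatasetRegress(
                dataset_spec=['/tmp/thickness.RegrDataset.pkl',
                              '/tmp/connectivity.RegrDataset.pkl'])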
"""
super().__init__(dataset_class=RegrDataset,
dataset_spec=dataset_spec,
name=name)
def __str__(self):
"""human readable repr"""
return self._common_str()
def __repr__(self):
return self.__str__()
def __format__(self, format_spec):
return self.__str__()
    def holdout(self,
train_perc=0.7,
num_rep=50,
return_ids_only=False,
format='MLDataset'):
"""
Builds a holdout generator for train and test sets for cross-validation.
Parameters
----------
train_perc : float
Percentage (0, 1) of samplets to be selected for the training set.
Remaining will be added to the test set.
num_rep : int
Number of holdout repetitions
return_ids_only : bool
Whether to return samplet IDs only, or the corresponding Datasets
format : str
Format of the Dataset to be returned when return_ids_only=False
format='MLDataset' returns the full-blown pyradigm data structure, and
format='data_matrix' returns just the feature matrix X in ndarray format
Returns
-------
train, test : tuple
A tuple (in order train, test) of IDs or Datasets
Raises
------
ValueError
            If train_perc is not in the open interval (0, 1),
            or if num_rep is not a finite integer >= 1
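
        Examples
        --------
        A minimal sketch, assuming ``multi`` is an already-populated
        MultiDatasetRegress instance; with format='data_matrix' each split is a
        list with one feature matrix per modality::

            for train_sets, test_sets in multi.holdout(train_perc=0.8,
                                                       num_rep=10,
                                                       format='data_matrix'):
                pass  # train/evaluate on the per-modality matrices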
"""
if train_perc <= 0.0 or train_perc >= 1.0:
            raise ValueError('Train percentage must be > 0.0 and < 1.0')
        if not np.isfinite(num_rep) or int(num_rep) < 1:
            raise ValueError('Number of repetitions must be a finite integer >= 1')
        num_rep = int(num_rep)
subset_size = np.int64(np.floor(self.num_samplets * train_perc))
# clipping the range to [1, n]
subset_size = max(1, min(self.num_samplets, subset_size))
        # making it indexable with a local copy
id_list = list(self._ids)
for rep in range(num_rep):
random.shuffle(id_list)
train_set = id_list[:subset_size]
# this ensures both are mutually exclusive!
test_set = list(self._ids - set(train_set))
if return_ids_only:
# when only IDs are required, without associated features
# returning tuples to prevent accidental changes
yield tuple(train_set), tuple(test_set)
else:
yield self._get_data(train_set, format), \
self._get_data(test_set, format)
def compute_training_sizes(train_perc, target_sizes, stratified=True):
"""Computes the maximum training size that the smallest class can provide """
size_per_class = np.int64(np.around(train_perc * target_sizes))
if stratified:
print("Different classes in training set are stratified to match smallest class!")
        # capping every class at the size of the smallest class
        size_per_class = np.minimum(np.min(size_per_class), size_per_class)
        # after capping, all per-class sizes must be identical
        reduced_sizes = np.unique(size_per_class)
if len(reduced_sizes) != 1: # they must all be the same
raise ValueError("Error in stratification of training set based on "
"smallest class!")
total_test_samples = np.int64(np.sum(target_sizes) - sum(size_per_class))
return size_per_class, total_test_samples