So I don't think there is a default scikit-learn cross-validator that will achieve what you want, but it should be possible to create one.
My approach would be to loop over all the subjects and greedily assign each one to the test set of a fold, depending on how much that assignment improves both the size of the fold and the target class rate in the fold.
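To make the scoring concrete, here is a toy illustration (with made-up numbers) of the "target rate" part of the score: a subject is rewarded for moving a fold's positive rate closer to the overall positive rate.

overall_rate = 0.3           # positive rate in the whole dataset
rate_without_subject = 0.1   # fold's positive rate before adding the candidate subject
rate_with_subject = 0.25     # fold's positive rate if the subject were added

# improvement in squared deviation from the overall rate
improvement = (rate_without_subject - overall_rate) ** 2 - (rate_with_subject - overall_rate) ** 2
print(improvement)  # 0.0375 -> adding this subject moves the fold closer to the overall rate

The size term used below is simply the fold's squared deviation from the expected fold size, scaled down by a factor of 0.001, so it mainly acts as a tie-breaker that keeps the folds roughly equally sized.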
I've generated some sample data that resembles your problem:
import pandas as pd
import numpy as np

n_subjects = 50
n_observations = 100
n_positives = 15

# the subjects drawn here are the "positive" subjects: all of their observations get target=True
positive_subjects = np.random.randint(0, n_subjects, n_positives)
data = pd.DataFrame({
    'subject': np.random.randint(0, n_subjects, n_observations)
}).assign(
    target=lambda d: d['subject'].isin(positive_subjects)
)
data.head()
   subject  target
0       14   False
1       12    True
2       10   False
3       36   False
4       21   False
We can then do the assignment using the following snippet
def target_rate_improvements(data, subjects, extra):
    """Compute how much adding `extra` to a fold's subjects reduces the squared
    difference between the fold's positive rate and the overall positive rate."""
    target_rate = data['target'].mean()
    rate_without_extra = data.loc[lambda d: d['subject'].isin(subjects), 'target'].mean()
    rate_with_extra = data.loc[lambda d: d['subject'].isin(subjects + [extra]), 'target'].mean()
    rate_without_extra = 0 if np.isnan(rate_without_extra) else rate_without_extra
    return (rate_without_extra - target_rate) ** 2 - (rate_with_extra - target_rate) ** 2


def size_improvement(data, folds, n_folds):
    """For every fold, compute the squared difference between the number of
    observations currently in the fold and the expected number per fold,
    so that emptier folds score higher and are preferred."""
    target_obs_per_fold = len(data) / n_folds
    return [
        (target_obs_per_fold - len(data.loc[lambda d: d['subject'].isin(fold_subjects)])) ** 2
        for fold_subjects in folds.values()
    ]


n_folds = 5
test_subjects_per_fold = {fold: [] for fold in range(n_folds)}

for subject in data['subject'].unique():
    # score every fold for this subject: how much closer it gets to the overall
    # positive rate, plus a small bonus for folds that are still under-filled
    target_rate_improvement = np.array([
        target_rate_improvements(data, test_subjects_per_fold[fold], subject)
        for fold in range(n_folds)
    ])
    size_improvements = np.array(size_improvement(data, test_subjects_per_fold, n_folds)) * 0.001
    best_fold = np.argmax(target_rate_improvement + size_improvements)
    test_subjects_per_fold[best_fold] += [subject]
and verify that it works as we expect:
for fold, subjects in test_subjects_per_fold.items():
    print('-' * 80)
    print(f'for fold {fold}')

    test_data = data.loc[lambda d: d['subject'].isin(subjects)]
    train_data = data.loc[lambda d: ~d['subject'].isin(subjects)]

    print('train - pos rate:', train_data['target'].mean(), 'size:', len(train_data))
    print('test - pos rate:', test_data['target'].mean(), 'size:', len(test_data))
--------------------------------------------------------------------------------
for fold 0
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
--------------------------------------------------------------------------------
for fold 1
train - pos rate: 0.3037974683544304 size: 79
test - pos rate: 0.2857142857142857 size: 21
--------------------------------------------------------------------------------
for fold 2
train - pos rate: 0.2962962962962963 size: 81
test - pos rate: 0.3157894736842105 size: 19
--------------------------------------------------------------------------------
for fold 3
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
--------------------------------------------------------------------------------
for fold 4
train - pos rate: 0.3 size: 80
test - pos rate: 0.3 size: 20
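We can also quickly confirm that every subject ends up in exactly one test fold, so no subject is shared between a fold's train and test set:

assigned_subjects = [s for subjects in test_subjects_per_fold.values() for s in subjects]
assert len(assigned_subjects) == len(set(assigned_subjects))      # no subject appears in more than one fold
assert set(assigned_subjects) == set(data['subject'].unique())    # every subject got assigned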
Variable naming can be improved here and there, but overall I would say this approach could work for your problem.
Implementing this in a scikit-learn-compatible cross-validator would look something like the sketch below, although it requires a bit more re-engineering.
class StratifiedGroupKFold(_BaseKFold):
    ...

    def _iter_test_indices(self, X, y, groups):
        test_subjects_per_fold = {fold: [] for fold in range(self.n_splits)}
        for subject in np.unique(groups):
            target_rate_improvement = np.array([
                self.target_rate_improvements(X, y, test_subjects_per_fold[fold], subject)
                for fold in range(self.n_splits)
            ])
            size_improvements = np.array(self.size_improvement(X, y, test_subjects_per_fold, self.n_splits)) * 0.001
            best_fold = np.argmax(target_rate_improvement + size_improvements)
            test_subjects_per_fold[best_fold] += [subject]

        for subjects in test_subjects_per_fold.values():
            yield np.flatnonzero(np.isin(groups, subjects))
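Once that refactoring is done, the class could be used like any other scikit-learn splitter. A rough usage sketch, assuming the class above has been completed (including its __init__) and that the subject column is passed as groups; the feature matrix here is just a placeholder for your real features:

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

X = np.random.rand(len(data), 3)   # placeholder features, swap in your real ones
y = data['target']

cv = StratifiedGroupKFold(n_splits=5)
scores = cross_val_score(LogisticRegression(), X, y, groups=data['subject'], cv=cv)
print(scores)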