How to use the Hugging Face (HF) Trainer with a custom collate function?

I have a custom dataset with custom table entries and wanted to handle it with a custom collate function. But training fails when I pass a collate function I wrote (one that DOES work with an individual PyTorch dataloader, e.g. see How does one create a pytorch data loader with a custom hugging face data set without having errors? or How does one create a pytoch data loader using an interleaved hugging face dataset?). It just doesn't work with the HF Trainer.

Code

from pathlib import Path
# token = open(Path('~/data/hf_token.txt').expanduser()).read().strip()
token = None
batch_size = 8

# -- AF now
from datasets import load_dataset
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel

tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
if tokenizer.pad_token_id is None:
  tokenizer.pad_token = tokenizer.eos_token
model = GPT2LMHeadModel.from_pretrained("gpt2")
device = torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# -- Get batch from dataset
from datasets import load_dataset
# path, name = 'brando/debug1_af', 'debug1_af'
path, name = 'brando/debug0_af', 'debug0_af'
# train_dataset = load_dataset(path, name, streaming=True, split="train", token=token).with_format(type="torch")
# eval_dataset = load_dataset(path, name, streaming=True, split="test", token=token).with_format(type="torch")
# batch = dataset.take(1)
# column_names = next(iterbatch).keys()
# print(f'{column_names=}')

# -- Compute max steps (I think we should try to do this for real experiments such that the number of tokens is the same in all training runs for fair experiments, todo: ask Sudharsan or online, for now just make streaming=False)
train_dataset = load_dataset(path, name, streaming=False, split="train", token=token).with_format(type="torch")  # hack to get dataset size
eval_dataset = load_dataset(path, name, streaming=False, split="test", token=token).with_format(type="torch") # hack to get dataset size
print(f'{len(train_dataset)=}')
print(f'{len(eval_dataset)=}')
per_device_train_batch_size = batch_size
num_epochs = 1
max_steps = (len(train_dataset) // per_device_train_batch_size) * num_epochs
print(f'{max_steps=}')    

# -- Get trainer
def collate_tokenize(data):
    text_batch = [f'informal statement {example["generated informal statement"]} formal statement {example["formal statement"]}' for example in data]
    tokenized = tokenizer(text_batch, padding='longest', max_length=128, truncation=True, return_tensors='pt')
    return tokenized

from transformers import Trainer, TrainingArguments
training_args = TrainingArguments(
    output_dir=Path('./results').expanduser(),          # output directory
    max_steps=max_steps,             # max_steps
    per_device_train_batch_size=per_device_train_batch_size,   # batch size per device during training
    per_device_eval_batch_size=batch_size,    # batch size for evaluation
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir=Path('./logs').expanduser(),            # directory for storing logs
    logging_steps=10,
    report_to='none',
)
trainer = Trainer(
    model=model,                         # the instantiated πŸ€— Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=eval_dataset,             # evaluation dataset
    data_collator = collate_tokenize,
)
trainer.train()
print('Done!\a')

error:

len(train_dataset)=14
len(eval_dataset)=13
max_steps=1
/usr/local/lib/python3.10/dist-packages/transformers/optimization.py:411: FutureWarning: This implementation of AdamW is deprecated and will be removed in a future version. Use the PyTorch implementation torch.optim.AdamW instead, or set `no_deprecation_warning=True` to disable this warning
  warnings.warn(
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-2-4403554fc52d> in <cell line: 63>()
     61     data_collator = collate_tokenize,
     62 )
---> 63 trainer.train()
     64 print('Done!\a')

11 frames
/usr/local/lib/python3.10/dist-packages/datasets/formatting/formatting.py in _check_valid_index_key(key, size)
    524     if isinstance(key, int):
    525         if (key < 0 and key + size < 0) or (key >= size):
--> 526             raise IndexError(f"Invalid key: {key} is out of bounds for size {size}")
    527         return
    528     elif isinstance(key, slice):

IndexError: Invalid key: 12 is out of bounds for size 0

Why does this happen, and how do I fix it?

Osi answered 10/8, 2023 at 23:22 Comment(1)
is this right? claude.ai/chat/475a4638-cee3-4ce0-af64-c8b8d1dc0d90 – Osi

There are a couple of issues with your code that interfere with the HF Trainer class. Here are the changes I made:

  • Add remove_unused_columns=False to the TrainingArguments. By default the Trainer drops every dataset column whose name isn't an argument of the model's forward method; with your custom column names that removes all columns, leaving an empty dataset, which is exactly what produces IndexError: Invalid key: 12 is out of bounds for size 0. Setting it to False ensures your raw data actually reaches the collate function.

  • Return explicit labels: the HF Trainer expects labels. If you're training a language model, the tokenized batch should have an input_ids key, and for a supervised objective a labels key. In Hugging Face's Trainer class the name "labels" is hardcoded in many places as the ground truth that the model's predictions are compared against, especially when computing the loss. See here: "The dictionary will be unpacked before being fed to the model. Most models expect the targets under the argument labels. Check your model's documentation for all accepted arguments." (There's a short sketch illustrating this right after this list.)

  • Add a handler for missing data: null or empty fields are skipped in the collate function below.
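
As a quick illustration of the labels point above, here is a minimal sketch (a hypothetical one-example batch, using the gpt2 model and tokenizer loaded in the working code below): GPT2LMHeadModel only returns a loss when labels are passed, and that loss is what the Trainer backpropagates.

# minimal sketch: without "labels" in the batch, the model returns no loss for the Trainer to use
batch = tokenizer("informal statement x formal statement y", return_tensors='pt')
batch["labels"] = batch["input_ids"].clone()  # causal LM: labels mirror the inputs (shifted internally by the model)
out = model(**batch.to(model.device))
print(out.loss)  # scalar loss tensor; this is what Trainer.train() optimizes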

There are some additional suggestions here as well.

If you run into other issues, you can always set the logging info like this:

import transformers
transformers.logging.set_verbosity_info()

Here's the working code:

from pathlib import Path
from datasets import load_dataset
import torch
from transformers import GPT2LMHeadModel, PreTrainedTokenizer, AutoTokenizer, Trainer, TrainingArguments

# Load model and tokenizer
model = GPT2LMHeadModel.from_pretrained("gpt2")
device = torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu")
model = model.to(device)
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# Ensure padding token is set
tokenizer.pad_token = tokenizer.eos_token
if tokenizer.pad_token_id is None:
    raise ValueError("Padding token is not set.")

# Load datasets
path, name = 'brando/debug0_af', 'debug0_af'
train_dataset = load_dataset(path, name, streaming=False, split="train").with_format(type="torch")
eval_dataset = load_dataset(path, name, streaming=False, split="test").with_format(type="torch")

# Compute max steps
batch_size = 3
print(f'{len(train_dataset)=}')
print(f'{len(eval_dataset)=}')
per_device_train_batch_size = batch_size
num_epochs = 1
max_steps = 8
print(f'{max_steps=}')

# Define custom collate function
from typing import List, Dict
from transformers import PreTrainedTokenizer

def custom_collate_fn(data: List[Dict[str, str]], tokenizer: PreTrainedTokenizer) -> Dict[str, torch.Tensor]:
    # Ensure tokenizer has a padding token
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Extract and concatenate informal and formal statements
    sequences = []
    for idx, example in enumerate(data):
        # Handle null values
        informal = example.get("generated informal statement", "") or ""
        formal = example.get("formal statement", "") or ""

        # Skip if both are empty
        if not informal and not formal:
            continue

        sequences.append(f'informal statement {informal} formal statement {formal}')

    # Tokenize the sequences
    tokenized_data = tokenizer(sequences, padding='longest', truncation=True, return_tensors='pt')
    tokenized_data["labels"] = tokenized_data["input_ids"].clone()

    return tokenized_data

# Training arguments and trainer instantiation
training_args = TrainingArguments(
    output_dir=Path('./results').expanduser(),
    max_steps=max_steps,
    per_device_train_batch_size=per_device_train_batch_size,
    per_device_eval_batch_size=batch_size,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir=Path('./logs').expanduser(),
    logging_steps=10,
    remove_unused_columns=False,
    report_to='none',
)


# Sanity-check the collate function on a few raw samples before training
sample_data = [train_dataset[i] for i in range(batch_size)]
processed_data = custom_collate_fn(sample_data, tokenizer=tokenizer)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=lambda data: custom_collate_fn(data, tokenizer=tokenizer)
)

trainer.train()
print('Done!\a')

And a colab with some stuff to check the results.

Effeminacy answered 18/8, 2023 at 13:53 Comment(17)
I'm slightly puzzled/curious, why do we have the collate_fn option if preprocess + map seems to do everything we need (the latter is even lazy already!)? reference: discuss.huggingface.co/t/… – Osi
can the input to the collate be called batch? – Osi
curious, what are the tradeoffs of doing tokenizer(sequences["text"], padding="max_length", max_length=128, truncation=True, return_tensors="pt") instead? – Osi
curious, how does the context length of model interact with this, will it be truncated by the HF model later if it's too long? – Osi
The collate_fn is a more general-purpose tool provided by PyTorch's DataLoader. It allows you to specify how a list of samples should be merged/batched together. preprocess and map functions in the HF datasets library are powerful and can handle many tasks, there might be cases where you want to perform some custom operations at the time of batching (e.g., dynamic padding, special token insertion, etc.). that's when it becomes handy to use collate_fn-- for user flexibility – Effeminacy
Yes, the input to the collate_fn can be named whatever you like, but it's conventionally named batch or samples in many contexts. It's a list of data samples that you want to batch together. The name is just a placeholder afaik. discuss.pytorch.org/t/how-to-use-collate-fn/27181 – Effeminacy
Using padding="max_length" / specifying a max_length ensures that sequences are all the same length. The trade-offs are: Pros: uniform sequence lengths can lead to more efficient GPU utilization. Cons: you might lose information if many sequences are longer than max_length, and if most of your sequences are much shorter than max_length you'll end up with a lot of padding. The context length of the model is the maximum number of tokens it can handle in a single forward pass; if you pass a sequence longer than that, you'll get an error. – Effeminacy
also, one last modification to your excellent script. How do I compute the len of the data set if streaming=True? I do that most of the time and want to set up max_steps properly. – Osi
thanks! For the context length I think config = AutoConfig.from_pretrained("gpt2"); context_length = config.max_position_embeddings is a good option (perhaps you could also query the input matrix of the first attention layer). – Osi
you could try something like this? ``` def compute_length_of_streaming_dataset(dataset): length = 0 for _ in dataset: length += 1 return length train_dataset = load_dataset(path, name, streaming=True, split="train") train_dataset_length = compute_length_of_streaming_dataset(train_dataset) print(f'Length of streaming train dataset: {train_dataset_length}') ``` Iterating over the entire dataset just to compute its length might be inefficient FYI: Once you've iterated over the streaming dataset, you'll need to reload it if you want to use it again – Effeminacy
Might make sense to use something like dask or vaex – Effeminacy
one thing I don't understand, why is this statement in your answer true? "Add remove_unused_columns=False, to the TrainingArguments. This can ensure your data makes it to the trainer."? – Osi
for the streaming dataset I went for 3 epochs of full training. I can't halt it when I'm done. I was thinking of adding more data to the HF repo while it's training on the fly...idk if that will screw things up. Plus it's a simpler solution. It feels bad to go through the whole dataset before training. The Pile is 800GB so computing that number might be really slow. So perhaps just not do it? (will share my new colab in a sec when done, so you can benefit too) – Osi
In the HF Trainer, remove_unused_columns decides if unused dataset columns should be removed. By default, it's often True, removing columns not directly used by the model. For custom data or non-standard columns, setting it to False ensures these columns are kept, which is vital if they're needed for custom processing or collation, which it seems like they are in this case. – Effeminacy
RE Adding Data on the Fly: It's possible but not typical to add data during training. Doing so can disrupt training dynamics. It's best to start with a fixed dataset size. For large datasets like the Pile, we could bypass length computation. If targeting 3 epochs, set max_steps in TrainingArguments using an estimated steps_per_epoch: max_steps = 3 * steps_per_epoch. You could also train for a set time, like 24 hours, and manually halt when needed? – Effeminacy
sorry perhaps this is repetitive, but isn't preprocess + map basically "Turing complete"? i.e., can it do anything I'd want to do with a collate_fn? – Osi
is this right? claude.ai/chat/475a4638-cee3-4ce0-af64-c8b8d1dc0d90 – Osi

It is also possible to do the standard:

  1. write a preprocess function that gets the text field, e.g. examples["text"], and tokenizes it
  2. then pass that to the dataset object (the actual full HF dataset) or to a batch (as a dataset object), as in batch.map(preprocess, ...)

example code with batch:

    # - Prepare functions to tokenize batch
    def preprocess(examples):
        return tokenizer(examples["text"], padding="max_length", max_length=128, truncation=True, return_tensors="pt")
    remove_columns = column_names  # remove all keys that are not tensors to avoid bugs in collate function in task2vec's pytorch data loader
    def map(batch):
        return batch.map(preprocess, batched=True, remove_columns=remove_columns)
    tokenized_batch = map(raw_text_batch)
    print(f'{next(iter(tokenized_batch))=}')

example code with data set:

from datasets import load_dataset
from transformers import GPT2Tokenizer

# Load the tokenizer for the GPT-2 model
tokenizer = GPT2Tokenizer.from_pretrained("gpt2-medium")

# Define a preprocessing function
def preprocess(example):
    # Tokenize the text and return the result
    return tokenizer(example["text"], truncation=True, padding="max_length", max_length=128)

# Load a sample dataset
dataset = load_dataset("text", data_files={"train": "sample.txt"})  # Assume "sample.txt" contains your text data

# Apply the preprocessing function to the dataset
processed_dataset = dataset.map(preprocess)

# Check the processed dataset
print(processed_dataset["train"][0])

For both, the .map(preprocess, ...) call sets up the dataset object; when the dataset is an iterable/streaming one, the tokenization is applied lazily as you later pull data with next.

The collate function is similar to the above but slightly more general; still, I think everything here could be done in the preprocess and map functions, so it's not entirely clear to me that we actually need the collate function.
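
That said, one concrete thing a collate function gives you that an up-front map does not is per-batch ("dynamic") padding: each batch is padded only to its own longest sequence instead of a fixed max_length. Here is a minimal sketch of that pattern, assuming the same sample.txt text dataset as above and the stock DataCollatorWithPadding (for a causal-LM Trainer run you'd likely use DataCollatorForLanguageModeling(tokenizer, mlm=False) instead, so that labels get added):

from datasets import load_dataset
from transformers import AutoTokenizer, DataCollatorWithPadding

tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

def preprocess(examples):
    # no padding here: padding is deferred to the collator, one batch at a time
    return tokenizer(examples["text"], truncation=True, max_length=128)

dataset = load_dataset("text", data_files={"train": "sample.txt"})
tokenized = dataset.map(preprocess, batched=True, remove_columns=["text"])

collator = DataCollatorWithPadding(tokenizer=tokenizer)  # pads to the batch's longest sequence
# e.g. Trainer(..., train_dataset=tokenized["train"], data_collator=collator, ...)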


Slightly uglier code that deals with real-world scenarios:

    # -- Get probe network
    # (assumes config variables such as streaming, data_files, data_files_prefix,
    #  probabilities and batch_size are defined earlier in the full script)
    from datasets import load_dataset, interleave_datasets
    from datasets.iterable_dataset import IterableDataset
    import torch
    from transformers import GPT2Tokenizer, GPT2LMHeadModel

    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
    if tokenizer.pad_token_id is None:
        tokenizer.pad_token = tokenizer.eos_token
    probe_network = GPT2LMHeadModel.from_pretrained("gpt2")
    device = torch.device(f"cuda:{0}" if torch.cuda.is_available() else "cpu")
    probe_network = probe_network.to(device)

    # -- Get data set
    def my_load_dataset(path, name, data_files=data_files):
        print(f'{path=} {name=} {streaming=} {data_files=}')
        if path == 'json' or path == 'bin' or path == 'csv':
            print(f'{data_files_prefix+name=}')
            return load_dataset(path, data_files=data_files_prefix+name, streaming=streaming, split="train").with_format("torch")
        elif path == 'parquet':
            print(f'{data_files=}')
            return load_dataset(path, data_files=data_files, streaming=streaming, split="train").with_format("torch")
        else:
            return load_dataset(path, name, streaming=streaming, split="train").with_format("torch")
    # - get data set for real now
    if isinstance(path, str):
        dataset = my_load_dataset(path, name, data_files)
    else:
        # -Interleaving datasets
        print('- Interleaving datasets')
        datasets = [my_load_dataset(path, name, data_files).with_format("torch") for path, name, data_files in zip(path, name, data_files)]
        # datasets = [my_load_dataset(path, name).with_format("torch") for path, name in zip(path, name)]
        if any('parquet' == p for p in path) or path == 'parquet':  # idk why I need to do this, I checked very carefully and deleted all columns so interleaved data set matched but when doing this with c4 & wikitext it fails but with the parquet it works https://discuss.huggingface.co/t/why-does-deleting-the-columns-before-giving-it-to-interleave-work-but-sometimes-it-does-not-work/50879
            dataset_descriptions = [dataset.description for dataset in datasets]  # print description if available
            print(f'{dataset_descriptions=}')
            # - make sure all datasets have the same columns to avoid interleave to complain
            all_columns = [col for dataset in datasets for col in dataset.column_names]
            print(f'{all_columns=}')
            columns_to_remove = [col for dataset in datasets for col in dataset.column_names if col != 'text']
            columns_to_remove = list(set(columns_to_remove))  # remove duplicates
            print(f'{columns_to_remove=}')
            datasets = [dataset.remove_columns(columns_to_remove) for dataset in datasets]
            # - interleave
            print(f'{probabilities=}')
            dataset_descriptions = [dataset.description for dataset in datasets]  # print description if available
            print(f'{dataset_descriptions=}')
        dataset = interleave_datasets(datasets, probabilities)
        # dataset = dataset.remove_columns(columns_to_remove)
        print(f'{dataset=}')
        print(f'{dataset.column_names=}')
    print(f'{dataset=}')
    print(f'{type(dataset)=}')
    # datasets.iterable_dataset.IterableDataset
    # datasets.arrow_dataset.Dataset
    # dataset = IterableDataset(dataset) if type(dataset) != IterableDataset else dataset  # to force dataset.take(batch_size) to work in non-streaming mode
    raw_text_batch = dataset.take(batch_size) if streaming else dataset.select(range(batch_size))
    print(f'{raw_text_batch=}')
    print(f'{next(iter(raw_text_batch))=}')
    column_names = next(iter(raw_text_batch)).keys()
    print(f'{column_names=}')

    # - Prepare functions to tokenize batch
    def preprocess(examples):
        return tokenizer(examples["text"], padding="max_length", max_length=128, truncation=True, return_tensors="pt")
    remove_columns = column_names  # remove all keys that are not tensors to avoid bugs in collate function in task2vec's pytorch data loader
    def map(batch):
        return batch.map(preprocess, batched=True, remove_columns=remove_columns)
    tokenized_batch = map(raw_text_batch)
    print(f'{next(iter(tokenized_batch))=}')

Osi answered 21/8, 2023 at 17:22 Comment(1)
is this right? claude.ai/chat/475a4638-cee3-4ce0-af64-c8b8d1dc0d90 – Osi
