diff --git a/customTrainer.py b/customTrainer.py new file mode 100644 index 0000000000000000000000000000000000000000..0ac5fb5c1de356aa130db47f7ecd96615f54c642 --- /dev/null +++ b/customTrainer.py @@ -0,0 +1,533 @@ +from transformers import Seq2SeqTrainer +# coding=utf-8 +# Copyright 2020-present the HuggingFace Inc. team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +The Trainer class, to easily train a 🤗 Transformers from scratch or finetune it on a new task. +""" + +import contextlib +import functools +import glob +import inspect +import math +import os +import random +import re +import shutil +import sys +import time +import warnings +from collections.abc import Mapping +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union + +from tqdm.auto import tqdm + + +# Integrations must be imported before ML frameworks: +from transformers.integrations import ( # isort: split + default_hp_search_backend, + get_reporting_integration_callbacks, + hp_params, + is_fairscale_available, + is_optuna_available, + is_ray_tune_available, + is_sigopt_available, + is_wandb_available, + run_hp_search_optuna, + run_hp_search_ray, + run_hp_search_sigopt, + run_hp_search_wandb, +) + +import numpy as np +import torch +import torch.distributed as dist +from packaging import version +from torch import nn +from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler +from torch.utils.data.distributed import DistributedSampler + +from huggingface_hub import Repository + +from transformers import __version__ +from transformers.configuration_utils import PretrainedConfig +from transformers.data.data_collator import DataCollator, DataCollatorWithPadding, default_data_collator +from transformers.debug_utils import DebugOption, DebugUnderflowOverflow +from transformers.deepspeed import deepspeed_init, is_deepspeed_zero3_enabled +from transformers.dependency_versions_check import dep_version_check +from transformers.modelcard import TrainingSummary +from transformers.modeling_utils import PreTrainedModel, load_sharded_checkpoint, unwrap_model +from transformers.models.auto.modeling_auto import MODEL_FOR_CAUSAL_LM_MAPPING_NAMES, MODEL_MAPPING_NAMES +from transformers.optimization import Adafactor, get_scheduler +from transformers.pytorch_utils import ( + ALL_LAYERNORM_LAYERS, + is_torch_greater_or_equal_than_1_6, + is_torch_greater_or_equal_than_1_10, + is_torch_less_than_1_11, +) +from transformers.tokenization_utils_base import PreTrainedTokenizerBase +from transformers.trainer_callback import ( + CallbackHandler, + DefaultFlowCallback, + PrinterCallback, + ProgressCallback, + TrainerCallback, + TrainerControl, + TrainerState, +) +from transformers.trainer_pt_utils import ( + DistributedLengthGroupedSampler, + DistributedSamplerWithLoop, + DistributedTensorGatherer, + IterableDatasetShard, + LabelSmoother, + LengthGroupedSampler, + SequentialDistributedSampler, + ShardSampler, + distributed_broadcast_scalars, + distributed_concat, + find_batch_size, + get_module_class_from_name, + get_parameter_names, + nested_concat, + nested_detach, + nested_numpify, + nested_truncate, + nested_xla_mesh_reduce, + reissue_pt_warnings, +) +from transformers.trainer_utils import ( + PREFIX_CHECKPOINT_DIR, + BestRun, + EvalLoopOutput, + EvalPrediction, + FSDPOption, + HPSearchBackend, + HubStrategy, + IntervalStrategy, + PredictionOutput, + RemoveColumnsCollator, + ShardedDDPOption, + TrainerMemoryTracker, + TrainOutput, + default_compute_objective, + default_hp_space, + denumpify_detensorize, + enable_full_determinism, + find_executable_batch_size, + get_last_checkpoint, + has_length, + number_of_arguments, + seed_worker, + set_seed, + speed_metrics, +) +from transformers.training_args import OptimizerNames, ParallelMode, TrainingArguments +from transformers.utils import ( + CONFIG_NAME, + WEIGHTS_INDEX_NAME, + WEIGHTS_NAME, + find_labels, + get_full_repo_name, + is_apex_available, + is_datasets_available, + is_in_notebook, + is_ipex_available, + is_sagemaker_dp_enabled, + is_sagemaker_mp_enabled, + is_torch_tensorrt_fx_available, + is_torch_tpu_available, + is_torchdynamo_available, + logging, +) +from transformers.utils.generic import ContextManagers + + +_is_torch_generator_available = False +_is_native_cuda_amp_available = False +_is_native_cpu_amp_available = False + +DEFAULT_CALLBACKS = [DefaultFlowCallback] +DEFAULT_PROGRESS_CALLBACK = ProgressCallback + +if is_in_notebook(): + from transformers.utils.notebook import NotebookProgressCallback + + DEFAULT_PROGRESS_CALLBACK = NotebookProgressCallback + + +if is_torch_greater_or_equal_than_1_6: + _is_torch_generator_available = True + _is_native_cuda_amp_available = True + +if is_torch_greater_or_equal_than_1_10: + _is_native_cpu_amp_available = True + +if is_datasets_available(): + import datasets + + + +IS_SAGEMAKER_MP_POST_1_10 = False + + +logger = logging.get_logger(__name__) + + +# Name of the files used for checkpointing +TRAINING_ARGS_NAME = "training_args.bin" +TRAINER_STATE_NAME = "trainer_state.json" +OPTIMIZER_NAME = "optimizer.pt" +SCHEDULER_NAME = "scheduler.pt" +SCALER_NAME = "scaler.pt" + + +class CustomTrainer(Seq2SeqTrainer): + def set_epoch_callback(self, func): + self.epoch_callback = func + + def _inner_training_loop( + self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None + ): + self._train_batch_size = batch_size + # Data loader and number of training steps + train_dataloader = self.get_train_dataloader() + + # Setting up training control variables: + # number of training epochs: num_train_epochs + # number of training steps per epoch: num_update_steps_per_epoch + # total number of training steps to execute: max_steps + total_train_batch_size = args.train_batch_size * args.gradient_accumulation_steps * args.world_size + + len_dataloader = None + if has_length(train_dataloader): + len_dataloader = len(train_dataloader) + num_update_steps_per_epoch = len_dataloader // args.gradient_accumulation_steps + num_update_steps_per_epoch = max(num_update_steps_per_epoch, 1) + num_examples = self.num_examples(train_dataloader) + if args.max_steps > 0: + max_steps = args.max_steps + num_train_epochs = args.max_steps // num_update_steps_per_epoch + int( + args.max_steps % num_update_steps_per_epoch > 0 + ) + # May be slightly incorrect if the last batch in the training dataloader has a smaller size but it's + # the best we can do. + num_train_samples = args.max_steps * total_train_batch_size + else: + max_steps = math.ceil(args.num_train_epochs * num_update_steps_per_epoch) + num_train_epochs = math.ceil(args.num_train_epochs) + num_train_samples = self.num_examples(train_dataloader) * args.num_train_epochs + elif args.max_steps > 0: # Rely on max_steps when dataloader does not have a working size + max_steps = args.max_steps + # Setting a very large number of epochs so we go as many times as necessary over the iterator. + num_train_epochs = sys.maxsize + num_update_steps_per_epoch = max_steps + num_examples = total_train_batch_size * args.max_steps + num_train_samples = args.max_steps * total_train_batch_size + else: + raise ValueError( + "args.max_steps must be set to a positive value if dataloader does not have a length, was" + f" {args.max_steps}" + ) + + if DebugOption.UNDERFLOW_OVERFLOW in self.args.debug: + if self.args.n_gpu > 1: + # nn.DataParallel(model) replicates the model, creating new variables and module + # references registered here no longer work on other gpus, breaking the module + raise ValueError( + "Currently --debug underflow_overflow is not supported under DP. Please use DDP" + " (torch.distributed.launch)." + ) + else: + debug_overflow = DebugUnderflowOverflow(self.model) # noqa + + delay_optimizer_creation = ( + self.sharded_ddp is not None + and self.sharded_ddp != ShardedDDPOption.SIMPLE + or is_sagemaker_mp_enabled() + or self.fsdp is not None + ) + if args.deepspeed: + deepspeed_engine, optimizer, lr_scheduler = deepspeed_init( + self, num_training_steps=max_steps, resume_from_checkpoint=resume_from_checkpoint + ) + self.model = deepspeed_engine.module + self.model_wrapped = deepspeed_engine + self.deepspeed = deepspeed_engine + self.optimizer = optimizer + self.lr_scheduler = lr_scheduler + elif not delay_optimizer_creation: + self.create_optimizer_and_scheduler(num_training_steps=max_steps) + + self.state = TrainerState() + self.state.is_hyper_param_search = trial is not None + + # Activate gradient checkpointing if needed + if args.gradient_checkpointing: + self.model.gradient_checkpointing_enable() + + model = self._wrap_model(self.model_wrapped) + + if is_sagemaker_mp_enabled() and resume_from_checkpoint is not None: + self._load_from_checkpoint(resume_from_checkpoint, model) + + # for the rest of this function `model` is the outside model, whether it was wrapped or not + if model is not self.model: + self.model_wrapped = model + + if delay_optimizer_creation: + self.create_optimizer_and_scheduler(num_training_steps=max_steps) + + # Check if saved optimizer or scheduler states exist + self._load_optimizer_and_scheduler(resume_from_checkpoint) + + # important: at this point: + # self.model is the Transformers Model + # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), etc. + + # Train! + logger.info("***** Running training *****") + logger.info(f" Num examples = {num_examples}") + logger.info(f" Num Epochs = {num_train_epochs}") + logger.info(f" Instantaneous batch size per device = {args.per_device_train_batch_size}") + logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size}") + logger.info(f" Gradient Accumulation steps = {args.gradient_accumulation_steps}") + logger.info(f" Total optimization steps = {max_steps}") + + self.state.epoch = 0 + start_time = time.time() + epochs_trained = 0 + steps_trained_in_current_epoch = 0 + steps_trained_progress_bar = None + + # Check if continuing training from a checkpoint + if resume_from_checkpoint is not None and os.path.isfile( + os.path.join(resume_from_checkpoint, TRAINER_STATE_NAME) + ): + self.state = TrainerState.load_from_json(os.path.join(resume_from_checkpoint, TRAINER_STATE_NAME)) + epochs_trained = self.state.global_step // num_update_steps_per_epoch + if not args.ignore_data_skip: + steps_trained_in_current_epoch = self.state.global_step % (num_update_steps_per_epoch) + steps_trained_in_current_epoch *= args.gradient_accumulation_steps + else: + steps_trained_in_current_epoch = 0 + + logger.info(" Continuing training from checkpoint, will skip to saved global_step") + logger.info(f" Continuing training from epoch {epochs_trained}") + logger.info(f" Continuing training from global step {self.state.global_step}") + if not args.ignore_data_skip: + logger.info( + f" Will skip the first {epochs_trained} epochs then the first {steps_trained_in_current_epoch} " + "batches in the first epoch. If this takes a lot of time, you can add the `--ignore_data_skip` " + "flag to your launch command, but you will resume the training on data already seen by your model." + ) + if self.is_local_process_zero() and not args.disable_tqdm: + steps_trained_progress_bar = tqdm(total=steps_trained_in_current_epoch) + steps_trained_progress_bar.set_description("Skipping the first batches") + + # Update the references + self.callback_handler.model = self.model + self.callback_handler.optimizer = self.optimizer + self.callback_handler.lr_scheduler = self.lr_scheduler + self.callback_handler.train_dataloader = train_dataloader + self.state.trial_name = self.hp_name(trial) if self.hp_name is not None else None + if trial is not None: + assignments = trial.assignments if self.hp_search_backend == HPSearchBackend.SIGOPT else trial + self.state.trial_params = hp_params(assignments) + else: + self.state.trial_params = None + # This should be the same if the state has been saved but in case the training arguments changed, it's safer + # to set this after the load. + self.state.max_steps = max_steps + self.state.num_train_epochs = num_train_epochs + self.state.is_local_process_zero = self.is_local_process_zero() + self.state.is_world_process_zero = self.is_world_process_zero() + + # tr_loss is a tensor to avoid synchronization of TPUs through .item() + tr_loss = torch.tensor(0.0).to(args.device) + # _total_loss_scalar is updated everytime .item() has to be called on tr_loss and stores the sum of all losses + self._total_loss_scalar = 0.0 + self._globalstep_last_logged = self.state.global_step + model.zero_grad() + + self.control = self.callback_handler.on_train_begin(args, self.state, self.control) + + # Skip the first epochs_trained epochs to get the random state of the dataloader at the right point. + if not args.ignore_data_skip: + for epoch in range(epochs_trained): + is_random_sampler = hasattr(train_dataloader, "sampler") and isinstance( + train_dataloader.sampler, RandomSampler + ) + if is_torch_less_than_1_11 or not is_random_sampler: + # We just need to begin an iteration to create the randomization of the sampler. + # That was before PyTorch 1.11 however... + for _ in train_dataloader: + break + else: + # Otherwise we need to call the whooooole sampler cause there is some random operation added + # AT THE VERY END! + _ = list(train_dataloader.sampler) + + for epoch in range(epochs_trained, num_train_epochs): + if isinstance(train_dataloader, DataLoader) and isinstance(train_dataloader.sampler, DistributedSampler): + train_dataloader.sampler.set_epoch(epoch) + elif hasattr(train_dataloader, "dataset") and isinstance(train_dataloader.dataset, IterableDatasetShard): + train_dataloader.dataset.set_epoch(epoch) + + epoch_iterator = train_dataloader + + # Reset the past mems state at the beginning of each epoch if necessary. + if args.past_index >= 0: + self._past = None + + steps_in_epoch = ( + len(epoch_iterator) + if len_dataloader is not None + else args.max_steps * args.gradient_accumulation_steps + ) + self.control = self.callback_handler.on_epoch_begin(args, self.state, self.control) + + if epoch == epochs_trained and resume_from_checkpoint is not None and steps_trained_in_current_epoch == 0: + self._load_rng_state(resume_from_checkpoint) + + step = -1 + for step, inputs in enumerate(epoch_iterator): + + # Skip past any already trained steps if resuming training + if steps_trained_in_current_epoch > 0: + steps_trained_in_current_epoch -= 1 + if steps_trained_progress_bar is not None: + steps_trained_progress_bar.update(1) + if steps_trained_in_current_epoch == 0: + self._load_rng_state(resume_from_checkpoint) + continue + elif steps_trained_progress_bar is not None: + steps_trained_progress_bar.close() + steps_trained_progress_bar = None + + if step % args.gradient_accumulation_steps == 0: + self.control = self.callback_handler.on_step_begin(args, self.state, self.control) + + if ( + ((step + 1) % args.gradient_accumulation_steps != 0) + and args.local_rank != -1 + and args._no_sync_in_gradient_accumulation + ): + # Avoid unnecessary DDP synchronization since there will be no backward pass on this example. + with model.no_sync(): + tr_loss_step = self.training_step(model, inputs) + else: + tr_loss_step = self.training_step(model, inputs) + + if ( + args.logging_nan_inf_filter + and not is_torch_tpu_available() + and (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step)) + ): + # if loss is nan or inf simply add the average of previous logged losses + tr_loss += tr_loss / (1 + self.state.global_step - self._globalstep_last_logged) + else: + tr_loss += tr_loss_step + + self.current_flos += float(self.floating_point_ops(inputs)) + + # Optimizer step for deepspeed must be called on every step regardless of the value of gradient_accumulation_steps + if self.deepspeed: + self.deepspeed.step() + + if (step + 1) % args.gradient_accumulation_steps == 0 or ( + # last step in epoch but step is always smaller than gradient_accumulation_steps + steps_in_epoch <= args.gradient_accumulation_steps + and (step + 1) == steps_in_epoch + ): + + # Optimizer step + optimizer_was_run = True + if self.deepspeed: + pass # called outside the loop + elif self.do_grad_scaling: + scale_before = self.scaler.get_scale() + self.scaler.step(self.optimizer) + self.scaler.update() + scale_after = self.scaler.get_scale() + optimizer_was_run = scale_before <= scale_after + else: + self.optimizer.step() + + if optimizer_was_run and not self.deepspeed: + self.lr_scheduler.step() + + model.zero_grad() + self.state.global_step += 1 + self.state.epoch = epoch + (step + 1) / steps_in_epoch + self.control = self.callback_handler.on_step_end(args, self.state, self.control) + + self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval) + else: + self.control = self.callback_handler.on_substep_end(args, self.state, self.control) + + if self.control.should_epoch_stop or self.control.should_training_stop: + break + if step < 0: + logger.warning( + "There seems to be not a single sample in your epoch_iterator, stopping training at step" + f" {self.state.global_step}! This is expected if you're using an IterableDataset and set" + f" num_steps ({max_steps}) higher than the number of available samples." + ) + self.control.should_training_stop = True + + self.control = self.callback_handler.on_epoch_end(args, self.state, self.control) + self._maybe_log_save_evaluate(tr_loss, model, trial, epoch, ignore_keys_for_eval) + + if DebugOption.TPU_METRICS_DEBUG in self.args.debug: + logger.warning( + "You enabled PyTorch/XLA debug metrics but you don't have a TPU " + "configured. Check your training configuration if this is unexpected." + ) + + print("EPOCH DONE") + print(self.epoch_callback) + if self.epoch_callback: + self.epoch_callback() + if self.control.should_training_stop: + break + + + + if args.past_index and hasattr(self, "_past"): + # Clean the state at the end of training + delattr(self, "_past") + + logger.info("\n\nTraining completed. Do not forget to share your model on huggingface.co/models =)\n\n") + + # add remaining tr_loss + self._total_loss_scalar += tr_loss.item() + train_loss = self._total_loss_scalar / self.state.global_step + + metrics = speed_metrics("train", start_time, num_samples=num_train_samples, num_steps=self.state.max_steps) + self.store_flos() + metrics["total_flos"] = self.state.total_flos + metrics["train_loss"] = train_loss + + self.is_in_train = False + + self._memory_tracker.stop_and_update_metrics(metrics) + + self.log(metrics) + + self.control = self.callback_handler.on_train_end(args, self.state, self.control) + + return TrainOutput(self.state.global_step, train_loss, metrics) \ No newline at end of file diff --git a/evaluate.py b/evaluate.py index 947d04abed2ded714480ff1e7d30e284ba3a9ac5..de01cc41c5bf8bb08d4d5da5667fb5b3eebff377 100644 --- a/evaluate.py +++ b/evaluate.py @@ -1,6 +1,7 @@ import sys import tqdm +from customTrainer import CustomTrainer from data import eval_query as eq from data import pred_query_responses as pqr from transformers import BartTokenizer, BartForConditionalGeneration @@ -93,31 +94,12 @@ if __name__ == "__main__": train_dataset = get_dataset(train_tok_path) - - - - + u = 0 - for i in range(int(epochs)): - training_args = Seq2SeqTrainingArguments( - output_dir='./trained-models/blackbox', - num_train_epochs=i+1, - per_device_train_batch_size=1, - per_device_eval_batch_size=1, - warmup_steps=10, - weight_decay=0.01, - logging_dir='./logs', - ) - - trainer = Seq2SeqTrainer( - model=model, - args=training_args, - train_dataset=train_dataset - ) - - - trainer.train() - pred_path = output + "-" + str(i+1) + ".csv" + def predict(): + global u + u += 1 + pred_path = output + "-" + str(u) + ".csv" with open(test_tok_path, "r", encoding="utf-8") as f, open(pred_path, "w", encoding="utf-8") as out: test_data = csv.reader(f, delimiter=",") next(test_data) @@ -131,7 +113,6 @@ if __name__ == "__main__": out.write(f"\"{question}\",\"{predicted}\"\n") pbar.update(1) pbar.close() - dump_path = pred_path.replace("predicted", "pred_responses").replace(".csv", ".json") pqr.build_responsefile(dump_path, test_path, pred_path) print("Evaluation againts server results") @@ -156,6 +137,67 @@ if __name__ == "__main__": f.write(f"Recall micro: {recall_micro_query}\n") f.write(f"F1 micro: {f1_micro_query}\n") f.write(f"Fully correct: {fully_correct_query}\n\n") + + + #for i in range(int(epochs)): + training_args = Seq2SeqTrainingArguments( + output_dir='./trained-models/blackbox', + num_train_epochs=int(epochs), + per_device_train_batch_size=1, + per_device_eval_batch_size=1, + warmup_steps=10, + weight_decay=0.01, + logging_dir='./logs' + ) + + trainer = CustomTrainer( + model=model, + args=training_args, + train_dataset=train_dataset, + ) + + trainer.set_epoch_callback(predict) + + trainer.train() + #pred_path = output + "-" + str(i+1) + ".csv" + #with open(test_tok_path, "r", encoding="utf-8") as f, open(pred_path, "w", encoding="utf-8") as out: + # test_data = csv.reader(f, delimiter=",") + # next(test_data) + # test_data = list(test_data) + # test_data = [q[0] for q in test_data] + # out.write("text,summary\n") + # print("Runs predicted queries") + # pbar = tqdm.tqdm(total=len(test_data)) + # for i, question in enumerate(test_data): + # predicted = gsq.predict_query(question, model, tokenizer) + # out.write(f"\"{question}\",\"{predicted}\"\n") + # pbar.update(1) + # pbar.close() +# + #dump_path = pred_path.replace("predicted", "pred_responses").replace(".csv", ".json") + #pqr.build_responsefile(dump_path, test_path, pred_path) + #print("Evaluation againts server results") + #precision_macro_query, recall_macro_query, f1_macro_query, precision_micro_query, recall_micro_query, f1_micro_query, fully_correct_query = eq.eval_query_response(test_path, dump_path) + #print("Evaluation of queries as strings") + #precision_macro_string, recall_macro_string, f1_macro_string, precision_micro_string, recall_micro_string, f1_micro_string, fully_correct_string = eq.eval_query_json(test_path, dump_path) + #res_path = dump_path.replace("pred_responses", "eval").replace(".json", ".txt") + #with open(res_path, "w") as f: + # f.write("String evaluation\n\n") + # f.write(f"Precision macro: {precision_macro_string}\n") + # f.write(f"Recall macro: {recall_macro_string}\n") + # f.write(f"F1 macro: {f1_macro_string}\n") + # f.write(f"Precision micro: {precision_micro_string}\n") + # f.write(f"Recall micro: {recall_micro_string}\n") + # f.write(f"F1 micro: {f1_micro_string}\n") + # f.write(f"Fully correct: {fully_correct_string}\n\n") + # f.write("Query evaluation\n\n") + # f.write(f"Precision macro: {precision_macro_query}\n") + # f.write(f"Recall macro: {recall_macro_query}\n") + # f.write(f"F1 macro: {f1_macro_query}\n") + # f.write(f"Precision micro: {precision_micro_query}\n") + # f.write(f"Recall micro: {recall_micro_query}\n") + # f.write(f"F1 micro: {f1_micro_query}\n") + # f.write(f"Fully correct: {fully_correct_query}\n\n") #train_dataset = get_dataset("./data/tokenized/lc-quad-requeried-linked-train-tokenized-append-1.csv") #test_dataset = get_dataset("test.csv") #