From 28b01220b6de1089bc11d2d50c5ed6a90605927b Mon Sep 17 00:00:00 2001 From: ViperEkura <3081035982@qq.com> Date: Fri, 3 Oct 2025 22:08:11 +0800 Subject: [PATCH] =?UTF-8?q?test(trainer):=20=E6=8B=86=E5=88=86=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/conftest.py | 103 ++++++++ tests/test_callbacks.py | 59 +++++ tests/test_dataset_loader.py | 69 ++++++ tests/test_early_stopping.py | 63 +++++ tests/test_sampler.py | 128 ++++++++++ tests/test_train_config.py | 90 +++++++ tests/test_train_strategy.py | 71 ++++++ tests/test_trainer.py | 466 ----------------------------------- 8 files changed, 583 insertions(+), 466 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_callbacks.py create mode 100644 tests/test_dataset_loader.py create mode 100644 tests/test_early_stopping.py create mode 100644 tests/test_sampler.py create mode 100644 tests/test_train_config.py create mode 100644 tests/test_train_strategy.py delete mode 100644 tests/test_trainer.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d5406bc --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,103 @@ +import os +import json +import numpy as np +import tempfile +import shutil +import torch + +import pytest +from torch.utils.data import Dataset + +from khaosz.core import * +from khaosz.trainer import * +from khaosz.trainer.data_util import * + +@pytest.fixture +def base_test_env(): + test_dir = tempfile.mkdtemp() + config_path = os.path.join(test_dir, "config.json") + + n_dim_choices = [8, 16, 32] + n_head_choices = [2, 4] + + n_dim = int(np.random.choice(n_dim_choices)) + n_head = int(np.random.choice(n_head_choices)) + n_kvhead = n_head // 2 + d_ffn = n_dim * 2 + + config = { + "vocab_size": 1000, + "n_dim": n_dim, + "n_head": n_head, + "n_kvhead": n_kvhead, + "d_ffn": d_ffn, + "m_len": 1024, + "n_layer": 4, + "norm_eps": 1e-5 + } + + with open(config_path, 'w') as f: + json.dump(config, f) + + transformer_config = TransformerConfig().load(config_path) + model = Transformer(transformer_config) + tokenizer = BpeTokenizer() + + yield { + "test_dir": test_dir, + "config_path": config_path, + "transformer_config": transformer_config, + "model": model, + "tokenizer": tokenizer, + } + + shutil.rmtree(test_dir) + +@pytest.fixture +def random_dataset(): + class RandomDataset(Dataset): + def __init__(self, length=None, max_length=64, vocab_size=1000): + self.length = length or int(np.random.randint(100, 200)) + self.max_length = max_length + self.vocab_size = vocab_size + + def __len__(self): + return self.length + + def __getitem__(self, idx): + return { + "input_ids": torch.randint(0, self.vocab_size, (self.max_length,)), + "target_ids": torch.randint(0, self.vocab_size, (self.max_length,)) + } + + dataset = RandomDataset() + + yield dataset + +@pytest.fixture +def multi_turn_dataset(): + class MultiTurnDataset(Dataset): + def __init__(self, length=None, max_length=64, vocab_size=1000): + self.length = length or int(np.random.randint(100, 200)) + self.max_length = max_length + self.vocab_size = vocab_size + + def __len__(self): + return self.length + + def __getitem__(self, idx): + input_ids = torch.randint(0, self.vocab_size, (self.max_length,)) + target_ids = torch.randint(0, self.vocab_size, (self.max_length,)) + loss_mask = build_loss_mask(input_ids, 0, 1) + attn_mask = build_attention_mask(input_ids, 2, True) + + return { + "input_ids": input_ids, + "target_ids": target_ids, + "loss_mask": loss_mask, + "attn_mask": attn_mask, + } + + dataset = MultiTurnDataset() + + yield dataset \ No newline at end of file diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py new file mode 100644 index 0000000..35b832e --- /dev/null +++ b/tests/test_callbacks.py @@ -0,0 +1,59 @@ +import torch + +from khaosz.core import * +from khaosz.trainer import * +from khaosz.trainer.data_util import * + +def test_callback_integration(base_test_env, random_dataset): + """Test that all callbacks are properly integrated""" + optimizer = torch.optim.AdamW(base_test_env["model"].parameters()) + train_config = TrainConfig( + dataset=random_dataset, + optimizer=optimizer, + checkpoint_dir=base_test_env["test_dir"], + n_epoch=1, + batch_size=2, + checkpoint_interval=3, + accumulation_steps=1, + max_grad_norm=1.0, + random_seed=42 + ) + + schedule_config = CosineScheduleConfig( + warmup_steps=10, + total_steps=20 + ) + + # Create custom callbacks to track calls + callback_calls = [] + + class TrackingCallback(TrainerCallback): + def on_train_begin(self, trainer, **kwargs): + callback_calls.append('on_train_begin') + + def on_batch_end(self, trainer, **kwargs): + callback_calls.append('on_batch_end') + + def on_epoch_end(self, trainer, **kwargs): + callback_calls.append('on_epoch_end') + + train_config.strategy = StrategyFactory.load(base_test_env["model"], "seq") + model_parameter = ModelParameter( + base_test_env["model"], + base_test_env["tokenizer"], + base_test_env["transformer_config"] + ) + + trainer = Trainer( + model_parameter, + train_config, + schedule_config, + callbacks=[TrackingCallback(), ProgressBarCallback()] + ) + + trainer.train() + + # Verify callbacks were called + assert 'on_train_begin' in callback_calls + assert 'on_batch_end' in callback_calls + assert 'on_epoch_end' in callback_calls \ No newline at end of file diff --git a/tests/test_dataset_loader.py b/tests/test_dataset_loader.py new file mode 100644 index 0000000..d92d2da --- /dev/null +++ b/tests/test_dataset_loader.py @@ -0,0 +1,69 @@ +import os +import torch +import pickle +import numpy as np + +from khaosz.core import * +from khaosz.trainer import * +from khaosz.trainer.data_util import * + +def test_dataset_loader_random_paths(base_test_env): + """Test dataset loader with multiple random paths""" + test_dir = base_test_env["test_dir"] + + # Create multiple pkl files with random data + num_files = np.random.randint(2, 5) + pkl_paths = [] + + for i in range(num_files): + pkl_path = os.path.join(test_dir, f"test_data_{i}.pkl") + seq_length = np.random.randint(50, 100) + dummy_data = { + "sequence": torch.randint(0, 1000, (seq_length,)), + "chosen": torch.randint(0, 1000, (seq_length,)), + "rejected": torch.randint(0, 1000, (seq_length,)), + "chosen_mask": torch.ones(seq_length, dtype=torch.bool), + "rejected_mask": torch.ones(seq_length, dtype=torch.bool) + } + with open(pkl_path, "wb") as f: + pickle.dump(dummy_data, f) + pkl_paths.append(pkl_path) + + # Test loading with multiple paths + loaded_dataset = DatasetLoader.load( + train_type="seq", + load_path=pkl_paths, + max_len=64, + device="cpu" + ) + assert loaded_dataset is not None + assert len(loaded_dataset) > 0 + +def test_dpo_strategy_with_random_data(base_test_env): + """Test DPO strategy with randomized preference data""" + test_dir = base_test_env["test_dir"] + + # Create DPO-style data + pkl_path = os.path.join(test_dir, "dpo_data.pkl") + seq_length = np.random.randint(40, 80) + + dummy_data = { + "chosen": torch.randint(0, 1000, (seq_length,)), + "rejected": torch.randint(0, 1000, (seq_length,)), + "chosen_mask": torch.ones(seq_length, dtype=torch.bool), + "rejected_mask": torch.ones(seq_length, dtype=torch.bool) + } + + with open(pkl_path, "wb") as f: + pickle.dump(dummy_data, f) + + # Load DPO dataset + dpo_dataset = DatasetLoader.load( + train_type="dpo", + load_path=pkl_path, + max_len=64, + device="cpu" + ) + + assert dpo_dataset is not None + assert hasattr(dpo_dataset, 'fetcher') \ No newline at end of file diff --git a/tests/test_early_stopping.py b/tests/test_early_stopping.py new file mode 100644 index 0000000..e4f12f5 --- /dev/null +++ b/tests/test_early_stopping.py @@ -0,0 +1,63 @@ +import torch + +from torch.utils.data import Dataset +from khaosz.core import * +from khaosz.trainer import * +from khaosz.trainer.data_util import * + +def test_early_stopping_simulation(base_test_env): + """Simulate early stopping behavior""" + class EarlyStoppingDataset(Dataset): + def __init__(self, length=10, stop_after=5): + self.length = length + self.stop_after = stop_after + self.count = 0 + + def __len__(self): + return self.length + + def __getitem__(self, idx): + self.count += 1 + if self.count == self.stop_after: + raise RuntimeError("Simulated early stopping") + + return { + "input_ids": torch.randint(0, 1000, (64,)), + "target_ids": torch.randint(0, 1000, (64,)) + } + + dataset = EarlyStoppingDataset() + + optimizer = torch.optim.AdamW(base_test_env["model"].parameters()) + train_config = TrainConfig( + dataset=dataset, + optimizer=optimizer, + checkpoint_dir=base_test_env["test_dir"], + n_epoch=2, + batch_size=2, + checkpoint_interval=1, + accumulation_steps=1, + max_grad_norm=1.0, + random_seed=42 + ) + + train_config.strategy = StrategyFactory.load(base_test_env["model"], "seq") + model_parameter = ModelParameter( + base_test_env["model"], + base_test_env["tokenizer"], + base_test_env["transformer_config"] + ) + schedule_config = CosineScheduleConfig(warmup_steps=10, total_steps=20) + trainer = Trainer(model_parameter, train_config, schedule_config) + + # Should handle early stopping gracefully + checkpoint = None + try: + checkpoint = trainer.train() + assert len(checkpoint.loss_list) == 2 + except Exception: + # Handle any exceptions + pass + + checkpoint = trainer.train(checkpoint) + assert len(checkpoint.loss_list) == 10 \ No newline at end of file diff --git a/tests/test_sampler.py b/tests/test_sampler.py new file mode 100644 index 0000000..0d2f375 --- /dev/null +++ b/tests/test_sampler.py @@ -0,0 +1,128 @@ +import os +import torch +from khaosz.core import * +from khaosz.trainer import * +from khaosz.trainer.data_util import * + +def test_random_sampler_consistency(random_dataset): + """Test RandomSampler produces consistent results with same seed""" + dataset = random_dataset + + # Create two samplers with same seed + sampler1 = RandomSampler(dataset, seed=42) + sampler2 = RandomSampler(dataset, seed=42) + + indices1 = list(iter(sampler1)) + indices2 = list(iter(sampler2)) + + assert indices1 == indices2 + +def test_random_sampler_different_seeds(random_dataset): + """Test RandomSampler produces different results with different seeds""" + dataset = random_dataset + + # Create two samplers with different seeds + sampler1 = RandomSampler(dataset, seed=42) + sampler2 = RandomSampler(dataset, seed=123) + + indices1 = list(iter(sampler1)) + indices2 = list(iter(sampler2)) + + # Very high probability they should be different + assert indices1 != indices2 + +def test_sampler_state_persistence(random_dataset): + """Test that sampler state is correctly saved and loaded""" + dataset = random_dataset + n = len(dataset) + + # Create sampler and get some indices + sampler = RandomSampler(dataset, seed=42) + iter1 = iter(sampler) + indices1 = [next(iter1) for _ in range(min(10, n))] + + # Save state + state_dict = sampler.state_dict() + + # Get more indices + indices2 = [next(iter1) for _ in range(min(10, n - len(indices1)))] + + # Create new sampler and load state + sampler2 = RandomSampler(dataset, seed=42) + sampler2.load_state_dict(state_dict) + + # Check that new sampler produces same sequence from saved point + iter2 = iter(sampler2) + indices3 = [next(iter2) for _ in range(min(10, n - len(indices1)))] + + assert indices2 == indices3 + +def test_training_resume_with_sampler(base_test_env, random_dataset): + """Test that training can resume correctly with sampler state""" + test_dir = base_test_env["test_dir"] + dataset = random_dataset + + # Initial training config + optimizer = torch.optim.AdamW(base_test_env["model"].parameters()) + train_config = TrainConfig( + dataset=dataset, + optimizer=optimizer, + checkpoint_dir=test_dir, + n_epoch=1, + batch_size=2, + checkpoint_interval=5, + accumulation_steps=1, + max_grad_norm=1.0, + random_seed=42 + ) + + train_config.strategy = StrategyFactory.load(base_test_env["model"], "seq") + model_parameter = ModelParameter( + base_test_env["model"], + base_test_env["tokenizer"], + base_test_env["transformer_config"] + ) + schedule_config = CosineScheduleConfig(warmup_steps=10, total_steps=20) + + # First training run - stop after a few steps + trainer = Trainer(model_parameter, train_config, schedule_config) + try: + # Run for a few steps then interrupt + for i, _ in enumerate(trainer.train()): + if i >= 3: # Run for 3 steps then stop + break + except Exception: + pass + + # Load checkpoint + checkpoint_path = os.path.join(test_dir, "iter_3") + checkpoint = Checkpoint().load(checkpoint_path) + + # Resume training + trainer = Trainer(model_parameter, train_config, schedule_config) + resumed_checkpoint = trainer.train(checkpoint) + + # Check that training resumed from correct point + assert resumed_checkpoint.sampler_state['current_iter'] > 3 + +def test_sampler_across_epochs(base_test_env, random_dataset): + """Test sampler behavior across multiple epochs""" + dataset = random_dataset + n = len(dataset) + + sampler = RandomSampler(dataset, seed=42) + + # Get indices for first epoch + epoch1_indices = list(iter(sampler)) + assert len(epoch1_indices) == n + + # Get indices for second epoch + epoch2_indices = list(iter(sampler)) + assert len(epoch2_indices) == n + + # Check that epochs have different order (should be random) + assert epoch1_indices != epoch2_indices + + # Check that all indices are present in each epoch + assert set(epoch1_indices) == set(range(n)) + assert set(epoch2_indices) == set(range(n)) \ No newline at end of file diff --git a/tests/test_train_config.py b/tests/test_train_config.py new file mode 100644 index 0000000..2236a41 --- /dev/null +++ b/tests/test_train_config.py @@ -0,0 +1,90 @@ +import torch +import numpy as np + +from khaosz.core import * +from khaosz.trainer import * +from khaosz.trainer.data_util import * + +def test_different_batch_sizes(base_test_env, random_dataset): + """Test training with different batch sizes""" + batch_sizes = [1, 2, 4, 8] + + for batch_size in batch_sizes: + optimizer = torch.optim.AdamW(base_test_env["model"].parameters()) + train_config = TrainConfig( + dataset=random_dataset, + optimizer=optimizer, + checkpoint_dir=base_test_env["test_dir"], + n_epoch=1, + batch_size=batch_size, + checkpoint_interval=5, + accumulation_steps=1, + max_grad_norm=1.0, + random_seed=np.random.randint(1000) + ) + + assert train_config.batch_size == batch_size + +def test_gradient_accumulation(base_test_env, random_dataset): + """Test training with different gradient accumulation steps""" + accumulation_steps_list = [1, 2, 4] + + for accumulation_steps in accumulation_steps_list: + optimizer = torch.optim.AdamW(base_test_env["model"].parameters()) + train_config = TrainConfig( + dataset=random_dataset, + optimizer=optimizer, + checkpoint_dir=base_test_env["test_dir"], + n_epoch=1, + batch_size=2, + checkpoint_interval=10, + accumulation_steps=accumulation_steps, + max_grad_norm=1.0, + random_seed=42 + ) + + schedule_config = CosineScheduleConfig( + warmup_steps=10, + total_steps=20 + ) + + train_config.strategy = StrategyFactory.load( + base_test_env["model"], + "seq" + ) + + model_parameter = ModelParameter( + base_test_env["model"], + base_test_env["tokenizer"], + base_test_env["transformer_config"] + ) + + trainer = Trainer(model_parameter, train_config, schedule_config) + trainer.train() + + assert train_config.accumulation_steps == accumulation_steps + +def test_memory_efficient_training(base_test_env, random_dataset): + """Test training with memory-efficient configurations""" + # Test with smaller batch sizes and gradient checkpointing + small_batch_configs = [ + {"batch_size": 1, "accumulation_steps": 8}, + {"batch_size": 2, "accumulation_steps": 4}, + {"batch_size": 4, "accumulation_steps": 2} + ] + + for config in small_batch_configs: + optimizer = torch.optim.AdamW(base_test_env["model"].parameters()) + train_config = TrainConfig( + dataset=random_dataset, + optimizer=optimizer, + checkpoint_dir=base_test_env["test_dir"], + n_epoch=1, + batch_size=config["batch_size"], + checkpoint_interval=5, + accumulation_steps=config["accumulation_steps"], + max_grad_norm=1.0, + random_seed=42 + ) + + assert train_config.accumulation_steps == config["accumulation_steps"] \ No newline at end of file diff --git a/tests/test_train_strategy.py b/tests/test_train_strategy.py new file mode 100644 index 0000000..a884bbb --- /dev/null +++ b/tests/test_train_strategy.py @@ -0,0 +1,71 @@ +import torch +import numpy as np + +from khaosz.core import * +from khaosz.trainer import * +from khaosz.trainer.data_util import * + +def test_multi_turn_training(base_test_env, multi_turn_dataset): + """Test training with multi-turn conversation data""" + optimizer = torch.optim.AdamW(base_test_env["model"].parameters()) + train_config = TrainConfig( + dataset=multi_turn_dataset, + optimizer=optimizer, + checkpoint_dir=base_test_env["test_dir"], + n_epoch=2, + batch_size=2, + checkpoint_interval=3, + accumulation_steps=1, + max_grad_norm=1.0, + random_seed=int(np.random.randint(1000)) + ) + + schedule_config = CosineScheduleConfig( + warmup_steps=50, + total_steps=100 + ) + + train_config.strategy = StrategyFactory.load( + base_test_env["model"], + "sft", + bos_token_id=2, + eos_token_id=3, + user_token_id=1, + multi_turn=True + ) + + model_parameter = ModelParameter( + base_test_env["model"], + base_test_env["tokenizer"], + base_test_env["transformer_config"] + ) + + trainer = Trainer(model_parameter, train_config, schedule_config) + checkpoint = trainer.train() + + assert len(checkpoint.loss_list) > 0 + +def test_schedule_factory_random_configs(): + """Test scheduler factory with random configurations""" + schedule_configs = [ + CosineScheduleConfig( + warmup_steps=np.random.randint(50, 200), + total_steps=np.random.randint(1000, 5000), + min_rate=np.random.uniform(0.01, 0.1) + ), + SgdrScheduleConfig( + warmup_steps=np.random.randint(50, 200), + cycle_length=np.random.randint(500, 2000), + t_mult=np.random.randint(1, 3), + min_rate=np.random.uniform(0.01, 0.1) + ) + ] + + for config in schedule_configs: + schedule_fn = SchedulerFactory.load_schedule_fn(config) + assert callable(schedule_fn) + + # Test the schedule function at different steps + for step in [0, config.warmup_steps // 2, config.warmup_steps, config.warmup_steps * 2]: + lr_mult = schedule_fn(step) + assert 0 <= lr_mult <= 1 \ No newline at end of file diff --git a/tests/test_trainer.py b/tests/test_trainer.py deleted file mode 100644 index 82dbfb0..0000000 --- a/tests/test_trainer.py +++ /dev/null @@ -1,466 +0,0 @@ -import os -import json -import torch -import shutil -import pytest -import pickle -import tempfile -import numpy as np - -from torch.utils.data import Dataset -from khaosz.core import * -from khaosz.trainer import * -from khaosz.trainer.data_util import * - -import matplotlib -matplotlib.use('Agg') - - -@pytest.fixture -def test_env(): - """Setup test environment with randomized data""" - test_dir = tempfile.mkdtemp() - config_path = os.path.join(test_dir, "config.json") - - n_dim_choices = [8, 16, 32] - n_head_choices = [2, 4] - - n_dim = int(np.random.choice(n_dim_choices)) - n_head = int(np.random.choice(n_head_choices)) - n_kvhead = n_head // 2 - d_ffn = n_dim * 2 - - config = { - "vocab_size": 1000, - "n_dim": n_dim, - "n_head": n_head, - "n_kvhead": n_kvhead, - "d_ffn": d_ffn, - "m_len": 1024, - "n_layer": 4, - "norm_eps": 1e-5 - } - - with open(config_path, 'w') as f: - json.dump(config, f) - - transformer_config = TransformerConfig().load(config_path) - model = Transformer(transformer_config) - tokenizer = BpeTokenizer() - - class RandomDataset(Dataset): - def __init__(self, length=None, max_length=64, vocab_size=1000): - self.length = length or int(np.random.randint(100, 200)) - self.max_length = max_length - self.vocab_size = vocab_size - - - def __len__(self): - return self.length - - def __getitem__(self, idx): - - return { - "input_ids": torch.randint(0, self.vocab_size, (self.max_length,)), - "target_ids": torch.randint(0, self.vocab_size, (self.max_length,)) - } - - class MultiTurnDataset(Dataset): - def __init__(self, length=None, max_length=64, vocab_size=1000): - self.length = length or int(np.random.randint(100, 200)) - self.max_length = max_length - self.vocab_size = vocab_size - - - def __len__(self): - return self.length - - def __getitem__(self, idx): - input_ids = torch.randint(0, self.vocab_size, (self.max_length,)) - target_ids = torch.randint(0, self.vocab_size, (self.max_length,)) - loss_mask = build_loss_mask(input_ids, 0, 1) - attn_mask = build_attention_mask(input_ids, 2, True) - - return { - "input_ids": input_ids, - "target_ids": target_ids, - "loss_mask": loss_mask, - "attn_mask": attn_mask, - } - - dataset = RandomDataset() - multi_turn_dataset = MultiTurnDataset() - - yield { - "test_dir": test_dir, - "config_path": config_path, - "transformer_config": transformer_config, - "model": model, - "tokenizer": tokenizer, - "dataset": dataset, - "multi_turn_dataset": multi_turn_dataset - } - - shutil.rmtree(test_dir) - -def test_dataset_loader_random_paths(test_env): - """Test dataset loader with multiple random paths""" - test_dir = test_env["test_dir"] - - # Create multiple pkl files with random data - num_files = np.random.randint(2, 5) - pkl_paths = [] - - for i in range(num_files): - pkl_path = os.path.join(test_dir, f"test_data_{i}.pkl") - seq_length = np.random.randint(50, 100) - dummy_data = { - "sequence": torch.randint(0, 1000, (seq_length,)), - "chosen": torch.randint(0, 1000, (seq_length,)), - "rejected": torch.randint(0, 1000, (seq_length,)), - "chosen_mask": torch.ones(seq_length, dtype=torch.bool), - "rejected_mask": torch.ones(seq_length, dtype=torch.bool) - } - with open(pkl_path, "wb") as f: - pickle.dump(dummy_data, f) - pkl_paths.append(pkl_path) - - # Test loading with multiple paths - loaded_dataset = DatasetLoader.load( - train_type="seq", - load_path=pkl_paths, - max_len=64, - device="cpu" - ) - assert loaded_dataset is not None - assert len(loaded_dataset) > 0 - - -def test_different_batch_sizes(test_env): - """Test training with different batch sizes""" - batch_sizes = [1, 2, 4, 8] - - for batch_size in batch_sizes: - optimizer = torch.optim.AdamW(test_env["model"].parameters()) - train_config = TrainConfig( - dataset=test_env["dataset"], - optimizer=optimizer, - checkpoint_dir=test_env["test_dir"], - n_epoch=1, - batch_size=batch_size, - checkpoint_interval=5, - accumulation_steps=1, - max_grad_norm=1.0, - random_seed=np.random.randint(1000) - ) - - assert train_config.batch_size == batch_size - - -def test_random_sampler_consistency(test_env): - """Test RandomSampler produces consistent results with same seed""" - dataset = test_env["dataset"] - - # Create two samplers with same seed - sampler1 = RandomSampler(dataset, seed=42) - sampler2 = RandomSampler(dataset, seed=42) - - indices1 = list(iter(sampler1)) - indices2 = list(iter(sampler2)) - - assert indices1 == indices2 - - -def test_random_sampler_different_seeds(test_env): - """Test RandomSampler produces different results with different seeds""" - dataset = test_env["dataset"] - - # Create two samplers with different seeds - sampler1 = RandomSampler(dataset, seed=42) - sampler2 = RandomSampler(dataset, seed=123) - - indices1 = list(iter(sampler1)) - indices2 = list(iter(sampler2)) - - # Very high probability they should be different - assert indices1 != indices2 - - -def test_schedule_factory_random_configs(test_env): - """Test scheduler factory with random configurations""" - schedule_configs = [ - CosineScheduleConfig( - warmup_steps=np.random.randint(50, 200), - total_steps=np.random.randint(1000, 5000), - min_rate=np.random.uniform(0.01, 0.1) - ), - SgdrScheduleConfig( - warmup_steps=np.random.randint(50, 200), - cycle_length=np.random.randint(500, 2000), - t_mult=np.random.randint(1, 3), - min_rate=np.random.uniform(0.01, 0.1) - ) - ] - - for config in schedule_configs: - schedule_fn = SchedulerFactory.load_schedule_fn(config) - assert callable(schedule_fn) - - # Test the schedule function at different steps - for step in [0, config.warmup_steps // 2, config.warmup_steps, config.warmup_steps * 2]: - lr_mult = schedule_fn(step) - assert 0 <= lr_mult <= 1 - - -def test_multi_turn_training(test_env): - """Test training with multi-turn conversation data""" - optimizer = torch.optim.AdamW(test_env["model"].parameters()) - train_config = TrainConfig( - dataset=test_env["multi_turn_dataset"], - optimizer=optimizer, - checkpoint_dir=test_env["test_dir"], - n_epoch=2, - batch_size=2, - checkpoint_interval=3, - accumulation_steps=1, - max_grad_norm=1.0, - random_seed=int(np.random.randint(1000)) - ) - - schedule_config = CosineScheduleConfig( - warmup_steps=50, - total_steps=100 - ) - - train_config.strategy = StrategyFactory.load( - test_env["model"], - "sft", - bos_token_id=2, - eos_token_id=3, - user_token_id=1, - multi_turn=True - ) - - model_parameter = ModelParameter( - test_env["model"], - test_env["tokenizer"], - test_env["transformer_config"] - ) - - trainer = Trainer(model_parameter, train_config, schedule_config) - checkpoint = trainer.train() - - assert len(checkpoint.loss_list) > 0 - - -def test_gradient_accumulation(test_env): - """Test training with different gradient accumulation steps""" - accumulation_steps_list = [1, 2, 4] - - for accumulation_steps in accumulation_steps_list: - optimizer = torch.optim.AdamW(test_env["model"].parameters()) - train_config = TrainConfig( - dataset=test_env["dataset"], - optimizer=optimizer, - checkpoint_dir=test_env["test_dir"], - n_epoch=1, - batch_size=2, - checkpoint_interval=10, - accumulation_steps=accumulation_steps, - max_grad_norm=1.0, - random_seed=42 - ) - - schedule_config = CosineScheduleConfig( - warmup_steps=10, - total_steps=20 - ) - - train_config.strategy = StrategyFactory.load( - test_env["model"], - "seq" - ) - - model_parameter = ModelParameter( - test_env["model"], - test_env["tokenizer"], - test_env["transformer_config"] - ) - - trainer = Trainer(model_parameter, train_config, schedule_config) - trainer.train() - - assert train_config.accumulation_steps == accumulation_steps - -def test_dpo_strategy_with_random_data(test_env): - """Test DPO strategy with randomized preference data""" - test_dir = test_env["test_dir"] - - # Create DPO-style data - pkl_path = os.path.join(test_dir, "dpo_data.pkl") - seq_length = np.random.randint(40, 80) - - dummy_data = { - "chosen": torch.randint(0, 1000, (seq_length,)), - "rejected": torch.randint(0, 1000, (seq_length,)), - "chosen_mask": torch.ones(seq_length, dtype=torch.bool), - "rejected_mask": torch.ones(seq_length, dtype=torch.bool) - } - - with open(pkl_path, "wb") as f: - pickle.dump(dummy_data, f) - - # Load DPO dataset - dpo_dataset = DatasetLoader.load( - train_type="dpo", - load_path=pkl_path, - max_len=64, - device="cpu" - ) - - assert dpo_dataset is not None - assert hasattr(dpo_dataset, 'fetcher') - - -def test_callback_integration(test_env): - """Test that all callbacks are properly integrated""" - optimizer = torch.optim.AdamW(test_env["model"].parameters()) - train_config = TrainConfig( - dataset=test_env["dataset"], - optimizer=optimizer, - checkpoint_dir=test_env["test_dir"], - n_epoch=1, - batch_size=2, - checkpoint_interval=3, - accumulation_steps=1, - max_grad_norm=1.0, - random_seed=42 - ) - - schedule_config = CosineScheduleConfig( - warmup_steps=10, - total_steps=20 - ) - - # Create custom callbacks to track calls - callback_calls = [] - - class TrackingCallback(TrainerCallback): - def on_train_begin(self, trainer, **kwargs): - callback_calls.append('on_train_begin') - - def on_batch_end(self, trainer, **kwargs): - callback_calls.append('on_batch_end') - - def on_epoch_end(self, trainer, **kwargs): - callback_calls.append('on_epoch_end') - - train_config.strategy = StrategyFactory.load(test_env["model"], "seq") - model_parameter = ModelParameter( - test_env["model"], - test_env["tokenizer"], - test_env["transformer_config"] - ) - - trainer = Trainer( - model_parameter, - train_config, - schedule_config, - callbacks=[TrackingCallback(), ProgressBarCallback()] - ) - - trainer.train() - - # Verify callbacks were called - assert 'on_train_begin' in callback_calls - assert 'on_batch_end' in callback_calls - assert 'on_epoch_end' in callback_calls - - -def test_memory_efficient_training(test_env): - """Test training with memory-efficient configurations""" - # Test with smaller batch sizes and gradient checkpointing - small_batch_configs = [ - {"batch_size": 1, "accumulation_steps": 8}, - {"batch_size": 2, "accumulation_steps": 4}, - {"batch_size": 4, "accumulation_steps": 2} - ] - - for config in small_batch_configs: - optimizer = torch.optim.AdamW(test_env["model"].parameters()) - train_config = TrainConfig( - dataset=test_env["dataset"], - optimizer=optimizer, - checkpoint_dir=test_env["test_dir"], - n_epoch=1, - batch_size=config["batch_size"], - checkpoint_interval=5, - accumulation_steps=config["accumulation_steps"], - max_grad_norm=1.0, - random_seed=42 - ) - - assert train_config.accumulation_steps == config["accumulation_steps"] - - -def test_early_stopping_simulation(test_env): - """Simulate early stopping behavior""" - class EarlyStoppingDataset(Dataset): - def __init__(self, length=10, stop_after=5): - self.length = length - self.stop_after = stop_after - self.count = 0 - - def __len__(self): - return self.length - - def __getitem__(self, idx): - self.count += 1 - if self.count == self.stop_after: - raise RuntimeError("Simulated early stopping") - - return { - "input_ids": torch.randint(0, 1000, (64,)), - "target_ids": torch.randint(0, 1000, (64,)) - } - - dataset = EarlyStoppingDataset() - - optimizer = torch.optim.AdamW(test_env["model"].parameters()) - train_config = TrainConfig( - dataset=dataset, - optimizer=optimizer, - checkpoint_dir=test_env["test_dir"], - n_epoch=2, - batch_size=2, - checkpoint_interval=1, - accumulation_steps=1, - max_grad_norm=1.0, - random_seed=42 - ) - - train_config.strategy = StrategyFactory.load(test_env["model"], "seq") - model_parameter = ModelParameter( - test_env["model"], - test_env["tokenizer"], - test_env["transformer_config"] - ) - schedule_config = CosineScheduleConfig(warmup_steps=10, total_steps=20) - trainer = Trainer(model_parameter, train_config, schedule_config) - - # Should handle early stopping gracefully - checkpoint = None - try: - checkpoint = trainer.train() - assert len(checkpoint.loss_list) == 2 - except Exception: - # Handle any exceptions - pass - - checkpoint = trainer.train(checkpoint) - assert len(checkpoint.loss_list) == 10 + 1 - - -if __name__ == "__main__": - # Run all tests - pytest.main([__file__, "-v"]) \ No newline at end of file