Merge branch 'main' into milo/default-eval-interval
milocress authored Aug 20, 2024
2 parents df47dfa + d3a9ac9 commit a2f7ace
Showing 109 changed files with 1,249 additions and 878 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-cpu.yaml
@@ -29,7 +29,7 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- name: Run PR CPU Tests
- uses: mosaicml/ci-testing/.github/actions/pytest-cpu@v0.1.0
+ uses: mosaicml/ci-testing/.github/actions/pytest-cpu@v0.1.2
with:
name: ${{ matrix.name }}
container: ${{ matrix.container }}
12 changes: 6 additions & 6 deletions .github/workflows/pr-gpu.yaml
@@ -27,10 +27,10 @@ jobs:
markers: "gpu"
pip_deps: "[all]"
pytest_command: "coverage run -m pytest"
- ci_repo_gpu_test_ref: v0.1.0
+ ci_repo_gpu_test_ref: v0.1.2
steps:
- name: Run PR GPU Tests
- uses: mosaicml/ci-testing/.github/actions/pytest-gpu@v0.1.0
+ uses: mosaicml/ci-testing/.github/actions/pytest-gpu@v0.1.2
with:
container: ${{ matrix.container }}
git_repo: mosaicml/llm-foundry
@@ -56,10 +56,10 @@ jobs:
markers: "gpu"
pip_deps: "[all]"
pytest_command: "coverage run -m pytest"
- ci_repo_gpu_test_ref: v0.1.0
+ ci_repo_gpu_test_ref: v0.1.2
steps:
- name: Run PR GPU Tests
- uses: mosaicml/ci-testing/.github/actions/pytest-gpu@v0.1.0
+ uses: mosaicml/ci-testing/.github/actions/pytest-gpu@v0.1.2
with:
container: ${{ matrix.container }}
git_repo: mosaicml/llm-foundry
@@ -85,10 +85,10 @@ jobs:
markers: "gpu"
pip_deps: "[all]"
pytest_command: "coverage run -m pytest"
- ci_repo_gpu_test_ref: v0.1.0
+ ci_repo_gpu_test_ref: v0.1.2
steps:
- name: Run PR GPU Tests
- uses: mosaicml/ci-testing/.github/actions/pytest-gpu@v0.1.0
+ uses: mosaicml/ci-testing/.github/actions/pytest-gpu@v0.1.2
with:
container: ${{ matrix.container }}
git_repo: mosaicml/llm-foundry
2 changes: 1 addition & 1 deletion llmfoundry/_version.py
@@ -3,4 +3,4 @@

"""The LLM Foundry Version."""

- __version__ = '0.11.0.dev'
+ __version__ = '0.12.0.dev0'
3 changes: 2 additions & 1 deletion llmfoundry/callbacks/__init__.py
@@ -16,6 +16,7 @@

from llmfoundry.callbacks.async_eval_callback import AsyncEval
from llmfoundry.callbacks.curriculum_learning_callback import CurriculumLearning
+ from llmfoundry.callbacks.env_logging_callback import EnvironmentLoggingCallback
from llmfoundry.callbacks.eval_gauntlet_callback import EvalGauntlet
from llmfoundry.callbacks.eval_output_logging_callback import EvalOutputLogging
from llmfoundry.callbacks.fdiff_callback import FDiffMetrics
@@ -55,8 +56,8 @@
callbacks.register('eval_output_logging', func=EvalOutputLogging)
callbacks.register('mbmoe_tok_per_expert', func=MegaBlocksMoE_TokPerExpert)
callbacks.register('run_timeout', func=RunTimeoutCallback)

callbacks.register('loss_perp_v_len', func=LossPerpVsContextLengthLogger)
+ callbacks.register('env_logger', func=EnvironmentLoggingCallback)

callbacks_with_config.register('async_eval', func=AsyncEval)
callbacks_with_config.register('curriculum_learning', func=CurriculumLearning)
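Aside (not part of the commit): the `callbacks.register(...)` calls above show how llm-foundry exposes callbacks by name through its registry. Below is a minimal sketch of registering a project-local callback the same way, assuming the `llmfoundry.registry` import path used by this module; `MyCustomCallback` is a hypothetical example class.

```python
from composer.core import Callback, State
from composer.loggers import Logger

from llmfoundry.registry import callbacks


class MyCustomCallback(Callback):
    """Hypothetical callback that prints the run name when fitting starts."""

    def fit_start(self, state: State, logger: Logger) -> None:
        print(f'Starting run: {state.run_name}')


# Register under a name so the callback can be referenced from a training config.
callbacks.register('my_custom_callback', func=MyCustomCallback)
```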
32 changes: 16 additions & 16 deletions llmfoundry/callbacks/async_eval_callback.py
@@ -11,7 +11,7 @@
import warnings
from collections import Counter
from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple, Union
+ from typing import Any, Optional, Union

from composer.callbacks import CheckpointSaver
from composer.core import Event, State, Time, Timestamp, TimeUnit
@@ -84,10 +84,10 @@ def get_run_name(training_run_name: str, current_interval: str) -> str:


def get_eval_parameters(
- parameters: Dict[str, Any],
+ parameters: dict[str, Any],
checkpoint: str,
training_run_name: str,
- ) -> Dict[str, Any]:
+ ) -> dict[str, Any]:
"""Get the parameters needed for the eval run.
Args:
@@ -164,8 +164,8 @@ def validate_interval(


def validate_eval_run_config(
- eval_run_config: Optional[Dict[str, Any]],
- ) -> Dict[str, Any]:
+ eval_run_config: Optional[dict[str, Any]],
+ ) -> dict[str, Any]:

if not eval_run_config:
return {}
@@ -220,9 +220,9 @@ class AsyncEval(CallbackWithConfig):

def __init__(
self,
- train_config: Dict[str, Any],
+ train_config: dict[str, Any],
interval: Union[str, int, Time],
- eval_run_config: Optional[Dict[str, Any]] = None,
+ eval_run_config: Optional[dict[str, Any]] = None,
):

# Run these during init to fail fast in any of the error cases
@@ -263,7 +263,7 @@ def __init__(

# Keep track of checkpoints that have already been evaled
# Format: {eval_timestamp: (checkpoint, run_name)}
- self.checkpoints_evaled: Dict[Time, Tuple[str, str]] = {}
+ self.checkpoints_evaled: dict[Time, tuple[str, str]] = {}

# Scheduling is based on the check interval, while _get_checkpoints_and_launch_runs
# will only launch runs at the interval
@@ -279,7 +279,7 @@ def __init__(
f'interval {interval}, checking at {self.check_interval}',
)

- def state_dict(self) -> Dict[str, Any]:
+ def state_dict(self) -> dict[str, Any]:
checkpoints_evaled = []
for eval_ts, (checkpoint, run_name) in self.checkpoints_evaled.items():
eval_ts_dict = {
@@ -292,7 +292,7 @@ def state_dict(self) -> Dict[str, Any]:
'checkpoints_evaled': checkpoints_evaled,
}

- def load_state_dict(self, state_dict: Dict[str, Any]):
+ def load_state_dict(self, state_dict: dict[str, Any]):
previous_checkpoints_evaled = state_dict.get('checkpoints_evaled', [])
if previous_checkpoints_evaled:
for (eval_ts, checkpoint, run_name) in previous_checkpoints_evaled:
@@ -305,9 +305,9 @@ def load_state_dict(self, state_dict: Dict[str, Any]):

@staticmethod
def _get_ready_sharded_checkpoints(
- checkpointer_checkpoints: Dict[str, Timestamp],
- remote_files: List[str],
- ) -> Dict[str, Timestamp]:
+ checkpointer_checkpoints: dict[str, Timestamp],
+ remote_files: list[str],
+ ) -> dict[str, Timestamp]:
"""Identify checkpoints ready to be evaled based on remote files.
This has special logic for sharded checkpoints to consider checkpoints composed
@@ -349,9 +349,9 @@ def _get_ready_sharded_checkpoints(

@staticmethod
def _get_ready_single_checkpoints(
- checkpointer_checkpoints: Dict[str, Timestamp],
- remote_checkpoints: List[str],
- ) -> Dict[str, Timestamp]:
+ checkpointer_checkpoints: dict[str, Timestamp],
+ remote_checkpoints: list[str],
+ ) -> dict[str, Timestamp]:
"""Identify checkpoints ready to be evaled based on remote checkpoints.
This is much simpler than the sharded case, because there is only one file
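Aside (not part of the commit): the changes in this file, and the matching ones in `eval_gauntlet_callback.py` below, replace `typing.Dict`, `typing.List`, and `typing.Tuple` with the built-in generics available since Python 3.9 (PEP 585). A tiny hypothetical illustration of the new style:

```python
from typing import Optional


def filter_checkpoints(
    checkpoints: dict[str, str],
    keep: list[str],
) -> Optional[dict[str, str]]:
    """Hypothetical helper using built-in dict/list as generic types (PEP 585)."""
    selected = {name: path for name, path in checkpoints.items() if name in keep}
    return selected or None
```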
188 changes: 188 additions & 0 deletions llmfoundry/callbacks/env_logging_callback.py
@@ -0,0 +1,188 @@
# Copyright 2024 MosaicML LLM Foundry authors
# SPDX-License-Identifier: Apache-2.0

import os
import platform
import socket
from typing import Any, Optional

import git
import pkg_resources
import psutil
import torch
from composer.core import Callback, State
from composer.loggers import Logger
from composer.utils import dist

from mcli import sdk

__all__ = ['EnvironmentLoggingCallback']

_PACKAGES_TO_LOG = [
'llm-foundry',
'mosaicml',
'megablocks',
'grouped-gemm',
'torch',
'flash_attn',
'transformers',
'datasets',
'peft',
]


class EnvironmentLoggingCallback(Callback):
"""A callback for logging environment information during model training.
This callback collects various pieces of information about the training environment,
including git repository details, package versions, system information, GPU details,
distributed training setup, NVIDIA driver information, and Docker container details.
Args:
workspace_dir (str): The directory containing the workspace. Defaults to '/workspace'.
log_git (bool): Whether to log git repository information. Defaults to True.
log_packages (bool): Whether to log package versions. Defaults to True.
log_nvidia (bool): Whether to log NVIDIA driver information. Defaults to True.
log_docker (bool): Whether to log Docker container information. Defaults to True.
log_system (bool): Whether to log system information. Defaults to False.
log_gpu (bool): Whether to log GPU information. Defaults to False.
log_distributed (bool): Whether to log distributed training information. Defaults to False.
packages_to_log (list[str]): A list of package names to log versions for. Defaults to None.
The collected information is logged as hyperparameters at the start of model fitting.
"""

def __init__(
self,
workspace_dir: str = '/workspace',
log_git: bool = True,
log_nvidia: bool = True,
log_docker: bool = True,
log_packages: bool = True,
log_system: bool = False,
log_gpu: bool = False,
log_distributed: bool = False,
packages_to_log: Optional[list[str]] = None,
):
self.workspace_dir = workspace_dir
self.log_git = log_git
self.log_packages = log_packages
self.log_nvidia = log_nvidia
self.log_docker = log_docker
self.log_system = log_system
self.log_gpu = log_gpu
self.log_distributed = log_distributed
self.env_data: dict[str, Any] = {}
self.packages_to_log = packages_to_log or _PACKAGES_TO_LOG

def _get_git_info(self, repo_path: str) -> Optional[dict[str, str]]:
if not os.path.isdir(repo_path):
return None
try:
repo = git.Repo(repo_path)
return {
'commit_hash': repo.head.commit.hexsha,
'branch': repo.active_branch.name,
}
except (git.InvalidGitRepositoryError, git.NoSuchPathError):
return None

def _get_package_version(self, package_name: str) -> Optional[str]:
try:
return pkg_resources.get_distribution(package_name).version
except pkg_resources.DistributionNotFound:
return None

def _get_system_info(self) -> dict[str, Any]:
return {
'python_version': platform.python_version(),
'os': f'{platform.system()} {platform.release()}',
'hostname': socket.gethostname(),
'cpu_info': {
'model': platform.processor(),
'cores': psutil.cpu_count(logical=False),
'threads': psutil.cpu_count(logical=True),
},
'memory': {
'total': psutil.virtual_memory().total,
'available': psutil.virtual_memory().available,
},
}

def _get_gpu_info(self) -> dict[str, Any]:
if torch.cuda.is_available():
return {
'model': torch.cuda.get_device_name(0),
'count': torch.cuda.device_count(),
'memory': {
'total': torch.cuda.get_device_properties(0).total_memory,
'allocated': torch.cuda.memory_allocated(0),
},
}
return {'available': False}

def _get_nvidia_info(self) -> dict[str, Any]:
if torch.cuda.is_available():
nccl_version = torch.cuda.nccl.version() # type: ignore
return {
'cuda_version':
torch.version.cuda, # type: ignore[attr-defined]
'cudnn_version': str(
torch.backends.cudnn.version(),
), # type: ignore[attr-defined]
'nccl_version': '.'.join(map(str, nccl_version)),
}
return {'available': False}

def _get_distributed_info(self) -> dict[str, Any]:
return {
'world_size': dist.get_world_size(),
'local_world_size': dist.get_local_world_size(),
'rank': dist.get_global_rank(),
'local_rank': dist.get_local_rank(),
}

def _get_docker_info(self) -> Optional[dict[str, Any]]:
if 'RUN_NAME' not in os.environ:
return None
run = sdk.get_run(os.environ['RUN_NAME'])
image, tag = run.image.split(':')
return {
'image': image,
'tag': tag,
}

def fit_start(self, state: State, logger: Logger) -> None:
# Collect environment data
if self.log_git:
self.env_data['git_info'] = {}
for folder in os.listdir(self.workspace_dir):
path = self._get_git_info(
os.path.join(self.workspace_dir, folder),
)
if path:
self.env_data['git_info'][folder] = path

if self.log_packages:
self.env_data['package_versions'] = {
pkg: self._get_package_version(pkg)
for pkg in self.packages_to_log
}
if self.log_nvidia:
self.env_data['nvidia'] = self._get_nvidia_info()

if self.log_docker:
if docker_info := self._get_docker_info():
self.env_data['docker'] = docker_info

if self.log_system:
self.env_data['system_info'] = self._get_system_info()

if self.log_gpu:
self.env_data['gpu_info'] = self._get_gpu_info()

if self.log_distributed:
self.env_data['distributed_info'] = self._get_distributed_info()

# Log the collected data
logger.log_hyperparameters({'environment_data': self.env_data})
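Not from the commit itself: a minimal usage sketch of the new callback with a composer `Trainer`, assuming the constructor defaults shown above; `model` and `train_loader` are placeholders for a real ComposerModel and dataloader.

```python
from composer import Trainer

from llmfoundry.callbacks import EnvironmentLoggingCallback

env_logger = EnvironmentLoggingCallback(
    workspace_dir='/workspace',  # directory scanned for git repos to record
    log_system=True,             # also capture CPU and memory details
    log_gpu=True,                # also capture GPU model and memory
)

trainer = Trainer(
    model=model,                    # placeholder: your ComposerModel
    train_dataloader=train_loader,  # placeholder: your training dataloader
    max_duration='10ba',
    callbacks=[env_logger],
)
trainer.fit()  # environment data is logged as hyperparameters at fit_start
```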
10 changes: 5 additions & 5 deletions llmfoundry/callbacks/eval_gauntlet_callback.py
@@ -6,7 +6,7 @@
import logging
import math
from enum import Enum
- from typing import Dict, Optional
+ from typing import Optional

from composer.core import Callback, State
from composer.loggers import Logger
@@ -23,8 +23,8 @@ class Weighting(Enum):


def calculate_named_averages(
- average_names: Dict[str, list],
- category_scores: Dict[str, float],
+ average_names: dict[str, list],
+ category_scores: dict[str, float],
):
"""Calculates the named averages based off the raw category scores.
@@ -144,7 +144,7 @@ def __init__(
f'Found average name `{avg_name}` used as category name. Average names and category names must be non-overlapping.',
)

- def extract_metrics_from_state(self, state: State) -> Dict[str, float]:
+ def extract_metrics_from_state(self, state: State) -> dict[str, float]:
results = {}

for key in self.logger_keys:
@@ -169,7 +169,7 @@ def extract_metrics_from_state(self, state: State) -> Dict[str, float]:

return {k: sum(v) / len(v) for k, v in results.items()}

- def eval_after_all(self, state: State, logger: Logger) -> Dict[str, float]:
+ def eval_after_all(self, state: State, logger: Logger) -> dict[str, float]:
computed_metrics = self.extract_metrics_from_state(state)
if len(computed_metrics) == 0:
return {}
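Aside (not part of this diff): a small sketch of the call shape of `calculate_named_averages`, under the simplifying assumption that each named average is a plain mean of the category scores it lists; the actual callback may combine categories differently.

```python
def named_averages_sketch(
    average_names: dict[str, list],
    category_scores: dict[str, float],
) -> dict[str, float]:
    """Simplified stand-in: mean of the listed categories per average name."""
    averages = {}
    for avg_name, categories in average_names.items():
        scores = [category_scores[c] for c in categories if c in category_scores]
        if scores:
            averages[avg_name] = sum(scores) / len(scores)
    return averages


# Hypothetical category names and scores:
print(named_averages_sketch(
    {'core_average': ['reading_comprehension', 'commonsense']},
    {'reading_comprehension': 0.75, 'commonsense': 0.25},
))  # -> {'core_average': 0.5}
```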