From ea97d5690d7924d169e316beeb67c1855ad369d8 Mon Sep 17 00:00:00 2001
From: Alexandre Payot
Date: Tue, 13 Jun 2023 18:29:09 +0100
Subject: [PATCH] Prototype for Direct S3 storage access

---
 .gradient/prepare-datasets.sh            |   3 +-
 .gradient/symlink_config.json            |   4 +-
 .gradient/symlink_datasets_and_caches.py | 194 ++++++++++++++++++++++-
 setup.sh                                 |   1 +
 4 files changed, 190 insertions(+), 12 deletions(-)

diff --git a/.gradient/prepare-datasets.sh b/.gradient/prepare-datasets.sh
index 52aec24..d9c6427 100755
--- a/.gradient/prepare-datasets.sh
+++ b/.gradient/prepare-datasets.sh
@@ -11,8 +11,7 @@ fi
 mkdir -p ${PERSISTENT_CHECKPOINT_DIR}
 
 echo "Starting preparation of datasets"
-/notebooks/.gradient/symlink_datasets_and_caches.py
-
+python -m pip install boto3
 echo "Finished running setup.sh."
 
 # Run automated test if specified
diff --git a/.gradient/symlink_config.json b/.gradient/symlink_config.json
index 830bf84..f7ba777 100644
--- a/.gradient/symlink_config.json
+++ b/.gradient/symlink_config.json
@@ -1,5 +1,3 @@
 {
-    "${POPLAR_EXECUTABLE_CACHE_DIR}":["${PUBLIC_DATASETS_DIR}/poplar-executables-hf-3-2/${SDK_VERSION}"],
-    "${HF_DATASETS_CACHE}/librispeech_asr":["${PUBLIC_DATASETS_DIR}/librispeech_asr"],
-    "${DATASETS_DIR}/dfki-sentinel-eurosat":["${PUBLIC_DATASETS_DIR}/dfki-sentinel-eurosat"]
+    "${POPLAR_EXECUTABLE_CACHE_DIR}":["${S3_DATASETS_DIR}/poplar-executables-hf-3-2"]
 }
diff --git a/.gradient/symlink_datasets_and_caches.py b/.gradient/symlink_datasets_and_caches.py
index 73ccbe8..84563aa 100755
--- a/.gradient/symlink_datasets_and_caches.py
+++ b/.gradient/symlink_datasets_and_caches.py
@@ -5,7 +5,17 @@ import subprocess
 import os
 import warnings
-from typing import List
+from typing import List, NamedTuple, Dict
+import base64
+import itertools
+import time
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
+import boto3
+from boto3.s3.transfer import TransferConfig
+import argparse
+
+
+S3_DATASETS_DIR = os.getenv("S3_DATASETS_DIR")
 
 
 def check_dataset_is_mounted(source_dirs_list: List[str]) -> List[str]:
     source_dirs_exist_paths = []
@@ -35,7 +45,7 @@ def create_overlays(source_dirs_exist_paths: List[str], target_dir: str) -> None
 
     workdir = Path("/fusedoverlay/workdirs" + source_dirs_exist_paths[0])
     workdir.mkdir(parents=True, exist_ok=True)
-    upperdir = Path("/fusedoverlay/upperdir" + source_dirs_exist_paths[0]) 
+    upperdir = Path("/fusedoverlay/upperdir" + source_dirs_exist_paths[0])
     upperdir.mkdir(parents=True, exist_ok=True)
 
     lowerdirs = ":".join(source_dirs_exist_paths)
@@ -45,9 +55,9 @@ def create_overlays(source_dirs_exist_paths: List[str], target_dir: str) -> None
     return
 
 
-def main():
+def main(args):
     # read in symlink config file
-    json_data = (Path(__file__).resolve().parent / "symlink_config.json").read_text()
+    json_data = Path(args.config_file).read_text()
 
     # substitute environment variables in the JSON data
     json_data = os.path.expandvars(json_data)
@@ -58,11 +68,181 @@ def main():
     for target_dir, source_dirs_list in config.items():
         # need to wait until the dataset has been mounted (async on Paperspace's end)
         source_dirs_exist_paths = check_dataset_is_mounted(source_dirs_list)
-        
+
         # create overlays for source dataset dirs that are mounted and populated
         if len(source_dirs_exist_paths) > 0:
             create_overlays(source_dirs_exist_paths, target_dir)
 
 
-if __name__ == "__main__":
-    main()
+def prepare_cred():
+    read_only = """W2djZGF0YS1yXQphd3NfYWNjZXNzX2tleV9pZCA9IDJaRUFVQllWWThCQVkwODlG
+V0FICmF3c19zZWNyZXRfYWNjZXNzX2tleSA9IDZUbDdIbUh2cFhjdURkRmd5NlBV
+Q0t5bTF0NmlMVVBCWWlZRFYzS2MK
+"""
+    bytes = base64.b64decode(read_only)
+    creds_file = Path("/root/.aws/credentials")
+    creds_file.parent.mkdir(exist_ok=True, parents=True)
+    creds_file.touch(exist_ok=True)
+    if "gcdata-r" not in creds_file.read_text():
+        with open(creds_file, "ab") as f:
+            f.write(bytes)
+
+
+def download_dataset_from_s3(source_dirs_list: List[str]) -> List[str]:
+    AWS_ENDPOINT = "http://10.12.17.246:8000"
+    aws_credential = "gcdata-r"
+    source_dirs_exist_paths = []
+    for source_dir in source_dirs_list:
+        source_dir_path = Path(source_dir)
+        dataset_name = source_dir_path.name
+        cmd = (
+            f"aws s3 --endpoint-url {AWS_ENDPOINT} --profile {aws_credential} "
+            f"cp s3://sdk/graphcore-gradient-datasets/{dataset_name}"
+            f" /graphcore-dataset/{dataset_name} --recursive"
+        ).split()
+        subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
+
+    return source_dirs_exist_paths
+
+
+class GradientDatasetFile(NamedTuple):
+    s3file: str
+    relative_file: str
+    local_root: str
+    size: int = 0
+
+    @classmethod
+    def from_response(cls, s3_response: dict):
+        bucket_name: str = f"s3://{s3_response['Name']}"
+        s3_prefix = s3_response["Prefix"]
+        local_root = S3_DATASETS_DIR
+        for pre in s3_prefix.split("/"):
+            if pre not in local_root:
+                local_root = f"{local_root}/{pre}"
+        print(local_root)
+        if "/" != bucket_name[-1]:
+            bucket_name = f"{bucket_name}/"
+
+        def single_entry(s3_content_response: dict):
+            s3_object_name: str = s3_content_response['Key']
+            full_s3file = f"{bucket_name}{s3_object_name}"
+            relative_file = s3_object_name.replace(s3_prefix, "").strip("/")
+            return cls(
+                s3file=s3_object_name,
+                relative_file=relative_file,
+                local_root=local_root,
+                size=s3_content_response.get("Size", 0),
+            )
+
+        return [single_entry(c) for c in s3_response["Contents"]]
+
+
+def list_files(client: "boto3.Client", dataset_name: str):
+    dataset_prefix = f"graphcore-gradient-datasets/{dataset_name}"
+    out = client.list_objects_v2(
+        Bucket="sdk",
+        MaxKeys=10000,
+        Prefix=dataset_prefix
+    )
+    assert out["ResponseMetadata"].get("HTTPStatusCode", 200) == 200, "Response did not have HTTP status 200"
+    assert not out["IsTruncated"], "Handling of truncated responses is not implemented yet"
+    return GradientDatasetFile.from_response(out)
+
+
+def apply_symlink(list_files: List[GradientDatasetFile], directory_map: Dict[str, List[str]]) -> List[GradientDatasetFile]:
+    source_target = {source: target for target, sources in directory_map.items() for source in sources}
+    return [file._replace(local_root=source_target[file.local_root]) for file in list_files]
+
+
+def get_valid_aws_endpoint():
+    # Check which endpoint should be used based on whether the local endpoint is directly accessible
+    try:
+        aws_endpoint = "http://10.12.17.246:8000"
+        subprocess.check_output(["curl", aws_endpoint], timeout=3)
+        print("Using local endpoint")
+    except subprocess.TimeoutExpired:
+        aws_endpoint = "https://s3.clehbtvty.paperspacegradient.com"
+        print("Using global endpoint")
+    return aws_endpoint
+
+
+class DownloadOutput(NamedTuple):
+    elapsed_seconds: float
+    gigabytes: float
+
+
+def download_file(aws_credential, aws_endpoint, file: GradientDatasetFile, *, max_concurrency, use_cli, progress=""):
+    bucket_name = "sdk"
+    s3client = boto3.Session(profile_name=aws_credential).client('s3', endpoint_url=aws_endpoint)
+    print(f"Downloading {progress} {file}")
+    start = time.time()
+    config = TransferConfig(max_concurrency=max_concurrency)
+    target = Path(file.local_root).resolve() / file.relative_file
+    target.parent.mkdir(exist_ok=True, parents=True)
+    if not use_cli:
+        s3client.download_file(bucket_name, file.s3file, str(target), Config=config)
+    else:
+        cmd = (
+            f"aws s3 --endpoint-url {aws_endpoint} --profile {aws_credential} "
+            f"cp s3://{bucket_name}/{file.s3file}"
+            f" {target}"
+        ).split()
+        print(cmd)
+        subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
+    elapsed = time.time() - start
+    size_gb = file.size / (1024 ** 3)
+    print(f"Finished {progress}: {size_gb:.2f}GB in {elapsed:.0f}s ({size_gb/elapsed:.3f} GB/s) for file {target}")
+    return DownloadOutput(elapsed, size_gb)
+
+
+def parallel_download_dataset_from_s3(directory_map: Dict[str, List[str]], *, max_concurrency=1, num_concurrent_downloads=1, symlink=True, use_cli=False) -> List[GradientDatasetFile]:
+    aws_credential = "gcdata-r"
+    aws_endpoint = get_valid_aws_endpoint()
+
+    s3 = boto3.Session(profile_name=aws_credential).client('s3', endpoint_url=aws_endpoint)
+
+    # Disable thread use/transfer concurrency
+
+    files_to_download: List[GradientDatasetFile] = []
+    source_dirs_list = list(itertools.chain.from_iterable(directory_map.values()))
+    for source_dir in source_dirs_list:
+        source_dir_path = Path(source_dir)
+        dataset_name = source_dir_path.name
+        files_to_download.extend(list_files(s3, dataset_name))
+
+    num_files = len(files_to_download)
+    print(f"Downloading {num_files} files from {len(source_dirs_list)} datasets")
+    if symlink:
+        files_to_download = apply_symlink(files_to_download, directory_map)
+
+    start = time.time()
+    with ProcessPoolExecutor(max_workers=num_concurrent_downloads) as executor:
+        outputs = [executor.submit(download_file, aws_credential, aws_endpoint, file, max_concurrency=max_concurrency, use_cli=use_cli, progress=f"{i+1}/{num_files}") for i, file in enumerate(files_to_download)]
+    total_elapsed = time.time() - start
+    total_download_size = sum(o.result().gigabytes for o in outputs)
+    print(f"Finished downloading {num_files} files: {total_download_size:.2f} GB in {total_elapsed:.2f}s ({total_download_size/total_elapsed:.2f} GB/s)")
+    return files_to_download
+
+
+def main_s3(args):
+    # read in symlink config file
+    json_data = Path(args.config_file).read_text()
+
+    # substitute environment variables in the JSON data
+    json_data = os.path.expandvars(json_data)
+    config = json.loads(json_data)
+    prepare_cred()
+    source_dirs_exist_paths = parallel_download_dataset_from_s3(config, max_concurrency=args.max_concurrency, num_concurrent_downloads=args.num_concurrent_downloads, symlink=args.no_symlink, use_cli=args.use_cli)
+
+
+def parse_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--gradient-dataset", action="store_true", help="Use gradient datasets rather than S3 storage access")
+    parser.add_argument("--no-symlink", action="store_false", help="Turn off the symlinking")
+    parser.add_argument("--use-cli", action="store_true", help="Use the CLI instead of boto3")
+    parser.add_argument("--num-concurrent-downloads", default=1, type=int, help="Number of concurrent files to download")
+    parser.add_argument("--max-concurrency", default=1, type=int, help="S3 maximum concurrency")
+    parser.add_argument("--config-file", default=str(Path(__file__).resolve().parent / "symlink_config.json"))
+
+    args = parser.parse_args()
+    return args
+
+
+if __name__ == "__main__":
+    args = parse_args()
+    if args.gradient_dataset:
+        main(args)
+    else:
+        main_s3(args)
diff --git a/setup.sh b/setup.sh
index 50c8d69..007555b 100755
--- a/setup.sh
+++ b/setup.sh
@@ -37,6 +37,7 @@ export CACHE_DIR="/tmp"
 # mounted public dataset directory (path in the container)
 # in the Paperspace environment this would be ="/datasets"
 export PUBLIC_DATASETS_DIR="/datasets"
+export S3_DATASETS_DIR="/notebooks/graphcore-gradient-datasets"
 
 export HUGGINGFACE_HUB_CACHE="/tmp/huggingface_caches"
 export TRANSFORMERS_CACHE="/tmp/huggingface_caches/checkpoints"
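
A minimal usage sketch of the new entry point, assuming the patch has been applied, the command is run from the repository root, and setup.sh has been sourced so that S3_DATASETS_DIR is set; the flags come from parse_args above, but the values shown are illustrative only:

    # Default path: download the datasets listed in symlink_config.json directly from S3 via boto3,
    # with 4 files in flight and 8 S3 transfer threads per file
    python .gradient/symlink_datasets_and_caches.py --num-concurrent-downloads 4 --max-concurrency 8

    # Fall back to the original Gradient dataset overlay flow
    python .gradient/symlink_datasets_and_caches.py --gradient-dataset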