From aeef34552dfef9c4c138b276f1d7514506cab1bb Mon Sep 17 00:00:00 2001 From: Tim Semenov Date: Mon, 2 Sep 2024 04:20:13 -0700 Subject: [PATCH] Add RedPajama-V2 dataset to TFDS. PiperOrigin-RevId: 670167688 --- tensorflow_datasets/core/dataset_builder.py | 2 +- .../core/download/download_manager.py | 264 +++++++++--------- .../core/download/downloader.py | 26 +- tensorflow_datasets/core/download/resource.py | 11 +- .../core/download/resource_test.py | 6 +- .../datasets/redpajama_v2/CITATIONS.bib | 7 + .../datasets/redpajama_v2/README.md | 1 + .../datasets/redpajama_v2/TAGS.txt | 9 + .../datasets/redpajama_v2/__init__.py | 15 + .../datasets/redpajama_v2/checksums.tsv | 3 + .../redpajama_v2/dummy_data/documents.json.gz | Bin 0 -> 924 bytes .../redpajama_v2/dummy_data/missing_urls.txt | 0 .../redpajama_v2_dataset_builder.py | 230 +++++++++++++++ .../redpajama_v2_dataset_builder_test.py | 42 +++ 14 files changed, 461 insertions(+), 155 deletions(-) create mode 100644 tensorflow_datasets/datasets/redpajama_v2/CITATIONS.bib create mode 100644 tensorflow_datasets/datasets/redpajama_v2/README.md create mode 100644 tensorflow_datasets/datasets/redpajama_v2/TAGS.txt create mode 100644 tensorflow_datasets/datasets/redpajama_v2/__init__.py create mode 100644 tensorflow_datasets/datasets/redpajama_v2/checksums.tsv create mode 100644 tensorflow_datasets/datasets/redpajama_v2/dummy_data/documents.json.gz create mode 100644 tensorflow_datasets/datasets/redpajama_v2/dummy_data/missing_urls.txt create mode 100644 tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder.py create mode 100644 tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder_test.py diff --git a/tensorflow_datasets/core/dataset_builder.py b/tensorflow_datasets/core/dataset_builder.py index f38bfe6a6f0..5262d6f0c32 100644 --- a/tensorflow_datasets/core/dataset_builder.py +++ b/tensorflow_datasets/core/dataset_builder.py @@ -1315,7 +1315,7 @@ def _make_download_manager( ) return download.DownloadManager( - download_dir=download_dir, + download_dir=download_dir / self.name, extract_dir=extract_dir, manual_dir=manual_dir, url_infos=self.url_infos, diff --git a/tensorflow_datasets/core/download/download_manager.py b/tensorflow_datasets/core/download/download_manager.py index a5a1bf284b9..43221d00c4d 100644 --- a/tensorflow_datasets/core/download/download_manager.py +++ b/tensorflow_datasets/core/download/download_manager.py @@ -316,9 +316,6 @@ def downloaded_size(self): """Returns the total size of downloaded files.""" return sum(url_info.size for url_info in self._recorded_url_infos.values()) - def _get_dl_path(self, url: str, sha256: str) -> epath.Path: - return self._download_dir / resource_lib.get_dl_fname(url, sha256) - @property def register_checksums(self): """Returns whether checksums are being computed and recorded to file.""" @@ -341,7 +338,7 @@ def _download(self, resource: Url) -> promise.Promise[epath.Path]: This function: - 1. Reuse cache (`_get_cached_path`) or download the file + 1. Reuse cache (`downloader.get_cached_path`) or download the file 2. Register or validate checksums (`_register_or_validate_checksums`) 3. Rename download to final path (`_rename_and_get_final_dl_path`) @@ -352,37 +349,39 @@ def _download(self, resource: Url) -> promise.Promise[epath.Path]: path: The path to the downloaded resource. 
""" # Normalize the input - if isinstance(resource, str): - url = resource - else: - url = resource.url + if not isinstance(resource, resource_lib.Resource): + resource = resource_lib.Resource(url=resource) + url = resource.url assert url is not None, 'URL is undefined from resource.' - expected_url_info = self._url_infos.get(url) + registered_url_info = self._url_infos.get(url) # 3 possible destinations for the path: # * In `manual_dir` (manually downloaded data) - # * In `downloads/url_path` (checksum unknown) - # * In `downloads/checksum_path` (checksum registered) + # * In `downloads/unregistered_path` (checksum unknown) + # * In `downloads/registered_path` (checksum registered) manually_downloaded_path = _get_manually_downloaded_path( manual_dir=self._manual_dir, - expected_url_info=expected_url_info, + url_info=registered_url_info, ) - url_path = self._get_dl_path( - url, sha256=hashlib.sha256(url.encode('utf-8')).hexdigest() - ) - checksum_path = ( - self._get_dl_path(url, sha256=expected_url_info.checksum) - if expected_url_info - else None + download_dir = self._download_dir / resource.relative_download_dir + download_dir.mkdir(parents=True, exist_ok=True) + unregistered_path = download_dir / resource_lib.get_dl_fname( + url=url, checksum=hashlib.sha256(url.encode('utf-8')).hexdigest() ) + if registered_url_info: + registered_path = download_dir / resource_lib.get_dl_fname( + url=url, checksum=registered_url_info.checksum + ) + else: + registered_path = None # Get the cached path and url_info (if they exists) dl_result = downloader.get_cached_path( manually_downloaded_path=manually_downloaded_path, - checksum_path=checksum_path, - url_path=url_path, - expected_url_info=expected_url_info, + registered_path=registered_path, + unregistered_path=unregistered_path, + registered_url_info=registered_url_info, ) if dl_result.path and not self._force_download: # Download was cached logging.info( @@ -394,8 +393,10 @@ def _download(self, resource: Url) -> promise.Promise[epath.Path]: else: # Download in an empty tmp directory (to avoid name collisions) # `download_tmp_dir` is cleaned-up in `_rename_and_get_final_dl_path` - dirname = f'{resource_lib.get_dl_dirname(url)}.tmp.{uuid.uuid4().hex}' - download_tmp_dir = self._download_dir / dirname + download_tmp_dir = ( + unregistered_path.parent + / f'{unregistered_path.name}.tmp.{uuid.uuid4().hex}' + ) download_tmp_dir.mkdir() logging.info(f'Downloading {url} into {download_tmp_dir}...') future = self._downloader.download( @@ -403,121 +404,155 @@ def _download(self, resource: Url) -> promise.Promise[epath.Path]: ) # Post-process the result - return future.then( - lambda dl_result: self._register_or_validate_checksums( # pylint: disable=g-long-lambda - url=url, - path=dl_result.path, - computed_url_info=dl_result.url_info, - expected_url_info=expected_url_info, - checksum_path=checksum_path, - url_path=url_path, - ) - ) + def callback(dl_result: downloader.DownloadResult) -> epath.Path: + return self._register_or_validate_checksums( + url=url, + dl_url_info=dl_result.url_info, + registered_url_info=registered_url_info, + dl_path=dl_result.path, + registered_path=registered_path, + unregistered_path=unregistered_path, + ) + + return future.then(callback) def _register_or_validate_checksums( self, - path: epath.Path, url: str, - expected_url_info: checksums.UrlInfo | None, - computed_url_info: checksums.UrlInfo | None, - checksum_path: epath.Path | None, - url_path: epath.Path, + dl_url_info: checksums.UrlInfo | None, + registered_url_info: 
checksums.UrlInfo | None, + dl_path: epath.Path, + registered_path: epath.Path | None, + unregistered_path: epath.Path, ) -> epath.Path: """Validates/records checksums and renames final downloaded path.""" - # `path` can be: - # * Manually downloaded - # * (cached) checksum_path - # * (cached) url_path - # * `tmp_dir/file` (downloaded path) - - if computed_url_info: + if dl_url_info: # Used both in `.downloaded_size` and `_record_url_infos()` - self._recorded_url_infos[url] = computed_url_info + self._recorded_url_infos[url] = dl_url_info if self._register_checksums: - if not computed_url_info: + if not dl_url_info: raise ValueError( f'Cannot register checksums for {url}: no computed checksum. ' '--register_checksums with manually downloaded data not supported.' ) # Note: - # * We save even if `expected_url_info == computed_url_info` as - # `expected_url_info` might have been loaded from another dataset. + # * We save even if `registered_url_info == dl_url_info` as + # `registered_url_info` might have been loaded from another dataset. # * `register_checksums_path` was validated in `__init__` so this # shouldn't fail. self._record_url_infos() # Checksum path should now match the new registered checksum (even if # checksums were previously registered) - expected_url_info = computed_url_info - checksum_path = self._get_dl_path(url, computed_url_info.checksum) + registered_url_info = dl_url_info + registered_path = unregistered_path.parent / resource_lib.get_dl_fname( + url, dl_url_info.checksum + ) else: # Eventually validate checksums # Note: - # * If path is cached at `url_path` but cached - # `computed_url_info != expected_url_info`, a new download has - # been triggered (as _get_cached_path returns None) + # * If path is cached at `unregistered_path` but + # `dl_url_info != registered_url_info`, a new download has + # been triggered (as `downloader.get_cached_path` returns None) # * If path was downloaded but checksums don't match expected, then # the download isn't cached (re-running build will retrigger a new # download). This is expected as it might mean the downloaded file # was corrupted. Note: The tmp file isn't deleted to allow inspection. - _validate_checksums( + self._validate_checksums( url=url, - path=path, - expected_url_info=expected_url_info, - computed_url_info=computed_url_info, - force_checksums_validation=self._force_checksums_validation, + dl_url_info=dl_url_info, + registered_url_info=registered_url_info, + dl_path=dl_path, ) return self._rename_and_get_final_dl_path( url=url, - path=path, - expected_url_info=expected_url_info, - computed_url_info=computed_url_info, - checksum_path=checksum_path, - url_path=url_path, + dl_url_info=dl_url_info, + registered_url_info=registered_url_info, + dl_path=dl_path, + registered_path=registered_path, + unregistered_path=unregistered_path, ) + def _validate_checksums( + self, + url: str, + dl_url_info: checksums.UrlInfo | None, + registered_url_info: checksums.UrlInfo | None, + dl_path: epath.Path, + ) -> None: + """Validate cached_url_info match url_info.""" + # If force-checksums validations, both downloaded and registered url_info + # should exists + if self._force_checksums_validation: + # Checksum of the downloaded file unknown (for manually downloaded file) + if not dl_url_info: + dl_url_info = checksums.compute_url_info(dl_path) + # Checksums have not been registered + if not registered_url_info: + raise ValueError( + f'Missing checksums url: {url}, yet ' + '`force_checksums_validation=True`. 
' + 'Did you forget to register checksums?' + ) + + if ( + registered_url_info + and dl_url_info + and registered_url_info != dl_url_info + ): + msg = ( + f'Artifact {url}, downloaded to {dl_path}, has wrong checksum:\n' + f'* Expected: {registered_url_info}\n' + f'* Got: {dl_url_info}\n' + 'To debug, see: ' + 'https://www.tensorflow.org/datasets/overview#fixing_nonmatchingchecksumerror' + ) + raise NonMatchingChecksumError(msg) + def _rename_and_get_final_dl_path( self, url: str, - path: epath.Path, - expected_url_info: checksums.UrlInfo | None, - computed_url_info: checksums.UrlInfo | None, - checksum_path: epath.Path | None, - url_path: epath.Path, + dl_url_info: checksums.UrlInfo | None, + registered_url_info: checksums.UrlInfo | None, + dl_path: epath.Path, + registered_path: epath.Path | None, + unregistered_path: epath.Path, ) -> epath.Path: """Eventually rename the downloaded file if checksums were recorded.""" - # `path` can be: - # * Manually downloaded - # * (cached) checksum_path - # * (cached) url_path - # * `tmp_dir/file` (downloaded path) - if self._manual_dir and path.is_relative_to(self._manual_dir): - return path # Manually downloaded data - elif path == checksum_path: # Path already at final destination - assert computed_url_info == expected_url_info # Sanity check - return checksum_path # pytype: disable=bad-return-type - elif path == url_path: - if checksum_path: + # Manually downloaded data + if self._manual_dir and dl_path.is_relative_to(self._manual_dir): + return dl_path + + # Cached at the final destination + elif dl_path == registered_path: + assert dl_url_info == registered_url_info # Sanity check + return dl_path + + # Cached at the tmp destination + elif dl_path == unregistered_path: + if registered_path: # Checksums were registered: Rename -> checksums_path - resource_lib.replace_info_file(path, checksum_path) - return path.replace(checksum_path) + resource_lib.replace_info_file(dl_path, registered_path) + return dl_path.replace(registered_path) else: # Checksums not registered: -> do nothing - return path - else: # Path was downloaded in tmp dir - dst_path = checksum_path or url_path + return dl_path + + # Downloaded at the tmp destination + else: + path = registered_path or unregistered_path resource_lib.write_info_file( url=url, - path=dst_path, + path=path, dataset_name=self._dataset_name, - original_fname=path.name, - url_info=computed_url_info, + original_fname=dl_path.name, + url_info=dl_url_info, ) - path.replace(dst_path) - path.parent.rmdir() # Cleanup tmp dir (will fail if dir not empty) - return dst_path + dl_path.replace(path) + dl_path.parent.rmdir() # Cleanup tmp dir (will fail if dir not empty) + return path @utils.build_synchronize_decorator() @utils.memoize() @@ -711,59 +746,22 @@ def manual_dir(self) -> epath.Path: def _get_manually_downloaded_path( manual_dir: epath.Path | None, - expected_url_info: checksums.UrlInfo | None, + url_info: checksums.UrlInfo | None, ) -> epath.Path | None: """Checks if file is already downloaded in manual_dir.""" if not manual_dir: # Manual dir not passed return None - if not expected_url_info or not expected_url_info.filename: + if not url_info or not url_info.filename: return None # Filename unknown. 
- manual_path = manual_dir / expected_url_info.filename + manual_path = manual_dir / url_info.filename if not manual_path.exists(): # File not manually downloaded return None return manual_path -def _validate_checksums( - url: str, - path: epath.Path, - computed_url_info: checksums.UrlInfo | None, - expected_url_info: checksums.UrlInfo | None, - force_checksums_validation: bool, -) -> None: - """Validate computed_url_info match expected_url_info.""" - # If force-checksums validations, both expected and computed url_info - # should exists - if force_checksums_validation: - # Checksum of the downloaded file unknown (for manually downloaded file) - if not computed_url_info: - computed_url_info = checksums.compute_url_info(path) - # Checksums have not been registered - if not expected_url_info: - raise ValueError( - f'Missing checksums url: {url}, yet ' - '`force_checksums_validation=True`. ' - 'Did you forget to register checksums?' - ) - - if ( - expected_url_info - and computed_url_info - and expected_url_info != computed_url_info - ): - msg = ( - f'Artifact {url}, downloaded to {path}, has wrong checksum:\n' - f'* Expected: {expected_url_info}\n' - f'* Got: {computed_url_info}\n' - 'To debug, see: ' - 'https://www.tensorflow.org/datasets/overview#fixing_nonmatchingchecksumerror' - ) - raise NonMatchingChecksumError(msg) - - def _map_promise(map_fn, all_inputs): """Map the function into each element and resolve the promise.""" all_promises = tree.map_structure(map_fn, all_inputs) # Apply the function diff --git a/tensorflow_datasets/core/download/downloader.py b/tensorflow_datasets/core/download/downloader.py index 3faa0deda70..112a6ab5079 100644 --- a/tensorflow_datasets/core/download/downloader.py +++ b/tensorflow_datasets/core/download/downloader.py @@ -77,9 +77,9 @@ def _read_url_info(url_path: epath.Path) -> checksums_lib.UrlInfo: def get_cached_path( manually_downloaded_path: epath.Path | None, - checksum_path: epath.Path | None, - url_path: epath.Path, - expected_url_info: checksums_lib.UrlInfo | None, + registered_path: epath.Path | None, + unregistered_path: epath.Path, + registered_url_info: checksums_lib.UrlInfo | None, ) -> DownloadResult: """Returns the downloaded path and computed url-info. @@ -90,29 +90,31 @@ def get_cached_path( Args: manually_downloaded_path: Manually downloaded in `dl_manager.manual_dir` - checksum_path: Cached in the final destination (if checksum known) - url_path: Cached in the tmp destination (if checksum unknown). - expected_url_info: Registered checksum (if known) + registered_path: Cached at the final destination (if checksum known) + unregistered_path: Cached at the tmp destination (if checksum unknown). + registered_url_info: Registered checksum (if known) """ # User has manually downloaded the file. 
if manually_downloaded_path and manually_downloaded_path.exists(): return DownloadResult(path=manually_downloaded_path, url_info=None) # Download has been cached (checksum known) - elif checksum_path and resource_lib.Resource.exists_locally(checksum_path): + elif registered_path and resource_lib.Resource.exists_locally( + registered_path + ): # `path = f(checksum)` was found, so url_info match - return DownloadResult(checksum_path, url_info=expected_url_info) + return DownloadResult(path=registered_path, url_info=registered_url_info) # Download has been cached (checksum unknown) - elif resource_lib.Resource.exists_locally(url_path): + elif resource_lib.Resource.exists_locally(unregistered_path): # Info restored from `.INFO` file - computed_url_info = _read_url_info(url_path) + url_info = _read_url_info(unregistered_path) # If checksums are now registered but do not match, trigger a new # download (e.g. previous file corrupted, checksums updated) - if expected_url_info and computed_url_info != expected_url_info: + if registered_url_info and url_info != registered_url_info: return DownloadResult(path=None, url_info=None) else: - return DownloadResult(path=url_path, url_info=computed_url_info) + return DownloadResult(path=unregistered_path, url_info=url_info) # Else file not found (or has bad checksums). (re)download. else: diff --git a/tensorflow_datasets/core/download/resource.py b/tensorflow_datasets/core/download/resource.py index 545f842ae85..2c7266ab82e 100644 --- a/tensorflow_datasets/core/download/resource.py +++ b/tensorflow_datasets/core/download/resource.py @@ -19,7 +19,6 @@ import codecs from collections.abc import Mapping import enum -import hashlib import itertools import json import os @@ -191,12 +190,6 @@ def get_dl_fname(url: str, checksum: str) -> str: return f'{name}{checksum}{extension}' -def get_dl_dirname(url: str) -> str: - """Returns name of temp dir for given url.""" - checksum = hashlib.sha256(url.encode()).hexdigest() - return get_dl_fname(url, checksum) - - def _get_info_path(path: epath.Path) -> epath.Path: """Returns path of INFO file associated with resource at path.""" return path.with_suffix(path.suffix + '.INFO') @@ -290,6 +283,7 @@ def __init__( url: str | None = None, extract_method: ExtractMethod | None = None, path: epath.PathLike | None = None, + relative_download_dir: epath.PathLike = '', ): """Resource constructor. @@ -299,10 +293,13 @@ def __init__( set, will be guessed from downloaded file name `original_fname`. path: Path of resource on local disk. Can be None if resource has not be downloaded yet. In such case, `url` must be set. + relative_download_dir: Optional directory for downloading relative to + `download_dir`. """ self.url = url self._extract_method = extract_method self.path: epath.Path = epath.Path(path) if path else None # pytype: disable=annotation-type-mismatch # attribute-variable-annotations + self.relative_download_dir = relative_download_dir @classmethod def exists_locally(cls, path: epath.Path) -> bool: diff --git a/tensorflow_datasets/core/download/resource_test.py b/tensorflow_datasets/core/download/resource_test.py index 42eea8ef6b8..c63cc53ec21 100644 --- a/tensorflow_datasets/core/download/resource_test.py +++ b/tensorflow_datasets/core/download/resource_test.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-"""Tests for resource module.""" +import hashlib + from tensorflow_datasets import testing from tensorflow_datasets.core.download import resource @@ -85,7 +86,8 @@ class DlDirNameTest(testing.TestCase): def test_(self): for url, expected in zip(self.urls, self.expected): - res = resource.get_dl_dirname(url) + checksum = hashlib.sha256(url.encode()).hexdigest() + res = resource.get_dl_fname(url, checksum) self.assertEqual(res, expected) diff --git a/tensorflow_datasets/datasets/redpajama_v2/CITATIONS.bib b/tensorflow_datasets/datasets/redpajama_v2/CITATIONS.bib new file mode 100644 index 00000000000..43d20215633 --- /dev/null +++ b/tensorflow_datasets/datasets/redpajama_v2/CITATIONS.bib @@ -0,0 +1,7 @@ +@software{together2023redpajama, + author = {Together Computer}, + title = {RedPajama: an Open Dataset for Training Large Language Models}, + month = October, + year = 2023, + url = {https://github.com/togethercomputer/RedPajama-Data} +} diff --git a/tensorflow_datasets/datasets/redpajama_v2/README.md b/tensorflow_datasets/datasets/redpajama_v2/README.md new file mode 100644 index 00000000000..e23f7cc8464 --- /dev/null +++ b/tensorflow_datasets/datasets/redpajama_v2/README.md @@ -0,0 +1 @@ +RedPajama V2: an Open Dataset for Training Large Language Models diff --git a/tensorflow_datasets/datasets/redpajama_v2/TAGS.txt b/tensorflow_datasets/datasets/redpajama_v2/TAGS.txt new file mode 100644 index 00000000000..ff8afee7b7b --- /dev/null +++ b/tensorflow_datasets/datasets/redpajama_v2/TAGS.txt @@ -0,0 +1,9 @@ +content.data-type.text # Contains text data. +content.language.de # Contains text in language German / de. +content.language.en # Contains text in language English / en. +content.language.es # Contains text in language Spanish / es. +content.language.fr # Contains text in language French / fr. +content.language.it # Contains text in language Italian / it. +content.multilingual # Contains text in multiple natural languages. +ml.task.language-modelling # Relates to Language Modelling, a machine learning task. +ml.task.text-generation # Relates to Text Generation, a machine learning task. diff --git a/tensorflow_datasets/datasets/redpajama_v2/__init__.py b/tensorflow_datasets/datasets/redpajama_v2/__init__.py new file mode 100644 index 00000000000..5310ec58c7d --- /dev/null +++ b/tensorflow_datasets/datasets/redpajama_v2/__init__.py @@ -0,0 +1,15 @@ +# coding=utf-8 +# Copyright 2024 The TensorFlow Datasets Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/tensorflow_datasets/datasets/redpajama_v2/checksums.tsv b/tensorflow_datasets/datasets/redpajama_v2/checksums.tsv new file mode 100644 index 00000000000..6229e755c9f --- /dev/null +++ b/tensorflow_datasets/datasets/redpajama_v2/checksums.tsv @@ -0,0 +1,3 @@ +# TODO(redpajamav2): If your dataset downloads files, then the checksums +# will be automatically added here when running +# `tfds build --register_checksums`. 
diff --git a/tensorflow_datasets/datasets/redpajama_v2/dummy_data/documents.json.gz b/tensorflow_datasets/datasets/redpajama_v2/dummy_data/documents.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..72ce2cd9b5aa1a7d2661f7dfdee987aab7dfee10 GIT binary patch literal 924 zcmV;N17rLjiwFp164hn^17vSwb!}yCbaO6hb8l_{byVAq+cpq=?^g_*hrXDyBwN0{ z6y96oESfbo-Ue+|7&OI|SW~1Rj@D7y0{M$JK>HniZ~r7;(xL2lx4_o04T(H6bLQ|2 zf9^H4?#-cBAXf9iAPU2f@1?V)#6p?e1;UntI1Fcl!*@K5*}jm5wYI6*);h4dVT0+9OWjnd!sGSe9btFMl2k$XuJ+5^Jq99Cf|2J zl}m>n;R+tj*Q=}RyX1DVUf;cXb$zzHSY0HS@#S}y->w&n+pq6@h?XYDf9e3hEj^-R4xTyu0v-;F10L30fCtxnzY*4;G#%BKw7b~a z2G6YB`5(k@nr*3YT~L&*x-_X-HCwG*Av1U*E0pC{)-dgZR~VFm7o;z|C4Ir`3tnG1 z=nI{`K>7md3#2bmyK1xsr`}6Y4w7L6|NI8S=?p6MPHLCpV;`xxF%oL&Dr=kswD9S} zACKFgKKz;DHC0eQnG|mXT2Cw3L>cMPA@!q+v@kcM3+hX7fZ$~k3?r`33N1+~-VA(p>CW%;oSs+0_s=#EO*#-N{EB6b*uQ{F?!wuRSHX;^VrD^c(i&$P|GZQj~CTaiP$ z8KWb)vRd1{H{Ua*$%THhgxENlm&d^pG-#Uc6Xb*QbN0<*`I>bXN0bbXC^v|Pv&lG| zC2^EQaX>qJiT^=|qi7t42|cpVf0(LR6tZYEVIwvi22#X9=VilnKw67{G6ea1Psfsh z6|HYhw@N&Fx_r8Oq9pMf9<@=K2mTQEVc3VH592;e`hL<05z}g%{OF}01Ba;1B}e(^6)bK;WF1pokV?Ypi3 literal 0 HcmV?d00001 diff --git a/tensorflow_datasets/datasets/redpajama_v2/dummy_data/missing_urls.txt b/tensorflow_datasets/datasets/redpajama_v2/dummy_data/missing_urls.txt new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder.py b/tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder.py new file mode 100644 index 00000000000..072aeab58f2 --- /dev/null +++ b/tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder.py @@ -0,0 +1,230 @@ +# coding=utf-8 +# Copyright 2024 The TensorFlow Datasets Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""RedPajamaV2 dataset.""" + +import collections +from collections.abc import Iterator +import contextlib +import gzip +import json +import os +from typing import Any, TypedDict + +from etils import epath +from tensorflow_datasets.core.utils.lazy_imports_utils import parquet as pq +import tensorflow_datasets.public_api as tfds + + +class _Example(TypedDict): + raw_content: str + meta: dict[str, Any] + quality_signals: dict[str, Any] + + +_NUM_SHARDS = 5000 +_PARTITIONS = ['head', 'middle', 'tail'] +_LANGUAGES = ['en', 'de', 'fr', 'es', 'it'] + +_COMPONENTS_URL = 'https://data.together.xyz/redpajama-data-v2/v1.0.0' +_DOCS_COMPONENT = 'documents' +_SIGNALS_COMPONENT = 'signals' +_DUPLICATES_COMPONENT = 'duplicates' +_URL_TEMPLATE_BY_COMPONENT = { + _DOCS_COMPONENT: os.path.join( + _COMPONENTS_URL, 'documents/{base_tag}.json.gz' + ), + _SIGNALS_COMPONENT: os.path.join( + _COMPONENTS_URL, 'quality_signals/{base_tag}.signals.json.gz' + ), + _DUPLICATES_COMPONENT: os.path.join( + _COMPONENTS_URL, 'duplicates/{base_tag}.duplicates.parquet' + ), +} + +_URL = 'https://huggingface.co/datasets/togethercomputer/RedPajama-Data-V2/resolve/main' +_MISSING_URLS_TEMPLATE = os.path.join(_URL, 'urls/missing-{component}.txt') +_SNAPSHOTS_URL = os.path.join(_URL, '_CC_SNAPSHOT_IDS') + + +class Builder(tfds.core.GeneratorBasedBuilder): + """DatasetBuilder for RedPajamaV2 dataset.""" + + VERSION = tfds.core.Version('1.0.0') + RELEASE_NOTES = { + '1.0.0': 'Initial release.', + } + + def _info(self) -> tfds.core.DatasetInfo: + """Returns the dataset metadata.""" + return self.dataset_info_from_configs( + features=tfds.features.FeaturesDict({ + 'raw_content': tfds.features.Text(), + 'doc_id': tfds.features.Text(), + 'meta': tfds.features.Text(), + 'quality_signals': tfds.features.Text(), + }), + supervised_keys=None, + homepage='https://www.together.ai/blog/redpajama-data-v2', + ) + + def _split_generators(self, dl_manager: tfds.download.DownloadManager): + """Returns SplitGenerators.""" + snapshots_filepath = dl_manager.download_and_extract(_SNAPSHOTS_URL) + snapshots = epath.Path(snapshots_filepath).read_text().splitlines() + + base_tags = [] + for lang in _LANGUAGES: + for snapshot in snapshots: + for partition in _PARTITIONS: + for shard_idx in range(_NUM_SHARDS): + base_tags.append(f'{snapshot}/{shard_idx:04d}/{lang}_{partition}') + + resource_by_component_by_base_tag = collections.defaultdict(dict) + for component, url_template in _URL_TEMPLATE_BY_COMPONENT.items(): + missing_urls_filepath = dl_manager.download_and_extract( + _MISSING_URLS_TEMPLATE.format(component=component) + ) + missing_urls = set( + epath.Path(missing_urls_filepath).read_text().splitlines() + ) + + for base_tag in base_tags: + if (url := url_template.format(base_tag=base_tag)) not in missing_urls: + snapshot = base_tag.split('/', maxsplit=1)[0] + resource_by_component_by_base_tag[base_tag][component] = ( + tfds.download.Resource(url=url, relative_download_dir=snapshot) + ) + + filepath_by_component_by_base_tag = dl_manager.download( + resource_by_component_by_base_tag + ) + return { + 'train': self._generate_examples(filepath_by_component_by_base_tag), + } + + def _generate_examples(self, filepath_by_component_by_base_tag): + """Yields examples.""" + beam = tfds.core.lazy_imports.apache_beam + + def _process_base_tag(base_tag, filepath_by_component): + if not filepath_by_component[_DOCS_COMPONENT]: + return + + if ( + base_tag.endswith('_tail') + or not filepath_by_component[_SIGNALS_COMPONENT] + ): + generate_examples = _generate_tail 
+ partition = 'tail' + duplicates = None + else: + generate_examples = _generate_head_middle + partition = 'head_middle' + + try: + with open_gzip( + filepath_by_component[_DUPLICATES_COMPONENT] + ) as duplicates_file: + duplicates = set( + pq.read_table( + duplicates_file, + columns=['doc_id'], + use_pandas_metadata=False, + )['doc_id'].to_pylist() + ) + except: # pylint: disable=bare-except + duplicates = set() + + for example_idx, example in generate_examples(filepath_by_component): + doc_id = f'{base_tag}.json.gz/{example_idx}' + + example['meta']['partition'] = partition + example['quality_signals']['is_duplicate'] = ( + doc_id in duplicates if duplicates else None + ) + + yield f'{base_tag}_{example_idx}', { + 'raw_content': example['raw_content'], + 'doc_id': doc_id, + 'meta': json.dumps(example['meta']), + 'quality_signals': json.dumps(example['quality_signals']), + } + + return beam.Create( + filepath_by_component_by_base_tag.items() + ) | beam.FlatMapTuple(_process_base_tag) + + +@contextlib.contextmanager +def open_gzip(filepath): + with epath.Path(filepath).open('rb') as f: + yield gzip.open(f) + + +def _generate_tail(filepath_by_component) -> Iterator[tuple[int, _Example]]: + """Yields examples for tail partitions.""" + try: + with open_gzip(filepath_by_component[_DOCS_COMPONENT]) as docs_file: + for idx, doc in enumerate(docs_file): + if example := _get_example(doc, None): + yield idx, example + except gzip.BadGzipFile: + # skip broken gzip files + return + + +def _generate_head_middle( + filepath_by_component, +) -> Iterator[tuple[int, _Example]]: + """Yields examples for head and middle partitions.""" + try: + with open_gzip( + filepath_by_component[_DOCS_COMPONENT] + ) as docs_file, open_gzip( + filepath_by_component[_SIGNALS_COMPONENT] + ) as signals_file: + for idx, (doc, signals) in enumerate(zip(docs_file, signals_file)): + if example := _get_example(doc, signals): + yield idx, example + except gzip.BadGzipFile: + # skip broken gzip files + return + + +def _get_example(doc: str, signals: str | None) -> _Example | None: + """Returns an example.""" + try: + doc = json.loads(doc) + signals = json.loads(signals) if signals else {} + + meta = { + key: doc[key] + for key in [ + 'url', + 'language', + 'source_domain', + 'date_download', + 'digest', + ] + } + + return { + 'raw_content': doc['raw_content'], + 'meta': meta, + 'quality_signals': signals.get('quality_signals', {}), + } + except: # pylint: disable=bare-except + return None diff --git a/tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder_test.py b/tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder_test.py new file mode 100644 index 00000000000..ed2a9e8ff02 --- /dev/null +++ b/tensorflow_datasets/datasets/redpajama_v2/redpajama_v2_dataset_builder_test.py @@ -0,0 +1,42 @@ +# coding=utf-8 +# Copyright 2024 The TensorFlow Datasets Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""RedPajamaV2 dataset.""" + +from tensorflow_datasets.datasets.redpajama_v2 import redpajama_v2_dataset_builder +import tensorflow_datasets.public_api as tfds + + +class RedPajamaV2Test(tfds.testing.DatasetBuilderTestCase): + """Tests for RedPajamaV2 dataset.""" + + redpajama_v2_dataset_builder._NUM_SHARDS = 1 + redpajama_v2_dataset_builder._PARTITIONS = ['tail'] + redpajama_v2_dataset_builder._LANGUAGES = ['en'] + + DATASET_CLASS = redpajama_v2_dataset_builder.Builder + SPLITS = {'train': 1} + + DL_DOWNLOAD_RESULT = { + '1900-01/0000/en_tail': { + redpajama_v2_dataset_builder._DOCS_COMPONENT: 'documents.json.gz' + } + } + DL_EXTRACT_RESULT = 'missing_urls.txt' + SKIP_CHECKSUMS = True + + +if __name__ == '__main__': + tfds.testing.test_main()