Local paths in common voice #3736

Merged (7 commits, Feb 22, 2022)

Changes from 4 commits
111 changes: 93 additions & 18 deletions datasets/common_voice/common_voice.py
@@ -15,6 +15,8 @@
""" Common Voice Dataset"""


+import os
+
import datasets
from datasets.tasks import AutomaticSpeechRecognition

@@ -655,65 +657,136 @@ def _info(self):
            ],
        )

-    def _split_generators(self, dl_manager):
+    def _split_generators(self, dl_manager, streaming):
        """Returns SplitGenerators."""
-        archive = dl_manager.download(_DATA_URL.format(self.config.name))
-        path_to_data = "/".join(["cv-corpus-6.1-2020-12-11", self.config.name])
-        path_to_clips = "/".join([path_to_data, "clips"])
+        archive_path = dl_manager.download(_DATA_URL.format(self.config.name))
+        if streaming:
+            # Here we use iter_archive in streaming mode because dl_manager.download_and_extract
+            # doesn't work to stream TAR archives (we have to stream the files in the archive one by one).
+            #
+            # The iter_archive method returns an iterable of (path_within_archive, file_obj) for every
+            # file in the TAR archive.
+            #
+            archive_iterator = dl_manager.iter_archive(archive_path)
+            # we locate the data using the path within the archive
+            path_to_data = "/".join(["cv-corpus-6.1-2020-12-11", self.config.name])
+            path_to_clips = "/".join([path_to_data, "clips"])
+            metadata_filepaths = {
+                split: "/".join([path_to_data, f"{split}.tsv"])
+                for split in ["train", "test", "dev", "other", "validated", "invalidated"]
+            }
+        else:
+            # In non-streaming mode we can extract the archive locally as usual
+            extracted_dir = dl_manager.extract(archive_path)
+            archive_iterator = None
+            # we locate the data using the local path
+            path_to_data = os.path.join(extracted_dir, "cv-corpus-6.1-2020-12-11", self.config.name)
+            path_to_clips = os.path.join(path_to_data, "clips")
+            metadata_filepaths = {
+                split: os.path.join(path_to_data, f"{split}.tsv")
+                for split in ["train", "test", "dev", "other", "validated", "invalidated"]
+            }

        return [
            datasets.SplitGenerator(
                name=datasets.Split.TRAIN,
                gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "train.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["train"],
                    "path_to_clips": path_to_clips,
                },
            ),
            datasets.SplitGenerator(
                name=datasets.Split.TEST,
                gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "test.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["test"],
                    "path_to_clips": path_to_clips,
                },
            ),
            datasets.SplitGenerator(
                name=datasets.Split.VALIDATION,
                gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "dev.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["dev"],
                    "path_to_clips": path_to_clips,
                },
            ),
            datasets.SplitGenerator(
                name="other",
                gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "other.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["other"],
                    "path_to_clips": path_to_clips,
                },
            ),
            datasets.SplitGenerator(
                name="validated",
                gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "validated.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["validated"],
                    "path_to_clips": path_to_clips,
                },
            ),
            datasets.SplitGenerator(
                name="invalidated",
                gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "invalidated.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["invalidated"],
                    "path_to_clips": path_to_clips,
                },
            ),
        ]

-    def _generate_examples(self, files, filepath, path_to_clips):
+    def _generate_examples(self, streaming, archive_iterator, filepath, path_to_clips):
        """Yields examples."""
+        if streaming:
+            yield from self._generate_examples_streaming(archive_iterator, filepath, path_to_clips)

Review comment (Contributor):
small nit - I'd even pass the streaming flag here to make it super clear that they are two different modes and maybe have both a _generate_examples_streaming(...) and a _generate_examples_non_streaming(...)

+        else:
+            yield from self._generate_examples_non_streaming(filepath, path_to_clips)

+    def _generate_examples_non_streaming(self, filepath, path_to_clips):
+
+        data_fields = list(self._info().features.keys())
+
+        # audio is not a header of the csv files
+        data_fields.remove("audio")
+        path_idx = data_fields.index("path")
+
+        with open(filepath, encoding="utf-8") as f:
+            lines = f.readlines()
+            headline = lines[0]
+
+            column_names = headline.strip().split("\t")
+            assert (
+                column_names == data_fields
+            ), f"The file should have {data_fields} as column names, but has {column_names}"
+
+            for id_, line in enumerate(lines[1:]):
+                field_values = line.strip().split("\t")
+
+                # set absolute path for mp3 audio file
+                field_values[path_idx] = os.path.join(path_to_clips, field_values[path_idx])
+
+                # if data is incomplete, fill with empty values
+                if len(field_values) < len(data_fields):
+                    field_values += (len(data_fields) - len(field_values)) * ["''"]
+
+                result = {key: value for key, value in zip(data_fields, field_values)}
+
+                # set audio feature
+                result["audio"] = field_values[path_idx]
+
+                yield id_, result
+
+    def _generate_examples_streaming(self, archive_iterator, filepath, path_to_clips):
+        """Yields examples in streaming mode."""
        data_fields = list(self._info().features.keys())

        # audio is not a header of the csv files
@@ -722,7 +795,7 @@ def _generate_examples(self, files, filepath, path_to_clips):

        all_field_values = {}
        metadata_found = False
-        for path, f in files:
+        for path, f in archive_iterator:
            if path == filepath:
                metadata_found = True
                lines = f.readlines()
@@ -752,5 +825,7 @@

                    # set audio feature
                    result["audio"] = {"path": path, "bytes": f.read()}
+                    # set path to None since the path doesn't exist locally in streaming mode
+                    result["path"] = None

                    yield path, result
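
For context, a minimal usage sketch of the behavior this diff targets (not part of the PR; the "tr" config is just an illustrative example). In non-streaming mode the archive is extracted, so "path" is a local file; in streaming mode files are read from the TAR one by one, so "path" is None and the audio bytes live in the "audio" field:

from datasets import load_dataset

# Non-streaming: the archive is downloaded and extracted,
# so example["path"] is an absolute local path to an mp3 file.
ds = load_dataset("common_voice", "tr", split="train")
print(ds[0]["path"])

# Streaming: clips are read from the TAR on the fly,
# so example["path"] is None and the bytes are in example["audio"].
ds_stream = load_dataset("common_voice", "tr", split="train", streaming=True)
print(next(iter(ds_stream))["path"])  # None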
15 changes: 11 additions & 4 deletions src/datasets/builder.py
@@ -657,7 +657,9 @@ def _download_and_prepare(self, dl_manager, verify_infos, **prepare_split_kwargs
"""
# Generating data for all splits
split_dict = SplitDict(dataset_name=self.name)
split_generators_kwargs = self._make_split_generators_kwargs(prepare_split_kwargs)
split_generators_kwargs = self._make_split_generators_kwargs(
{"dl_manager": dl_manager, **prepare_split_kwargs}
)
split_generators = self._split_generators(dl_manager, **split_generators_kwargs)

# Checksums verification
@@ -727,8 +729,12 @@ def _save_infos(self):

    def _make_split_generators_kwargs(self, prepare_split_kwargs):
        """Get kwargs for `self._split_generators()` from `prepare_split_kwargs`."""
-        del prepare_split_kwargs
-        return {}
+        split_generators_kwargs = {}
+        split_generators_arg_names = inspect.signature(self._split_generators).parameters.keys()
+        if "streaming" in split_generators_arg_names:
+            streaming = isinstance(prepare_split_kwargs.get("dl_manager"), StreamingDownloadManager)

Review comment (Member):
I guess you need the DownloadManager instance to find out whether we are in streaming mode or not... Because the builder itself knows nothing about streaming or not...

Reply (Member Author, @lhoestq, Feb 21, 2022):
Indeed having this logic inside the builder can be a bit unexpected. An alternative would be to replace the streaming parameter by

streaming = dl_manager.is_streaming

inside the dataset script
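
As a rough sketch of that alternative (assuming the download manager exposes an is_streaming attribute, as proposed above; this is not what the PR as shown implements), the dataset script would branch on the manager itself instead of receiving a streaming argument:

    def _split_generators(self, dl_manager):
        archive_path = dl_manager.download(_DATA_URL.format(self.config.name))
        # hypothetical: detect streaming mode from the download manager itself
        streaming = dl_manager.is_streaming
        if streaming:
            archive_iterator = dl_manager.iter_archive(archive_path)
            ...  # locate data by path within the archive
        else:
            extracted_dir = dl_manager.extract(archive_path)
            ...  # locate data on the local filesystem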

+            split_generators_kwargs["streaming"] = streaming
+        return split_generators_kwargs

    def as_dataset(
        self, split: Optional[Split] = None, run_post_process=True, ignore_verifications=False, in_memory=False
@@ -892,7 +898,8 @@ def as_streaming_dataset(
            data_dir=self.config.data_dir,
        )
        self._check_manual_download(dl_manager)
-        splits_generators = {sg.name: sg for sg in self._split_generators(dl_manager)}
+        split_generators_kwargs = self._make_split_generators_kwargs({"dl_manager": dl_manager})
+        splits_generators = {sg.name: sg for sg in self._split_generators(dl_manager, **split_generators_kwargs)}
        # By default, return all splits
        if split is None:
            splits_generator = splits_generators
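To see what the new _make_split_generators_kwargs logic does in isolation, here is a small self-contained sketch (illustrative names, not code from the PR): the streaming kwarg is injected only when the builder's _split_generators declares it, so legacy dataset scripts keep working unchanged.

import inspect

def make_split_generators_kwargs(split_generators_fn, streaming):
    # Mirror of the builder logic: pass "streaming" only if the
    # dataset script's _split_generators asks for it.
    kwargs = {}
    if "streaming" in inspect.signature(split_generators_fn).parameters:
        kwargs["streaming"] = streaming
    return kwargs

def legacy_split_generators(dl_manager):  # old-style script
    pass

def streaming_aware_split_generators(dl_manager, streaming):  # opt-in script
    pass

print(make_split_generators_kwargs(legacy_split_generators, True))  # {}
print(make_split_generators_kwargs(streaming_aware_split_generators, True))  # {'streaming': True}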
11 changes: 8 additions & 3 deletions src/datasets/commands/dummy_data.py
@@ -339,7 +339,8 @@ def _autogenerate_dummy_data(self, dataset_builder, mock_dl_manager, keep_uncomp
        dl_manager = DummyDataGeneratorDownloadManager(
            dataset_name=self._dataset_name, mock_download_manager=mock_dl_manager, download_config=download_config
        )
-        dataset_builder._split_generators(dl_manager)
+        split_generators_kwargs = dataset_builder._make_split_generators_kwargs({"dl_manager": dl_manager})
+        dataset_builder._split_generators(dl_manager, **split_generators_kwargs)
        mock_dl_manager.load_existing_dummy_data = False  # don't use real dummy data
        dl_manager.auto_generate_dummy_data_folder(
            n_lines=self._n_lines,
@@ -356,7 +357,10 @@
        n_examples_per_split = {}
        os.makedirs(dataset_builder._cache_dir, exist_ok=True)
        try:
-            split_generators = dataset_builder._split_generators(mock_dl_manager)
+            split_generators_kwargs = dataset_builder._make_split_generators_kwargs(
+                {"dl_manager": mock_dl_manager}
+            )
+            split_generators = dataset_builder._split_generators(mock_dl_manager, **split_generators_kwargs)
            for split_generator in split_generators:
                dataset_builder._prepare_split(split_generator)
                n_examples_per_split[split_generator.name] = split_generator.split_info.num_examples
@@ -393,7 +397,8 @@ def _print_dummy_data_instructions(self, dataset_builder, mock_dl_manager):
        os.makedirs(dummy_data_folder, exist_ok=True)

        try:
-            generator_splits = dataset_builder._split_generators(mock_dl_manager)
+            split_generators_kwargs = dataset_builder._make_split_generators_kwargs({"dl_manager": mock_dl_manager})
+            generator_splits = dataset_builder._split_generators(mock_dl_manager, **split_generators_kwargs)
        except FileNotFoundError as e:

            print(
6 changes: 3 additions & 3 deletions src/datasets/inspect.py
@@ -280,11 +280,11 @@ def get_dataset_split_names(
        download_config = download_config.copy() if download_config else DownloadConfig()
        if use_auth_token is not None:
            download_config.use_auth_token = use_auth_token
+        dl_manager = StreamingDownloadManager(base_path=builder.base_path, download_config=download_config)
+        split_generators_kwargs = builder._make_split_generators_kwargs({"dl_manager": dl_manager})
        return [
            split_generator.name
-            for split_generator in builder._split_generators(
-                StreamingDownloadManager(base_path=builder.base_path, download_config=download_config)
-            )
+            for split_generator in builder._split_generators(dl_manager, **split_generators_kwargs)
        ]
    except Exception as err:
        raise SplitsNotFoundError("The split names could not be parsed from the dataset config.") from err
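
The change keeps get_dataset_split_names working through the streaming path; a quick usage sketch (config chosen for illustration, and the exact split list depends on the dataset):

from datasets import get_dataset_split_names

# Split names are resolved with a StreamingDownloadManager,
# so the full Common Voice archive is never downloaded.
print(get_dataset_split_names("common_voice", "tr"))
# e.g. ['train', 'test', 'validation', 'other', 'validated', 'invalidated']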