Local paths in common voice #3736

Merged · 7 commits · Feb 22, 2022
110 changes: 93 additions & 17 deletions datasets/common_voice/common_voice.py
@@ -15,6 +15,8 @@
 """ Common Voice Dataset"""
 
 
+import os
+
 import datasets
 from datasets.tasks import AutomaticSpeechRecognition
 
@@ -657,63 +659,135 @@ def _info(self):
 
     def _split_generators(self, dl_manager):
        """Returns SplitGenerators."""
-        archive = dl_manager.download(_DATA_URL.format(self.config.name))
-        path_to_data = "/".join(["cv-corpus-6.1-2020-12-11", self.config.name])
-        path_to_clips = "/".join([path_to_data, "clips"])
+        streaming = dl_manager.is_streaming
+        archive_path = dl_manager.download(_DATA_URL.format(self.config.name))
+        if streaming:
+            # Here we use iter_archive in streaming mode because dl_manager.download_and_extract
+            # doesn't work to stream TAR archives (we have to stream the files in the archive one by one).
+            #
+            # The iter_archive method returns an iterable of (path_within_archive, file_obj) for every
+            # file in the TAR archive.
+            #
+            archive_iterator = dl_manager.iter_archive(archive_path)
+            # we locate the data using the path within the archive
+            path_to_data = "/".join(["cv-corpus-6.1-2020-12-11", self.config.name])
+            path_to_clips = "/".join([path_to_data, "clips"])
+            metadata_filepaths = {
+                split: "/".join([path_to_data, f"{split}.tsv"])
+                for split in ["train", "test", "dev", "other", "validated", "invalidated"]
+            }
+        else:
+            # In non-streaming we can extract the archive locally as usual
+            extracted_dir = dl_manager.extract(archive_path)
+            archive_iterator = None
+            # we locate the data using the local path
+            path_to_data = os.path.join(extracted_dir, "cv-corpus-6.1-2020-12-11", self.config.name)
+            path_to_clips = os.path.join(path_to_data, "clips")
+            metadata_filepaths = {
+                split: os.path.join(path_to_data, f"{split}.tsv")
+                for split in ["train", "test", "dev", "other", "validated", "invalidated"]
+            }
 
         return [
             datasets.SplitGenerator(
                 name=datasets.Split.TRAIN,
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "train.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["train"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name=datasets.Split.TEST,
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "test.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["test"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name=datasets.Split.VALIDATION,
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "dev.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["dev"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name="other",
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "other.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["other"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name="validated",
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "validated.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["validated"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name="invalidated",
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "invalidated.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["invalidated"],
                     "path_to_clips": path_to_clips,
                 },
             ),
         ]
 
-    def _generate_examples(self, files, filepath, path_to_clips):
+    def _generate_examples(self, streaming, archive_iterator, filepath, path_to_clips):
         """Yields examples."""
+        if streaming:
+            yield from self._generate_examples_streaming(archive_iterator, filepath, path_to_clips)

Contributor: small nit - I'd even pass the streaming flag here to make it super clear that they are two different modes and maybe have both a _generate_examples_streaming(...) and a _generate_examples_non_streaming(...)

+        else:
+            yield from self._generate_examples_non_streaming(filepath, path_to_clips)
+
+    def _generate_examples_non_streaming(self, filepath, path_to_clips):
+
+        data_fields = list(self._info().features.keys())
+
+        # audio is not a header of the csv files
+        data_fields.remove("audio")
+        path_idx = data_fields.index("path")
+
+        with open(filepath, encoding="utf-8") as f:
+            lines = f.readlines()
+            headline = lines[0]
+
+            column_names = headline.strip().split("\t")
+            assert (
+                column_names == data_fields
+            ), f"The file should have {data_fields} as column names, but has {column_names}"
+
+            for id_, line in enumerate(lines[1:]):
+                field_values = line.strip().split("\t")
+
+                # set absolute path for mp3 audio file
+                field_values[path_idx] = os.path.join(path_to_clips, field_values[path_idx])
+
+                # if data is incomplete, fill with empty values
+                if len(field_values) < len(data_fields):
+                    field_values += (len(data_fields) - len(field_values)) * ["''"]
+
+                result = {key: value for key, value in zip(data_fields, field_values)}
+
+                # set audio feature
+                result["audio"] = field_values[path_idx]
+
+                yield id_, result
+
+    def _generate_examples_streaming(self, archive_iterator, filepath, path_to_clips):
+        """Yields examples in streaming mode."""
         data_fields = list(self._info().features.keys())
 
         # audio is not a header of the csv files
@@ -722,7 +796,7 @@ def _generate_examples(self, files, filepath, path_to_clips):

         all_field_values = {}
         metadata_found = False
-        for path, f in files:
+        for path, f in archive_iterator:
             if path == filepath:
                 metadata_found = True
                 lines = f.readlines()
@@ -752,5 +826,7 @@

                     # set audio feature
                     result["audio"] = {"path": path, "bytes": f.read()}
+                    # set path to None since the path doesn't exist locally in streaming mode
+                    result["path"] = None
 
                     yield path, result
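
For context, a minimal usage sketch of the two code paths this diff introduces. The "tr" config is an illustrative assumption; the printed fields follow from the diff above (streaming mode sets "path" to None and embeds the audio bytes in the "audio" feature, while non-streaming mode produces an absolute local path under the extracted archive). This is a sketch, not part of the PR:

from datasets import load_dataset

# Non-streaming: the TAR archive is downloaded and extracted locally,
# so each example's "path" points at a real local mp3 file.
ds = load_dataset("common_voice", "tr", split="train")
print(ds[0]["path"])  # absolute path under .../cv-corpus-6.1-2020-12-11/tr/clips/

# Streaming: the archive is streamed file by file via iter_archive,
# so "path" is None and the audio bytes come with the "audio" feature.
ds_stream = load_dataset("common_voice", "tr", split="train", streaming=True)
sample = next(iter(ds_stream))
print(sample["path"])  # None
print(sample["audio"]["path"])  # path of the clip inside the TAR archive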
2 changes: 2 additions & 0 deletions src/datasets/utils/download_manager.py
@@ -65,6 +65,8 @@ class GenerateMode(enum.Enum):


 class DownloadManager:
+    is_streaming = False
+
     def __init__(
         self,
         dataset_name: Optional[str] = None,
1 change: 1 addition & 0 deletions src/datasets/utils/mock_download_manager.py
@@ -32,6 +32,7 @@
 class MockDownloadManager:
     dummy_file_name = "dummy_data"
     datasets_scripts_dir = "datasets"
+    is_streaming = False
 
     def __init__(
         self,
2 changes: 2 additions & 0 deletions src/datasets/utils/streaming_download_manager.py
@@ -712,6 +712,8 @@ class StreamingDownloadManager:
     builtin `open` function to stream data from remote files.
     """
 
+    is_streaming = True
+
     def __init__(
         self,
         dataset_name: Optional[str] = None,
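
Taken together, these three one-line attribute additions are what let a dataset script detect its execution mode: dl_manager.is_streaming is True only on StreamingDownloadManager. A minimal, hypothetical builder sketch of the pattern (MyDataset, _URL, and the plain-text archive layout are invented for illustration and are not part of this PR):

import os

import datasets

_URL = "https://example.com/data.tar.gz"  # hypothetical archive of .txt files


class MyDataset(datasets.GeneratorBasedBuilder):
    def _info(self):
        return datasets.DatasetInfo(features=datasets.Features({"text": datasets.Value("string")}))

    def _split_generators(self, dl_manager):
        archive_path = dl_manager.download(_URL)
        if dl_manager.is_streaming:
            # streaming: iterate the archive file by file, no local extraction
            files, data_dir = dl_manager.iter_archive(archive_path), None
        else:
            # non-streaming: extract locally and read with plain os paths
            files, data_dir = None, dl_manager.extract(archive_path)
        return [
            datasets.SplitGenerator(
                name=datasets.Split.TRAIN,
                gen_kwargs={"streaming": dl_manager.is_streaming, "files": files, "data_dir": data_dir},
            )
        ]

    def _generate_examples(self, streaming, files, data_dir):
        if streaming:
            # iter_archive yields (path_within_archive, file_obj) pairs
            for idx, (path, f) in enumerate(files):
                yield idx, {"text": f.read().decode("utf-8")}
        else:
            # walk the extracted directory with local paths
            for idx, name in enumerate(sorted(os.listdir(data_dir))):
                with open(os.path.join(data_dir, name), encoding="utf-8") as f:
                    yield idx, {"text": f.read()}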