Commit

Local paths in common voice (#3736)
* Merge generators for local files and streaming

* add the streaming parameter to _split_generators

* update common_voice

* Patrick's comments:
- pass streaming to _generate_examples
- separate in two methods

* add is_streaming attribute to the dl managers

* revert the streaming parameter being passed to _split_generators

Co-authored-by: anton-l <[email protected]>
lhoestq and anton-l authored Feb 22, 2022
1 parent ddd3604 commit e3c8e25
Showing 4 changed files with 98 additions and 17 deletions.
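In short: after this change the same Common Voice script serves both local and streaming loading, and only the shape of the "path"/"audio" fields differs between the two modes. A minimal usage sketch of the effect (the "tr" config name and the cache path in the comment are illustrative, not taken from this commit):

from datasets import load_dataset

# Non-streaming: the TAR archive is downloaded and extracted, so "path"
# points to an .mp3 file on the local disk.
ds = load_dataset("common_voice", "tr", split="train")
print(ds[0]["path"])  # e.g. .../cv-corpus-6.1-2020-12-11/tr/clips/....mp3

# Streaming: the archive is read over HTTP without extraction; there is
# no local file, so "path" is None and the audio bytes are streamed
# straight out of the archive instead.
streamed = load_dataset("common_voice", "tr", split="train", streaming=True)
print(next(iter(streamed))["path"])  # None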
110 changes: 93 additions & 17 deletions datasets/common_voice/common_voice.py
@@ -15,6 +15,8 @@
 """ Common Voice Dataset"""
 
 
+import os
+
 import datasets
 from datasets.tasks import AutomaticSpeechRecognition
 
@@ -657,63 +659,135 @@ def _info(self):
 
     def _split_generators(self, dl_manager):
         """Returns SplitGenerators."""
-        archive = dl_manager.download(_DATA_URL.format(self.config.name))
-        path_to_data = "/".join(["cv-corpus-6.1-2020-12-11", self.config.name])
-        path_to_clips = "/".join([path_to_data, "clips"])
+        streaming = dl_manager.is_streaming
+        archive_path = dl_manager.download(_DATA_URL.format(self.config.name))
+        if streaming:
+            # Here we use iter_archive in streaming mode because dl_manager.download_and_extract
+            # doesn't work to stream TAR archives (we have to stream the files in the archive one by one).
+            #
+            # The iter_archive method returns an iterable of (path_within_archive, file_obj) for every
+            # file in the TAR archive.
+            #
+            archive_iterator = dl_manager.iter_archive(archive_path)
+            # we locate the data using the path within the archive
+            path_to_data = "/".join(["cv-corpus-6.1-2020-12-11", self.config.name])
+            path_to_clips = "/".join([path_to_data, "clips"])
+            metadata_filepaths = {
+                split: "/".join([path_to_data, f"{split}.tsv"])
+                for split in ["train", "test", "dev", "other", "validated", "invalidated"]
+            }
+        else:
+            # In non-streaming we can extract the archive locally as usual
+            extracted_dir = dl_manager.extract(archive_path)
+            archive_iterator = None
+            # we locate the data using the local path
+            path_to_data = os.path.join(extracted_dir, "cv-corpus-6.1-2020-12-11", self.config.name)
+            path_to_clips = os.path.join(path_to_data, "clips")
+            metadata_filepaths = {
+                split: os.path.join(path_to_data, f"{split}.tsv")
+                for split in ["train", "test", "dev", "other", "validated", "invalidated"]
+            }
 
         return [
             datasets.SplitGenerator(
                 name=datasets.Split.TRAIN,
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "train.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["train"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name=datasets.Split.TEST,
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "test.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["test"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name=datasets.Split.VALIDATION,
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "dev.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["dev"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name="other",
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "other.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["other"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name="validated",
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "validated.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["validated"],
                     "path_to_clips": path_to_clips,
                 },
             ),
             datasets.SplitGenerator(
                 name="invalidated",
                 gen_kwargs={
-                    "files": dl_manager.iter_archive(archive),
-                    "filepath": "/".join([path_to_data, "invalidated.tsv"]),
+                    "streaming": streaming,
+                    "archive_iterator": archive_iterator,
+                    "filepath": metadata_filepaths["invalidated"],
                     "path_to_clips": path_to_clips,
                 },
             ),
         ]
 
-    def _generate_examples(self, files, filepath, path_to_clips):
+    def _generate_examples(self, streaming, archive_iterator, filepath, path_to_clips):
         """Yields examples."""
+        if streaming:
+            yield from self._generate_examples_streaming(archive_iterator, filepath, path_to_clips)
+        else:
+            yield from self._generate_examples_non_streaming(filepath, path_to_clips)
+
+    def _generate_examples_non_streaming(self, filepath, path_to_clips):
+
+        data_fields = list(self._info().features.keys())
+
+        # audio is not a header of the csv files
+        data_fields.remove("audio")
+        path_idx = data_fields.index("path")
+
+        with open(filepath, encoding="utf-8") as f:
+            lines = f.readlines()
+            headline = lines[0]
+
+            column_names = headline.strip().split("\t")
+            assert (
+                column_names == data_fields
+            ), f"The file should have {data_fields} as column names, but has {column_names}"
+
+            for id_, line in enumerate(lines[1:]):
+                field_values = line.strip().split("\t")
+
+                # set absolute path for mp3 audio file
+                field_values[path_idx] = os.path.join(path_to_clips, field_values[path_idx])
+
+                # if data is incomplete, fill with empty values
+                if len(field_values) < len(data_fields):
+                    field_values += (len(data_fields) - len(field_values)) * ["''"]
+
+                result = {key: value for key, value in zip(data_fields, field_values)}
+
+                # set audio feature
+                result["audio"] = field_values[path_idx]
+
+                yield id_, result
+
+    def _generate_examples_streaming(self, archive_iterator, filepath, path_to_clips):
+        """Yields examples in streaming mode."""
         data_fields = list(self._info().features.keys())
 
         # audio is not a header of the csv files
@@ -722,7 +796,7 @@ def _generate_examples(self, files, filepath, path_to_clips):
 
         all_field_values = {}
         metadata_found = False
-        for path, f in files:
+        for path, f in archive_iterator:
             if path == filepath:
                 metadata_found = True
                 lines = f.readlines()
@@ -752,5 +826,7 @@
 
                     # set audio feature
                     result["audio"] = {"path": path, "bytes": f.read()}
+                    # set path to None since the path doesn't exist locally in streaming mode
+                    result["path"] = None
 
                     yield path, result
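For readers new to iter_archive: the comment in the streaming branch above states its contract, namely that it yields (path_within_archive, file_obj) pairs in archive order. A standard-library stand-in, shown here as a sketch rather than the datasets implementation (the archive filename is hypothetical, and the real method also streams remote archives), makes that contract concrete:

import tarfile

def iter_archive(archive_path):
    # Yield (path_within_archive, file_obj) for every regular file in a
    # TAR archive, in archive order, without extracting anything to disk.
    with tarfile.open(archive_path) as tar:
        for member in tar:
            if member.isfile():
                yield member.name, tar.extractfile(member)

# Metadata TSVs and audio clips are matched by these relative paths,
# which is the pairing _generate_examples_streaming relies on.
for path, f in iter_archive("cv-corpus-6.1-2020-12-11-tr.tar.gz"):
    if path.endswith(".tsv"):
        print(path, f.readline())  # the header line with the column names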
2 changes: 2 additions & 0 deletions src/datasets/utils/download_manager.py
@@ -65,6 +65,8 @@ class GenerateMode(enum.Enum):
 
 
 class DownloadManager:
+    is_streaming = False
+
     def __init__(
         self,
         dataset_name: Optional[str] = None,
1 change: 1 addition & 0 deletions src/datasets/utils/mock_download_manager.py
@@ -32,6 +32,7 @@
 class MockDownloadManager:
     dummy_file_name = "dummy_data"
     datasets_scripts_dir = "datasets"
+    is_streaming = False
 
     def __init__(
         self,
2 changes: 2 additions & 0 deletions src/datasets/utils/streaming_download_manager.py
@@ -740,6 +740,8 @@ class StreamingDownloadManager:
     builtin `open` function to stream data from remote files.
     """
 
+    is_streaming = True
+
     def __init__(
         self,
         dataset_name: Optional[str] = None,
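A brief note on the design: is_streaming is a plain class attribute on each manager (False on DownloadManager and MockDownloadManager, True here), so a loading script can branch on dl_manager.is_streaming without importing or isinstance-checking the manager classes. A quick sketch under the module layout shown in this commit (paths may differ in later versions):

from datasets.utils.download_manager import DownloadManager
from datasets.utils.streaming_download_manager import StreamingDownloadManager

# No instance needed: the flag lives on the class, and inside a dataset
# script the same check is simply `dl_manager.is_streaming`.
assert DownloadManager.is_streaming is False
assert StreamingDownloadManager.is_streaming is True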

1 comment on commit e3c8e25

@github-actions

PyArrow==3.0.0


Benchmark: benchmark_array_xd.json

| metric | new / old (diff) |
|---|---|
| read_batch_formatted_as_numpy after write_array2d | 0.010075 / 0.011353 (-0.001278) |
| read_batch_formatted_as_numpy after write_flattened_sequence | 0.004071 / 0.011008 (-0.006937) |
| read_batch_formatted_as_numpy after write_nested_sequence | 0.031094 / 0.038508 (-0.007414) |
| read_batch_unformated after write_array2d | 0.035552 / 0.023109 (0.012443) |
| read_batch_unformated after write_flattened_sequence | 0.309199 / 0.275898 (0.033301) |
| read_batch_unformated after write_nested_sequence | 0.339664 / 0.323480 (0.016184) |
| read_col_formatted_as_numpy after write_array2d | 0.008354 / 0.007986 (0.000368) |
| read_col_formatted_as_numpy after write_flattened_sequence | 0.003673 / 0.004328 (-0.000656) |
| read_col_formatted_as_numpy after write_nested_sequence | 0.009095 / 0.004250 (0.004844) |
| read_col_unformated after write_array2d | 0.047072 / 0.037052 (0.010019) |
| read_col_unformated after write_flattened_sequence | 0.293065 / 0.258489 (0.034576) |
| read_col_unformated after write_nested_sequence | 0.343483 / 0.293841 (0.049642) |
| read_formatted_as_numpy after write_array2d | 0.031630 / 0.128546 (-0.096916) |
| read_formatted_as_numpy after write_flattened_sequence | 0.009714 / 0.075646 (-0.065932) |
| read_formatted_as_numpy after write_nested_sequence | 0.252578 / 0.419271 (-0.166693) |
| read_unformated after write_array2d | 0.050279 / 0.043533 (0.006746) |
| read_unformated after write_flattened_sequence | 0.295697 / 0.255139 (0.040558) |
| read_unformated after write_nested_sequence | 0.321607 / 0.283200 (0.038407) |
| write_array2d | 0.103858 / 0.141683 (-0.037825) |
| write_flattened_sequence | 1.823560 / 1.452155 (0.371405) |
| write_nested_sequence | 1.971818 / 1.492716 (0.479102) |

Benchmark: benchmark_getitem_100B.json

| metric | new / old (diff) |
|---|---|
| get_batch_of_1024_random_rows | 0.332969 / 0.018006 (0.314963) |
| get_batch_of_1024_rows | 0.538086 / 0.000490 (0.537596) |
| get_first_row | 0.039639 / 0.000200 (0.039440) |
| get_last_row | 0.000450 / 0.000054 (0.000395) |

Benchmark: benchmark_indices_mapping.json

| metric | new / old (diff) |
|---|---|
| select | 0.029421 / 0.037411 (-0.007990) |
| shard | 0.108286 / 0.014526 (0.093760) |
| shuffle | 0.116746 / 0.176557 (-0.059811) |
| sort | 0.170304 / 0.737135 (-0.566831) |
| train_test_split | 0.118584 / 0.296338 (-0.177755) |

Benchmark: benchmark_iterating.json

| metric | new / old (diff) |
|---|---|
| read 5000 | 0.419464 / 0.215209 (0.204255) |
| read 50000 | 4.204035 / 2.077655 (2.126380) |
| read_batch 50000 10 | 1.786422 / 1.504120 (0.282302) |
| read_batch 50000 100 | 1.583487 / 1.541195 (0.042292) |
| read_batch 50000 1000 | 1.677827 / 1.468490 (0.209337) |
| read_formatted numpy 5000 | 0.448684 / 4.584777 (-4.136093) |
| read_formatted pandas 5000 | 4.564673 / 3.745712 (0.818960) |
| read_formatted tensorflow 5000 | 2.361948 / 5.269862 (-2.907913) |
| read_formatted torch 5000 | 0.922088 / 4.565676 (-3.643588) |
| read_formatted_batch numpy 5000 10 | 0.054323 / 0.424275 (-0.369952) |
| read_formatted_batch numpy 5000 1000 | 0.012177 / 0.007607 (0.004570) |
| shuffled read 5000 | 0.526850 / 0.226044 (0.300805) |
| shuffled read 50000 | 5.290348 / 2.268929 (3.021419) |
| shuffled read_batch 50000 10 | 2.264141 / 55.444624 (-53.180483) |
| shuffled read_batch 50000 100 | 1.886483 / 6.876477 (-4.989994) |
| shuffled read_batch 50000 1000 | 1.921017 / 2.142072 (-0.221055) |
| shuffled read_formatted numpy 5000 | 0.570513 / 4.805227 (-4.234715) |
| shuffled read_formatted_batch numpy 5000 10 | 0.126059 / 6.500664 (-6.374605) |
| shuffled read_formatted_batch numpy 5000 1000 | 0.063866 / 0.075469 (-0.011603) |

Benchmark: benchmark_map_filter.json

| metric | new / old (diff) |
|---|---|
| filter | 1.616405 / 1.841788 (-0.225383) |
| map fast-tokenizer batched | 14.386078 / 8.074308 (6.311770) |
| map identity | 26.813310 / 10.191392 (16.621918) |
| map identity batched | 0.856142 / 0.680424 (0.175718) |
| map no-op batched | 0.520870 / 0.534201 (-0.013331) |
| map no-op batched numpy | 0.499721 / 0.579283 (-0.079562) |
| map no-op batched pandas | 0.497813 / 0.434364 (0.063450) |
| map no-op batched pytorch | 0.324437 / 0.540337 (-0.215901) |
| map no-op batched tensorflow | 0.339474 / 1.386936 (-1.047462) |
PyArrow==latest

Benchmark: benchmark_array_xd.json

| metric | new / old (diff) |
|---|---|
| read_batch_formatted_as_numpy after write_array2d | 0.008775 / 0.011353 (-0.002578) |
| read_batch_formatted_as_numpy after write_flattened_sequence | 0.004262 / 0.011008 (-0.006746) |
| read_batch_formatted_as_numpy after write_nested_sequence | 0.029648 / 0.038508 (-0.008860) |
| read_batch_unformated after write_array2d | 0.035234 / 0.023109 (0.012125) |
| read_batch_unformated after write_flattened_sequence | 0.315909 / 0.275898 (0.040011) |
| read_batch_unformated after write_nested_sequence | 0.331053 / 0.323480 (0.007574) |
| read_col_formatted_as_numpy after write_array2d | 0.006737 / 0.007986 (-0.001249) |
| read_col_formatted_as_numpy after write_flattened_sequence | 0.005133 / 0.004328 (0.000804) |
| read_col_formatted_as_numpy after write_nested_sequence | 0.007431 / 0.004250 (0.003181) |
| read_col_unformated after write_array2d | 0.042121 / 0.037052 (0.005069) |
| read_col_unformated after write_flattened_sequence | 0.296367 / 0.258489 (0.037878) |
| read_col_unformated after write_nested_sequence | 0.331381 / 0.293841 (0.037541) |
| read_formatted_as_numpy after write_array2d | 0.031914 / 0.128546 (-0.096632) |
| read_formatted_as_numpy after write_flattened_sequence | 0.009756 / 0.075646 (-0.065890) |
| read_formatted_as_numpy after write_nested_sequence | 0.252752 / 0.419271 (-0.166520) |
| read_unformated after write_array2d | 0.051153 / 0.043533 (0.007620) |
| read_unformated after write_flattened_sequence | 0.301778 / 0.255139 (0.046639) |
| read_unformated after write_nested_sequence | 0.342005 / 0.283200 (0.058806) |
| write_array2d | 0.096204 / 0.141683 (-0.045479) |
| write_flattened_sequence | 1.828544 / 1.452155 (0.376389) |
| write_nested_sequence | 1.885010 / 1.492716 (0.392294) |

Benchmark: benchmark_getitem_100B.json

| metric | new / old (diff) |
|---|---|
| get_batch_of_1024_random_rows | 0.289397 / 0.018006 (0.271390) |
| get_batch_of_1024_rows | 0.541106 / 0.000490 (0.540616) |
| get_first_row | 0.004442 / 0.000200 (0.004242) |
| get_last_row | 0.000090 / 0.000054 (0.000036) |

Benchmark: benchmark_indices_mapping.json

| metric | new / old (diff) |
|---|---|
| select | 0.031158 / 0.037411 (-0.006253) |
| shard | 0.101763 / 0.014526 (0.087237) |
| shuffle | 0.115269 / 0.176557 (-0.061288) |
| sort | 0.166817 / 0.737135 (-0.570318) |
| train_test_split | 0.117006 / 0.296338 (-0.179332) |

Benchmark: benchmark_iterating.json

| metric | new / old (diff) |
|---|---|
| read 5000 | 0.424042 / 0.215209 (0.208833) |
| read 50000 | 4.250288 / 2.077655 (2.172633) |
| read_batch 50000 10 | 1.800701 / 1.504120 (0.296581) |
| read_batch 50000 100 | 1.594893 / 1.541195 (0.053698) |
| read_batch 50000 1000 | 1.692951 / 1.468490 (0.224461) |
| read_formatted numpy 5000 | 0.447943 / 4.584777 (-4.136834) |
| read_formatted pandas 5000 | 4.597339 / 3.745712 (0.851627) |
| read_formatted tensorflow 5000 | 3.660060 / 5.269862 (-1.609802) |
| read_formatted torch 5000 | 0.950130 / 4.565676 (-3.615546) |
| read_formatted_batch numpy 5000 10 | 0.053962 / 0.424275 (-0.370313) |
| read_formatted_batch numpy 5000 1000 | 0.012142 / 0.007607 (0.004535) |
| shuffled read 5000 | 0.531101 / 0.226044 (0.305057) |
| shuffled read 50000 | 5.364393 / 2.268929 (3.095464) |
| shuffled read_batch 50000 10 | 2.296333 / 55.444624 (-53.148291) |
| shuffled read_batch 50000 100 | 1.899553 / 6.876477 (-4.976923) |
| shuffled read_batch 50000 1000 | 2.019455 / 2.142072 (-0.122617) |
| shuffled read_formatted numpy 5000 | 0.582093 / 4.805227 (-4.223134) |
| shuffled read_formatted_batch numpy 5000 10 | 0.124581 / 6.500664 (-6.376083) |
| shuffled read_formatted_batch numpy 5000 1000 | 0.062192 / 0.075469 (-0.013277) |

Benchmark: benchmark_map_filter.json

| metric | new / old (diff) |
|---|---|
| filter | 1.627391 / 1.841788 (-0.214396) |
| map fast-tokenizer batched | 14.094533 / 8.074308 (6.020225) |
| map identity | 26.456637 / 10.191392 (16.265245) |
| map identity batched | 0.884784 / 0.680424 (0.204360) |
| map no-op batched | 0.531134 / 0.534201 (-0.003067) |
| map no-op batched numpy | 0.496945 / 0.579283 (-0.082338) |
| map no-op batched pandas | 0.505636 / 0.434364 (0.071272) |
| map no-op batched pytorch | 0.320152 / 0.540337 (-0.220186) |
| map no-op batched tensorflow | 0.327214 / 1.386936 (-1.059722) |
