
Commit

Merge pull request #184 from jeromekelleher/encode-explode-init-interface

Improved info interface on encode/explode init
jeromekelleher authored May 7, 2024
2 parents f2ec234 + eeeceea commit 4a8e0ff
Showing 7 changed files with 118 additions and 45 deletions.
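
For orientation: dexplode-init previously printed a bare partition count, and dencode-init a count plus a memory figure; both now print a key/value summary and gain a --json flag for machine-readable output. A hedged sketch of the two modes for dexplode-init (the field names are those of the IcfWriteSummary dataclass added below; the counts are illustrative, not from a real run):

    $ vcf2zarr dexplode-init sample.vcf.gz sample.icf 5
    num_partitions  3
    num_samples     3
    num_variants    9

    $ vcf2zarr dexplode-init sample.vcf.gz sample.icf 5 --json
    {
        "num_partitions": 3,
        "num_samples": 3,
        "num_variants": 9
    }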
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
# 0.0.10 2024-05-XX
- Change output format of dexplode-init and dencode-init

# 0.0.9 2024-05-02

- Change on-disk format for explode and schema
38 changes: 25 additions & 13 deletions bio2zarr/cli.py
@@ -5,7 +5,6 @@

import click
import coloredlogs
import humanfriendly
import numcodecs
import tabulate

@@ -65,6 +64,13 @@ def list_commands(self, ctx):
help="Partition indexes are interpreted as one-based",
)

json = click.option(
"--json",
is_flag=True,
flag_value=True,
help="Output summary data in JSON format",
)

version = click.version_option(version=f"{provenance.__version__}")

worker_processes = click.option(
@@ -166,6 +172,16 @@ def get_compressor(cname):
return numcodecs.get_codec(config)


def show_work_summary(work_summary, json):
if json:
output = work_summary.asjson()
else:
data = work_summary.asdict()
output = tabulate.tabulate(list(data.items()), tablefmt="plain")
click.echo(output)


@click.command
@vcfs
@new_icf_path
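
The plain-text branch of show_work_summary above relies on tabulate's "plain" format for aligned key/value rows. A minimal runnable sketch of that call, with illustrative values standing in for a real summary dict:

    import tabulate

    # dataclasses.asdict() preserves field declaration order, so rows come
    # out as num_partitions, num_samples, num_variants.
    data = {"num_partitions": 3, "num_samples": 3, "num_variants": 9}
    print(tabulate.tabulate(list(data.items()), tablefmt="plain"))
    # num_partitions  3
    # num_samples     3
    # num_variants    9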
@@ -199,6 +215,7 @@ def explode(
@force
@column_chunk_size
@compressor
@json
@verbose
@worker_processes
def dexplode_init(
@@ -208,6 +225,7 @@
force,
column_chunk_size,
compressor,
json,
verbose,
worker_processes,
):
@@ -217,7 +235,7 @@
"""
setup_logging(verbose)
check_overwrite_dir(icf_path, force)
num_partitions = vcf.explode_init(
work_summary = vcf.explode_init(
icf_path,
vcfs,
target_num_partitions=num_partitions,
@@ -226,7 +244,7 @@
compressor=get_compressor(compressor),
show_progress=True,
)
click.echo(num_partitions)
show_work_summary(work_summary, json)


@click.command
@@ -331,6 +349,7 @@ def encode(
@variants_chunk_size
@samples_chunk_size
@max_variant_chunks
@json
@verbose
def dencode_init(
icf_path,
@@ -341,6 +360,7 @@
variants_chunk_size,
samples_chunk_size,
max_variant_chunks,
json,
verbose,
):
"""
@@ -358,7 +378,7 @@
"""
setup_logging(verbose)
check_overwrite_dir(zarr_path, force)
num_partitions, max_memory = vcf.encode_init(
work_summary = vcf.encode_init(
icf_path,
zarr_path,
target_num_partitions=num_partitions,
@@ -368,15 +388,7 @@
max_variant_chunks=max_variant_chunks,
show_progress=True,
)
formatted_size = humanfriendly.format_size(max_memory, binary=True)
# NOTE adding the size to the stdout here so that users can parse it
# and use in their submission scripts. This is a first pass, and
# will most likely change as we see what works and doesn't.
# NOTE we probably want to format this as a table, which lists
# some other properties, line by line
# NOTE This size number is also not quite enough, you need a bit of
# headroom with it (probably 10% or so). We should include this.
click.echo(f"{num_partitions}\t{formatted_size}")
show_work_summary(work_summary, json)


@click.command
61 changes: 55 additions & 6 deletions bio2zarr/vcf.py
@@ -204,6 +204,10 @@ def num_contigs(self):
def num_filters(self):
return len(self.filters)

@property
def num_samples(self):
return len(self.samples)

@staticmethod
def fromdict(d):
if d["format_version"] != ICF_METADATA_FORMAT_VERSION:
@@ -982,6 +986,19 @@ def check_field_clobbering(icf_metadata):
)


@dataclasses.dataclass
class IcfWriteSummary:
num_partitions: int
num_samples: int
num_variants: int

def asdict(self):
return dataclasses.asdict(self)

def asjson(self):
return json.dumps(self.asdict(), indent=4)


class IntermediateColumnarFormatWriter:
def __init__(self, path):
self.path = pathlib.Path(path)
@@ -1038,7 +1055,11 @@ def init(
logger.info("Writing WIP metadata")
with open(self.wip_path / "metadata.json", "w") as f:
json.dump(self.metadata.asdict(), f, indent=4)
return self.num_partitions
return IcfWriteSummary(
num_partitions=self.num_partitions,
num_variants=icf_metadata.num_records,
num_samples=icf_metadata.num_samples,
)

def mkdirs(self):
num_dirs = len(self.metadata.fields)
@@ -1371,6 +1392,7 @@ def variant_chunk_nbytes(self):
"""
Returns the nbytes for a single variant chunk of this array.
"""
# TODO emit a warning if this is a string field
chunk_items = self.chunks[0]
for size in self.shape[1:]:
chunk_items *= size
@@ -1643,6 +1665,21 @@ def fromdict(d):
return ret


@dataclasses.dataclass
class VcfZarrWriteSummary:
num_partitions: int
num_samples: int
num_variants: int
num_chunks: int
max_encoding_memory: str

def asdict(self):
return dataclasses.asdict(self)

def asjson(self):
return json.dumps(self.asdict(), indent=4)
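
A hedged sketch of what this summary serializes to (values are illustrative; note that max_encoding_memory is a human-readable string produced by display_size rather than a raw byte count, which keeps the JSON consistent with the tabular output):

    summary = VcfZarrWriteSummary(
        num_partitions=3,
        num_samples=3,
        num_variants=9,
        num_chunks=15,
        max_encoding_memory="1 KiB",
    )
    print(summary.asjson())  # 4-space-indented JSON with the five fields above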


class VcfZarrWriter:
def __init__(self, path):
self.path = pathlib.Path(path)
@@ -1718,13 +1755,22 @@ def init(
store = zarr.DirectoryStore(self.arrays_path)
root = zarr.group(store=store)

for column in self.schema.fields.values():
self.init_array(root, column, partitions[-1].stop)
total_chunks = 0
for field in self.schema.fields.values():
a = self.init_array(root, field, partitions[-1].stop)
total_chunks += a.nchunks

logger.info("Writing WIP metadata")
with open(self.wip_path / "metadata.json", "w") as f:
json.dump(self.metadata.asdict(), f, indent=4)
return len(partitions)

return VcfZarrWriteSummary(
num_variants=self.icf.num_records,
num_samples=self.icf.num_samples,
num_partitions=self.num_partitions,
num_chunks=total_chunks,
max_encoding_memory=display_size(self.get_max_encoding_memory()),
)

def encode_samples(self, root):
if self.schema.samples != self.icf.metadata.samples:
@@ -1794,6 +1840,7 @@ def init_array(self, root, variable, variants_dim_size):
}
)
logger.debug(f"Initialised {a}")
return a

#######################
# encode_partition
Expand Down Expand Up @@ -2062,6 +2109,9 @@ def get_max_encoding_memory(self):
"""
Return the approximate maximum memory used to encode a variant chunk.
"""
# NOTE This size number is also not quite enough, you need a bit of
# headroom with it (probably 10% or so). We should include this.
# FIXME this is actively wrong for String columns. See if we can do better.
max_encoding_mem = max(
col.variant_chunk_nbytes for col in self.schema.fields.values()
)
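
Per the headroom NOTE above, callers should pad this estimate before using it as a hard memory cap, for example in a cluster submission script. A minimal sketch under that assumption (memory_budget_bytes is a hypothetical helper, and the 10% margin comes from the comment, not from measurement):

    def memory_budget_bytes(max_encoding_mem, headroom=0.1):
        # Pad the per-chunk encoding estimate so workers have some slack.
        return int(max_encoding_mem * (1 + headroom))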
@@ -2190,14 +2240,13 @@ def encode_init(
schema = VcfZarrSchema.fromjson(f.read())
zarr_path = pathlib.Path(zarr_path)
vzw = VcfZarrWriter(zarr_path)
vzw.init(
return vzw.init(
icf,
target_num_partitions=target_num_partitions,
schema=schema,
dimension_separator=dimension_separator,
max_variant_chunks=max_variant_chunks,
)
return vzw.num_partitions, vzw.get_max_encoding_memory()


def encode_partition(zarr_path, partition):
29 changes: 21 additions & 8 deletions tests/test_cli.py
@@ -1,3 +1,5 @@
import dataclasses
import json
from unittest import mock

import click.testing as ct
@@ -46,6 +48,17 @@
DEFAULT_DENCODE_FINALISE_ARGS = dict(show_progress=True)


@dataclasses.dataclass
class FakeWorkSummary:
num_partitions: int

def asdict(self):
return dataclasses.asdict(self)

def asjson(self):
return json.dumps(self.asdict())


class TestWithMocks:
vcf_path = "tests/data/vcf/sample.vcf.gz"

@@ -262,7 +275,7 @@ def test_vcf_explode_missing_and_existing_vcf(self, mocked, tmp_path):
assert "'no_such_file' does not exist" in result.stderr
mocked.assert_not_called()

@mock.patch("bio2zarr.vcf.explode_init", return_value=5)
@mock.patch("bio2zarr.vcf.explode_init", return_value=FakeWorkSummary(5))
def test_vcf_dexplode_init(self, mocked, tmp_path):
runner = ct.CliRunner(mix_stderr=False)
icf_path = tmp_path / "icf"
@@ -273,7 +286,7 @@ def test_vcf_dexplode_init(self, mocked, tmp_path):
)
assert result.exit_code == 0
assert len(result.stderr) == 0
assert result.stdout == "5\n"
assert list(result.stdout.split()) == ["num_partitions", "5"]
mocked.assert_called_once_with(
str(icf_path),
(self.vcf_path,),
@@ -415,7 +428,7 @@ def test_encode(self, mocked, tmp_path):
**DEFAULT_ENCODE_ARGS,
)

@mock.patch("bio2zarr.vcf.encode_init", return_value=(10, 1024))
@mock.patch("bio2zarr.vcf.encode_init", return_value=FakeWorkSummary(10))
def test_dencode_init(self, mocked, tmp_path):
icf_path = tmp_path / "icf"
icf_path.mkdir()
@@ -427,7 +440,7 @@ def test_dencode_init(self, mocked, tmp_path):
catch_exceptions=False,
)
assert result.exit_code == 0
assert result.stdout == "10\t1 KiB\n"
assert list(result.stdout.split()) == ["num_partitions", "10"]
assert len(result.stderr) == 0
mocked.assert_called_once_with(
str(icf_path),
@@ -529,11 +542,11 @@ def test_dexplode(self, tmp_path, one_based):
runner = ct.CliRunner(mix_stderr=False)
result = runner.invoke(
cli.vcf2zarr,
f"dexplode-init {self.vcf_path} {icf_path} 5",
f"dexplode-init {self.vcf_path} {icf_path} 5 --json",
catch_exceptions=False,
)
assert result.exit_code == 0
assert result.stdout.strip() == "3"
assert json.loads(result.stdout)["num_partitions"] == 3

for j in range(3):
if one_based:
@@ -598,11 +611,11 @@ def test_dencode(self, tmp_path, one_based):
assert result.exit_code == 0
result = runner.invoke(
cli.vcf2zarr,
f"dencode-init {icf_path} {zarr_path} 5 --variants-chunk-size=3",
f"dencode-init {icf_path} {zarr_path} 5 --variants-chunk-size=3 --json",
catch_exceptions=False,
)
assert result.exit_code == 0
assert result.stdout.split()[0] == "3"
assert json.loads(result.stdout)["num_partitions"] == 3

for j in range(3):
if one_based:
12 changes: 6 additions & 6 deletions tests/test_icf.py
@@ -105,8 +105,8 @@ class TestIcfWriterExample:
def test_init_paths(self, tmp_path):
icf_path = tmp_path / "x.icf"
assert not icf_path.exists()
num_partitions = vcf.explode_init(icf_path, [self.data_path])
assert num_partitions == 3
summary = vcf.explode_init(icf_path, [self.data_path])
assert summary.num_partitions == 3
assert icf_path.exists()
wip_path = icf_path / "wip"
assert wip_path.exists()
@@ -118,9 +118,9 @@ def test_finalise_paths(self, tmp_path):
def test_finalise_paths(self, tmp_path):
icf_path = tmp_path / "x.icf"
wip_path = icf_path / "wip"
num_partitions = vcf.explode_init(icf_path, [self.data_path])
summary = vcf.explode_init(icf_path, [self.data_path])
assert icf_path.exists()
for j in range(num_partitions):
for j in range(summary.num_partitions):
vcf.explode_partition(icf_path, j)
assert wip_path.exists()
vcf.explode_finalise(icf_path)
@@ -270,8 +270,8 @@ def run_explode(self, tmp_path, **kwargs):

def run_dexplode(self, tmp_path, **kwargs):
icf_path = tmp_path / "icf"
partitions = vcf.explode_init(icf_path, [self.data_path], **kwargs)
for j in range(partitions):
summary = vcf.explode_init(icf_path, [self.data_path], **kwargs)
for j in range(summary.num_partitions):
vcf.explode_partition(icf_path, j)
vcf.explode_finalise(icf_path)
return vcf.IntermediateColumnarFormat(icf_path)