Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed encode #136

Merged
merged 8 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# 0.0.6 2024-04-xx
# 0.0.6 2024-04-24

- Only use NOSHUFFLE by default on ``call_genotype`` and bool arrays.
- Add initial implementation of distributed encode

# 0.0.5 2024-04-17

Expand Down
149 changes: 125 additions & 24 deletions bio2zarr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import click
import coloredlogs
import humanfriendly
import numcodecs
import tabulate

Expand Down Expand Up @@ -39,6 +40,14 @@ def list_commands(self, ctx):
"zarr_path", type=click.Path(file_okay=False, dir_okay=True)
)

# Reusable click parameter decorators shared by several sub-commands.

# Positional ZARR_PATH argument: must refer to an existing directory
# (exists=True, directories only).
zarr_path = click.argument(
"zarr_path", type=click.Path(exists=True, file_okay=False, dir_okay=True)
)

# Positional NUM_PARTITIONS argument: an integer, at least 1.
num_partitions = click.argument("num_partitions", type=click.IntRange(min=1))

# Positional PARTITION argument: an integer index, at least 0.
partition = click.argument("partition", type=click.IntRange(min=0))

# Counted flag: each repetition of -v/--verbose increases the integer that
# is passed to the command as ``verbose``.
verbose = click.option("-v", "--verbose", count=True, help="Increase verbosity")

force = click.option(
Expand Down Expand Up @@ -92,6 +101,27 @@ def list_commands(self, ctx):
help="Chunk size in the samples dimension",
)

# Optional -s/--schema path; when supplied it must refer to an existing
# filesystem path. Defaults to None.
schema = click.option("-s", "--schema", default=None, type=click.Path(exists=True))

# Optional -V/--max-variant-chunks integer; defaults to None (option not set).
max_variant_chunks = click.option(
"-V",
"--max-variant-chunks",
type=int,
default=None,
help=(
"Truncate the output in the variants dimension to have "
"this number of chunks. Mainly intended to help with "
"schema tuning."
),
)

# Optional -M/--max-memory value; defaults to None. No ``type`` is declared
# here, so the value (e.g. "10G") is presumably interpreted by the code that
# receives it -- TODO confirm against the callee.
max_memory = click.option(
"-M",
"--max-memory",
default=None,
help="An approximate bound on overall memory usage (e.g. 10G),",
)


def setup_logging(verbosity):
level = "WARNING"
Expand Down Expand Up @@ -158,7 +188,7 @@ def explode(
@click.command
@vcfs
@new_icf_path
@click.argument("num_partitions", type=click.IntRange(min=1))
@num_partitions
@force
@column_chunk_size
@compressor
Expand Down Expand Up @@ -194,7 +224,7 @@ def dexplode_init(

@click.command
@icf_path
@click.argument("partition", type=click.IntRange(min=0))
@partition
@verbose
def dexplode_partition(icf_path, partition, verbose):
"""
Expand All @@ -207,14 +237,14 @@ def dexplode_partition(icf_path, partition, verbose):


@click.command
@click.argument("path", type=click.Path(), required=True)
@icf_path
@verbose
def dexplode_finalise(path, verbose):
def dexplode_finalise(icf_path, verbose):
"""
Final step for distributed conversion of VCF(s) to intermediate columnar format.
"""
setup_logging(verbose)
vcf.explode_finalise(path)
vcf.explode_finalise(icf_path)


@click.command
Expand Down Expand Up @@ -244,26 +274,11 @@ def mkschema(icf_path):
@new_zarr_path
@force
@verbose
@click.option("-s", "--schema", default=None, type=click.Path(exists=True))
@schema
@variants_chunk_size
@samples_chunk_size
@click.option(
"-V",
"--max-variant-chunks",
type=int,
default=None,
help=(
"Truncate the output in the variants dimension to have "
"this number of chunks. Mainly intended to help with "
"schema tuning."
),
)
@click.option(
"-M",
"--max-memory",
default=None,
help="An approximate bound on overall memory usage (e.g. 10G),",
)
@max_variant_chunks
@max_memory
@worker_processes
def encode(
icf_path,
Expand All @@ -288,13 +303,96 @@ def encode(
schema_path=schema,
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
max_v_chunks=max_variant_chunks,
max_variant_chunks=max_variant_chunks,
worker_processes=worker_processes,
max_memory=max_memory,
show_progress=True,
)


@click.command
@icf_path
@new_zarr_path
@num_partitions
@force
@schema
@variants_chunk_size
@samples_chunk_size
@max_variant_chunks
@verbose
def dencode_init(
    icf_path,
    zarr_path,
    num_partitions,
    force,
    schema,
    variants_chunk_size,
    samples_chunk_size,
    max_variant_chunks,
    verbose,
):
    """
    Initialise conversion of intermediate format to VCF Zarr. This will
    set up the specified ZARR_PATH to perform this conversion over
    NUM_PARTITIONS.

    The output of this command is the actual number of partitions
    generated (which may be fewer than the requested number, if there
    are not sufficient chunks in the variants dimension) and a rough
    lower-bound on the amount of memory required to encode a partition.

    NOTE: the format of this output will likely change in subsequent releases;
    it should not be considered machine-readable for now.
    """
    setup_logging(verbose)
    # Refuse to clobber an existing output directory unless --force is given.
    check_overwrite_dir(zarr_path, force)
    # Keep the requested count (num_partitions) distinct from the count that
    # encode_init actually produced, rather than rebinding the parameter.
    actual_partitions, max_memory = vcf.encode_init(
        icf_path,
        zarr_path,
        target_num_partitions=num_partitions,
        schema_path=schema,
        variants_chunk_size=variants_chunk_size,
        samples_chunk_size=samples_chunk_size,
        max_variant_chunks=max_variant_chunks,
        show_progress=True,
    )
    formatted_size = humanfriendly.format_size(max_memory, binary=True)
    # NOTE adding the size to the stdout here so that users can parse it
    # and use in their submission scripts. This is a first pass, and
    # will most likely change as we see what works and doesn't.
    # NOTE we probably want to format this as a table, which lists
    # some other properties, line by line
    # NOTE This size number is also not quite enough, you need a bit of
    # headroom with it (probably 10% or so). We should include this.
    click.echo(f"{actual_partitions}\t{formatted_size}")


@click.command
@zarr_path
@partition
@verbose
def dencode_partition(zarr_path, partition, verbose):
    """
    Convert a partition from intermediate columnar format to VCF Zarr.
    Must be called *after* the Zarr path has been initialised with dencode_init.
    Partition indexes must be from 0 (inclusive) to the number of partitions
    returned by dencode_init (exclusive).
    """
    setup_logging(verbose)
    # All per-partition work is delegated to the vcf module; this command
    # only wires up the CLI arguments and logging.
    vcf.encode_partition(zarr_path, partition)


@click.command
@zarr_path
@verbose
def dencode_finalise(zarr_path, verbose):
    """
    Final step for distributed conversion of ICF to VCF Zarr.
    """
    # Configure log output from the -v count before doing any work.
    setup_logging(verbose)
    # Hand over to the vcf module, always showing progress on the CLI.
    vcf.encode_finalise(zarr_path, show_progress=True)


@click.command(name="convert")
@vcfs
@new_zarr_path
Expand Down Expand Up @@ -382,6 +480,9 @@ def vcf2zarr():
vcf2zarr.add_command(dexplode_init)
vcf2zarr.add_command(dexplode_partition)
vcf2zarr.add_command(dexplode_finalise)
vcf2zarr.add_command(dencode_init)
vcf2zarr.add_command(dencode_partition)
vcf2zarr.add_command(dencode_finalise)


@click.command(name="convert")
Expand Down
6 changes: 3 additions & 3 deletions bio2zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def flush(self):
sync_flush_2d_array(
self.buff[: self.buffer_row], self.array, self.array_offset
)
# FIXME the array.name doesn't seem to be working here for some reason
logger.debug(
f"Flushed <{self.array.name} {self.array.shape} "
f"{self.array.dtype}> "
Expand All @@ -131,8 +132,7 @@ def sync_flush_2d_array(np_buffer, zarr_array, offset):
# encoder implementations.
s = slice(offset, offset + np_buffer.shape[0])
samples_chunk_size = zarr_array.chunks[1]
# TODO use zarr chunks here to support non-uniform chunking later
# and for simplicity
# TODO use zarr chunks here for simplicity
zarr_array_width = zarr_array.shape[1]
start = 0
while start < zarr_array_width:
Expand Down Expand Up @@ -192,7 +192,7 @@ def __init__(self, worker_processes=1, progress_config=None):
self.progress_config = progress_config
self.progress_bar = tqdm.tqdm(
total=progress_config.total,
desc=f"{progress_config.title:>7}",
desc=f"{progress_config.title:>8}",
unit_scale=True,
unit=progress_config.units,
smoothing=0.1,
Expand Down
Loading
Loading