Skip to content

Commit

Permalink
passthrough of filehandles between smart_open and xopen to benefit fro…
Browse files Browse the repository at this point in the history
…m both
  • Loading branch information
geertvandeweyer committed Feb 8, 2024
1 parent 0f06f8f commit da2aebc
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 62 deletions.
19 changes: 14 additions & 5 deletions src/cutadapt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ def get_argument_parser() -> ArgumentParser:
# Compression level for gzipped output files. Not exposed since we have -Z
group.add_argument("--compression-level", type=int, default=5,
help=SUPPRESS)
# transport_params passed to smart_open, see smart_open documentation. can be a json file or a json string
group.add_argument("--transport-params", type=str, default="",help=SUPPRESS)
# Disable adapter index creation
group.add_argument("--no-index", dest="index", default=True, action="store_false", help=SUPPRESS)

Expand Down Expand Up @@ -567,7 +569,7 @@ def determine_paired(args) -> bool:


def make_input_paths(
inputs: Sequence[str], paired: bool, interleaved: bool
inputs: Sequence[str], paired: bool, interleaved: bool, transport_params: str
) -> InputPaths:
"""
Do some other error checking of the input file names and return InputPaths.
Expand Down Expand Up @@ -605,10 +607,15 @@ def make_input_paths(

if input_paired_filename:
return InputPaths(
input_filename, input_paired_filename, interleaved=interleaved
input_filename,
input_paired_filename,
interleaved=interleaved,
transport_params=transport_params,
)
else:
return InputPaths(input_filename, interleaved=interleaved)
return InputPaths(
input_filename, interleaved=interleaved, transport_params=transport_params
)


def check_arguments(args, paired: bool) -> None:
Expand Down Expand Up @@ -1208,16 +1215,18 @@ def main(cmdlineargs, default_outfile=sys.stdout.buffer) -> Statistics:
file_opener = FileOpener(
compression_level=args.compression_level,
threads=estimate_compression_threads(cores),
transport_params=args.transport_params,
)
if sys.stderr.isatty() and not args.quiet and not args.debug:
progress = Progress()
else:
progress = DummyProgress()
paired = determine_paired(args)

try:
is_interleaved_input = args.interleaved and len(args.inputs) == 1
input_paths = make_input_paths(args.inputs, paired, is_interleaved_input)
input_paths = make_input_paths(
args.inputs, paired, is_interleaved_input, args.transport_params
)
check_arguments(args, paired)
adapters, adapters2 = adapters_from_args(args)
log_adapters(adapters, adapters2 if paired else None)
Expand Down
121 changes: 73 additions & 48 deletions src/cutadapt/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import BinaryIO, Optional, Dict, List, TextIO, Any
import lzma
import dnaio
from xopen import xopen

import smart_open
import logging
import json
from cutadapt.utils import logger

try:
Expand All @@ -18,56 +19,42 @@
resource = None # type: ignore


def open_rb(path: str):
"""
Open a (possibly compressed) file for reading in binary mode.
Determines if the file is local or remote and opens it accordingly.
"""
# local file: open with xopen routines
if os.path.exists(path):
return xopen_rb_raise_limit(path)
# assume remote file
else:
return smart_open_rb(path)


def smart_open_rb(path: str):
"""
Open a (possibly compressed) remote file for reading in binary mode.
see smart_open documentation for details
"""
try:
import smart_open

except ImportError:
raise ImportError(
"The smart_open package is required to read from remote files"
)
# for xz : load additional library
if path.endswith(".xz"):

def _handle_xz(file_obj, mode="rb"):
return lzma.LZMAFile(file_obj, mode, format=lzma.FORMAT_XZ)

smart_open.register_compressor(".xz", _handle_xz)
try:
return smart_open.open(path, "rb")
except Exception as e:
logger.error("Error opening '%s': %s", path, e)
raise


def xopen_rb_raise_limit(path: str):
def xopen_rb_raise_limit(path: str, transport_params: str = ""):
"""
Open a (possibly compressed) file for reading in binary mode, trying to avoid the
"Too many open files" problem using `open_raise_limit`.
"""
mode = "rb"
f = open_raise_limit(xopen, path, mode, threads=0)
# transport params: a JSON string or the path to a JSON file; convert to a dictionary
transport_params = get_transport_params(path, transport_params)
# smart_open to automatically open remote files, disable auto-compression
f = open_raise_limit(
smart_open.open,
path,
mode,
compression="disable",
transport_params=transport_params,
)
logging.getLogger("smart_open").setLevel(logging.WARNING)
# pass through to xopen to handle compression
f = open_raise_limit(xopen, f, mode, threads=4)
logger.debug("Opening '%s', mode '%s' with xopen resulted in %s", path, mode, f)
return f


def get_transport_params(path, transport_params):
    """
    Convert the user-supplied transport parameters into a dictionary.

    path -- name of the file that is about to be opened. Not used at the
        moment; kept so that existing callers keep working.
    transport_params -- one of:
        * an empty value (``""``, ``None``, ``{}``): no parameters
        * a dict: returned unchanged
        * the path to an existing JSON file
        * a string containing a JSON document

    Return a dict (empty if no parameters were given).

    Raise json.JSONDecodeError (a ValueError subclass) if the file or string
    is not valid JSON, and ValueError if the JSON document is not an object.
    """
    if not transport_params:
        return {}
    # Allow programmatic callers to hand over an already-parsed mapping;
    # os.path.isfile() below would raise TypeError on a dict.
    if isinstance(transport_params, dict):
        return transport_params

    try:
        if os.path.isfile(transport_params):
            # JSON text is UTF-8 by specification; do not depend on the locale
            with open(transport_params, encoding="utf-8") as f:
                params = json.load(f)
        else:
            params = json.loads(transport_params)
    except json.JSONDecodeError as e:
        # Re-raise with the same exception type, but say which option is
        # malformed so the user does not only see a bare parser message.
        raise json.JSONDecodeError(
            f"Transport parameters are not valid JSON ({e.msg})", e.doc, e.pos
        ) from e

    # The value is forwarded as smart_open's transport_params argument, which
    # is a dict; reject other JSON values here with a clear message.
    if not isinstance(params, dict):
        raise ValueError(
            "Transport parameters must be a JSON object, "
            f"but got {type(params).__name__}"
        )
    return params


def open_raise_limit(func, *args, **kwargs):
"""
Run 'func' (which should be some kind of open() function) and return its result.
Expand All @@ -94,26 +81,59 @@ def raise_open_files_limit(n):


class FileOpener:
def __init__(self, compression_level: int = 1, threads: Optional[int] = None):
def __init__(
self,
compression_level: int = 1,
threads: Optional[int] = None,
transport_params: str = "",
):
"""
threads -- no. of external compression threads.
0: write in-process
None: min(cpu_count(), 4)
"""
self.compression_level = compression_level
self.threads = threads
self.transport_params = transport_params

def smart_open(self, path, mode):
# get transport params for smart_open
transport_params = get_transport_params(path, self.transport_params)
# smart_open to automatically open remote files, disable auto-compression
f = open_raise_limit(
smart_open.open,
path,
mode,
compression="disable",
transport_params=transport_params,
)
logging.getLogger("smart_open").setLevel(logging.ERROR)
logger.debug(
"Opening output '%s', mode '%s' with smart_open resulted in %s",
path,
mode,
f,
)
return f

def xopen(self, path, mode):
threads = self.threads if "w" in mode else 0
# smart open to handle remote files
f = self.smart_open(path, mode)
# xopen to handle compression
f = open_raise_limit(
xopen, path, mode, compresslevel=self.compression_level, threads=threads
xopen, f, mode, compresslevel=self.compression_level, threads=threads
)
if "w" in mode:
extra = f" (compression level {self.compression_level}, {threads} threads)"
else:
extra = ""
logger.debug(
"Opening '%s', mode '%s'%s with xopen resulted in %s", path, mode, extra, f
"Opening output '%s', mode '%s'%s with xopen resulted in %s",
path,
mode,
extra,
f,
)
return f

Expand Down Expand Up @@ -173,12 +193,17 @@ def close(self) -> None:


class InputPaths:
def __init__(self, *paths: str, interleaved: bool = False):
def __init__(
self, *paths: str, interleaved: bool = False, transport_params: str = ""
):
self.paths = paths
self.interleaved = interleaved
self.transport_params = transport_params

def open(self) -> InputFiles:
files = [open_rb(path) for path in self.paths]
files = [
xopen_rb_raise_limit(path, self.transport_params) for path in self.paths
]
return InputFiles(*files, interleaved=self.interleaved)


Expand Down
4 changes: 2 additions & 2 deletions src/cutadapt/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
InputFiles,
OutputFiles,
InputPaths,
open_rb,
xopen_rb_raise_limit,
detect_file_format,
FileFormat,
ProxyWriter,
Expand Down Expand Up @@ -91,7 +91,7 @@ def run(self):
try:
with ExitStack() as stack:
files = [
stack.enter_context(open_rb(path))
stack.enter_context(xopen_rb_raise_limit(path))
for path in self._paths
]
file_format = detect_file_format(files[0])
Expand Down
26 changes: 19 additions & 7 deletions tests/test_files.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import os
import pickle

from cutadapt.files import ProxyTextFile, ProxyRecordWriter, OutputFiles,open_rb
from cutadapt.files import (
ProxyTextFile,
ProxyRecordWriter,
OutputFiles,
xopen_rb_raise_limit,
)
from dnaio import SequenceRecord


Expand Down Expand Up @@ -127,25 +132,32 @@ def test_interleaved_record_writer(self, tmp_path):


def test_open_rb_local_file(tmp_path):

# Create a local file
file_path = tmp_path / "test.txt"
file_path.write_text("Hello, World!")
# Test opening a local file
file = open_rb(str(file_path))
file = xopen_rb_raise_limit(str(file_path))
assert file.read() == b"Hello, World!"
file.close()


def test_open_rb_remote_file():
# Test opening a remote file over https
file = open_rb("https://raw.githubusercontent.com/marcelm/cutadapt/main/tests/data/454.fa")
file = xopen_rb_raise_limit(
"https://raw.githubusercontent.com/marcelm/cutadapt/main/tests/data/454.fa"
)
assert file.readline() == b">000163_1255_2627 length=52 uaccno=E0R4ISW01DCIQD\n"
file.close()


def test_open_rb_s3_file():
# Test opening a remote file on s3
file = open_rb("s3://platinum-genomes/2017-1.0/md5sum.txt")
assert file.readline() == b"2e6aa26b42283bbbc4ca03686f427dc2 ./hg38/small_variants/ConfidentRegions.bed.gz\n"
# Test opening a remote file on s3
file = xopen_rb_raise_limit("s3://platinum-genomes/2017-1.0/md5sum.txt")
assert (
file.readline()
== b"2e6aa26b42283bbbc4ca03686f427dc2 ./hg38/small_variants/ConfidentRegions.bed.gz\n"
)
file.close()
# - test force fasta
# - test qualities
Expand Down

0 comments on commit da2aebc

Please sign in to comment.