"""
- read and write allc file
- parallel writing
- read region from allc
- separate process gives better performance
This file is modified from xopen 0.3.4
Here is the licence
Copyright (c) 2010-2018 Marcel Martin <mail@marcelm.net>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import codecs
import gzip
import os
import pathlib
import sys
import time
from subprocess import Popen, PIPE, run
try:
run(["pigz", "--version"], stdout=PIPE, stderr=PIPE)
except OSError:
PIGZ = False
try:
run(["bgzip", "--version"], stdout=PIPE, stderr=PIPE)
except OSError:
BGZIP = False
[docs]class Closing(object):
"""
Inherit from this class and implement a close() method to offer context
manager functionality.
"""
[docs] def close(self):
raise NotImplementedError
[docs] def __enter__(self):
return self
[docs] def __exit__(self, *exc_info):
self.close()
[docs] def __del__(self):
try:
self.close()
except OSError:
pass
[docs]class PipedGzipWriter(Closing):
"""
Write gzip-compressed files by running an external gzip or pigz process and
piping into it. On Python 2, this is faster than using gzip.open(). On
Python 3, it allows to run the compression in a separate process and can
therefore also be faster.
"""
def __init__(self, path, mode="wt", compresslevel=6, threads=1):
"""
mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab'
compresslevel -- gzip compression level
threads (int) -- number of pigz threads (None means to let pigz decide)
"""
if mode not in ("w", "wt", "wb", "a", "at", "ab"):
raise ValueError(
"Mode is '{0}', but it must be 'w', 'wt', 'wb', 'a', 'at' or 'ab'".format(
mode
)
)
self.outfile = open(path, mode)
self.devnull = open(os.devnull, mode)
self.closed = False
self.name = path
kwargs = dict(stdin=PIPE, stdout=self.outfile, stderr=self.devnull)
# Setting close_fds to True in the Popen arguments is necessary due to
# <http://bugs.python.org/issue12786>.
# However, close_fds is not supported on Windows. See
# <https://github.com/marcelm/cutadapt/issues/315>.
if sys.platform != "win32":
kwargs["close_fds"] = True
# only apply to gzip and pigz, bgzip use -l
if "w" in mode and compresslevel != 6:
extra_args = ["-" + str(compresslevel)]
else:
extra_args = []
if "b" not in mode:
encoding = None
else:
encoding = "utf8"
kwargs["encoding"] = encoding
try:
if BGZIP:
bgzip_args = ["bgzip"]
if threads is not None and threads > 0:
bgzip_args += ["-@", str(threads), "-l", str(compresslevel)]
self.process = Popen(bgzip_args, **kwargs)
self.program = "bgzip"
elif PIGZ:
pigz_args = ["pigz"]
if threads is not None and threads > 0:
pigz_args += ["-p", str(threads)]
self.process = Popen(pigz_args + extra_args, **kwargs)
self.program = "pigz"
else:
# pigz not found, try regular gzip
self.process = Popen(["gzip"] + extra_args, **kwargs)
self.program = "gzip"
except OSError:
self.outfile.close()
self.devnull.close()
raise
self._file = codecs.getwriter("utf-8")(self.process.stdin)
[docs] def write(self, arg):
self._file.write(arg)
[docs] def close(self):
self.closed = True
self._file.close()
return_code = self.process.wait()
self.outfile.close()
self.devnull.close()
if return_code != 0:
raise IOError(
f"Output {self.program} process terminated with exit code {return_code}"
)
[docs]class PipedGzipReader(Closing):
# decompression can't be parallel even in pigz, so there is not thread/cpu parameter
def __init__(self, path, region=None, mode="r"):
if mode not in ("r", "rt", "rb"):
raise ValueError(f"Mode is {mode}, but it must be 'r', 'rt' or 'rb'")
if "b" not in mode:
encoding = "utf8"
else:
encoding = None
if region is None:
self.process = Popen(
["gzip", "-cd", path], stdout=PIPE, stderr=PIPE, encoding=encoding
)
else:
self.process = Popen(
["tabix", path] + region.split(" "),
stdout=PIPE,
stderr=PIPE,
encoding=encoding,
)
self.name = path
self._file = self.process.stdout
self._stderr = self.process.stderr
self.closed = False
# Give gzip a little bit of time to report any errors
# (such as a non-existing file)
time.sleep(0.01)
self._raise_if_error()
[docs] def close(self):
self.closed = True
return_code = self.process.poll()
if return_code is None:
# still running
self.process.terminate()
self._raise_if_error()
[docs] def __iter__(self):
for line in self._file:
yield line
self.process.wait()
self._raise_if_error()
[docs] def readline(self):
return self._file.readline()
[docs] def _raise_if_error(self):
"""
Raise IOError if process is not running anymore and the
exit code is nonzero.
"""
return_code = self.process.poll()
if return_code is not None and return_code != 0:
message = self._stderr.read().strip()
raise IOError(message)
[docs] def read(self, *args):
data = self._file.read(*args)
if len(args) == 0 or args[0] <= 0:
# wait for process to terminate until we check the exit code
self.process.wait()
self._raise_if_error()
return data
[docs]class PipedBamReader(Closing):
# decompression can't be parallel even in pigz, so there is not thread/cpu parameter
def __init__(
self, path, region=None, mode="r", include_header=True, samtools_parms_str=None
):
if mode not in ("r", "rt", "rb"):
raise ValueError(f"Mode is {mode}, but it must be 'r', 'rt' or 'rb'")
if "b" not in mode:
encoding = "utf8"
else:
encoding = None
command_list = ["samtools", "view"]
if include_header:
command_list.append("-h")
if samtools_parms_str is not None:
command_list.extend(samtools_parms_str.split(" "))
if region is None:
self.process = Popen(
command_list + [path], stdout=PIPE, stderr=PIPE, encoding=encoding
)
else:
self.process = Popen(
command_list + [path] + region.split(" "),
stdout=PIPE,
stderr=PIPE,
encoding=encoding,
)
self.name = path
self.file = self.process.stdout
self._stderr = self.process.stderr
self.closed = False
# Give gzip a little bit of time to report any errors
# (such as a non-existing file)
time.sleep(0.01)
self._raise_if_error()
[docs] def close(self):
self.closed = True
return_code = self.process.poll()
if return_code is None:
# still running
self.process.terminate()
self._raise_if_error()
[docs] def __iter__(self):
for line in self.file:
yield line
self.process.wait()
self._raise_if_error()
[docs] def readline(self):
return self.file.readline()
[docs] def _raise_if_error(self):
"""
Raise IOError if process is not running anymore and the
exit code is nonzero.
"""
return_code = self.process.poll()
if return_code is not None and return_code != 0:
message = self._stderr.read().strip()
raise IOError(message)
[docs] def read(self, *args):
data = self.file.read(*args)
if len(args) == 0 or args[0] <= 0:
# wait for process to terminate until we check the exit code
self.process.wait()
self._raise_if_error()
return data
[docs]class PipedBamWriter(Closing):
def __init__(self, path, mode="wt", threads=1):
"""
mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab'
threads (int) -- number of samtools threads
"""
if mode not in ("w", "wt", "wb", "a", "at", "ab"):
raise ValueError(
"Mode is '{0}', but it must be 'w', 'wt', 'wb', 'a', 'at' or 'ab'".format(
mode
)
)
self.outfile = open(path, mode)
self.devnull = open(os.devnull, mode)
self.closed = False
self.name = path
kwargs = dict(stdin=PIPE, stdout=self.outfile, stderr=self.devnull)
# Setting close_fds to True in the Popen arguments is necessary due to
# <http://bugs.python.org/issue12786>.
# However, close_fds is not supported on Windows. See
# <https://github.com/marcelm/cutadapt/issues/315>.
if sys.platform != "win32":
kwargs["close_fds"] = True
try:
samtools_args = ["samtools", "view", "-b", "-@", str(threads)]
self.process = Popen(samtools_args, **kwargs)
self.program = "samtools"
except OSError:
self.outfile.close()
self.devnull.close()
raise
self._file = codecs.getwriter("utf-8")(self.process.stdin)
return
[docs] def write(self, arg):
self._file.write(arg)
[docs] def close(self):
self.closed = True
self._file.close()
return_code = self.process.wait()
self.outfile.close()
self.devnull.close()
if return_code != 0:
raise IOError(
f"Output {self.program} process terminated with exit code {return_code}"
)
[docs]def open_bam(
file_path,
mode="r",
region=None,
include_header=True,
samtools_parms_str=None,
threads=1,
):
if "r" in mode:
if region is not None:
if not has_bai(file_path):
raise FileNotFoundError(
f".bai index file not found for {file_path}. Index before region query."
)
try:
return PipedBamReader(
file_path,
region=region,
mode=mode,
include_header=include_header,
samtools_parms_str=samtools_parms_str,
)
except OSError as e:
raise e
else:
return PipedBamWriter(file_path, mode, threads=threads)
[docs]def open_gz(file_path, mode="r", compresslevel=3, threads=1, region=None):
if region is not None:
if not isinstance(region, str):
raise TypeError("region parameter need to be string.")
if "r" in mode:
try:
return PipedGzipReader(file_path, region=region, mode=mode)
except OSError:
# gzip not installed
return gzip.open(file_path, mode)
else:
try:
return PipedGzipWriter(file_path, mode, compresslevel, threads=threads)
except OSError:
return gzip.open(file_path, mode, compresslevel=compresslevel)
[docs]def open_allc(file_path, mode="r", compresslevel=3, threads=1, region=None):
"""
A replacement for the "open" function that can also open files that have
been compressed with gzip, bzip2 or xz. If the file_path is '-', standard
output (mode 'w') or input (mode 'r') is returned.
The file type is determined based on the file_path: .gz is gzip, .bz2 is bzip2 and .xz is
xz/lzma.
When writing a gzip-compressed file, the following methods are tried in order to get the
best speed 1) using a pigz (parallel gzip) subprocess; 2) using a gzip subprocess;
3) gzip.open. A single gzip subprocess can be faster than gzip.open because it runs in a
separate process.
Uncompressed files are opened with the regular open().
mode can be: 'rt', 'rb', 'at', 'ab', 'wt', or 'wb'. Also, the 't' can be omitted,
so instead of 'rt', 'wt' and 'at', the abbreviations 'r', 'w' and 'a' can be used.
threads is the number of threads for pigz. If None, then the pigz default is used.
multi-thread only apply to writer, reader (decompression) can't be paralleled
"""
file_path = pathlib.Path(file_path).resolve()
file_exist = file_path.exists()
if "w" not in mode:
if not file_exist:
raise FileNotFoundError(f"{file_path} does not exist.")
else:
if not file_path.parent.exists():
raise OSError(f"File directory {file_path.parent} does not exist.")
file_path = str(file_path)
if mode in ("r", "w", "a"):
mode += "t"
if mode not in ("rt", "rb", "wt", "wb", "at", "ab"):
raise ValueError("mode '{0}' not supported".format(mode))
if compresslevel not in range(1, 10):
raise ValueError("compresslevel must be between 1 and 9")
if region is not None:
# unzipped file
if not file_path.endswith("gz"):
raise ValueError(
f"File must be compressed by bgzip to use region query. File path {file_path}"
)
# normal gzipped file
if not has_tabix(file_path):
raise ValueError(
f"Tried inspect {file_path}, "
"File is compressed by normal gzip, but region query only apply to bgzip"
)
if not os.path.exists(file_path + ".tbi"):
raise FileNotFoundError("region query provided, but .tbi index not found")
if file_path.endswith("gz"):
return open_gz(file_path, mode, compresslevel, threads, region=region)
else:
return open(file_path, mode)
[docs]def has_tabix(filename):
tabix_path = filename + ".tbi"
if os.path.exists(tabix_path):
return True
else:
return False
[docs]def has_bai(filename):
index_path = filename + ".bai"
if os.path.exists(index_path):
return True
else:
return False