diff --git a/src/main/java/htsjdk/samtools/BAMFileReader.java b/src/main/java/htsjdk/samtools/BAMFileReader.java index 98bb74f63..2de91c176 100644 --- a/src/main/java/htsjdk/samtools/BAMFileReader.java +++ b/src/main/java/htsjdk/samtools/BAMFileReader.java @@ -25,6 +25,7 @@ import htsjdk.samtools.seekablestream.SeekableStream; +import htsjdk.samtools.util.AsyncBlockCompressedInputStream; import htsjdk.samtools.util.BinaryCodec; import htsjdk.samtools.util.BlockCompressedInputStream; import htsjdk.samtools.util.CloseableIterator; @@ -67,10 +68,6 @@ // If true, all SAMRecords are fully decoded as they are read. private boolean eagerDecode; - // If true, the BAMFileReader will use asynchronous IO. - // Note: this field currently has no effect (is not hooked up anywhere), but will be in the future. See https://github.com/samtools/htsjdk/pull/576 - private final boolean useAsynchronousIO; - // For error-checking. private ValidationStringency mValidationStringency; @@ -107,8 +104,7 @@ throws IOException { mIndexFile = indexFile; mIsSeekable = false; - this.useAsynchronousIO = useAsynchronousIO; - mCompressedInputStream = new BlockCompressedInputStream(stream); + mCompressedInputStream = useAsynchronousIO ? new AsyncBlockCompressedInputStream(stream) : new BlockCompressedInputStream(stream); mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream)); this.eagerDecode = eagerDecode; this.mValidationStringency = validationStringency; @@ -129,7 +125,7 @@ final ValidationStringency validationStringency, final SAMRecordFactory factory) throws IOException { - this(new BlockCompressedInputStream(file), indexFile!=null ? indexFile : SamFiles.findIndex(file), eagerDecode, useAsynchronousIO, file.getAbsolutePath(), validationStringency, factory); + this(useAsynchronousIO ? new AsyncBlockCompressedInputStream(file) : new BlockCompressedInputStream(file), indexFile!=null ? 
indexFile : SamFiles.findIndex(file), eagerDecode, useAsynchronousIO, file.getAbsolutePath(), validationStringency, factory); if (mIndexFile != null && mIndexFile.lastModified() < file.lastModified()) { System.err.println("WARNING: BAM index file " + mIndexFile.getAbsolutePath() + " is older than BAM " + file.getAbsolutePath()); @@ -145,7 +141,7 @@ final ValidationStringency validationStringency, final SAMRecordFactory factory) throws IOException { - this(new BlockCompressedInputStream(strm), indexFile, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory); + this(useAsynchronousIO ? new AsyncBlockCompressedInputStream(strm) : new BlockCompressedInputStream(strm), indexFile, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory); } BAMFileReader(final SeekableStream strm, @@ -155,7 +151,7 @@ final ValidationStringency validationStringency, final SAMRecordFactory factory) throws IOException { - this(new BlockCompressedInputStream(strm), indexStream, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory); + this(useAsynchronousIO ? 
new AsyncBlockCompressedInputStream(strm) : new BlockCompressedInputStream(strm), indexStream, eagerDecode, useAsynchronousIO, strm.getSource(), validationStringency, factory); } private BAMFileReader(final BlockCompressedInputStream compressedInputStream, @@ -171,7 +167,6 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream, mCompressedInputStream = compressedInputStream; mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream)); this.eagerDecode = eagerDecode; - this.useAsynchronousIO = useAsynchronousIO; this.mValidationStringency = validationStringency; this.samRecordFactory = factory; this.mFileHeader = readHeader(this.mStream, this.mValidationStringency, source); @@ -191,7 +186,6 @@ private BAMFileReader(final BlockCompressedInputStream compressedInputStream, mCompressedInputStream = compressedInputStream; mStream = new BinaryCodec(new DataInputStream(mCompressedInputStream)); this.eagerDecode = eagerDecode; - this.useAsynchronousIO = useAsynchronousIO; this.mValidationStringency = validationStringency; this.samRecordFactory = factory; this.mFileHeader = readHeader(this.mStream, this.mValidationStringency, source); diff --git a/src/main/java/htsjdk/samtools/SamReaderFactory.java b/src/main/java/htsjdk/samtools/SamReaderFactory.java index bf890d569..e4a72c881 100644 --- a/src/main/java/htsjdk/samtools/SamReaderFactory.java +++ b/src/main/java/htsjdk/samtools/SamReaderFactory.java @@ -118,7 +118,7 @@ public SamReader open(final Path path) { abstract public SamReaderFactory validationStringency(final ValidationStringency validationStringency); /** Set whether readers created by this factory will use asynchronous IO. - * If this methods is not called, this flag will default to the value of {@link Defaults#USE_ASYNC_IO_FOR_SAMTOOLS}. + * If this methods is not called, this flag will default to the value of {@link Defaults#USE_ASYNC_IO_READ_FOR_SAMTOOLS}. 
* Note that this option may not be applicable to all readers returned from this factory.
      * Returns the factory itself. */
     abstract public SamReaderFactory setUseAsyncIo(final boolean asynchronousIO);
diff --git a/src/main/java/htsjdk/samtools/util/AsyncBlockCompressedInputStream.java b/src/main/java/htsjdk/samtools/util/AsyncBlockCompressedInputStream.java
new file mode 100644
index 000000000..de319d8bf
--- /dev/null
+++ b/src/main/java/htsjdk/samtools/util/AsyncBlockCompressedInputStream.java
@@ -0,0 +1,277 @@
+/*
+ * The MIT License
+ *
+ * Copyright (c) 2016 Daniel Cameron
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package htsjdk.samtools.util;
+
+
+import htsjdk.samtools.Defaults;
+import htsjdk.samtools.seekablestream.SeekableStream;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Asynchronous read-ahead implementation of {@link htsjdk.samtools.util.BlockCompressedInputStream}.
+ *
+ * Note that this implementation is not synchronized. If multiple threads access an instance concurrently, it must be synchronized externally.
+ */
+public class AsyncBlockCompressedInputStream extends BlockCompressedInputStream {
+    /**
+     * Number of decompressed blocks that may be queued ahead of the reader. The quotient is
+     * computed in floating point so that {@code ceil} really rounds up, and is clamped to at
+     * least 1 because {@link ArrayBlockingQueue} rejects a capacity of zero.
+     */
+    private static final int READ_AHEAD_BUFFERS = Math.max(1,
+            (int) Math.ceil((double) Defaults.NON_ZERO_BUFFER_SIZE / BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE));
+    /**
+     * Pool of daemon threads shared by all streams for background decompression.
+     */
+    private static final Executor threadpool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = Executors.defaultThreadFactory().newThread(r);
+            t.setDaemon(true);
+            return t;
+        }
+    });
+    /**
+     * Next blocks (in stream order) that have already been decompressed.
+     */
+    private final BlockingQueue<DecompressedBlock> mResult = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
+    /**
+     * Buffers used to decompress previous blocks that are no longer in use.
+     * These buffers are reused if possible.
+     * Note that no blocking occurs on this buffer and a blocking queue is used purely
+     * because it is a base library synchronized queue implementation
+     * (and Collections.synchronizedQueue() does not exist).
+     */
+    private final BlockingQueue<byte[]> freeBuffers = new ArrayBlockingQueue<>(READ_AHEAD_BUFFERS);
+    /**
+     * Indicates whether a read-ahead task has been scheduled to run. Only one read-ahead task
+     * per stream can be scheduled at any one time.
+     */
+    private final Semaphore running = new Semaphore(1);
+    /**
+     * Indicates whether any scheduled task should abort processing and terminate
+     * as soon as possible since the result will be discarded anyway.
+     */
+    private volatile boolean mAbort = false;
+
+    /**
+     * Creates a read-ahead stream over an arbitrary input stream.
+     * @param stream source of BGZF data
+     */
+    public AsyncBlockCompressedInputStream(final InputStream stream) {
+        super(stream, true);
+    }
+
+    /**
+     * Creates a read-ahead stream over a file.
+     * @param file file containing BGZF data
+     * @throws IOException if the superclass constructor fails to open the file
+     */
+    public AsyncBlockCompressedInputStream(final File file)
+        throws IOException {
+        super(file);
+    }
+
+    /**
+     * Creates a read-ahead stream over a URL.
+     * @param url location of BGZF data
+     */
+    public AsyncBlockCompressedInputStream(final URL url) {
+        super(url);
+    }
+
+    /**
+     * Creates a read-ahead stream over a seekable stream.
+     * @param strm seekable source of BGZF data
+     */
+    public AsyncBlockCompressedInputStream(final SeekableStream strm) {
+        super(strm);
+    }
+
+    /**
+     * Returns the next read-ahead block, first offering the caller's spent buffer for reuse.
+     * @param bufferAvailableForReuse buffer no longer needed by the caller, may be null
+     * @return next decompressed block in stream order
+     */
+    @Override
+    protected DecompressedBlock nextBlock(byte[] bufferAvailableForReuse) {
+        if (bufferAvailableForReuse != null) {
+            freeBuffers.offer(bufferAvailableForReuse);
+        }
+        return nextBlockSync();
+    }
+
+    /**
+     * Discards all read-ahead state before the underlying stream is repositioned.
+     */
+    @Override
+    protected void prepareForSeek() {
+        flushReadAhead();
+        super.prepareForSeek();
+    }
+
+    /**
+     * Stops read-ahead, discards queued blocks and closes the stream.
+     */
+    @Override
+    public void close() throws IOException {
+        // Suppress interrupts while we close.
+        final boolean isInterrupted = Thread.interrupted();
+        mAbort = true;
+        try {
+            flushReadAhead();
+            super.close();
+        } finally {
+            if (isInterrupted) Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Foreground thread blocking operation that aborts all read-ahead tasks
+     * and flushes all read-ahead results.
+     */
+    private void flushReadAhead() {
+        final boolean abortStatus = mAbort;
+        mAbort = true;
+        try {
+            // block until the thread pool operation has completed
+            running.acquire();
+        } catch (InterruptedException e) {
+            // The semaphore was never obtained: undo the abort request so later reads are not
+            // starved of read-ahead, and keep the interrupt visible to the caller.
+            mAbort = abortStatus;
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted waiting for decompression thread", e);
+        }
+        try {
+            // flush any read-ahead results
+            mResult.clear();
+        } finally {
+            mAbort = abortStatus;
+            running.release();
+        }
+    }
+
+    /**
+     * Ensures that a read-ahead task for this stream exists in the thread pool.
+     */
+    private void ensureReadAhead() {
+        if (running.tryAcquire()) {
+            tryQueueTask();
+        }
+    }
+
+    /**
+     * Try to queue another read-ahead buffer
+     * This method should only be invoked by the owner of the running semaphore
+     */
+    private void tryQueueTask() {
+        if (mAbort) {
+            // Potential deadlock between getNextBlock() and flushReadAhead() here
+            // This requires seek()/close() and another method to be called
+            // at the same time. Since the parent class is not thread-safe
+            // this is an acceptable behavior.
+            running.release();
+            return;
+        }
+        if (mResult.remainingCapacity() == 0) {
+            // read-ahead has already filled the results buffer
+            running.release();
+            if (mResult.remainingCapacity() > 0) {
+                // race condition this second check fixes:
+                // - worker thread context switch after checking remaining capacity is zero
+                // - foreground thread calls getNextBlock() repeatedly until blocking
+                // - worker thread switches back in and releases mutex
+                // = foreground blocking on mResult.take(), mutex free, no worker
+                // -> try to take back mutex and start worker
+                // if that fails, then someone else took the lock and would
+                // have started the background worker. (except if flushReadAhead()
+                // took the lock with getNextBlock() still blocking: not thread-safe
+                // so we don't care)
+                ensureReadAhead();
+            }
+            return;
+        }
+        // we are able to perform a read-ahead operation
+        // ownership of the running mutex is now with the threadpool task
+        threadpool.execute(new AsyncBlockCompressedInputStreamRunnable());
+    }
+
+    /**
+     * Foreground thread blocking operation that retrieves the next read-ahead buffer.
+     * Lazy initiation of read-ahead is performed if required.
+     * @return next decompressed block in input stream
+     */
+    private DecompressedBlock nextBlockSync() {
+        ensureReadAhead();
+        DecompressedBlock nextBlock;
+        try {
+            nextBlock = mResult.take();
+        } catch (InterruptedException e) {
+            // take() cleared the interrupt flag; restore it. The exception itself travels to
+            // the caller inside the returned block.
+            Thread.currentThread().interrupt();
+            return new DecompressedBlock(0, 0, e);
+        }
+        ensureReadAhead();
+        return nextBlock;
+    }
+
+    /**
+     * Thread pool task that decompresses one block and then tries to schedule the next.
+     */
+    private class AsyncBlockCompressedInputStreamRunnable implements Runnable {
+        /**
+         * Thread pool operation that fills the read-ahead queue
+         */
+        @Override
+        public void run() {
+            DecompressedBlock decompressed;
+            try {
+                decompressed = processNextBlock(freeBuffers.poll());
+            } catch (RuntimeException e) {
+                // An unchecked failure must not kill the task while it still owns the running
+                // semaphore: the foreground thread would wait on mResult.take() forever.
+                // Queue the failure so the reader sees it.
+                decompressed = new DecompressedBlock(0, 0, e);
+            }
+            if (!mResult.offer(decompressed)) {
+                // offer should never block since we never queue a task when the results buffer is full
+                running.release(); // safety release to ensure foreground close() does not block indefinitely
+                throw new IllegalStateException("Decompression buffer full");
+            }
+            tryQueueTask();
+        }
+    }
+}
diff --git a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java
index b0ac0018e..8b5e922db 100755
--- a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java
+++ b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java
@@ -41,11 +41,13 @@
 import java.nio.ByteOrder;
 import java.util.Arrays;
 
-/*
+/**
  * Utility class for reading BGZF block compressed files. The caller can treat this file like any other InputStream.
  * It probably is not necessary to wrap this stream in a buffering stream, because there is internal buffering.
  * The advantage of BGZF over conventional GZip format is that BGZF allows for seeking without having to read the
- * entire file up to the location being sought. Note that seeking is only possible if the ctor(File) is used.
+ * entire file up to the location being sought. Note that seeking is only possible if the input stream is seekable.
+ *
+ * Note that this implementation is not synchronized.
If multiple threads access an instance concurrently, it must be synchronized externally. * * c.f. http://samtools.sourceforge.net/SAM1.pdf for details of BGZF format */ @@ -60,13 +62,11 @@ private InputStream mStream = null; private SeekableStream mFile = null; private byte[] mFileBuffer = null; - private byte[] mCurrentBlock = null; + private DecompressedBlock mCurrentBlock = null; private int mCurrentOffset = 0; - private long mBlockAddress = 0; - private int mLastBlockLength = 0; + private long mStreamOffset = 0; private final BlockGunzipper blockGunzipper = new BlockGunzipper(); - /** * Note that seek() is not supported if this ctor is used. */ @@ -128,13 +128,13 @@ public void setCheckCrcs(final boolean check) { * may block in order to fill an internal buffer if it has been exhausted. */ public int available() throws IOException { - if (mCurrentBlock == null || mCurrentOffset == mCurrentBlock.length) { + if (mCurrentBlock == null || mCurrentOffset == mCurrentBlock.mBlock.length) { readBlock(); } if (mCurrentBlock == null) { return 0; } - return mCurrentBlock.length - mCurrentOffset; + return mCurrentBlock.mBlock.length - mCurrentOffset; } /** @@ -142,7 +142,7 @@ public int available() throws IOException { * false otherwise. */ public boolean endOfBlock() { - return (mCurrentBlock != null && mCurrentOffset == mCurrentBlock.length); + return (mCurrentBlock != null && mCurrentOffset == mCurrentBlock.mBlock.length); } /** @@ -169,7 +169,7 @@ public void close() throws IOException { * @return the next byte of data, or -1 if the end of the stream is reached. */ public int read() throws IOException { - return (available() > 0) ? (mCurrentBlock[mCurrentOffset++] & 0xFF) : -1; + return (available() > 0) ? 
(mCurrentBlock.mBlock[mCurrentOffset++] & 0xFF) : -1; } /** @@ -199,48 +199,47 @@ public int read(final byte[] buffer) throws IOException { * character, or null if the end of the stream has been reached * * @exception IOException If an I/O error occurs - * @ */ public String readLine() throws IOException { - int available = available(); + int available = available(); if (available == 0) { return null; } if(null == buf){ // lazy initialisation - buf = new ByteArrayOutputStream(8192); + buf = new ByteArrayOutputStream(8192); } buf.reset(); - boolean done = false; - boolean foundCr = false; // \r found flag + boolean done = false; + boolean foundCr = false; // \r found flag while (!done) { - int linetmpPos = mCurrentOffset; - int bCnt = 0; - while((available-- > 0)){ - final byte c = mCurrentBlock[linetmpPos++]; - if(c == eol){ // found \n - done = true; - break; - } else if(foundCr){ // previous char was \r - --linetmpPos; // current char is not \n so put it back - done = true; - break; - } else if(c == eolCr){ // found \r - foundCr = true; - continue; // no ++bCnt - } - ++bCnt; - } - if(mCurrentOffset < linetmpPos){ - buf.write(mCurrentBlock, mCurrentOffset, bCnt); - mCurrentOffset = linetmpPos; - } - available = available(); - if(available == 0){ - // EOF - done = true; - } + int linetmpPos = mCurrentOffset; + int bCnt = 0; + while((available-- > 0)){ + final byte c = mCurrentBlock.mBlock[linetmpPos++]; + if(c == eol){ // found \n + done = true; + break; + } else if(foundCr){ // previous char was \r + --linetmpPos; // current char is not \n so put it back + done = true; + break; + } else if(c == eolCr){ // found \r + foundCr = true; + continue; // no ++bCnt + } + ++bCnt; + } + if(mCurrentOffset < linetmpPos) { + buf.write(mCurrentBlock.mBlock, mCurrentOffset, bCnt); + mCurrentOffset = linetmpPos; + } + available = available(); + if(available == 0) { + // EOF + done = true; + } } - return buf.toString(); + return buf.toString(); } /** @@ -267,7 +266,7 @@ public int 
read(final byte[] buffer, int offset, int length) throws IOException break; } final int copyLength = Math.min(length, available); - System.arraycopy(mCurrentBlock, mCurrentOffset, buffer, offset, copyLength); + System.arraycopy(mCurrentBlock.mBlock, mCurrentOffset, buffer, offset, copyLength); mCurrentOffset += copyLength; offset += copyLength; length -= copyLength; @@ -286,33 +285,42 @@ public void seek(final long pos) throws IOException { throw new IOException(CANNOT_SEEK_STREAM_MSG); } // Decode virtual file pointer - // Upper 48 bits is the byte offset into the compressed stream of a block. - // Lower 16 bits is the byte offset into the uncompressed stream inside the block. + // Upper 48 bits is the byte offset into the compressed stream of a + // block. + // Lower 16 bits is the byte offset into the uncompressed stream inside + // the block. final long compressedOffset = BlockCompressedFilePointerUtil.getBlockAddress(pos); final int uncompressedOffset = BlockCompressedFilePointerUtil.getBlockOffset(pos); final int available; - if (mBlockAddress == compressedOffset && mCurrentBlock != null) { - available = mCurrentBlock.length; + if (mCurrentBlock != null && mCurrentBlock.mBlockAddress == compressedOffset) { + available = mCurrentBlock.mBlock.length; } else { + prepareForSeek(); mFile.seek(compressedOffset); - mBlockAddress = compressedOffset; - mLastBlockLength = 0; - readBlock(); + mStreamOffset = compressedOffset; + mCurrentBlock = nextBlock(getBufferForReuse(mCurrentBlock)); + mCurrentOffset = 0; available = available(); } - if (uncompressedOffset > available || - (uncompressedOffset == available && !eof())) { + if (uncompressedOffset > available || (uncompressedOffset == available && !eof())) { throw new IOException(INVALID_FILE_PTR_MSG + pos + " for " + mFile.getSource()); } mCurrentOffset = uncompressedOffset; } + + /** + * Performs cleanup required before seek is called on the underlying stream + */ + protected void prepareForSeek() { + } private 
boolean eof() throws IOException { if (mFile.eof()) { return true; } // If the last remaining block is the size of the EMPTY_GZIP_BLOCK, this is the same as being at EOF. - return (mFile.length() - (mBlockAddress + mLastBlockLength) == BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); + return (mFile.length() - (mCurrentBlock.mBlockAddress + + mCurrentBlock.mBlockCompressedSize) == BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); } /** @@ -321,12 +329,17 @@ private boolean eof() throws IOException { * the two. */ public long getFilePointer() { - if (mCurrentOffset == mCurrentBlock.length) { - // If current offset is at the end of the current block, file pointer should point + if (mCurrentBlock == null) { + // Haven't read anything yet = at start of stream + return BlockCompressedFilePointerUtil.makeFilePointer(0, 0); + } + if (mCurrentOffset > 0 && mCurrentOffset == mCurrentBlock.mBlock.length) { + // If current offset is at the end of the current block, file + // pointer should point // to the beginning of the next block. 
- return BlockCompressedFilePointerUtil.makeFilePointer(mBlockAddress + mLastBlockLength, 0); + return BlockCompressedFilePointerUtil.makeFilePointer(mCurrentBlock.mBlockAddress + mCurrentBlock.mBlockCompressedSize, 0); } - return BlockCompressedFilePointerUtil.makeFilePointer(mBlockAddress, mCurrentOffset); + return BlockCompressedFilePointerUtil.makeFilePointer(mCurrentBlock.mBlockAddress, mCurrentOffset); } @Override @@ -363,49 +376,100 @@ private static boolean isValidBlockHeader(final byte[] buffer) { } private void readBlock() throws IOException { - + mCurrentBlock = nextBlock(getBufferForReuse(mCurrentBlock)); + mCurrentOffset = 0; + checkAndRethrowDecompressionException(); + } + /** + * Reads and decompresses the next block + * @param bufferAvailableForReuse decompression buffer available for reuse + * @return next block in the decompressed stream + */ + protected DecompressedBlock nextBlock(byte[] bufferAvailableForReuse) { + return processNextBlock(bufferAvailableForReuse); + } + /** + * Rethrows an exception encountered during decompression + * @throws IOException + */ + private void checkAndRethrowDecompressionException() throws IOException { + if (mCurrentBlock.mException != null) { + if (mCurrentBlock.mException instanceof IOException) { + throw (IOException) mCurrentBlock.mException; + } else if (mCurrentBlock.mException instanceof RuntimeException) { + throw (RuntimeException) mCurrentBlock.mException; + } else { + throw new RuntimeException(mCurrentBlock.mException); + } + } + } + + /** + * Attempt to reuse the buffer of the given block + * @param block owning block + * @return null decompressiong buffer to resuse, null if no buffer is available + */ + private byte[] getBufferForReuse(DecompressedBlock block) { + if (block == null) return null; + return block.mBlock; + } + + /** + * Decompress the next block from the input stream. When using asynchronous + * IO, this will be called by the background thread. 
+ * @param bufferAvailableForReuse buffer in which to place decompressed block. A null or + * incorrectly sized buffer will result in the buffer being ignored and + * a new buffer allocated for decompression. + * @return next block in input stream + */ + protected DecompressedBlock processNextBlock(byte[] bufferAvailableForReuse) { if (mFileBuffer == null) { mFileBuffer = new byte[BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE]; } - int count = readBytes(mFileBuffer, 0, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH); - if (count == 0) { - // Handle case where there is no empty gzip block at end. - mCurrentOffset = 0; - mBlockAddress += mLastBlockLength; - mCurrentBlock = new byte[0]; - return; - } - if (count != BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH) { - throw new IOException(INCORRECT_HEADER_SIZE_MSG + mFile.getSource()); - } - final int blockLength = unpackInt16(mFileBuffer, BlockCompressedStreamConstants.BLOCK_LENGTH_OFFSET) + 1; - if (blockLength < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || blockLength > mFileBuffer.length) { - throw new IOException(UNEXPECTED_BLOCK_LENGTH_MSG + blockLength + " for " + mFile.getSource()); - } - final int remaining = blockLength - BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH; - count = readBytes(mFileBuffer, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH, remaining); - if (count != remaining) { - throw new FileTruncatedException(PREMATURE_END_MSG + mFile.getSource()); + long blockAddress = mStreamOffset; + try { + final int headerByteCount = readBytes(mFileBuffer, 0, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH); + mStreamOffset += headerByteCount; + if (headerByteCount == 0) { + // Handle case where there is no empty gzip block at end. 
+ return new DecompressedBlock(blockAddress, new byte[0], 0); + } + if (headerByteCount != BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH) { + return new DecompressedBlock(blockAddress, headerByteCount, new IOException(INCORRECT_HEADER_SIZE_MSG + mFile.getSource())); + } + final int blockLength = unpackInt16(mFileBuffer, BlockCompressedStreamConstants.BLOCK_LENGTH_OFFSET) + 1; + if (blockLength < BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH || blockLength > mFileBuffer.length) { + return new DecompressedBlock(blockAddress, blockLength, + new IOException(UNEXPECTED_BLOCK_LENGTH_MSG + blockLength + " for " + mFile.getSource())); + } + final int remaining = blockLength - BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH; + final int dataByteCount = readBytes(mFileBuffer, BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH, + remaining); + mStreamOffset += dataByteCount; + if (dataByteCount != remaining) { + return new DecompressedBlock(blockAddress, blockLength, + new FileTruncatedException(PREMATURE_END_MSG + mFile.getSource())); + } + final byte[] decompressed = inflateBlock(mFileBuffer, blockLength, bufferAvailableForReuse); + return new DecompressedBlock(blockAddress, decompressed, blockLength); + } catch (IOException e) { + return new DecompressedBlock(blockAddress, 0, e); } - inflateBlock(mFileBuffer, blockLength); - mCurrentOffset = 0; - mBlockAddress += mLastBlockLength; - mLastBlockLength = blockLength; } - private void inflateBlock(final byte[] compressedBlock, final int compressedLength) throws IOException { - final int uncompressedLength = unpackInt32(compressedBlock, compressedLength-4); - byte[] buffer = mCurrentBlock; - mCurrentBlock = null; - if (buffer == null || buffer.length != uncompressedLength) { - try { - buffer = new byte[uncompressedLength]; - } catch (final NegativeArraySizeException e) { - throw new RuntimeIOException(mFile.getSource() + " has invalid uncompressedLength: " + uncompressedLength, e); - } + private byte[] 
inflateBlock(final byte[] compressedBlock, final int compressedLength, + final byte[] bufferAvailableForReuse) throws IOException { + final int uncompressedLength = unpackInt32(compressedBlock, compressedLength - 4); + if (uncompressedLength < 0) { + throw new RuntimeIOException(mFile.getSource() + " has invalid uncompressedLength: " + uncompressedLength); + } + byte[] buffer = bufferAvailableForReuse; + if (buffer == null || uncompressedLength != buffer.length) { + // can't reuse the buffer since the size is incorrect + buffer = new byte[uncompressedLength]; } blockGunzipper.unzipBlock(buffer, compressedBlock, compressedLength); - mCurrentBlock = buffer; + return buffer; } private int readBytes(final byte[] buffer, final int offset, final int length) throws IOException { @@ -508,6 +572,38 @@ private static boolean preambleEqual(final byte[] preamble, final byte[] buf, fi } return true; } -} + protected static class DecompressedBlock { + /** + * Decompressed block + */ + private final byte[] mBlock; + /** + * Compressed size of block (the uncompressed size can be found using + * mBlock.length) + */ + private final int mBlockCompressedSize; + /** + * Stream offset of start of block + */ + private final long mBlockAddress; + /** + * Exception thrown (if any) when attempting to decompress block + */ + private final Exception mException; + + public DecompressedBlock(long blockAddress, byte[] block, int compressedSize) { + mBlock = block; + mBlockAddress = blockAddress; + mBlockCompressedSize = compressedSize; + mException = null; + } + public DecompressedBlock(long blockAddress, int compressedSize, Exception exception) { + mBlock = new byte[0]; + mBlockAddress = blockAddress; + mBlockCompressedSize = compressedSize; + mException = exception; + } + } +} diff --git a/src/test/java/htsjdk/samtools/util/AsyncBlockCompressedInputStreamTest.java b/src/test/java/htsjdk/samtools/util/AsyncBlockCompressedInputStreamTest.java new file mode 100644 index 000000000..957a86942 --- 
/dev/null
+++ b/src/test/java/htsjdk/samtools/util/AsyncBlockCompressedInputStreamTest.java
@@ -0,0 +1,104 @@
+/*
+ * The MIT License
+ *
+ * Copyright (c) 2016 Daniel Cameron
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+package htsjdk.samtools.util;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import htsjdk.samtools.seekablestream.SeekableFileStream;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Checks that {@link AsyncBlockCompressedInputStream} yields the same data as the synchronous stream.
+ */
+public class AsyncBlockCompressedInputStreamTest {
+    private final File BAM_FILE = new File("src/test/resources/htsjdk/samtools/BAMFileIndexTest/index_test.bam");
+
+    /**
+     * Reads the test BAM synchronously to record chunk lengths, first bytes and file pointers, then
+     * checks that several interleaved asynchronous streams reproduce them, both on a straight
+     * read to EOF and after repeatedly seeking back to the first recorded pointer.
+     */
+    @Test
+    public void testAsync() throws Exception {
+        BlockCompressedInputStream sync = new BlockCompressedInputStream(new SeekableFileStream(BAM_FILE));
+        List<byte[]> expected = new ArrayList<>();
+        List<Long> virtualOffset = new ArrayList<>();
+        List<Integer> length = new ArrayList<>();
+        byte[] buffer = new byte[BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE / 2];
+        virtualOffset.add(sync.getFilePointer());
+        int len = sync.read(buffer);
+        length.add(len);
+        while (len > 0) {
+            expected.add(buffer);
+            buffer = new byte[buffer.length];
+            len = sync.read(buffer);
+            length.add(len);
+            virtualOffset.add(sync.getFilePointer());
+        }
+        sync.close();
+        buffer = new byte[buffer.length];
+        List<BlockCompressedInputStream> list = new ArrayList<>();
+        for (int i = 0; i < 8; i++) {
+            list.add(new AsyncBlockCompressedInputStream(new SeekableFileStream(BAM_FILE)));
+        }
+        // read till EOF
+        for (int i = 0; i < expected.size(); i++) {
+            for (BlockCompressedInputStream async : list) {
+                len = async.read(buffer);
+                Assert.assertEquals(len, (int)length.get(i));
+                Assert.assertEquals(buffer[0], expected.get(i)[0]);
+            }
+        }
+        for (int j = 0; j < 128; j++) {
+            // seek and read
+            for (BlockCompressedInputStream async : list) {
+                async.seek(virtualOffset.get(0));
+            }
+            for (int i = 0; i < Math.min(expected.size(), 8); i++) {
+                for (BlockCompressedInputStream async : list) {
+                    len = async.read(buffer);
+                    Assert.assertEquals(len, (int)length.get(i));
+                    Assert.assertEquals(buffer[0], expected.get(i)[0]);
+                }
+            }
+        }
+        for (BlockCompressedInputStream async : list) {
+            async.close();
+        }
+    }
+
+    /**
+     * A freshly opened stream must report a file pointer of zero.
+     */
+    @Test
+    public void testFilePointer() throws Exception {
+        BlockCompressedInputStream sync = new BlockCompressedInputStream(BAM_FILE);
+        Assert.assertEquals(sync.getFilePointer(), 0);
+        sync.close();
+    }
+}
diff --git a/src/test/java/htsjdk/samtools/util/BlockCompressedInputStreamTest.java b/src/test/java/htsjdk/samtools/util/BlockCompressedInputStreamTest.java
new file mode 100644
index 000000000..e36032cdd
--- /dev/null
+++ b/src/test/java/htsjdk/samtools/util/BlockCompressedInputStreamTest.java
@@ -0,0 +1,102 @@
+package htsjdk.samtools.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import htsjdk.samtools.seekablestream.SeekableFileStream;
+
+/**
+ * Tests {@link BlockCompressedInputStream} against a BGZF file and its uncompressed counterpart.
+ */
+public class BlockCompressedInputStreamTest {
+    // random data pulled from /dev/random then compressed using bgzip from tabix
+    private static final File BLOCK_UNCOMPRESSED = new File("src/test/resources/htsjdk/samtools/util/random.bin");
+    private static final File BLOCK_COMPRESSED = new File("src/test/resources/htsjdk/samtools/util/random.bin.gz");
+    private static final long[] BLOCK_COMPRESSED_OFFSETS = new long[] { 0, 0xfc2e, 0x1004d, 0x1fc7b, 0x2009a, };
+    private static final long[] BLOCK_UNCOMPRESSED_END_POSITIONS = new long[] { 64512, 65536, 130048 };
+
+    /** Reading byte by byte must reproduce the uncompressed file exactly. */
+    @Test
+    public void stream_should_match_uncompressed_stream() throws Exception {
+        byte[] uncompressed = Files.readAllBytes(BLOCK_UNCOMPRESSED.toPath());
+        try (BlockCompressedInputStream stream = new BlockCompressedInputStream(new FileInputStream(BLOCK_COMPRESSED))) {
+            for (int i = 0; i < uncompressed.length; i++) {
+                Assert.assertEquals(stream.read(), Byte.toUnsignedInt(uncompressed[i]));
+            }
+            Assert.assertTrue(stream.endOfBlock());
+        }
+    }
+
+    /** endOfBlock() must be true exactly at the recorded uncompressed block end positions. */
+    @Test
+    public void endOfBlock_should_be_true_only_when_entire_block_is_read() throws Exception {
+        long size = BLOCK_UNCOMPRESSED.length();
+        // input file contains 5 blocks
+        List<Long> offsets = new ArrayList<>();
+        for (int i = 0; i < BLOCK_UNCOMPRESSED_END_POSITIONS.length; i++) {
+            offsets.add(BLOCK_UNCOMPRESSED_END_POSITIONS[i]);
+        }
+        List<Long> endOfBlockTrue = new ArrayList<>();
+        try (BlockCompressedInputStream stream = new BlockCompressedInputStream(new FileInputStream(BLOCK_COMPRESSED))) {
+            for (long i = 0; i < size; i++) {
+                if (stream.endOfBlock()) {
+                    endOfBlockTrue.add(i);
+                }
+                stream.read();
+            }
+        }
+        Assert.assertEquals(endOfBlockTrue, offsets);
+    }
+
+    /** A single bulk read spanning several blocks must return the whole uncompressed content. */
+    @Test
+    public void decompression_should_cross_block_boundries() throws Exception {
+        byte[] uncompressed = Files.readAllBytes(BLOCK_UNCOMPRESSED.toPath());
+        try (BlockCompressedInputStream stream = new BlockCompressedInputStream(new FileInputStream(BLOCK_COMPRESSED))) {
+            byte[] decompressed = new byte[uncompressed.length];
+            stream.read(decompressed);
+            Assert.assertEquals(decompressed, uncompressed);
+            Assert.assertTrue(stream.endOfBlock());
+            Assert.assertEquals(stream.read(), -1);
+        }
+    }
+
+    /** A seek must consume exactly one block from the file; reading on must return the tail of the data. */
+    @Test
+    public void seek_should_read_block() throws Exception {
+        byte[] uncompressed = Files.readAllBytes(BLOCK_UNCOMPRESSED.toPath());
+        try (SeekableFileStream sfs = new SeekableFileStream(BLOCK_COMPRESSED)) {
+            try (BlockCompressedInputStream stream = new BlockCompressedInputStream(sfs)) {
+                // seek to the start of each block in turn
+                for (int i = 0; i < BLOCK_COMPRESSED_OFFSETS.length-1; i++) {
+                    stream.seek(BLOCK_COMPRESSED_OFFSETS[i] << 16);
+                    Assert.assertEquals(sfs.position(), BLOCK_COMPRESSED_OFFSETS[i + 1]);
+                    // check
+                    byte[] actual = new byte[uncompressed.length];
+                    int len = stream.read(actual);
+                    actual = Arrays.copyOf(actual, len);
+                    byte[] expected = Arrays.copyOfRange(uncompressed, uncompressed.length - actual.length, uncompressed.length);
+                    Assert.assertEquals(actual, expected);
+                }
+            }
+        }
+    }
+
+    /** Within the first block, available() must count down the bytes left in that block. */
+    @Test
+    public void available_should_return_number_of_bytes_left_in_current_block() throws Exception {
+        try (BlockCompressedInputStream stream = new BlockCompressedInputStream(BLOCK_COMPRESSED)) {
+            for (int i = 0; i < BLOCK_UNCOMPRESSED_END_POSITIONS[0]; i++) {
+                Assert.assertEquals(stream.available(), BLOCK_UNCOMPRESSED_END_POSITIONS[0] - i);
+                stream.read();
+            }
+        }
+    }
+}
diff --git a/src/test/resources/htsjdk/samtools/util/random.bin b/src/test/resources/htsjdk/samtools/util/random.bin
new file mode 100644
index 000000000..f59b24766
Binary files /dev/null and b/src/test/resources/htsjdk/samtools/util/random.bin differ
diff --git a/src/test/resources/htsjdk/samtools/util/random.bin.gz b/src/test/resources/htsjdk/samtools/util/random.bin.gz
new file mode 100644
index 000000000..cd764e0b3
Binary files /dev/null and b/src/test/resources/htsjdk/samtools/util/random.bin.gz differ