From e14ebe5b5d25cf5580add58d11027256dc16939a Mon Sep 17 00:00:00 2001 From: Louis Bergelson Date: Tue, 6 Jun 2017 14:30:52 -0400 Subject: [PATCH 1/2] overloading BlockCompressedInputStream.checkTerminator to support NIO adding overloads that take Path and SeekableByteChannel --- .../samtools/util/BlockCompressedInputStream.java | 82 +++++++++++++--------- 1 file changed, 47 insertions(+), 35 deletions(-) diff --git a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java index 066a0c001..ecb5f57a3 100755 --- a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java +++ b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java @@ -32,16 +32,15 @@ import htsjdk.samtools.seekablestream.SeekableStream; import htsjdk.samtools.util.zip.InflaterFactory; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.RandomAccessFile; +import java.io.*; import java.net.URL; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Arrays; -import java.util.zip.Inflater; /** * Utility class for reading BGZF block compressed files. The caller can treat this file like any other InputStream. @@ -588,40 +587,53 @@ private int unpackInt32(final byte[] buffer, final int offset) { public enum FileTermination {HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, DEFECTIVE} public static FileTermination checkTermination(final File file) throws IOException { - final long fileSize = file.length(); + return checkTermination(file == null ? 
null : file.toPath()); + } + + public static FileTermination checkTermination(final Path path) throws IOException { + try( final SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ) ){ + return checkTermination(channel); + } + } + + public static FileTermination checkTermination(SeekableByteChannel channel) throws IOException { + final long fileSize = channel.size(); if (fileSize < BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length) { return FileTermination.DEFECTIVE; } - final RandomAccessFile raFile = new RandomAccessFile(file, "r"); - try { - raFile.seek(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); - byte[] buf = new byte[BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length]; - raFile.readFully(buf); - if (Arrays.equals(buf, BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) { - return FileTermination.HAS_TERMINATOR_BLOCK; + channel.position(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); + + final ByteBuffer bytebuffer = ByteBuffer.allocate(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); + readFully(channel, bytebuffer); + if (Arrays.equals(bytebuffer.array(), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) { + return FileTermination.HAS_TERMINATOR_BLOCK; + } + final int bufsize = (int)Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); + final byte[] buf = new byte[bufsize]; + channel.position(fileSize - bufsize); + readFully(channel, ByteBuffer.wrap(buf)); + for (int i = buf.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; + i >= 0; --i) { + if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE, + buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) { + continue; } - final int bufsize = (int)Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); - buf = new byte[bufsize]; - raFile.seek(fileSize - bufsize); - raFile.read(buf); - for (int i = buf.length - 
BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; - i >= 0; --i) { - if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE, - buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) { - continue; - } - final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4); - byteBuffer.order(ByteOrder.LITTLE_ENDIAN); - final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF; - if (buf.length - i == totalBlockSizeMinusOne + 1) { - return FileTermination.HAS_HEALTHY_LAST_BLOCK; - } else { - return FileTermination.DEFECTIVE; - } + final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4); + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF; + if (buf.length - i == totalBlockSizeMinusOne + 1) { + return FileTermination.HAS_HEALTHY_LAST_BLOCK; + } else { + return FileTermination.DEFECTIVE; } - return FileTermination.DEFECTIVE; - } finally { - raFile.close(); + } + return FileTermination.DEFECTIVE; + } + + private static void readFully(SeekableByteChannel channel, ByteBuffer dst) throws IOException { + final int bytesRead = channel.read(dst); + if (bytesRead < dst.capacity()){ + throw new EOFException(); } } From 1734eb99e5dcf16d92febead5e1b62323e0b6199 Mon Sep 17 00:00:00 2001 From: Louis Bergelson Date: Wed, 7 Jun 2017 18:20:09 -0400 Subject: [PATCH 2/2] responding to david's comments --- .../samtools/util/BlockCompressedInputStream.java | 94 ++++++++++++++++------ .../util/BlockCompressedTerminatorTest.java | 87 +++++++++++++++++--- 2 files changed, 145 insertions(+), 36 deletions(-) diff --git a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java index ecb5f57a3..e108d1bb3 100755 --- a/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java +++ 
b/src/main/java/htsjdk/samtools/util/BlockCompressedInputStream.java @@ -586,51 +586,95 @@ private int unpackInt32(final byte[] buffer, final int offset) { public enum FileTermination {HAS_TERMINATOR_BLOCK, HAS_HEALTHY_LAST_BLOCK, DEFECTIVE} + /** + * + * @param file the file to check + * @return status of the last compressed block + * @throws IOException + */ public static FileTermination checkTermination(final File file) throws IOException { return checkTermination(file == null ? null : file.toPath()); } + /** + * + * @param path to the file to check + * @return status of the last compressed block + * @throws IOException + */ public static FileTermination checkTermination(final Path path) throws IOException { try( final SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ) ){ return checkTermination(channel); } } + /** + * check the status of the final bgzipped block for the given bgzipped resource + * + * @param channel an open channel to read from, + * the channel will remain open and the initial position will be restored when the operation completes + * this makes no guarantee about the state of the channel if an exception is thrown during reading + * + * @return the status of the last compressed block + * @throws IOException + */ public static FileTermination checkTermination(SeekableByteChannel channel) throws IOException { final long fileSize = channel.size(); if (fileSize < BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length) { return FileTermination.DEFECTIVE; } - channel.position(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); - - final ByteBuffer bytebuffer = ByteBuffer.allocate(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); - readFully(channel, bytebuffer); - if (Arrays.equals(bytebuffer.array(), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) { - return FileTermination.HAS_TERMINATOR_BLOCK; - } - final int bufsize = (int)Math.min(fileSize, 
BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); - final byte[] buf = new byte[bufsize]; - channel.position(fileSize - bufsize); - readFully(channel, ByteBuffer.wrap(buf)); - for (int i = buf.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; - i >= 0; --i) { - if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE, - buf, i, BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) { - continue; + final long initialPosition = channel.position(); + boolean exceptionThrown = false; + try { + channel.position(fileSize - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); + + //Check if the end of the file is an empty gzip block which is used as the terminator for a bgzipped file + final ByteBuffer lastBlockBuffer = ByteBuffer.allocate(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length); + readFully(channel, lastBlockBuffer); + if (Arrays.equals(lastBlockBuffer.array(), BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK)) { + return FileTermination.HAS_TERMINATOR_BLOCK; } - final ByteBuffer byteBuffer = ByteBuffer.wrap(buf, i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, 4); - byteBuffer.order(ByteOrder.LITTLE_ENDIAN); - final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF; - if (buf.length - i == totalBlockSizeMinusOne + 1) { - return FileTermination.HAS_HEALTHY_LAST_BLOCK; - } else { - return FileTermination.DEFECTIVE; + + //if the last block isn't an empty gzip block, check to see if it is a healthy compressed block or if it's corrupted + final int bufsize = (int) Math.min(fileSize, BlockCompressedStreamConstants.MAX_COMPRESSED_BLOCK_SIZE); + final byte[] bufferArray = new byte[bufsize]; + channel.position(fileSize - bufsize); + readFully(channel, ByteBuffer.wrap(bufferArray)); + for (int i = bufferArray.length - BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK.length; + i >= 0; --i) { + if (!preambleEqual(BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE, + bufferArray, i, 
BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length)) { + continue; + } + final ByteBuffer byteBuffer = ByteBuffer.wrap(bufferArray, + i + BlockCompressedStreamConstants.GZIP_BLOCK_PREAMBLE.length, + 4); + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + final int totalBlockSizeMinusOne = byteBuffer.getShort() & 0xFFFF; + if (bufferArray.length - i == totalBlockSizeMinusOne + 1) { + return FileTermination.HAS_HEALTHY_LAST_BLOCK; + } else { + return FileTermination.DEFECTIVE; + } + } + return FileTermination.DEFECTIVE; + } catch (final Throwable e) { + exceptionThrown = true; + throw e; + } finally { + //if an exception was thrown we don't want to reset the position because that would be likely to throw again + //and suppress the initial exception + if(!exceptionThrown) { + channel.position(initialPosition); } } - return FileTermination.DEFECTIVE; } - private static void readFully(SeekableByteChannel channel, ByteBuffer dst) throws IOException { + /** + * read as many bytes as dst's capacity into dst or throw if that's not possible + * @throws EOFException if channel has fewer bytes available than dst's capacity + */ + static void readFully(SeekableByteChannel channel, ByteBuffer dst) throws IOException { final int bytesRead = channel.read(dst); if (bytesRead < dst.capacity()){ throw new EOFException(); diff --git a/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java b/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java index d9d20ccef..4a14bd920 100644 --- a/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java +++ b/src/test/java/htsjdk/samtools/util/BlockCompressedTerminatorTest.java @@ -23,38 +23,103 @@ */ package htsjdk.samtools.util; +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; import htsjdk.HtsjdkTest; +import htsjdk.samtools.SeekableByteChannelFromBuffer; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; 
+import java.io.EOFException; import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; /** * @author alecw@broadinstitute.org */ public class BlockCompressedTerminatorTest extends HtsjdkTest { private static final File TEST_DATA_DIR = new File("src/test/resources/htsjdk/samtools/util"); + private static final File DEFECTIVE = new File(TEST_DATA_DIR, "defective_bgzf.bam"); + private static final File NO_TERMINATOR = new File(TEST_DATA_DIR, "no_bgzf_terminator.bam"); - @Test - public void testFileWithTerminator() throws Exception { + @DataProvider + public Object[][] getFiles() throws IOException { + return new Object[][]{ + {getValidCompressedFile(), BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK}, + {NO_TERMINATOR, BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK}, + {DEFECTIVE, BlockCompressedInputStream.FileTermination.DEFECTIVE} + }; + } + + @Test( dataProvider = "getFiles") + public void testCheckTerminationForFiles(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFile), expected); + } + + @Test( dataProvider = "getFiles") + public void testCheckTerminationForPaths(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + try(FileSystem fs = Jimfs.newFileSystem("test", Configuration.unix())){ + final Path compressedFileInJimfs = Files.copy(compressedFile.toPath(), fs.getPath("something")); + Assert.assertEquals(BlockCompressedInputStream.checkTermination(compressedFileInJimfs), expected); + } + } + + @Test( dataProvider = "getFiles") + public void testCheckTerminationForSeekableByteChannels(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + 
try(SeekableByteChannel channel = Files.newByteChannel(compressedFile.toPath())){ + Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected); + } + } + + @Test(dataProvider = "getFiles") + public void testChannelPositionIsRestored(File compressedFile, BlockCompressedInputStream.FileTermination expected) throws IOException { + final long position = 50; + try(SeekableByteChannel channel = Files.newByteChannel(compressedFile.toPath())){ + channel.position(position); + Assert.assertEquals(channel.position(), position); + Assert.assertEquals(BlockCompressedInputStream.checkTermination(channel), expected); + Assert.assertEquals(channel.position(), position); + } + } + + private static File getValidCompressedFile() throws IOException { final File tmpCompressedFile = File.createTempFile("test.", ".bgzf"); tmpCompressedFile.deleteOnExit(); final BlockCompressedOutputStream os = new BlockCompressedOutputStream(tmpCompressedFile); os.write("Hi, Mom!\n".getBytes()); os.close(); - Assert.assertEquals(BlockCompressedInputStream.checkTermination(tmpCompressedFile), - BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK); + return tmpCompressedFile; } @Test - public void testValidFileWithoutTerminator() throws Exception { - Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "no_bgzf_terminator.bam")), - BlockCompressedInputStream.FileTermination.HAS_HEALTHY_LAST_BLOCK); + public void testReadFullyReadsBytesCorrectly() throws IOException { + try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){ + final ByteBuffer readBuffer = ByteBuffer.allocate(10); + Assert.assertTrue(channel.size() > readBuffer.capacity()); + BlockCompressedInputStream.readFully(channel, readBuffer); + + ByteBuffer expected = ByteBuffer.allocate(10); + channel.position(0).read(expected); + Assert.assertEquals(readBuffer.array(), expected.array()); + } } - @Test - public void testDefectiveFile() throws 
Exception { - Assert.assertEquals(BlockCompressedInputStream.checkTermination(new File(TEST_DATA_DIR, "defective_bgzf.bam")), - BlockCompressedInputStream.FileTermination.DEFECTIVE); + @Test(expectedExceptions = EOFException.class) + public void testReadFullyThrowWhenItCantReadEnough() throws IOException { + try(final SeekableByteChannel channel = Files.newByteChannel(DEFECTIVE.toPath())){ + final ByteBuffer readBuffer = ByteBuffer.allocate(1000); + Assert.assertTrue(channel.size() < readBuffer.capacity()); + BlockCompressedInputStream.readFully(channel, readBuffer); + } } + + + }