From b2a2c1d19e8de6e6bab7f9e97dd2b0f2c410fb8c Mon Sep 17 00:00:00 2001 From: chenzero Date: Sat, 20 Jan 2018 22:12:51 +0800 Subject: [PATCH] NET-652 Ftp Connection Resuming --- .../org/apache/commons/net/ftp/FTPClient.java | 218 +++++++++++++++++- .../java/org/apache/commons/net/io/Util.java | 76 ++++++ 2 files changed, 293 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/commons/net/ftp/FTPClient.java b/src/main/java/org/apache/commons/net/ftp/FTPClient.java index 9b9c2601c..015e392fc 100644 --- a/src/main/java/org/apache/commons/net/ftp/FTPClient.java +++ b/src/main/java/org/apache/commons/net/ftp/FTPClient.java @@ -19,11 +19,13 @@ import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.BufferedWriter; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; +import java.io.RandomAccessFile; import java.io.Reader; import java.net.Inet6Address; import java.net.InetAddress; @@ -642,7 +644,7 @@ private boolean __storeFile(FTPCmd command, String remote, InputStream local) throws IOException { return _storeFile(command.getCommand(), remote, local); - } + } /** * @since 3.1 @@ -698,6 +700,13 @@ CopyStreamEvent.UNKNOWN_STREAM_SIZE, __mergeListeners(csl), } } } + + static class ProgressData { + public long remoteTimestamp=0; + public long remoteFileLength=0; + public long localOffset=0; + public boolean restEnabled=true; // whether REST command is supported by server + } private OutputStream __storeFileStream(FTPCmd command, String remote) throws IOException @@ -1888,6 +1897,213 @@ public boolean retrieveFile(String remote, OutputStream local) { return _retrieveFile(FTPCmd.RETR.getCommand(), remote, local); } + + /** + * progress data signature + */ + public static final String PROGRESS_SIGNATURE="39DF01B317C94CC4A62F9CAD8A1025D3"; + /** + * progress data block length in bytes. + */ + public static final int PROGRESS_DATA_LEN=48; + + /** + * read progress data from local file. + * progres data: 32 bytes signature + 8 bytes remote timestamp + 8 bytes local offset + * @param f + * @param expectTimestamp + * @return + * @throws Exception + */ + protected static RandomAccessFile readProgressData(File f, ProgressData pd) + throws IOException + { + RandomAccessFile rf = new RandomAccessFile(f, "rwd"); + + if(pd.remoteFileLength<1000) { // too small file + pd.restEnabled = false; + pd.localOffset = 0; + + rf.setLength(pd.remoteFileLength); + } + else { + boolean recreate = true; + + do { + if(f.exists()) { + long flen = rf.length(); + if (flen==pd.remoteFileLength) { + rf.seek(flen - PROGRESS_DATA_LEN); + byte[] buf = new byte[32]; + rf.readFully(buf); + String s = new String(buf, "UTF-8"); + if(!PROGRESS_SIGNATURE.equals(s)) { + break; + } + + long ts2 = rf.readLong(); + if(ts2!=pd.remoteTimestamp) { + break; + } + + pd.localOffset = rf.readLong(); + if(pd.localOffset<0) { + break; + } + + recreate = false; + } + else { + } + } + else { + } + } while (false); + + if(recreate) { + pd.localOffset = 0; + rf.setLength(pd.remoteFileLength); + rf.seek(pd.remoteFileLength-PROGRESS_DATA_LEN); + byte[] bytes = PROGRESS_SIGNATURE.getBytes("UTF-8"); + rf.write(bytes); + rf.writeLong(pd.remoteTimestamp); + rf.writeLong(pd.localOffset); + } + } + return rf; + } + + + /** + * this is same with retrieveFile(String, InputStream), besides, to supports connection resuming + * @param remote + * @param local + * @return + * @throws IOException + */ + public boolean retrieveFile(String remote, File local) throws IOException { + FTPFile[] fs = this.listFiles(remote); + if(fs.length!=1) { + return false; + } + + ProgressData pd = new ProgressData(); + pd.remoteFileLength = fs[0].getSize(); + pd.remoteTimestamp = fs[0].getTimestamp().getTimeInMillis(); + + RandomAccessFile rf = readProgressData(local, pd); + + boolean ret = _retrieveFile(FTPCmd.RETR.getCommand(), remote, rf, pd); + rf.close(); + if(ret) { + local.setLastModified(pd.remoteTimestamp); + } + + return ret; + } + + + private static class ProgressDataUpdater implements CopyStreamListener { + long flushByteTransferred = 0; + + RandomAccessFile file; + + public ProgressDataUpdater(RandomAccessFile file) throws SocketException { + this.file = file; + } + + @Override + public void bytesTransferred(CopyStreamEvent event) { + bytesTransferred(event.getTotalBytesTransferred(), event.getBytesTransferred(), event.getStreamSize()); + } + + @Override + public void bytesTransferred(long totalBytesTransferred, int bytesTransferred, long streamSize) { + this.flushByteTransferred += bytesTransferred; + try { + if(totalBytesTransferred + PROGRESS_DATA_LEN 1024) { + file.getFD().sync(); + + flushByteTransferred=0; + } + + file.seek(pos); + } + else { + // nop + } + } + catch(Exception e) { + e.printStackTrace(); + } + } + } + + private boolean _retrieveFile(String command, String remote, RandomAccessFile local, + ProgressData pd) throws IOException { + if(pd.localOffset!=0) { + int rc = this.rest(""+ pd.localOffset); + if(!FTPReply.isPositiveIntermediate(rc)) { // server not support REST command + pd.restEnabled = false; + pd.localOffset = 0; + } + } + + local.seek(pd.localOffset); + + Socket socket = _openDataConnection_(command, remote); + + if (socket == null) { + return false; + } + + final InputStream input; + if (__fileType == ASCII_FILE_TYPE) { + input = new FromNetASCIIInputStream(getBufferedInputStream(socket.getInputStream())); + } + else { + input = getBufferedInputStream(socket.getInputStream()); + } + + CopyStreamAdapter csa = new CopyStreamAdapter(); + + if(this.__copyStreamListener!=null) { + csa.addCopyStreamListener(this.__copyStreamListener); + } + + CSL csl = null; + if (__controlKeepAliveTimeout > 0) { + csl = new CSL(this, __controlKeepAliveTimeout, __controlKeepAliveReplyTimeout); + csa.addCopyStreamListener(csl); + } + + if(pd.restEnabled) { + ProgressDataUpdater pdu = new ProgressDataUpdater(local); + csa.addCopyStreamListener(pdu); + } + + // Treat everything else as binary for now + try { + Util.copyStream(input, local, pd.localOffset, getBufferSize(), pd.remoteFileLength, csa, false); + + // Get the transfer response + return completePendingCommand(); + } + finally { + Util.closeQuietly(input); + Util.closeQuietly(socket); + if (csl != null) { + __cslDebug = csl.cleanUp(); // fetch any outstanding keepalive replies + } + } + } /** * @param command the command to get diff --git a/src/main/java/org/apache/commons/net/io/Util.java b/src/main/java/org/apache/commons/net/io/Util.java index 1f4e5b500..b9c7429a6 100644 --- a/src/main/java/org/apache/commons/net/io/Util.java +++ b/src/main/java/org/apache/commons/net/io/Util.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.RandomAccessFile; import java.io.Reader; import java.io.Writer; import java.net.Socket; @@ -140,6 +141,81 @@ public static final long copyStream(InputStream source, OutputStream dest, return total; } + + /*** + * Copies the contents of an InputStream to a RandomAccessFile using a + * copy buffer of a given size and notifies the provided + * CopyStreamListener of the progress of the copy operation by calling + * its bytesTransferred(long, int) method after each write to the + * destination. If you wish to notify more than one listener you should + * use a CopyStreamAdapter as the listener and register the additional + * listeners with the CopyStreamAdapter. + *

+ * The contents of the InputStream are + * read until the end of the stream is reached, but neither the + * source nor the destination are closed. You must do this yourself + * outside of the method call. The number of bytes read/written is + * returned. + * + * @param source The source InputStream. + * @param dest The destination RandomAccessFile. + * @param offset the file resuming offset. + * @param bufferSize The number of bytes to buffer during the copy. + * A zero or negative value means to use {@link #DEFAULT_COPY_BUFFER_SIZE}. + * @param streamSize The number of bytes in the stream being copied. + * should NOT set to CopyStreamEvent.UNKNOWN_STREAM_SIZE. + * @param listener The CopyStreamListener to notify of progress. If + * this parameter is null, notification is not attempted. + * @param flush Whether to flush the output stream after every + * write. This is necessary for interactive sessions that rely on + * buffered streams. If you don't flush, the data will stay in + * the stream buffer. + * @return number of bytes read/written + * @throws CopyStreamException If an error occurs while reading from the + * source or writing to the destination. The CopyStreamException + * will contain the number of bytes confirmed to have been + * transferred before an + * IOException occurred, and it will also contain the IOException + * that caused the error. These values can be retrieved with + * the CopyStreamException getTotalBytesTransferred() and + * getIOException() methods. + */ + public static final long copyStream(InputStream source, RandomAccessFile dest, long offset, + int bufferSize, long streamSize, + CopyStreamListener listener, + boolean flush) + throws CopyStreamException + { + int numBytes; + long total = offset; + byte[] buffer = new byte[bufferSize > 0 ? bufferSize : DEFAULT_COPY_BUFFER_SIZE]; + + try { + while ((numBytes = source.read(buffer)) != -1) { + // Technically, some read(byte[]) methods may return 0 and we + // cannot + // accept that as an indication of EOF. + + if (numBytes == 0) { + continue; + } + + dest.write(buffer, 0, numBytes); + if (flush) { + dest.getFD().sync(); + } + total += numBytes; + if (listener != null) { + listener.bytesTransferred(total, numBytes, streamSize); + } + } + } + catch (IOException e) { + throw new CopyStreamException("IOException caught while copying.", total, e); + } + + return total; + } /*** * Copies the contents of an InputStream to an OutputStream using a