多线程下载技术,简单的说就是把要下载的文件分成几块,由不同的线程来负责每一块数据的下载任务。
1、文件分块。 文件分块大小(blockSize)= (文件大小 +线程数 - 1 )/ 线程数 ;
2、确定每一个线程所要下载的 文件的起始和结束位置。
现假设为每个线程分别编号:0,1, 2,3;则
第一个线程负责的下载位置是: 0*blockSize - (0+1)*blockSize -1,
第二个线程负责的下载位置是: 1*blockSize - (1+1)*blockSize -1,
以此类推第i个线程负责的下载位置是:i*blockSize - (i+1)*blockSize -1;
即线程(编号为id)下载开始位置 start = id*block;
即线程(编号为id)下载结束位置 end = (id+1)*block -1;
3、设置http 请求头, conn.setRequestProperty(“Range”, “bytes=” + start + “-” + end);
一个简单的Java多线程下载代码如下:
package com.ricky.java.test.download;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;public class Downloader {private URL url; // 目标地址private File file; // 本地文件private static final int THREAD_AMOUNT = 8; // 线程数private static final String DOWNLOAD_DIR_PATH = "D:/Download"; // 下载目录private int threadLen; // 每个线程下载多少public Downloader(String address, String filename) throws IOException { // 通过构造函数传入下载地址url = new URL(address);File dir = new File(DOWNLOAD_DIR_PATH);if(!dir.exists()){dir.mkdirs();}file = new File(dir, filename);}public void download() throws IOException {HttpURLConnection conn = (HttpURLConnection) url.openConnection();conn.setConnectTimeout(5000);int totalLen = conn.getContentLength(); // 获取文件长度threadLen = (totalLen + THREAD_AMOUNT - 1) / THREAD_AMOUNT; // 计算每个线程要下载的长度System.out.println("totalLen="+totalLen+",threadLen:"+threadLen);RandomAccessFile raf = new RandomAccessFile(file, "rws"); // 在本地创建一个和服务端大小相同的文件raf.setLength(totalLen); // 设置文件的大小raf.close();for (int i = 0; i
// new Downloader(address, "2.xml").download();}
}
文件下载是一个常用的模块,我们可以对其封装一下,方便以后调用。涉及到的开发技术如下:
工程目录结构如下所示:
com.ricky.common.java.download.FileDownloader
package com.ricky.common.java.download;import org.apache.log4j.Logger;import com.ricky.common.java.download.config.FileDownloaderConfiguration;/*** Java 文件多线程下载* @author Ricky Fung**/
public class FileDownloader {protected Logger mLogger = Logger.getLogger("devLog");private volatile static FileDownloader fileDownloader;private FileDownloaderEngine downloaderEngine;private FileDownloaderConfiguration configuration;public static FileDownloader getInstance(){if(fileDownloader==null){synchronized (FileDownloader.class) {if(fileDownloader==null){fileDownloader = new FileDownloader();}}}return fileDownloader;}protected FileDownloader(){}public synchronized void init(FileDownloaderConfiguration configuration){if (configuration == null) {throw new IllegalArgumentException("FileDownloader configuration can not be initialized with null");}if (this.configuration == null) {mLogger.info("init FileDownloader");downloaderEngine = new FileDownloaderEngine(configuration);this.configuration = configuration;}else{mLogger.warn("Try to initialize FileDownloader which had already been initialized before.");}}public boolean download(String url, String filename){return downloaderEngine.download(url, filename);}public boolean isInited() {return configuration != null;}public void destroy() {if(downloaderEngine!=null){downloaderEngine.close();downloaderEngine = null;}}
}
com.ricky.common.java.download.config.FileDownloaderConfiguration
package com.ricky.common.java.download.config;import java.io.File;public class FileDownloaderConfiguration {private final int connectTimeout;private final int socketTimeout;private final int maxRetryCount;private final int coreThreadNum; private final long requestBytesSize;private final File downloadDestinationDir; private FileDownloaderConfiguration(Builder builder) { this.connectTimeout &#61; builder.connectTimeout; this.socketTimeout &#61; builder.socketTimeout; this.maxRetryCount &#61; builder.maxRetryCount; this.coreThreadNum &#61; builder.coreThreadNum; this.requestBytesSize &#61; builder.requestBytesSize;this.downloadDestinationDir &#61; builder.downloadDestinationDir; }public int getConnectTimeout() {return connectTimeout;}public int getSocketTimeout() {return socketTimeout;}public int getMaxRetryCount() {return maxRetryCount;}public int getCoreThreadNum() {return coreThreadNum;}public long getRequestBytesSize() {return requestBytesSize;}public File getDownloadDestinationDir() {return downloadDestinationDir;}public static FileDownloaderConfiguration.Builder custom() { return new Builder(); }public static class Builder { private int connectTimeout; private int socketTimeout; private int maxRetryCount; private int coreThreadNum; private long requestBytesSize; private File downloadDestinationDir; public Builder connectTimeout(int connectTimeout) { this.connectTimeout &#61; connectTimeout; return this; } public Builder socketTimeout(int socketTimeout) { this.socketTimeout &#61; socketTimeout; return this;} public Builder coreThreadNum(int coreThreadNum) { this.coreThreadNum &#61; coreThreadNum; return this; } public Builder maxRetryCount(int maxRetryCount) { this.maxRetryCount &#61; maxRetryCount; return this; } public Builder requestBytesSize(long requestBytesSize) { this.requestBytesSize &#61; requestBytesSize; return this; } public Builder downloadDestinationDir(File downloadDestinationDir) { this.downloadDestinationDir &#61; downloadDestinationDir; return this; }public FileDownloaderConfiguration build() { initDefaultValue(this); return new FileDownloaderConfiguration(this); } private void initDefaultValue(Builder builder) { if(builder.connectTimeout<1){ builder.connectTimeout &#61; 6*1000; } if(builder.socketTimeout<1){ builder.socketTimeout &#61; 6*1000; }if(builder.maxRetryCount<1){builder.maxRetryCount &#61; 1; } if(builder.coreThreadNum<1){ builder.coreThreadNum &#61; 3; }if(builder.requestBytesSize<1){ builder.requestBytesSize &#61; 1024*128; }if(builder.downloadDestinationDir&#61;&#61;null){ builder.downloadDestinationDir &#61; new File("./"); }} }
}
com.ricky.common.java.download.FileDownloaderEngine
package com.ricky.common.java.download;import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.BitSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import org.apache.log4j.Logger;import com.ricky.common.java.download.config.FileDownloaderConfiguration;
import com.ricky.common.java.download.job.DownloadWorker;
import com.ricky.common.java.download.job.Worker.DownloadListener;public class FileDownloaderEngine {protected Logger mLogger &#61; Logger.getLogger("devLog");private FileDownloaderConfiguration configuration;private ExecutorService pool;private HttpRequestImpl httpRequestImpl;private File downloadDestinationDir;private int coreThreadNum;public FileDownloaderEngine(FileDownloaderConfiguration configuration){this.configuration &#61; configuration;this.coreThreadNum &#61; configuration.getCoreThreadNum();this.httpRequestImpl &#61; new HttpRequestImpl(this.configuration);this.pool &#61; Executors.newFixedThreadPool(this.configuration.getCoreThreadNum());this.downloadDestinationDir &#61; this.configuration.getDownloadDestinationDir();if(!this.downloadDestinationDir.exists()){this.downloadDestinationDir.mkdirs();}}public boolean download(String url, String filename){long start_time &#61; System.currentTimeMillis();mLogger.info("开始下载,url:"&#43;url&#43;",filename:"&#43;filename);long total_file_len &#61; httpRequestImpl.getFileSize(url); // 获取文件长度if(total_file_len<1){mLogger.warn("获取文件大小失败,url:"&#43;url&#43;",filename:"&#43;filename);return false;}final BitSet downloadIndicatorBitSet &#61; new BitSet(coreThreadNum); //标记每个线程下载是否成功File file &#61; null;try {file &#61; new File(downloadDestinationDir, filename);RandomAccessFile raf &#61; new RandomAccessFile(file, "rws"); // 在本地创建一个和服务端大小相同的文件raf.setLength(total_file_len); // 设置文件的大小raf.close();mLogger.info("create new file:"&#43;file);} catch (FileNotFoundException e) {mLogger.error("create new file error", e);} catch (IOException e) {mLogger.error("create new file error", e);}if(file&#61;&#61;null || !file.exists()){mLogger.warn("创建文件失败,url:"&#43;url&#43;",filename:"&#43;filename);return false;}long thread_download_len &#61; (total_file_len &#43; coreThreadNum - 1) / coreThreadNum; // 计算每个线程要下载的长度mLogger.info("filename:"&#43;filename&#43;",total_file_len&#61;"&#43;total_file_len&#43;",coreThreadNum:"&#43;coreThreadNum&#43;",thread_download_len:"&#43;thread_download_len);CountDownLatch latch &#61; new CountDownLatch(coreThreadNum);//两个工人的协作 for (int i &#61; 0; i
com.ricky.common.java.download.job.DownloadWorker
package com.ricky.common.java.download.job;import java.io.File;
import java.util.concurrent.CountDownLatch;import org.apache.log4j.Logger;import com.ricky.common.java.download.HttpRequestImpl;
import com.ricky.common.java.download.RetryFailedException;public class DownloadWorker extends Worker {protected Logger mLogger &#61; Logger.getLogger("devLog");private int id;private String url;private File file;private long thread_download_len;private CountDownLatch latch;private HttpRequestImpl httpRequestImpl;public DownloadWorker(int id, String url, long thread_download_len, File file, HttpRequestImpl httpRequestImpl, CountDownLatch latch) {this.id &#61; id;this.url &#61; url;this.thread_download_len &#61; thread_download_len;this.file &#61; file;this.httpRequestImpl &#61; httpRequestImpl;this.latch &#61; latch;}&#64;Overridepublic void run() {long start &#61; id * thread_download_len; // 起始位置long end &#61; id * thread_download_len &#43; thread_download_len - 1; // 结束位置mLogger.info("线程:" &#43; id &#43;" 开始下载 url:"&#43;url&#43; ",range:" &#43; start &#43; "-" &#43; end);boolean result &#61; false;try {httpRequestImpl.downloadPartFile(id, url, file, start, end);result &#61; true;mLogger.info("线程:" &#43; id &#43; " 下载 "&#43;url&#43; " range[" &#43; start &#43; "-" &#43; end&#43;"] 成功");} catch (RetryFailedException e) {mLogger.error("线程:" &#43; id &#43;" 重试出错", e);}catch (Exception e) {mLogger.error("线程:" &#43; id &#43;" 下载出错", e);}if(listener!&#61;null){mLogger.info("notify FileDownloaderEngine download result");listener.notify(id, url, start, end, result, "");}latch.countDown();}}
com.ricky.common.java.download.job.Worker
package com.ricky.common.java.download.job;public abstract class Worker implements Runnable {protected DownloadListener listener;public void addListener(DownloadListener listener){this.listener &#61; listener;}public interface DownloadListener{public void notify(int thread_id, String url, long start, long end, boolean result, String msg);}
}
com.ricky.common.java.download.HttpRequestImpl
package com.ricky.common.java.download;import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.log4j.Logger;import com.ricky.common.java.download.config.FileDownloaderConfiguration;
import com.ricky.common.java.http.HttpClientManager;public class HttpRequestImpl {protected Logger mLogger &#61; Logger.getLogger("devLog");private int connectTimeout; private int socketTimeout; private int maxRetryCount; private long requestBytesSize; private CloseableHttpClient httpclient &#61; HttpClientManager.getHttpClient();public HttpRequestImpl(FileDownloaderConfiguration configuration){connectTimeout &#61; configuration.getConnectTimeout();socketTimeout &#61; configuration.getSocketTimeout();maxRetryCount &#61; configuration.getMaxRetryCount();requestBytesSize &#61; configuration.getRequestBytesSize();}public void downloadPartFile(int id, String url, File file, long start, long end){RandomAccessFile raf &#61; null;try {raf &#61; new RandomAccessFile(file, "rws");} catch (FileNotFoundException e) {mLogger.error("file not found:"&#43;file, e);throw new IllegalArgumentException(e);}int retry &#61; 0;long pos &#61; start;while(pos
}
最后是客户端调用代码
package com.ricky.common.java;import java.io.File;import com.ricky.common.java.download.FileDownloader;
import com.ricky.common.java.download.config.FileDownloaderConfiguration;public class FileDownloaderTest {public static void main(String[] args) {FileDownloader fileDownloader &#61; FileDownloader.getInstance();FileDownloaderConfiguration configuration &#61; FileDownloaderConfiguration.custom().coreThreadNum(5).downloadDestinationDir(new File("D:/Download")).build();fileDownloader.init(configuration);String url &#61; "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";;String filename &#61; "QQ7.9.exe";boolean result &#61; fileDownloader.download(url, filename);System.out.println("download result:"&#43;result);fileDownloader.destroy(); //close it when you not need}
}
https://github.com/TiFG/FileDownloader