
Implementing Multithreaded Downloads in Java


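Multithreaded download

Put simply, multithreaded downloading splits the file to be downloaded into several blocks and assigns each block to a different thread.

Key techniques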


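  • RandomAccessFile:
    the Java class for random-access reads and writes on a file
  • The HTTP Range request header:
    asks the server for only a given byte range of the resource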
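
As a minimal sketch of why RandomAccessFile suits this job, the following pre-sizes a file and then fills it out of order, which is what several download threads effectively do. The class name RandomWriteDemo, the helper writeAt, and the temporary file are illustrative assumptions and are not part of the original code.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

final class RandomWriteDemo {

    // Writes data into the file starting at the given byte offset.
    static void writeAt(File file, long offset, byte[] data) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.seek(offset); // move the file pointer before writing
            raf.write(data);
        }
    }

    // Pre-sizes a temporary file, writes the second half first, and returns the content.
    static String demo() {
        try {
            File file = File.createTempFile("raf-demo", ".bin");
            try {
                try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
                    raf.setLength(10); // reserve the full size up front
                }
                writeAt(file, 5, "WORLD".getBytes(StandardCharsets.US_ASCII));
                writeAt(file, 0, "HELLO".getBytes(StandardCharsets.US_ASCII));
                return new String(Files.readAllBytes(file.toPath()), StandardCharsets.US_ASCII);
            } finally {
                file.delete();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints HELLOWORLD
    }
}
```

Because every write seeks to its own offset first, the order in which the blocks arrive does not matter.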

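Approach

1. Split the file into blocks. Block size: blockSize = (file size + thread count - 1) / thread count, that is, the file size divided by the thread count, rounded up.
2. Determine the start and end positions that each thread downloads.
Suppose the threads are numbered 0, 1, 2, 3. Then
the first thread downloads bytes 0*blockSize to (0+1)*blockSize - 1,
the second thread downloads bytes 1*blockSize to (1+1)*blockSize - 1,
and in general thread i downloads bytes i*blockSize to (i+1)*blockSize - 1.
So for the thread numbered id, the start position is start = id*blockSize;
and the end position (inclusive) is end = (id+1)*blockSize - 1;
3. Set the HTTP request header: conn.setRequestProperty("Range", "bytes=" + start + "-" + end);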
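
The three steps above can be sketched as a few small helpers. This is only an illustration: the class name BlockRanges and its method names are assumptions, and clamping the last block to the end of the file is an addition that the formulas above do not include.

```java
final class BlockRanges {

    // Ceiling division: the smallest block size that covers the file with threadCount blocks.
    static long blockSize(long fileSize, int threadCount) {
        return (fileSize + threadCount - 1) / threadCount;
    }

    // Returns {start, end}, both inclusive, for the thread with the given id.
    // If start > end, the file is too small for this thread to have any work.
    static long[] range(int id, long fileSize, int threadCount) {
        long block = blockSize(fileSize, threadCount);
        long start = id * block;
        long end = Math.min(start + block, fileSize) - 1; // clamp the last block to the file end
        return new long[] {start, end};
    }

    // Builds the value of the HTTP Range header for an inclusive byte range.
    static String rangeHeader(long start, long end) {
        return "bytes=" + start + "-" + end;
    }

    public static void main(String[] args) {
        long fileSize = 10;
        int threads = 4;
        for (int id = 0; id < threads; id++) {
            long[] r = range(id, fileSize, threads);
            System.out.println("thread " + id + ": " + rangeHeader(r[0], r[1]));
        }
        // prints:
        // thread 0: bytes=0-2
        // thread 1: bytes=3-5
        // thread 2: bytes=6-8
        // thread 3: bytes=9-9
    }
}
```

With a 10-byte file and 4 threads the block size is 3, so the last thread receives only the single remaining byte.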

Implementation

A simple multithreaded downloader in Java looks like this:

package com.ricky.java.test.download;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;

public class Downloader {

    private static final int THREAD_AMOUNT = 8;                    // number of threads
    private static final String DOWNLOAD_DIR_PATH = "D:/Download"; // download directory

    private URL url;       // target address
    private File file;     // local file
    private int threadLen; // number of bytes each thread downloads

    // The download address and the local file name are passed in through the constructor.
    public Downloader(String address, String filename) throws IOException {
        url = new URL(address);
        File dir = new File(DOWNLOAD_DIR_PATH);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        file = new File(dir, filename);
    }

    public void download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(5000);

        int totalLen = conn.getContentLength(); // file length, -1 if unknown
        conn.disconnect();
        if (totalLen < 1) {
            throw new IOException("Cannot determine file length: " + url);
        }
        threadLen = (totalLen + THREAD_AMOUNT - 1) / THREAD_AMOUNT; // bytes per thread, rounded up
        System.out.println("totalLen=" + totalLen + ",threadLen:" + threadLen);

        // Create a local file of the same size as the remote one.
        RandomAccessFile raf = new RandomAccessFile(file, "rws");
        raf.setLength(totalLen);
        raf.close();

        // Start THREAD_AMOUNT threads; each downloads one block into the local file.
        for (int i = 0; i < THREAD_AMOUNT; i++) {
            new DownloadThread(i).start();
        }
    }

    private class DownloadThread extends Thread {
        private int id;

        public DownloadThread(int id) {
            this.id = id;
        }

        public void run() {
            int start = id * threadLen;               // start position
            int end = id * threadLen + threadLen - 1; // end position (inclusive); may run past the file end for the last thread
            System.out.println("thread " + id + ": " + start + "-" + end);

            try {
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setConnectTimeout(5000);
                conn.setRequestProperty("Range", "bytes=" + start + "-" + end); // range for this thread

                try (InputStream in = conn.getInputStream();
                     RandomAccessFile raf = new RandomAccessFile(file, "rws")) {
                    raf.seek(start); // position at which to store the data
                    byte[] buffer = new byte[1024];
                    int len;
                    while ((len = in.read(buffer)) != -1) {
                        raf.write(buffer, 0, len);
                    }
                }
                System.out.println("thread " + id + " finished");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        String address = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";
        new Downloader(address, "QQ7.9.exe").download();

        // String address = "http://api.t.dianping.com/n/api.xml?cityId=2";
        // new Downloader(address, "2.xml").download();
    }
}

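Encapsulating multithreaded download

File download is a commonly needed module, so it is worth wrapping it up for later reuse. The following tools and libraries are used: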

  • JDK 1.7
  • Eclipse Juno
  • Maven 3
  • HttpClient 4.3.6

The project directory structure is as follows:
[Figure: project directory structure]

com.ricky.common.java.download.FileDownloader

package com.ricky.common.java.download;

import org.apache.log4j.Logger;

import com.ricky.common.java.download.config.FileDownloaderConfiguration;

/**
 * Multithreaded file download in Java
 * @author Ricky Fung
 */
public class FileDownloader {

    protected Logger mLogger = Logger.getLogger("devLog");

    private volatile static FileDownloader fileDownloader;

    private FileDownloaderEngine downloaderEngine;
    private FileDownloaderConfiguration configuration;

    // Lazily created singleton using double-checked locking.
    public static FileDownloader getInstance() {
        if (fileDownloader == null) {
            synchronized (FileDownloader.class) {
                if (fileDownloader == null) {
                    fileDownloader = new FileDownloader();
                }
            }
        }
        return fileDownloader;
    }

    protected FileDownloader() {
    }

    public synchronized void init(FileDownloaderConfiguration configuration) {
        if (configuration == null) {
            throw new IllegalArgumentException("FileDownloader configuration can not be initialized with null");
        }
        if (this.configuration == null) {
            mLogger.info("init FileDownloader");
            downloaderEngine = new FileDownloaderEngine(configuration);
            this.configuration = configuration;
        } else {
            mLogger.warn("Try to initialize FileDownloader which had already been initialized before.");
        }
    }

    public boolean download(String url, String filename) {
        return downloaderEngine.download(url, filename);
    }

    public boolean isInited() {
        return configuration != null;
    }

    public void destroy() {
        if (downloaderEngine != null) {
            downloaderEngine.close();
            downloaderEngine = null;
        }
    }
}

com.ricky.common.java.download.config.FileDownloaderConfiguration

package com.ricky.common.java.download.config;

import java.io.File;

public class FileDownloaderConfiguration {

    private final int connectTimeout;
    private final int socketTimeout;
    private final int maxRetryCount;
    private final int coreThreadNum;
    private final long requestBytesSize;
    private final File downloadDestinationDir;

    private FileDownloaderConfiguration(Builder builder) {
        this.connectTimeout = builder.connectTimeout;
        this.socketTimeout = builder.socketTimeout;
        this.maxRetryCount = builder.maxRetryCount;
        this.coreThreadNum = builder.coreThreadNum;
        this.requestBytesSize = builder.requestBytesSize;
        this.downloadDestinationDir = builder.downloadDestinationDir;
    }

    public int getConnectTimeout() {
        return connectTimeout;
    }

    public int getSocketTimeout() {
        return socketTimeout;
    }

    public int getMaxRetryCount() {
        return maxRetryCount;
    }

    public int getCoreThreadNum() {
        return coreThreadNum;
    }

    public long getRequestBytesSize() {
        return requestBytesSize;
    }

    public File getDownloadDestinationDir() {
        return downloadDestinationDir;
    }

    public static FileDownloaderConfiguration.Builder custom() {
        return new Builder();
    }

    public static class Builder {
        private int connectTimeout;
        private int socketTimeout;
        private int maxRetryCount;
        private int coreThreadNum;
        private long requestBytesSize;
        private File downloadDestinationDir;

        public Builder connectTimeout(int connectTimeout) {
            this.connectTimeout = connectTimeout;
            return this;
        }

        public Builder socketTimeout(int socketTimeout) {
            this.socketTimeout = socketTimeout;
            return this;
        }

        public Builder coreThreadNum(int coreThreadNum) {
            this.coreThreadNum = coreThreadNum;
            return this;
        }

        public Builder maxRetryCount(int maxRetryCount) {
            this.maxRetryCount = maxRetryCount;
            return this;
        }

        public Builder requestBytesSize(long requestBytesSize) {
            this.requestBytesSize = requestBytesSize;
            return this;
        }

        public Builder downloadDestinationDir(File downloadDestinationDir) {
            this.downloadDestinationDir = downloadDestinationDir;
            return this;
        }

        public FileDownloaderConfiguration build() {
            initDefaultValue(this);
            return new FileDownloaderConfiguration(this);
        }

        // Fills in a default for every option that was left unset.
        private void initDefaultValue(Builder builder) {
            if (builder.connectTimeout < 1) {
                builder.connectTimeout = 6 * 1000;
            }
            if (builder.socketTimeout < 1) {
                builder.socketTimeout = 6 * 1000;
            }
            if (builder.maxRetryCount < 1) {
                builder.maxRetryCount = 1;
            }
            if (builder.coreThreadNum < 1) {
                builder.coreThreadNum = 3;
            }
            if (builder.requestBytesSize < 1) {
                builder.requestBytesSize = 1024 * 128;
            }
            if (builder.downloadDestinationDir == null) {
                builder.downloadDestinationDir = new File("./");
            }
        }
    }
}

com.ricky.common.java.download.FileDownloaderEngine

package com.ricky.common.java.download;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.BitSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;

import com.ricky.common.java.download.config.FileDownloaderConfiguration;
import com.ricky.common.java.download.job.DownloadWorker;
import com.ricky.common.java.download.job.Worker.DownloadListener;

public class FileDownloaderEngine {

    protected Logger mLogger = Logger.getLogger("devLog");

    private FileDownloaderConfiguration configuration;
    private ExecutorService pool;
    private HttpRequestImpl httpRequestImpl;
    private File downloadDestinationDir;
    private int coreThreadNum;

    public FileDownloaderEngine(FileDownloaderConfiguration configuration) {
        this.configuration = configuration;
        this.coreThreadNum = configuration.getCoreThreadNum();
        this.httpRequestImpl = new HttpRequestImpl(this.configuration);
        this.pool = Executors.newFixedThreadPool(this.configuration.getCoreThreadNum());
        this.downloadDestinationDir = this.configuration.getDownloadDestinationDir();
        if (!this.downloadDestinationDir.exists()) {
            this.downloadDestinationDir.mkdirs();
        }
    }

    public boolean download(String url, String filename) {
        long start_time = System.currentTimeMillis();
        mLogger.info("download started, url:" + url + ",filename:" + filename);

        long total_file_len = httpRequestImpl.getFileSize(url); // file length
        if (total_file_len < 1) {
            mLogger.warn("failed to get file size, url:" + url + ",filename:" + filename);
            return false;
        }

        // One bit per worker, set when that worker's download succeeds.
        final BitSet downloadIndicatorBitSet = new BitSet(coreThreadNum);

        File file = null;
        try {
            file = new File(downloadDestinationDir, filename);
            // Create a local file of the same size as the remote one.
            RandomAccessFile raf = new RandomAccessFile(file, "rws");
            raf.setLength(total_file_len);
            raf.close();
            mLogger.info("create new file:" + file);
        } catch (FileNotFoundException e) {
            mLogger.error("create new file error", e);
        } catch (IOException e) {
            mLogger.error("create new file error", e);
        }
        if (file == null || !file.exists()) {
            mLogger.warn("failed to create file, url:" + url + ",filename:" + filename);
            return false;
        }

        // Number of bytes each worker downloads, rounded up.
        long thread_download_len = (total_file_len + coreThreadNum - 1) / coreThreadNum;
        mLogger.info("filename:" + filename + ",total_file_len=" + total_file_len
                + ",coreThreadNum:" + coreThreadNum + ",thread_download_len:" + thread_download_len);

        // The latch is released once every worker has finished.
        CountDownLatch latch = new CountDownLatch(coreThreadNum);
        for (int i = 0; i < coreThreadNum; i++) {
            DownloadWorker worker = new DownloadWorker(i, url, thread_download_len, file, httpRequestImpl, latch);
            worker.addListener(new DownloadListener() {
                @Override
                public void notify(int thread_id, String url, long start, long end,
                        boolean result, String msg) {
                    mLogger.info("thread_id:" + thread_id + " download result:" + result + ",url->" + url);
                    if (result) { // record only successful blocks
                        modifyState(downloadIndicatorBitSet, thread_id);
                    }
                }
            });
            pool.execute(worker);
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            mLogger.error("CountDownLatch Interrupt", e);
        }

        mLogger.info("download finished, url:" + url + ", elapsed:"
                + ((System.currentTimeMillis() - start_time) / 1000) + "(s)");
        return downloadIndicatorBitSet.cardinality() == coreThreadNum;
    }

    private synchronized void modifyState(BitSet bitSet, int index) {
        bitSet.set(index);
    }

    /** Releases resources. */
    public void close() {
        if (httpRequestImpl != null) {
            httpRequestImpl.close();
            httpRequestImpl = null;
        }
        if (pool != null) {
            pool.shutdown();
            pool = null;
        }
    }
}
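
The coordination pattern in the engine (a fixed thread pool, a CountDownLatch to wait for all workers, and a BitSet recording which ones succeeded) can be sketched in isolation. This is a simplified illustration, not the engine itself: the class LatchDemo, the method runAll, and the failingId parameter, which simulates one failed download, are all assumptions.

```java
import java.util.BitSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class LatchDemo {

    // Runs one task per worker and returns true only if every worker reported success.
    // failingId is a hypothetical knob: the worker with that id pretends to fail (-1 = none).
    static boolean runAll(int workers, int failingId) {
        BitSet done = new BitSet(workers);
        CountDownLatch latch = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                int id = i;
                pool.execute(() -> {
                    try {
                        if (id != failingId) {
                            synchronized (done) { // BitSet is not thread-safe
                                done.set(id);
                            }
                        }
                    } finally {
                        latch.countDown(); // always count down, even on failure
                    }
                });
            }
            latch.await(); // block until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
        synchronized (done) {
            return done.cardinality() == workers;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(4, -1)); // prints true
        System.out.println(runAll(4, 2));  // prints false
    }
}
```

Counting down in a finally block matters: if a worker threw before reaching countDown(), await() would never return.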

com.ricky.common.java.download.job.DownloadWorker

package com.ricky.common.java.download.job;

import java.io.File;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;

import com.ricky.common.java.download.HttpRequestImpl;
import com.ricky.common.java.download.RetryFailedException;

public class DownloadWorker extends Worker {

    protected Logger mLogger = Logger.getLogger("devLog");

    private int id;
    private String url;
    private File file;
    private long thread_download_len;
    private CountDownLatch latch;
    private HttpRequestImpl httpRequestImpl;

    public DownloadWorker(int id, String url, long thread_download_len, File file,
            HttpRequestImpl httpRequestImpl, CountDownLatch latch) {
        this.id = id;
        this.url = url;
        this.thread_download_len = thread_download_len;
        this.file = file;
        this.httpRequestImpl = httpRequestImpl;
        this.latch = latch;
    }

    @Override
    public void run() {
        long start = id * thread_download_len;                         // start position
        long end = id * thread_download_len + thread_download_len - 1; // end position (inclusive)
        mLogger.info("thread:" + id + " starts downloading url:" + url + ",range:" + start + "-" + end);

        boolean result = false;
        try {
            httpRequestImpl.downloadPartFile(id, url, file, start, end);
            result = true;
            mLogger.info("thread:" + id + " downloaded " + url + " range[" + start + "-" + end + "] successfully");
        } catch (RetryFailedException e) {
            mLogger.error("thread:" + id + " retry failed", e);
        } catch (Exception e) {
            mLogger.error("thread:" + id + " download failed", e);
        }

        try {
            if (listener != null) {
                mLogger.info("notify FileDownloaderEngine download result");
                listener.notify(id, url, start, end, result, "");
            }
        } finally {
            latch.countDown(); // always release the latch so the engine does not wait forever
        }
    }
}

com.ricky.common.java.download.job.Worker

package com.ricky.common.java.download.job;

public abstract class Worker implements Runnable {

    protected DownloadListener listener;

    public void addListener(DownloadListener listener) {
        this.listener = listener;
    }

    public interface DownloadListener {
        public void notify(int thread_id, String url, long start, long end, boolean result, String msg);
    }
}

com.ricky.common.java.download.HttpRequestImpl

package com.ricky.common.java.download;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.log4j.Logger;

import com.ricky.common.java.download.config.FileDownloaderConfiguration;
import com.ricky.common.java.http.HttpClientManager;

public class HttpRequestImpl {

    private static final String USER_AGENT = "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.152 Safari/537.36";

    protected Logger mLogger = Logger.getLogger("devLog");

    private int connectTimeout;
    private int socketTimeout;
    private int maxRetryCount;
    private long requestBytesSize;

    private CloseableHttpClient httpclient = HttpClientManager.getHttpClient();

    public HttpRequestImpl(FileDownloaderConfiguration configuration) {
        connectTimeout = configuration.getConnectTimeout();
        socketTimeout = configuration.getSocketTimeout();
        maxRetryCount = configuration.getMaxRetryCount();
        requestBytesSize = configuration.getRequestBytesSize();
    }

    // Downloads bytes start..end (inclusive) in requests of at most requestBytesSize bytes each.
    public void downloadPartFile(int id, String url, File file, long start, long end) {
        RandomAccessFile raf = null;
        try {
            raf = new RandomAccessFile(file, "rws");
        } catch (FileNotFoundException e) {
            mLogger.error("file not found:" + file, e);
            throw new IllegalArgumentException(e);
        }

        try {
            int retry = 0;
            long pos = start;
            while (pos <= end) {
                long end_index = Math.min(pos + requestBytesSize - 1, end);
                boolean success = false;
                try {
                    success = requestByRange(url, raf, pos, end_index);
                } catch (Exception e) {
                    mLogger.error("download error,start:" + pos + ",end:" + end_index, e);
                }
                // mLogger.info("thread:" + id + ",download url:" + url + ",range:" + pos + "-" + end_index + ",success=" + success);
                if (success) {
                    pos = end_index + 1;
                    retry = 0;
                } else if (retry < maxRetryCount) {
                    retry++;
                    mLogger.warn("thread:" + id + ",url:" + url + ",range:" + pos + "," + end_index
                            + " download failed, retry #" + retry);
                } else {
                    mLogger.warn("thread:" + id + ",url:" + url + ",range:" + pos + "," + end_index
                            + " download failed, giving up");
                    throw new RetryFailedException("maximum retry count exceeded");
                }
            }
        } finally {
            IOUtils.closeQuietly(raf);
        }
    }

    private boolean requestByRange(String url, RandomAccessFile raf, long start, long end)
            throws ClientProtocolException, IOException {
        HttpGet httpget = new HttpGet(url);
        httpget.setHeader("User-Agent", USER_AGENT);
        httpget.setHeader("Range", "bytes=" + start + "-" + end);

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout)
                .build();
        httpget.setConfig(requestConfig);

        CloseableHttpResponse response = null;
        try {
            response = httpclient.execute(httpget);
            int code = response.getStatusLine().getStatusCode();
            // Only 206 is accepted: a 200 response carries the whole file, not the requested range.
            if (code == HttpStatus.SC_PARTIAL_CONTENT) {
                HttpEntity entity = response.getEntity();
                if (entity != null) {
                    InputStream in = entity.getContent();
                    raf.seek(start); // position at which to store the data
                    byte[] buffer = new byte[1024];
                    int len;
                    while ((len = in.read(buffer)) != -1) {
                        raf.write(buffer, 0, len);
                    }
                    return true;
                } else {
                    mLogger.warn("response entity is null,url:" + url);
                }
            } else {
                mLogger.warn("response error, code=" + code + ",url:" + url);
            }
        } finally {
            IOUtils.closeQuietly(response);
        }
        return false;
    }

    public long getFileSize(String url) {
        int retry = 0;
        long filesize = 0;
        while (retry < maxRetryCount) {
            try {
                filesize = getContentLength(url);
            } catch (Exception e) {
                mLogger.error("get File Size error", e);
            }
            if (filesize > 0) {
                break;
            } else {
                retry++;
                mLogger.warn("get File Size failed,retry:" + retry);
            }
        }
        return filesize;
    }

    private long getContentLength(String url) throws ClientProtocolException, IOException {
        HttpGet httpget = new HttpGet(url);
        httpget.setHeader("User-Agent", USER_AGENT);

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout)
                .build();
        httpget.setConfig(requestConfig);

        CloseableHttpResponse response = null;
        try {
            response = httpclient.execute(httpget);
            int code = response.getStatusLine().getStatusCode();
            if (code == HttpStatus.SC_OK) {
                HttpEntity entity = response.getEntity();
                if (entity != null) {
                    return entity.getContentLength();
                }
            } else {
                mLogger.warn("response code=" + code);
            }
        } finally {
            IOUtils.closeQuietly(response);
        }
        return -1;
    }

    public void close() {
        if (httpclient != null) {
            try {
                httpclient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            httpclient = null;
        }
    }
}
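
The chunk-and-retry loop used by downloadPartFile can be tried out without any network access. The sketch below is a simplified model under stated assumptions: ChunkedRetryDemo, the RangeFetcher interface (a stand-in for one HTTP range request), and the IllegalStateException thrown on giving up are all hypothetical and replace the real HTTP call and RetryFailedException.

```java
final class ChunkedRetryDemo {

    // Hypothetical stand-in for one HTTP range request; returns true on success.
    interface RangeFetcher {
        boolean fetch(long start, long end);
    }

    // Walks the inclusive range [start, end] in chunks of at most chunkSize bytes.
    // A failing chunk is retried up to maxRetry times; the retry counter resets after each success.
    // Returns the total number of requests issued.
    static int downloadRange(long start, long end, long chunkSize, int maxRetry, RangeFetcher fetcher) {
        int requests = 0;
        int retry = 0;
        long pos = start;
        while (pos <= end) {
            long chunkEnd = Math.min(pos + chunkSize - 1, end);
            requests++;
            if (fetcher.fetch(pos, chunkEnd)) {
                pos = chunkEnd + 1; // continue with the byte after this chunk
                retry = 0;
            } else if (retry < maxRetry) {
                retry++;            // try the same chunk again
            } else {
                throw new IllegalStateException("giving up on range " + pos + "-" + chunkEnd);
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        boolean[] failedOnce = {false};
        // Bytes 0..9 in chunks of 4; the chunk starting at byte 4 fails on its first attempt.
        int requests = downloadRange(0, 9, 4, 1, (s, e) -> {
            if (s == 4 && !failedOnce[0]) {
                failedOnce[0] = true;
                return false;
            }
            return true;
        });
        System.out.println("requests=" + requests); // prints requests=4
    }
}
```

Small chunks keep the cost of a retry low: a failure repeats only one chunk rather than the whole block assigned to the thread.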

Finally, the client code that calls it:

package com.ricky.common.java;

import java.io.File;

import com.ricky.common.java.download.FileDownloader;
import com.ricky.common.java.download.config.FileDownloaderConfiguration;

public class FileDownloaderTest {

    public static void main(String[] args) {
        FileDownloader fileDownloader = FileDownloader.getInstance();
        FileDownloaderConfiguration configuration = FileDownloaderConfiguration
                .custom()
                .coreThreadNum(5)
                .downloadDestinationDir(new File("D:/Download"))
                .build();
        fileDownloader.init(configuration);

        String url = "http://dldir1.qq.com/qqfile/qq/QQ7.9/16621/QQ7.9.exe";
        String filename = "QQ7.9.exe";

        boolean result = fileDownloader.download(url, filename);
        System.out.println("download result:" + result);

        fileDownloader.destroy(); // release resources when the downloader is no longer needed
    }
}


Source code

https://github.com/TiFG/FileDownloader

