Java并发编程实践 目录
并发编程 01—— ThreadLocal
并发编程 02—— ConcurrentHashMap
并发编程 03—— 阻塞队列和生产者-消费者模式
并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier
并发编程 05—— Callable和Future
并发编程 06—— CompletionService : Executor 和 BlockingQueue
并发编程 07—— 任务取消
并发编程 08—— 任务取消 之 中断
并发编程 09—— 任务取消 之 停止基于线程的服务
并发编程 10—— 任务取消 之 关闭 ExecutorService
并发编程 11—— 任务取消 之 “毒丸”对象
并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略
并发编程 14—— 线程池 之 整体架构
并发编程 15—— 线程池 之 原理一
并发编程 16—— 线程池 之 原理二
并发编程 17—— Lock
并发编程 18—— 使用内置条件队列实现简单的有界缓存
并发编程 19—— 显式的Conditon 对象
并发编程 20—— AbstractQueuedSynchronizer 深入分析
并发编程 21—— 原子变量和非阻塞同步机制
当通过 shutdownNow 来强行关闭 ExecutorService 时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。
然而,我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着这我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需知道当 Executor 关闭时哪些任务正在执行。
第2 部分 实例在下面程序 TrackingExecutor 中给出了如何在关闭过程中判断正在执行的任务。通过封装 ExecutorService 并使得execute 记录哪些任务是在关闭后取消的,TrackingExecutor 可以找出哪些任务已经开始但还没有正常完成。在 Executor 结束后,getCancelledTasks 返回被取消的任务清单。
1 /**
2 * 7.21 在 ExecutorService 中跟踪在关闭之后取消的任务
3 * @ClassName: TrackingExecutor
4 * @author xingle
5 * @date 2014-11-12 下午8:39:33
6 */
7 public class TrackingExecutor extends AbstractExecutorService{
8 private final ExecutorService exec;
9 private final Set
10 .synchronizedSet(new HashSet
11
12 public TrackingExecutor(ExecutorService exec){
13 this.exec = exec;
14 }
15
16 public List
17 if(!exec.isTerminated())
18 throw new IllegalStateException();
19 return new ArrayList
20 }
21
22 /**
23 *
24 * @Description: TODO
25 * @param command
26 * @author xingle
27 * @data 2014-11-13 上午9:06:56
28 */
29 @Override
30 public void execute(final Runnable runnable) {
31 exec.execute(new Runnable() {
32
33 @Override
34 public void run() {
35 try{
36 runnable.run();
37 }finally{
38 if(isShutdown() && Thread.currentThread().isInterrupted())
39 tasksCancelledAtShutdown.add(runnable);
40 }
41 }
42 });
43 }
44
45 /**
46 * 下面将ExecutorService 的其他方法委托给 exec
47 */
48
49 /**
50 *
51 * @Description: TODO
52 * @author xingle
53 * @data 2014-11-13 上午9:06:56
54 */
55 @Override
56 public void shutdown() {
57 exec.shutdown();
58 }
59
60 /**
61 *
62 * @Description: TODO
63 * @return
64 * @author xingle
65 * @data 2014-11-13 上午9:06:56
66 */
67 @Override
68 public List
69 return exec.shutdownNow();
70 }
71
72 /**
73 *
74 * @Description: TODO
75 * @return
76 * @author xingle
77 * @data 2014-11-13 上午9:06:56
78 */
79 @Override
80 public boolean isShutdown() {
81 return exec.isShutdown();
82 }
83
84 /**
85 *
86 * @Description: TODO
87 * @return
88 * @author xingle
89 * @data 2014-11-13 上午9:06:56
90 */
91 @Override
92 public boolean isTerminated() {
93 return exec.isTerminated();
94 }
95
96 /**
97 *
98 * @Description: TODO
99 * @param timeout
100 * @param unit
101 * @return
102 * @throws InterruptedException
103 * @author xingle
104 * @data 2014-11-13 上午9:06:56
105 */
106 @Override
107 public boolean awaitTermination(long timeout, TimeUnit unit)
108 throws InterruptedException {
109 return exec.awaitTermination(timeout, unit);
110 }
111
112
113 }
在程序 WebCrawler 中给出了 TrackingExecutor 的用法。网页爬虫程序的工作通常是无穷尽的,因此当爬虫程序必须关闭时,我们通常希望保持它的状态,以便稍后重启动。CrawlTask 提供了一个 getPage 方法,该方法能找出正在处理的页面。当爬虫程序关闭时,无论是还没有开始的任务,还是那些被取消的任务,都将记录他们的URL,因此当爬虫程序程序启动时,就可以将这些URL 的页面抓取任务加入到任务队列中。
1 /**
2 * 7.22 使用TrackingExecutorService 来保存未完成的任务以备后续执行
3 * @ClassName: WebCrawler
4 * TODO
5 * @author xingle
6 * @date 2014-11-13 上午9:17:54
7 */
8 public abstract class WebCrawler {
9 private volatile TrackingExecutor exec;
10 @GuardedBy("this")
11 public final Set
12
13 private final ConcurrentMap
14 private static final long TIMEOUT = 500;
15 private static final TimeUnit UNIT = TimeUnit.MICROSECONDS;
16
17 public WebCrawler(URL startUrl){
18 urlsToCrawl.add(startUrl);
19 }
20
21 public synchronized void start(){
22 exec = new TrackingExecutor(Executors.newCachedThreadPool());
23 for (URL url: urlsToCrawl)
24 submitCrawlTask(url);
25 urlsToCrawl.clear();
26 }
27
28 /**
29 * 提交爬虫任务
30 * @param url
31 * @author xingle
32 * @data 2014-11-13 上午9:46:01
33 */
34 private void submitCrawlTask(URL url) {
35 exec.execute(new CrawlTask(url));
36 }
37
38 protected abstract List
39
40 /**
41 * 保存未完成的
42 * @param urlsToCrawl
43 * @author xingle
44 * @data 2014-11-13 上午10:10:07
45 */
46 private void saveUncrawled(List
47 for (Runnable task:uncrawled){
48 URL url = ((CrawlTask)task).getPage();
49 System.out.println("保存未完成的URL:"+url);
50 urlsToCrawl.add(url);
51 }
52
53 }
54
55 //爬虫任务
56 private class CrawlTask implements Runnable{
57 private final URL url;
58
59 CrawlTask(URL url){
60 this.url = url;
61 }
62
63 private int count = 1;
64
65 boolean alreadyCrawled() {
66 return seen.putIfAbsent(url, true) != null;
67 }
68
69 void markUncrawled() {
70 seen.remove(url);
71 System.out.printf("marking %s uncrawled%n", url);
72 }
73
74 @Override
75 public void run() {
76 for (URL link :processPage(url)){
77 if(Thread.currentThread().isInterrupted())
78 return;
79 System.out.println("提交的爬虫url:"+link);
80 submitCrawlTask(link);
81 }
82 }
83
84 public URL getPage(){
85 return url;
86 }
87 }
88
89 public synchronized void stop() throws InterruptedException{
90 try {
91 saveUncrawled(exec.shutdownNow());
92 if (exec.awaitTermination(100, UNIT)){
93 saveUncrawled(exec.getCancelledTasks());
94 }
95
96 } finally {
97 exec = null;
98 }
99 }
100 }
测试程序:
1 public class WebCrawler_Main {
2
3 public static void main(String[] args) throws MalformedURLException{
4 WebCrawler webc = new WebCrawler(new URL("http://site.baidu.com/")) {
5
6 @Override
7 protected List
8 //获取该url下所有的链接
9 //这里省略了该功能
10 List
11 try {
12 url2.add(new URL("http://www.cnblogs.com/xingele0917/"));
13 //url2.add(new URL("http://www.zhihu.com/"));
14 } catch (MalformedURLException e) {
15 e.printStackTrace();
16 }
17 return url2;
18
19 }
20
21 };
22
23 webc.start();
24 try {
25 Thread.sleep(10);
26 webc.stop();
27 } catch (InterruptedException e) {
28 e.printStackTrace();
29 }
30 }
31
32 }
执行结果: