热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

14.并发与异步

线程是创建并发的底层工具,因此具有一定的局限性。没有简单的方法可以从联合(Join)线程得到“返回值”。因此必须创建一些共享域。当抛出一个异常时,捕捉和处理异常也是麻烦的。线程完成之后,无法再次启动该




线程是创建并发的底层工具,因此具有一定的局限性。



  • 没有简单的方法可以从联合(Join)线程得到“返回值”。因此必须创建一些共享域。当抛出一个异常时,捕捉和处理异常也是麻烦的。

  • 线程完成之后,无法再次启动该线程。相反,只能联合(Join)它(在进程阻塞当前线程)。


与线程相比,Task是一个更高级的抽象概念,它标识一个通过或不通过线程实现的并发操作。
任务是可组合的——使用延续将它们串联在一起。它们可以使用线程池减少启动延迟,而且它们可以通过TaskCompletionSource使用回调方法,避免多个线程同时等待I/O密集操作。


14.3.1 启动任务

从Framework 4.5开始,启动一个由后台线程实现的Task,最简单的方法是使用静态方法Task.Run。调用时需要传入一个Action代理:


Task.Run(() => Console.WriteLine("hello"));

Task.Run是Framework 4.5新引入的方法,在Framework 4.0中,调用Task.Factory.StartNew,可以实现相同效果,前者相当于后者的快捷方式。



Task默认使用线程池,它们都是后台线程。意味当主线程结束时,所有任务都会随之停止。因此,要在控制台应用程序中运行这些例子,必须在启动任务之后阻塞主线程。例如,挂起(Waiting)该让你误,或者调用Console.ReadLine:



    static void Main(string[] args)
{
Task.Run(() => Console.WriteLine("Foo"));
Console.ReadLine();
}

采用这种方式调用Task.Run,与下面启动线程方式类似(唯一不同的是没有隐含使用线程池):


new Thread(() => Console.WriteLine("Foo")).Start();

Task.Run会返回一个Task对象,它可以用来监控任务执行过程,这一点与Thread对象不同。(这里没有调用Start,因为Task.Run创建是“热”任务;相反,想创建“冷”任务,必须使用Task构造函数,但这种方法在实践中很少用)



任务的Status属性可用于跟踪任务的执行状态。



1.等待(Wait)


调用Wait方法,可以阻塞任务,直至任务完成,效果等同于Thread.Join


    Task task = Task.Run(() =>
{
Thread.Sleep(2000);
Console.WriteLine("Foo");
});
Console.WriteLine(task.IsCompleted); //False
task.Wait();//阻塞,直至任务完成
Console.WriteLine(task.IsCompleted); //True
Console.ReadLine();

可以在Wait中指定一个超时时间和一个取消令牌。


2.长任务


默认情况下,CLR会运行在池化线程上,这种线程非常适合执行短计算密集作业。如果要执行长阻塞操作,则可以按下面方式避免使用池化线程:


    Task task = Task.Factory.StartNew(() =>
{
Console.WriteLine("Task started");
Thread.Sleep(2000);
Console.WriteLine("Foo");
}, TaskCreationOptions.LongRunning);

task.Wait(); // Blocks until task is complete


提示:
在池化线程上运行一个长任务问题并不大,但是如果要同时运行多个长任务(特别会阻塞的任务),则会对性能产生影响。在这种情况下,通常更好的方法是使用TaskCreationOptions.LongRunning:



  • 如果运行I/O密集任务,则可以使用TaskCompletionSource和异步函数,通过回调函数(延续)实现并发性,而不通过线程实现。

  • 如果是运行计算密集任务,则可以使用一个生产者/消费者队列,控制这些任务的并发数量,避免出现线程和进程阻塞的问题。



14.3.2 返回值

Task允许任务返回一个值。调用Task.Run,传入一个Func代理(或者兼容的Lambda表达式),代替Action,就可以获得一个Task



Task task = Task.Run (() => { Console.WriteLine ("Foo"); return 3; });

int result = task.Result; // Blocks if not already finished
Console.WriteLine (result); // 3

下面的例子创建一个任务,它使用LINQ就按前3百万个整数(从2开始)中的素数个数:


    Task primeNumberTask = Task.Run(() =>
Enumerable.Range(2, 3000000).Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));

Console.WriteLine("Task running...");
Console.WriteLine("The answer is " + primeNumberTask.Result);

这段代码会打印“Task running...”,然后几秒钟后打印216815。


14.3.3 异常

与线程不同,Task可以随时抛出异常。
任务代码抛出一个未处理异常,那么这个异常会自动传递到调用Wait()的任务上或者访问TaskResult属性的代码上:


    Task task = Task.Run(() => { throw null; });
try
{
task.Wait();
}
catch (AggregateException aex)
{
if (aex.InnerException is NullReferenceException)
Console.WriteLine("Null!");
else
throw;
}

CLR会将异常封装在AggregateException中,从而更适合并行编程场景;


使用Task的IsFaultedIsCanceled属性,就可以不重新抛出异常而检测出错的任务。
如果都返回false,则没有出错;
IsCanceledtrue,任务抛出 OperationCanceledOPeration
IsFaultedtrue,则任务抛出另一种异常,而Exception属性包含该错误。


1.异常和自主任务


使用静态事件 TaskScheduler.UnobservedTaskException,可以在全局范围订阅为监控的异常;处理这个事件,然后记录发生的错误,是一个很好的异常处理方法。


14.3.4 延续

延续(continuation)告诉任务在完成之后继续执行下面的操作。
延续通常由一个回调方法实现,它会在操作完成之后执行一次。


给一个任务附加延续的方法有两种


第一种是C# 5.0异步功能使用的方法GetAwaiter方法


Task primeNumberTask = Task.Run (() =>
Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));

//获取用于等待此 System.Threading.Tasks.Task的等待者
var awaiter = primeNumberTask.GetAwaiter();
//将操作设置为当 System.Runtime.CompilerServices.TaskAwaiter 对象停止等待异步任务完成时执行
awaiter.OnCompleted (() =>
{
int result = awaiter.GetResult(); //异步任务完成后关闭等待任务
Console.WriteLine (result); //打印结果
});

调用GetAwaiter会返回一个等待者(awaiter)对象,它的方法会让先导(antecedent)任务(primeNumberTask)在完成(或出错)之后执行一个代理已经完成的任务也可以附加一个延续,这时延续就马上执行。



提示:
等待者可以是任意对象,但它必须包含前面所示两个方法(OnCompletedGetResult)和一个Boolean类型属性IsCompleted对象,它不需要实现包含所有这些成员的特定接口或继承特定基类。



调用GetResult()的好处在于,一旦先前的Task有异常,就会抛出该异常。而且该异常和之前演示的异常不同,它不需要经过AggregateException再包装了。


另一种附加延续的方法是调用任务的ContinueWith方法:


Task primeNumberTask = Task.Run (() =>
Enumerable.Range (2, 3000000).Count (n => Enumerable.Range (2, (int)Math.Sqrt(n)-1).All (i => n % i > 0)));

primeNumberTask.ContinueWith (antecedent =>
{
int result = antecedent.Result;
Console.WriteLine (result); // Writes 123
});

ContinueWith本身返回一个Task,它非常适合添加更多延续。然而,任务出错,我们必须直接处理AggregateException,然后编写额外代码,将延续编列到UI应用程序。而非UI上下文中,如果要让延续运行在同一个线程上,则必须指定TaskContinuationOptions.ExcuteSynchronously;否则弹回线程池。


14.3.5 TaskCompletionSource

前面介绍Task.Run如何创建一个在池化(或非池化)线程运行代理的任务。另一种就是TaskCompletionSource。


TaskCompletionSource可以创建任务,不包含任何必须在后面启动和结束的操作。原理是提供一个可以手工操作的“附属”任务——和其他任务一样。然而,这个任务完全通过下面的方法由TaskCompletionSource对象控制:


public class TaskCompletionSource
{
public void SetCanceled();
public void SetResult(TResult result);
public void SetException(Exception exception);
public bool TrySetCanceled();
public bool TrySetException(Exception exception);
...
}

调用这些方法可以给任务发送信号,将任务修改为完成、异常或取消状态。
这些方法只能调用一次,如果多次调用SetCanceledSetResultSetException,将抛出异常,而Try***等方法则会返回false。


    var tcs = new TaskCompletionSource();

new Thread(() => { Thread.Sleep(5000); tcs.SetResult(42); }).Start();

Task task = tcs.Task; // Our "slave" task.
Console.WriteLine(task.Result); // 42

使用TaskCompletionSource,可以编写自定义的Run方法:


        static void Main(string[] args)
{
Task task = Run(() => { Thread.Sleep(5000); return 42; });
Console.WriteLine(task.Result);
Console.Read();

}

static Task Run(Func function)
{
var tcs = new TaskCompletionSource();
new Thread(() =>
{
try { tcs.SetResult(function()); }
catch (Exception ex) { tcs.SetException(ex); }
}).Start();
return tcs.Task;
}

调用这个方法等同于使用TaskCreationOptions.LongRunning选项调用Task.Factory.StartNew,请求一个非池化线程


TaskCompletionSource真正作用是创建一个不绑定线程的任务。例如,假设一个任务需要等待5秒钟,然后返回数字42.我们可以使用Timer类实现,而不需要使用线程,由CLR在x毫秒之后触发一个事件:


    static void Main(string[] args)
{
var awaiter = GetAnswerToLife().GetAwaiter();
awaiter.OnCompleted(() => Console.WriteLine(awaiter.GetResult()));
}
static Task GetAnswerToLife()
{
var tcs = new TaskCompletionSource();
// Create a timer that fires once in 5000 ms:
var timer = new System.Timers.Timer(5000) { AutoReset = false };
timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult(42); };
timer.Start();
return tcs.Task;
}

通过给任务附加一个延续,就可以在不阻塞任何线程的前提下打印这个结果。


    var awaiter = GetAnswerToLife().GetAwaiter();
awaiter.OnCompleted(() => Console.WriteLine(awaiter.GetResult()));

将延迟时间参数化,并且删除返回值,可以优化这段代码。并且将它变成一个通用的Delay方法。意味让它返回一个Task而不是Task。然而,TaskCompletionSource没有泛型版本,因此无法创建一个非泛型任务。但变通方法很简单:因为Task派生自Task,所以创建一个TaskCompletionSource,然后将它隐式转换为Task,就可以得到一个Task:


var tcs = new TaskCompletionSource();
Task task = tcs.Task;

写出Delay方法,然后让它5秒打印“42”:


    static void Main(string[] args)
{
Delay(5000).GetAwaiter().OnCompleted(() => Console.WriteLine(42));
Console.Read();
}
static Task Delay(int milliseconds)
{
var tcs = new TaskCompletionSource();
var timer = new System.Timers.Timer(milliseconds) { AutoReset = false };
timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult(null); };
timer.Start();
return tcs.Task;
}

不在线程上使用TaskCompletionSource,意味着只有在延续启动时才创建线程。同时启动10000个这种操作,而不会出错或超出资源限制:


for (int i = 0; i <10000; i++)
Delay (5000).GetAwaiter().OnCompleted (() => Console.WriteLine (42));

14.3.6 Task.Delay

Task.DelayThread.Sleep的异步版本


Task.Delay(5000).GetAwaiter().OnCompleted(()=>Console.WriteLine(42));

或者


 Task.Delay(5000).ContinueWith(ant => Console.WriteLine(42));




推荐阅读
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 本文探讨了如何在给定整数N的情况下,找到两个不同的整数a和b,使得它们的和最大,并且满足特定的数学条件。 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • MQTT技术周报:硬件连接与协议解析
    本周开发笔记重点介绍了在新项目中使用MQTT协议进行硬件连接的技术细节,涵盖其特性、原理及实现步骤。 ... [详细]
  • 本文详细介绍了Java中org.w3c.dom.Text类的splitText()方法,通过多个代码示例展示了其实际应用。该方法用于将文本节点在指定位置拆分为两个节点,并保持在文档树中。 ... [详细]
  • 探讨如何通过编程技术实现100个并发连接,解决线程创建顺序问题,并提供高效的并发测试方案。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • C++: 实现基于类的四面体体积计算
    本文介绍如何使用C++编程语言,通过定义类和方法来计算由四个三维坐标点构成的四面体体积。文中详细解释了四面体体积的数学公式,并提供了两种不同的实现方式。 ... [详细]
  • DNN Community 和 Professional 版本的主要差异
    本文详细解析了 DotNetNuke (DNN) 的两种主要版本:Community 和 Professional。通过对比两者的功能和附加组件,帮助用户选择最适合其需求的版本。 ... [详细]
  • UNP 第9章:主机名与地址转换
    本章探讨了用于在主机名和数值地址之间进行转换的函数,如gethostbyname和gethostbyaddr。此外,还介绍了getservbyname和getservbyport函数,用于在服务器名和端口号之间进行转换。 ... [详细]
  • Linux设备驱动程序:异步时间操作与调度机制
    本文介绍了Linux内核中的几种异步延迟操作方法,包括内核定时器、tasklet机制和工作队列。这些机制允许在未来的某个时间点执行任务,而无需阻塞当前线程,从而提高系统的响应性和效率。 ... [详细]
author-avatar
鹏大1111
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有