热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

并行编程数据并行System.Threading.Tasks.Parallel类(.NET)

原文转载于:https:www.cnblogs.comspringsnowp9405016.html目录一、并行概念1、并行编程2、数据并行二、Parallel.Invoke&

原文转:https://www.cnblogs.com/springsnow/p/9405016.html

目录

  • 一、并行概念
    • 1、并行编程
    • 2、数据并行
  • 二、Parallel.Invoke():并行调用多个任务 。
  • 三、Parallel.For(): for 循环的并行运算 
  • 四、Parallel.ForEach():foreach 循环的并行运算 
  • 五、线程局部变量
    • 1、Parallel.For中定义局部变量:
    • 2、Parallel.Each中定义局部变量:
  • 六、Break、Stop中断与停止线程
  • 七、Cancel取消循环
  • 八、Handel Exceptions异常处理

 


回到顶部

一、并行概念


1、并行编程

      在.NET 4中的并行编程是依赖Task Parallel Library(后面简称为TPL) 实现的。在TPL中,最基本的执行单元是task(中文可以理解为"任务"),一个task就代表了你要执行的一个操作。你可以为你所要执行的每一个操作定义一个task,TPL就负责创建线程来执行你所定义的task,并且管理线程。TPL是面向task的,自动的;而传统的多线程是以人工为导向的。

现在已经进入了多核的时代,我们的程序如何更多的利用好硬件cpu,答案是并行处理。在.net4.0之前我们要开发并行的程序是非常的困难,在.net4.0中,在命名空间System.Threading.Tasks提供了方便的并行开发的类库。


2、数据并行

      数据并行指的是对源集合或数组的元素同时(即,并行)执行相同操作的场景。 在数据并行操作中,对源集合进行分区,以便多个线程能够同时在不同的网段上操作。

      任务并行库 (TPL) 支持通过 System.Threading.Tasks.Parallel 类实现的数据并行。 此类对 for 循环和 foreach 循环提供了基于方法的并行执行。你为Parallel.For 或 Parallel.ForEach 循环编写的循环逻辑与编写连续循环的相似。 无需创建线程或列工作项。 在基本循环中,不需要加锁。TPL 为你处理所有低级别的工作。

      Parallel.For()和Parallel.ForEach()方法多次调用同一个方法,而Parallel.Invoke()方法允许同时调用不同的方法。

回到顶部

二、Parallel.Invoke():并行调用多个任务 。

例1:同时调用2个任务

static void Main(string[] args)
{
var watch = Stopwatch.StartNew();
Parallel.Invoke(Run1, Run2);
watch.Stop();
Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds)
}static void Run1()
{
Console.WriteLine("我是任务一,我跑了3s");
Thread.Sleep(3000);
}static void Run2()
{
Console.WriteLine("我是任务二,我跑了5s");
Thread.Sleep(5000);
}

例2:说明并不是每个任务一个线程。

// 定义一个线程局部变量,返回其线程名
ThreadLocal ThreadName = new ThreadLocal(() =>
{
return "Thread" + Thread.CurrentThread.ManagedThreadId;
});// 打印出当前线程名的方法。
Action action = () =>
{
// 如果 ThreadName.IsValueCreated 为true,在这个线程上不是第一次运行这个方法。
bool repeat = ThreadName.IsValueCreated;
Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : "");
};// 调用8个方法,你应该会看到一些重复的线程名
Parallel.Invoke(action, action, action, action, action, action, action, action);
ThreadName.Dispose();

image

回到顶部

三、Parallel.For(): for 循环的并行运算 

      我们知道串行代码中也有一个for,但是那个for并没有用到多核,而Paraller.for它会在底层根据硬件线程的运行状况来充分的使用所有的可利用的硬件线程,注意这里的Parallel.for的步行是1。

      在For()方法中,前两个参数定义了循环的开头和结束。示例从0迭代到9。第3个参数是一个 Action委托。整数参数是循环的迭代次数,该参数被传递给Action 委托引用的方法。 Parallel.For方法的返回类型是ParallelLoopResult结构,它提供了循环是否结束的信息。

ParallelLoopResult result = Parallel.For(0, 10, i =>
{
Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10);
});
Console.WriteLine(result.IsCompleted);

首先先写一个普通的循环:

private void NormalFor()
{
for (var i &#61; 0; i <10000; i&#43;&#43;)
{
for (var j &#61; 0; j <1000; j&#43;&#43;)
{
for (var k &#61; 0; k <100; k&#43;&#43;)
{
DoSomething();
}
}
}
}

再看一个并行的For语句&#xff1a;

private void ParallelFor()
{
Parallel.For(0, 10000, i &#61;>
{
for (int j &#61; 0; j <1000; j&#43;&#43;)
{
for (var k &#61; 0; k <100; k&#43;&#43;)
{
DoSomething();
}
}});
}

上面的例子中&#xff0c;只是将最外层的For语句替换成了Parallel.For&#xff0c;Parallel执行速度可以提高近一倍。

回到顶部

四、Parallel.ForEach&#xff08;&#xff09;&#xff1a;foreach 循环的并行运算 

private void NormalForeach()
{
foreach (var file in GetFiles())
{
DoSomething();
}}private void ParallelForeach()
{
Parallel.ForEach(GetFiles(), file &#61;> {
DoSomething();
});
}

ForEach的使用跟For使用几乎是差不多了&#xff0c;只是在对非泛型的Collection进行操作的时候&#xff0c;需要通过Cast方法进行转换。

ForEach的独到之处就是可以将数据进行分区&#xff0c;每一个小区内实现串行计算&#xff0c;分区采用Partitioner.Create实现。

for (int j &#61; 1; j <4; j&#43;&#43;)
{
Console.WriteLine("\n第{0}次比较", j);
ConcurrentBag bag &#61; new ConcurrentBag();
var watch &#61; Stopwatch.StartNew();
watch.Start();
for (int i &#61; 0; i <3000000; i&#43;&#43;)
{
bag.Add(i);
}
Console.WriteLine("串行计算&#xff1a;集合有:{0},总共耗时&#xff1a;{1}", bag.Count, watch.ElapsedMilliseconds);GC.Collect();
bag &#61; new ConcurrentBag();
watch &#61; Stopwatch.StartNew();
watch.Start();
Parallel.ForEach(Partitioner.Create(0, 3000000), i &#61;>
{
for (int m &#61; i.Item1; m {
bag.Add(m);
}
});
Console.WriteLine("并行计算&#xff1a;集合有:{0},总共耗时&#xff1a;{1}", bag.Count, watch.ElapsedMilliseconds);
GC.Collect();
}

回到顶部

五、线程局部变量

下面这段代码多次运行每次的结果都不一样&#xff0c;因为total变量是公共的&#xff0c;而我们的程序是多个线程的加&#xff0c;而多个线程之间是不能把数据共享的。

public void NormalParallelTest()
{
int[] nums &#61; Enumerable.Range(0, 1000000).ToArray();
long total &#61; 0;
Parallel.For(0,nums.Length,i&#61;>
{
total &#43;&#61; nums[i];
});
Console.WriteLine("The total is {0}", total);
}

其实我们需要的是在每个线程中计算出一个和值&#xff0c;然后再进行累加。我们来看看线程局部变量&#xff1a;

泛型方法Parallel.For的原型&#xff1a;

public static ParallelLoopResult For
(int fromInclusive, int toExclusive, Func localInit, Func body,Action localFinally);

  • TLocal:线程变量的类型&#xff1b;第一个、第二个参数就不必多说了&#xff0c;就是起始值跟结束值。
  • localInit&#xff1a;每个线程的线程局部变量初始值的设置&#xff1b;
  • body&#xff1a;每次循环执行的方法&#xff0c;其中方法的最后一个参数就是线程局部变量&#xff1b;
  • localFinally&#xff1a;每个线程之后执行的方法。

1、Parallel.For中定义局部变量&#xff1a;

从2开始&#xff0c;累加2个&#xff0c;得49.

int[] nums &#61; Enumerable.Range(0, 10).ToArray();
long total &#61; 0;Parallel.For(0, nums.Length, () &#61;> { return 2; },(j, loop, subtotal) &#61;>//1、每次循环执行的方法
{
subtotal &#43;&#61; nums[j];
Console.WriteLine("主体&#xff1a; thread {1}, task {2},结果&#xff1a;{0}", j&#43; ":" &#43;nums[j] &#43; "-" &#43; subtotal, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);return subtotal;
},(x) &#61;>//2、每个线程执行之后执行的方法
{Console.WriteLine(" 最终执行&#xff1a;thread {1}, task {2}&#xff0c;结果&#xff1a;{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
Interlocked.Add(ref total, x);
});
Console.WriteLine("The total is {0}", total);

image


2、Parallel.Each中定义局部变量&#xff1a;

要注意的是&#xff0c;我们必须要使用ForEach&#xff0c;因为第一个参数表示的是迭代源的类型&#xff0c;第二个表示的是线程局部变量的类型&#xff0c;其方法的参数跟For是差不多的。

public void ForeachThreadLocalTest()
{int[] nums &#61; Enumerable.Range(0, 1000000).ToArray();long total &#61; 0;Parallel.ForEach(nums,()&#61;>0,(member,loopState,subTotal)&#61;>//1、每次循环执行的方法{subTotal &#43;&#61; member;return subTotal;},(perLocal)&#61;>//2、每个线程执行之后执行的方法Interlocked.Add(ref total,perLocal));Console.WriteLine("The total is {0}", total);
}

回到顶部

六、Break、Stop中断与停止线程

      在并行循环的委托参数中提供了一个ParallelLoopState&#xff0c;该实例提供了Break和Stop方法来帮我们实现。

  • Break“中断”&#xff1a;表示完成当前线程上当前迭代之前的所有线程上的所有迭代&#xff0c;然后退出循环。&#xff08;比如并行计算正在迭代100&#xff0c;那么break后程序还会迭代所有小于100的。&#xff09;
  • Stop“停止”&#xff1a;表示在方便的情况下尽快停止所有迭代。&#xff08;比如正在迭代100突然遇到stop&#xff0c;那它啥也不管了&#xff0c;直接退出。&#xff09;

首先我们可以看到在Parallel.For的一个重载方法&#xff1a;

public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action body)

在委托的最后一个参数类型为ParallelLoopState,而ParallelLoopState里面提供给我们两个方法&#xff1a;Break、Stop来终止迭代。

private void StopLoop()
{var Stack &#61; new ConcurrentStack();Parallel.For(0, 10000, (i, loopState) &#61;>{if (i <1000)Stack.Push(i.ToString());else{loopState.Stop();return;}});Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count);
}

 

回到顶部

七、Cancel取消循环

      在并行的循环中支持通过传递ParallelOptions参数中的CancellationToken进行取消循环的控制&#xff0c;我们可以CancellationTokenSource实例化之后传递给ParallelOptions对象Cancellation值。下面来看个示例&#xff1a;

      在For循环的实现代码内部&#xff0c;Parallel类验证CancellationToken 的结果&#xff0c;并取消操作。一旦取消操作&#xff0c;For&#xff08;&#xff09;方法就抛出个OperationCanceledException类型的异常&#xff0c;这是本例捕获的异常。使用 CancellationTokeri可以注册取消操作时的信息。为此&#xff0c;需要调用Register方法&#xff0c;并传递一个在取消 操作时调用的委托。

var cts &#61; new CancellationTokenSource();
cts.Token.Register(() &#61;>Console.WriteLine("*** token canceled"));// start a task that sends a cancel after 500 ms
new Task(() &#61;>
{
Thread.Sleep(500);
cts.Cancel(false);
}).Start();try
{
ParallelLoopResult result &#61;
Parallel.For(0, 100,
new ParallelOptions()
{
CancellationToken &#61; cts.Token,
},
x &#61;>
{
Console.WriteLine("loop {0} started", x);
int sum &#61; 0;
for (int i &#61; 0; i <100; i&#43;&#43;)
{
Thread.Sleep(2);
sum &#43;&#61; i;
}
Console.WriteLine("loop {0} finished", x);
});
}
catch (OperationCanceledException ex)
{
Console.WriteLine(ex.Message);
}

回到顶部

八、Handel Exceptions异常处理

      在处理并行循环的异常的与顺序循环异常的处理是有所不同的,并行循环里面可能会一个异常在多个循环中出现,或则一个线程上的异常导致另外一个线程上也出现异常。比较好的处理方式就是&#xff0c;首先获取所有的异常最后通过AggregateException来包装所有的循环的异常&#xff0c;循环结束后进行throw。看一段示例代码&#xff1a;

private void HandleNumbers(int[] numbers)
{var exceptions &#61; new ConcurrentQueue();Parallel.For(0, numbers.Length, i &#61;> {try{if (numbers[i] > 10 && numbers[i] <20){throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i));}}catch (Exception e){exceptions.Enqueue(e);}});if (exceptions.Count > 0) throw new AggregateException(exceptions); }

测试方法&#xff1a;

public void HandleExceptions()
{
var numbers &#61; Enumerable.Range(0, 10000).ToArray();
try
{
this.HandleNumbers(numbers);
}
catch(AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}
}

对上面的方法说明下&#xff0c;在HandleNumbers方法中&#xff0c;就是一个小的demo如果元素的值出现在10-20之间就抛出异常。在上面我们的处理方法就是&#xff1a;在循环时通过队列将所有的异常都集中起来&#xff0c;循环结束后来抛出一个AggregateException。


推荐阅读
  • 大华股份2013届校园招聘软件算法类试题D卷
    一、填空题(共17题,每题3分,总共51分)1.设有inta5,*b,**c,执行语句c&b,b&a后,**c的值为________答:5 ... [详细]
  • 本文详细探讨了在Java中如何将图像对象转换为文件和字节数组(Byte[])的技术。虽然网络上存在大量相关资料,但实际操作时仍需注意细节。本文通过使用JMSL 4.0库中的图表对象作为示例,提供了一种实用的方法。 ... [详细]
  • 问题描述现在,不管开发一个多大的系统(至少我现在的部门是这样的),都会带一个日志功能;在实际开发过程中 ... [详细]
  • 关于进程的复习:#管道#数据的共享Managerdictlist#进程池#cpu个数1#retmap(func,iterable)#异步自带close和join#所有 ... [详细]
  • 深入理解Java多线程与并发机制
    本文探讨了Java多线程和并发机制的核心概念,包括多线程类的分类、执行器框架、并发容器及控制工具。通过详细解析这些组件,帮助开发者更好地理解和应用多线程技术。 ... [详细]
  • 本文总结了Java初学者需要掌握的六大核心知识点,帮助你更好地理解和应用Java编程。无论你是刚刚入门还是希望巩固基础,这些知识点都是必不可少的。 ... [详细]
  • Docker安全策略与管理
    本文探讨了Docker的安全挑战、核心安全特性及其管理策略,旨在帮助读者深入理解Docker安全机制,并提供实用的安全管理建议。 ... [详细]
  • 字符串中特定模式出现次数的计算方法
    本文详细探讨了如何高效地计算字符串中特定模式(如'pat')的出现次数,通过实例分析与算法解析,帮助读者掌握解决此类问题的方法。 ... [详细]
  • 问题场景用Java进行web开发过程当中,当遇到很多很多个字段的实体时,最苦恼的莫过于编辑字段的查看和修改界面,发现2个页面存在很多重复信息,能不能写一遍?有没有轮子用都不如自己造。解决方式笔者根据自 ... [详细]
  • 我自己做了一个网站图片的抓取,感觉速度有点慢抓取4000张图片可能得用15分钟左右的时间,我百度看用线程可以加快抓取,然后创建了5个线程抓取,但是5个线程是同步执行同样的操作一个图片就 ... [详细]
  • 在iOS开发中,多线程技术的应用非常广泛,能够高效地执行多个调度任务。本文将重点介绍GCD(Grand Central Dispatch)在多线程开发中的应用,包括其函数和队列的实现细节。 ... [详细]
  • 在运行于MS SQL Server 2005的.NET 2.0 Web应用中,我偶尔会遇到令人头疼的SQL死锁问题。过去,我们主要通过调整查询来解决这些问题,但这既耗时又不可靠。我希望能找到一种确定性的查询模式,确保从设计上彻底避免SQL死锁。 ... [详细]
  • 本文详细介绍了 Java 网站开发的相关资源和步骤,包括常用网站、开发环境和框架选择。 ... [详细]
  • 本文将深入探讨 iOS 中的 Grand Central Dispatch (GCD),并介绍如何利用 GCD 进行高效多线程编程。如果你对线程的基本概念还不熟悉,建议先阅读相关基础资料。 ... [详细]
  • VB.net 进程通信中FindWindow、FindWindowEX、SendMessage函数的理解
    目录一、代码背景二、主要工具三、函数解析1、FindWindow:2、FindWindowEx:3、SendMessage: ... [详细]
author-avatar
兜兜岁月真伟大
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有