热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Rayismultitaskingofdisturbingpython.

并行和分布式计算是现代应用程序的主要内容。我们需要利用多个核心或多台机器来加速应用程序或大规模运行它们。网络爬虫和搜索所使用的基础设施并不是在某人笔记本电脑上运行的单线程程序&#x

并行和分布式计算是现代应用程序的主要内容。我们需要利用多个核心或多台机器来加速应用程序或大规模运行它们。网络爬虫和搜索所使用的基础设施并不是在某人笔记本电脑上运行的单线程程序,而是相互通信和交互的服务的集合。

ray的api接口教程:https://ray.readthedocs.io/en/latest/api.html


为什么要使用 Ray?

        很多教程解释了如何使用 Python 的多进程模块(https://docs.python.org/2/library/multiprocessing.html)。遗憾的是,多进程模块在处理现代应用程序的要求方面存在严重的短板。这些要求包括以下这些内容:

在多台计算机上运行相同的代码。

构建有状态且可以与之通信的微服务和 actor。

优雅地处理机器故障。

有效处理大对象和数值数据。


AI的开源框架

与深度学习框架的关系: Ray与TensorFlow,PyTorch和MXNet等深度学习框架完全兼容,在许多应用中与Ray一起使用一个或多个深度学习框架是很自然的(例如,我们的强化学习库使用TensorFlow和PyTorch)。

与其他分布式系统的关系:今天使用了许多流行的分布式系统,但是其中大多数并不是用AI应用程序构建的,并且缺乏支持所需的性能以及表示AI应用程序的API。从今天的分布式系统来看,它们缺少以下功能(以各种组合方式):

支持毫秒级任务和每秒数百万个任务
嵌套并行(在任务内并行化任务,例如超参数搜索内部的并行模拟)(见下图)
在运行时动态确定任意任务依赖关系(例如,为了避免等待缓慢的工作人员)
在共享可变状态下运行的任务(例如,神经网络权重或模拟器)
支持异构资源(CPU,GPU等)

有两种使用Ray的主要方法:通过其较低级别的API和更高级别的库。较高级别的库建立在较低级别的API之上。目前这些包括Ray RLlib,一个可扩展的强化学习库和Ray.tune,一个高效的分布式超参数搜索库。

ray.init(redis_address="123.45.67.89:6379")

这些过程包括:


  • 有很多 worker 进程并行执行 Python 函数(大概是每个 CPU 核心对应一个 worker)。

  • 用于将“任务”分配给 worker(以及其他计算机)的调度程序进程。任务是 Ray 调度的工作单元,对应于一个函数调用或方法调用。

  • 共享内存对象存储库,用于在 worker 之间有效地共享对象(无需创建副本)。

  • 内存数据库,用于存储在发生机器故障时重新运行任务所需的元数据。

Ray worker 是独立的进程,而不是线程,因为在 Python 中存在全局解释器锁,所以对多线程的支持非常有限。


Ray低级API

Ray API的目标是自然地表达非常普遍的计算模式和应用程序,而不受像MapReduce这样的固定模式的限制。


动态任务图

Ray应用程序或作业中的基础基元是一个动态任务图。这与TensorFlow中的计算图非常不同。而在TensorFlow中,一个计算图代表一个神经网络,并且在单个应用程序中执行多次,在Ray中,任务图代表整个应用程序,并且只执行一次。任务图不是事先知道的。它是在应用程序运行时动态构建的,执行一个任务可能会触发创建更多任务。

在这里插入图片描述

任意的Python函数都可以作为任务执行,并且可以任意依赖其他任务的输出。下面的例子给出了说明。

要将 Python 函数 f 转换为一个“远程函数”(可以远程和异步执行的函数),可以使用 @ray.remote 装饰器来声明这个函数。然后函数调用 f.remote() 将立即返回一个 future(future 是对最终输出的引用),实际的函数执行将在后台进行(我们将这个函数执行称为任务)。

要将一个任务的输出作为输入提供给后续任务,只需将第一个任务返回的 future 作为参数传给第二个任务。Ray 的调度程序会自动考虑任务依赖关系。在第一个任务完成之前不会执行第二个任务,第一个任务的输出将自动被发送给执行第二个任务的机器。

import ray
import time,datetime# Start Ray.
ray.init()import numpy as np# 定义两个远程函数。
# 这些函数的调用创建了远程执行的任务@ray.remote
def create_matrix(size):return np.random.normal(size=size)
@ray.remote
def multiply_matrices(x, y):return np.dot(x, y)result_ids = []
for i in range(400):# 开始两个并行的任务,这些会立即返回futures并在后台执行x_id = create_matrix.remote([1000, 1000])print(datetime.datetime.now())y_id = create_matrix.remote([1000, 1000])print(datetime.datetime.now())# 开始第三个任务,但这并不会被提前计划,直到前两个任务都完成了.result_ids.append(multiply_matrices.remote(x_id, y_id))print(datetime.datetime.now())
# 获取结果。这个结果直到第三个任务完成才能得到。只有get创建以后所有的任务才开始创建执行。
z_id = ray.get(result_ids)
print(z_id)

机器耗能:

在这里插入图片描述


有效的使用聚合函数

下图是两个聚合过程和相应的函数。以线性方式聚合值与以树形结构方式聚合值的对比

在这里插入图片描述

右图方式的聚合函数会比左图方式的聚合更高校,因为在一个任务

import time
@ray.remote
def add(x, y):time.sleep(1)return x + y
# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])
# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])

简单开始

首先来看一下最简单的Ray程序是如何编写的。

# 导入ray,并初始化执行环境
import ray
ray.init()# 定义ray remote函数
@ray.remote
def hello():return "Hello world !"# 异步执行remote函数,返回结果id
object_id = hello.remote()# 同步获取计算结果
hello = ray.get(object_id)# 输出计算结果
print hello

在Ray里,通过Python注解@ray.remote定义remote函数。使用此注解声明的函数都会自带一个默认的方法remote,通过此方法发起的函数调用都是以提交分布式任务的方式异步执行的,函数的返回值是一个对象id,使用ray.get内置操作可以同步获取该id对应的对象。熟悉Java里的Future机制的话对此应该并不陌生,或许会有人疑惑这和普通的异步函数调用没什么大的区别,但是这里最大的差异是,函数hello是分布式异步执行的。

remote函数是Ray分布式计算抽象中的核心概念,通过它开发者拥有了动态定制计算依赖(任务DAG)的能力。比如:

@ray.remote
def A():return "A"@ray.remote
def B():return "B"@ray.remote
def C(a, b):return "C"a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)

例子代码中,对函数A、B的调用是完全并行执行的,但是对函数C的调用依赖于A、B函数的返回结果。Ray可以保证函数C需要等待A、B函数的结果真正计算出来后才会执行。如果将函数A、B、C类比为DAG的节点的话,那么DAG的边就是函数C参数对函数A、B计算结果的依赖,自由的函数调用方式允许Ray可以自由地定制DAG的结构和计算依赖关系。另外,提及一点的是Python的函数可以定义函数具有多个返回值,这也使得Python的函数更天然具备了DAG节点多入和多出的特点。

这里写图片描述


二、系统架构

Ray是使用什么样的架构对分布式计算做出如上抽象的呢,一下给出了Ray的系统架构(来自Ray论文,参考文献1)。

这里写图片描述

作为分布式计算系统,Ray仍旧遵循了典型的Master-Slave的设计:Master负责全局协调和状态维护,Slave执行分布式计算任务。不过和传统的分布式计算系统不同的是,Ray使用了混合任务调度的思路。在集群部署模式下,Ray启动了以下关键组件:


  1. GlobalScheduler:Master上启动了一个全局调度器,用于接收本地调度器提交的任务,并将任务分发给合适的本地任务调度器执行。
  2. RedisServer:Master上启动了一到多个RedisServer用于保存分布式任务的状态信息(ControlState),包括对象机器的映射、任务描述、任务debug信息等。
  3. LocalScheduler:每个Slave上启动了一个本地调度器,用于提交任务到全局调度器,以及分配任务给当前机器的Worker进程。
  4. Worker:每个Slave上可以启动多个Worker进程执行分布式任务,并将计算结果存储到ObjectStore。
  5. ObjectStore:每个Slave上启动了一个ObjectStore存储只读数据对象,Worker可以通过共享内存的方式访问这些对象数据,这样可以有效地减少内存拷贝和对象序列化成本。ObjectStore底层由Apache Arrow实现。
  6. Plasma:每个Slave上的ObjectStore都由一个名为Plasma的对象管理器进行管理,它可以在Worker访问本地ObjectStore上不存在的远程数据对象时,主动拉取其它Slave上的对象数据到当前机器。

需要说明的是,Ray的论文中提及,全局调度器可以启动一到多个,而目前Ray的实现文档里讨论的内容都是基于一个全局调度器的情况。我猜测可能是Ray尚在建设中,一些机制还未完善,后续读者可以留意此处的细节变化。

Ray的任务也是通过类似Spark中Driver的概念的方式进行提交的,有所不同的是:


  1. Spark的Driver提交的是任务DAG,一旦提交则不可更改。
  2. 而Ray提交的是更细粒度的remote function,任务DAG依赖关系由函数依赖关系自由定制。

论文给出的架构图里并未画出Driver的概念,因此我在其基础上做了一些修改和扩充。

这里写图片描述

Ray的Driver节点和和Slave节点启动的组件几乎相同,不过却有以下区别:


  1. Driver上的工作进程DriverProcess一般只有一个,即用户启动的PythonShell。Slave可以根据需要创建多个WorkerProcess。
  2. Driver只能提交任务,却不能接收来自全局调度器分配的任务。Slave可以提交任务,也可以接收全局调度器分配的任务。

Driver可以主动绕过全局调度器给Slave发送Actor调用任务(此处设计是否合理尚不讨论)。Slave只能接收全局调度器分配的计算任务


三、核心操作

基于以上架构,我们简单讨论一下Ray中关键的操作和流程。


1. ray.init()

在PythonShell中,使用ray.init()可以在本地启动ray,包括Driver、HeadNode(Master)和若干Slave。

import ray
ray.init()

如果是直连已有的Ray集群,只需要指定RedisServer的地址即可。

ray.init(redis_address="")

本地启动Ray得到的输出如下:

>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}

本地启动Ray时,可以看到Ray的WebUI的访问地址。


2. ray.put()

使用ray.put()可以将Python对象存入本地ObjectStore,并且异步返回一个唯一的ObjectID。通过该ID,Ray可以访问集群中任一个节点上的对象(远程对象通过查阅Master的对象表获得)。

对象一旦存入ObjectStore便不可更改,Ray的remote函数可以将直接将该对象的ID作为参数传入。使用ObjectID作为remote函数参数,可以有效地减少函数参数的写ObjectStore的次数。

@ray.remote
def f(x):passx = "hello"# 对象x往ObjectStore拷贝里10次
[f.remote(x) for _ in range(10)]# 对象x仅往ObjectStore拷贝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()可以通过ObjectID获取ObjectStore内的对象并将之转换为Python对象。对于数组类型的对象,Ray使用共享内存机制减少数据的拷贝成本。而对于其它对象则需要将数据从ObjectStore拷贝到进程的堆内存中。

如果调用ray.get()操作时,对象尚未创建好,则get操作会阻塞,直到对象创建完成后返回。get操作的关键流程如下: 

result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

 

@ray.remote注解有一个参数num_return_vals用于声明remote函数的返回值个数,基于此实现remote函数的多返回值机制。


  • Driver或者Worker进程首先到ObjectStore内请求ObjectID对应的对象数据。
  • 如果本地ObjectStore没有对应的对象数据,本地对象管理器Plasma会检查Master上的对象表查看对象是否存储其它节点的ObjectStore。
  • 如果对象数据在其它节点的ObjectStore内,Plasma会发送网络请求将对象数据拉到本地ObjectStore。
  • 如果对象数据还没有创建好,Master会在对象创建完成后通知请求的Plasma读取。
  • 如果对象数据已经被所有的ObjectStore移除(被LRU策略删除),本地调度器会根据任务血缘关系执行对象的重新创建工作。
  • 一旦对象数据在本地ObjectStore可用,Driver或者Worker进程会通过共享内存的方式直接将对象内存区域映射到自己的进程地址空间中,并反序列化为Python对象。

    4. @ray.remote

    Ray中使用注解@ray.remote可以声明一个remote function。remote函数时Ray的基本任务调度单元,remote函数定义后会立即被序列化存储到RedisServer中,并且分配了一个唯一的ID,这样就保证了集群的所有节点都可以看到这个函数的定义。

    不过,这样对remote函数定义有了一个潜在的要求,即remote函数内如果调用了其它的用户函数,则必须提前定义,否则remote函数无法找到对应的函数定义内容。

    remote函数内也可以调用其它的remote函数,Driver和Slave每次调用remote函数时,其实都是向集群提交了一个计算任务,从这里也可以看到Ray的分布式计算的自由性。

    Ray中调用remote函数的关键流程如下:

  • 调用remote函数时,首先会创建一个任务对象,它包含了函数的ID、参数的ID或者值(Python的基本对象直接传值,复杂对象会先通过ray.put()操作存入ObjectStore然后返回ObjectID)、函数返回值对象的ID。
  • 任务对象被发送到本地调度器。
  • 本地调度器决定任务对象是在本地调度还是发送给全局调度器。如果任务对象的依赖(参数)在本地的ObejctStore已经存在且本地的CPU和GPU计算资源充足,那么本地调度器将任务分配给本地的WorkerProcess执行。否则,任务对象被发送给全局调度器并存储到任务表(TaskTable)中,全局调度器根据当前的任务状态信息决定将任务发给集群中的某一个本地调度器。
  • 本地调度器收到任务对象后(来自本地的任务或者全局调度分配的任务),会将其放入一个任务队列中,等待计算资源和本地依赖满足后分配给WorkerProcess执行。
  • Worker收到任务对象后执行该任务,并将函数返回值存入ObjectStore,并更新Master的对象表(ObjectTable)信息。
  • @ray.remote注解有一个参数num_return_vals用于声明remote函数的返回值个数,基于此实现remote函数的多返回值机制。

    @ray.remote(num_return_vals=2)
    def f():return 1, 2x_id, y_id = f.remote()
    ray.get(x_id) # 1
    ray.get(y_id) # 2

    原创:https://blog.csdn.net/luanpeng825485697/article/details/88242020?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522160611079119724838534574%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=160611079119724838534574&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-88242020.first_rank_ecpm_v3_pc_rank_v2&utm_term=python+%E5%88%86%E5%B8%83%E5%BC%8F&spm=1018.2118.3001.4449


推荐阅读
  • Python与R语言在功能和应用场景上各有优势。尽管R语言在统计分析和数据可视化方面具有更强的专业性,但Python作为一种通用编程语言,适用于更广泛的领域,包括Web开发、自动化脚本和机器学习等。对于初学者而言,Python的学习曲线更为平缓,上手更加容易。此外,Python拥有庞大的社区支持和丰富的第三方库,使其在实际应用中更具灵活性和扩展性。 ... [详细]
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • 本文讨论了在进行 MySQL 数据迁移过程中遇到的所有 .frm 文件报错的问题,并提供了详细的解决方案和建议。 ... [详细]
  • 在Windows系统中安装TensorFlow GPU版的详细指南与常见问题解决
    在Windows系统中安装TensorFlow GPU版是许多深度学习初学者面临的挑战。本文详细介绍了安装过程中的每一个步骤,并针对常见的问题提供了有效的解决方案。通过本文的指导,读者可以顺利地完成安装并避免常见的陷阱。 ... [详细]
  • 在机器学习领域,深入探讨了概率论与数理统计的基础知识,特别是这些理论在数据挖掘中的应用。文章重点分析了偏差(Bias)与方差(Variance)之间的平衡问题,强调了方差反映了不同训练模型之间的差异,例如在K折交叉验证中,不同模型之间的性能差异显著。此外,还讨论了如何通过优化模型选择和参数调整来有效控制这一平衡,以提高模型的泛化能力。 ... [详细]
  • 【图像分类实战】利用DenseNet在PyTorch中实现秃头识别
    本文详细介绍了如何使用DenseNet模型在PyTorch框架下实现秃头识别。首先,文章概述了项目所需的库和全局参数设置。接着,对图像进行预处理并读取数据集。随后,构建并配置DenseNet模型,设置训练和验证流程。最后,通过测试阶段验证模型性能,并提供了完整的代码实现。本文不仅涵盖了技术细节,还提供了实用的操作指南,适合初学者和有经验的研究人员参考。 ... [详细]
  • 使用 Jupyter Notebook 实现 Markdown 编写与代码运行
    Jupyter Notebook 是一个开源的基于网页的应用程序,允许用户在同一文档中编写 Markdown 文本和运行多种编程语言的代码,并实时查看运行结果。 ... [详细]
  • 本文介绍了在 iOS 开发中设置图片和视图圆角的几种方法,包括通过 layer 设置圆角、使用贝塞尔曲线和 Core Graphics 框架,以及使用 CAShapeLayer 和 UIBezierPath。每种方法都有其优缺点,适用于不同的场景。 ... [详细]
  • PHP-Casbin v3.20.0 已经发布,这是一个使用 PHP 语言开发的轻量级开源访问控制框架,支持多种访问控制模型,包括 ACL、RBAC 和 ABAC。新版本在性能上有了显著的提升。 ... [详细]
  • 操作系统如何通过进程控制块管理进程
    本文详细介绍了操作系统如何通过进程控制块(PCB)来管理和控制进程。PCB是操作系统感知进程存在的重要数据结构,包含了进程的标识符、状态、资源清单等关键信息。 ... [详细]
  • 在Conda环境中高效配置并安装PyTorch和TensorFlow GPU版的方法如下:首先,创建一个新的Conda环境以避免与基础环境发生冲突,例如使用 `conda create -n pytorch_gpu python=3.7` 命令。接着,激活该环境,确保所有依赖项都正确安装。此外,建议在安装过程中指定CUDA版本,以确保与GPU兼容性。通过这些步骤,可以确保PyTorch和TensorFlow GPU版的顺利安装和运行。 ... [详细]
  • Python 程序转换为 EXE 文件:详细解析 .py 脚本打包成独立可执行文件的方法与技巧
    在开发了几个简单的爬虫 Python 程序后,我决定将其封装成独立的可执行文件以便于分发和使用。为了实现这一目标,首先需要解决的是如何将 Python 脚本转换为 EXE 文件。在这个过程中,我选择了 Qt 作为 GUI 框架,因为之前对此并不熟悉,希望通过这个项目进一步学习和掌握 Qt 的基本用法。本文将详细介绍从 .py 脚本到 EXE 文件的整个过程,包括所需工具、具体步骤以及常见问题的解决方案。 ... [详细]
  • 能够感知你情绪状态的智能机器人即将问世 | 科技前沿观察
    本周科技前沿报道了多项重要进展,包括美国多所高校在机器人技术和自动驾驶领域的最新研究成果,以及硅谷大型企业在智能硬件和深度学习技术上的突破性进展。特别值得一提的是,一款能够感知用户情绪状态的智能机器人即将问世,为未来的人机交互带来了全新的可能性。 ... [详细]
  • 通过使用CIFAR-10数据集,本文详细介绍了如何快速掌握Mixup数据增强技术,并展示了该方法在图像分类任务中的显著效果。实验结果表明,Mixup能够有效提高模型的泛化能力和分类精度,为图像识别领域的研究提供了有价值的参考。 ... [详细]
  • 在《Python编程基础》课程中,我们将深入探讨Python中的循环结构。通过详细解析for循环和while循环的语法与应用场景,帮助初学者掌握循环控制语句的核心概念和实际应用技巧。此外,还将介绍如何利用循环结构解决复杂问题,提高编程效率和代码可读性。 ... [详细]
author-avatar
greybt
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有