热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Schedulerx2.0分布式计算原理&最佳实践

这篇文章重点是介绍基于schedulerx2.0的分布式执行引擎原理和最佳实践,相信看完这篇文章,大家都能写出高效率的分布式作业,说不定速度能提升好几倍:)Worker总体架构参考Yarn的架构,分为TaskMaster,Container,Processor三层:

1. 前言

Schedulerx2.0 的客户端提供分布式执行、多种任务类型、统一日志等框架,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。

这篇文章重点是介绍基于schedulerx2.0的分布式执行引擎原理和最佳实践,相信看完这篇文章,大家都能写出高效率的分布式作业,说不定速度能提升好几倍:)

2. 可扩展的执行引擎

Worker总体架构参考Yarn的架构,分为TaskMaster, Container, Processor三层:

Schedulerx2.0分布式计算原理&最佳实践

  • TaskMaster:类似于yarn的AppMaster,支持可扩展的分布式执行框架,进行整个jobInstance的生命周期管理、container的资源管理,同时还有failover等能力。默认实现StandaloneTaskMaster(单机执行),BroadcastTaskMaster(广播执行),MapTaskMaster(并行计算、内存网格、网格计算),MapReduceTaskMaster(并行计算、内存网格、网格计算)。
  • Container:执行业务逻辑的容器框架,支持线程/进程/docker/actor等。
  • Processor:业务逻辑框架,不同的processor表示不同的任务类型。

以MapTaskMaster为例,大概的原理如下图所示:

Schedulerx2.0分布式计算原理&最佳实践

3. 分布式编程模型之Map模型

Schedulerx2.0提供了多种分布式编程模型,这篇文章主要介绍Map模型(之后的文章还会介绍MapReduce模型,适用更多的业务场景),简单几行代码就可以将海量数据分布式到多台机器上进行分布式跑批,非常简单易用。

针对不同的跑批场景,map模型作业还提供了并行计算、内存网格、网格计算三种执行方式:

  • 并行计算:子任务300以下,有子任务列表。
  • 内存网格:子任务5W以下,无子任务列表,速度快。
  • 网格计算:子任务100W以下,无子任务列表。

4. 并行计算原理

因为并行任务具有子任务列表:

Schedulerx2.0分布式计算原理&最佳实践

如上图,子任务列表可以看到每个子任务的状态、机器,还有重跑、查看日志等操作。

因为并行计算要做到子任务级别的可视化,并且worker挂了、重启还能支持手动重跑,就需要把task持久化到server端:

Schedulerx2.0分布式计算原理&最佳实践

如上图所示:

  1. server触发jobInstance到某个worker,选中为master。
  2. MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
  3. MapTaskMaster收到map方法,会把task持久化到server端。
  4. 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。

5. 网格计算原理

网格计算要支持百万级别的task,如果所有任务都往server回写,server肯定扛不住,所以网格计算的存储实际上是分布式在用户自己的机器上的:

Schedulerx2.0分布式计算原理&最佳实践

如上图所示:

  1. server触发jobInstance到某个worker,选中为master。
  2. MapTaskMaster选择某个worker执行root任务,当执行map方法时,会回调MapTaskMaster。
  3. MapTaskMaster收到map方法,会把task持久化到本地h2数据库。
  4. 同时,MapTaskMaster还有个pull线程,不停拉取INIT状态的task,并派发给其他worker执行。

6. 最佳实践

6.1 需求

举个例子:

  1. 读取A表中status=0的数据。
  2. 处理这些数据,插入B表。
  3. 把A表中处理过的数据的修改status=1。
  4. 数据量有4亿+,希望缩短时间。

6.2 反面案例

我们先看下如下代码是否有问题?

public class ScanSingleTableProcessor extends MapJobProcessor {
    private static int pageSize = 1000;

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();

        if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {
            int recordCount = queryRecordCount();
            int pageAmount = recordCount / pageSize;//计算分页数量
            for(int i = 0 ; i  recordList = queryRecord(i);//根据分页查询一页数据
                map(recordList, "record记录");//把子任务分发出去并行处理
            }
            return new ProcessResult(true);//true表示执行成功,false表示失败
        } else if ("record记录".equals(taskName)) {
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(false);
    }
}

如上面的代码所示,在root任务中,会把数据库所有记录读取出来,每一行就是一个Record,然后分发出去,分布式到不同的worker上去执行。逻辑是没有问题的,但是实际上性能非常的差。结合网格计算原理,我们把上面的代码绘制成下面这幅图:

Schedulerx2.0分布式计算原理&最佳实践

如上图所示,root任务一开始会全量的读取A表的数据,然后会全量的存到h2中,pull线程还会全量的从h2读取一次所有的task,还会分发给所有客户端。所以实际上对A表中的数据:

  • 全量读2次
  • 全量写一次
  • 全量传输一次

这个效率是非常低的。

6.3 正面案例

下面给出正面案例的代码:

public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
            System.out.println("start root task");
            Pair idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //计算分页数量
            for (int i = minId; i  maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("all tasks is finished.");
    }

    private Pair queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        return null;
    }
}

如上面的代码所示,

  • 每个task不是整行记录的record,而是PageTask,里面就2个字段,startId和endId。
  • root任务,没有全量的读取A表,而是读一下整张表的minId和maxId,然后构造PageTask进行分页。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每个task处理A表不同的数据。
  • 在下一级task中,如果拿到的是PageTask,再根据id区间去A表处理数据。

根据上面的代码和网格计算原理,得出下面这幅图:

Schedulerx2.0分布式计算原理&最佳实践

如上图所示,

  • A表只需要全量读取一次。
  • 子任务数量比反面案例少了上千、上万倍。
  • 子任务的body非常小,如果recod中有大字段,也少了上千、上万倍。

综上,对A表访问次数少了好几倍,对h2存储压力少了上万倍,不但执行速度可以快很多,还保证不会把自己本地的h2数据库搞挂。

本文作者:黄晓萌

阅读原文

本文为云栖社区原创内容,未经允许不得转载。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 我们


推荐阅读
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 在开发过程中,我最初也依赖于功能全面但操作繁琐的集成开发环境(IDE),如Borland Delphi 和 Microsoft Visual Studio。然而,随着对高效开发的追求,我逐渐转向了更加轻量级和灵活的工具组合。通过 CLIfe,我构建了一个高度定制化的开发环境,不仅提高了代码编写效率,还简化了项目管理流程。这一配置结合了多种强大的命令行工具和插件,使我在日常开发中能够更加得心应手。 ... [详细]
  • 提升 Kubernetes 集群管理效率的七大专业工具
    Kubernetes 在云原生环境中的应用日益广泛,然而集群管理的复杂性也随之增加。为了提高管理效率,本文推荐了七款专业工具,这些工具不仅能够简化日常操作,还能提升系统的稳定性和安全性。从自动化部署到监控和故障排查,这些工具覆盖了集群管理的各个方面,帮助管理员更好地应对挑战。 ... [详细]
  • Spring框架的核心组件与架构解析 ... [详细]
  • Docker入门指南:初探容器化技术
    Docker入门指南:初探容器化技术摘要:Docker 是一个使用 Go 语言开发的开源容器平台,旨在实现应用程序的构建、分发和运行的标准化。通过将应用及其依赖打包成轻量级的容器,Docker 能够确保应用在任何环境中都能一致地运行,从而提高开发和部署的效率。本文将详细介绍 Docker 的基本概念、核心功能以及如何快速上手使用这一强大的容器化工具。 ... [详细]
  • CentOS 7环境下Jenkins的安装与前后端应用部署详解
    CentOS 7环境下Jenkins的安装与前后端应用部署详解 ... [详细]
  • 解决Bootstrap DataTable Ajax请求重复问题
    在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ... [详细]
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 在本文中,我们将探讨如何在Docker环境中高效地管理和利用数据库。首先,需要安装Docker Desktop以确保本地环境准备就绪。接下来,可以从Docker Hub中选择合适的数据库镜像,并通过简单的命令将其拉取到本地。此外,我们还将介绍如何配置和优化这些数据库容器,以实现最佳性能和安全性。 ... [详细]
  • 在探讨Hibernate框架的高级特性时,缓存机制和懒加载策略是提升数据操作效率的关键要素。缓存策略能够显著减少数据库访问次数,从而提高应用性能,特别是在处理频繁访问的数据时。Hibernate提供了多层次的缓存支持,包括一级缓存和二级缓存,以满足不同场景下的需求。懒加载策略则通过按需加载关联对象,进一步优化了资源利用和响应时间。本文将深入分析这些机制的实现原理及其最佳实践。 ... [详细]
  • Docker 中创建 CentOS 容器并安装 MySQL 进行本地连接
    本文详细介绍了如何在 Docker 中创建 CentOS 容器,并在容器中安装 MySQL 以实现本地连接。文章内容包括镜像拉取、容器创建、MySQL 安装与配置等步骤。 ... [详细]
  • 在日常的项目开发中,测试环境和生产环境通常采用HTTP协议访问服务。然而,从浏览器的角度来看,这种访问方式会被标记为不安全。为了提升安全性,当前大多数生产环境已经转向了HTTPS协议。本文将详细介绍如何在Spring Boot应用中配置SSL证书,以实现HTTPS安全访问。通过这一过程,不仅可以增强数据传输的安全性,还能提高用户对系统的信任度。 ... [详细]
  • 在 Manjaro 系统中,出现了一种由未预期的符号 `newline` 引起的 bash 语法错误。具体表现为系统提示“bash: 附近有语法错误”。通过将相关字符串从双引号改为单引号,可以有效解决这一问题。此外,建议在编写脚本时,注意检查换行符和特殊字符的使用,以避免类似错误的发生。 ... [详细]
  • 如何在IntelliJ IDEA中生成Maven项目的所有Jar包依赖关系图
    本文详细介绍了如何在IntelliJ IDEA中生成Maven项目的完整Jar包依赖关系图。通过具体步骤和示例,帮助开发者清晰地理解并掌握这一重要功能,适合希望深入了解Maven依赖管理的读者学习参考。 ... [详细]
author-avatar
ccer
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有