热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

多任务处理器更新

2020年06月16日更新源码以及详细说明请移步更新后的版本号:implementationcom.ripple.component:task:0.0.6一、背景

2020年06月16日更新

源码以及详细说明请移步
更新后的版本号:

implementation 'com.ripple.component:task:0.0.6'

一、背景

咱们来分析一下多任务,在使用者的角度可以简单理解为其是一个黑盒,使用者放入之后经过黑盒处理之后再取出这样就达到了最终的结果。


二、分析抽象

既然是这样我们可以抽象一下,因为在linux中万物皆文件,所以咱们传入的其实是一个废弃:sourcePath,新增:source:S,那么下一步我们就考虑我们想要的是什么了,然后咱们可以把那个黑盒理解为规则,那么可以抽象为,废弃:fun parse(sourcePath:String,targetPath:String?):String,
新增:fun parse(source: S, target: T?): T这里估计大家会疑问为什么会有targetPath,不是已经有处理结果了么,这其实是使其更具扩展性,比如要处理一个文件,使用者在处理之前就已经为其定好了targetPath,那么在使用时直接传入即可,但是还有一种比如将图片转为base64那么知道的只是规则结果是未知的,这时候就需要去取这个返回值了。
经过以上的分析这个库的主干就出来了,那么下一步就是要为其装饰了。
使用者在使用时肯定想的是这个库能够处理批量任务并且能够有相应的回调通知,这样使用者只需要自己定义好处理规则封装为对象,传入这个多任务处理器引擎中,得到相应的回调。
更新后定义的接口:
但是更新后的调用基本没有变化,回调结果由泛型推导出实际类型

/*** Author: fanyafeng* Data: 2020/6/3 19:15* Email: fanyafeng@live.cn* Description:*/
interface ProcessModel : Serializable {companion object {const val PROCESS_ITEM = "process_item"const val PROCESS_LIST = "process_list"}/*** 获取需要处理的源路径*/fun getSource(): S/*** 目标路径*/fun getTarget(): T?/*** 处理后的目标路径不能为空*/fun setTarget(target: T)/*** 任务解析器* 这里按道理说如果有了targetPath那么这个返回值是可以不需要的* 但是就是因为如果你去处理一个任务但是有规则没有输出那么这个返回值就是必须的了* 而且不能为空** 分为以下两种以下两种情况:* 1.处理文件类* @param source 源* @param target 结果** 2.有处理规则,和原路径,那么方法的返回值就是处理结果* @param source 源**/fun parse(source: S, target: T?): T/*** 简化接口调用*/abstract class ProcessSimpleModel(var sourcePath: S, var targetPath: T) : ProcessModel {override fun getSource(): S {return sourcePath}override fun getTarget(): T? {return targetPath}override fun setTarget(target: T) {this.targetPath = target}}
}

三、多任务处理器结构图

多任务处理器结构图


四、伪代码真调用

这里以kotlin为例,当然也支持java,只是kotlin使用起来会更简洁,来模拟一下咱们想要调用

handleTaskList(taskList){onSuccess{ successResult->}onFailed{ failedResult->}
}

以上只是简单写了几个回调,基本调用方式是这样,可能使用者还想要自定义一下线程池,并行,串行等


五、编码

因为要写的是一个框架类的库,需要对扩展开放对修改关闭,此时再根据上面的抽象以及图便可以很清晰明了的将其完成,下面来细看


5.1 任务Model封装

因为是批量任务处理,并且还需要兼容不同类型,那么这个Model必须要实现ProcessModel接口,设计之初为了灵活设计了三个接口,但是又发现用起来比较麻烦,后来又将其粒度变大,有舍有得吧。下面来看一下接口的实现:
PS:注释中有详细的说明不再做细说明

/*** Author: fanyafeng* Data: 2020/6/3 19:15* Email: fanyafeng@live.cn* Description: 单项任务接口*/
interface ProcessModel : Serializable {companion object {const val PROCESS_ITEM = "process_item"const val PROCESS_LIST = "process_list"}/*** 获取需要处理的源路径* 不能为空皮之不存毛将焉附*/fun getSourcePath(): String/*** 目标路径可以为空* 因为有时只知道规则不知道结果*/fun getTargetPath(): String?/*** 处理后的目标路径不能为空*/fun setTargetPath(target: String)/*** 任务解析器* 这里按道理说如果有了targetPath那么这个返回值是可以不需要的* 但是就是因为如果你去处理一个任务但是有规则没有输出那么这个返回值就是必须的了* 而且不能为空** 分为以下两种以下两种情况:* 1.处理文件类* @param sourcePath 文件原路径* @param targetPath 文件目标路径** 2.有处理规则,和原路径,那么方法的返回值就是处理结果* @param sourcePath 文件原路径**/fun parse(sourcePath: String, targetPath: String?): String
}

5.2 引擎处理设置

这里使用的是java的线程池的主要接口ExecutorService,主要是其中为我们封装好了我们需要的一些通用的方法,我这里还是把任务交给线程去处理,如果是单线程则是串行,多的话就是并行处理了,还可以实现接口进行自定义
更新后不仅不会每次都新建线程,而是去复用之前的线程并且为线程添加了name方便定位问题,防止重复创建太多的匿名线程,

/*** Author: fanyafeng* Data: 2020/5/6 17:51* Email: fanyafeng@live.cn* Description: 使用java自带的任务服务*/
interface ProcessEngine : Serializable {companion object {internal var singleExecutorInner = Executors.newSingleThreadExecutor(object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内部单线程池-" + atomic.getAndIncrement())}})internal val SINGLE_THREAD_EXECUTOR_INNER: ProcessEngine =object : ProcessEngine {override fun getExecutorService(): ExecutorService {return if (!singleExecutorInner.isShutdown) {singleExecutorInner} else {singleExecutorInner = Executors.newSingleThreadExecutor(object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内部单线程池-" + atomic.getAndIncrement())}})singleExecutorInner}}override fun shutdown() {singleExecutorInner.shutdown()}}/*** 单线程处理器* 处理任务为串行处理*/private var singleExecutor = Executors.newSingleThreadExecutor(object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置单线程池-" + atomic.getAndIncrement())}})val SINGLE_THREAD_EXECUTOR: ProcessEngine =object : ProcessEngine {override fun getExecutorService(): ExecutorService {return if (!singleExecutor.isShutdown) {singleExecutor} else {singleExecutor = Executors.newSingleThreadExecutor(object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置单线程池-" + atomic.getAndIncrement())}})singleExecutor}}override fun shutdown() {singleExecutor.shutdown()}}/*** 自带6个线程的处理器* 不用纠结个数为什么这么定义,纯属个人喜欢的数字* 处理任务为并行处理,并且顺序是打乱的*/private var maxExecutor = Executors.newFixedThreadPool(Thread.MAX_PRIORITY, object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置最大线程池-" + atomic.getAndIncrement())}})val MULTI_THREAD_EXECUTOR_MAX: ProcessEngine =object : ProcessEngine {override fun getExecutorService(): ExecutorService {return if (!maxExecutor.isShutdown) {maxExecutor} else {maxExecutor = Executors.newFixedThreadPool(Thread.MAX_PRIORITY, object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置最大线程池-" + atomic.getAndIncrement())}})maxExecutor}}override fun shutdown() {maxExecutor.shutdown()}}private var normalExecutor = Executors.newFixedThreadPool(Thread.NORM_PRIORITY, object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置一般线程池-" + atomic.getAndIncrement())}})val MULTI_THREAD_EXECUTOR_NORMAL: ProcessEngine =object : ProcessEngine {override fun getExecutorService(): ExecutorService {return if (!normalExecutor.isShutdown) {normalExecutor} else {normalExecutor = Executors.newFixedThreadPool(Thread.NORM_PRIORITY, object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置一般线程池-" + atomic.getAndIncrement())}})normalExecutor}}override fun shutdown() {normalExecutor.shutdown()}}private var minExecutor = Executors.newFixedThreadPool(Thread.MIN_PRIORITY, object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置最小线程池-" + atomic.getAndIncrement())}})val MULTI_THREAD_EXECUTOR_MIN: ProcessEngine =object : ProcessEngine {override fun getExecutorService(): ExecutorService {return if (!minExecutor.isShutdown) {minExecutor} else {minExecutor = Executors.newFixedThreadPool(Thread.MIN_PRIORITY, object :ThreadFactory {val atomic = AtomicInteger(1)override fun newThread(r: Runnable): Thread {return Thread(r, "ripple-task-内置最小线程池-" + atomic.getAndIncrement())}})minExecutor}}override fun shutdown() {minExecutor.shutdown()}}}/*** 获取任务处理任务** 1、线程池: 提供一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销,提高了响应的速度。** [java.util.concurrent.Executor]* [java.util.concurrent.ExecutorService]* [java.util.concurrent.ThreadPoolExecutor]* [java.util.concurrent.ScheduledExecutorService]* [java.util.concurrent.ScheduledThreadPoolExecutor]** 2、线程池的体系结构:* java.util.concurrent.Executor 负责线程的使用和调度的根接口* |--ExecutorService 子接口: 线程池的主要接口* |--ThreadPoolExecutor 线程池的实现类* |--ScheduledExecutorService 子接口: 负责线程的调度* |--ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService** [java.util.concurrent.Executor]** 3、工具类 : Executors* ExecutorService newFixedThreadPool() : 创建固定大小的线程池* ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。* ExecutorService newSingleThreadExecutor() : 创建单个线程池。 线程池中只有一个线程** ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务*/fun getExecutorService(): ExecutorService/*** 停止任务*/fun shutdown()}

5.3 核心任务处理

这里就包含了任务处理器以及任务回调了,因为使用者想要的就是把任务处理完成以及结果的回调不论成功或者失败。
更新后的task添加了泛型支持,需要外部传入类型,增加了通用性

/*** Author: fanyafeng* Data: 2020/6/3 19:39* Email: fanyafeng@live.cn* Description:*/
interface ProcessTask {/*** 所有单个任务回调,任务回调包含以下所有回调,但是为了简化使用* 除去单项任务完成回调必须重写其余进行了空实现** 单项任务开始回调* [com.ripple.task.callback.OnItemStart]** 单项任务进行回调* [com.ripple.task.callback.OnItemDoing]** 单项任务打断回调* [com.ripple.task.callback.OnItemInterrupted]** 单项任务失败回调* [com.ripple.task.callback.OnItemFailed]** 单项任务成功回调* [com.ripple.task.callback.OnItemSuccess]** 单项任务完成回调* [com.ripple.task.callback.OnItemFinish]*/fun getItemResult(): OnItemResult>?/*** 所有任务回调,基本同上除去具体回调** 所有任务开始回调* [com.ripple.task.callback.OnStart]** 所有任务进行回调* [com.ripple.task.callback.OnDoing]** 所有任务失败回调* [com.ripple.task.callback.OnFailed]** 所有任务成功回调* [com.ripple.task.callback.OnSuccess]** 所有任务完成结束回调* [com.ripple.task.callback.OnFinish]*/fun getAllResult(): OnAllResult>>?/*** 获取任务处理器引擎* [com.ripple.task.engine.ProcessEngine]*/fun getProcessEngine(): ProcessEngine
}

5.4 以上任务处理的必须部分都完成,剩下的就是将任务model交付ProcessTask处理即可

因为库的主要作用是为了方便大家使用,这里进行默认实现,因为代码量略大不在这里贴了,如果感觉默认的使用不能满足大家的使用,大家可以自己在实现相关接口之后进行自己的impl,这里贴一下库的使用代码,所有回调的结果都有,需要哪个回调实现哪个回调方法即可。


5.4.1 以下为java方式调用

主要是kotlin使用比较简洁,但是兼容了java的调用:
更新后以string为例:

val task = ProcessTaskImpl()
task.onAllResult = object : OnAllResult>> {override fun onFinish(finishResult: List>?,unFinishResult: List>?) {TODO("所有任务完成回调")}}
task.handleTaskList(listOf(Task1("abdaafda"),Task2("不会输出"),Task3("abdaafda又变大写了")
))

5.4.2 以下为kotlin方式调用

kotlin调用的话就稍微简单一点

handleTaskList(listOf(Task1("abdaafda"),Task2("不会输出"),Task3("abdaafda又变大写了"))
) {onSuccess { successResult ->}onFailed { failedResult ->}onFinish { finishResult, unFinishResult ->println("结果回调" + finishResult.toString())println(unFinishResult.toString())}
}

推荐阅读
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • Html5-Canvas实现简易的抽奖转盘效果
    本文介绍了如何使用Html5和Canvas标签来实现简易的抽奖转盘效果,同时使用了jQueryRotate.js旋转插件。文章中给出了主要的html和css代码,并展示了实现的基本效果。 ... [详细]
  • 浏览器中的异常检测算法及其在深度学习中的应用
    本文介绍了在浏览器中进行异常检测的算法,包括统计学方法和机器学习方法,并探讨了异常检测在深度学习中的应用。异常检测在金融领域的信用卡欺诈、企业安全领域的非法入侵、IT运维中的设备维护时间点预测等方面具有广泛的应用。通过使用TensorFlow.js进行异常检测,可以实现对单变量和多变量异常的检测。统计学方法通过估计数据的分布概率来计算数据点的异常概率,而机器学习方法则通过训练数据来建立异常检测模型。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有