热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

C和Java没那么香了,Serverless时代Rust即将称王?

ServerlessComputing,即”无服务器计算”,其实这一概念在刚刚提出的时候并没有获得太多的关注,直到2014年AWSLambda这一里程碑式的产品出现。Serverl

高并发模式初探

在这个高并发时代最重要的设计模式无疑是生产者、消费者模式,比如著名的消息队列kafka其实就是一个生产者消费者模式的典型实现。其实生产者消费者问题,也就是有限缓冲问题,可以用以下场景进行简要描述,生产者生成一定量的产品放到库房,并不断重复此过程;与此同时,消费者也在缓冲区消耗这些数据,但由于库房大小有限,所以生产者和消费者之间步调协调,生产者不会在库房满的情况放入端口,消费者也不会在库房空时消耗数据。详见下图:

而如果在生产者与消费者之间完美协调并保持高效,这就是高并发要解决的本质问题。

C语言的高并发案例

笔者在前文曾经介绍过TDEngine的相关代码,其中Sheduler模块的相关调度算法就使用了生产、消费者模式进行消息传递功能的实现,也就是有多个生产者(producer)生成并不断向队列中传递消息,也有多个消费者(consumer)不断从队列中取消息。

后面我们也会说明类型功能在Go、Java等高级语言中类似的功能已经被封装好了,但是在C语言中你就必须要用好互斥体( mutex)和信号量(semaphore)并协调他们之间的关系。由于C语言的实现是最复杂的,先来看结构体设计和他的注释:

typedef struct {
  char            label[16];//消息内容
  sem_t           emptySem;//此信号量代表队列的可写状态
  sem_t           fullSem;//此信号量代表队列的可读状态
  pthread_mutex_t queueMutex;//此互斥体为保证消息不会被误修改,保证线程程安全
  int             fullSlot;//队尾位置
  int             emptySlot;//队头位置
  int             queueSize;#队列长度
  int             numOfThreads;//同时操作的线程数量
  pthread_t *     qthread;//线程指针
  SSchedMsg *     queue;//队列指针
} SSchedQueue;


再来看Shceduler初始化函数,这里需要特别说明的是,两个信号量的创建,其中emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度,fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读。具体代码及我的注释如下:

void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {
  pthread_attr_t attr;
  SSchedQueue *  pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
  memset(pSched, 0, sizeof(SSchedQueue));
  pSched->queueSize = queueSize;
  pSched->numOfThreads = numOfThreads;
  strcpy(pSched->label, label);
  if (pthread_mutex_init(&pSched->queueMutex, NULL) <0) {
    pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
   //emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度。
  if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
    pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
 //fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读
  if (sem_init(&pSched->fullSem, 0, 0) != 0) {
    pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
  if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
    pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
  memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
  pSched->fullSlot = 0;//实始化时队列为空,故队头和队尾的位置都是0
  pSched->emptySlot = 0;//实始化时队列为空,故队头和队尾的位置都是0
  pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (int i = 0; i numOfThreads; ++i) {
    if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
      pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
      goto _error;
    }
  }
  pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
  return (void *)pSched;
_error:
  taosCleanUpScheduler(pSched);
  return NULL;
}

再来看读消息的taosProcessSchedQueue函数这其实是消费者一方的实现,这个函数的主要逻辑是

1.使用无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续向下处理

2.在操作msg前,加入互斥体防止msg被误用。

3.读操作完毕后修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。同时退出互斥体。

4.对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6

具体代码及注释如下:

void *taosProcessSchedQueue(void *param) {
  SSchedMsg    msg;
  SSchedQueue *pSched = (SSchedQueue *)param;
 //注意这里是个无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续处理
  while (1) {
    if (sem_wait(&pSched->fullSem) != 0) {
      pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
      if (errno == EINTR) {
        /* sem_wait is interrupted by interrupt, ignore and continue */
        continue;
      }
    }
     //加入互斥体防止msg被误用。
    if (pthread_mutex_lock(&pSched->queueMutex) != 0)
      pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
    msg = pSched->queue[pSched->fullSlot];
    memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
    //读取完毕修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。
    pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
     //读取完毕修改退出互斥体
    if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
      pError("unlock %s queueMutex failed, reason:%s
", pSched->label, strerror(errno));
     //读取完毕对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6
    if (sem_post(&pSched->emptySem) != 0)
      pError("post %s emptySem failed, reason:%s
", pSched->label, strerror(errno));
    if (msg.fp)
      (*(msg.fp))(&msg);
    else if (msg.tfp)
      (*(msg.tfp))(msg.ahandle, msg.thandle);
  }
}

最后写消息的taosScheduleTask函数也就是生产的实现,其基本逻辑是

1.写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。

2.加入互斥体防止msg被误操作,写入完成后退出互斥体

3.写队列完成后对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就继续向下。

int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
  SSchedQueue *pSched = (SSchedQueue *)qhandle;
  if (pSched == NULL) {
    pError("sched is not ready, msg:%p is dropped", pMsg);
    return 0;
  }
  //在写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。
  if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
//加入互斥体防止msg被误操作
  if (pthread_mutex_lock(&pSched->queueMutex) != 0)
    pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
  pSched->queue[pSched->emptySlot] = *pMsg;
  pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
  if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
    pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
  //在写队列前先对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取函数可以进行处理。
  if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
  return 0;
}

Java的高并发实现

从并发模型来看,Go和Rust都有channel这个概念,也都是通过Channel来实现线(协)程间的同步,由于channel带有读写状态且保证数据顺序,而且channel的封装程度和效率明显可以做的更高,因此Go和Rust官方都会建议使用channel(通信)来共享内存,而不是使用共享内存来通信。

为了让帮助大家找到区别,我们先以Java为例来,看一下没有channel的高级语言Java,生产者消费者该如何实现,代码及注释如下:

public class Storage {
    // 仓库最大存储量
    private final int MAX_SIZE = 10;
    // 仓库存储的载体
    private LinkedList list = new LinkedList();
    // 锁
    private final Lock lock = new ReentrantLock();
    // 仓库满的信号量
    private final Condition full = lock.newCondition();
    // 仓库空的信号量
    private final Condition empty = lock.newCondition();
    public void produce()
    {
        // 获得锁
        lock.lock();
        while (list.size() + 1 > MAX_SIZE) {
            System.out.println("【生产者" + Thread.currentThread().getName()
		             + "】仓库已满");
            try {
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(new Object());
        System.out.println("【生产者" + Thread.currentThread().getName() 
				 + "】生产一个产品,现库存" + list.size());
        empty.signalAll();
        lock.unlock();
    }
    public void consume()
    {
        // 获得锁
        lock.lock();
        while (list.size() == 0) {
            System.out.println("【消费者" + Thread.currentThread().getName()
		             + "】仓库为空");
            try {
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.remove();
        System.out.println("【消费者" + Thread.currentThread().getName()
		         + "】消费一个产品,现库存" + list.size());
        full.signalAll();
        lock.unlock();
    }
}

在Java、C#这种面向对象,但是没有channel语言中,生产者、消费者模式至少要借助一个lock和两个信号量共同完成。其中锁的作用是保证同是时间,仓库中只有一个用户进行数据的修改,而还需要表示仓库满的信号量,一旦达到仓库满的情况则将此信号量置为阻塞状态,从而阻止其它生产者再向仓库运商品了,反之仓库空的信号量也是一样,一旦仓库空了,也要阻其它消费者再前来消费了。

Go的高并发实现

我们刚刚也介绍过了Go语言中官方推荐使用channel来实现协程间通信,所以不需要再添加lock和信号量就能实现模式了,以下代码中我们通过子goroutine完成了生产者的功能,在在另一个子goroutine中实现了消费者的功能,注意要阻塞主goroutine以确保子goroutine能够执行,从而轻而易举的就这完成了生产者消费者模式。下面我们就通过具体实践中来看一下生产者消费者模型的实现。

package main
import (
	"fmt"
	"time"
)
func Product(ch chan<- int) { //生产者
	for i := 0; i <3; i++ {
		fmt.Println("Product  produceed", i)
		ch <- i //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.
	}
}
func Consumer(ch <-chan int) {
	for i := 0; i <3; i++ {
		j := <-ch //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作.
		fmt.Println("Consmuer consumed ", j)
	}
}
func main() {
	ch := make(chan int)
	go Product(ch)//注意生产者与消费者放在不同goroutine中
	go Consumer(ch)//注意生产者与消费者放在不同goroutine中
	time.Sleep(time.Second * 1)//防止主goroutine退出
	/*运行结果并不确定,可能为
		Product  produceed 0
	Product  produceed 1
	Consmuer consumed  0
	Consmuer consumed  1
	Product  produceed 2
	Consmuer consumed  2
	*/
}

可以看到和Java比起来使用GO来实现并发式的生产者消费者模式的确是更为清爽了。

Rust的高并发实现

不得不说Rust的难度实在太高了,虽然笔者之前在汇编、C、Java等方面的经验可以帮助我快速掌握Go语言。但是假期看了两天Rust真想大呼告辞,这尼玛也太劝退了。在Rust官方提供的功能中,其实并不包括多生产者、多消费者的channel,std:sync空间下只有一个多生产者单消费者(mpsc)的channel。其样例实现如下:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = mpsc::Sender::clone(&tx);
    let tx2 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let vals = vec![
            String::from("1"),
            String::from("3"),
            String::from("5"),
            String::from("7"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("11"),
            String::from("13"),
            String::from("15"),
            String::from("17"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("21"),
            String::from("23"),
            String::from("25"),
            String::from("27"),
        ];
        for val in vals {
            tx2.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for rec in rx {
        println!("Got: {}", rec);
    }
}

可以看到在Rust下实现生产者消费者是不难的,但是生产者可以clone多个,不过消费者却只能有一个,究其原因是因为Rust下没有GC也就是垃圾回收功能,而想保证安全Rust就必须要对于变更使用权限进行严格管理。在Rust下使用move关键字进行变更的所有权转移,但是按照Rust对于变更生产周期的管理规定,线程间权限转移的所有权接收者在同一时间只能有一个,这也是Rust官方只提供MPSC的原因,

use std::thread;
fn main() {
    let s = "hello";
    let handle = thread::spawn(move || {
        println!("{}", s);
    });
    handle.join().unwrap();
}

当然Rust下有一个API比较贴心就是join,他可以所有子线程都执行结束再退出主线程,这比Go中要手工阻塞还是要有一定的提高。而如果你想用多生产者、多消费者的功能,就要入手crossbeam模块了,这个模块掌握起来难度也真的不低。

总结

通过上面的比较我们可以用一张表格来说明几种主流语言的情况对比:

语言 安全性 运行速度 进程启动速度 学习难度
C 极快 极快 困难
Java 一般 一般 一般
Go 较快 较快 一般
Rust 极快(基本比肩C) 极快(基本比肩C) 极困难

可以看到Rust以其高安全性、基本比肩C的运行及启动速度必将在Serverless的时代独占鳌头,Go基本也能紧随其后,而C语言程序中难以避免的野指针,Java相对较低的运行及启动速度,可能都不太适用于函数式运算的场景,Java在企业级开发的时代打败各种C#之类的对手,但是在云时代好像还真没有之前统治力那么强了,真可谓是打败你的往往不是你的对手,而是其它空间的降维打击。


推荐阅读
  • 关于进程的复习:#管道#数据的共享Managerdictlist#进程池#cpu个数1#retmap(func,iterable)#异步自带close和join#所有 ... [详细]
  • 在iOS开发中,多线程技术的应用非常广泛,能够高效地执行多个调度任务。本文将重点介绍GCD(Grand Central Dispatch)在多线程开发中的应用,包括其函数和队列的实现细节。 ... [详细]
  • Docker安全策略与管理
    本文探讨了Docker的安全挑战、核心安全特性及其管理策略,旨在帮助读者深入理解Docker安全机制,并提供实用的安全管理建议。 ... [详细]
  • RTThread线程间通信
    线程中通信在裸机编程中,经常会使用全局变量进行功能间的通信,如某些功能可能由于一些操作而改变全局变量的值,另一个功能对此全局变量进行读取& ... [详细]
  • Redis:缓存与内存数据库详解
    本文介绍了数据库的基本分类,重点探讨了关系型与非关系型数据库的区别,并详细解析了Redis作为非关系型数据库的特点、工作模式、优点及持久化机制。 ... [详细]
  • 深入解析Python进程间通信:Queue与Pipe的应用
    本文详细探讨了Python中进程间通信的两种常用方法——Queue和Pipe,并通过具体示例介绍了它们的基本概念、使用方法及注意事项。 ... [详细]
  • Spring Boot + RabbitMQ 消息确认机制详解
    本文详细介绍如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。 ... [详细]
  • 我自己做了一个网站图片的抓取,感觉速度有点慢抓取4000张图片可能得用15分钟左右的时间,我百度看用线程可以加快抓取,然后创建了5个线程抓取,但是5个线程是同步执行同样的操作一个图片就 ... [详细]
  • 软件测试行业深度解析:迈向高薪的必经之路
    本文深入探讨了软件测试行业的发展现状及未来趋势,旨在帮助有志于在该领域取得高薪的技术人员明确职业方向和发展路径。 ... [详细]
  • 了解多域名SAN SSL证书及其工作原理
    本文介绍了多域名SAN SSL证书的概念及其工作方式,探讨其在现代网络安全中的重要性和应用。 ... [详细]
  • 前言:由于Android系统本身决定了其自身的单线程模型结构。在日常的开发过程中,我们又不能把所有的工作都交给主线程去处理(会造成UI卡顿现象)。因此,适当的创建子线程去处理一些耗 ... [详细]
  • Java中的引用类型详解
    本文详细介绍了Java中的引用类型,包括强引用、软引用、弱引用和虚引用的特点和应用场景。 ... [详细]
  • 大华股份2013届校园招聘软件算法类试题D卷
    一、填空题(共17题,每题3分,总共51分)1.设有inta5,*b,**c,执行语句c&b,b&a后,**c的值为________答:5 ... [详细]
  • 本文详细介绍了如何对一个整数的二进制表示进行逆序操作。通过多种方法,包括直接法、查表法和分治法,帮助读者全面理解和掌握这一技术。 ... [详细]
  • Redis 是一个高性能的开源键值存储系统,支持多种数据结构。本文将详细介绍 Redis 中的六种底层数据结构及其在对象系统中的应用,包括字符串对象、列表对象、哈希对象、集合对象和有序集合对象。通过12张图解,帮助读者全面理解 Redis 的数据结构和对象系统。 ... [详细]
author-avatar
vvdjiechipi48_2a247d
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有