后面我们也会说明类型功能在Go、Java等高级语言中类似的功能已经被封装好了,但是在C语言中你就必须要用好互斥体( mutex)和信号量(semaphore)并协调他们之间的关系。由于C语言的实现是最复杂的,先来看结构体设计和他的注释:
typedef struct { char label[16];//消息内容 sem_t emptySem;//此信号量代表队列的可写状态 sem_t fullSem;//此信号量代表队列的可读状态 pthread_mutex_t queueMutex;//此互斥体为保证消息不会被误修改,保证线程程安全 int fullSlot;//队尾位置 int emptySlot;//队头位置 int queueSize;#队列长度 int numOfThreads;//同时操作的线程数量 pthread_t * qthread;//线程指针 SSchedMsg * queue;//队列指针 } SSchedQueue;
void *taosInitScheduler(int queueSize, int numOfThreads, char *label) { pthread_attr_t attr; SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue)); memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; pSched->numOfThreads = numOfThreads; strcpy(pSched->label, label); if (pthread_mutex_init(&pSched->queueMutex, NULL) <0) { pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno)); goto _error; } //emptySem是队列的可写状态,初始化时其值为queueSize,即初始时队列可写,可接受消息长度为队列长度。 if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno)); goto _error; } //fullSem是队列的可读状态,初始化时其值为0,即初始时队列不可读 if (sem_init(&pSched->fullSem, 0, 0) != 0) { pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno)); goto _error; } if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno)); goto _error; } memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0;//实始化时队列为空,故队头和队尾的位置都是0 pSched->emptySlot = 0;//实始化时队列为空,故队头和队尾的位置都是0 pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (int i = 0; inumOfThreads; ++i) { if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno)); goto _error; } } pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads); return (void *)pSched; _error: taosCleanUpScheduler(pSched); return NULL; }
void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; //注意这里是个无限循环,只要队列可读即sem_wait(&pSched->fullSem)不再阻塞就继续处理 while (1) { if (sem_wait(&pSched->fullSem) != 0) { pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno)); if (errno == EINTR) { /* sem_wait is interrupted by interrupt, ignore and continue */ continue; } } //加入互斥体防止msg被误用。 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); //读取完毕修改fullSlot的值,注意这为避免fullSlot溢出,需要对于queueSize取余。 pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; //读取完毕修改退出互斥体 if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError("unlock %s queueMutex failed, reason:%s ", pSched->label, strerror(errno)); //读取完毕对emptySem进行post操作,即把emptySem的值加1,如emptySem原值为5,读取一个消息后,emptySem的值为6,即可写状态,且能接受的消息数量为6 if (sem_post(&pSched->emptySem) != 0) pError("post %s emptySem failed, reason:%s ", pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); else if (msg.tfp) (*(msg.tfp))(msg.ahandle, msg.thandle); } }
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { SSchedQueue *pSched = (SSchedQueue *)qhandle; if (pSched == NULL) { pError("sched is not ready, msg:%p is dropped", pMsg); return 0; } //在写队列前先对emptySem进行减1操作,如emptySem原值为1,那么减1后为0,也就是队列已满,必须在读取消息后,即emptySem进行post操作后,队列才能进行可写状态。 if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno)); //加入互斥体防止msg被误操作 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); //在写队列前先对fullSem进行加1操作,如fullSem原值为0,那么加1后为1,也就是队列可读,咱们上面介绍的读取函数可以进行处理。 if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno)); return 0; }
public class Storage { // 仓库最大存储量 private final int MAX_SIZE = 10; // 仓库存储的载体 private LinkedList list = new LinkedList(); // 锁 private final Lock lock = new ReentrantLock(); // 仓库满的信号量 private final Condition full = lock.newCondition(); // 仓库空的信号量 private final Condition empty = lock.newCondition(); public void produce() { // 获得锁 lock.lock(); while (list.size() + 1 > MAX_SIZE) { System.out.println("【生产者" + Thread.currentThread().getName() + "】仓库已满"); try { full.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(new Object()); System.out.println("【生产者" + Thread.currentThread().getName() + "】生产一个产品,现库存" + list.size()); empty.signalAll(); lock.unlock(); } public void consume() { // 获得锁 lock.lock(); while (list.size() == 0) { System.out.println("【消费者" + Thread.currentThread().getName() + "】仓库为空"); try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【消费者" + Thread.currentThread().getName() + "】消费一个产品,现库存" + list.size()); full.signalAll(); lock.unlock(); } }
package main import ( "fmt" "time" ) func Product(ch chan<- int) { //生产者 for i := 0; i <3; i++ { fmt.Println("Product produceed", i) ch <- i //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作. } } func Consumer(ch <-chan int) { for i := 0; i <3; i++ { j := <-ch //由于channel是goroutine安全的,所以此处没有必要必须加锁或者加lock操作. fmt.Println("Consmuer consumed ", j) } } func main() { ch := make(chan int) go Product(ch)//注意生产者与消费者放在不同goroutine中 go Consumer(ch)//注意生产者与消费者放在不同goroutine中 time.Sleep(time.Second * 1)//防止主goroutine退出 /*运行结果并不确定,可能为 Product produceed 0 Product produceed 1 Consmuer consumed 0 Consmuer consumed 1 Product produceed 2 Consmuer consumed 2 */ }
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); let tx2 = mpsc::Sender::clone(&tx); thread::spawn(move || { let vals = vec![ String::from("1"), String::from("3"), String::from("5"), String::from("7"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("11"), String::from("13"), String::from("15"), String::from("17"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("21"), String::from("23"), String::from("25"), String::from("27"), ]; for val in vals { tx2.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for rec in rx { println!("Got: {}", rec); } }
use std::thread; fn main() { let s = "hello"; let handle = thread::spawn(move || { println!("{}", s); }); handle.join().unwrap(); }
语言 | 安全性 | 运行速度 | 进程启动速度 | 学习难度 |
C | 低 | 极快 | 极快 | 困难 |
Java | 高 | 一般 | 一般 | 一般 |
Go | 高 | 较快 | 较快 | 一般 |
Rust | 高 | 极快(基本比肩C) | 极快(基本比肩C) | 极困难 |