热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

WebRTC学习进阶之路 十四、源码分析之WebRTC中的线程详解ThreadManagerThread

WebRTC学习进阶之路系列总目录:https:blog.csdn.netxiaomucgwlmxarticledetails103204274前言WebRTC源

WebRTC学习进阶之路系列总目录:https://blog.csdn.net/xiaomucgwlmx/article/details/103204274

前言

WebRTC源码中的很多注释是很赞的,看源码的时候多加阅读注释有助于更好的理解。
WebRTC实现了跨平台(Windows,MacOS,Linux,IOS,Android)的线程类rtc::Thread,WebRTC内部的network_thread,worker_thread,signaling_thread均是该类的实例。

基础功能
rtc:: Thread及其相关类,ThreadManager、MessageQueue,Runnable等等一起提供了如下的基础功能:
线程的管理:通过ThreadManager单例对象,可以管理所有的Thread实例;
线程的常规基本功能:rtc:: Thread提供创建线程对象,设置线程名称,启动线程去执行用户代码;
消息循环,消息投递:rtc:: Thread通过继承MessageQueue类,提供了内部消息循环,并提供了线程间异步,同步投递消息的功能;
跨线程执行方法:提供了跨线程执行方法,并返回执行结果的功能。该功能非常强大,因为WebRTC在某些功能模块的使用上,有要求其必需在指定的线程中才能调用的基本要求,比如音频模块:ADM 的创建必须要在 WebRTC 的 worker thread 中进行;
多路分离器:通过持有SocketServer对象,实现了多路分离器的功能,能处理网络IO;

          

signaling_thread:处理小工作量方法,要求此线程内的方法都必须快速返回。

worker_thread:处理大工作量的方法,此线程内的方法可能会处理很长时间。

network_thread:处理网络消息。
下边我们来看下两个线程的核心类:ThreadManager和Thread。

一、ThreadManager

ThreadManager的源码位于rtc_base目录下的thread.h与thread.cc中,相关头文件如下:

class RTC_EXPORT ThreadManager {public:static const int kForever = -1;// Singleton, constructor and destructor are private.static ThreadManager* Instance();Thread* CurrentThread();void SetCurrentThread(Thread* thread);Thread* WrapCurrentThread();void UnwrapCurrentThread();bool IsMainThread();private:ThreadManager();~ThreadManager();#if defined(WEBRTC_POSIX)pthread_key_t key_;
#endif#if defined(WEBRTC_WIN)const DWORD key_;
#endif// The thread to potentially autowrap.const PlatformThreadRef main_thread_ref_;RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
};

我们来逐步看下这几个方法:

1,static ThreadManager* Instance()

ThreadManager* ThreadManager::Instance() {static ThreadManager* const thread_manager = new ThreadManager();return thread_manager;
}

从源码可以看到ThreadManager是通过单例模式,通过静态方法Instance()来获取唯一的实例。

2,构造与设置、获取当前线程(跨平台)

#if defined(WEBRTC_POSIX)
ThreadManager::ThreadManager() : main_thread_ref_(CurrentThreadRef()) {
#if defined(WEBRTC_MAC)InitCocoaMultiThreading();
#endifpthread_key_create(&key_, nullptr);
}
Thread* ThreadManager::CurrentThread() {return static_cast(pthread_getspecific(key_));
}
void ThreadManager::SetCurrentThread(Thread* thread) {
#if RTC_DLOG_IS_ONif (CurrentThread() && thread) {RTC_DLOG(LS_ERROR) <<"SetCurrentThread: Overwriting an existing value?";}
#endif // RTC_DLOG_IS_ONpthread_setspecific(key_, thread);
}
#endif#if defined(WEBRTC_WIN)
ThreadManager::ThreadManager(): key_(TlsAlloc()), main_thread_ref_(CurrentThreadRef()) {}Thread* ThreadManager::CurrentThread() {return static_cast(TlsGetValue(key_));
}
void ThreadManager::SetCurrentThread(Thread* thread) {RTC_DCHECK(!CurrentThread() || !thread);TlsSetValue(key_, thread);
}
#endif

构造与析构函数均声明为private。下边看下ThreadManager在不同平台上的构造方式、设置和获取当前线程的系统差异性&#xff1a;

下边是来自网友ice_ly000的一段精辟的阐述&#xff0c;摘录部分如下&#xff1a;我们可以看到在Windows和类Unix系统中实现进行了区分&#xff0c;WEBRTC_POSIX宏表征该系统是类Unix系统&#xff0c;而WEBRTC_WIN宏表征是Windows系统。虽然实现稍微有些许不同&#xff0c;在MAC下还需要调用InitCocoaMultiThreading()方法来初始化多线程库。但是两个构造函数均初始化了成员key_与main_thread_ref_(我们可以看到WebRTC中的私有成员均以下划线结尾)。其中key是线程管理的关键。

key_的初始化&#xff1a;在Windows平台上&#xff0c;key_被声明为DWORD类型&#xff0c;赋值为TlsAlloc()函数的返回值&#xff0c;TlsAlloc()函数是Windows的系统API&#xff0c;Tls表示的是线程局部存储Thread Local Storage的缩写&#xff0c;其为每个可能的线程分配了一个线程局部变量的槽位&#xff0c;该槽位用来存储WebRTC的Thread线程对象指针。如果不了解相关概念&#xff0c;可以看微软的官方文档。在类Unix系统上&#xff0c;key_被声明pthread_key_t类型&#xff0c;使用方法pthread_key_create(&key_, nullptr);赋值。实质是类Unix系统上的线程局部存储实现&#xff0c;隶属于线程库pthread&#xff0c;因此方法与变量均以pthread开头。总之&#xff0c;在ThreadManager的构造之初&#xff0c;WebRTC就为各个线程所对应的Thread对象制造了一个线程局部变量的槽位&#xff0c;成为多线程管理的关键。

main_thread_ref_的初始化&#xff1a;该成员为PlatformThreadRef类型的对象&#xff0c;赋值为CurrentThreadRef()方法的返回值&#xff0c;如源码所示&#xff1a;在Windows系统下&#xff0c;取值为WinAPI GetCurrentThreadId()返回的当前线程描述符&#xff0c;DWORD类型&#xff1b;在FUCHSIA系统下(该系统是Google新开发的操作系统&#xff0c;像Android还是基于Linux内核属于类Unix范畴&#xff0c;遵循POSIX规范&#xff0c;但FUCHSIA是基于新内核zircon开发的)&#xff0c;返回zx_thread_self()&#xff0c;zx_handle_t类型&#xff1b;在类Unix系统下&#xff0c;通过pthread库的pthread_self()返回&#xff0c;pthread_t类型。总之&#xff0c;如前文所述&#xff0c;这部分代码肯定是在主线程中所运行&#xff0c;因此&#xff0c;main_thread_ref_存储了主线程TID在不同平台下的不同表示。
感谢ice_ly000&#xff01;

3&#xff0c;WrapCurrentThread()与UnwrapCurrentThread()

// Returns a thread object with its thread_ ivar set// to whatever the OS uses to represent the thread.// If there already *is* a Thread object corresponding to this thread,// this method will return that. Otherwise it creates a new Thread// object whose wrapped() method will return true, and whose// handle will, on Win32, be opened with only synchronization privileges -// if you need more privilegs, rather than changing this method, please// write additional code to adjust the privileges, or call a different// factory method of your own devising, because this one gets used in// unexpected contexts (like inside browser plugins) and it would be a// shame to break it. It is also conceivable on Win32 that we won&#39;t even// be able to get synchronization privileges, in which case the result// will have a null handle.Thread* WrapCurrentThread();void UnwrapCurrentThread();

Thread* ThreadManager::WrapCurrentThread() {Thread* result &#61; CurrentThread();if (nullptr &#61;&#61; result) {result &#61; new Thread(SocketServer::CreateDefault());result->WrapCurrentWithThreadManager(this, true);}return result;
}void ThreadManager::UnwrapCurrentThread() {Thread* t &#61; CurrentThread();if (t && !(t->IsOwned())) {t->UnwrapCurrent();delete t;}
}

上边头文件中的注释已经介绍的很清楚了&#xff0c;每个操作系统都会返回一个thread对象&#xff0c;已经存在thread对象就直接返回&#xff0c;否则创建一个新的thread对象&#xff0c;并通过该对象的WrapCurrentWithThreadManager()方法将新建的thread对象纳入ThreadManager的管理之中。对于UnwrapCurrentThread&#xff0c;会根据该线程是不是WrapCurrentWithThreadManager而来&#xff0c;决定是否进行正真的UnwrapCurrent操作删除Thread对象。

4&#xff0c;是否为主线程IsMainThread()

bool ThreadManager::IsMainThread() {return IsThreadRefEqual(CurrentThreadRef(), main_thread_ref_);
}

 

二、Thread

1&#xff0c;源码头文件

源码中的注释已经非常全&#xff0c;这里将.h及注释展示如下&#xff1a;

class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,public webrtc::TaskQueueBase {public:explicit Thread(SocketServer* ss);explicit Thread(std::unique_ptr ss);// Constructors meant for subclasses; they should call DoInit themselves and// pass false for |do_init|, so that DoInit is called only on the fully// instantiated class, which avoids a vptr data race.Thread(SocketServer* ss, bool do_init);Thread(std::unique_ptr ss, bool do_init);// NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or// guarantee Stop() is explicitly called before the subclass is destroyed).// This is required to avoid a data race between the destructor modifying the// vtable, and the Thread::PreRun calling the virtual method Run().~Thread() override;static std::unique_ptr CreateWithSocketServer();static std::unique_ptr Create();static Thread* Current();// Used to catch performance regressions. Use this to disallow blocking calls// (Invoke) for a given scope. If a synchronous call is made while this is in// effect, an assert will be triggered.// Note that this is a single threaded class.class ScopedDisallowBlockingCalls {public:ScopedDisallowBlockingCalls();ScopedDisallowBlockingCalls(const ScopedDisallowBlockingCalls&) &#61; delete;ScopedDisallowBlockingCalls& operator&#61;(const ScopedDisallowBlockingCalls&) &#61;delete;~ScopedDisallowBlockingCalls();private:Thread* const thread_;const bool previous_state_;};bool IsCurrent() const;// Sleeps the calling thread for the specified number of milliseconds, during// which time no processing is performed. Returns false if sleeping was// interrupted by a signal (POSIX only).static bool SleepMs(int millis);// Sets the thread&#39;s name, for debugging. Must be called before Start().// If |obj| is non-null, its value is appended to |name|.const std::string& name() const { return name_; }bool SetName(const std::string& name, const void* obj);// Starts the execution of the thread.bool Start();// Tells the thread to stop and waits until it is joined.// Never call Stop on the current thread. Instead use the inherited Quit// function which will exit the base MessageQueue without terminating the// underlying OS thread.virtual void Stop();// By default, Thread::Run() calls ProcessMessages(kForever). To do other// work, override Run(). To receive and dispatch messages, call// ProcessMessages occasionally.virtual void Run();virtual void Send(const Location& posted_from,MessageHandler* phandler,uint32_t id &#61; 0,MessageData* pdata &#61; nullptr);// Convenience method to invoke a functor on another thread. Caller must// provide the |ReturnT| template argument, which cannot (easily) be deduced.// Uses Send() internally, which blocks the current thread until execution// is complete.// Ex: bool result &#61; thread.Invoke(RTC_FROM_HERE,// &MyFunctionReturningBool);// NOTE: This function can only be called when synchronous calls are allowed.// See ScopedDisallowBlockingCalls for details.// NOTE: Blocking invokes are DISCOURAGED, consider if what you&#39;re doing can// be achieved with PostTask() and callbacks instead.template ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {FunctorMessageHandler handler(std::forward(functor));InvokeInternal(posted_from, &handler);return handler.MoveResult();}// Posts a task to invoke the functor on |this| thread asynchronously, i.e.// without blocking the thread that invoked PostTask(). Ownership of |functor|// is passed and (usually, see below) destroyed on |this| thread after it is// invoked.// Requirements of FunctorT:// - FunctorT is movable.// - FunctorT implements "T operator()()" or "T operator()() const" for some T// (if T is not void, the return value is discarded on |this| thread).// - FunctorT has a public destructor that can be invoked from |this| thread// after operation() has been invoked.// - The functor must not cause the thread to quit before PostTask() is done.//// Destruction of the functor/task mimics what TaskQueue::PostTask does: If// the task is run, it will be destroyed on |this| thread. However, if there// are pending tasks by the time the Thread is destroyed, or a task is posted// to a thread that is quitting, the task is destroyed immediately, on the// calling thread. Destroying the Thread only blocks for any currently running// task to complete. Note that TQ abstraction is even vaguer on how// destruction happens in these cases, allowing destruction to happen// asynchronously at a later time and on some arbitrary thread. So to ease// migration, don&#39;t depend on Thread::PostTask destroying un-run tasks// immediately.//// Example - Calling a class method:// class Foo {// public:// void DoTheThing();// };// Foo foo;// thread->PostTask(RTC_FROM_HERE, Bind(&Foo::DoTheThing, &foo));//// Example - Calling a lambda function:// thread->PostTask(RTC_FROM_HERE,// [&x, &y] { x.TrackComputations(y.Compute()); });template void PostTask(const Location& posted_from, FunctorT&& functor) {// Allocate at first call, never deallocate.static auto* const handler &#61;new rtc_thread_internal::MessageHandlerWithTask;Post(posted_from, handler, 0,new rtc_thread_internal::MessageWithFunctor(std::forward(functor)));}// From TaskQueueBasevoid PostTask(std::unique_ptr task) override;void PostDelayedTask(std::unique_ptr task,uint32_t milliseconds) override;void Delete() override;// From MessageQueuebool IsProcessingMessagesForTesting() override;void Clear(MessageHandler* phandler,uint32_t id &#61; MQID_ANY,MessageList* removed &#61; nullptr) override;void ReceiveSends() override;// ProcessMessages will process I/O and dispatch messages until:// 1) cms milliseconds have elapsed (returns true)// 2) Stop() is called (returns false)bool ProcessMessages(int cms);// Returns true if this is a thread that we created using the standard// constructor, false if it was created by a call to// ThreadManager::WrapCurrentThread(). The main thread of an application// is generally not owned, since the OS representation of the thread// obviously exists before we can get to it.// You cannot call Start on non-owned threads.bool IsOwned();// Expose private method IsRunning() for tests.//// DANGER: this is a terrible public API. Most callers that might want to// call this likely do not have enough control/knowledge of the Thread in// question to guarantee that the returned value remains true for the duration// of whatever code is conditionally executing because of the return value!bool RunningForTest() { return IsRunning(); }// These functions are public to avoid injecting test hooks. Don&#39;t call them// outside of tests.// This method should be called when thread is created using non standard// method, like derived implementation of rtc::Thread and it can not be// started by calling Start(). This will set started flag to true and// owned to false. This must be called from the current thread.bool WrapCurrent();void UnwrapCurrent();// Sets the per-thread allow-blocking-calls flag to false; this is// irrevocable. Must be called on this thread.void DisallowBlockingCalls() { SetAllowBlockingCalls(false); }#ifdef WEBRTC_ANDROID// Sets the per-thread allow-blocking-calls flag to true, sidestepping the// invariants upheld by DisallowBlockingCalls() and// ScopedDisallowBlockingCalls. Must be called on this thread.void DEPRECATED_AllowBlockingCalls() { SetAllowBlockingCalls(true); }
#endifprotected:// Same as WrapCurrent except that it never fails as it does not try to// acquire the synchronization access of the thread. The caller should never// call Stop() or Join() on this thread.void SafeWrapCurrent();// Blocks the calling thread until this thread has terminated.void Join();static void AssertBlockingIsAllowedOnCurrentThread();friend class ScopedDisallowBlockingCalls;private:class QueuedTaskHandler final : public MessageHandler {public:void OnMessage(Message* msg) override;};// Sets the per-thread allow-blocking-calls flag and returns the previous// value. Must be called on this thread.bool SetAllowBlockingCalls(bool allow);#if defined(WEBRTC_WIN)static DWORD WINAPI PreRun(LPVOID context);
#elsestatic void* PreRun(void* pv);
#endif// ThreadManager calls this instead WrapCurrent() because// ThreadManager::Instance() cannot be used while ThreadManager is// being created.// The method tries to get synchronization rights of the thread on Windows if// |need_synchronize_access| is true.bool WrapCurrentWithThreadManager(ThreadManager* thread_manager,bool need_synchronize_access);// Return true if the thread is currently running.bool IsRunning();// Processes received "Send" requests. If |source| is not null, only requests// from |source| are processed, otherwise, all requests are processed.void ReceiveSendsFromThread(const Thread* source);// If |source| is not null, pops the first "Send" message from |source| in// |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.// The caller must lock |crit_| before calling.// Returns true if there is such a message.bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);void InvokeInternal(const Location& posted_from, MessageHandler* handler);std::list<_SendMessage> sendlist_;std::string name_;// TODO(tommi): Add thread checks for proper use of control methods.// Ideally we should be able to just use PlatformThread.#if defined(WEBRTC_POSIX)pthread_t thread_ &#61; 0;
#endif#if defined(WEBRTC_WIN)HANDLE thread_ &#61; nullptr;DWORD thread_id_ &#61; 0;
#endif// Indicates whether or not ownership of the worker thread lies with// this instance or not. (i.e. owned_ &#61;&#61; !wrapped).// Must only be modified when the worker thread is not running.bool owned_ &#61; true;// Only touched from the worker thread itself.bool blocking_calls_allowed_ &#61; true;// Runs webrtc::QueuedTask posted to the Thread.QueuedTaskHandler queued_task_handler_;friend class ThreadManager;RTC_DISALLOW_COPY_AND_ASSIGN(Thread);
};

2&#xff0c;Thread的构造

explicit Thread(SocketServer* ss);explicit Thread(std::unique_ptr ss);// Constructors meant for subclasses; they should call DoInit themselves and// pass false for |do_init|, so that DoInit is called only on the fully// instantiated class, which avoids a vptr data race.Thread(SocketServer* ss, bool do_init);Thread(std::unique_ptr ss, bool do_init);

Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init&#61;*/true) {}Thread::Thread(std::unique_ptr ss): Thread(std::move(ss), /*do_init&#61;*/true) {}Thread::Thread(SocketServer* ss, bool do_init): MessageQueue(ss, /*do_init&#61;*/false) {SetName("Thread", this); // default nameif (do_init) {DoInit();}
}Thread::Thread(std::unique_ptr ss, bool do_init): MessageQueue(std::move(ss), false) {SetName("Thread", this); // default nameif (do_init) {DoInit();}
}

void MessageQueue::DoInit() {if (fInitialized_) {return;}fInitialized_ &#61; true;MessageQueueManager::Add(this);
}

我们可以看到有四个构造&#xff0c;前边两个声明为explicit&#xff0c;C&#43;&#43;提供了关键字explicit&#xff0c;可以阻止不应该允许的经过转换构造函数进行的隐式转换的发生。声明为explicit的构造函数不能在隐式转换中使用。而头文件中对没有explicit修饰的两个已经做了解释&#xff0c;用于子类的构造函数&#xff0c; 它们应该自己调用DoInit并为| do_init |传递false&#xff0c;以便仅在完全实例化的类上调用DoInit&#xff0c;这避免了vptr数据争用。
DoInit()方法在Thread构造中调用&#xff0c;我们会发现该方法将MQ的初始化标志置为true&#xff0c;并且将自身纳入MQ管理类的管理列表中。如果DoInit在MQ构造中调用&#xff0c;意味着MQ构造后&#xff0c;Thread对象的指针已经暴露于外&#xff08;被MQ管理类对象持有&#xff09;&#xff0c;此时Thread对象并未完全构建完成&#xff0c;其虚表vtable还未完全建立。这势必会导致Thread的对象还未构造完成时&#xff0c;就可能会被外部使用(在别的线程中通过MessageQueueManager访问该对象)的风险。为了规避这样的竞太条件&#xff0c;因此&#xff0c;需要给MQ的构造传入false&#xff0c;并在Thread构造中调用DoInit()。

client中的应用如下&#xff1a;

rtc::WinsockInitializer winsock_init;rtc::Win32SocketServer w32_ss;rtc::Win32Thread w32_thread(&w32_ss);rtc::ThreadManager::Instance()->SetCurrentThread(&w32_thread);

3&#xff0c;创建Thread

std::unique_ptr Thread::CreateWithSocketServer() {return std::unique_ptr(new Thread(SocketServer::CreateDefault()));
}std::unique_ptr Thread::Create() {return std::unique_ptr(new Thread(std::unique_ptr(new NullSocketServer())));
}

class NullSocketServer : public SocketServer {public:NullSocketServer();~NullSocketServer() override;bool Wait(int cms, bool process_io) override;void WakeUp() override;Socket* CreateSocket(int family, int type) override;AsyncSocket* CreateAsyncSocket(int family, int type) override;private:Event event_;
};

  • Thread::Create()&#xff1a;给Thread构造传入的是NullSocketServer对象&#xff0c;该对象不持有真正的Socket&#xff0c;使得创建的Thread无法处理网络IO&#xff0c;但可以运行消息循环&#xff0c;可以处理线程间消息投递。WebRTC中工作线程worker_thread_默认使用该方法创建&#xff1b;
  • Thread::CreateWithSocketServer()&#xff1a;给Thread构造传入PhysicalSocketServer对象&#xff0c;该对象持有平台相关的Socket对象&#xff0c;使得Thread能处理网络IO&#xff0c;当然&#xff0c;也可以处理线程间消息投递。WebRTC中网络线程network_thread_默认使用该方法创建。
     

4&#xff0c;线程启动

bool Thread::Start() {RTC_DCHECK(!IsRunning());if (IsRunning())return false;Restart(); // reset IsQuitting() if the thread is being restarted// Make sure that ThreadManager is created on the main thread before// we start a new thread.ThreadManager::Instance();owned_ &#61; true;#if defined(WEBRTC_WIN)thread_ &#61; CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);if (!thread_) {return false;}
#elif defined(WEBRTC_POSIX)pthread_attr_t attr;pthread_attr_init(&attr);int error_code &#61; pthread_create(&thread_, &attr, PreRun, this);if (0 !&#61; error_code) {RTC_LOG(LS_ERROR) <<"Unable to create pthread, error " <#endifreturn true;
}

这块很简单&#xff0c;首先检测运行状态&#xff0c;复位消息循环stop_标志位&#xff0c;快平台创建线程对象。重点在PreRun():

// static
#if defined(WEBRTC_WIN)
DWORD WINAPI Thread::PreRun(LPVOID pv) {
#else
void* Thread::PreRun(void* pv) {
#endifThread* thread &#61; static_cast(pv);// 将新创建的Thread对象纳入管理&#xff0c;与当前线程进行绑定。ThreadManager::Instance()->SetCurrentThread(thread);// 为线程设置名称&#xff0c;该方法会调用平台相关的API给线程内核结构体赋值上该线程的名称。rtc::SetCurrentThreadName(thread->name_.c_str());CurrentTaskQueueSetter set_current_task_queue(thread);
// 如果是MAC系统&#xff0c;通过pool对象的创建和析构来使用oc的自动释放池技术&#xff0c;进行内存回收。
#if defined(WEBRTC_MAC)ScopedAutoReleasePool pool;
#endifthread->Run();//阻塞循环&#xff0c;下边再详解//到这里意味着线程要释放了ThreadManager::Instance()->SetCurrentThread(nullptr);
#ifdef WEBRTC_WINreturn 0;
#elsereturn nullptr;
#endif
} // namespace rtc

详细的介绍已经写在了主时中&#xff0c;下边我们来看一下Run()方法。

5&#xff0c;Run() 之ProcessMessages()建立消息循环

void Thread::Run() {ProcessMessages(kForever);
}
bool Thread::ProcessMessages(int cmsLoop) {// Using ProcessMessages with a custom clock for testing and a time greater// than 0 doesn&#39;t work, since it&#39;s not guaranteed to advance the custom// clock&#39;s time, and may get stuck in an infinite loop.RTC_DCHECK(GetClockForTesting() &#61;&#61; nullptr || cmsLoop &#61;&#61; 0 ||cmsLoop &#61;&#61; kForever);// 计算终止处理消息的时间int64_t msEnd &#61; (kForever &#61;&#61; cmsLoop) ? 0 : TimeAfter(cmsLoop);// 下次可以进行消息获取的时间长度int cmsNext &#61; cmsLoop;while (true) {
#if defined(WEBRTC_MAC)ScopedAutoReleasePool pool;
#endifMessage msg;if (!Get(&msg, cmsNext))return !IsQuitting();Dispatch(&msg);// 若不是无限期&#xff0c;计算下次可以进行消息获取的时间if (cmsLoop !&#61; kForever) {cmsNext &#61; static_cast(TimeUntil(msEnd));// 若使用时间已经到了&#xff0c;那么退出循环if (cmsNext <0)return true;}}
}

ProcessMessages当传入参数为kForever(static const int kForever &#61; -1;)时消息循环无限期进行循环处理&#xff0c;否则有限期定时处理&#xff0c;while中一直会调用MessageQueue::Get()去获取消息(MessageQueue、MessageQueueManager等会在后续核心分析)&#xff0c;通过Dispatch(&msg);来处理消息。

六、Invoke跨线程同步执行

// Convenience method to invoke a functor on another thread. Caller must// provide the |ReturnT| template argument, which cannot (easily) be deduced.// Uses Send() internally, which blocks the current thread until execution// is complete.// Ex: bool result &#61; thread.Invoke(RTC_FROM_HERE,// &MyFunctionReturningBool);// NOTE: This function can only be called when synchronous calls are allowed.// See ScopedDisallowBlockingCalls for details.// NOTE: Blocking invokes are DISCOURAGED, consider if what you&#39;re doing can// be achieved with PostTask() and callbacks instead.template ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {FunctorMessageHandler handler(std::forward(functor));InvokeInternal(posted_from, &handler);return handler.MoveResult();}
void Thread::InvokeInternal(const Location& posted_from,MessageHandler* handler) {TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line",posted_from.file_and_line(), "src_func",posted_from.function_name());Send(posted_from, handler);
}

Invoke提供在另一个线程上调用函数的便捷方法。原理是内部使用Send&#xff08;&#xff09;&#xff0c;它将阻塞当前线程&#xff0c;直到执行完成。这里要注意两点&#xff1a;

  • 仅当允许同步调用时才能调用此函数。
  • 建议不要使用阻塞调用&#xff0c;请考虑是否可以使用PostTask&#xff08;&#xff09;和回调来实现您正在执行的操作。

下边我们看下阻塞执行的send()方法。

七、阻塞执行的send()发送消息


void Thread::Send(const Location& posted_from,MessageHandler* phandler,uint32_t id,MessageData* pdata) {if (IsQuitting())return;// Sent messages are sent to the MessageHandler directly, in the context// of "thread", like Win32 SendMessage. If in the right context,// call the handler directly.Message msg;msg.posted_from &#61; posted_from;msg.phandler &#61; phandler;msg.message_id &#61; id;msg.pdata &#61; pdata;if (IsCurrent()) {phandler->OnMessage(&msg);return;}AssertBlockingIsAllowedOnCurrentThread();AutoThread thread;Thread* current_thread &#61; Thread::Current();RTC_DCHECK(current_thread !&#61; nullptr); // AutoThread ensures thisbool ready &#61; false;{CritScope cs(&crit_);_SendMessage smsg;smsg.thread &#61; current_thread;smsg.msg &#61; msg;smsg.ready &#61; &ready;sendlist_.push_back(smsg);}// Wait for a replyWakeUpSocketServer();bool waited &#61; false;crit_.Enter();while (!ready) {crit_.Leave();// We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary// thread invoking calls on the current thread.current_thread->ReceiveSendsFromThread(this);current_thread->socketserver()->Wait(kForever, false);waited &#61; true;crit_.Enter();}crit_.Leave();// Our Wait loop above may have consumed some WakeUp events for this// MessageQueue, that weren&#39;t relevant to this Send. Losing these WakeUps can// cause problems for some SocketServers.//// Concrete example:// Win32SocketServer on thread A calls Send on thread B. While processing the// message, thread B Posts a message to A. We consume the wakeup for that// Post while waiting for the Send to complete, which means that when we exit// this loop, we need to issue another WakeUp, or else the Posted message// won&#39;t be processed in a timely manner.if (waited) {current_thread->socketserver()->WakeUp();}
}

  • IsQuitting()判断目标线程的消息循环是否还在处理消息&#xff0c;若消息循环停止工作&#xff0c;那么会拒绝处理消息&#xff0c;Send会直接返回创建需要处理的消息
  • 判断IsCurrent()若目标线程就是自己&#xff0c;那么直接处理消息phandler->OnMessage(&msg)&#xff0c;然后返回
  • AssertBlockingIsAllowedOnCurrentThread();断言当前线程是否具有阻塞权限&#xff0c;无阻塞权限向别的线程Send消息就是个非法操作
  • RTC_DCHECK(current_thread !&#61; nullptr);确保当前线程有一个Thread对象与之绑定 
  • ready 表征该消息是否已经处理完
  • 创建一个SendMessage对象&#xff0c;放置到目标线程对象的sendlist_ &#xff0c;sendlist_.push_back(smsg);
  • WakeUpSocketServer();将目标线程从IO处理中唤醒
  • 同步等待消息被处理 
  • current_thread->ReceiveSendsFromThread(this);处理对方发送的消息
  • current_thread->socketserver()->Wait(kForever, false);处理完对方的Send消息后&#xff0c;阻塞等待对方处理完我Send的消息
  • 如果出现过waited&#xff0c;那么再唤醒一次当前线程去处理Post消息

 

 

参考链接&#xff1a;https://blog.csdn.net/ice_ly000/article/details/103178691

WebRTC学习进阶之路系列总目录&#xff1a;https://blog.csdn.net/xiaomucgwlmx/article/details/103204274

 

 


推荐阅读
  • Parallels Desktop for Mac 是一款功能强大的虚拟化软件,能够在不重启的情况下实现在同一台电脑上无缝切换和使用 Windows 和 macOS 系统中的各种应用程序。该软件不仅提供了高效稳定的性能,还支持多种高级功能,如拖放文件、共享剪贴板等,极大地提升了用户的生产力和使用体验。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • V8不仅是一款著名的八缸发动机,广泛应用于道奇Charger、宾利Continental GT和BossHoss摩托车中。自2008年以来,作为Chromium项目的一部分,V8 JavaScript引擎在性能优化和技术创新方面取得了显著进展。该引擎通过先进的编译技术和高效的垃圾回收机制,显著提升了JavaScript的执行效率,为现代Web应用提供了强大的支持。持续的优化和创新使得V8在处理复杂计算和大规模数据时表现更加出色,成为众多开发者和企业的首选。 ... [详细]
  • MongoDB高可用架构:深入解析Replica Set机制
    MongoDB的高可用架构主要依赖于其Replica Set机制。Replica Set通过多个mongod节点的协同工作,实现了数据的冗余存储和故障自动切换,确保了系统的高可用性和数据的一致性。本文将深入解析Replica Set的工作原理及其在实际应用中的配置和优化方法,帮助读者更好地理解和实施MongoDB的高可用架构。 ... [详细]
  • C#中实现高效UDP数据传输技术
    C#中实现高效UDP数据传输技术 ... [详细]
  • Python与R语言在功能和应用场景上各有优势。尽管R语言在统计分析和数据可视化方面具有更强的专业性,但Python作为一种通用编程语言,适用于更广泛的领域,包括Web开发、自动化脚本和机器学习等。对于初学者而言,Python的学习曲线更为平缓,上手更加容易。此外,Python拥有庞大的社区支持和丰富的第三方库,使其在实际应用中更具灵活性和扩展性。 ... [详细]
  • Python学习:环境配置与安装指南
    Python作为一种跨平台的编程语言,适用于Windows、Linux和macOS等多种操作系统。为了确保本地已成功安装Python,用户可以通过终端或命令行界面输入`python`或`python3`命令进行验证。此外,建议使用虚拟环境管理工具如`venv`或`conda`,以便更好地隔离不同项目依赖,提高开发效率。 ... [详细]
  • Visual Studio Code (VSCode) 是一款功能强大的源代码编辑器,支持多种编程语言,具备丰富的扩展生态。本文将详细介绍如何在 macOS 上安装、配置并使用 VSCode。 ... [详细]
  • 字符串学习时间:1.5W(“W”周,下同)知识点checkliststrlen()函数的返回值是什么类型的?字 ... [详细]
  • importpymysql#一、直接连接mysql数据库'''coonpymysql.connect(host'192.168.*.*',u ... [详细]
  • 微软推出Windows Terminal Preview v0.10
    微软近期发布了Windows Terminal Preview v0.10,用户可以在微软商店或GitHub上获取这一更新。该版本在2月份发布的v0.9基础上,新增了鼠标输入和复制Pane等功能。 ... [详细]
  • MySQL 5.7 学习指南:SQLyog 中的主键、列属性和数据类型
    本文介绍了 MySQL 5.7 中主键(Primary Key)和自增(Auto-Increment)的概念,以及如何在 SQLyog 中设置这些属性。同时,还探讨了数据类型的分类和选择,以及列属性的设置方法。 ... [详细]
  • 解决 Windows Server 2016 网络连接问题
    本文详细介绍了如何解决 Windows Server 2016 在使用无线网络 (WLAN) 和有线网络 (以太网) 时遇到的连接问题。包括添加必要的功能和安装正确的驱动程序。 ... [详细]
  • 使用Jsoup解析并遍历HTML文档时,该库能够高效地生成一个清晰、规范的解析树,即使源HTML文档存在格式问题。Jsoup具备强大的容错能力,能够处理多种异常情况,如未闭合的标签等,确保解析结果的准确性和完整性。 ... [详细]
  • 大类|电阻器_使用Requests、Etree、BeautifulSoup、Pandas和Path库进行数据抓取与处理 | 将指定区域内容保存为HTML和Excel格式
    大类|电阻器_使用Requests、Etree、BeautifulSoup、Pandas和Path库进行数据抓取与处理 | 将指定区域内容保存为HTML和Excel格式 ... [详细]
author-avatar
10灬月
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有