WebRTC学习进阶之路系列总目录:https://blog.csdn.net/xiaomucgwlmx/article/details/103204274
前言WebRTC源码中的很多注释是很赞的,看源码的时候多加阅读注释有助于更好的理解。
WebRTC实现了跨平台(Windows,MacOS,Linux,IOS,Android)的线程类rtc::Thread,WebRTC内部的network_thread,worker_thread,signaling_thread均是该类的实例。
基础功能
rtc:: Thread及其相关类,ThreadManager、MessageQueue,Runnable等等一起提供了如下的基础功能:
线程的管理:通过ThreadManager单例对象,可以管理所有的Thread实例;
线程的常规基本功能:rtc:: Thread提供创建线程对象,设置线程名称,启动线程去执行用户代码;
消息循环,消息投递:rtc:: Thread通过继承MessageQueue类,提供了内部消息循环,并提供了线程间异步,同步投递消息的功能;
跨线程执行方法:提供了跨线程执行方法,并返回执行结果的功能。该功能非常强大,因为WebRTC在某些功能模块的使用上,有要求其必需在指定的线程中才能调用的基本要求,比如音频模块:ADM 的创建必须要在 WebRTC 的 worker thread 中进行;
多路分离器:通过持有SocketServer对象,实现了多路分离器的功能,能处理网络IO;
signaling_thread:处理小工作量方法,要求此线程内的方法都必须快速返回。
worker_thread:处理大工作量的方法,此线程内的方法可能会处理很长时间。
network_thread:处理网络消息。
下边我们来看下两个线程的核心类:ThreadManager和Thread。
ThreadManager的源码位于rtc_base目录下的thread.h与thread.cc中,相关头文件如下:
class RTC_EXPORT ThreadManager {public:static const int kForever = -1;// Singleton, constructor and destructor are private.static ThreadManager* Instance();Thread* CurrentThread();void SetCurrentThread(Thread* thread);Thread* WrapCurrentThread();void UnwrapCurrentThread();bool IsMainThread();private:ThreadManager();~ThreadManager();#if defined(WEBRTC_POSIX)pthread_key_t key_;
#endif#if defined(WEBRTC_WIN)const DWORD key_;
#endif// The thread to potentially autowrap.const PlatformThreadRef main_thread_ref_;RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
};
我们来逐步看下这几个方法:
ThreadManager* ThreadManager::Instance() {static ThreadManager* const thread_manager = new ThreadManager();return thread_manager;
}
从源码可以看到ThreadManager是通过单例模式,通过静态方法Instance()来获取唯一的实例。
#if defined(WEBRTC_POSIX)
ThreadManager::ThreadManager() : main_thread_ref_(CurrentThreadRef()) {
#if defined(WEBRTC_MAC)InitCocoaMultiThreading();
#endifpthread_key_create(&key_, nullptr);
}
Thread* ThreadManager::CurrentThread() {return static_cast
}
void ThreadManager::SetCurrentThread(Thread* thread) {
#if RTC_DLOG_IS_ONif (CurrentThread() && thread) {RTC_DLOG(LS_ERROR) <<"SetCurrentThread: Overwriting an existing value?";}
#endif // RTC_DLOG_IS_ONpthread_setspecific(key_, thread);
}
#endif#if defined(WEBRTC_WIN)
ThreadManager::ThreadManager(): key_(TlsAlloc()), main_thread_ref_(CurrentThreadRef()) {}Thread* ThreadManager::CurrentThread() {return static_cast
}
void ThreadManager::SetCurrentThread(Thread* thread) {RTC_DCHECK(!CurrentThread() || !thread);TlsSetValue(key_, thread);
}
#endif
构造与析构函数均声明为private。下边看下ThreadManager在不同平台上的构造方式、设置和获取当前线程的系统差异性&#xff1a;
下边是来自网友ice_ly000的一段精辟的阐述&#xff0c;摘录部分如下&#xff1a;我们可以看到在Windows和类Unix系统中实现进行了区分&#xff0c;WEBRTC_POSIX宏表征该系统是类Unix系统&#xff0c;而WEBRTC_WIN宏表征是Windows系统。虽然实现稍微有些许不同&#xff0c;在MAC下还需要调用InitCocoaMultiThreading()方法来初始化多线程库。但是两个构造函数均初始化了成员key_与main_thread_ref_(我们可以看到WebRTC中的私有成员均以下划线结尾)。其中key是线程管理的关键。
key_的初始化&#xff1a;在Windows平台上&#xff0c;key_被声明为DWORD类型&#xff0c;赋值为TlsAlloc()函数的返回值&#xff0c;TlsAlloc()函数是Windows的系统API&#xff0c;Tls表示的是线程局部存储Thread Local Storage的缩写&#xff0c;其为每个可能的线程分配了一个线程局部变量的槽位&#xff0c;该槽位用来存储WebRTC的Thread线程对象指针。如果不了解相关概念&#xff0c;可以看微软的官方文档。在类Unix系统上&#xff0c;key_被声明pthread_key_t类型&#xff0c;使用方法pthread_key_create(&key_, nullptr);赋值。实质是类Unix系统上的线程局部存储实现&#xff0c;隶属于线程库pthread&#xff0c;因此方法与变量均以pthread开头。总之&#xff0c;在ThreadManager的构造之初&#xff0c;WebRTC就为各个线程所对应的Thread对象制造了一个线程局部变量的槽位&#xff0c;成为多线程管理的关键。
main_thread_ref_的初始化&#xff1a;该成员为PlatformThreadRef类型的对象&#xff0c;赋值为CurrentThreadRef()方法的返回值&#xff0c;如源码所示&#xff1a;在Windows系统下&#xff0c;取值为WinAPI GetCurrentThreadId()返回的当前线程描述符&#xff0c;DWORD类型&#xff1b;在FUCHSIA系统下(该系统是Google新开发的操作系统&#xff0c;像Android还是基于Linux内核属于类Unix范畴&#xff0c;遵循POSIX规范&#xff0c;但FUCHSIA是基于新内核zircon开发的)&#xff0c;返回zx_thread_self()&#xff0c;zx_handle_t类型&#xff1b;在类Unix系统下&#xff0c;通过pthread库的pthread_self()返回&#xff0c;pthread_t类型。总之&#xff0c;如前文所述&#xff0c;这部分代码肯定是在主线程中所运行&#xff0c;因此&#xff0c;main_thread_ref_存储了主线程TID在不同平台下的不同表示。
感谢ice_ly000&#xff01;
// Returns a thread object with its thread_ ivar set// to whatever the OS uses to represent the thread.// If there already *is* a Thread object corresponding to this thread,// this method will return that. Otherwise it creates a new Thread// object whose wrapped() method will return true, and whose// handle will, on Win32, be opened with only synchronization privileges -// if you need more privilegs, rather than changing this method, please// write additional code to adjust the privileges, or call a different// factory method of your own devising, because this one gets used in// unexpected contexts (like inside browser plugins) and it would be a// shame to break it. It is also conceivable on Win32 that we won&#39;t even// be able to get synchronization privileges, in which case the result// will have a null handle.Thread* WrapCurrentThread();void UnwrapCurrentThread();
Thread* ThreadManager::WrapCurrentThread() {Thread* result &#61; CurrentThread();if (nullptr &#61;&#61; result) {result &#61; new Thread(SocketServer::CreateDefault());result->WrapCurrentWithThreadManager(this, true);}return result;
}void ThreadManager::UnwrapCurrentThread() {Thread* t &#61; CurrentThread();if (t && !(t->IsOwned())) {t->UnwrapCurrent();delete t;}
}
上边头文件中的注释已经介绍的很清楚了&#xff0c;每个操作系统都会返回一个thread对象&#xff0c;已经存在thread对象就直接返回&#xff0c;否则创建一个新的thread对象&#xff0c;并通过该对象的WrapCurrentWithThreadManager()方法将新建的thread对象纳入ThreadManager的管理之中。对于UnwrapCurrentThread&#xff0c;会根据该线程是不是WrapCurrentWithThreadManager而来&#xff0c;决定是否进行正真的UnwrapCurrent操作删除Thread对象。
bool ThreadManager::IsMainThread() {return IsThreadRefEqual(CurrentThreadRef(), main_thread_ref_);
}
二、Thread
源码中的注释已经非常全&#xff0c;这里将.h及注释展示如下&#xff1a;
class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,public webrtc::TaskQueueBase {public:explicit Thread(SocketServer* ss);explicit Thread(std::unique_ptr
#endifprotected:// Same as WrapCurrent except that it never fails as it does not try to// acquire the synchronization access of the thread. The caller should never// call Stop() or Join() on this thread.void SafeWrapCurrent();// Blocks the calling thread until this thread has terminated.void Join();static void AssertBlockingIsAllowedOnCurrentThread();friend class ScopedDisallowBlockingCalls;private:class QueuedTaskHandler final : public MessageHandler {public:void OnMessage(Message* msg) override;};// Sets the per-thread allow-blocking-calls flag and returns the previous// value. Must be called on this thread.bool SetAllowBlockingCalls(bool allow);#if defined(WEBRTC_WIN)static DWORD WINAPI PreRun(LPVOID context);
#elsestatic void* PreRun(void* pv);
#endif// ThreadManager calls this instead WrapCurrent() because// ThreadManager::Instance() cannot be used while ThreadManager is// being created.// The method tries to get synchronization rights of the thread on Windows if// |need_synchronize_access| is true.bool WrapCurrentWithThreadManager(ThreadManager* thread_manager,bool need_synchronize_access);// Return true if the thread is currently running.bool IsRunning();// Processes received "Send" requests. If |source| is not null, only requests// from |source| are processed, otherwise, all requests are processed.void ReceiveSendsFromThread(const Thread* source);// If |source| is not null, pops the first "Send" message from |source| in// |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.// The caller must lock |crit_| before calling.// Returns true if there is such a message.bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);void InvokeInternal(const Location& posted_from, MessageHandler* handler);std::list<_SendMessage> sendlist_;std::string name_;// TODO(tommi): Add thread checks for proper use of control methods.// Ideally we should be able to just use PlatformThread.#if defined(WEBRTC_POSIX)pthread_t thread_ &#61; 0;
#endif#if defined(WEBRTC_WIN)HANDLE thread_ &#61; nullptr;DWORD thread_id_ &#61; 0;
#endif// Indicates whether or not ownership of the worker thread lies with// this instance or not. (i.e. owned_ &#61;&#61; !wrapped).// Must only be modified when the worker thread is not running.bool owned_ &#61; true;// Only touched from the worker thread itself.bool blocking_calls_allowed_ &#61; true;// Runs webrtc::QueuedTask posted to the Thread.QueuedTaskHandler queued_task_handler_;friend class ThreadManager;RTC_DISALLOW_COPY_AND_ASSIGN(Thread);
};
explicit Thread(SocketServer* ss);explicit Thread(std::unique_ptr
Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init&#61;*/true) {}Thread::Thread(std::unique_ptr
}Thread::Thread(std::unique_ptr
}
void MessageQueue::DoInit() {if (fInitialized_) {return;}fInitialized_ &#61; true;MessageQueueManager::Add(this);
}
我们可以看到有四个构造&#xff0c;前边两个声明为explicit&#xff0c;C&#43;&#43;提供了关键字explicit&#xff0c;可以阻止不应该允许的经过转换构造函数进行的隐式转换的发生。声明为explicit的构造函数不能在隐式转换中使用。而头文件中对没有explicit修饰的两个已经做了解释&#xff0c;用于子类的构造函数&#xff0c; 它们应该自己调用DoInit并为| do_init |传递false&#xff0c;以便仅在完全实例化的类上调用DoInit&#xff0c;这避免了vptr数据争用。
DoInit()方法在Thread构造中调用&#xff0c;我们会发现该方法将MQ的初始化标志置为true&#xff0c;并且将自身纳入MQ管理类的管理列表中。如果DoInit在MQ构造中调用&#xff0c;意味着MQ构造后&#xff0c;Thread对象的指针已经暴露于外&#xff08;被MQ管理类对象持有&#xff09;&#xff0c;此时Thread对象并未完全构建完成&#xff0c;其虚表vtable还未完全建立。这势必会导致Thread的对象还未构造完成时&#xff0c;就可能会被外部使用(在别的线程中通过MessageQueueManager访问该对象)的风险。为了规避这样的竞太条件&#xff0c;因此&#xff0c;需要给MQ的构造传入false&#xff0c;并在Thread构造中调用DoInit()。
client中的应用如下&#xff1a;
rtc::WinsockInitializer winsock_init;rtc::Win32SocketServer w32_ss;rtc::Win32Thread w32_thread(&w32_ss);rtc::ThreadManager::Instance()->SetCurrentThread(&w32_thread);
std::unique_ptr
}std::unique_ptr
}
class NullSocketServer : public SocketServer {public:NullSocketServer();~NullSocketServer() override;bool Wait(int cms, bool process_io) override;void WakeUp() override;Socket* CreateSocket(int family, int type) override;AsyncSocket* CreateAsyncSocket(int family, int type) override;private:Event event_;
};
bool Thread::Start() {RTC_DCHECK(!IsRunning());if (IsRunning())return false;Restart(); // reset IsQuitting() if the thread is being restarted// Make sure that ThreadManager is created on the main thread before// we start a new thread.ThreadManager::Instance();owned_ &#61; true;#if defined(WEBRTC_WIN)thread_ &#61; CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_);if (!thread_) {return false;}
#elif defined(WEBRTC_POSIX)pthread_attr_t attr;pthread_attr_init(&attr);int error_code &#61; pthread_create(&thread_, &attr, PreRun, this);if (0 !&#61; error_code) {RTC_LOG(LS_ERROR) <<"Unable to create pthread, error " <
}
这块很简单&#xff0c;首先检测运行状态&#xff0c;复位消息循环stop_标志位&#xff0c;快平台创建线程对象。重点在PreRun():
// static
#if defined(WEBRTC_WIN)
DWORD WINAPI Thread::PreRun(LPVOID pv) {
#else
void* Thread::PreRun(void* pv) {
#endifThread* thread &#61; static_cast
// 如果是MAC系统&#xff0c;通过pool对象的创建和析构来使用oc的自动释放池技术&#xff0c;进行内存回收。
#if defined(WEBRTC_MAC)ScopedAutoReleasePool pool;
#endifthread->Run();//阻塞循环&#xff0c;下边再详解//到这里意味着线程要释放了ThreadManager::Instance()->SetCurrentThread(nullptr);
#ifdef WEBRTC_WINreturn 0;
#elsereturn nullptr;
#endif
} // namespace rtc
详细的介绍已经写在了主时中&#xff0c;下边我们来看一下Run()方法。
void Thread::Run() {ProcessMessages(kForever);
}
bool Thread::ProcessMessages(int cmsLoop) {// Using ProcessMessages with a custom clock for testing and a time greater// than 0 doesn&#39;t work, since it&#39;s not guaranteed to advance the custom// clock&#39;s time, and may get stuck in an infinite loop.RTC_DCHECK(GetClockForTesting() &#61;&#61; nullptr || cmsLoop &#61;&#61; 0 ||cmsLoop &#61;&#61; kForever);// 计算终止处理消息的时间int64_t msEnd &#61; (kForever &#61;&#61; cmsLoop) ? 0 : TimeAfter(cmsLoop);// 下次可以进行消息获取的时间长度int cmsNext &#61; cmsLoop;while (true) {
#if defined(WEBRTC_MAC)ScopedAutoReleasePool pool;
#endifMessage msg;if (!Get(&msg, cmsNext))return !IsQuitting();Dispatch(&msg);// 若不是无限期&#xff0c;计算下次可以进行消息获取的时间if (cmsLoop !&#61; kForever) {cmsNext &#61; static_cast
}
ProcessMessages当传入参数为kForever(static const int kForever &#61; -1;)时消息循环无限期进行循环处理&#xff0c;否则有限期定时处理&#xff0c;while中一直会调用MessageQueue::Get()去获取消息(MessageQueue、MessageQueueManager等会在后续核心分析)&#xff0c;通过Dispatch(&msg);来处理消息。
// Convenience method to invoke a functor on another thread. Caller must// provide the |ReturnT| template argument, which cannot (easily) be deduced.// Uses Send() internally, which blocks the current thread until execution// is complete.// Ex: bool result &#61; thread.Invoke
void Thread::InvokeInternal(const Location& posted_from,MessageHandler* handler) {TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line",posted_from.file_and_line(), "src_func",posted_from.function_name());Send(posted_from, handler);
}
Invoke提供在另一个线程上调用函数的便捷方法。原理是内部使用Send&#xff08;&#xff09;&#xff0c;它将阻塞当前线程&#xff0c;直到执行完成。这里要注意两点&#xff1a;
下边我们看下阻塞执行的send()方法。
void Thread::Send(const Location& posted_from,MessageHandler* phandler,uint32_t id,MessageData* pdata) {if (IsQuitting())return;// Sent messages are sent to the MessageHandler directly, in the context// of "thread", like Win32 SendMessage. If in the right context,// call the handler directly.Message msg;msg.posted_from &#61; posted_from;msg.phandler &#61; phandler;msg.message_id &#61; id;msg.pdata &#61; pdata;if (IsCurrent()) {phandler->OnMessage(&msg);return;}AssertBlockingIsAllowedOnCurrentThread();AutoThread thread;Thread* current_thread &#61; Thread::Current();RTC_DCHECK(current_thread !&#61; nullptr); // AutoThread ensures thisbool ready &#61; false;{CritScope cs(&crit_);_SendMessage smsg;smsg.thread &#61; current_thread;smsg.msg &#61; msg;smsg.ready &#61; &ready;sendlist_.push_back(smsg);}// Wait for a replyWakeUpSocketServer();bool waited &#61; false;crit_.Enter();while (!ready) {crit_.Leave();// We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary// thread invoking calls on the current thread.current_thread->ReceiveSendsFromThread(this);current_thread->socketserver()->Wait(kForever, false);waited &#61; true;crit_.Enter();}crit_.Leave();// Our Wait loop above may have consumed some WakeUp events for this// MessageQueue, that weren&#39;t relevant to this Send. Losing these WakeUps can// cause problems for some SocketServers.//// Concrete example:// Win32SocketServer on thread A calls Send on thread B. While processing the// message, thread B Posts a message to A. We consume the wakeup for that// Post while waiting for the Send to complete, which means that when we exit// this loop, we need to issue another WakeUp, or else the Posted message// won&#39;t be processed in a timely manner.if (waited) {current_thread->socketserver()->WakeUp();}
}
参考链接&#xff1a;https://blog.csdn.net/ice_ly000/article/details/103178691
WebRTC学习进阶之路系列总目录&#xff1a;https://blog.csdn.net/xiaomucgwlmx/article/details/103204274