Reposted from https://www.codeproject.com/Articles/28785/Thread-synchronization-Wait-and-Pulse-demystified
This short article is about three of the less well understood methods of the Monitor class: Wait, Pulse, and PulseAll. The MSDN documentation for these methods explains what they do, but not why or how to use them. I will try to fill this gap and demystify them.
These methods provide low-level synchronization between threads. They are the most versatile constructs, but they are harder to use correctly than other synchronization primitives like AutoResetEvent. However, if you follow the recommended pattern I will describe, they can be used with confidence.
public static class Monitor
{
    public static bool Wait( object o ) { ... } // and overloads
    public static void Pulse( object o ) { ... }
    public static void PulseAll( object o ) { ... }
}
At the most basic level, a thread calls Wait when it wants to enter a wait state, and another thread calls Pulse when it wants to wake up the waiting thread. Note that if a thread calls Pulse when no other threads are waiting, the Pulse is lost. This is in contrast to primitives like AutoResetEvent, where a call to Set is remembered until a subsequent call to WaitOne occurs.
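To make the difference concrete, here is a small sketch. The class and method names are my own and not part of the original article. It pulses when nobody is waiting and then waits using the Wait overload that takes a timeout in milliseconds, so the demo cannot hang. The same sequence with an AutoResetEvent succeeds, because the Set is remembered.

```csharp
using System;
using System.Threading;

static class LostPulseDemo
{
    static readonly object Key = new object();

    // Pulse first, then wait: the earlier Pulse is not remembered,
    // so the timed Wait is expected to time out and return false.
    public static bool WaitAfterPulse()
    {
        lock ( Key ) Monitor.Pulse( Key );
        lock ( Key ) return Monitor.Wait( Key, 200 );
    }

    // Set first, then wait: the event stays signaled until it is consumed,
    // so WaitOne is expected to return true straight away.
    public static bool WaitOneAfterSet()
    {
        using ( var ev = new AutoResetEvent( false ) )
        {
            ev.Set();
            return ev.WaitOne( 200 );
        }
    }
}
```

The 200 ms timeout is an arbitrary value chosen for the demo.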
All of these methods must be called while holding the lock on the object passed to them, typically from within a lock statement. If they are called without holding the lock, they throw a SynchronizationLockException. We will see later why this is actually useful.
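The rule is easy to check for yourself. The following sketch (the class and method names are hypothetical) calls Pulse on an object that the calling thread has not locked and reports whether the expected exception was thrown.

```csharp
using System;
using System.Threading;

static class LockRequiredDemo
{
    // Returns true if Pulse threw because the lock was not held.
    public static bool PulseWithoutLockThrows()
    {
        var key = new object();
        try
        {
            Monitor.Pulse( key ); // deliberately NOT inside lock ( key )
            return false;
        }
        catch ( SynchronizationLockException )
        {
            return true;
        }
    }
}
```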
For example:
readonly object key = new object();
// thread A
lock ( key ) Monitor.Wait( key );
// thread B
lock ( key ) Monitor.Pulse( key );
If thread A runs first, it acquires the lock and executes the Wait method. Then thread B runs and calls Pulse, releasing thread A so that it can continue.
You may have noticed a little problem with the code above. If thread A holds the lock on the key object, why does thread B not block when it tries to acquire the lock? This is, of course, handled properly. The call to Wait in thread A releases the lock before it waits. This allows thread B to acquire the lock and call Pulse. Thread A then resumes, but it has to wait until thread B releases the lock, so it can reacquire it and complete the Wait call. Note that Pulse never blocks.
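The hand-over can be observed directly. In the sketch below (all names are mine, used only for illustration), thread A sets a flag under the lock and then calls Wait. The calling thread plays the role of thread B: the only way it can enter the lock and see the flag set is if Wait has released the lock, at which point A is guaranteed to be waiting and the Pulse cannot be lost.

```csharp
using System.Threading;

static class WaitReleasesLockDemo
{
    // Returns true once thread A has been pulsed and has completed its Wait.
    public static bool Run()
    {
        var key = new object();
        bool aIsWaiting = false; // only read or written while holding key
        bool aResumed = false;

        var a = new Thread( () =>
        {
            lock ( key )
            {
                aIsWaiting = true;
                Monitor.Wait( key ); // releases key while waiting, reacquires it before returning
                aResumed = true;
            }
        } );
        a.Start();

        // Thread B (here, the caller): retry until A is known to be inside Wait.
        for ( bool pulsed = false ; !pulsed ; )
        {
            lock ( key ) // obtainable while A waits, because Wait released the lock
            {
                if ( aIsWaiting )
                {
                    Monitor.Pulse( key );
                    pulsed = true;
                }
            }
            if ( !pulsed ) Thread.Yield();
        }

        a.Join();
        return aResumed;
    }
}
```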
The ready queue is the collection of threads that are waiting for a particular lock. The Monitor.Wait methods introduce another queue: the waiting queue. This is required as waiting for a Pulse is distinct from waiting to acquire a lock. Like the ready queue, the waiting queue is FIFO.
These queues can lead to unexpected behaviour. When a Pulse occurs, the head of the waiting queue is released and is added to the ready queue. However, if there are other threads in the ready queue, they will acquire the lock before the thread that was released. This is a problem, because the thread that acquires the lock can alter the state that the pulsed thread relies on.
The solution is to use a while condition inside the lock statement:
readonly object key = new object();
bool block = true;

// thread A
lock ( key )
{
    while ( block )
        Monitor.Wait( key );
    block = true;
}

// thread B
lock ( key )
{
    block = false;
    Monitor.Pulse( key );
}
This pattern shows the reason for the rule that locks must be used: they protect the condition variable from concurrent access. Locks are also memory barriers, so you do not have to declare the condition variables as volatile.
The while loop also solves a couple of other problems. Firstly, if thread B executes before thread A calls Wait, the Pulse is lost. However, when thread A eventually runs and acquires the lock, the first thing it does is check the condition. Thread B has already set block to false, so thread A continues without ever calling Wait.
Secondly, it solves the problem of the queues. If thread A is pulsed, it leaves the waiting queue and is added to the ready queue. If, however, a different thread acquires the lock first and sets block back to true, it no longer matters. This is because thread A will go round the while loop, find that the condition to block is true, and execute Wait again.
The code above is actually an implementation of an AutoResetEvent. If you omitted the block = true statement in thread A, it would be a ManualResetEvent. If you use an int instead of bool for the condition, it would be a Semaphore. This shows how versatile this pattern is. You have complete control over what condition you use in the while loop.
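As an illustration of the int variant, here is a minimal sketch of a counting semaphore built on the same pattern. The class name SimpleSemaphore and its Acquire and Release methods are my own names, not an existing API and not code from the original article.

```csharp
using System;
using System.Threading;

sealed class SimpleSemaphore
{
    readonly object _key = new object();
    int _count; // number of permits currently available, guarded by _key

    public SimpleSemaphore( int initialCount )
    {
        if ( initialCount < 0 ) throw new ArgumentOutOfRangeException( nameof( initialCount ) );
        _count = initialCount;
    }

    // Blocks until a permit is available, then takes it.
    public void Acquire()
    {
        lock ( _key )
        {
            while ( _count == 0 )
                Monitor.Wait( _key );
            _count--;
        }
    }

    // Returns a permit and wakes one waiter so it can re-check the count.
    public void Release()
    {
        lock ( _key )
        {
            _count++;
            Monitor.Pulse( _key );
        }
    }
}
```

Only the condition and the state update differ from the bool version: the while loop tests the count, and the waiter decrements it once it gets through.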
So, the rule of thumb is: use higher level primitives if they fit. If you need finer control, use the Monitor methods with this pattern to ensure correctness.
It is usually okay to use PulseAll instead of Pulse. This causes all the waiting threads to re-evaluate their conditions and either continue or go back to waiting. As long as the condition evaluation is not too expensive, this is normally acceptable. PulseAll is also useful where you have multiple threads waiting on the same synchronization object, but with different conditions.
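There are also cases where PulseAll is the natural choice, because a single state change should release every waiter. The sketch below is a one-shot gate, similar in spirit to a ManualResetEvent that is never reset. The class name Gate and its methods are hypothetical. With Pulse instead of PulseAll, only one of several waiting threads would be woken, and the rest would stay in the waiting queue.

```csharp
using System.Threading;

sealed class Gate
{
    readonly object _key = new object();
    bool _open; // guarded by _key

    // Opens the gate and wakes every waiting thread.
    public void Open()
    {
        lock ( _key )
        {
            _open = true;
            Monitor.PulseAll( _key );
        }
    }

    // Blocks until the gate has been opened; returns immediately if it already is.
    public void WaitUntilOpen()
    {
        lock ( _key )
        {
            while ( !_open )
                Monitor.Wait( _key );
        }
    }
}
```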
There are overloads of Wait that take a timeout parameter. As with PulseAll, using one usually doesn't hurt. It can help prevent stuck Waits or missed Pulses by re-evaluating the conditions on a regular basis.
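Here is one way a timed wait might be combined with the while pattern. This is a sketch under my own assumptions: the helper name WaitUntil and its parameters are hypothetical, and a Stopwatch tracks the overall deadline so that repeated wake-ups do not extend the total waiting time.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

static class TimedWait
{
    // Waits until condition() is true or timeoutMs has elapsed.
    // The condition is always evaluated while holding key.
    // Returns true if the condition was met, false on timeout.
    public static bool WaitUntil( object key, Func<bool> condition, int timeoutMs )
    {
        var clock = Stopwatch.StartNew();
        lock ( key )
        {
            while ( !condition() )
            {
                long remaining = timeoutMs - clock.ElapsedMilliseconds;
                if ( remaining <= 0 ) return false;
                Monitor.Wait( key, (int) remaining ); // wakes on a pulse or when the time is up
            }
            return true;
        }
    }
}
```

The thread that changes the state is still expected to do so under the same lock and to call Pulse or PulseAll afterwards; the timeout only bounds how long a waiter can be stuck.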
Here is an example with full source code that demonstrates the versatility of this pattern. It implements a blocking queue that can be stopped. A blocking queue is a fixed-size queue. If the queue is full, attempts to add an item will block. If the queue is empty, attempts to remove an item will block. When Quit() is called, the queue is stopped. This means that you can't add any more items, but you can remove existing items until the queue is empty. At that point, the queue is finished.
This is a fairly complex set of conditions. You could implement this using a combination of higher-level constructs, but it would be harder. The pattern makes this implementation relatively trivial.
using System.Collections.Generic;
using System.Threading;

class BlockingQueue<T>
{
    readonly int _Size = 0;
    readonly Queue<T> _Queue = new Queue<T>();
    readonly object _Key = new object();
    bool _Quit = false;

    public BlockingQueue( int size )
    {
        _Size = size;
    }

    // Stops the queue and wakes all blocked producers and consumers.
    public void Quit()
    {
        lock ( _Key )
        {
            _Quit = true;
            Monitor.PulseAll( _Key );
        }
    }

    // Blocks while the queue is full. Returns false if the queue has been stopped.
    public bool Enqueue( T t )
    {
        lock ( _Key )
        {
            while ( !_Quit && _Queue.Count >= _Size ) Monitor.Wait( _Key );
            if ( _Quit ) return false;
            _Queue.Enqueue( t );
            Monitor.PulseAll( _Key );
        }
        return true;
    }

    // Blocks while the queue is empty. Returns false once the queue is stopped and drained.
    public bool Dequeue( out T t )
    {
        t = default( T );
        lock ( _Key )
        {
            while ( !_Quit && _Queue.Count == 0 ) Monitor.Wait( _Key );
            if ( _Queue.Count == 0 ) return false;
            t = _Queue.Dequeue();
            Monitor.PulseAll( _Key );
        }
        return true;
    }
}
This implementation is safe for concurrent access by an arbitrary number of producers and consumers. Here is an example with one fast producer and two slow consumers:
internal static void Test()
{
    // A capacity of 4 matches the output below: four items are enqueued before the producer first blocks.
    var q = new BlockingQueue<int>( 4 );

    // Producer
    new Thread( () =>
    {
        for ( int x = 0 ; ; x++ )
        {
            if ( !q.Enqueue( x ) ) break;
            Trace.WriteLine( x.ToString( "0000" ) + " >" );
        }
        Trace.WriteLine( "Producer quitting" );
    } ).Start();

    // Consumers
    for ( int i = 0 ; i < 2 ; i++ )
    {
        new Thread( () =>
        {
            for ( ; ; )
            {
                Thread.Sleep( 100 );
                int x = 0;
                if ( !q.Dequeue( out x ) ) break;
                Trace.WriteLine( " <" + x.ToString( "0000" ) );
            }
            Trace.WriteLine( "Consumer quitting" );
        } ).Start();
    }

    Thread.Sleep( 1000 );
    Trace.WriteLine( "Quitting" );
    q.Quit();
}
And, here is the output of one run:
0.00000000 0000 >
0.00006564 0001 >
0.00009096 0002 >
0.00011540 0003 >
0.09100076 <0000
0.09105981 <0001
0.09118936 0004 >
0.09121715 0005 >
0.19127709 <0002
0.19138214 0006 >
0.19141905 0007 >
0.19156006 <0003
0.29184034 <0004
0.29195839 <0005
0.29209006 0008 >
0.29211268 0009 >
0.39240077 <0006
0.39249521 <0007
0.39265713 0010 >
0.39268187 0011 >
0.49300483 <0008
0.49308145 0012 >
0.49310759 0013 >
0.49324051 <0009
0.59353358 <0010
0.59361452 <0011
0.59378797 0014 >
0.59381104 0015 >
0.69410956 <0012
0.69421405 0016 >
0.69423932 0017 >
0.69443953 <0013
0.79467082 <0014
0.79478532 <0015
0.79493624 0018 >
0.79496473 0019 >
0.89524573 <0016
0.89536309 <0017
0.89549100 0020 >
0.89552164 0021 >
0.98704302 Quitting
0.98829663 Producer quitting
0.99580252 <0018
0.99590403 <0019
1.09638131 <0020
1.09647286 <0021
1.19700873 Consumer quitting
1.19717586 Consumer quitting