条件变量变量也是出自POSIX线程标准,另一种线程同步机制,。主要用来等待某个条件的发生。可以用来同步同一进程中的各个线程。当然如果一个条件变量存放在多个进程共享的某个内存区中,那么还可以通过条件变量来进行进程间的同步。
每个条件变量总是和一个互斥量相关联,条件本身是由互斥量保护的,线程在改变条件状态之间必须要锁住互斥量。条件变量相对于互斥量最大的优点在于允许线程以无竞争的方式等待条件的发生。当一个线程获得互斥锁后,发现自己需要等待某个条件变为真,如果是这样,该线程就可以等待在某个条件上,这样就不需要通过轮询的方式来判断添加,大大节省了CPU时间。
在互斥量一文中说过:互斥量是用于上锁,而不是用于等待;现在这句话可以加强为:互斥量是用于上锁,条件变量用于等待;
条件变量声明为pthread_cond_t数据类型,在<bits/pthreadtypes.h>中有具体的定义。
1、条件变量初始化和销毁
/* Initialize condition variable */
int pthread_cond_init (pthread_cond_t *__restrict __cond,
__const pthread_condattr_t *__restrict __cond_attr) ;
/* Destroy condition variable */
int pthread_cond_destroy (pthread_cond_t *__cond);
|
上面两个函数分别由于条件变量的初始化和销毁。
和互斥量的初始化一样,如果条件变量是静态分配的,可以通过常量进行初始化,如下:
pthread_cond_t mlock = PTHREAD_COND_INITIALIZER;
|
也可以通过pthread_cond_init()进行初始化,对于动态分配的条件变量由于不能直接赋值进行初始化,就只能采用这种方式进行初始化。那么当不在需要使用条件变量时,需要调用pthread_cond_destroy()销毁该条件所占用的资源。
2、条件变量的属性设置
/* 初始化条件变量属性对象 */
int pthread_condattr_init (pthread_condattr_t *__attr);
/* 销毁条件变量属性对象 */
int pthread_condattr_destroy (pthread_condattr_t *__attr);
/* 获取条件变量属性对象在进程间共享与否的标识 */
int pthread_condattr_getpshared (__const pthread_condattr_t * __restrict __attr,
int *__restrict __pshared);
/* 设置条件变量属性对象,标识在进程间共享与否 */
int pthread_condattr_setpshared (pthread_condattr_t *__attr, int __pshared) ; |
这个属性的设置和互斥量属性设置是一样的,具体使用可以参考互斥量的用法:互斥量的属性设置。
3、条件变量的使用
/* 等待条件变为真 */
int pthread_cond_wait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex);
/* 限时等待条件为真 */
int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex,
__const struct timespec *__restrict __abstime);
/* 唤醒一个等待条件的线程. */
int pthread_cond_signal (pthread_cond_t *__cond);
/* 唤醒等待该条件的所有线程 */
int pthread_cond_broadcast (pthread_cond_t *__cond);
|
(1)pthread_cond_wait()函数用于等待条件被触发。该函数传入两个参数,一个条件变量一个互斥量,函数将条件变量和互斥量进行关联,互斥量对该条件进行保护,传入的互斥量必须是已经锁住的。调用pthread_cond_wait()函数后,会原子的执行以下两个动作:
1)将调用线程放到等待条件的线程列表上,即进入睡眠;
2)对互斥量进行解锁;
由于这两个操作时原子操作,这样就关闭了条件检查和线程进入睡眠等待条件改变这两个操作之间的时间通道,这样就不会错过任何条件的变化。
当pthread_cond_wait()返回后,互斥量会再次被锁住。
(2)pthread_cond_timedwait()函数和pthread_cond_wait()的工作方式相似,只是多了一个等待时间。等待时间的结构为struct
timespec,
/* 等待条件变为真 */
int pthread_cond_wait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex);
/* 限时等待条件为真 */
int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex,
__const struct timespec *__restrict __abstime);
/* 唤醒一个等待条件的线程. */
int pthread_cond_signal (pthread_cond_t *__cond);
/* 唤醒等待该条件的所有线程 */
int pthread_cond_broadcast (pthread_cond_t *__cond);
|
函数要求传入的时间值是一个绝对值,不是相对值,例如,想要等待3分钟,必须先获得当前时间,然后加上3分钟。
要想获得当前系统时间的timespec值,没有直接可调用的函数,需要通过调用gettimeofday函数获取timeval结构,然后转换成timespec结构,转换公式就是:
/* 等待条件变为真 */
int pthread_cond_wait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex);
/* 限时等待条件为真 */
int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
pthread_mutex_t *__restrict __mutex,
__const struct timespec *__restrict __abstime);
/* 唤醒一个等待条件的线程. */
int pthread_cond_signal (pthread_cond_t *__cond);
/* 唤醒等待该条件的所有线程 */
int pthread_cond_broadcast (pthread_cond_t *__cond);
|
所以要等待3分钟,timespec时间结构的获得应该如下所示:
struct timeval now;
struct timespec until;
gettimeofday(&now);//获得系统当前时间
//把时间从timeval结构转换成timespec结构
until.tv_sec = now.tv_sec;
until.tv_nsec = now.tv_usec * 1000;
//增加min
until.tv_sec += 3 * 60;
|
如果时间到后,条件还没有发生,那么会返回ETIMEDOUT错误。
从pthread_cond_wait()和pthread_cond_timewait()成功返回时,线程需要重新计算条件,因为其他线程可能在运行过程中已经改变条件。
(3)pthread_cond_signal() & pthread_cond_broadcast()
这两个函数都是用于向等待条件的线程发送唤醒信号,pthread_cond_signal()函数只会唤醒等待该条件的某个线程,pthread_cond_broadcast()会广播条件状态的改变,以唤醒等待该条件的所有线程。例如多个线程只读共享资源,这是可以将它们都唤醒。
这里要注意的是:一定要在改变条件状态后,再给线程发送信号。
考虑条件变量信号单播发送和广播发送的一种候选方式是坚持使用广播发送。只有在等待者代码编写确切,只有一个等待者需要唤醒,且唤醒哪个线程无所谓,那么此时为这种情况使用单播,所以其他情况下都必须使用广播发送。
下面是一个测试代码,模拟同步问题中经典的生产者消费者问题。
#include <iostream> #include <queue> #include <cstdlib>
#include <unistd.h>
#include <pthread.h>
using namespace std;
//把共享数据和它们的同步变量集合到一个结构中,这往往是一个较好的编程技巧。
struct{
pthread_mutex_t mutex;
pthread_cond_t cond;
queue<int> product;
}sharedData = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
void * produce(void *ptr)
{
for (int i = 0; i < 10; ++i)
{
pthread_mutex_lock(&sharedData.mutex);
sharedData.product.push(i);
pthread_mutex_unlock(&sharedData.mutex);
if (sharedData.product.size() == 1)
pthread_cond_signal(&sharedData.cond);
//sleep(1);
}
}
void * consume(void *ptr)
{
for (int i = 0; i < 10;)
{
pthread_mutex_lock(&sharedData.mutex);
while(sharedData.product.empty())
pthread_cond_wait(&sharedData.cond, &sharedData.mutex);
++i;
cout<<"consume:"<<sharedData.product.front()<<endl;
sharedData.product.pop();
pthread_mutex_unlock(&sharedData.mutex);
//sleep(1);
}
}
int main()
{
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, consume, NULL);
pthread_create(&tid2, NULL, produce, NULL);
void *retVal;
pthread_join(tid1, &retVal);
pthread_join(tid2, &retVal);
return 0;
} |
程序的运行结果如下所示:
consume:0
consume:1
consume:2
consume:3
consume:4
consume:5
consume:6
consume:7
consume:8
consume:9
|
|