首先:我在互斥/多线程编程完全是一个新手,对任何错误很抱歉提前...
我有一个运行多个线程的程序。 螺纹(每个CPU核心通常是一个)做了很多的计算和“思考”,然后有时他们决定打电话给一个特定的(共享)方法,更新一些统计数据。 统计更新并发通过使用一个互斥体的管理:
stats_mutex.lock();
common_area->update_thread_stats( ... );
stats_mutex.unlock();
现在的问题。 所有这些线程有需要差不多一个特定线程
实时优先级,因为它是实际操作的唯一线索。
随着“几乎实时优先”我的意思是:
让我们假设线程T0是“特权一个”和T1 .... T15是正常ones.What情况现在是:
- 线T1获得锁。
- 线程T2,T3,T0调用lock()方法,并等待它的成功。
- 线程T1调用解锁()
- 一个(随机的,据我所知)线程T2,T3的,T0成功地获取锁,而其他的人继续等待。
我需要的是:
- 线T1获取锁。
- 线程T2,T3,T0调用lock()方法,并等待它的成功。
- 线程T1调用解锁()
- 线程T0获得锁,因为它的特权
那么,什么是最好的(可能是最简单的)方法做这件事情?
我当时的想法是有所谓的“privileged_needs_lock”一个布尔变量。
但我想我需要另一个互斥来管理这个变量的访问。我不知道这是正确的方式...
附加信息:
- 我的线程使用11 C ++(如GCC 4.6.3)
- 代码需要在Linux和Windows(不过目前只在Linux上测试)运行。
- 在锁定机构的表现是不是一个问题(我的表现的问题是内螺纹的计算和线程数将永远是低的,一个或两个每个CPU核心最大)
任何想法表示赞赏。 谢谢
下面的解决方案的工作(三级互斥方式):
#include <thread>
#include <iostream>
#include "unistd.h"
std::mutex M;
std::mutex N;
std::mutex L;
void lowpriolock(){
L.lock();
N.lock();
M.lock();
N.unlock();
}
void lowpriounlock(){
M.unlock();
L.unlock();
}
void highpriolock(){
N.lock();
M.lock();
N.unlock();
}
void highpriounlock(){
M.unlock();
}
void hpt(const char* s){
using namespace std;
//cout << "hpt trying to get lock here" << endl;
highpriolock();
cout << s << endl;
sleep(2);
highpriounlock();
}
void lpt(const char* s){
using namespace std;
//cout << "lpt trying to get lock here" << endl;
lowpriolock();
cout << s << endl;
sleep(2);
lowpriounlock();
}
int main(){
std::thread t0(lpt,"low prio t0 working here");
std::thread t1(lpt,"low prio t1 working here");
std::thread t2(hpt,"high prio t2 working here");
std::thread t3(lpt,"low prio t3 working here");
std::thread t4(lpt,"low prio t4 working here");
std::thread t5(lpt,"low prio t5 working here");
std::thread t6(lpt,"low prio t6 working here");
std::thread t7(lpt,"low prio t7 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
return 0;
}
尝试了以下解决方案,建议,但它不工作(以“克++ -std =的C ++ 0x -o测试TEST.CPP -lpthread”编译):
#include <thread>
#include <mutex>
#include "time.h"
#include "pthread.h"
std::mutex l;
void waiter(){
l.lock();
printf("Here i am, waiter starts\n");
sleep(2);
printf("Here i am, waiter ends\n");
l.unlock();
}
void privileged(int id){
usleep(200000);
l.lock();
usleep(200000);
printf("Here i am, privileged (%d)\n",id);
l.unlock();
}
void normal(int id){
usleep(200000);
l.lock();
usleep(200000);
printf("Here i am, normal (%d)\n",id);
l.unlock();
}
int main(){
std::thread tw(waiter);
std::thread t1(normal,1);
std::thread t0(privileged,0);
std::thread t2(normal,2);
sched_param sch;
int policy;
pthread_getschedparam(t0.native_handle(), &policy, &sch);
sch.sched_priority = -19;
pthread_setschedparam(t0.native_handle(), SCHED_FIFO, &sch);
pthread_getschedparam(t1.native_handle(), &policy, &sch);
sch.sched_priority = 18;
pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch);
pthread_getschedparam(t2.native_handle(), &policy, &sch);
sch.sched_priority = 18;
pthread_setschedparam(t2.native_handle(), SCHED_FIFO, &sch);
tw.join();
t1.join();
t0.join();
t2.join();
return 0;
}
我能想到的唯一使用线程原语的三种方法:
三重互斥
三个互斥体将在这里工作:
- 数据互斥( 'M')
- 下一个要访问的互斥(“N”),并
- 低优先级接入互斥(“L”)
访问模式是:
- 低优先级的线程:锁L,锁N,锁男,解锁N,{做的东西},开锁男,解锁大号
- 高优先级的线程:锁N,锁男,解锁N,{做的东西},解锁中号
这样的访问数据保护,高优先级的线程可以访问它获得成功的低优先级的线程。
互斥,条件变量,原子标志
做到这一点的原始的方式是用一个条件变量和原子:
- 互斥米;
- Condvar℃;
- 原子布尔hpt_waiting;
数据访问模式:
- 低优先级的线程:锁男,而(hpt_waiting)等待C对男,{做的东西},广播C,解锁中号
- 高优先级的线程:hpt_waiting:= TRUE,锁男,hpt_waiting:=假,{做的东西},广播C,解锁中号
互斥,条件变量,两个非原子标志
另外,您可以使用与condvar两个非原子的bool; 在这种技术中,互斥/ condvar保护标志,数据由互斥锁,而是由一个标志得不到保障:
把请求的线程上的“优先级队列”。 特权线程可以得到第一次去的数据时,它是免费的。
这样做的一个方法是withan ConcurrentQueues [privilegeLevel],锁和一些事件的数组。
想要在数据的任何线程进入锁。 如果数据是免费的,(布尔),它得到的数据对象和退出锁。 如果数据是被其他线程,请求线程推动一个事件到并发队列之一,根据其权限级别,退出锁定并等待该事件。
当一个线程想要释放它的数据对象的所有权,它得到了锁,并遍历从最高特权ConcurrentQueues的阵列端朝下,寻找一个事件(即队列计数> 0)。 如果找到一个,它标志着它并退出锁,如果不是,它设置“dataFree”布尔型和并退出锁。
当等待一个事件的访问数据的线程已准备好,它可以访问数据对象。
我thnk应该工作。 请其他开发商,看看这个设计,看看你能想到的任何种族等的? 我还是有点距离的“待客超载”了一趟CZ后痛苦..
编辑 - 可能甚至都不需要,因为在它们之间所有的明确锁定的并发队列。 任何旧的队列会做。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>
class priority_mutex {
std::condition_variable cv_;
std::mutex gate_;
bool locked_;
std::thread::id pr_tid_; // priority thread
public:
priority_mutex() : locked_(false) {}
~priority_mutex() { assert(!locked_); }
priority_mutex(priority_mutex&) = delete;
priority_mutex operator=(priority_mutex&) = delete;
void lock(bool privileged = false) {
const std::thread::id tid = std::this_thread::get_id();
std::unique_lock<decltype(gate_)> lk(gate_);
if (privileged)
pr_tid_ = tid;
cv_.wait(lk, [&]{
return !locked_ && (pr_tid_ == std::thread::id() || pr_tid_ == tid);
});
locked_ = true;
}
void unlock() {
std::lock_guard<decltype(gate_)> lk(gate_);
if (pr_tid_ == std::this_thread::get_id())
pr_tid_ = std::thread::id();
locked_ = false;
cv_.notify_all();
}
};
注意:此priority_mutex
提供了不公平的线程调度。 如果特权线程频繁获取锁,其他非特权线程可能几乎没有安排。
用法示例:
#include <mutex>
priority_mutex mtx;
void privileged_thread()
{
//...
{
mtx.lock(true); // acquire 'priority lock'
std::unique_lock<decltype(mtx)> lk(mtx, std::adopt_lock);
// update shared state, etc.
}
//...
}
void normal_thread()
{
//...
{
std::unique_lock<decltype(mtx)> lk(mtx); // acquire 'normal lock'
// do something
}
//...
}
在Linux下,你可以检查此人:pthread_setschedparam也是人sched_setscheduler
pthread_setschedparam(的pthread_t线程,诠释政策,常量结构sched_param * PARAM);
检查这也为C ++ 2011: http://msdn.microsoft.com/en-us/library/system.threading.thread.priority.aspx#Y78
试着像下面这样。 你可以做的类线程安全的单,你甚至可以让它变成一个仿函数。
#include <pthread.h>
#include <semaphore.h>
#include <map>
class ThreadPrioFun
{
typedef std::multimap<int, sem_t*> priomap_t;
public:
ThreadPrioFun()
{
pthread_mutex_init(&mtx, NULL);
}
~ThreadPrioFun()
{
pthread_mutex_destroy(&mtx);
}
void fun(int prio, sem_t* pSem)
{
pthread_mutex_lock(&mtx);
bool bWait = !(pm.empty());
priomap_t::iterator it = pm.insert(std::pair<int, sem_t*>(prio, pSem) );
pthread_mutex_unlock(&mtx);
if( bWait ) sem_wait(pSem);
// do the actual job
// ....
//
pthread_mutex_lock(&mtx);
// done, remove yourself
pm.erase(it);
if( ! pm.empty() )
{
// let next guy run:
sem_post((pm.begin()->second));
}
pthread_mutex_unlock(&mtx);
}
private:
pthread_mutex_t mtx;
priomap_t pm;
};
并行线程有线程优先级:
pthread_setschedprio( (pthread_t*)(&mThreadId), wpri );
如果多个线程在锁睡觉等待,调度程序将首先唤醒最高优先级的线程。
由于线程优先级不为你工作:
创建互斥2,定期锁定和优先级锁。
普通线程必须首先锁定正常的锁,然后优先级锁。 优先级的线程只锁定优先级锁:
Mutex mLock;
Mutex mPriLock;
doNormal()
{
mLock.lock();
pthread_yield();
doPriority();
mLock.unlock();
}
doPriority()
{
mPriLock.lock();
doStuff();
mPriLock.unlock();
}
稍微修改ecatmur答案,加入了第四互斥处理多个高优先级的线程同时(注意,这不是在我原来的问题要求 ):
#include <thread>
#include <iostream>
#include "unistd.h"
std::mutex M; //data access mutex
std::mutex N; // 'next to access' mutex
std::mutex L; //low priority access mutex
std::mutex H; //hptwaiting int access mutex
int hptwaiting=0;
void lowpriolock(){
L.lock();
while(hptwaiting>0){
N.lock();
N.unlock();
}
N.lock();
M.lock();
N.unlock();
}
void lowpriounlock(){
M.unlock();
L.unlock();
}
void highpriolock(){
H.lock();
hptwaiting++;
H.unlock();
N.lock();
M.lock();
N.unlock();
}
void highpriounlock(){
M.unlock();
H.lock();
hptwaiting--;
H.unlock();
}
void hpt(const char* s){
using namespace std;
//cout << "hpt trying to get lock here" << endl;
highpriolock();
cout << s << endl;
usleep(30000);
highpriounlock();
}
void lpt(const char* s){
using namespace std;
//cout << "lpt trying to get lock here" << endl;
lowpriolock();
cout << s << endl;
usleep(30000);
lowpriounlock();
}
int main(){
std::thread t0(lpt,"low prio t0 working here");
std::thread t1(lpt,"low prio t1 working here");
std::thread t2(hpt,"high prio t2 working here");
std::thread t3(lpt,"low prio t3 working here");
std::thread t4(lpt,"low prio t4 working here");
std::thread t5(lpt,"low prio t5 working here");
std::thread t6(hpt,"high prio t6 working here");
std::thread t7(lpt,"low prio t7 working here");
std::thread t8(hpt,"high prio t8 working here");
std::thread t9(lpt,"low prio t9 working here");
std::thread t10(lpt,"low prio t10 working here");
std::thread t11(lpt,"low prio t11 working here");
std::thread t12(hpt,"high prio t12 working here");
std::thread t13(lpt,"low prio t13 working here");
//std::cout << "All threads created" << std::endl;
t0.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
t8.join();
t9.join();
t10.join();
t11.join();
t12.join();
t13.join();
return 0;
}
你怎么看? 可以吗? 的确,一个信号可以处理这种美好的事情,但互斥是更容易管理给我。