pika作為類(lèi)redis的存儲(chǔ)系統(tǒng),為了彌補(bǔ)在性能上的不足,在整個(gè)系統(tǒng)中大量使用多線程的結(jié)構(gòu),涉及到多線程編程,勢(shì)必需要為線程加鎖來(lái)保證數(shù)據(jù)訪問(wèn)的一致性和有效性。其中主要用到了三種鎖
應(yīng)用掛起指令,在掛起指令的執(zhí)行中,會(huì)添加寫(xiě)鎖,以確保,此時(shí)沒(méi)有其他指令執(zhí)行。其他的普通指令在會(huì)添加讀鎖,可以并行訪問(wèn)。 其中掛起指令有:
保證當(dāng)前服務(wù)器在執(zhí)行掛起指令時(shí),起到阻寫(xiě)作用。
行鎖,用于對(duì)一個(gè)key加鎖,保證同一時(shí)間只有一個(gè)線程對(duì)一個(gè)key進(jìn)行操作。
pika中存取的數(shù)據(jù)都是類(lèi)key,value數(shù)據(jù),不同key所對(duì)應(yīng)的數(shù)據(jù)完全獨(dú)立,所以只需要對(duì)key加鎖可以保證數(shù)據(jù)在并發(fā)訪問(wèn)時(shí)的一致性,行鎖相對(duì)來(lái)說(shuō),鎖定粒度小,也可以保證數(shù)據(jù)訪問(wèn)的高效性。
在pika系統(tǒng)中,對(duì)于數(shù)據(jù)庫(kù)的操作都需要添加行鎖,主要在應(yīng)用于兩個(gè)地方,在系統(tǒng)上層指令過(guò)程中和在數(shù)據(jù)引擎層面。在pika系統(tǒng)中,對(duì)于寫(xiě)指令(會(huì)改變數(shù)據(jù)狀態(tài),如SET,HSET)需要除了更新數(shù)據(jù)庫(kù)狀態(tài),還涉及到pika的增量同步,需要在binlog中添加所執(zhí)行的寫(xiě)指令,用于保證master和slave的數(shù)據(jù)庫(kù)狀態(tài)一致。故一條寫(xiě)指令的執(zhí)行,主要有兩個(gè)部分:
其加鎖情況,如下圖:
在圖中可以看到,對(duì)同一個(gè)key,加了兩次行鎖,在實(shí)際應(yīng)用中,pika上所加的鎖就已經(jīng)能夠保證數(shù)據(jù)訪問(wèn)的正確性。如果只是為了pika所需要的業(yè)務(wù),nemo層面使用行鎖是多余的,但是nemo的設(shè)計(jì)初衷就是通過(guò)對(duì)rocksdb的改造和封裝提供一套完整的類(lèi)redis數(shù)據(jù)訪問(wèn)的解決方案,而不僅僅是為pika提供數(shù)據(jù)庫(kù)引擎。這種設(shè)計(jì)思路也是秉承了Unix中的設(shè)計(jì)原則:Write programs that do one thing and do it well。
這樣設(shè)計(jì)大大降低了pika與nemo之間的耦合,也使得nemo可以被單獨(dú)拿出來(lái)測(cè)試和使用,在pika中的數(shù)據(jù)遷移工具就是完全使用nemo來(lái)完成,不必依賴(lài)任何pika相關(guān)的東西。另外對(duì)于nemo感興趣或者有需求的團(tuán)隊(duì)也可以直接將nemo作為數(shù)據(jù)庫(kù)引擎而不需要修改任何代碼就能使用完整的數(shù)據(jù)訪問(wèn)功能。
在pika系統(tǒng)中,一把行鎖就可以維護(hù)所有key。在行鎖的實(shí)現(xiàn)上是將一個(gè)key與一把互斥鎖相綁定,并將其放入哈希表中維護(hù),來(lái)保證每次訪問(wèn)key的線程只有一個(gè),但是不可能也不需要為每一個(gè)key保留一把互斥鎖,只需要當(dāng)有多條線程訪問(wèn)同一個(gè)key時(shí)才需要鎖,在所有線程都訪問(wèn)結(jié)束之后,就可以銷(xiāo)毀這個(gè)綁定key的互斥鎖,釋放資源。具體實(shí)現(xiàn)如下:
class RecordLock {
public:
RecordLock(port::RecordMutex *mu, const std::string &key)
: mu_(mu), key_(key) {
mu_->Lock(key_);
}
~RecordLock() { mu_->Unlock(key_); }
private:
port::RecordMutex *const mu_;
std::string key_;
// No copying allowed
RecordLock(const RecordLock&);
void operator=(const RecordLock&);
};
void RecordMutex::Lock(const std::string &key) {
mutex_.Lock();
std::unordered_map<std::string, RefMutex *>::const_iterator it = records_.find(key);
if (it != records_.end()) {
//log_info ("tid=(%u) >Lock key=(%s) exist, map_size=%u", pthread_self(), key.c_str(), records_.size());
RefMutex *ref_mutex = it->second;
ref_mutex->Ref();
mutex_.Unlock();
ref_mutex->Lock();
//log_info ("tid=(%u) <Lock key=(%s) exist", pthread_self(), key.c_str());
} else {
//log_info ("tid=(%u) >Lock key=(%s) new, map_size=%u ++", pthread_self(), key.c_str(), records_.size());
RefMutex *ref_mutex = new RefMutex();
records_.insert(std::make_pair(key, ref_mutex));
ref_mutex->Ref();
mutex_.Unlock();
ref_mutex->Lock();
//log_info ("tid=(%u) <Lock key=(%s) new", pthread_self(), key.c_str());
}
}
完整代碼可參考:slash_mutex.cc slash_mutex.h
更多建議: