`

MDB记录锁实现细节

 
阅读更多

内存数据库实现中涉及数据的同步一致性问题,可以实现高效的记录锁来尽量减少同步的开销。

下图是一个内存数据库数记录锁的实现分析:




 一个MDB服务端中会包含多个Table,每个Table会包含一个或多个Index键值,操作某张表中数据记录时需要对该表中某个键值加锁,就会锁定该条记录不再被其他线程同时操作。每个MDB服务库中会包含一个内存命名锁管理器(CNamedLockMemMgr),该MDB服务初始化时针对每个Table中的索引键值都创建一个内存命名锁(CNamedLockMem)并返回锁的在命名锁管理器中索引,后续锁操作时指定该索引即可。

命名锁管理器提供的锁操作接口为:

void lock(CLockInfo* pLockInfo, int64 llID, CLockSession& cLockSession )

void lock(CLockInfo* pLockInfo, CFSet<int64>& lstID, CLockSession& cLockSession )

int32 is_lock(CLockInfo* pLockInfo, int64 llID, CLockSession& cLockSession)

void unlock(CLockInfo* pLockInfo, CLockSession& cLockSession )

 

对某个表的键值key加锁过程如下:

1、通过初始化时得到的该表的Index列键值锁信息CLockInfo来找到其位于命名锁管理器中创建的内存命名锁(CNamedLockMem)对象;

2、将键值Key插入找到的CNamedLockMem对象的已锁定键值Set中,调用内部的CNamedLock对象对键值Key加锁;

3、CNamedLock根据建锁信息中指定的m_nLockModule数,创建了相应的多记录锁(CMultiLock)对象,对某个键值Key根据m_nLockModule取模,即可以得到要操作的多记录锁对象。

4、得到多记录锁对象(CMultiLock)进行lock操作,CMultiLock对象内部维护了通过该锁Lock的所有记录的信息,包含锁定的键值、键值当前锁定的状态、当前锁定该键值的线程和事务号、等待该锁的线程数、该锁的加锁时间。

void lock(const T& data, CLockSession& cLockSession )

{

               pthread_mutex_lock( &m_lock); //操作内部锁定记录信息时需互斥加锁

               int nPos = add_data( data);  //如果不存键值Key信息,需要扩展增加锁记录数组

 

               if(m_pLockSession[nPos].m_llThreadID == cLockSession.m_llThreadID && 

                  m_pLockSession[nPos].m_llSessionID == cLockSession.m_llSessionID &&

                  m_pLockStatus[nPos] 

                )

               {//同一个线程和事务已经锁定该记录的处理

                            pthread_mutex_unlock( &m_lock);

 

                           if(CMultiLockAttr::get_recurLockFlag())

                           {

                                          return;

                           }

                           else

                           {

                                         THROW_MFEXCEPTION(MF_ERROR_LOCK_FAILED, "recursive lock failed ");

                             }

                  }

 

                 m_pWanted[nPos]++;  //等待该锁的数量+1

                 timespec   ts;

                 int     iTime = 0;

 

                while( m_pLockStatus[nPos])

                {//该记录已被其他线程或事务锁定

                           if(CMultiLockAttr::get_timedLockFlag())

                           {//超时锁

                                       if(iTime ==0 )

                                      {//设定锁超时时间

                                             clock_gettime(CLOCK_REALTIME, &ts);

                                             ts.tv_sec += CMultiLockAttr::get_lockTime();

                                             iTime ++;

                                      }

 

                                      if(ETIMEDOUT == pthread_cond_timedwait( &m_cond, &m_lock, &ts))

                                      {

                                                 m_pWanted[nPos]--;

                                                 pthread_mutex_unlock( &m_lock);

 

                                                THROW_MFEXCEPTION(MF_ERROR_LOCK_TIMEOUT, "timed lock timedout");

 

                                     }

                         }

                         else

                        {//交出执行权等待解锁事件

                                 pthread_cond_wait( &m_cond, &m_lock);

                         }

                }

               //获取到锁

               m_pLockStatus[nPos] = 1;

               m_pLockSession[nPos].m_llThreadID = cLockSession.m_llThreadID;

               m_pLockSession[nPos].m_llSessionID = cLockSession.m_llSessionID;

               m_pWanted[nPos]--;

               pthread_mutex_unlock( &m_lock);

}

 

void unlock(const T& data, CLockSession& cLockSession )

{

                pthread_mutex_lock( &m_lock);

               int  nPos = find_data( data);

               if( nPos != -1 && 

               ( m_pLockSession[nPos].m_llThreadID == cLockSession.m_llThreadID ||                  cLockSession.m_llThreadID == 0 ) && 

               ( m_pLockSession[nPos].m_llSessionID == cLockSession.m_llSessionID || cLockSession.m_llSessionID == 0 ) )

             {

                       m_pLockStatus[nPos] = 0;

                       m_pLockSession[nPos].m_llThreadID = 0;

                       m_pLockSession[nPos].m_llSessionID = 0;

                       if( m_pWanted[nPos])

                      {//有其他人在等待,广播解锁事件

                                pthread_cond_broadcast( &m_cond);

                      }

             }

 

             pthread_mutex_unlock( &m_lock);

}

 

  • 大小: 81.4 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics