Added support for I/O completion callbacks.

git-svn-id: https://svn.code.sf.net/p/flaim/code/trunk@807 0109f412-320b-0410-ab79-c3e0c5ffbbe6
This commit is contained in:
ahodgkinson
2006-09-01 19:45:34 +00:00
parent a8713bba98
commit 3a0aefb321
21 changed files with 625 additions and 257 deletions

View File

@@ -61,6 +61,10 @@ public:
private:
F_MUTEX m_hMutex;
#ifndef FLM_UNIX
F_SEM m_hAvailSem;
#endif
FLMUINT m_uiMaxBuffers;
FLMUINT m_uiMaxBufferBytes;
FLMUINT m_uiTotalBuffers;
@@ -69,6 +73,7 @@ private:
F_IOBuffer * m_pFirstAvail;
F_IOBuffer * m_pFirstUsed;
FLMBOOL m_bReuseBuffers;
F_NOTIFY_LIST_ITEM * m_pAvailNotify;
RCODE m_completionRc;
friend class F_IOBuffer;
@@ -116,6 +121,11 @@ Desc:
****************************************************************************/
F_IOBufferMgr::F_IOBufferMgr()
{
m_hMutex = F_MUTEX_NULL;
#ifndef FLM_UNIX
m_hAvailSem = F_SEM_NULL;
#endif
m_uiMaxBuffers = 0;
m_uiMaxBufferBytes = 0;
@@ -126,6 +136,7 @@ F_IOBufferMgr::F_IOBufferMgr()
m_pFirstAvail = NULL;
m_pFirstUsed = NULL;
m_pAvailNotify = NULL;
m_bReuseBuffers = FALSE;
m_completionRc = NE_FLM_OK;
}
@@ -137,11 +148,24 @@ F_IOBufferMgr::~F_IOBufferMgr()
{
f_assert( !m_pFirstPending);
f_assert( !m_pFirstUsed);
f_assert( !m_pAvailNotify);
while( m_pFirstAvail)
{
m_pFirstAvail->Release();
}
if( m_hMutex != F_MUTEX_NULL)
{
f_mutexDestroy( &m_hMutex);
}
#ifndef FLM_UNIX
if( m_hAvailSem != F_SEM_NULL)
{
f_semDestroy( &m_hAvailSem);
}
#endif
}
/****************************************************************************
@@ -152,14 +176,30 @@ RCODE F_IOBufferMgr::setupBufferMgr(
FLMUINT uiMaxBytes,
FLMBOOL bReuseBuffers)
{
RCODE rc = NE_FLM_OK;
f_assert( uiMaxBuffers);
f_assert( uiMaxBytes);
if( RC_BAD( rc = f_mutexCreate( &m_hMutex)))
{
goto Exit;
}
#ifndef FLM_UNIX
if( RC_BAD( rc = f_semCreate( &m_hAvailSem)))
{
goto Exit;
}
#endif
m_uiMaxBuffers = uiMaxBuffers;
m_uiMaxBufferBytes = uiMaxBytes;
m_bReuseBuffers = bReuseBuffers;
return( NE_FLM_OK);
Exit:
return( rc);
}
/****************************************************************************
@@ -171,6 +211,7 @@ RCODE FLMAPI F_IOBufferMgr::getBuffer(
{
RCODE rc = NE_FLM_OK;
F_IOBuffer * pIOBuffer = NULL;
FLMBOOL bMutexLocked = FALSE;
f_assert( *ppIOBuffer == NULL);
@@ -180,6 +221,9 @@ RCODE FLMAPI F_IOBufferMgr::getBuffer(
goto Exit;
}
f_mutexLock( m_hMutex);
bMutexLocked = TRUE;
Retry:
if( m_pFirstAvail)
@@ -215,11 +259,31 @@ Retry:
}
else if( m_pFirstPending)
{
if( RC_BAD( rc = m_pFirstPending->waitToComplete()))
#ifndef FLM_UNIX
if( RC_BAD( rc = f_notifyWait( m_hMutex, m_hAvailSem,
NULL, &m_pAvailNotify)))
{
goto Exit;
}
#else
F_IOBuffer * pPending = m_pFirstPending;
pPending->AddRef();
f_mutexUnlock( m_hMutex);
bMutexLocked = FALSE;
rc = pPending->waitToComplete();
f_mutexLock( m_hMutex);
bMutexLocked = TRUE;
pPending->Release();
if( RC_BAD( rc))
{
goto Exit;
}
#endif
goto Retry;
}
else
@@ -240,6 +304,11 @@ Exit:
pIOBuffer->Release();
}
if( bMutexLocked)
{
f_mutexUnlock( m_hMutex);
}
return( rc);
}
@@ -248,24 +317,49 @@ Desc:
****************************************************************************/
RCODE FLMAPI F_IOBufferMgr::waitForAllPendingIO( void)
{
RCODE rc;
RCODE rc = NE_FLM_OK;
RCODE tmpRc;
F_IOBuffer * pBuf;
FLMBOOL bMutexLocked = FALSE;
f_mutexLock( m_hMutex);
bMutexLocked = TRUE;
while( (pBuf = m_pFirstPending) != NULL)
{
pBuf->AddRef();
f_mutexUnlock( m_hMutex);
bMutexLocked = FALSE;
if( RC_BAD( tmpRc = pBuf->waitToComplete()))
{
if( RC_OK( m_completionRc))
{
f_mutexLock( m_hMutex);
bMutexLocked = TRUE;
m_completionRc = tmpRc;
}
}
if( !bMutexLocked)
{
f_mutexLock( m_hMutex);
bMutexLocked = TRUE;
}
pBuf->Release( TRUE);
pBuf = NULL;
}
rc = m_completionRc;
m_completionRc = NE_FLM_OK;
if( bMutexLocked)
{
f_mutexUnlock( m_hMutex);
}
return( rc);
}
@@ -276,6 +370,7 @@ void F_IOBufferMgr::linkToList(
F_IOBuffer ** ppListHead,
F_IOBuffer * pIOBuffer)
{
f_assertMutexLocked( m_hMutex);
f_assert( pIOBuffer->m_eList == MGR_LIST_NONE);
pIOBuffer->m_pPrev = NULL;
@@ -287,11 +382,14 @@ void F_IOBufferMgr::linkToList(
*ppListHead = pIOBuffer;
if( ppListHead == &m_pFirstPending || ppListHead == &m_pFirstUsed)
if( ppListHead == &m_pFirstPending)
{
pIOBuffer->m_eList = (ppListHead == &m_pFirstPending
? MGR_LIST_PENDING
: MGR_LIST_USED);
f_assert( !pIOBuffer->m_bPending);
pIOBuffer->m_eList = MGR_LIST_PENDING;
}
else if( ppListHead == &m_pFirstUsed)
{
pIOBuffer->m_eList = MGR_LIST_USED;
}
else
{
@@ -305,6 +403,8 @@ Desc:
void F_IOBufferMgr::unlinkFromList(
F_IOBuffer * pIOBuffer)
{
f_assertMutexLocked( m_hMutex);
if( pIOBuffer->m_pNext)
{
pIOBuffer->m_pNext->m_pPrev = pIOBuffer->m_pPrev;
@@ -333,21 +433,37 @@ void F_IOBufferMgr::unlinkFromList(
/****************************************************************************
Desc:
****************************************************************************/
FLMINT FLMAPI F_IOBuffer::Release( void)
FLMINT F_IOBuffer::Release(
FLMBOOL bMutexAlreadyLocked)
{
FLMINT iRefCnt;
FLMINT iRefCnt;
F_MUTEX hMutex = F_MUTEX_NULL;
F_IOBufferMgr * pBufferMgr = NULL;
if( m_pBufferMgr && !bMutexAlreadyLocked)
{
hMutex = m_pBufferMgr->m_hMutex;
f_mutexLock( hMutex);
}
if( m_refCnt <= 2)
{
if( m_pBufferMgr && m_eList != MGR_LIST_NONE)
{
f_assert( m_eList != MGR_LIST_PENDING);
m_pBufferMgr->unlinkFromList( this);
}
}
if( m_refCnt == 2)
{
if( m_pBufferMgr)
if( m_pAsyncClient)
{
m_pAsyncClient->Release();
m_pAsyncClient = NULL;
}
if( (pBufferMgr = m_pBufferMgr) != NULL)
{
if( m_pBufferMgr->m_bReuseBuffers)
{
@@ -358,15 +474,26 @@ FLMINT FLMAPI F_IOBuffer::Release( void)
f_assert( m_pBufferMgr->m_uiTotalBuffers);
f_assert( m_pBufferMgr->m_uiTotalBufferBytes >= m_uiBufferSize);
m_refCnt--;
f_atomicDec( &m_refCnt);
m_pBufferMgr->m_uiTotalBuffers--;
m_pBufferMgr->m_uiTotalBufferBytes -= m_uiBufferSize;
m_pBufferMgr = NULL;
}
if( pBufferMgr->m_pAvailNotify)
{
f_notifySignal( pBufferMgr->m_pAvailNotify, NE_FLM_OK);
pBufferMgr->m_pAvailNotify = NULL;
}
}
}
iRefCnt = --m_refCnt;
iRefCnt = f_atomicDec( &m_refCnt);
if( hMutex != F_MUTEX_NULL)
{
f_mutexUnlock( hMutex);
}
if( !iRefCnt)
{
@@ -406,16 +533,24 @@ void FLMAPI F_IOBuffer::setPending( void)
{
f_assert( !m_bPending);
m_bPending = TRUE;
m_uiStartTime = FLM_GET_TIMER();
if( m_pBufferMgr)
{
f_assert( m_eList == MGR_LIST_USED);
f_mutexLock( m_pBufferMgr->m_hMutex);
m_pBufferMgr->unlinkFromList( this);
m_pBufferMgr->linkToList( &m_pBufferMgr->m_pFirstPending, this);
f_mutexUnlock( m_pBufferMgr->m_hMutex);
}
#ifndef FLM_UNIX
f_assert( !m_pAsyncClient ||
f_semGetSignalCount( ((F_FileAsyncClient *)m_pAsyncClient)->m_hSem) == 0);
#endif
m_bPending = TRUE;
m_uiStartTime = FLM_GET_TIMER();
m_uiEndTime = 0;
}
/****************************************************************************
@@ -425,16 +560,18 @@ void FLMAPI F_IOBuffer::clearPending( void)
{
f_assert( m_bPending);
m_bPending = FALSE;
m_uiStartTime = 0;
if( m_pBufferMgr)
{
f_assert( m_eList == MGR_LIST_PENDING);
f_mutexLock( m_pBufferMgr->m_hMutex);
m_pBufferMgr->unlinkFromList( this);
m_pBufferMgr->linkToList( &m_pBufferMgr->m_pFirstUsed, this);
f_mutexUnlock( m_pBufferMgr->m_hMutex);
}
m_bPending = FALSE;
m_uiStartTime = 0;
}
/****************************************************************************
@@ -458,10 +595,11 @@ void F_IOBuffer::notifyComplete(
m_fnCompletion = NULL;
m_pvData = NULL;
}
if( m_pBufferMgr)
{
f_assert( m_eList == MGR_LIST_PENDING);
f_mutexLock( m_pBufferMgr->m_hMutex);
m_pBufferMgr->unlinkFromList( this);
m_pBufferMgr->linkToList( &m_pBufferMgr->m_pFirstUsed, this);
@@ -470,6 +608,8 @@ void F_IOBuffer::notifyComplete(
{
m_pBufferMgr->m_completionRc = completionRc;
}
f_mutexUnlock( m_pBufferMgr->m_hMutex);
}
}