Skip to content
Snippets Groups Projects
Commit 437aabf6 authored by Enric Tejedor Saavedra's avatar Enric Tejedor Saavedra
Browse files

Fix intermittent deadlock between main and consumer threads.

Make sure the consumer thread is properly notified by the main
thread both when there is a pending block and when the prefetching
is over.
parent 1afb3816
No related branches found
No related tags found
No related merge requests found
......@@ -58,12 +58,11 @@ private:
std::mutex fMutexReadList; // mutex for the list of read blocks
std::condition_variable fNewBlockAdded; // signal the addition of a new pending block
std::condition_variable fReadBlockAdded; // signal the addition of a new red block
TSemaphore *fSemMasterWorker; // semaphore used to kill the consumer thread
TSemaphore *fSemWorkerMaster; // semaphore used to notify the master that worker is killed
TSemaphore *fSemChangeFile; // semaphore used when changin a file in TChain
TString fPathCache; // path to the cache directory
TStopwatch fWaitTime; // time wating to prefetch a buffer (in usec)
Bool_t fThreadJoined; // mark if async thread was joined
Bool_t fPrefetchFinished; // true if prefetching is over
static TThread::VoidRtnFunc_t ThreadProc(void*); //create a joinable worker thread
......@@ -97,6 +96,7 @@ public:
void SetFile(TFile*);
std::condition_variable &GetCondNewBlock() { return fNewBlockAdded; };
void WaitFinishPrefetch();
Bool_t IsPrefetchFinished() const { return fPrefetchFinished; }
ClassDef(TFilePrefetch, 0); // File block prefetcher
};
......
......@@ -51,7 +51,8 @@ and must be explicitly enabled by the user.
TFilePrefetch::TFilePrefetch(TFile* file) :
fFile(file),
fConsumer(0),
fThreadJoined(kTRUE)
fThreadJoined(kTRUE),
fPrefetchFinished(kFALSE)
{
fPendingBlocks = new TList();
fReadBlocks = new TList();
......@@ -59,8 +60,6 @@ TFilePrefetch::TFilePrefetch(TFile* file) :
fPendingBlocks->SetOwner();
fReadBlocks->SetOwner();
fSemMasterWorker = new TSemaphore(0);
fSemWorkerMaster = new TSemaphore(0);
fSemChangeFile = new TSemaphore(0);
}
......@@ -76,8 +75,6 @@ TFilePrefetch::~TFilePrefetch()
SafeDelete(fConsumer);
SafeDelete(fPendingBlocks);
SafeDelete(fReadBlocks);
SafeDelete(fSemMasterWorker);
SafeDelete(fSemWorkerMaster);
SafeDelete(fSemChangeFile);
}
......@@ -87,14 +84,16 @@ TFilePrefetch::~TFilePrefetch()
void TFilePrefetch::WaitFinishPrefetch()
{
fSemMasterWorker->Post();
while ( fSemWorkerMaster->Wait(10) != 0 ) {
fNewBlockAdded.notify_one();
// Inform the consumer thread that prefetching is over
{
std::lock_guard<std::mutex> lk(fMutexPendingList);
fPrefetchFinished = kTRUE;
}
fNewBlockAdded.notify_one();
fConsumer->Join();
fThreadJoined=kTRUE;
fThreadJoined = kTRUE;
fPrefetchFinished = kFALSE;
}
......@@ -238,7 +237,8 @@ TFPBlock* TFilePrefetch::GetPendingBlock()
// is changed on the fly by TChain
fSemChangeFile->Post();
std::unique_lock<std::mutex> lk(fMutexPendingList);
fNewBlockAdded.wait(lk);
// Wait unless there is a pending block or prefetching is over
fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
lk.unlock();
fSemChangeFile->Wait();
......@@ -358,11 +358,10 @@ TThread::VoidRtnFunc_t TFilePrefetch::ThreadProc(void* arg)
{
TFilePrefetch* pClass = (TFilePrefetch*) arg;
while( pClass->fSemMasterWorker->TryWait() != 0 ) {
while (!pClass->IsPrefetchFinished()) {
pClass->ReadListOfBlocks();
}
pClass->fSemWorkerMaster->Post();
return (TThread::VoidRtnFunc_t) 1;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment