From 92f1f5029483fdad12eff6f46bf3ab633711029b Mon Sep 17 00:00:00 2001 From: Vassil Vassilev <vvasilev@cern.ch> Date: Wed, 19 Jul 2017 22:49:41 +0200 Subject: [PATCH] Teach TFileMerger to work with TFile* handles. This patch allows TFileMerger to work with externally created TFile-s. Being able to control the creation of the TFile objects give us a chance to use in-memory files. This is very helpful in benchmarking when we want to simulate fast disks or we just want to avoid disk wearout. --- io/io/inc/ROOT/TBufferMerger.hxx | 14 ++++++--- io/io/inc/TFileMerger.h | 3 ++ io/io/src/TBufferMerger.cxx | 25 ++++++++++++--- io/io/src/TBufferMergerFile.cxx | 2 +- io/io/src/TFileMerger.cxx | 31 ++++++++++++++----- io/io/test/CMakeLists.txt | 2 +- io/io/test/TBufferMerger.cxx | 1 + io/io/test/TFileMergerTests.cxx | 52 ++++++++++++++++++++++++++++++++ 8 files changed, 113 insertions(+), 17 deletions(-) create mode 100644 io/io/test/TFileMergerTests.cxx diff --git a/io/io/inc/ROOT/TBufferMerger.hxx b/io/io/inc/ROOT/TBufferMerger.hxx index f24ca623504..a57e64ec04d 100644 --- a/io/io/inc/ROOT/TBufferMerger.hxx +++ b/io/io/inc/ROOT/TBufferMerger.hxx @@ -22,6 +22,7 @@ #include <thread> class TBufferFile; +class TFile; namespace ROOT { namespace Experimental { @@ -50,6 +51,11 @@ public: */ TBufferMerger(const char *name, Option_t *option = "RECREATE", Int_t compress = 1); + /** Constructor + * @param output Output \c TFile + */ + TBufferMerger(std::unique_ptr<TFile> output); + /** Destructor */ virtual ~TBufferMerger(); @@ -100,12 +106,12 @@ private: /** TBufferMerger has no copy operator */ TBufferMerger &operator=(const TBufferMerger &); + void Init(std::unique_ptr<TFile>); + void Push(TBufferFile *buffer); void WriteOutputFile(); - const std::string fName; - const std::string fOption; - const Int_t fCompress; + TFile* fFile; //< Output file. size_t fAutoSave; //< AutoSave only every fAutoSave bytes std::mutex fQueueMutex; //< Mutex used to lock fQueue std::condition_variable fDataAvailable; //< Condition variable used to wait for data @@ -114,7 +120,7 @@ private: std::vector<std::weak_ptr<TBufferMergerFile>> fAttachedFiles; //< Attached files std::function<void(void)> fCallback; //< Callback for when data is removed from queue - ClassDef(TBufferMerger, 0); + ClassDef(TBufferMerger, 1); }; /** diff --git a/io/io/inc/TFileMerger.h b/io/io/inc/TFileMerger.h index 57e87bf6a1c..33c288f4b1d 100644 --- a/io/io/inc/TFileMerger.h +++ b/io/io/inc/TFileMerger.h @@ -16,6 +16,8 @@ #include "TString.h" #include "TStopwatch.h" +#include <memory> + class TList; class TFile; class TDirectory; @@ -97,6 +99,7 @@ public: virtual Bool_t OutputFile(const char *url, Bool_t force, Int_t compressionLevel); virtual Bool_t OutputFile(const char *url, const char *mode = "RECREATE"); virtual Bool_t OutputFile(const char *url, const char *mode, Int_t compressionLevel); + virtual Bool_t OutputFile(std::unique_ptr<TFile> file); virtual void PrintFiles(Option_t *options); virtual Bool_t Merge(Bool_t = kTRUE); virtual Bool_t PartialMerge(Int_t type = kAll | kIncremental); diff --git a/io/io/src/TBufferMerger.cxx b/io/io/src/TBufferMerger.cxx index fcfa7e6b6bc..d3c0f77155e 100644 --- a/io/io/src/TBufferMerger.cxx +++ b/io/io/src/TBufferMerger.cxx @@ -21,9 +21,26 @@ namespace ROOT { namespace Experimental { TBufferMerger::TBufferMerger(const char *name, Option_t *option, Int_t compress) - : fName(name), fOption(option), fCompress(compress), fAutoSave(0), - fMergingThread(new std::thread([&]() { this->WriteOutputFile(); })) { + // NOTE: We cannot use ctor chaining or in-place initialization because we want this operation to have no effect on + // ROOT's gDirectory. + TDirectory::TContext ctxt; + if (TFile *output = TFile::Open(name, option, /*title*/ name, compress)) + Init(std::unique_ptr<TFile>(output)); + else + Error("OutputFile", "cannot open the MERGER output file %s", name); +} + +TBufferMerger::TBufferMerger(std::unique_ptr<TFile> output) +{ + Init(std::move(output)); +} + +void TBufferMerger::Init(std::unique_ptr<TFile> output) +{ + fFile = output.release(); + fAutoSave = 0; + fMergingThread.reset(new std::thread([&]() { this->WriteOutputFile(); })); } TBufferMerger::~TBufferMerger() @@ -84,7 +101,7 @@ void TBufferMerger::WriteOutputFile() { R__LOCKGUARD(gROOTMutex); - merger.OutputFile(fName.c_str(), fOption.c_str(), fCompress); + merger.OutputFile(std::unique_ptr<TFile>(fFile)); } while (true) { @@ -106,7 +123,7 @@ void TBufferMerger::WriteOutputFile() { R__LOCKGUARD(gROOTMutex); - memfiles.push_back(new TMemFile(fName.c_str(), buffer->Buffer() + buffer->Length(), length, "read")); + memfiles.push_back(new TMemFile(fFile->GetName(), buffer->Buffer() + buffer->Length(), length, "read")); buffer->SetBufferOffset(buffer->Length() + length); merger.AddFile(memfiles.back(), false); diff --git a/io/io/src/TBufferMergerFile.cxx b/io/io/src/TBufferMergerFile.cxx index 52718efffb1..192b573d73a 100644 --- a/io/io/src/TBufferMergerFile.cxx +++ b/io/io/src/TBufferMergerFile.cxx @@ -18,7 +18,7 @@ namespace ROOT { namespace Experimental { TBufferMergerFile::TBufferMergerFile(TBufferMerger &m) - : TMemFile(m.fName.c_str(), "recreate", "", m.fCompress), fMerger(m) + : TMemFile(m.fFile->GetName(), "recreate", "", m.fFile->GetCompressionSettings()), fMerger(m) { } diff --git a/io/io/src/TFileMerger.cxx b/io/io/src/TFileMerger.cxx index c2609fe9d91..0323e3f34e7 100644 --- a/io/io/src/TFileMerger.cxx +++ b/io/io/src/TFileMerger.cxx @@ -311,20 +311,37 @@ Bool_t TFileMerger::OutputFile(const char *outputfile, Bool_t force) Bool_t TFileMerger::OutputFile(const char *outputfile, const char *mode, Int_t compressionLevel) { + // We want gDirectory untouched by anything going on here + TDirectory::TContext ctxt; + if (TFile *outputFile = TFile::Open(outputfile, mode, "", compressionLevel)) + return OutputFile(std::unique_ptr<TFile>(outputFile)); + + Error("OutputFile", "cannot open the MERGER output file %s", fOutputFilename.Data()); + return kFALSE; +} + +//////////////////////////////////////////////////////////////////////////////// +/// Set an output file opened externally by the users + +Bool_t TFileMerger::OutputFile(std::unique_ptr<TFile> outputfile) +{ + if (!outputfile || outputfile->IsZombie()) { + Error("OutputFile", "cannot open the MERGER output file %s", (outputfile) ? outputfile->GetName() : ""); + return kFALSE; + } + fExplicitCompLevel = kTRUE; TFile *oldfile = fOutputFile; - fOutputFile = 0; // This avoids the complaint from RecursiveRemove about the file being deleted which is here spurrious. (see RecursiveRemove). + fOutputFile = 0; // This avoids the complaint from RecursiveRemove about the file being deleted which is here + // spurrious. (see RecursiveRemove). SafeDelete(oldfile); - fOutputFilename = outputfile; - + fOutputFilename = outputfile->GetName(); // We want gDirectory untouched by anything going on here TDirectory::TContext ctxt; - if (!(fOutputFile = TFile::Open(outputfile, mode, "", compressionLevel)) || fOutputFile->IsZombie()) { - Error("OutputFile", "cannot open the MERGER output file %s", fOutputFilename.Data()); - return kFALSE; - } + fOutputFile = outputfile.release(); // Transfer the ownership of the file. + return kTRUE; } diff --git a/io/io/test/CMakeLists.txt b/io/io/test/CMakeLists.txt index b1bc38fa164..f51da4927fa 100644 --- a/io/io/test/CMakeLists.txt +++ b/io/io/test/CMakeLists.txt @@ -1 +1 @@ -ROOT_ADD_GTEST(testTBufferMerger TBufferMerger.cxx LIBRARIES RIO Tree) +ROOT_ADD_GTEST(IOTests TBufferMerger.cxx TFileMergerTests.cxx LIBRARIES RIO Tree) diff --git a/io/io/test/TBufferMerger.cxx b/io/io/test/TBufferMerger.cxx index 5a0916f6bbc..4707fa987f7 100644 --- a/io/io/test/TBufferMerger.cxx +++ b/io/io/test/TBufferMerger.cxx @@ -196,6 +196,7 @@ TEST(TBufferMerger, CheckTreeFillResults) { // sum of all branch values in parallel mode TFile f("tbuffermerger_parallel.root"); auto t = (TTree *)f.Get("mytree"); + ASSERT_TRUE(t != nullptr); int n, sum = 0; int nentries = (int)t->GetEntries(); diff --git a/io/io/test/TFileMergerTests.cxx b/io/io/test/TFileMergerTests.cxx new file mode 100644 index 00000000000..be66c6c7986 --- /dev/null +++ b/io/io/test/TFileMergerTests.cxx @@ -0,0 +1,52 @@ +#include "TFileMerger.h" + +#include "TMemFile.h" +#include "TTree.h" + +#include "gtest/gtest.h" + +static void CreateATuple(TMemFile &file, const char *name, double value) +{ + auto mytree = new TTree(name, "A tree"); + mytree->SetDirectory(&file); + mytree->Branch(name, &value); + mytree->Fill(); + file.Write(); +} + +static void CheckTree(TMemFile &file, const char *name, double expectedValue) +{ + auto t = static_cast<TTree *>(file.Get(name)); + ASSERT_TRUE(t != nullptr); + + double d; + t->SetBranchAddress(name, &d); + t->GetEntry(0); + EXPECT_EQ(expectedValue, d); + // Setting branch address to a stack address requires to either call ResetBranchAddresses or delete the tree. + t->ResetBranchAddresses(); +} + +TEST(TFileMerger, CreateWithTFilePointer) +{ + TMemFile a("a.root", "CREATE"); + CreateATuple(a, "a_tree", 1.); + + // FIXME: Calling this out of order causes two values to be written to the second file. + TMemFile b("b.root", "CREATE"); + CreateATuple(b, "b_tree", 2.); + + TFileMerger merger; + auto output = std::unique_ptr<TMemFile>(new TMemFile("output.root", "CREATE")); + output->ResetBit(kMustCleanup); + merger.OutputFile(std::move(output)); + + merger.AddFile(&a, false); + merger.AddFile(&b, false); + // FIXME: Calling merger.Merge() will call Close() and *delete* output. + merger.PartialMerge(); + + auto &result = *static_cast<TMemFile *>(merger.GetOutputFile()); + CheckTree(result, "a_tree", 1); + CheckTree(result, "b_tree", 2); +} -- GitLab