Skip to content

Commit

Permalink
Updated event sources to use JANA2 Emit function instead of GetEvent,…
Browse files Browse the repository at this point in the history
… fixing EOF crash (#834)
  • Loading branch information
RaiqaRasool committed Aug 30, 2024
1 parent 388fcec commit 4882f8e
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 66 deletions.
26 changes: 15 additions & 11 deletions src/libraries/DAQ/JEventSource_EVIO.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ set<uint32_t> ROCIDS_TO_PARSE;
// If EVIO support is not available, define dummy methods
#ifndef HAVE_EVIO
JEventSource_EVIO::JEventSource_EVIO(const char* source_name):JEventSource(source_name){
SetCallbackStyle(CallbackStyle::ExpertMode);
cerr << endl;
cerr << "You are trying to use code requiring EVIO when support" << endl;
cerr << "for EVIO was not built into this binary. Set your" << endl;
Expand All @@ -75,9 +76,9 @@ JEventSource_EVIO::JEventSource_EVIO(const char* source_name):JEventSource(sourc
exit(-1);
}
JEventSource_EVIO::~JEventSource_EVIO(){}
jerror_t JEventSource_EVIO::GetEvent(jana::JEvent &event){return NOERROR;}
// jerror_t JEventSource_EVIO::GetEvent(jana::JEvent &event){return NOERROR;}
void JEventSource_EVIO::FreeEvent(jana::JEvent &event){}
jerror_t JEventSource_EVIO::GetObjects(jana::JEvent &event, jana::JFactory_base *factory){return NOERROR;}
// jerror_t JEventSource_EVIO::GetObjects(jana::JEvent &event, jana::JFactory_base *factory){return NOERROR;}
jerror_t JEventSource_EVIO::ReadEVIOEvent(uint32_t* &buf){return NOERROR;}
//:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::

Expand All @@ -89,6 +90,7 @@ jerror_t JEventSource_EVIO::ReadEVIOEvent(uint32_t* &buf){return NOERROR;}
JEventSource_EVIO::JEventSource_EVIO(std::string source_name, JApplication* app):JEventSource(source_name)
{
// Initialize connection objects and flags to NULL
SetCallbackStyle(CallbackStyle::ExpertMode);
Nunparsed = 0;
no_more_events_in_source = false;
et_connected = false;
Expand Down Expand Up @@ -597,11 +599,12 @@ void JEventSource_EVIO::Cleanup(void)
}

//----------------
// GetEvent
// Emit
//----------------
void JEventSource_EVIO::GetEvent(std::shared_ptr<JEvent> event)
JEventSource::Result JEventSource_EVIO::Emit(JEvent& event)

{
auto app = event->GetJApplication();
auto app = event.GetJApplication();
DStatusBits* statusBits = new DStatusBits;

if(VERBOSE>1) evioout << "GetEvent called for &event = " << hex << &event << dec << endl;
Expand Down Expand Up @@ -669,7 +672,7 @@ void JEventSource_EVIO::GetEvent(std::shared_ptr<JEvent> event)
pthread_mutex_lock(&stored_events_mutex);
if(stored_events.empty()){
pthread_mutex_unlock(&stored_events_mutex);
throw RETURN_STATUS::kNO_MORE_EVENTS;
return Result::FailureFinished;
}

objs_ptr = stored_events.front();
Expand Down Expand Up @@ -709,10 +712,10 @@ void JEventSource_EVIO::GetEvent(std::shared_ptr<JEvent> event)
// Store a pointer to the ObjList object for this event in the
// JEvent as the Reference value. Parsing will be done later
// in GetObjects() -> ParseEvents() using the eviobuff pointer.
event->SetJEventSource(this);
event->SetEventNumber((uint64_t)objs_ptr->event_number);
event->SetRunNumber(objs_ptr->run_number);
event->Insert(objs_ptr);
event.SetJEventSource(this);
event.SetEventNumber((uint64_t)objs_ptr->event_number);
event.SetRunNumber(objs_ptr->run_number);
event.Insert(objs_ptr);
statusBits->SetStatusBit(kSTATUS_EVIO);
if( source_type == kFileSource ) statusBits->SetStatusBit(kSTATUS_FROM_FILE);
if( source_type == kETSource ) statusBits->SetStatusBit(kSTATUS_FROM_ET);
Expand All @@ -721,10 +724,11 @@ void JEventSource_EVIO::GetEvent(std::shared_ptr<JEvent> event)

// EPICS and BOR events are barrier events
if(statusBits->GetStatusBit(kSTATUS_EPICS_EVENT) || statusBits->GetStatusBit(kSTATUS_BOR_EVENT) ){
event->SetSequential(true);
event.SetSequential(true);
}

Nevents_read++;
return Result::Success;
}

//----------------
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/DAQ/JEventSource_EVIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class JEventSource_EVIO: public JEventSource {
JEventSource_EVIO(std::string source_name, JApplication* app);
virtual ~JEventSource_EVIO();

void GetEvent(std::shared_ptr<JEvent> event) override;
Result Emit(JEvent& event) override;
bool GetObjects(const std::shared_ptr<const JEvent> &event, JFactory *factory) override;
void FinishEvent(JEvent &event) override;

Expand Down
21 changes: 12 additions & 9 deletions src/libraries/DAQ/JEventSource_EVIOpp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ bool sortf250pulsenumbers(const Df250PulseData *a, const Df250PulseData *b) {
//----------------
JEventSource_EVIOpp::JEventSource_EVIOpp(std::string source_name):JEventSource(source_name)
{
SetCallbackStyle(CallbackStyle::ExpertMode);
DONE = false;
DISPATCHER_END = false;
NEVENTS_PROCESSED = 0;
Expand Down Expand Up @@ -566,9 +567,9 @@ jerror_t JEventSource_EVIOpp::SkipEVIOBlocks(uint32_t N)
}

//----------------
// GetEvent
// Emit
//----------------
void JEventSource_EVIOpp::GetEvent(std::shared_ptr<JEvent> event)
JEventSource::Result JEventSource_EVIOpp::Emit(JEvent& event)
{
// Get next event from list, waiting if necessary
unique_lock<std::mutex> lck(PARSED_EVENTS_MUTEX);
Expand All @@ -590,7 +591,7 @@ void JEventSource_EVIOpp::GetEvent(std::shared_ptr<JEvent> event)
// auto it = in_progess_events.find(Ncalls_to_GetEvent);
// if( it != in_progess_events.end() )in_progess_events.erase(it);
// pthread_mutex_unlock(&in_progress_mutex);
throw RETURN_STATUS::kNO_MORE_EVENTS;
return Result::FailureFinished;
}
NEVENTBUFF_STALLED++;
PARSED_EVENTS_CV.wait_for(lck,std::chrono::milliseconds(1));
Expand All @@ -609,9 +610,9 @@ void JEventSource_EVIOpp::GetEvent(std::shared_ptr<JEvent> event)
if(pe->borptrs) borptrs_list.push_front(pe->borptrs);

// Copy info for this parsed event into the JEvent
event->SetEventNumber(pe->event_number);
event->SetRunNumber(USER_RUN_NUMBER>0 ? USER_RUN_NUMBER:pe->run_number);
event->Insert(pe)->SetFactoryFlag(JFactory::NOT_OBJECT_OWNER); // Returned to pool in FinishEvent
event.SetEventNumber(pe->event_number);
event.SetRunNumber(USER_RUN_NUMBER>0 ? USER_RUN_NUMBER:pe->run_number);
event.Insert(pe)->SetFactoryFlag(JFactory::NOT_OBJECT_OWNER); // Returned to pool in FinishEvent

// Set event status bits
DStatusBits* statusBits = new DStatusBits;
Expand All @@ -621,15 +622,17 @@ void JEventSource_EVIOpp::GetEvent(std::shared_ptr<JEvent> event)
if( source_type == kFileSource ) statusBits->SetStatusBit(kSTATUS_FROM_FILE);
if( source_type == kETSource ) statusBits->SetStatusBit(kSTATUS_FROM_ET);

event->Insert(statusBits);
event.Insert(statusBits);

// EPICS and BOR events are barrier events
if(statusBits->GetStatusBit(kSTATUS_EPICS_EVENT)) event->SetSequential(true);
if(statusBits->GetStatusBit(kSTATUS_BOR_EVENT )) event->SetSequential(true);
if(statusBits->GetStatusBit(kSTATUS_EPICS_EVENT)) event.SetSequential(true);
if(statusBits->GetStatusBit(kSTATUS_BOR_EVENT )) event.SetSequential(true);

// Only add BOR events to physics events
if(pe->borptrs==NULL)
if(!borptrs_list.empty()) pe->borptrs = borptrs_list.front();

return Result::Success;
}

//----------------
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/DAQ/JEventSource_EVIOpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class JEventSource_EVIOpp: public JEventSource{
void Dispatcher(void);
jerror_t SkipEVIOBlocks(uint32_t N);

void GetEvent(std::shared_ptr<JEvent> event) override;
Result Emit(JEvent& event) override;
void FinishEvent(JEvent &event) override;
bool GetObjects(const std::shared_ptr<const JEvent> &event, JFactory* factory) override;

Expand Down
24 changes: 13 additions & 11 deletions src/libraries/EVENTSTORE/DEventSourceEventStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static bool TEST_MODE = false;
//---------------------------------
DEventSourceEventStore::DEventSourceEventStore(std::string source_name):JEventSource(source_name) {
SetTypeName("DEventSourceEventStore");
SetCallbackStyle(CallbackStyle::ExpertMode);
}

//---------------------------------
Expand Down Expand Up @@ -269,26 +270,26 @@ DEventSourceEventStore::~DEventSourceEventStore()
}

//---------------------------------
// GetEvent
// Emit
//---------------------------------
void DEventSourceEventStore::GetEvent(std::shared_ptr<JEvent> event)
JEventSource::Result DEventSourceEventStore::Emit(JEvent& event)
{

auto statusBits = new DStatusBits;
statusBits->SetStatusBit(kSTATUS_FROM_FILE);
statusBits->SetStatusBit(kSTATUS_PHYSICS_EVENT);
event->Insert(statusBits);
event.Insert(statusBits);

// FOR DEBUGGING - EMIT EVENTS FOREVER
if(TEST_MODE) {
// output some fake event with skim information
event->SetEventNumber(1);
event->SetRunNumber(10000);
event->SetJEventSource(this);
event.SetEventNumber(1);
event.SetRunNumber(10000);
event.SetJEventSource(this);
//event.SetRef(NULL);

DEventStoreEvent *the_es_event = new DEventStoreEvent();
event->Insert(the_es_event);
event.Insert(the_es_event);
for(int i=0; i<4; i++)
if(gRandom->Uniform() < 0.5)
the_es_event->Add_Skim(skim_list[i]);
Expand All @@ -298,15 +299,15 @@ void DEventSourceEventStore::GetEvent(std::shared_ptr<JEvent> event)
while(event_source == NULL) {
while(OpenNextFile() != NOERROR) {} // keep trying to open files until none are left
if(event_source == NULL)
throw RETURN_STATUS::kNO_MORE_EVENTS;
return Result::FailureFinished;

// skip to next event
jerror_t retval;
if( (retval = MoveToNextEvent()) != NOERROR)
throw RETURN_STATUS::kERROR; // if we can't get to another event, then we're done
return Result::FailureFinished; // if we can't get to another event, then we're done

// read the next event in
event_source->GetEvent(event);
event_source->Emit(event);
if(retval == NOERROR) {
// To store the skim and other EventStore information for the event
// we wrap the actual event data and store our information in the wrapper
Expand All @@ -316,7 +317,7 @@ void DEventSourceEventStore::GetEvent(std::shared_ptr<JEvent> event)
// TODO: NWB: Uncomment previous line. Problem: `Ref` is untyped, and our sources now Insert()
// a typed object which `GetRef` can't find. To fix this, we need a "Ref" container
// which _all_ of our eventsources can insert.
event->Insert(the_es_event);
event.Insert(the_es_event);

// tag event with skims
} else if(retval == NO_MORE_EVENTS_IN_SOURCE) {
Expand All @@ -325,6 +326,7 @@ void DEventSourceEventStore::GetEvent(std::shared_ptr<JEvent> event)
event_source = NULL;
}
}
return Result::Success;
}

//---------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/EVENTSTORE/DEventSourceEventStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DEventSourceEventStore : public JEventSource {
~DEventSourceEventStore() override;

void Open() override;
void GetEvent(std::shared_ptr<JEvent> event) override;
Result Emit(JEvent& event) override;
void FinishEvent(JEvent &event) override;
bool GetObjects(const std::shared_ptr<const JEvent>& event, JFactory* factory) override;

Expand Down
23 changes: 13 additions & 10 deletions src/libraries/HDDM/DEventSourceHDDM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ DEventSourceHDDM::DEventSourceHDDM(const char* source_name)
: JEventSource(source_name)
{
/// Constructor for DEventSourceHDDM object
SetCallbackStyle(CallbackStyle::ExpertMode);
ifs = new ifstream(source_name);
ifs->get();
ifs->unget();
Expand Down Expand Up @@ -122,14 +123,15 @@ DEventSourceHDDM::~DEventSourceHDDM()
}

//----------------
// GetEvent
// Emit
//----------------
void DEventSourceHDDM::GetEvent(std::shared_ptr<JEvent> event)
JEventSource::Result DEventSourceHDDM::Emit(JEvent& event)

{
/// Implementation of JEventSource virtual function

if (!fin)
return; // EVENT_SOURCE_NOT_OPEN;
return Result::FailureFinished; // EVENT_SOURCE_NOT_OPEN;

// Each open HDDM file takes up about 1M of memory so it's
// worthwhile to close it as soon as we can.
Expand All @@ -138,7 +140,7 @@ void DEventSourceHDDM::GetEvent(std::shared_ptr<JEvent> event)
fin = NULL;
delete ifs;
ifs = NULL;
throw RETURN_STATUS::kNO_MORE_EVENTS;
return Result::FailureFinished;
}

hddm_s::HDDM *record = new hddm_s::HDDM();
Expand All @@ -148,7 +150,7 @@ void DEventSourceHDDM::GetEvent(std::shared_ptr<JEvent> event)
fin = NULL;
delete ifs;
ifs = NULL;
throw RETURN_STATUS::kNO_MORE_EVENTS;
return Result::FailureFinished;
}
}

Expand All @@ -163,16 +165,17 @@ void DEventSourceHDDM::GetEvent(std::shared_ptr<JEvent> event)
}

// Copy the reference info into the JEvent object
event->SetJEventSource(this);
event->SetEventNumber(event_number);
event->SetRunNumber(run_number);
event->Insert(record); // Transfer ownership of record to event
event.SetJEventSource(this);
event.SetEventNumber(event_number);
event.SetRunNumber(run_number);
event.Insert(record); // Transfer ownership of record to event

auto statusBits = new DStatusBits;
statusBits->SetStatusBit(kSTATUS_HDDM);
statusBits->SetStatusBit(kSTATUS_FROM_FILE);
statusBits->SetStatusBit(kSTATUS_PHYSICS_EVENT);
event->Insert(statusBits);
event.Insert(statusBits);
return Result::Success;
}

//----------------
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/HDDM/DEventSourceHDDM.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class DEventSourceHDDM:public JEventSource
virtual const char* className(void){return static_className();}
static const char* static_className(void){return "DEventSourceHDDM";}

void GetEvent(std::shared_ptr<JEvent> event) override;
Result Emit(JEvent& event) override;
void FinishEvent(JEvent &event) override;
bool GetObjects(const std::shared_ptr<const JEvent> &event, JFactory *factory) override;

Expand Down
Loading

0 comments on commit 4882f8e

Please sign in to comment.