diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index d9939ab246c9b..69bd8b83385ad 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -342,10 +342,25 @@ void on_signal_callback(uv_signal_t* handle, int signum) context->stats->totalSigusr1 += 1; } +/// Invoke the callbacks for the mPendingRegionInfos +void handleRegionCallbacks(ServiceRegistry& registry, std::vector& infos) +{ + if (infos.empty() == false) { + std::vector toBeNotified; + toBeNotified.swap(infos); // avoid any MT issue. + for (auto const& info : toBeNotified) { + registry.get()(CallbackService::Id::RegionInfoCallback, info); + } + } +} + void DataProcessingDevice::InitTask() { for (auto& channel : fChannels) { - channel.second.at(0).Transport()->SubscribeToRegionEvents([&pendingRegionInfos = mPendingRegionInfos, ®ionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) { + channel.second.at(0).Transport()->SubscribeToRegionEvents([this, + ®istry = mServiceRegistry, + &pendingRegionInfos = mPendingRegionInfos, + ®ionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) { std::lock_guard lock(regionInfoMutex); LOG(debug) << ">>> Region info event" << info.event; LOG(debug) << "id: " << info.id; @@ -353,6 +368,10 @@ void DataProcessingDevice::InitTask() LOG(debug) << "size: " << info.size; LOG(debug) << "flags: " << info.flags; pendingRegionInfos.push_back(info); + // When not running we can handle the callbacks synchronously. + if (this->GetCurrentState() != fair::mq::State::Running) { + handleRegionCallbacks(registry, pendingRegionInfos); + } }); } @@ -497,6 +516,12 @@ void DataProcessingDevice::Reset() { mServiceRegistry.get()(Cal bool DataProcessingDevice::ConditionalRun() { + // Notify on the main thread the new region callbacks, making sure + // no callback is issued if there is something still processing. + { + std::lock_guard lock(mRegionInfoMutex); + handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos); + } // This will block for the correct delay (or until we get data // on a socket). We also do not block on the first iteration // so that devices which do not have a timer can still start an @@ -528,15 +553,13 @@ bool DataProcessingDevice::ConditionalRun() // Notify on the main thread the new region callbacks, making sure // no callback is issued if there is something still processing. + // Notice that we still need to perform callbacks also after + // the socket epolled, because otherwise we would end up serving + // the callback after the first data arrives is the system is too + // fast to transition from Init to Run. { std::lock_guard lock(mRegionInfoMutex); - if (mPendingRegionInfos.empty() == false) { - std::vector toBeNotified; - toBeNotified.swap(mPendingRegionInfos); // avoid any MT issue. - for (auto const& info : toBeNotified) { - mServiceRegistry.get()(CallbackService::Id::RegionInfoCallback, info); - } - } + handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos); } assert(mStreams.size() == mHandles.size());