diff --git a/Framework/Core/include/Framework/DeviceState.h b/Framework/Core/include/Framework/DeviceState.h index 7e9ac3772f426..b87eae6e620c3 100644 --- a/Framework/Core/include/Framework/DeviceState.h +++ b/Framework/Core/include/Framework/DeviceState.h @@ -23,6 +23,7 @@ typedef struct uv_loop_s uv_loop_t; typedef struct uv_timer_s uv_timer_t; typedef struct uv_poll_s uv_poll_t; typedef struct uv_signal_s uv_signal_t; +typedef struct uv_async_s uv_async_t; namespace o2::framework { @@ -78,6 +79,9 @@ struct DeviceState { std::vector activeOutputPollers; /// The list of active signal handlers std::vector activeSignals; + + uv_async_t* awakeMainThread = nullptr; + int loopReason = 0; }; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index f3dea6416d523..b8c3d1ac8c60d 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -355,8 +355,22 @@ void handleRegionCallbacks(ServiceRegistry& registry, std::vectordata; + state->loopReason |= DeviceState::ASYNC_NOTIFICATION; +} +} // namespace void DataProcessingDevice::InitTask() { + if (mState.awakeMainThread == nullptr) { + mState.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t)); + mState.awakeMainThread->data = &mState; + uv_async_init(mState.loop, mState.awakeMainThread, on_awake_main_thread); + } + for (auto& channel : fChannels) { channel.second.at(0).Transport()->SubscribeToRegionEvents([this, ®istry = mServiceRegistry, @@ -372,6 +386,8 @@ void DataProcessingDevice::InitTask() // When not running we can handle the callbacks synchronously. if (this->GetCurrentState() != fair::mq::State::Running) { handleRegionCallbacks(registry, pendingRegionInfos); + } else { + uv_async_send(registry.get().awakeMainThread); } }); }