Move update events to eventloop

We can now completely remove the eventqueue thread
This commit is contained in:
patrick96 2021-09-11 13:09:46 +02:00 committed by Patrick Ziegler
parent f36224eaab
commit b9642e4cf3
4 changed files with 45 additions and 127 deletions

View File

@ -10,7 +10,6 @@
#include "components/types.hpp" #include "components/types.hpp"
#include "events/signal_fwd.hpp" #include "events/signal_fwd.hpp"
#include "events/signal_receiver.hpp" #include "events/signal_receiver.hpp"
#include "events/types.hpp"
#include "settings.hpp" #include "settings.hpp"
#include "utils/actions.hpp" #include "utils/actions.hpp"
#include "utils/file.hpp" #include "utils/file.hpp"
@ -51,9 +50,9 @@ class controller
bool run(bool writeback, string snapshot_dst); bool run(bool writeback, string snapshot_dst);
bool enqueue(event&& evt);
void trigger_action(string&& input_data); void trigger_action(string&& input_data);
void trigger_quit(bool reload); void trigger_quit(bool reload);
void trigger_update(bool force);
void stop(bool reload); void stop(bool reload);
@ -66,7 +65,6 @@ class controller
protected: protected:
void read_events(); void read_events();
void process_eventqueue();
void process_inputdata(); void process_inputdata();
bool process_update(bool force); bool process_update(bool force);
@ -121,11 +119,6 @@ class controller
*/ */
bool m_writeback{false}; bool m_writeback{false};
/**
* \brief Internal event queue
*/
moodycamel::BlockingConcurrentQueue<event> m_queue;
/** /**
* \brief Loaded modules * \brief Loaded modules
*/ */
@ -151,11 +144,6 @@ class controller
*/ */
string m_inputdata; string m_inputdata;
/**
* \brief Thread for the eventqueue loop
*/
std::thread m_event_thread;
/** /**
* \brief Misc threads * \brief Misc threads
*/ */

View File

@ -1,42 +0,0 @@
#pragma once
#include <cstdint>
#include "common.hpp"
POLYBAR_NS
enum class event_type {
NONE = 0,
UPDATE,
};
struct event {
int type{0};
bool flag{false};
};
namespace {
inline bool operator==(int id, event_type type) {
return id == static_cast<int>(type);
}
inline bool operator!=(int id, event_type type) {
return !(id == static_cast<int>(type));
}
/**
* Create NONE event
*/
inline event make_none_evt() {
return event{static_cast<int>(event_type::NONE)};
}
/**
* Create UPDATE event
*/
inline event make_update_evt(bool force = false) {
return event{static_cast<int>(event_type::UPDATE), force};
}
}
POLYBAR_NS_END

View File

@ -6,7 +6,6 @@
#include "common.hpp" #include "common.hpp"
#include "events/signal_fwd.hpp" #include "events/signal_fwd.hpp"
#include "events/signal_receiver.hpp" #include "events/signal_receiver.hpp"
#include "events/types.hpp"
#include "x11/extensions/fwd.hpp" #include "x11/extensions/fwd.hpp"
#include "x11/types.hpp" #include "x11/types.hpp"

View File

@ -31,6 +31,10 @@ POLYBAR_NS
sig_atomic_t g_reload{0}; sig_atomic_t g_reload{0};
sig_atomic_t g_terminate{0}; sig_atomic_t g_terminate{0};
// TODO pass this information in a better way
sig_atomic_t g_update{0};
sig_atomic_t g_force_update{0};
/** /**
* Build controller instance * Build controller instance
*/ */
@ -57,6 +61,7 @@ controller::controller(connection& conn, signal_emitter& emitter, const logger&
"remove it from your config"); "remove it from your config");
} }
// TODO deprecate both
m_swallow_limit = m_conf.deprecated("settings", "eventqueue-swallow", "throttle-output", m_swallow_limit); m_swallow_limit = m_conf.deprecated("settings", "eventqueue-swallow", "throttle-output", m_swallow_limit);
m_swallow_update = m_conf.deprecated("settings", "eventqueue-swallow-time", "throttle-output-for", m_swallow_update); m_swallow_update = m_conf.deprecated("settings", "eventqueue-swallow-time", "throttle-output-for", m_swallow_update);
@ -129,34 +134,14 @@ bool controller::run(bool writeback, string snapshot_dst) {
} }
m_connection.flush(); m_connection.flush();
m_event_thread = thread(&controller::process_eventqueue, this);
read_events(); read_events();
if (m_event_thread.joinable()) {
m_log.info("Joining event thread");
m_event_thread.join();
}
m_log.notice("Termination signal received, shutting down..."); m_log.notice("Termination signal received, shutting down...");
return !g_reload; return !g_reload;
} }
/**
* Enqueue event
*/
bool controller::enqueue(event&& evt) {
if (!m_process_events) {
return false;
}
if (!m_queue.enqueue(forward<decltype(evt)>(evt))) {
m_log.warn("Failed to enqueue event");
return false;
}
return true;
}
/** /**
* Enqueue input data * Enqueue input data
*/ */
@ -172,9 +157,23 @@ void controller::trigger_action(string&& input_data) {
void controller::trigger_quit(bool reload) { void controller::trigger_quit(bool reload) {
g_terminate = 1; g_terminate = 1;
g_reload = reload; g_reload = reload;
// TODO create function for this
UV(uv_async_send, m_notifier.get()); UV(uv_async_send, m_notifier.get());
} }
void controller::trigger_update(bool force) {
if (force) {
g_force_update = 1;
} else {
g_update = 1;
}
// TODO this isn't really safe
if (m_notifier) {
UV(uv_async_send, m_notifier.get());
}
}
void controller::stop(bool reload) { void controller::stop(bool reload) {
g_terminate = 1; g_terminate = 1;
g_reload = reload; g_reload = reload;
@ -237,6 +236,15 @@ void controller::notifier_handler() {
if (!m_inputdata.empty()) { if (!m_inputdata.empty()) {
process_inputdata(); process_inputdata();
} }
if (g_force_update) {
process_update(true);
} else if (g_update) {
process_update(false);
}
g_update = 0;
g_force_update = 0;
} }
static void ipc_alloc_cb(uv_handle_t*, size_t, uv_buf_t* buf) { static void ipc_alloc_cb(uv_handle_t*, size_t, uv_buf_t* buf) {
@ -273,6 +281,15 @@ static void notifier_cb_wrapper(uv_async_t *handle) {
void controller::read_events() { void controller::read_events() {
m_log.info("Entering event loop (thread-id=%lu)", this_thread::get_id()); m_log.info("Entering event loop (thread-id=%lu)", this_thread::get_id());
if (!m_writeback) {
m_sig.emit(signals::eventqueue::start{});
} else {
// bypass the start eventqueue signal
m_sig.emit(signals::ui::ready{});
}
process_update(true);
auto ipc_handle = std::unique_ptr<uv_pipe_t>(nullptr); auto ipc_handle = std::unique_ptr<uv_pipe_t>(nullptr);
try { try {
@ -309,56 +326,11 @@ void controller::read_events() {
stop(false); stop(false);
} }
// Notify event queue so that it stops
enqueue(make_none_evt());
m_log.info("Eventloop finished"); m_log.info("Eventloop finished");
eloop.reset(); eloop.reset();
} }
/**
* Eventqueue worker loop
*/
void controller::process_eventqueue() {
m_log.info("Eventqueue worker (thread-id=%lu)", this_thread::get_id());
if (!m_writeback) {
m_sig.emit(signals::eventqueue::start{});
} else {
// bypass the start eventqueue signal
m_sig.emit(signals::ui::ready{});
}
while (!g_terminate) {
event evt{};
m_queue.wait_dequeue(evt);
if (g_terminate) {
break;
} else if (evt.type == event_type::UPDATE && evt.flag) {
process_update(true);
} else {
event next{};
size_t swallowed{0};
while (swallowed++ < m_swallow_limit && m_queue.wait_dequeue_timed(next, m_swallow_update)) {
if (evt.type != next.type) {
enqueue(move(next));
break;
} else {
m_log.trace_x("controller: Swallowing event within timeframe");
evt = next;
}
}
if (evt.type == event_type::UPDATE) {
process_update(evt.flag);
} else {
m_log.warn("Unknown event type for enqueued event (%d)", evt.type);
}
}
}
}
/** /**
* Tries to match the given command to a legacy action string and sends the * Tries to match the given command to a legacy action string and sends the
* appropriate new action (and data) to the right module if possible. * appropriate new action (and data) to the right module if possible.
@ -693,14 +665,16 @@ size_t controller::setup_modules(alignment align) {
* Process broadcast events * Process broadcast events
*/ */
bool controller::on(const signals::eventqueue::notify_change&) { bool controller::on(const signals::eventqueue::notify_change&) {
return enqueue(make_update_evt(false)); trigger_update(false);
return true;
} }
/** /**
* Process forced broadcast events * Process forced broadcast events
*/ */
bool controller::on(const signals::eventqueue::notify_forcechange&) { bool controller::on(const signals::eventqueue::notify_forcechange&) {
return enqueue(make_update_evt(true)); trigger_update(true);
return true;
} }
/** /**
@ -738,13 +712,13 @@ bool controller::on(const signals::eventqueue::check_state&) {
*/ */
bool controller::on(const signals::ui::ready&) { bool controller::on(const signals::ui::ready&) {
m_process_events = true; m_process_events = true;
enqueue(make_update_evt(true)); trigger_update(true);
if (!m_snapshot_dst.empty()) { if (!m_snapshot_dst.empty()) {
m_threads.emplace_back(thread([&] { m_threads.emplace_back(thread([&] {
this_thread::sleep_for(3s); this_thread::sleep_for(3s);
m_sig.emit(signals::ui::request_snapshot{move(m_snapshot_dst)}); m_sig.emit(signals::ui::request_snapshot{move(m_snapshot_dst)});
enqueue(make_update_evt(true)); trigger_update(true);
})); }));
} }
@ -830,8 +804,7 @@ bool controller::on(const signals::ipc::hook& evt) {
} }
bool controller::on(const signals::ui::update_background&) { bool controller::on(const signals::ui::update_background&) {
enqueue(make_update_evt(true)); trigger_update(true);
return false; return false;
} }