Refactor EventLoop into event_loop module

This type and its implementations were seriously cluttering main.rs.
This commit is contained in:
Joe Wilm 2016-09-25 19:49:44 -07:00
parent 6a5ac20def
commit 3f6deb8e2f
4 changed files with 251 additions and 231 deletions

231
src/event_loop.rs Normal file
View File

@ -0,0 +1,231 @@
//! The main event loop which performs I/O on the pseudoterminal
use std::borrow::Cow;
use std::io::{self, ErrorKind};
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use mio::{self, Events, PollOpt, Ready};
use mio::unix::EventedFd;
use ansi;
use term::Term;
use util::thread;
use sync::FairMutex;
use super::Flag;
pub struct EventLoop<Io> {
poll: mio::Poll,
pty: Io,
rx: mio::channel::Receiver<Msg>,
tx: mio::channel::Sender<Msg>,
terminal: Arc<FairMutex<Term>>,
proxy: ::glutin::WindowProxy,
signal_flag: Flag,
}
#[derive(Debug)]
pub enum Msg {
Input(Cow<'static, [u8]>),
}
const CHANNEL: mio::Token = mio::Token(0);
const PTY: mio::Token = mio::Token(1);
impl<Io> EventLoop<Io>
where Io: io::Read + io::Write + Send + AsRawFd + 'static
{
pub fn new(
terminal: Arc<FairMutex<Term>>,
proxy: ::glutin::WindowProxy,
signal_flag: Flag,
pty: Io,
) -> EventLoop<Io> {
let (tx, rx) = ::mio::channel::channel();
EventLoop {
poll: mio::Poll::new().expect("create mio Poll"),
pty: pty,
tx: tx,
rx: rx,
terminal: terminal,
proxy: proxy,
signal_flag: signal_flag
}
}
pub fn channel(&self) -> mio::channel::Sender<Msg> {
self.tx.clone()
}
pub fn spawn(self) -> thread::JoinHandle<()> {
struct Writing {
source: Cow<'static, [u8]>,
written: usize,
}
impl Writing {
#[inline]
fn new(c: Cow<'static, [u8]>) -> Writing {
Writing { source: c, written: 0 }
}
#[inline]
fn advance(&mut self, n: usize) {
self.written += n;
}
#[inline]
fn remaining_bytes(&self) -> &[u8] {
&self.source[self.written..]
}
#[inline]
fn finished(&self) -> bool {
self.written >= self.source.len()
}
}
thread::spawn_named("pty reader", move || {
let EventLoop { poll, mut pty, rx, terminal, proxy, signal_flag, .. } = self;
let mut buf = [0u8; 4096];
let mut pty_parser = ansi::Processor::new();
let fd = pty.as_raw_fd();
let fd = EventedFd(&fd);
poll.register(&rx, CHANNEL, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
.unwrap();
poll.register(&fd, PTY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
.unwrap();
let mut events = Events::with_capacity(1024);
let mut write_list = ::std::collections::VecDeque::new();
let mut writing = None;
'event_loop: loop {
poll.poll(&mut events, None).expect("poll ok");
for event in events.iter() {
match event.token() {
CHANNEL => {
while let Ok(msg) = rx.try_recv() {
match msg {
Msg::Input(input) => {
write_list.push_back(input);
}
}
}
poll.reregister(
&rx, CHANNEL,
Ready::readable(),
PollOpt::edge() | PollOpt::oneshot()
).expect("reregister channel");
if writing.is_some() || !write_list.is_empty() {
poll.reregister(
&fd,
PTY,
Ready::readable() | Ready::writable(),
PollOpt::edge() | PollOpt::oneshot()
).expect("reregister fd after channel recv");
}
},
PTY => {
let kind = event.kind();
if kind.is_readable() {
loop {
match pty.read(&mut buf[..]) {
Ok(0) => break,
Ok(got) => {
let mut terminal = terminal.lock();
for byte in &buf[..got] {
pty_parser.advance(&mut *terminal, *byte);
}
terminal.dirty = true;
// Only wake up the event loop if it hasn't already been
// signaled. This is a really important optimization
// because waking up the event loop redundantly burns *a
// lot* of cycles.
if !signal_flag.get() {
proxy.wakeup_event_loop();
signal_flag.set(true);
}
},
Err(err) => {
match err.kind() {
ErrorKind::WouldBlock => break,
_ => panic!("unexpected read err: {:?}", err),
}
}
}
}
}
if kind.is_writable() {
if writing.is_none() {
writing = write_list
.pop_front()
.map(|c| Writing::new(c));
}
'write_list_loop: while let Some(mut write_now) = writing.take() {
loop {
match pty.write(write_now.remaining_bytes()) {
Ok(0) => {
writing = Some(write_now);
break 'write_list_loop;
},
Ok(n) => {
write_now.advance(n);
if write_now.finished() {
writing = write_list
.pop_front()
.map(|next| Writing::new(next));
break;
} else {
}
},
Err(err) => {
writing = Some(write_now);
match err.kind() {
ErrorKind::WouldBlock => break 'write_list_loop,
// TODO
_ => panic!("unexpected err: {:?}", err),
}
}
}
}
}
}
if kind.is_hup() {
break 'event_loop;
}
let mut interest = Ready::readable();
if writing.is_some() || !write_list.is_empty() {
interest.insert(Ready::writable());
}
poll.reregister(&fd, PTY, interest, PollOpt::edge() | PollOpt::oneshot())
.expect("register fd after read/write");
},
_ => (),
}
}
}
println!("pty reader stopped");
})
}
}

View File

@ -29,6 +29,7 @@ use glutin::{ElementState, VirtualKeyCode};
use glutin::{Mods, mods};
use term::mode::{self, TermMode};
use event_loop;
/// Processes input from glutin.
///
@ -47,14 +48,14 @@ pub trait Notify {
fn notify<B: Into<Cow<'static, [u8]>>>(&mut self, B);
}
pub struct LoopNotifier(pub ::mio::channel::Sender<::EventLoopMessage>);
pub struct LoopNotifier(pub ::mio::channel::Sender<event_loop::Msg>);
impl Notify for LoopNotifier {
fn notify<B>(&mut self, bytes: B)
where B: Into<Cow<'static, [u8]>>
{
let bytes = bytes.into();
match self.0.send(::EventLoopMessage::Input(bytes)) {
match self.0.send(event_loop::Msg::Input(bytes)) {
Ok(_) => (),
Err(_) => panic!("expected send event loop msg"),
}

View File

@ -41,38 +41,39 @@ extern crate bitflags;
#[macro_use]
mod macros;
mod renderer;
pub mod grid;
mod meter;
pub mod config;
mod input;
mod index;
mod event;
mod tty;
pub mod ansi;
mod term;
mod util;
mod event_loop;
mod index;
mod input;
mod meter;
mod renderer;
mod sync;
mod term;
mod tty;
mod util;
pub mod ansi;
pub mod config;
pub mod grid;
use std::sync::{mpsc, Arc};
use std::fs::File;
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::{MutexGuard};
use event_loop::EventLoop;
use config::Config;
use meter::Meter;
use renderer::{QuadRenderer, GlyphCache};
use sync::FairMutex;
use term::Term;
use tty::process_should_exit;
use util::thread;
/// Channel used by resize handling on mac
static mut resize_sender: Option<mpsc::Sender<(u32, u32)>> = None;
#[derive(Clone)]
struct Flag(Arc<AtomicBool>);
pub struct Flag(Arc<AtomicBool>);
impl Flag {
pub fn new(initial_value: bool) -> Flag {
Flag(Arc::new(AtomicBool::new(initial_value)))
@ -235,223 +236,7 @@ fn main() {
println!("Goodbye");
}
struct EventLoop {
poll: mio::Poll,
pty: File,
rx: mio::channel::Receiver<EventLoopMessage>,
tx: mio::channel::Sender<EventLoopMessage>,
terminal: Arc<FairMutex<Term>>,
proxy: ::glutin::WindowProxy,
signal_flag: Flag,
}
const CHANNEL: mio::Token = mio::Token(0);
const PTY: mio::Token = mio::Token(1);
#[derive(Debug)]
pub enum EventLoopMessage {
Input(::std::borrow::Cow<'static, [u8]>),
}
impl EventLoop {
pub fn new(
terminal: Arc<FairMutex<Term>>,
proxy: ::glutin::WindowProxy,
signal_flag: Flag,
pty: File,
) -> EventLoop {
let (tx, rx) = ::mio::channel::channel();
EventLoop {
poll: mio::Poll::new().expect("create mio Poll"),
pty: pty,
tx: tx,
rx: rx,
terminal: terminal,
proxy: proxy,
signal_flag: signal_flag
}
}
pub fn channel(&self) -> mio::channel::Sender<EventLoopMessage> {
self.tx.clone()
}
pub fn spawn(self) -> std::thread::JoinHandle<()> {
use mio::{Events, PollOpt, Ready};
use mio::unix::EventedFd;
use std::borrow::Cow;
use std::os::unix::io::AsRawFd;
use std::io::{Read, Write, ErrorKind};
struct Writing {
source: Cow<'static, [u8]>,
written: usize,
}
impl Writing {
#[inline]
fn new(c: Cow<'static, [u8]>) -> Writing {
Writing { source: c, written: 0 }
}
#[inline]
fn advance(&mut self, n: usize) {
self.written += n;
}
#[inline]
fn remaining_bytes(&self) -> &[u8] {
&self.source[self.written..]
}
#[inline]
fn finished(&self) -> bool {
self.written >= self.source.len()
}
}
thread::spawn_named("pty reader", move || {
let EventLoop { poll, mut pty, rx, terminal, proxy, signal_flag, .. } = self;
let mut buf = [0u8; 4096];
let mut pty_parser = ansi::Processor::new();
let fd = pty.as_raw_fd();
let fd = EventedFd(&fd);
poll.register(&rx, CHANNEL, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
.unwrap();
poll.register(&fd, PTY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())
.unwrap();
let mut events = Events::with_capacity(1024);
let mut write_list = ::std::collections::VecDeque::new();
let mut writing = None;
'event_loop: loop {
poll.poll(&mut events, None).expect("poll ok");
for event in events.iter() {
match event.token() {
CHANNEL => {
while let Ok(msg) = rx.try_recv() {
match msg {
EventLoopMessage::Input(input) => {
write_list.push_back(input);
}
}
}
poll.reregister(
&rx, CHANNEL,
Ready::readable(),
PollOpt::edge() | PollOpt::oneshot()
).expect("reregister channel");
if writing.is_some() || !write_list.is_empty() {
poll.reregister(
&fd,
PTY,
Ready::readable() | Ready::writable(),
PollOpt::edge() | PollOpt::oneshot()
).expect("reregister fd after channel recv");
}
},
PTY => {
let kind = event.kind();
if kind.is_readable() {
loop {
match pty.read(&mut buf[..]) {
Ok(0) => break,
Ok(got) => {
let mut terminal = terminal.lock();
for byte in &buf[..got] {
pty_parser.advance(&mut *terminal, *byte);
}
terminal.dirty = true;
// Only wake up the event loop if it hasn't already been
// signaled. This is a really important optimization
// because waking up the event loop redundantly burns *a
// lot* of cycles.
if !signal_flag.get() {
proxy.wakeup_event_loop();
signal_flag.set(true);
}
},
Err(err) => {
match err.kind() {
ErrorKind::WouldBlock => break,
_ => panic!("unexpected read err: {:?}", err),
}
}
}
}
}
if kind.is_writable() {
if writing.is_none() {
writing = write_list
.pop_front()
.map(|c| Writing::new(c));
}
'write_list_loop: while let Some(mut write_now) = writing.take() {
loop {
match pty.write(write_now.remaining_bytes()) {
Ok(0) => {
writing = Some(write_now);
break 'write_list_loop;
},
Ok(n) => {
write_now.advance(n);
if write_now.finished() {
writing = write_list
.pop_front()
.map(|next| Writing::new(next));
break;
} else {
}
},
Err(err) => {
writing = Some(write_now);
match err.kind() {
ErrorKind::WouldBlock => break 'write_list_loop,
// TODO
_ => panic!("unexpected err: {:?}", err),
}
}
}
}
}
}
if kind.is_hup() {
break 'event_loop;
}
let mut interest = Ready::readable();
if writing.is_some() || !write_list.is_empty() {
interest.insert(Ready::writable());
}
poll.reregister(&fd, PTY, interest, PollOpt::edge() | PollOpt::oneshot())
.expect("register fd after read/write");
},
_ => (),
}
}
}
println!("pty reader stopped");
})
}
}
struct Display {
window: Arc<glutin::Window>,

View File

@ -23,4 +23,7 @@ pub mod thread {
{
::std::thread::Builder::new().name(name.into()).spawn(f).expect("thread spawn works")
}
pub use ::std::thread::*;
}