diff --git a/src/event_loop.rs b/src/event_loop.rs index 51eeccd1..9584e416 100644 --- a/src/event_loop.rs +++ b/src/event_loop.rs @@ -1,5 +1,6 @@ //! The main event loop which performs I/O on the pseudoterminal use std::borrow::Cow; +use std::collections::VecDeque; use std::io::{self, ErrorKind}; use std::os::unix::io::AsRawFd; use std::sync::Arc; @@ -14,6 +15,17 @@ use sync::FairMutex; use super::Flag; +/// Messages that may be sent to the `EventLoop` +#[derive(Debug)] +pub enum Msg { + /// Data that should be written to the pty + Input(Cow<'static, [u8]>), +} + +/// The main event!.. loop. +/// +/// Handles all the pty I/O and runs the pty parser which updates terminal +/// state. pub struct EventLoop { poll: mio::Poll, pty: Io, @@ -24,18 +36,95 @@ pub struct EventLoop { signal_flag: Flag, } - -#[derive(Debug)] -pub enum Msg { - Input(Cow<'static, [u8]>), +/// Helper type which tracks how much of a buffer has been written. +struct Writing { + source: Cow<'static, [u8]>, + written: usize, } +/// All of the mutable state needed to run the event loop +/// +/// Contains list of items to write, current write state, etc. Anything that +/// would otherwise be mutated on the `EventLoop` goes here. +pub struct State { + write_list: VecDeque>, + writing: Option, + parser: ansi::Processor, +} + +impl Default for State { + fn default() -> State { + State { + write_list: VecDeque::new(), + parser: ansi::Processor::new(), + writing: None, + } + } +} + +impl State { + #[inline] + fn ensure_next(&mut self) { + if self.writing.is_none() { + self.goto_next(); + } + } + + #[inline] + fn goto_next(&mut self) { + self.writing = self.write_list + .pop_front() + .map(|c| Writing::new(c)); + } + + #[inline] + fn take_current(&mut self) -> Option { + self.writing.take() + } + + #[inline] + fn needs_write(&self) -> bool { + self.writing.is_some() || !self.write_list.is_empty() + } + + #[inline] + fn set_current(&mut self, new: Option) { + self.writing = new; + } +} + +impl Writing { + #[inline] + fn new(c: Cow<'static, [u8]>) -> Writing { + Writing { source: c, written: 0 } + } + + #[inline] + fn advance(&mut self, n: usize) { + self.written += n; + } + + #[inline] + fn remaining_bytes(&self) -> &[u8] { + &self.source[self.written..] + } + + #[inline] + fn finished(&self) -> bool { + self.written >= self.source.len() + } +} + +/// mio::Token for the event loop channel const CHANNEL: mio::Token = mio::Token(0); + +/// mio::Token for the pty file descriptor const PTY: mio::Token = mio::Token(1); impl EventLoop where Io: io::Read + io::Write + Send + AsRawFd + 'static { + /// Create a new event loop pub fn new( terminal: Arc>, proxy: ::glutin::WindowProxy, @@ -58,166 +147,141 @@ impl EventLoop self.tx.clone() } - pub fn spawn(self) -> thread::JoinHandle<()> { - - struct Writing { - source: Cow<'static, [u8]>, - written: usize, - } - - impl Writing { - #[inline] - fn new(c: Cow<'static, [u8]>) -> Writing { - Writing { source: c, written: 0 } - } - - #[inline] - fn advance(&mut self, n: usize) { - self.written += n; - } - - #[inline] - fn remaining_bytes(&self) -> &[u8] { - &self.source[self.written..] - } - - #[inline] - fn finished(&self) -> bool { - self.written >= self.source.len() + #[inline] + fn channel_event(&mut self, state: &mut State) { + while let Ok(msg) = self.rx.try_recv() { + match msg { + Msg::Input(input) => { + state.write_list.push_back(input); + } } } + self.poll.reregister( + &self.rx, CHANNEL, + Ready::readable(), + PollOpt::edge() | PollOpt::oneshot() + ).expect("reregister channel"); + + if state.needs_write() { + self.poll.reregister( + &EventedFd(&self.pty.as_raw_fd()), + PTY, + Ready::readable() | Ready::writable(), + PollOpt::edge() | PollOpt::oneshot() + ).expect("reregister fd after channel recv"); + } + } + + #[inline] + fn pty_read(&mut self, state: &mut State, buf: &mut [u8]) { + loop { + match self.pty.read(&mut buf[..]) { + Ok(0) => break, + Ok(got) => { + let mut terminal = self.terminal.lock(); + for byte in &buf[..got] { + state.parser.advance(&mut *terminal, *byte); + } + + terminal.dirty = true; + + // Only wake up the event loop if it hasn't already been + // signaled. This is a really important optimization because + // waking up the event loop redundantly burns *a lot* of + // cycles. + if !self.signal_flag.get() { + self.proxy.wakeup_event_loop(); + self.signal_flag.set(true); + } + }, + Err(err) => { + match err.kind() { + ErrorKind::WouldBlock => break, + _ => panic!("unexpected read err: {:?}", err), + } + } + } + } + } + + #[inline] + fn pty_write(&mut self, state: &mut State) { + state.ensure_next(); + + 'write_many: while let Some(mut current) = state.take_current() { + 'write_one: loop { + match self.pty.write(current.remaining_bytes()) { + Ok(0) => { + state.set_current(Some(current)); + break 'write_many; + }, + Ok(n) => { + current.advance(n); + if current.finished() { + state.goto_next(); + break 'write_one; + } + }, + Err(err) => { + state.set_current(Some(current)); + match err.kind() { + ErrorKind::WouldBlock => break 'write_many, + // TODO + _ => panic!("unexpected err: {:?}", err), + } + } + } + + } + } + } + + pub fn spawn(mut self, state: Option) -> thread::JoinHandle<(EventLoop, State)> { thread::spawn_named("pty reader", move || { - - let EventLoop { poll, mut pty, rx, terminal, proxy, signal_flag, .. } = self; - - + let mut state = state.unwrap_or_else(Default::default); let mut buf = [0u8; 4096]; - let mut pty_parser = ansi::Processor::new(); - let fd = pty.as_raw_fd(); + + let fd = self.pty.as_raw_fd(); let fd = EventedFd(&fd); - poll.register(&rx, CHANNEL, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()) - .unwrap(); - poll.register(&fd, PTY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()) - .unwrap(); + let poll_opts = PollOpt::edge() | PollOpt::oneshot(); + + self.poll.register(&self.rx, CHANNEL, Ready::readable(), poll_opts).unwrap(); + self.poll.register(&fd, PTY, Ready::readable(), poll_opts).unwrap(); let mut events = Events::with_capacity(1024); - let mut write_list = ::std::collections::VecDeque::new(); - let mut writing = None; 'event_loop: loop { - poll.poll(&mut events, None).expect("poll ok"); + self.poll.poll(&mut events, None).expect("poll ok"); for event in events.iter() { match event.token() { - CHANNEL => { - while let Ok(msg) = rx.try_recv() { - match msg { - Msg::Input(input) => { - write_list.push_back(input); - } - } - } - - poll.reregister( - &rx, CHANNEL, - Ready::readable(), - PollOpt::edge() | PollOpt::oneshot() - ).expect("reregister channel"); - - if writing.is_some() || !write_list.is_empty() { - poll.reregister( - &fd, - PTY, - Ready::readable() | Ready::writable(), - PollOpt::edge() | PollOpt::oneshot() - ).expect("reregister fd after channel recv"); - } - }, + CHANNEL => self.channel_event(&mut state), PTY => { let kind = event.kind(); if kind.is_readable() { - loop { - match pty.read(&mut buf[..]) { - Ok(0) => break, - Ok(got) => { - let mut terminal = terminal.lock(); - for byte in &buf[..got] { - pty_parser.advance(&mut *terminal, *byte); - } - - terminal.dirty = true; - - // Only wake up the event loop if it hasn't already been - // signaled. This is a really important optimization - // because waking up the event loop redundantly burns *a - // lot* of cycles. - if !signal_flag.get() { - proxy.wakeup_event_loop(); - signal_flag.set(true); - } - }, - Err(err) => { - match err.kind() { - ErrorKind::WouldBlock => break, - _ => panic!("unexpected read err: {:?}", err), - } - } - } - } + self.pty_read(&mut state, &mut buf); } if kind.is_writable() { - if writing.is_none() { - writing = write_list - .pop_front() - .map(|c| Writing::new(c)); - } - - 'write_list_loop: while let Some(mut write_now) = writing.take() { - loop { - match pty.write(write_now.remaining_bytes()) { - Ok(0) => { - writing = Some(write_now); - break 'write_list_loop; - }, - Ok(n) => { - write_now.advance(n); - if write_now.finished() { - writing = write_list - .pop_front() - .map(|next| Writing::new(next)); - - break; - } else { - } - }, - Err(err) => { - writing = Some(write_now); - match err.kind() { - ErrorKind::WouldBlock => break 'write_list_loop, - // TODO - _ => panic!("unexpected err: {:?}", err), - } - } - } - - } - } + self.pty_write(&mut state); } if kind.is_hup() { break 'event_loop; } + // Figure out pty interest let mut interest = Ready::readable(); - if writing.is_some() || !write_list.is_empty() { + if state.needs_write() { interest.insert(Ready::writable()); } - poll.reregister(&fd, PTY, interest, PollOpt::edge() | PollOpt::oneshot()) + // Reregister pty + self.poll + .reregister(&fd, PTY, interest, poll_opts) .expect("register fd after read/write"); }, _ => (), @@ -225,7 +289,10 @@ impl EventLoop } } - println!("pty reader stopped"); + self.poll.deregister(&self.rx).expect("deregister channel"); + self.poll.deregister(&fd).expect("deregister pty"); + + (self, state) }) } } diff --git a/src/main.rs b/src/main.rs index 5a57b362..dc2d2eff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -196,7 +196,7 @@ fn main() { ); let loop_tx = event_loop.channel(); - let event_loop_handle = event_loop.spawn(); + let event_loop_handle = event_loop.spawn(None); // Wraps a renderer and gives simple draw() api. let mut display = Display::new(