diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index d83c6170d234..8b2834ed8a30 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -5,78 +5,19 @@ use euclid::length::Length; use ipc_channel::ipc::{self, IpcSender}; use ipc_channel::router::ROUTER; -use script_traits::{MsDuration, NsDuration, precise_time_ms, precise_time_ns}; +use script_traits::{NsDuration, precise_time_ns}; use script_traits::{TimerEvent, TimerEventRequest}; -use std::cell::RefCell; use std::cmp::{self, Ord}; use std::collections::BinaryHeap; -use std::sync::Arc; -use std::sync::atomic::{self, AtomicBool}; -use std::sync::mpsc::{channel, Receiver, Select}; -use std::thread::{self, spawn, Thread}; +use std::sync::mpsc::Receiver; +use std::thread; use std::time::Duration; use util::thread::spawn_named; -/// A quick hack to work around the removal of [`std::old_io::timer::Timer`]( -/// http://doc.rust-lang.org/1.0.0-beta/std/old_io/timer/struct.Timer.html ) -struct CancelableOneshotTimer { - thread: Thread, - canceled: Arc, - port: Receiver<()>, -} - -impl CancelableOneshotTimer { - fn new(duration: MsDuration) -> CancelableOneshotTimer { - let (tx, rx) = channel(); - let canceled = Arc::new(AtomicBool::new(false)); - let canceled_clone = canceled.clone(); - - let thread = spawn(move || { - let due_time = precise_time_ms() + duration; - - let mut park_time = duration; - - loop { - thread::park_timeout(Duration::from_millis(park_time.get())); - - if canceled_clone.load(atomic::Ordering::Relaxed) { - return; - } - - // park_timeout_ms does not guarantee parking for the - // given amout. We might have woken up early. - let current_time = precise_time_ms(); - if current_time >= due_time { - let _ = tx.send(()); - return; - } - park_time = due_time - current_time; - } - }).thread().clone(); - - CancelableOneshotTimer { - thread: thread, - canceled: canceled, - port: rx, - } - } - - fn port(&self) -> &Receiver<()> { - &self.port - } - - fn cancel(&self) { - self.canceled.store(true, atomic::Ordering::Relaxed); - self.thread.unpark(); - } -} - pub struct TimerScheduler { port: Receiver, - scheduled_events: RefCell>, - - timer: RefCell>, + scheduled_events: BinaryHeap, } struct ScheduledEvent { @@ -103,6 +44,23 @@ impl PartialEq for ScheduledEvent { } } +fn recv_with_timeout(port: &Receiver, timeout: NsDuration) -> Option { + if let Ok(ret) = port.try_recv() { + return Some(ret); + } + + let timeout = timeout.get(); + let seconds = timeout / 1_000_000_000; + let nanos = timeout % 1_000_000_000; + // NB: This relies on the fact that Sender::send() calls unpark() on the + // receiver thread. + // + // This is probably stable enough to rely on it until + // https://github.com/rust-lang/rfcs/issues/962 is implemented. + thread::park_timeout(Duration::new(seconds, nanos as u32)); + port.try_recv().ok() +} + enum Task { HandleRequest(TimerEventRequest), DispatchDueEvents, @@ -112,12 +70,9 @@ impl TimerScheduler { pub fn start() -> IpcSender { let (chan, port) = ipc::channel().unwrap(); - let timer_scheduler = TimerScheduler { + let mut timer_scheduler = TimerScheduler { port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port), - - scheduled_events: RefCell::new(BinaryHeap::new()), - - timer: RefCell::new(None), + scheduled_events: BinaryHeap::new(), }; spawn_named("TimerScheduler".to_owned(), move || { @@ -127,68 +82,49 @@ impl TimerScheduler { chan } - fn run_event_loop(&self) { - while let Some(thread) = self.receive_next_task() { - match thread { - Task::HandleRequest(request) => self.handle_request(request), - Task::DispatchDueEvents => self.dispatch_due_events(), + fn get_next_task(&mut self) -> Option { + if let Some(event) = self.scheduled_events.peek() { + let now = precise_time_ns(); + if event.for_time < now { + return Some(Task::DispatchDueEvents); } + + let timeout = event.for_time - now; + recv_with_timeout(&self.port, timeout).map(Task::HandleRequest) + } else { + self.port.recv().ok().map(Task::HandleRequest) } } - #[allow(unsafe_code)] - fn receive_next_task(&self) -> Option { - let port = &self.port; - let timer = self.timer.borrow(); - let timer_port = timer.as_ref().map(|timer| timer.port()); - - if let Some(ref timer_port) = timer_port { - let sel = Select::new(); - let mut scheduler_handle = sel.handle(port); - let mut timer_handle = sel.handle(timer_port); - - unsafe { - scheduler_handle.add(); - timer_handle.add(); - } - - let ret = sel.wait(); - if ret == scheduler_handle.id() { - port.recv().ok().map(Task::HandleRequest) - } else if ret == timer_handle.id() { - timer_port.recv().ok().map(|_| Task::DispatchDueEvents) - } else { - panic!("unexpected select result!") + fn run_event_loop(&mut self) { + loop { + let task = self.get_next_task(); + if let Some(task) = task { + match task { + Task::DispatchDueEvents => self.dispatch_due_events(), + Task::HandleRequest(req) => self.add_request(req), + } } - } else { - port.recv().ok().map(Task::HandleRequest) } } - fn handle_request(&self, request: TimerEventRequest) { + fn add_request(&mut self, request: TimerEventRequest) { let TimerEventRequest(_, _, _, duration_ms) = request; let duration_ns = Length::new(duration_ms.get() * 1000 * 1000); let schedule_for = precise_time_ns() + duration_ns; - let previously_earliest = self.scheduled_events.borrow().peek() - .map_or(Length::new(u64::max_value()), |scheduled| scheduled.for_time); - - self.scheduled_events.borrow_mut().push(ScheduledEvent { + self.scheduled_events.push(ScheduledEvent { request: request, for_time: schedule_for, }); - - if schedule_for < previously_earliest { - self.start_timer_for_next_event(); - } } - fn dispatch_due_events(&self) { + fn dispatch_due_events(&mut self) { let now = precise_time_ns(); - { - let mut events = self.scheduled_events.borrow_mut(); + let mut events = &mut self.scheduled_events; + { while !events.is_empty() && events.peek().as_ref().unwrap().for_time <= now { let event = events.pop().unwrap(); let TimerEventRequest(chan, source, id, _) = event.request; @@ -196,26 +132,5 @@ impl TimerScheduler { let _ = chan.send(TimerEvent(source, id)); } } - - self.start_timer_for_next_event(); - } - - fn start_timer_for_next_event(&self) { - let events = self.scheduled_events.borrow(); - let next_event = events.peek(); - - let mut timer = self.timer.borrow_mut(); - - if let Some(ref mut timer) = *timer { - timer.cancel(); - } - - *timer = next_event.map(|next_event| { - let delay_ns = next_event.for_time.get().saturating_sub(precise_time_ns().get()); - // Round up, we'd rather be late than early… - let delay_ms = Length::new(delay_ns.saturating_add(999999) / (1000 * 1000)); - - CancelableOneshotTimer::new(delay_ms) - }); } }