From 1c7d90ec8bfc1859c7c0b5ab42e23b3a26559ca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Thu, 19 May 2016 17:17:41 +0200 Subject: [PATCH 1/5] constellation: Simplify the scheduler so it doesn't create a thread for each event This can't land as-is, just wanted to make sure this idea is fine, and do a try run because I need to leave now. --- components/constellation/timer_scheduler.rs | 172 +++++--------------- 1 file changed, 40 insertions(+), 132 deletions(-) diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index d83c6170d234..1a3ca661b583 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -5,78 +5,17 @@ use euclid::length::Length; use ipc_channel::ipc::{self, IpcSender}; use ipc_channel::router::ROUTER; -use script_traits::{MsDuration, NsDuration, precise_time_ms, precise_time_ns}; +use script_traits::{NsDuration, precise_time_ns}; use script_traits::{TimerEvent, TimerEventRequest}; -use std::cell::RefCell; use std::cmp::{self, Ord}; use std::collections::BinaryHeap; -use std::sync::Arc; -use std::sync::atomic::{self, AtomicBool}; -use std::sync::mpsc::{channel, Receiver, Select}; -use std::thread::{self, spawn, Thread}; -use std::time::Duration; +use std::sync::mpsc::Receiver; use util::thread::spawn_named; -/// A quick hack to work around the removal of [`std::old_io::timer::Timer`]( -/// http://doc.rust-lang.org/1.0.0-beta/std/old_io/timer/struct.Timer.html ) -struct CancelableOneshotTimer { - thread: Thread, - canceled: Arc, - port: Receiver<()>, -} - -impl CancelableOneshotTimer { - fn new(duration: MsDuration) -> CancelableOneshotTimer { - let (tx, rx) = channel(); - let canceled = Arc::new(AtomicBool::new(false)); - let canceled_clone = canceled.clone(); - - let thread = spawn(move || { - let due_time = precise_time_ms() + duration; - - let mut park_time = duration; - - loop { - thread::park_timeout(Duration::from_millis(park_time.get())); - - if canceled_clone.load(atomic::Ordering::Relaxed) { - return; - } - - // park_timeout_ms does not guarantee parking for the - // given amout. We might have woken up early. - let current_time = precise_time_ms(); - if current_time >= due_time { - let _ = tx.send(()); - return; - } - park_time = due_time - current_time; - } - }).thread().clone(); - - CancelableOneshotTimer { - thread: thread, - canceled: canceled, - port: rx, - } - } - - fn port(&self) -> &Receiver<()> { - &self.port - } - - fn cancel(&self) { - self.canceled.store(true, atomic::Ordering::Relaxed); - self.thread.unpark(); - } -} - pub struct TimerScheduler { port: Receiver, - scheduled_events: RefCell>, - - timer: RefCell>, + scheduled_events: BinaryHeap, } struct ScheduledEvent { @@ -103,6 +42,18 @@ impl PartialEq for ScheduledEvent { } } +fn recv_with_timeout(port: &Receiver, from: NsDuration, timeout: NsDuration) -> Option { + loop { + if let Ok(ret) = port.try_recv() { + return Some(ret); + } + let now = precise_time_ns(); + if now - from >= timeout { + return None; + } + } +} + enum Task { HandleRequest(TimerEventRequest), DispatchDueEvents, @@ -112,12 +63,9 @@ impl TimerScheduler { pub fn start() -> IpcSender { let (chan, port) = ipc::channel().unwrap(); - let timer_scheduler = TimerScheduler { + let mut timer_scheduler = TimerScheduler { port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port), - - scheduled_events: RefCell::new(BinaryHeap::new()), - - timer: RefCell::new(None), + scheduled_events: BinaryHeap::new(), }; spawn_named("TimerScheduler".to_owned(), move || { @@ -127,68 +75,49 @@ impl TimerScheduler { chan } - fn run_event_loop(&self) { - while let Some(thread) = self.receive_next_task() { - match thread { - Task::HandleRequest(request) => self.handle_request(request), - Task::DispatchDueEvents => self.dispatch_due_events(), + fn get_next_task(&mut self) -> Option { + let now = precise_time_ns(); + if let Some(event) = self.scheduled_events.peek() { + if event.for_time < now { + return Some(Task::DispatchDueEvents); } + + let timeout = event.for_time - now; + recv_with_timeout(&self.port, now, timeout).map(Task::HandleRequest) + } else { + self.port.recv().ok().map(Task::HandleRequest) } } - #[allow(unsafe_code)] - fn receive_next_task(&self) -> Option { - let port = &self.port; - let timer = self.timer.borrow(); - let timer_port = timer.as_ref().map(|timer| timer.port()); - - if let Some(ref timer_port) = timer_port { - let sel = Select::new(); - let mut scheduler_handle = sel.handle(port); - let mut timer_handle = sel.handle(timer_port); - - unsafe { - scheduler_handle.add(); - timer_handle.add(); - } - - let ret = sel.wait(); - if ret == scheduler_handle.id() { - port.recv().ok().map(Task::HandleRequest) - } else if ret == timer_handle.id() { - timer_port.recv().ok().map(|_| Task::DispatchDueEvents) - } else { - panic!("unexpected select result!") + fn run_event_loop(&mut self) { + loop { + let task = self.get_next_task(); + if let Some(task) = task { + match task { + Task::DispatchDueEvents => self.dispatch_due_events(), + Task::HandleRequest(req) => self.add_request(req), + } } - } else { - port.recv().ok().map(Task::HandleRequest) } } - fn handle_request(&self, request: TimerEventRequest) { + fn add_request(&mut self, request: TimerEventRequest) { let TimerEventRequest(_, _, _, duration_ms) = request; let duration_ns = Length::new(duration_ms.get() * 1000 * 1000); let schedule_for = precise_time_ns() + duration_ns; - let previously_earliest = self.scheduled_events.borrow().peek() - .map_or(Length::new(u64::max_value()), |scheduled| scheduled.for_time); - - self.scheduled_events.borrow_mut().push(ScheduledEvent { + self.scheduled_events.push(ScheduledEvent { request: request, for_time: schedule_for, }); - - if schedule_for < previously_earliest { - self.start_timer_for_next_event(); - } } - fn dispatch_due_events(&self) { + fn dispatch_due_events(&mut self) { let now = precise_time_ns(); - { - let mut events = self.scheduled_events.borrow_mut(); + let mut events = &mut self.scheduled_events; + { while !events.is_empty() && events.peek().as_ref().unwrap().for_time <= now { let event = events.pop().unwrap(); let TimerEventRequest(chan, source, id, _) = event.request; @@ -196,26 +125,5 @@ impl TimerScheduler { let _ = chan.send(TimerEvent(source, id)); } } - - self.start_timer_for_next_event(); - } - - fn start_timer_for_next_event(&self) { - let events = self.scheduled_events.borrow(); - let next_event = events.peek(); - - let mut timer = self.timer.borrow_mut(); - - if let Some(ref mut timer) = *timer { - timer.cancel(); - } - - *timer = next_event.map(|next_event| { - let delay_ns = next_event.for_time.get().saturating_sub(precise_time_ns().get()); - // Round up, we'd rather be late than early… - let delay_ms = Length::new(delay_ns.saturating_add(999999) / (1000 * 1000)); - - CancelableOneshotTimer::new(delay_ms) - }); } } From a9879a828bfb7823f0a216834c78ef0415b4a081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Thu, 19 May 2016 21:25:25 +0200 Subject: [PATCH 2/5] constellation: Make the scheduler not spin while waiting This approach uses a secondary thread to manage the timeout + select. This is not ideal, but it can be vastly improved if something is done about https://github.com/rust-lang/rfcs/issues/962. The other apparently cleaner option, having the returned value not being an ipc-sender, but a struct containing the sender and the Thread handle (which would allow unparking), wouldn't work across IPC. --- components/constellation/timer_scheduler.rs | 89 ++++++++++++++++++--- 1 file changed, 79 insertions(+), 10 deletions(-) diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index 1a3ca661b583..32c6857d0801 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -9,13 +9,24 @@ use script_traits::{NsDuration, precise_time_ns}; use script_traits::{TimerEvent, TimerEventRequest}; use std::cmp::{self, Ord}; use std::collections::BinaryHeap; -use std::sync::mpsc::Receiver; +use std::sync::mpsc::{Receiver, Sender, channel}; +use std::thread::{self, Thread}; +use std::time::Duration; use util::thread::spawn_named; pub struct TimerScheduler { port: Receiver, scheduled_events: BinaryHeap, + + /// Channel used to schedule a new timeout + to_timeout_helper_tx: Sender, + + /// Channel used to receive a timeout + from_timeout_helper_rx: Receiver<()>, + + /// Used to wake-up the timeout helper thread + timeout_helper: Thread, } struct ScheduledEvent { @@ -42,14 +53,41 @@ impl PartialEq for ScheduledEvent { } } -fn recv_with_timeout(port: &Receiver, from: NsDuration, timeout: NsDuration) -> Option { +// TODO(emilio): This can be vastly simplified once action is taken in +// https://github.com/rust-lang/rfcs/issues/962. +// +// Another way to do this more cleanly would be returning a +// `TimerSchedulerHandler`, that would contain the thread handle and the sender, +// and make the sender take care of `send()`ing, then waking up the thread. +// +// Unfortunately, this wouldn't be ipc-safe. +// +// Also, this could be a method in `TimerScheduler` if we wouldn't use it while +// holding the topmost event in the heap. +#[allow(unsafe_code)] // due to select! +fn recv_until(port: &Receiver, + until: NsDuration, + timeout_rx: &Receiver<()>, + timeout_tx: &Sender, + timeout_thread: &Thread) -> Option { + if let Ok(ret) = port.try_recv() { + return Some(ret); + } + + timeout_tx.send(until).expect("send to TimeoutHelper failed"); loop { - if let Ok(ret) = port.try_recv() { - return Some(ret); - } - let now = precise_time_ns(); - if now - from >= timeout { - return None; + select! { + msg = port.recv() => { + if let Ok(ret) = msg { + timeout_thread.unpark(); + let _ = timeout_rx.recv(); + return Some(ret) + } + continue; + }, + _ = timeout_rx.recv() => { + return None; + } } } } @@ -63,9 +101,37 @@ impl TimerScheduler { pub fn start() -> IpcSender { let (chan, port) = ipc::channel().unwrap(); + let (to_timeout_helper_tx, to_timeout_helper_rx) = channel::(); + let (from_timeout_helper_tx, from_timeout_helper_rx) = channel::<()>(); + + let helper_thread = thread::spawn(move || { + loop { + let until = match to_timeout_helper_rx.recv() { + Ok(until) => until, + Err(_) => continue, + }; + + let now = precise_time_ns(); + if until < now { + from_timeout_helper_tx.send(()) + .expect("Failed to send to TimerScheduler"); + continue; + } + + let duration = (until - now).get(); + let duration_secs = duration / 1_000_000_000; + let duration_ns = duration % 1_000_000_000; + thread::park_timeout(Duration::new(duration_secs, duration_ns as u32)); + let _ = from_timeout_helper_tx.send(()); + } + }).thread().clone(); + let mut timer_scheduler = TimerScheduler { port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port), scheduled_events: BinaryHeap::new(), + to_timeout_helper_tx: to_timeout_helper_tx, + from_timeout_helper_rx: from_timeout_helper_rx, + timeout_helper: helper_thread, }; spawn_named("TimerScheduler".to_owned(), move || { @@ -82,8 +148,11 @@ impl TimerScheduler { return Some(Task::DispatchDueEvents); } - let timeout = event.for_time - now; - recv_with_timeout(&self.port, now, timeout).map(Task::HandleRequest) + recv_until(&self.port, + event.for_time, + &self.from_timeout_helper_rx, + &self.to_timeout_helper_tx, + &self.timeout_helper).map(Task::HandleRequest) } else { self.port.recv().ok().map(Task::HandleRequest) } From 82e9efac51cec7bd16e6003a5bb84978fd43eb00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Thu, 19 May 2016 21:37:20 +0200 Subject: [PATCH 3/5] constellation: scheduler: Don't call precise_time_ns() if there are no tasks --- components/constellation/timer_scheduler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index 32c6857d0801..95990f25ef23 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -142,8 +142,8 @@ impl TimerScheduler { } fn get_next_task(&mut self) -> Option { - let now = precise_time_ns(); if let Some(event) = self.scheduled_events.peek() { + let now = precise_time_ns(); if event.for_time < now { return Some(Task::DispatchDueEvents); } From 664c4c01e698b62a98e0d6ac5208fe678d128b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Fri, 20 May 2016 02:36:39 +0200 Subject: [PATCH 4/5] Revert "constellation: Make the scheduler not spin while waiting" This reverts commit a9879a828bfb7823f0a216834c78ef0415b4a081. --- components/constellation/timer_scheduler.rs | 89 +++------------------ 1 file changed, 10 insertions(+), 79 deletions(-) diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index 95990f25ef23..b7863a049fe3 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -9,24 +9,13 @@ use script_traits::{NsDuration, precise_time_ns}; use script_traits::{TimerEvent, TimerEventRequest}; use std::cmp::{self, Ord}; use std::collections::BinaryHeap; -use std::sync::mpsc::{Receiver, Sender, channel}; -use std::thread::{self, Thread}; -use std::time::Duration; +use std::sync::mpsc::Receiver; use util::thread::spawn_named; pub struct TimerScheduler { port: Receiver, scheduled_events: BinaryHeap, - - /// Channel used to schedule a new timeout - to_timeout_helper_tx: Sender, - - /// Channel used to receive a timeout - from_timeout_helper_rx: Receiver<()>, - - /// Used to wake-up the timeout helper thread - timeout_helper: Thread, } struct ScheduledEvent { @@ -53,41 +42,14 @@ impl PartialEq for ScheduledEvent { } } -// TODO(emilio): This can be vastly simplified once action is taken in -// https://github.com/rust-lang/rfcs/issues/962. -// -// Another way to do this more cleanly would be returning a -// `TimerSchedulerHandler`, that would contain the thread handle and the sender, -// and make the sender take care of `send()`ing, then waking up the thread. -// -// Unfortunately, this wouldn't be ipc-safe. -// -// Also, this could be a method in `TimerScheduler` if we wouldn't use it while -// holding the topmost event in the heap. -#[allow(unsafe_code)] // due to select! -fn recv_until(port: &Receiver, - until: NsDuration, - timeout_rx: &Receiver<()>, - timeout_tx: &Sender, - timeout_thread: &Thread) -> Option { - if let Ok(ret) = port.try_recv() { - return Some(ret); - } - - timeout_tx.send(until).expect("send to TimeoutHelper failed"); +fn recv_with_timeout(port: &Receiver, from: NsDuration, timeout: NsDuration) -> Option { loop { - select! { - msg = port.recv() => { - if let Ok(ret) = msg { - timeout_thread.unpark(); - let _ = timeout_rx.recv(); - return Some(ret) - } - continue; - }, - _ = timeout_rx.recv() => { - return None; - } + if let Ok(ret) = port.try_recv() { + return Some(ret); + } + let now = precise_time_ns(); + if now - from >= timeout { + return None; } } } @@ -101,37 +63,9 @@ impl TimerScheduler { pub fn start() -> IpcSender { let (chan, port) = ipc::channel().unwrap(); - let (to_timeout_helper_tx, to_timeout_helper_rx) = channel::(); - let (from_timeout_helper_tx, from_timeout_helper_rx) = channel::<()>(); - - let helper_thread = thread::spawn(move || { - loop { - let until = match to_timeout_helper_rx.recv() { - Ok(until) => until, - Err(_) => continue, - }; - - let now = precise_time_ns(); - if until < now { - from_timeout_helper_tx.send(()) - .expect("Failed to send to TimerScheduler"); - continue; - } - - let duration = (until - now).get(); - let duration_secs = duration / 1_000_000_000; - let duration_ns = duration % 1_000_000_000; - thread::park_timeout(Duration::new(duration_secs, duration_ns as u32)); - let _ = from_timeout_helper_tx.send(()); - } - }).thread().clone(); - let mut timer_scheduler = TimerScheduler { port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port), scheduled_events: BinaryHeap::new(), - to_timeout_helper_tx: to_timeout_helper_tx, - from_timeout_helper_rx: from_timeout_helper_rx, - timeout_helper: helper_thread, }; spawn_named("TimerScheduler".to_owned(), move || { @@ -148,11 +82,8 @@ impl TimerScheduler { return Some(Task::DispatchDueEvents); } - recv_until(&self.port, - event.for_time, - &self.from_timeout_helper_rx, - &self.to_timeout_helper_tx, - &self.timeout_helper).map(Task::HandleRequest) + let timeout = event.for_time - now; + recv_with_timeout(&self.port, now, timeout).map(Task::HandleRequest) } else { self.port.recv().ok().map(Task::HandleRequest) } From 65e0b486ceff34133f778d80516fdb8babc84330 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Fri, 20 May 2016 02:45:37 +0200 Subject: [PATCH 5/5] constellation: scheduler: Make the second thread unnecessary relying on the implementation of std::sync::mpsc::Sender. Optimal, but not ideal. I have a PR in progress to implement recv_timeout() --- components/constellation/timer_scheduler.rs | 27 +++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index b7863a049fe3..8b2834ed8a30 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -10,6 +10,8 @@ use script_traits::{TimerEvent, TimerEventRequest}; use std::cmp::{self, Ord}; use std::collections::BinaryHeap; use std::sync::mpsc::Receiver; +use std::thread; +use std::time::Duration; use util::thread::spawn_named; pub struct TimerScheduler { @@ -42,16 +44,21 @@ impl PartialEq for ScheduledEvent { } } -fn recv_with_timeout(port: &Receiver, from: NsDuration, timeout: NsDuration) -> Option { - loop { - if let Ok(ret) = port.try_recv() { - return Some(ret); - } - let now = precise_time_ns(); - if now - from >= timeout { - return None; - } +fn recv_with_timeout(port: &Receiver, timeout: NsDuration) -> Option { + if let Ok(ret) = port.try_recv() { + return Some(ret); } + + let timeout = timeout.get(); + let seconds = timeout / 1_000_000_000; + let nanos = timeout % 1_000_000_000; + // NB: This relies on the fact that Sender::send() calls unpark() on the + // receiver thread. + // + // This is probably stable enough to rely on it until + // https://github.com/rust-lang/rfcs/issues/962 is implemented. + thread::park_timeout(Duration::new(seconds, nanos as u32)); + port.try_recv().ok() } enum Task { @@ -83,7 +90,7 @@ impl TimerScheduler { } let timeout = event.for_time - now; - recv_with_timeout(&self.port, now, timeout).map(Task::HandleRequest) + recv_with_timeout(&self.port, timeout).map(Task::HandleRequest) } else { self.port.recv().ok().map(Task::HandleRequest) }