diff --git a/backends/gstreamer/src/lib.rs b/backends/gstreamer/src/lib.rs index e004b077..c70a2aa9 100644 --- a/backends/gstreamer/src/lib.rs +++ b/backends/gstreamer/src/lib.rs @@ -35,6 +35,8 @@ pub enum BackendError { PlayerError(String), PlayerPushDataFailed, PlayerEOSFailed, + PlayerNonSeekable, + PlayerSeekOutOfRange, PlayerSourceSetupFailed, SetPropertyFailed(&'static str), StateChangeFailed, diff --git a/backends/gstreamer/src/player.rs b/backends/gstreamer/src/player.rs index 9f65f1aa..300f8a17 100644 --- a/backends/gstreamer/src/player.rs +++ b/backends/gstreamer/src/player.rs @@ -2,13 +2,13 @@ use super::BackendError; use glib; use glib::*; use gst; -use gst_app; +use gst_app::{self, AppSrcCallbacks, AppStreamType}; use gst_player; use gst_player::{PlayerMediaInfo, PlayerStreamInfoExt}; use ipc_channel::ipc::IpcSender; use servo_media_player::frame::{Frame, FrameRenderer}; use servo_media_player::metadata::Metadata; -use servo_media_player::{PlaybackState, Player, PlayerEvent}; +use servo_media_player::{PlaybackState, Player, PlayerEvent, StreamType}; use std::cell::RefCell; use std::error::Error; use std::sync::mpsc; @@ -86,18 +86,27 @@ struct PlayerInner { appsrc: Option, appsink: gst_app::AppSink, input_size: u64, + stream_type: Option, subscribers: Vec>, renderers: Vec>>, last_metadata: Option, } impl PlayerInner { - pub fn register_event_handler(&mut self, sender: IpcSender) { + pub fn register_event_handler( + &mut self, + sender: IpcSender, + ) -> Result<(), BackendError> { self.subscribers.push(sender); + Ok(()) } - pub fn register_frame_renderer(&mut self, renderer: Arc>) { + pub fn register_frame_renderer( + &mut self, + renderer: Arc>, + ) -> Result<(), BackendError> { self.renderers.push(renderer); + Ok(()) } pub fn notify(&self, event: PlayerEvent) { @@ -116,22 +125,92 @@ impl PlayerInner { Ok(()) } - pub fn set_input_size(&mut self, size: u64) { + pub fn set_input_size(&mut self, size: u64) -> Result<(), BackendError> { + // Set input_size to proxy its value, since it + // could be set by the user before calling .setup(). self.input_size = size; + if let Some(ref mut appsrc) = self.appsrc { + if size > 0 { + appsrc.set_size(size as i64); + } else { + appsrc.set_size(-1); // live source + } + } + Ok(()) + } + + pub fn set_stream_type(&mut self, type_: StreamType) -> Result<(), BackendError> { + let type_ = match type_ { + StreamType::Stream => AppStreamType::Stream, + StreamType::Seekable => AppStreamType::Seekable, + StreamType::RandomAccess => AppStreamType::RandomAccess, + }; + // Set stream_type to proxy its value, since it + // could be set by the user before calling .setup(). + self.stream_type = Some(type_); + if let Some(ref appsrc) = self.appsrc { + appsrc.set_stream_type(type_); + } + Ok(()) } - pub fn play(&mut self) { + pub fn play(&mut self) -> Result<(), BackendError> { self.player.play(); + Ok(()) } - pub fn stop(&mut self) { + pub fn stop(&mut self) -> Result<(), BackendError> { self.player.stop(); self.last_metadata = None; self.appsrc = None; + Ok(()) } - pub fn pause(&mut self) { + pub fn pause(&mut self) -> Result<(), BackendError> { self.player.pause(); + Ok(()) + } + + pub fn end_of_stream(&mut self) -> Result<(), BackendError> { + if let Some(ref mut appsrc) = self.appsrc { + if appsrc.end_of_stream() == gst::FlowReturn::Ok { + return Ok(()); + } + } + Err(BackendError::PlayerEOSFailed) + } + + pub fn seek(&mut self, time: f64) -> Result<(), BackendError> { + // XXX Support AppStreamType::RandomAccess. The callback model changes + // if the stream type is set to RandomAccess (i.e. the seek-data + // callback is received right after pushing the first chunk of data, + // even if player.seek() is not called). + if self.stream_type.is_none() || self.stream_type.unwrap() != AppStreamType::Seekable { + return Err(BackendError::PlayerNonSeekable); + } + if let Some(ref metadata) = self.last_metadata { + if let Some(ref duration) = metadata.duration { + if duration < &time::Duration::new(time as u64, 0) { + eprintln!("Trying to seek out of range"); + return Err(BackendError::PlayerSeekOutOfRange); + } + } + } + + let time = time * 1_000_000_000.; + self.player.seek(gst::ClockTime::from_nseconds(time as u64)); + Ok(()) + } + + pub fn push_data(&mut self, data: Vec) -> Result<(), BackendError> { + if let Some(ref mut appsrc) = self.appsrc { + let buffer = + gst::Buffer::from_slice(data).ok_or_else(|| BackendError::PlayerPushDataFailed)?; + if appsrc.push_buffer(buffer) == gst::FlowReturn::Ok { + return Ok(()); + } + } + Err(BackendError::PlayerPushDataFailed) } pub fn set_app_src(&mut self, appsrc: gst_app::AppSrc) { @@ -190,6 +269,7 @@ impl GStreamerPlayer { appsrc: None, appsink: video_sink, input_size: 0, + stream_type: None, subscribers: Vec::new(), renderers: Vec::new(), last_metadata: None, @@ -243,6 +323,18 @@ impl GStreamerPlayer { } }); + let inner_clone = inner.clone(); + inner + .lock() + .unwrap() + .player + .connect_seek_done(move |_, position| { + if let Some(seconds) = position.seconds() { + let inner = inner_clone.lock().unwrap(); + inner.notify(PlayerEvent::SeekDone(seconds)); + } + }); + let inner_clone = inner.clone(); inner .lock() @@ -275,7 +367,10 @@ impl GStreamerPlayer { eprintln!("Could not get duration seconds"); return; } - Some(time::Duration::new(seconds.unwrap(), (nanos.unwrap() % 1_000_000_000) as u32)) + Some(time::Duration::new( + seconds.unwrap(), + (nanos.unwrap() % 1_000_000_000) as u32, + )) } else { None }; @@ -338,6 +433,10 @@ impl GStreamerPlayer { appsrc.set_size(inner.input_size as i64); } + if let Some(ref stream_type) = inner.stream_type { + appsrc.set_stream_type(*stream_type); + } + let sender_clone = sender.clone(); let need_data_id = Arc::new(Mutex::new(None)); @@ -357,6 +456,19 @@ impl GStreamerPlayer { .unwrap(), ); + let inner_clone = inner_clone.clone(); + appsrc.set_callbacks( + AppSrcCallbacks::new() + .seek_data(move |_, offset| { + inner_clone + .lock() + .unwrap() + .notify(PlayerEvent::SeekData(offset)); + true + }) + .build(), + ); + inner.set_app_src(appsrc); None @@ -379,7 +491,7 @@ impl GStreamerPlayer { player.stop(); }); - inner.pause(); + let _ = inner.pause(); (receiver, error_handler_id) }; @@ -390,96 +502,37 @@ impl GStreamerPlayer { } } -impl Player for GStreamerPlayer { - type Error = BackendError; - fn register_event_handler(&self, sender: IpcSender) -> Result<(), BackendError> { - self.setup()?; - let inner = self.inner.borrow(); - inner - .as_ref() - .unwrap() - .lock() - .unwrap() - .register_event_handler(sender); - Ok(()) - } - - fn register_frame_renderer( - &self, - renderer: Arc>, - ) -> Result<(), BackendError> { - self.setup()?; - let inner = self.inner.borrow(); - inner - .as_ref() - .unwrap() - .lock() - .unwrap() - .register_frame_renderer(renderer); - Ok(()) - } - - fn set_input_size(&self, size: u64) -> Result<(), BackendError> { - self.setup()?; - // Keep inner's .set_input_size() to proxy its value, since it - // could be set by the user before calling .setup() - let inner = self.inner.borrow(); - let mut inner = inner.as_ref().unwrap().lock().unwrap(); - inner.set_input_size(size); - if let Some(ref mut appsrc) = inner.appsrc { - if size > 0 { - appsrc.set_size(size as i64); - } else { - appsrc.set_size(-1); // live source - } +macro_rules! inner_player_proxy { + ($fn_name:ident) => ( + fn $fn_name(&self) -> Result<(), BackendError> { + self.setup()?; + let inner = self.inner.borrow(); + let mut inner = inner.as_ref().unwrap().lock().unwrap(); + inner.$fn_name() } - Ok(()) - } - - fn play(&self) -> Result<(), BackendError> { - self.setup()?; - let inner = self.inner.borrow(); - inner.as_ref().unwrap().lock().unwrap().play(); - Ok(()) - } - - fn pause(&self) -> Result<(), BackendError> { - self.setup()?; - let inner = self.inner.borrow(); - inner.as_ref().unwrap().lock().unwrap().pause(); - Ok(()) - } - - fn stop(&self) -> Result<(), BackendError> { - self.setup()?; - let inner = self.inner.borrow(); - inner.as_ref().unwrap().lock().unwrap().stop(); - Ok(()) - } - - fn push_data(&self, data: Vec) -> Result<(), BackendError> { - self.setup()?; - let inner = self.inner.borrow(); - let mut inner = inner.as_ref().unwrap().lock().unwrap(); - if let Some(ref mut appsrc) = inner.appsrc { - let buffer = - gst::Buffer::from_slice(data).ok_or_else(|| BackendError::PlayerPushDataFailed)?; - if appsrc.push_buffer(buffer) == gst::FlowReturn::Ok { - return Ok(()); - } + ); + + ($fn_name:ident, $arg1:ident, $arg1_type:ty) => ( + fn $fn_name(&self, $arg1: $arg1_type) -> Result<(), BackendError> { + self.setup()?; + let inner = self.inner.borrow(); + let mut inner = inner.as_ref().unwrap().lock().unwrap(); + inner.$fn_name($arg1) } - Err(BackendError::PlayerPushDataFailed) - } + ) +} - fn end_of_stream(&self) -> Result<(), BackendError> { - self.setup()?; - let inner = self.inner.borrow(); - let mut inner = inner.as_ref().unwrap().lock().unwrap(); - if let Some(ref mut appsrc) = inner.appsrc { - if appsrc.end_of_stream() == gst::FlowReturn::Ok { - return Ok(()); - } - } - Err(BackendError::PlayerEOSFailed) - } +impl Player for GStreamerPlayer { + type Error = BackendError; + + inner_player_proxy!(register_event_handler, sender, IpcSender); + inner_player_proxy!(register_frame_renderer, renderer, Arc>); + inner_player_proxy!(play); + inner_player_proxy!(pause); + inner_player_proxy!(stop); + inner_player_proxy!(end_of_stream); + inner_player_proxy!(set_input_size, size, u64); + inner_player_proxy!(set_stream_type, type_, StreamType); + inner_player_proxy!(push_data, data, Vec); + inner_player_proxy!(seek, time, f64); } diff --git a/examples/player/main.rs b/examples/player/main.rs index f07fc1bf..26c3a36c 100644 --- a/examples/player/main.rs +++ b/examples/player/main.rs @@ -118,6 +118,8 @@ impl PlayerWrapper { } PlayerEvent::FrameUpdated => eprint!("."), PlayerEvent::PositionChanged(_) => (), + PlayerEvent::SeekData(_) => (), + PlayerEvent::SeekDone(_) => (), } } player.lock().unwrap().stop().unwrap(); diff --git a/examples/resources/mov_bbb.mp4 b/examples/resources/mov_bbb.mp4 new file mode 100644 index 00000000..98f8be66 Binary files /dev/null and b/examples/resources/mov_bbb.mp4 differ diff --git a/examples/simple_player.rs b/examples/simple_player.rs index 64ed53bc..0c480c1e 100644 --- a/examples/simple_player.rs +++ b/examples/simple_player.rs @@ -2,15 +2,15 @@ extern crate ipc_channel; extern crate servo_media; use ipc_channel::ipc; -use servo_media::player::PlayerEvent; +use servo_media::player::{PlayerEvent, StreamType}; use servo_media::ServoMedia; use std::env; use std::error::Error; use std::fs::File; -use std::io::BufReader; -use std::io::Read; +use std::io::{BufReader, Read, Seek, SeekFrom}; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc; use std::sync::{Arc, Mutex}; use std::thread; @@ -49,30 +49,50 @@ fn run_example(servo_media: Arc) { .unwrap(); } + player + .lock() + .unwrap() + .set_stream_type(StreamType::Seekable) + .unwrap(); + let player_clone = Arc::clone(&player); let shutdown = Arc::new(AtomicBool::new(false)); let shutdown_clone = shutdown.clone(); + let seek_done = Arc::new(AtomicBool::new(false)); + let seek_done_clone = seek_done.clone(); + let (seek_sender, seek_receiver) = mpsc::channel(); let t = thread::spawn(move || { let player = &player_clone; let mut buf_reader = BufReader::new(file); - let mut buffer = [0; 8192]; - while !shutdown_clone.load(Ordering::Relaxed) { - match buf_reader.read(&mut buffer[..]) { - Ok(0) => { - println!("finished pushing data"); - break; - } - Ok(size) => player - .lock() - .unwrap() - .push_data(Vec::from(&buffer[0..size])) - .unwrap(), - Err(e) => { - eprintln!("Error: {}", e); - break; + let mut buffer = [0; 1024]; + let mut read = |offset| { + if buf_reader.seek(SeekFrom::Start(offset)).is_err() { + eprintln!("BufReader - Could not seek to {:?}", offset); + } + + while !shutdown_clone.load(Ordering::Relaxed) { + match buf_reader.read(&mut buffer[..]) { + Ok(0) => { + println!("Finished pushing data"); + break; + } + Ok(size) => player + .lock() + .unwrap() + .push_data(Vec::from(&buffer[0..size])) + .unwrap(), + Err(e) => { + eprintln!("Error: {}", e); + break; + } } } - } + }; + + read(0); + read(seek_receiver.recv().unwrap()); + read(seek_receiver.recv().unwrap()); + seek_done_clone.store(true, Ordering::Relaxed); }); player.lock().unwrap().play().unwrap(); @@ -80,21 +100,44 @@ fn run_example(servo_media: Arc) { while let Ok(event) = receiver.recv() { match event { PlayerEvent::EndOfStream => { - println!("EOF"); + println!("\nEOF"); break; } PlayerEvent::Error => { - println!("Error"); + println!("\nError"); break; } PlayerEvent::MetadataUpdated(ref m) => { - println!("Metadata updated! {:?}", m); + println!("\nMetadata updated! {:?}", m); } PlayerEvent::StateChanged(ref s) => { - println!("Player state changed to {:?}", s); + println!("\nPlayer state changed to {:?}", s); } PlayerEvent::FrameUpdated => eprint!("."), - PlayerEvent::PositionChanged(p) => println!("{:?}", p), + PlayerEvent::PositionChanged(p) => { + let player = player.lock().unwrap(); + if seek_done.load(Ordering::Relaxed) { + continue; + } + if p == 1 { + println!("\nPosition changed to 1sec, seeking to 4sec"); + if let Err(e) = player.seek(4.) { + eprintln!("{:?}", e); + } + } + + if p == 9 { + println!("\nPosition changed to 9sec, seeking to 0sec"); + if let Err(e) = player.seek(0.) { + eprintln!("{:?}", e); + } + } + } + PlayerEvent::SeekData(p) => { + println!("\nSeek requested to position {:?}", p); + seek_sender.send(p).unwrap(); + } + PlayerEvent::SeekDone(p) => println!("\nSeeked to {:?}", p), } } diff --git a/player/src/lib.rs b/player/src/lib.rs index 1b933c62..8a5b995f 100644 --- a/player/src/lib.rs +++ b/player/src/lib.rs @@ -24,19 +24,40 @@ pub enum PlayerEvent { FrameUpdated, MetadataUpdated(metadata::Metadata), PositionChanged(u64), + /// The player needs the data to perform a seek to the given offset. + /// The next push_data should get the buffers from the new offset. + /// This event is only received for seekable stream types. + SeekData(u64), + /// The player has performed a seek to the given offset. + SeekDone(u64), StateChanged(PlaybackState), } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum StreamType { + /// No seeking is supported in the stream, such as a live stream. + Stream, + /// The stream is seekable but seeking might not be very fast, such as data from a webserver. + Seekable, + /// The stream is seekable and seeking is fast, such as in a local file. + RandomAccess, +} + pub trait Player: Send { type Error: Debug; fn register_event_handler(&self, sender: IpcSender) -> Result<(), Self::Error>; - fn register_frame_renderer(&self, renderer: Arc>) -> Result<(), Self::Error>; + fn register_frame_renderer( + &self, + renderer: Arc>, + ) -> Result<(), Self::Error>; fn play(&self) -> Result<(), Self::Error>; fn pause(&self) -> Result<(), Self::Error>; fn stop(&self) -> Result<(), Self::Error>; + fn seek(&self, time: f64) -> Result<(), Self::Error>; fn set_input_size(&self, size: u64) -> Result<(), Self::Error>; + fn set_stream_type(&self, type_: StreamType) -> Result<(), Self::Error>; fn push_data(&self, data: Vec) -> Result<(), Self::Error>; fn end_of_stream(&self) -> Result<(), Self::Error>; } @@ -45,14 +66,32 @@ pub struct DummyPlayer {} impl Player for DummyPlayer { type Error = (); - fn register_event_handler(&self, _: IpcSender) -> Result<(), ()> { Ok(()) } - fn register_frame_renderer(&self, _: Arc>) -> Result<(), ()> { Ok(()) } + fn register_event_handler(&self, _: IpcSender) -> Result<(), ()> { + Ok(()) + } + fn register_frame_renderer(&self, _: Arc>) -> Result<(), ()> { + Ok(()) + } - fn play(&self) -> Result<(), ()> { Ok(()) } - fn pause(&self) -> Result<(), ()> { Ok(()) } - fn stop(&self) -> Result<(), ()> { Ok(()) } + fn play(&self) -> Result<(), ()> { + Ok(()) + } + fn pause(&self) -> Result<(), ()> { + Ok(()) + } + fn stop(&self) -> Result<(), ()> { + Ok(()) + } + fn seek(&self, _: f64) -> Result<(), ()> { + Ok(()) + } - fn set_input_size(&self, _: u64) -> Result<(), ()> { Ok(()) } + fn set_input_size(&self, _: u64) -> Result<(), ()> { + Ok(()) + } + fn set_stream_type(&self, _: StreamType) -> Result<(), ()> { + Ok(()) + } fn push_data(&self, _: Vec) -> Result<(), ()> { Err(()) }