From ec592309dac49304b46fead8eb275b7ffca87101 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Tue, 28 Aug 2018 15:44:33 +0200 Subject: [PATCH 1/6] Deinterleave decoded audio --- audio/src/decoder.rs | 13 +- backends/gstreamer/src/audio_decoder.rs | 205 +++++++++++++++++------- examples/audio_decoder.rs | 2 +- 3 files changed, 160 insertions(+), 60 deletions(-) diff --git a/audio/src/decoder.rs b/audio/src/decoder.rs index 17bedfd6..12c6ef23 100644 --- a/audio/src/decoder.rs +++ b/audio/src/decoder.rs @@ -4,7 +4,7 @@ use std::sync::Mutex; pub struct AudioDecoderCallbacks { pub eos: Mutex>>, pub error: Mutex>>, - pub progress: Option>) + Send + Sync + 'static>>, + pub progress: Option>, u32, u32) + Send + Sync + 'static>>, } impl AudioDecoderCallbacks { @@ -32,10 +32,10 @@ impl AudioDecoderCallbacks { }; } - pub fn progress(&self, buffer: Box>) { + pub fn progress(&self, buffer: Box>, channel: u32, channels: u32) { match self.progress { None => return, - Some(ref callback) => callback(buffer), + Some(ref callback) => callback(buffer, channel, channels), }; } } @@ -46,7 +46,7 @@ unsafe impl Sync for AudioDecoderCallbacks {} pub struct AudioDecoderCallbacksBuilder { eos: Option>, error: Option>, - progress: Option>) + Send + Sync + 'static>>, + progress: Option>, u32, u32) + Send + Sync + 'static>>, } impl AudioDecoderCallbacksBuilder { @@ -64,7 +64,10 @@ impl AudioDecoderCallbacksBuilder { } } - pub fn progress>) + Send + Sync + 'static>(self, progress: F) -> Self { + pub fn progress>, u32, u32) + Send + Sync + 'static>( + self, + progress: F, + ) -> Self { Self { progress: Some(Box::new(progress)), ..self diff --git a/backends/gstreamer/src/audio_decoder.rs b/backends/gstreamer/src/audio_decoder.rs index 70dbb309..7110540c 100644 --- a/backends/gstreamer/src/audio_decoder.rs +++ b/backends/gstreamer/src/audio_decoder.rs @@ -67,12 +67,30 @@ impl AudioDecoder for GStreamerAudioDecoder { let pipeline_ = pipeline.clone(); let callbacks_ = callbacks.clone(); + // Initial pipeline looks like + // + // appsrc ! decodebin2! ... + // + // We plug in the second part of the pipeline, including the deinterleave element, + // once the media starts being decoded. decodebin.connect_pad_added(move |_, src_pad| { - // We only want a sink per audio stream. + // Ignore any additional source pads just in case. if src_pad.is_linked() { return; } + // A decodebin pad was added, if this is an audio file, + // plug in a deinterleave element to separate each planar channel. + // + // Sub pipeline looks like + // + // ... decodebin2 ! audioconvert ! audioresample ! capsfilter ! deinterleave ... + // + // deinterleave also uses a sometime-pad, so we need to wait until + // a pad for a planar channel is added to plug in the last part of + // the pipeline, with the appsink that will be pulling the data from + // each channel. + let pipeline = &pipeline_; let callbacks = &callbacks_; @@ -97,68 +115,147 @@ impl AudioDecoder for GStreamerAudioDecoder { return callbacks.error(); } - let insert_sink = || -> Result<(), ()> { - let queue = gst::ElementFactory::make("queue", None).ok_or(())?; + let sample_audio_info = match gst_audio::AudioInfo::from_caps(&caps) { + Some(sample_audio_info) => sample_audio_info, + None => return callbacks.error(), + }; + let channels = sample_audio_info.channels(); + + let insert_deinterleave = || -> Result<(), ()> { let convert = gst::ElementFactory::make("audioconvert", None).ok_or(())?; let resample = gst::ElementFactory::make("audioresample", None).ok_or(())?; - let sink = gst::ElementFactory::make("appsink", None).ok_or(())?; - let appsink = sink.clone().dynamic_cast::().map_err(|_| ())?; - sink.set_property("sync", &false.to_value()) + let filter = gst::ElementFactory::make("capsfilter", None).ok_or(())?; + let deinterleave = + gst::ElementFactory::make("deinterleave", Some("deinterleave")).ok_or(())?; + + deinterleave + .set_property("keep-positions", &true.to_value()) .map_err(|_| ())?; + let pipeline_ = pipeline.clone(); + let callbacks_ = callbacks.clone(); + deinterleave.connect_pad_added(move |_, src_pad| { + // A new pad for a planar channel was added in deinterleave. + // Plug in an appsink so we can pull the data from each channel. + // + // The end of the pipeline looks like: + // + // ... deinterleave ! queue ! appsink. + let pipeline = &pipeline_; + let callbacks = &callbacks_; + let insert_sink = || -> Result<(), ()> { + let queue = gst::ElementFactory::make("queue", None).ok_or(())?; + let sink = gst::ElementFactory::make("appsink", None).ok_or(())?; + let appsink = sink.clone().dynamic_cast::().map_err(|_| ())?; + sink.set_property("sync", &false.to_value()) + .map_err(|_| ())?; + + let pipeline_ = pipeline.clone(); + let pipeline__ = pipeline.clone(); + let callbacks_ = callbacks.clone(); + let callbacks__ = callbacks.clone(); + appsink.set_callbacks( + AppSinkCallbacks::new() + .new_sample(move |appsink| { + let sample = match appsink.pull_sample() { + None => { + return gst::FlowReturn::Eos; + } + Some(sample) => sample, + }; + + let buffer = if let Some(buffer) = sample.get_buffer() { + buffer + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + let caps = if let Some(caps) = sample.get_caps() { + caps + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + let audio_info = if let Some(audio_info) = + gst_audio::AudioInfo::from_caps(&caps) + { + audio_info + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + assert_eq!(audio_info.channels(), 1); + let positions = if let Some(positions) = audio_info.positions() + { + positions + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + for position in positions.iter() { + let buffer = buffer.clone(); + let map = + if let Ok(map) = buffer.into_mapped_buffer_readable() { + map + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + let progress = Box::new(GStreamerAudioDecoderProgress(map)); + let channel = position.to_mask() as u32; + callbacks_.progress(progress, channel, channels); + } + + gst::FlowReturn::Ok + }) + .eos(move |_| { + callbacks__.eos(channels); + let _ = pipeline__.set_state(gst::State::Null); + }) + .build(), + ); + + let elements = &[&queue, &sink]; + pipeline.add_many(elements).map_err(|_| ())?; + gst::Element::link_many(elements).map_err(|_| ())?; + + for e in elements { + e.sync_state_with_parent().map_err(|_| ())?; + } + + let sink_pad = queue.get_static_pad("sink").ok_or(())?; + src_pad + .link(&sink_pad) + .into_result() + .map(|_| ()) + .map_err(|_| ()) + }; + + if insert_sink().is_err() { + callbacks.error(); + } + }); - let sample_audio_info = gst_audio::AudioInfo::from_caps(&caps).ok_or(())?; - let channels = sample_audio_info.channels(); let audio_info = gst_audio::AudioInfo::new( gst_audio::AUDIO_FORMAT_F32, options.sample_rate as u32, channels, ).build() .ok_or(())?; - appsink.set_caps(&audio_info.to_caps().unwrap()); + let caps = audio_info.to_caps().ok_or(())?; + filter + .set_property("caps", &caps.to_value()) + .map_err(|_| ())?; - let pipeline_ = pipeline.clone(); - let pipeline__ = pipeline.clone(); - let callbacks_ = callbacks.clone(); - let callbacks__ = callbacks.clone(); - appsink.set_callbacks( - AppSinkCallbacks::new() - .new_sample(move |appsink| { - let sample = match appsink.pull_sample() { - None => { - return gst::FlowReturn::Eos; - } - Some(sample) => sample, - }; - - let buffer = if let Some(buffer) = sample.get_buffer() { - buffer - } else { - callbacks_.error(); - let _ = pipeline_.set_state(gst::State::Null); - return gst::FlowReturn::Error; - }; - - let map = if let Ok(map) = buffer.into_mapped_buffer_readable() { - map - } else { - callbacks_.error(); - let _ = pipeline_.set_state(gst::State::Null); - return gst::FlowReturn::Error; - }; - - let progress = Box::new(GStreamerAudioDecoderProgress(map)); - callbacks_.progress(progress); - - gst::FlowReturn::Ok - }) - .eos(move |_| { - callbacks__.eos(channels); - let _ = pipeline__.set_state(gst::State::Null); - }) - .build(), - ); - - let elements = &[&queue, &convert, &resample, &sink]; + let elements = &[&convert, &resample, &filter, &deinterleave]; pipeline.add_many(elements).map_err(|_| ())?; gst::Element::link_many(elements).map_err(|_| ())?; @@ -166,7 +263,7 @@ impl AudioDecoder for GStreamerAudioDecoder { e.sync_state_with_parent().map_err(|_| ())?; } - let sink_pad = queue.get_static_pad("sink").ok_or(())?; + let sink_pad = convert.get_static_pad("sink").ok_or(())?; src_pad .link(&sink_pad) .into_result() @@ -174,7 +271,7 @@ impl AudioDecoder for GStreamerAudioDecoder { .map_err(|_| ()) }; - if insert_sink().is_err() { + if insert_deinterleave().is_err() { callbacks.error(); } }); diff --git a/examples/audio_decoder.rs b/examples/audio_decoder.rs index bd472a01..eda6b271 100644 --- a/examples/audio_decoder.rs +++ b/examples/audio_decoder.rs @@ -32,7 +32,7 @@ fn run_example(servo_media: Arc) { .error(|| { eprintln!("Error decoding audio"); }) - .progress(move |buffer| { + .progress(move |buffer, _channel, _channels| { progress .lock() .unwrap() From 528d2cc552d3163c8216e2eaaa500b119138b01a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Tue, 28 Aug 2018 15:48:07 +0200 Subject: [PATCH 2/6] No need to send number of channels through decoder eos callback --- audio/src/decoder.rs | 10 +++++----- backends/gstreamer/src/audio_decoder.rs | 2 +- examples/audio_decoder.rs | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/audio/src/decoder.rs b/audio/src/decoder.rs index 12c6ef23..39fb2d47 100644 --- a/audio/src/decoder.rs +++ b/audio/src/decoder.rs @@ -2,7 +2,7 @@ use std::boxed::FnBox; use std::sync::Mutex; pub struct AudioDecoderCallbacks { - pub eos: Mutex>>, + pub eos: Mutex>>, pub error: Mutex>>, pub progress: Option>, u32, u32) + Send + Sync + 'static>>, } @@ -16,11 +16,11 @@ impl AudioDecoderCallbacks { } } - pub fn eos(&self, channels: u32) { + pub fn eos(&self) { let eos = self.eos.lock().unwrap().take(); match eos { None => return, - Some(callback) => callback(channels), + Some(callback) => callback(), }; } @@ -44,13 +44,13 @@ unsafe impl Send for AudioDecoderCallbacks {} unsafe impl Sync for AudioDecoderCallbacks {} pub struct AudioDecoderCallbacksBuilder { - eos: Option>, + eos: Option>, error: Option>, progress: Option>, u32, u32) + Send + Sync + 'static>>, } impl AudioDecoderCallbacksBuilder { - pub fn eos(self, eos: F) -> Self { + pub fn eos(self, eos: F) -> Self { Self { eos: Some(Box::new(eos)), ..self diff --git a/backends/gstreamer/src/audio_decoder.rs b/backends/gstreamer/src/audio_decoder.rs index 7110540c..37d0706d 100644 --- a/backends/gstreamer/src/audio_decoder.rs +++ b/backends/gstreamer/src/audio_decoder.rs @@ -217,7 +217,7 @@ impl AudioDecoder for GStreamerAudioDecoder { gst::FlowReturn::Ok }) .eos(move |_| { - callbacks__.eos(channels); + callbacks__.eos(); let _ = pipeline__.set_state(gst::State::Null); }) .build(), diff --git a/examples/audio_decoder.rs b/examples/audio_decoder.rs index eda6b271..bc0246b0 100644 --- a/examples/audio_decoder.rs +++ b/examples/audio_decoder.rs @@ -26,8 +26,8 @@ fn run_example(servo_media: Arc) { let progress = decoded_audio.clone(); let (sender, receiver) = mpsc::channel(); let callbacks = AudioDecoderCallbacks::new() - .eos(move |channels| { - sender.send(channels).unwrap(); + .eos(move || { + sender.send(()).unwrap(); }) .error(|| { eprintln!("Error decoding audio"); @@ -41,8 +41,8 @@ fn run_example(servo_media: Arc) { .build(); context.decode_audio_data(bytes.to_vec(), callbacks); println!("Decoding audio"); - let channels = receiver.recv().unwrap(); - println!("Audio decoded. Channels {}", channels); + receiver.recv().unwrap(); + println!("Audio decoded"); let buffer_source = context.create_node(AudioNodeInit::AudioBufferSourceNode(Default::default())); let dest = context.dest_node(); From 75c825b9bb1783b887114326a80027b9abd18901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Tue, 28 Aug 2018 16:11:53 +0200 Subject: [PATCH 3/6] Add decoder ready callback --- audio/src/decoder.rs | 29 ++++++++++++++++++++----- backends/gstreamer/src/audio_decoder.rs | 3 ++- examples/audio_decoder.rs | 5 ++++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/audio/src/decoder.rs b/audio/src/decoder.rs index 39fb2d47..5087ae20 100644 --- a/audio/src/decoder.rs +++ b/audio/src/decoder.rs @@ -4,7 +4,8 @@ use std::sync::Mutex; pub struct AudioDecoderCallbacks { pub eos: Mutex>>, pub error: Mutex>>, - pub progress: Option>, u32, u32) + Send + Sync + 'static>>, + pub progress: Option>, u32) + Send + Sync + 'static>>, + pub ready: Mutex>>, } impl AudioDecoderCallbacks { @@ -13,6 +14,7 @@ impl AudioDecoderCallbacks { eos: None, error: None, progress: None, + ready: None, } } @@ -32,10 +34,18 @@ impl AudioDecoderCallbacks { }; } - pub fn progress(&self, buffer: Box>, channel: u32, channels: u32) { + pub fn progress(&self, buffer: Box>, channel: u32) { match self.progress { None => return, - Some(ref callback) => callback(buffer, channel, channels), + Some(ref callback) => callback(buffer, channel), + }; + } + + pub fn ready(&self, channels: u32) { + let ready = self.ready.lock().unwrap().take(); + match ready { + None => return, + Some(callback) => callback(channels), }; } } @@ -46,7 +56,8 @@ unsafe impl Sync for AudioDecoderCallbacks {} pub struct AudioDecoderCallbacksBuilder { eos: Option>, error: Option>, - progress: Option>, u32, u32) + Send + Sync + 'static>>, + progress: Option>, u32) + Send + Sync + 'static>>, + ready: Option>, } impl AudioDecoderCallbacksBuilder { @@ -64,7 +75,7 @@ impl AudioDecoderCallbacksBuilder { } } - pub fn progress>, u32, u32) + Send + Sync + 'static>( + pub fn progress>, u32) + Send + Sync + 'static>( self, progress: F, ) -> Self { @@ -74,11 +85,19 @@ impl AudioDecoderCallbacksBuilder { } } + pub fn ready(self, ready: F) -> Self { + Self { + ready: Some(Box::new(ready)), + ..self + } + } + pub fn build(self) -> AudioDecoderCallbacks { AudioDecoderCallbacks { eos: Mutex::new(self.eos), error: Mutex::new(self.error), progress: self.progress, + ready: Mutex::new(self.ready), } } } diff --git a/backends/gstreamer/src/audio_decoder.rs b/backends/gstreamer/src/audio_decoder.rs index 37d0706d..1c750a63 100644 --- a/backends/gstreamer/src/audio_decoder.rs +++ b/backends/gstreamer/src/audio_decoder.rs @@ -120,6 +120,7 @@ impl AudioDecoder for GStreamerAudioDecoder { None => return callbacks.error(), }; let channels = sample_audio_info.channels(); + callbacks.ready(channels); let insert_deinterleave = || -> Result<(), ()> { let convert = gst::ElementFactory::make("audioconvert", None).ok_or(())?; @@ -211,7 +212,7 @@ impl AudioDecoder for GStreamerAudioDecoder { let progress = Box::new(GStreamerAudioDecoderProgress(map)); let channel = position.to_mask() as u32; - callbacks_.progress(progress, channel, channels); + callbacks_.progress(progress, channel); } gst::FlowReturn::Ok diff --git a/examples/audio_decoder.rs b/examples/audio_decoder.rs index bc0246b0..59d930ee 100644 --- a/examples/audio_decoder.rs +++ b/examples/audio_decoder.rs @@ -32,12 +32,15 @@ fn run_example(servo_media: Arc) { .error(|| { eprintln!("Error decoding audio"); }) - .progress(move |buffer, _channel, _channels| { + .progress(move |buffer, _channel| { progress .lock() .unwrap() .extend_from_slice((*buffer).as_ref()); }) + .ready(move |channels| { + println!("There are {:?} audio channels", channels); + }) .build(); context.decode_audio_data(bytes.to_vec(), callbacks); println!("Decoding audio"); From 882b0fa22a6be7100792081d81d22121dc6caf67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Tue, 28 Aug 2018 16:32:54 +0200 Subject: [PATCH 4/6] Update decoder example to support multiple channels --- examples/audio_decoder.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/examples/audio_decoder.rs b/examples/audio_decoder.rs index 59d930ee..d1214376 100644 --- a/examples/audio_decoder.rs +++ b/examples/audio_decoder.rs @@ -22,8 +22,9 @@ fn run_example(servo_media: Arc) { let mut file = File::open(filename).unwrap(); let mut bytes = vec![]; file.read_to_end(&mut bytes).unwrap(); - let decoded_audio: Arc>> = Arc::new(Mutex::new(Vec::new())); - let progress = decoded_audio.clone(); + let decoded_audio: Arc>>> = Arc::new(Mutex::new(Vec::new())); + let decoded_audio_ = decoded_audio.clone(); + let decoded_audio__ = decoded_audio.clone(); let (sender, receiver) = mpsc::channel(); let callbacks = AudioDecoderCallbacks::new() .eos(move || { @@ -32,14 +33,16 @@ fn run_example(servo_media: Arc) { .error(|| { eprintln!("Error decoding audio"); }) - .progress(move |buffer, _channel| { - progress - .lock() - .unwrap() - .extend_from_slice((*buffer).as_ref()); + .progress(move |buffer, channel| { + let mut decoded_audio = decoded_audio_.lock().unwrap(); + decoded_audio[(channel - 1) as usize].extend_from_slice((*buffer).as_ref()); }) .ready(move |channels| { println!("There are {:?} audio channels", channels); + decoded_audio__ + .lock() + .unwrap() + .resize(channels as usize, Vec::new()); }) .build(); context.decode_audio_data(bytes.to_vec(), callbacks); From dbfb11244843e0f33c3844a1f2e61c4f4e23a3cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Wed, 29 Aug 2018 09:51:37 +0200 Subject: [PATCH 5/6] Pass weak refs to pipeline to connect_pad_added closures --- backends/gstreamer/src/audio_decoder.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/backends/gstreamer/src/audio_decoder.rs b/backends/gstreamer/src/audio_decoder.rs index 1c750a63..2165e670 100644 --- a/backends/gstreamer/src/audio_decoder.rs +++ b/backends/gstreamer/src/audio_decoder.rs @@ -65,7 +65,7 @@ impl AudioDecoder for GStreamerAudioDecoder { let options = options.unwrap_or_default(); - let pipeline_ = pipeline.clone(); + let pipeline_ = pipeline.downgrade(); let callbacks_ = callbacks.clone(); // Initial pipeline looks like // @@ -91,8 +91,11 @@ impl AudioDecoder for GStreamerAudioDecoder { // the pipeline, with the appsink that will be pulling the data from // each channel. - let pipeline = &pipeline_; let callbacks = &callbacks_; + let pipeline = match pipeline_.upgrade() { + Some(pipeline) => pipeline, + None => return callbacks.error(), + }; let (is_audio, caps) = { let media_type = src_pad.get_current_caps().and_then(|caps| { @@ -132,7 +135,7 @@ impl AudioDecoder for GStreamerAudioDecoder { deinterleave .set_property("keep-positions", &true.to_value()) .map_err(|_| ())?; - let pipeline_ = pipeline.clone(); + let pipeline_ = pipeline.downgrade(); let callbacks_ = callbacks.clone(); deinterleave.connect_pad_added(move |_, src_pad| { // A new pad for a planar channel was added in deinterleave. @@ -141,8 +144,11 @@ impl AudioDecoder for GStreamerAudioDecoder { // The end of the pipeline looks like: // // ... deinterleave ! queue ! appsink. - let pipeline = &pipeline_; let callbacks = &callbacks_; + let pipeline = match pipeline_.upgrade() { + Some(pipeline) => pipeline, + None => return callbacks.error(), + }; let insert_sink = || -> Result<(), ()> { let queue = gst::ElementFactory::make("queue", None).ok_or(())?; let sink = gst::ElementFactory::make("appsink", None).ok_or(())?; From dd0eb457817b7ff3dea9b3265284414ddb12119b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Wed, 29 Aug 2018 10:36:28 +0200 Subject: [PATCH 6/6] Remove useless srcpad being linked check --- backends/gstreamer/src/audio_decoder.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/backends/gstreamer/src/audio_decoder.rs b/backends/gstreamer/src/audio_decoder.rs index 2165e670..03c778ac 100644 --- a/backends/gstreamer/src/audio_decoder.rs +++ b/backends/gstreamer/src/audio_decoder.rs @@ -74,11 +74,6 @@ impl AudioDecoder for GStreamerAudioDecoder { // We plug in the second part of the pipeline, including the deinterleave element, // once the media starts being decoded. decodebin.connect_pad_added(move |_, src_pad| { - // Ignore any additional source pads just in case. - if src_pad.is_linked() { - return; - } - // A decodebin pad was added, if this is an audio file, // plug in a deinterleave element to separate each planar channel. //