diff --git a/audio/src/decoder.rs b/audio/src/decoder.rs index 17bedfd6..5087ae20 100644 --- a/audio/src/decoder.rs +++ b/audio/src/decoder.rs @@ -2,9 +2,10 @@ use std::boxed::FnBox; use std::sync::Mutex; pub struct AudioDecoderCallbacks { - pub eos: Mutex>>, + pub eos: Mutex>>, pub error: Mutex>>, - pub progress: Option>) + Send + Sync + 'static>>, + pub progress: Option>, u32) + Send + Sync + 'static>>, + pub ready: Mutex>>, } impl AudioDecoderCallbacks { @@ -13,14 +14,15 @@ impl AudioDecoderCallbacks { eos: None, error: None, progress: None, + ready: None, } } - pub fn eos(&self, channels: u32) { + pub fn eos(&self) { let eos = self.eos.lock().unwrap().take(); match eos { None => return, - Some(callback) => callback(channels), + Some(callback) => callback(), }; } @@ -32,10 +34,18 @@ impl AudioDecoderCallbacks { }; } - pub fn progress(&self, buffer: Box>) { + pub fn progress(&self, buffer: Box>, channel: u32) { match self.progress { None => return, - Some(ref callback) => callback(buffer), + Some(ref callback) => callback(buffer, channel), + }; + } + + pub fn ready(&self, channels: u32) { + let ready = self.ready.lock().unwrap().take(); + match ready { + None => return, + Some(callback) => callback(channels), }; } } @@ -44,13 +54,14 @@ unsafe impl Send for AudioDecoderCallbacks {} unsafe impl Sync for AudioDecoderCallbacks {} pub struct AudioDecoderCallbacksBuilder { - eos: Option>, + eos: Option>, error: Option>, - progress: Option>) + Send + Sync + 'static>>, + progress: Option>, u32) + Send + Sync + 'static>>, + ready: Option>, } impl AudioDecoderCallbacksBuilder { - pub fn eos(self, eos: F) -> Self { + pub fn eos(self, eos: F) -> Self { Self { eos: Some(Box::new(eos)), ..self @@ -64,18 +75,29 @@ impl AudioDecoderCallbacksBuilder { } } - pub fn progress>) + Send + Sync + 'static>(self, progress: F) -> Self { + pub fn progress>, u32) + Send + Sync + 'static>( + self, + progress: F, + ) -> Self { Self { progress: Some(Box::new(progress)), ..self } } + pub fn ready(self, ready: F) -> Self { + Self { + ready: Some(Box::new(ready)), + ..self + } + } + pub fn build(self) -> AudioDecoderCallbacks { AudioDecoderCallbacks { eos: Mutex::new(self.eos), error: Mutex::new(self.error), progress: self.progress, + ready: Mutex::new(self.ready), } } } diff --git a/backends/gstreamer/src/audio_decoder.rs b/backends/gstreamer/src/audio_decoder.rs index 70dbb309..03c778ac 100644 --- a/backends/gstreamer/src/audio_decoder.rs +++ b/backends/gstreamer/src/audio_decoder.rs @@ -65,16 +65,32 @@ impl AudioDecoder for GStreamerAudioDecoder { let options = options.unwrap_or_default(); - let pipeline_ = pipeline.clone(); + let pipeline_ = pipeline.downgrade(); let callbacks_ = callbacks.clone(); + // Initial pipeline looks like + // + // appsrc ! decodebin2! ... + // + // We plug in the second part of the pipeline, including the deinterleave element, + // once the media starts being decoded. decodebin.connect_pad_added(move |_, src_pad| { - // We only want a sink per audio stream. - if src_pad.is_linked() { - return; - } + // A decodebin pad was added, if this is an audio file, + // plug in a deinterleave element to separate each planar channel. + // + // Sub pipeline looks like + // + // ... decodebin2 ! audioconvert ! audioresample ! capsfilter ! deinterleave ... + // + // deinterleave also uses a sometime-pad, so we need to wait until + // a pad for a planar channel is added to plug in the last part of + // the pipeline, with the appsink that will be pulling the data from + // each channel. - let pipeline = &pipeline_; let callbacks = &callbacks_; + let pipeline = match pipeline_.upgrade() { + Some(pipeline) => pipeline, + None => return callbacks.error(), + }; let (is_audio, caps) = { let media_type = src_pad.get_current_caps().and_then(|caps| { @@ -97,68 +113,151 @@ impl AudioDecoder for GStreamerAudioDecoder { return callbacks.error(); } - let insert_sink = || -> Result<(), ()> { - let queue = gst::ElementFactory::make("queue", None).ok_or(())?; + let sample_audio_info = match gst_audio::AudioInfo::from_caps(&caps) { + Some(sample_audio_info) => sample_audio_info, + None => return callbacks.error(), + }; + let channels = sample_audio_info.channels(); + callbacks.ready(channels); + + let insert_deinterleave = || -> Result<(), ()> { let convert = gst::ElementFactory::make("audioconvert", None).ok_or(())?; let resample = gst::ElementFactory::make("audioresample", None).ok_or(())?; - let sink = gst::ElementFactory::make("appsink", None).ok_or(())?; - let appsink = sink.clone().dynamic_cast::().map_err(|_| ())?; - sink.set_property("sync", &false.to_value()) + let filter = gst::ElementFactory::make("capsfilter", None).ok_or(())?; + let deinterleave = + gst::ElementFactory::make("deinterleave", Some("deinterleave")).ok_or(())?; + + deinterleave + .set_property("keep-positions", &true.to_value()) .map_err(|_| ())?; + let pipeline_ = pipeline.downgrade(); + let callbacks_ = callbacks.clone(); + deinterleave.connect_pad_added(move |_, src_pad| { + // A new pad for a planar channel was added in deinterleave. + // Plug in an appsink so we can pull the data from each channel. + // + // The end of the pipeline looks like: + // + // ... deinterleave ! queue ! appsink. + let callbacks = &callbacks_; + let pipeline = match pipeline_.upgrade() { + Some(pipeline) => pipeline, + None => return callbacks.error(), + }; + let insert_sink = || -> Result<(), ()> { + let queue = gst::ElementFactory::make("queue", None).ok_or(())?; + let sink = gst::ElementFactory::make("appsink", None).ok_or(())?; + let appsink = sink.clone().dynamic_cast::().map_err(|_| ())?; + sink.set_property("sync", &false.to_value()) + .map_err(|_| ())?; + + let pipeline_ = pipeline.clone(); + let pipeline__ = pipeline.clone(); + let callbacks_ = callbacks.clone(); + let callbacks__ = callbacks.clone(); + appsink.set_callbacks( + AppSinkCallbacks::new() + .new_sample(move |appsink| { + let sample = match appsink.pull_sample() { + None => { + return gst::FlowReturn::Eos; + } + Some(sample) => sample, + }; + + let buffer = if let Some(buffer) = sample.get_buffer() { + buffer + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + let caps = if let Some(caps) = sample.get_caps() { + caps + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + let audio_info = if let Some(audio_info) = + gst_audio::AudioInfo::from_caps(&caps) + { + audio_info + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + assert_eq!(audio_info.channels(), 1); + let positions = if let Some(positions) = audio_info.positions() + { + positions + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + for position in positions.iter() { + let buffer = buffer.clone(); + let map = + if let Ok(map) = buffer.into_mapped_buffer_readable() { + map + } else { + callbacks_.error(); + let _ = pipeline_.set_state(gst::State::Null); + return gst::FlowReturn::Error; + }; + + let progress = Box::new(GStreamerAudioDecoderProgress(map)); + let channel = position.to_mask() as u32; + callbacks_.progress(progress, channel); + } + + gst::FlowReturn::Ok + }) + .eos(move |_| { + callbacks__.eos(); + let _ = pipeline__.set_state(gst::State::Null); + }) + .build(), + ); + + let elements = &[&queue, &sink]; + pipeline.add_many(elements).map_err(|_| ())?; + gst::Element::link_many(elements).map_err(|_| ())?; + + for e in elements { + e.sync_state_with_parent().map_err(|_| ())?; + } + + let sink_pad = queue.get_static_pad("sink").ok_or(())?; + src_pad + .link(&sink_pad) + .into_result() + .map(|_| ()) + .map_err(|_| ()) + }; + + if insert_sink().is_err() { + callbacks.error(); + } + }); - let sample_audio_info = gst_audio::AudioInfo::from_caps(&caps).ok_or(())?; - let channels = sample_audio_info.channels(); let audio_info = gst_audio::AudioInfo::new( gst_audio::AUDIO_FORMAT_F32, options.sample_rate as u32, channels, ).build() .ok_or(())?; - appsink.set_caps(&audio_info.to_caps().unwrap()); + let caps = audio_info.to_caps().ok_or(())?; + filter + .set_property("caps", &caps.to_value()) + .map_err(|_| ())?; - let pipeline_ = pipeline.clone(); - let pipeline__ = pipeline.clone(); - let callbacks_ = callbacks.clone(); - let callbacks__ = callbacks.clone(); - appsink.set_callbacks( - AppSinkCallbacks::new() - .new_sample(move |appsink| { - let sample = match appsink.pull_sample() { - None => { - return gst::FlowReturn::Eos; - } - Some(sample) => sample, - }; - - let buffer = if let Some(buffer) = sample.get_buffer() { - buffer - } else { - callbacks_.error(); - let _ = pipeline_.set_state(gst::State::Null); - return gst::FlowReturn::Error; - }; - - let map = if let Ok(map) = buffer.into_mapped_buffer_readable() { - map - } else { - callbacks_.error(); - let _ = pipeline_.set_state(gst::State::Null); - return gst::FlowReturn::Error; - }; - - let progress = Box::new(GStreamerAudioDecoderProgress(map)); - callbacks_.progress(progress); - - gst::FlowReturn::Ok - }) - .eos(move |_| { - callbacks__.eos(channels); - let _ = pipeline__.set_state(gst::State::Null); - }) - .build(), - ); - - let elements = &[&queue, &convert, &resample, &sink]; + let elements = &[&convert, &resample, &filter, &deinterleave]; pipeline.add_many(elements).map_err(|_| ())?; gst::Element::link_many(elements).map_err(|_| ())?; @@ -166,7 +265,7 @@ impl AudioDecoder for GStreamerAudioDecoder { e.sync_state_with_parent().map_err(|_| ())?; } - let sink_pad = queue.get_static_pad("sink").ok_or(())?; + let sink_pad = convert.get_static_pad("sink").ok_or(())?; src_pad .link(&sink_pad) .into_result() @@ -174,7 +273,7 @@ impl AudioDecoder for GStreamerAudioDecoder { .map_err(|_| ()) }; - if insert_sink().is_err() { + if insert_deinterleave().is_err() { callbacks.error(); } }); diff --git a/examples/audio_decoder.rs b/examples/audio_decoder.rs index bd472a01..d1214376 100644 --- a/examples/audio_decoder.rs +++ b/examples/audio_decoder.rs @@ -22,27 +22,33 @@ fn run_example(servo_media: Arc) { let mut file = File::open(filename).unwrap(); let mut bytes = vec![]; file.read_to_end(&mut bytes).unwrap(); - let decoded_audio: Arc>> = Arc::new(Mutex::new(Vec::new())); - let progress = decoded_audio.clone(); + let decoded_audio: Arc>>> = Arc::new(Mutex::new(Vec::new())); + let decoded_audio_ = decoded_audio.clone(); + let decoded_audio__ = decoded_audio.clone(); let (sender, receiver) = mpsc::channel(); let callbacks = AudioDecoderCallbacks::new() - .eos(move |channels| { - sender.send(channels).unwrap(); + .eos(move || { + sender.send(()).unwrap(); }) .error(|| { eprintln!("Error decoding audio"); }) - .progress(move |buffer| { - progress + .progress(move |buffer, channel| { + let mut decoded_audio = decoded_audio_.lock().unwrap(); + decoded_audio[(channel - 1) as usize].extend_from_slice((*buffer).as_ref()); + }) + .ready(move |channels| { + println!("There are {:?} audio channels", channels); + decoded_audio__ .lock() .unwrap() - .extend_from_slice((*buffer).as_ref()); + .resize(channels as usize, Vec::new()); }) .build(); context.decode_audio_data(bytes.to_vec(), callbacks); println!("Decoding audio"); - let channels = receiver.recv().unwrap(); - println!("Audio decoded. Channels {}", channels); + receiver.recv().unwrap(); + println!("Audio decoded"); let buffer_source = context.create_node(AudioNodeInit::AudioBufferSourceNode(Default::default())); let dest = context.dest_node();