scuffle_rtmp/
lib.rs

1//! A crate for handling RTMP server connections.
2#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
3//! ## Specifications
4//!
5//! | Name | Version | Link | Comments |
6//! | --- | --- | --- | --- |
7//! | Adobe’s Real Time Messaging Protocol | `1.0` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/legacy/rtmp-v1-0-spec.pdf> | Refered to as 'Legacy RTMP spec' in this documentation |
8//! | Enhancing RTMP, FLV | `v1-2024-02-29-r1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v1.pdf> | |
9//! | Enhanced RTMP | `v2-2024-10-22-b1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v2.pdf> | Refered to as 'Enhanced RTMP spec' in this documentation |
10#![cfg_attr(feature = "docs", doc = "## Feature flags")]
11#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
12//! ## Example
13//!
14//! ```no_run
15//! # use std::io::Cursor;
16//! #
17//! # use scuffle_rtmp::ServerSession;
18//! # use scuffle_rtmp::session::server::{ServerSessionError, SessionData, SessionHandler};
19//! # use tokio::net::TcpListener;
20//! #
21//! struct Handler;
22//!
23//! impl SessionHandler for Handler {
24//!     async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
25//!         // Handle incoming video/audio/meta data
26//!         Ok(())
27//!     }
28//!
29//!     async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
30//!         // Handle the publish event
31//!         Ok(())
32//!     }
33//!
34//!     async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
35//!         // Handle the unpublish event
36//!         Ok(())
37//!     }
38//! }
39//!
40//! #[tokio::main]
41//! async fn main() {
42//!     let listener = TcpListener::bind("[::]:1935").await.unwrap();
43//!     // listening on [::]:1935
44//!
45//!     while let Ok((stream, addr)) = listener.accept().await {
46//!         let session = ServerSession::new(stream, Handler);
47//!
48//!         tokio::spawn(async move {
49//!             if let Err(err) = session.run().await {
50//!                 // Handle the session error
51//!             }
52//!         });
53//!     }
54//! }
55//! ```
56//!
57//! ## License
58//!
59//! This project is licensed under the MIT or Apache-2.0 license.
60//! You can choose between one of them if you use this work.
61//!
62//! `SPDX-License-Identifier: MIT OR Apache-2.0`
63#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
64#![cfg_attr(docsrs, feature(doc_auto_cfg))]
65#![deny(missing_docs)]
66#![deny(unsafe_code)]
67#![deny(unreachable_pub)]
68
69pub mod chunk;
70pub mod command_messages;
71pub mod error;
72pub mod handshake;
73pub mod messages;
74pub mod protocol_control_messages;
75pub mod session;
76pub mod user_control_messages;
77
78pub use session::server::ServerSession;
79
80/// Changelogs generated by [scuffle_changelog]
81#[cfg(feature = "docs")]
82#[scuffle_changelog::changelog]
83pub mod changelog {}
84
85#[cfg(test)]
86#[cfg_attr(all(test, coverage_nightly), coverage(off))]
87mod tests {
88    use std::path::PathBuf;
89    use std::time::Duration;
90
91    use scuffle_future_ext::FutureExt;
92    use tokio::process::Command;
93    use tokio::sync::{mpsc, oneshot};
94
95    use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};
96
97    fn file_path(item: &str) -> PathBuf {
98        if let Some(env) = std::env::var_os("ASSETS_DIR") {
99            PathBuf::from(env).join(item)
100        } else {
101            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("../../assets/{item}"))
102        }
103    }
104
105    enum Event {
106        Publish {
107            stream_id: u32,
108            app_name: String,
109            stream_name: String,
110            response: oneshot::Sender<Result<(), ServerSessionError>>,
111        },
112        Unpublish {
113            stream_id: u32,
114            response: oneshot::Sender<Result<(), ServerSessionError>>,
115        },
116        Data {
117            stream_id: u32,
118            data: SessionData,
119            response: oneshot::Sender<Result<(), ServerSessionError>>,
120        },
121    }
122
123    struct Handler(mpsc::Sender<Event>);
124
125    impl SessionHandler for Handler {
126        async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
127            let (response, reciever) = oneshot::channel();
128
129            self.0
130                .send(Event::Publish {
131                    stream_id,
132                    app_name: app_name.to_string(),
133                    stream_name: stream_name.to_string(),
134                    response,
135                })
136                .await
137                .unwrap();
138
139            reciever.await.unwrap()
140        }
141
142        async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
143            let (response, reciever) = oneshot::channel();
144
145            self.0.send(Event::Unpublish { stream_id, response }).await.unwrap();
146
147            reciever.await.unwrap()
148        }
149
150        async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
151            let (response, reciever) = oneshot::channel();
152            self.0
153                .send(Event::Data {
154                    stream_id,
155                    data,
156                    response,
157                })
158                .await
159                .unwrap();
160
161            reciever.await.unwrap()
162        }
163    }
164
165    #[cfg(not(valgrind))] // test is time-sensitive, consider refactoring?
166    #[tokio::test]
167    async fn test_basic_rtmp_clean() {
168        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
169        let addr = listener.local_addr().unwrap();
170
171        let _ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
172            .args([
173                "-loglevel",
174                "debug",
175                "-re",
176                "-i",
177                file_path("avc_aac.mp4").to_str().expect("failed to get path"),
178                "-r",
179                "30",
180                "-t",
181                "1", // just for the test so it doesn't take too long
182                "-c",
183                "copy",
184                "-f",
185                "flv",
186                &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
187            ])
188            .stdout(std::process::Stdio::inherit())
189            .stderr(std::process::Stdio::inherit())
190            .spawn()
191            .expect("failed to execute ffmpeg");
192
193        let (ffmpeg_stream, _) = listener
194            .accept()
195            .with_timeout(Duration::from_millis(1000))
196            .await
197            .expect("timed out")
198            .expect("failed to accept");
199
200        let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
201            let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
202            let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
203
204            (
205                tokio::spawn(async move {
206                    let r = session.run().await;
207                    println!("ffmpeg session ended: {r:?}");
208                    r
209                }),
210                ffmpeg_event_reciever,
211            )
212        };
213
214        let event = ffmpeg_event_reciever
215            .recv()
216            .with_timeout(Duration::from_millis(1000))
217            .await
218            .expect("timed out")
219            .expect("failed to recv event");
220
221        match event {
222            Event::Publish {
223                stream_id,
224                app_name,
225                stream_name,
226                response,
227            } => {
228                assert_eq!(stream_id, 1);
229                assert_eq!(app_name, "live");
230                assert_eq!(stream_name, "stream-key");
231                response.send(Ok(())).expect("failed to send response");
232            }
233            _ => panic!("unexpected event"),
234        }
235
236        let mut got_video = false;
237        let mut got_audio = false;
238        let mut got_metadata = false;
239
240        while let Some(data) = ffmpeg_event_reciever
241            .recv()
242            .with_timeout(Duration::from_millis(1000))
243            .await
244            .expect("timed out")
245        {
246            match data {
247                Event::Data {
248                    stream_id,
249                    response,
250                    data,
251                    ..
252                } => {
253                    match data {
254                        SessionData::Video { .. } => got_video = true,
255                        SessionData::Audio { .. } => got_audio = true,
256                        SessionData::Amf0 { .. } => got_metadata = true,
257                    }
258                    response.send(Ok(())).expect("failed to send response");
259                    assert_eq!(stream_id, 1);
260                }
261                Event::Unpublish { stream_id, response } => {
262                    assert_eq!(stream_id, 1);
263                    response.send(Ok(())).expect("failed to send response");
264                    break;
265                }
266                _ => panic!("unexpected event"),
267            }
268        }
269
270        assert!(got_video);
271        assert!(got_audio);
272        assert!(got_metadata);
273
274        if ffmpeg_event_reciever
275            .recv()
276            .with_timeout(Duration::from_millis(1000))
277            .await
278            .expect("timed out")
279            .is_some()
280        {
281            panic!("unexpected event");
282        }
283
284        assert!(
285            ffmpeg_handle
286                .await
287                .expect("failed to join handle")
288                .expect("failed to handle ffmpeg connection")
289        );
290
291        // TODO: Fix this assertion
292        // assert!(ffmpeg.try_wait().expect("failed to wait for ffmpeg").is_none());
293    }
294
295    // test is time-sensitive, consider refactoring?
296    // windows seems to not let us kill ffmpeg without it cleaning up the stream.
297    #[cfg(all(not(valgrind), not(windows)))]
298    #[tokio::test]
299    async fn test_basic_rtmp_unclean() {
300        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
301        let addr = listener.local_addr().unwrap();
302
303        println!("ffmpeg from {}", std::env::var("FFMPEG").unwrap_or("ffmpeg".into()));
304
305        let mut ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
306            .args([
307                "-loglevel",
308                "debug",
309                "-re",
310                "-i",
311                file_path("avc_aac.mp4").to_str().expect("failed to get path"),
312                "-r",
313                "30",
314                "-t",
315                "1", // just for the test so it doesn't take too long
316                "-c",
317                "copy",
318                "-f",
319                "flv",
320                &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
321            ])
322            .stdout(std::process::Stdio::inherit())
323            .stderr(std::process::Stdio::inherit())
324            .spawn()
325            .expect("failed to execute ffmpeg");
326
327        let (ffmpeg_stream, _) = listener
328            .accept()
329            .with_timeout(Duration::from_millis(1000))
330            .await
331            .expect("timed out")
332            .expect("failed to accept");
333
334        let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
335            let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
336            let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
337
338            (
339                tokio::spawn(async move {
340                    let r = session.run().await;
341                    println!("ffmpeg session ended: {r:?}");
342                    r
343                }),
344                ffmpeg_event_reciever,
345            )
346        };
347
348        let event = ffmpeg_event_reciever
349            .recv()
350            .with_timeout(Duration::from_millis(1000))
351            .await
352            .expect("timed out")
353            .expect("failed to recv event");
354
355        match event {
356            Event::Publish {
357                stream_id,
358                app_name,
359                stream_name,
360                response,
361            } => {
362                assert_eq!(stream_id, 1);
363                assert_eq!(app_name, "live");
364                assert_eq!(stream_name, "stream-key");
365                response.send(Ok(())).expect("failed to send response");
366            }
367            _ => panic!("unexpected event"),
368        }
369
370        let mut got_video = false;
371        let mut got_audio = false;
372        let mut got_metadata = false;
373
374        while let Some(data) = ffmpeg_event_reciever
375            .recv()
376            .with_timeout(Duration::from_millis(1000))
377            .await
378            .expect("timed out")
379        {
380            match data {
381                Event::Data {
382                    stream_id,
383                    response,
384                    data,
385                    ..
386                } => {
387                    assert_eq!(stream_id, 1);
388                    match data {
389                        SessionData::Video { .. } => got_video = true,
390                        SessionData::Audio { .. } => got_audio = true,
391                        SessionData::Amf0 { .. } => got_metadata = true,
392                    }
393                    response.send(Ok(())).expect("failed to send response");
394                }
395                _ => panic!("unexpected event"),
396            }
397
398            if got_video && got_audio && got_metadata {
399                break;
400            }
401        }
402
403        assert!(got_video);
404        assert!(got_audio);
405        assert!(got_metadata);
406
407        ffmpeg.kill().await.expect("failed to kill ffmpeg");
408
409        while let Some(data) = ffmpeg_event_reciever
410            .recv()
411            .with_timeout(Duration::from_millis(1000))
412            .await
413            .expect("timed out")
414        {
415            match data {
416                Event::Data { response, .. } => {
417                    response.send(Ok(())).expect("failed to send response");
418                }
419                _ => panic!("unexpected event"),
420            }
421        }
422
423        // the server should have detected the ffmpeg process has died uncleanly
424        assert!(
425            !ffmpeg_handle
426                .await
427                .expect("failed to join handle")
428                .expect("failed to handle ffmpeg connection")
429        );
430    }
431}