1#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
3#![cfg_attr(feature = "docs", doc = "## Feature flags")]
11#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
12#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
64#![cfg_attr(docsrs, feature(doc_auto_cfg))]
65#![deny(missing_docs)]
66#![deny(unsafe_code)]
67#![deny(unreachable_pub)]
68
69pub mod chunk;
70pub mod command_messages;
71pub mod error;
72pub mod handshake;
73pub mod messages;
74pub mod protocol_control_messages;
75pub mod session;
76pub mod user_control_messages;
77
78pub use session::server::ServerSession;
79
80#[cfg(feature = "docs")]
82#[scuffle_changelog::changelog]
83pub mod changelog {}
84
85#[cfg(test)]
86#[cfg_attr(all(test, coverage_nightly), coverage(off))]
87mod tests {
88 use std::path::PathBuf;
89 use std::time::Duration;
90
91 use scuffle_future_ext::FutureExt;
92 use tokio::process::Command;
93 use tokio::sync::{mpsc, oneshot};
94
95 use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};
96
97 fn file_path(item: &str) -> PathBuf {
98 if let Some(env) = std::env::var_os("ASSETS_DIR") {
99 PathBuf::from(env).join(item)
100 } else {
101 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("../../assets/{item}"))
102 }
103 }
104
105 enum Event {
106 Publish {
107 stream_id: u32,
108 app_name: String,
109 stream_name: String,
110 response: oneshot::Sender<Result<(), ServerSessionError>>,
111 },
112 Unpublish {
113 stream_id: u32,
114 response: oneshot::Sender<Result<(), ServerSessionError>>,
115 },
116 Data {
117 stream_id: u32,
118 data: SessionData,
119 response: oneshot::Sender<Result<(), ServerSessionError>>,
120 },
121 }
122
123 struct Handler(mpsc::Sender<Event>);
124
125 impl SessionHandler for Handler {
126 async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
127 let (response, reciever) = oneshot::channel();
128
129 self.0
130 .send(Event::Publish {
131 stream_id,
132 app_name: app_name.to_string(),
133 stream_name: stream_name.to_string(),
134 response,
135 })
136 .await
137 .unwrap();
138
139 reciever.await.unwrap()
140 }
141
142 async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
143 let (response, reciever) = oneshot::channel();
144
145 self.0.send(Event::Unpublish { stream_id, response }).await.unwrap();
146
147 reciever.await.unwrap()
148 }
149
150 async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
151 let (response, reciever) = oneshot::channel();
152 self.0
153 .send(Event::Data {
154 stream_id,
155 data,
156 response,
157 })
158 .await
159 .unwrap();
160
161 reciever.await.unwrap()
162 }
163 }
164
165 #[cfg(not(valgrind))] #[tokio::test]
167 async fn test_basic_rtmp_clean() {
168 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
169 let addr = listener.local_addr().unwrap();
170
171 let _ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
172 .args([
173 "-loglevel",
174 "debug",
175 "-re",
176 "-i",
177 file_path("avc_aac.mp4").to_str().expect("failed to get path"),
178 "-r",
179 "30",
180 "-t",
181 "1", "-c",
183 "copy",
184 "-f",
185 "flv",
186 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
187 ])
188 .stdout(std::process::Stdio::inherit())
189 .stderr(std::process::Stdio::inherit())
190 .spawn()
191 .expect("failed to execute ffmpeg");
192
193 let (ffmpeg_stream, _) = listener
194 .accept()
195 .with_timeout(Duration::from_millis(1000))
196 .await
197 .expect("timed out")
198 .expect("failed to accept");
199
200 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
201 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
202 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
203
204 (
205 tokio::spawn(async move {
206 let r = session.run().await;
207 println!("ffmpeg session ended: {r:?}");
208 r
209 }),
210 ffmpeg_event_reciever,
211 )
212 };
213
214 let event = ffmpeg_event_reciever
215 .recv()
216 .with_timeout(Duration::from_millis(1000))
217 .await
218 .expect("timed out")
219 .expect("failed to recv event");
220
221 match event {
222 Event::Publish {
223 stream_id,
224 app_name,
225 stream_name,
226 response,
227 } => {
228 assert_eq!(stream_id, 1);
229 assert_eq!(app_name, "live");
230 assert_eq!(stream_name, "stream-key");
231 response.send(Ok(())).expect("failed to send response");
232 }
233 _ => panic!("unexpected event"),
234 }
235
236 let mut got_video = false;
237 let mut got_audio = false;
238 let mut got_metadata = false;
239
240 while let Some(data) = ffmpeg_event_reciever
241 .recv()
242 .with_timeout(Duration::from_millis(1000))
243 .await
244 .expect("timed out")
245 {
246 match data {
247 Event::Data {
248 stream_id,
249 response,
250 data,
251 ..
252 } => {
253 match data {
254 SessionData::Video { .. } => got_video = true,
255 SessionData::Audio { .. } => got_audio = true,
256 SessionData::Amf0 { .. } => got_metadata = true,
257 }
258 response.send(Ok(())).expect("failed to send response");
259 assert_eq!(stream_id, 1);
260 }
261 Event::Unpublish { stream_id, response } => {
262 assert_eq!(stream_id, 1);
263 response.send(Ok(())).expect("failed to send response");
264 break;
265 }
266 _ => panic!("unexpected event"),
267 }
268 }
269
270 assert!(got_video);
271 assert!(got_audio);
272 assert!(got_metadata);
273
274 if ffmpeg_event_reciever
275 .recv()
276 .with_timeout(Duration::from_millis(1000))
277 .await
278 .expect("timed out")
279 .is_some()
280 {
281 panic!("unexpected event");
282 }
283
284 assert!(
285 ffmpeg_handle
286 .await
287 .expect("failed to join handle")
288 .expect("failed to handle ffmpeg connection")
289 );
290
291 }
294
295 #[cfg(all(not(valgrind), not(windows)))]
298 #[tokio::test]
299 async fn test_basic_rtmp_unclean() {
300 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
301 let addr = listener.local_addr().unwrap();
302
303 println!("ffmpeg from {}", std::env::var("FFMPEG").unwrap_or("ffmpeg".into()));
304
305 let mut ffmpeg = Command::new(std::env::var_os("FFMPEG").unwrap_or("ffmpeg".into()))
306 .args([
307 "-loglevel",
308 "debug",
309 "-re",
310 "-i",
311 file_path("avc_aac.mp4").to_str().expect("failed to get path"),
312 "-r",
313 "30",
314 "-t",
315 "1", "-c",
317 "copy",
318 "-f",
319 "flv",
320 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
321 ])
322 .stdout(std::process::Stdio::inherit())
323 .stderr(std::process::Stdio::inherit())
324 .spawn()
325 .expect("failed to execute ffmpeg");
326
327 let (ffmpeg_stream, _) = listener
328 .accept()
329 .with_timeout(Duration::from_millis(1000))
330 .await
331 .expect("timed out")
332 .expect("failed to accept");
333
334 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
335 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
336 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
337
338 (
339 tokio::spawn(async move {
340 let r = session.run().await;
341 println!("ffmpeg session ended: {r:?}");
342 r
343 }),
344 ffmpeg_event_reciever,
345 )
346 };
347
348 let event = ffmpeg_event_reciever
349 .recv()
350 .with_timeout(Duration::from_millis(1000))
351 .await
352 .expect("timed out")
353 .expect("failed to recv event");
354
355 match event {
356 Event::Publish {
357 stream_id,
358 app_name,
359 stream_name,
360 response,
361 } => {
362 assert_eq!(stream_id, 1);
363 assert_eq!(app_name, "live");
364 assert_eq!(stream_name, "stream-key");
365 response.send(Ok(())).expect("failed to send response");
366 }
367 _ => panic!("unexpected event"),
368 }
369
370 let mut got_video = false;
371 let mut got_audio = false;
372 let mut got_metadata = false;
373
374 while let Some(data) = ffmpeg_event_reciever
375 .recv()
376 .with_timeout(Duration::from_millis(1000))
377 .await
378 .expect("timed out")
379 {
380 match data {
381 Event::Data {
382 stream_id,
383 response,
384 data,
385 ..
386 } => {
387 assert_eq!(stream_id, 1);
388 match data {
389 SessionData::Video { .. } => got_video = true,
390 SessionData::Audio { .. } => got_audio = true,
391 SessionData::Amf0 { .. } => got_metadata = true,
392 }
393 response.send(Ok(())).expect("failed to send response");
394 }
395 _ => panic!("unexpected event"),
396 }
397
398 if got_video && got_audio && got_metadata {
399 break;
400 }
401 }
402
403 assert!(got_video);
404 assert!(got_audio);
405 assert!(got_metadata);
406
407 ffmpeg.kill().await.expect("failed to kill ffmpeg");
408
409 while let Some(data) = ffmpeg_event_reciever
410 .recv()
411 .with_timeout(Duration::from_millis(1000))
412 .await
413 .expect("timed out")
414 {
415 match data {
416 Event::Data { response, .. } => {
417 response.send(Ok(())).expect("failed to send response");
418 }
419 _ => panic!("unexpected event"),
420 }
421 }
422
423 assert!(
425 !ffmpeg_handle
426 .await
427 .expect("failed to join handle")
428 .expect("failed to handle ffmpeg connection")
429 );
430 }
431}