scuffle_bootstrap_telemetry/
lib.rs1#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
7#![cfg_attr(feature = "docs", doc = "## Feature flags")]
8#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
9#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
105#![cfg_attr(docsrs, feature(doc_auto_cfg))]
106#![deny(missing_docs)]
107#![deny(unsafe_code)]
108#![deny(unreachable_pub)]
109
110use anyhow::Context;
111use bytes::Bytes;
112#[cfg(feature = "opentelemetry-logs")]
113pub use opentelemetry_appender_tracing;
114#[cfg(feature = "opentelemetry")]
115pub use opentelemetry_sdk;
116#[cfg(feature = "prometheus")]
117pub use prometheus_client;
118use scuffle_bootstrap::global::Global;
119use scuffle_bootstrap::service::Service;
120#[cfg(feature = "opentelemetry-traces")]
121pub use tracing_opentelemetry;
122
123#[cfg(feature = "opentelemetry")]
124pub mod opentelemetry;
125
/// The telemetry service.
///
/// Its `Service` implementation serves the telemetry HTTP endpoints
/// (`/health` plus the feature-gated `/metrics`, `/pprof/cpu` and
/// `/opentelemetry/flush`) and shuts OpenTelemetry down when it stops.
/// All behaviour is driven by the [`TelemetryConfig`] implementation of the
/// application's global type; the service itself carries no state.
#[derive(Debug, Clone, Copy, Default)]
pub struct TelemetrySvc;
174
175pub trait TelemetryConfig: Global {
177 fn enabled(&self) -> bool {
179 true
180 }
181
182 fn bind_address(&self) -> Option<std::net::SocketAddr> {
184 None
185 }
186
187 fn http_server_name(&self) -> &str {
189 "scuffle-bootstrap-telemetry"
190 }
191
192 fn health_check(&self) -> impl std::future::Future<Output = Result<(), anyhow::Error>> + Send {
196 std::future::ready(Ok(()))
197 }
198
199 #[cfg(feature = "prometheus")]
206 fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
207 None
208 }
209
210 #[cfg(feature = "opentelemetry")]
217 fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
218 None
219 }
220}
221
222impl<Global: TelemetryConfig> Service<Global> for TelemetrySvc {
223 async fn enabled(&self, global: &std::sync::Arc<Global>) -> anyhow::Result<bool> {
224 Ok(global.enabled())
225 }
226
227 async fn run(self, global: std::sync::Arc<Global>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
228 if let Some(bind_addr) = global.bind_address() {
229 let global = global.clone();
230
231 let service = scuffle_http::service::fn_http_service(move |req| {
232 let global = global.clone();
233 async move {
234 match req.uri().path() {
235 "/health" => health_check(&global, req).await,
236 #[cfg(feature = "prometheus")]
237 "/metrics" => metrics(&global, req).await,
238 #[cfg(all(feature = "pprof", unix))]
239 "/pprof/cpu" => pprof(&global, req).await,
240 #[cfg(feature = "opentelemetry")]
241 "/opentelemetry/flush" => opentelemetry_flush(&global).await,
242 _ => Ok(http::Response::builder()
243 .status(http::StatusCode::NOT_FOUND)
244 .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?),
245 }
246 }
247 });
248
249 scuffle_http::HttpServer::builder()
250 .bind(bind_addr)
251 .ctx(ctx)
252 .service_factory(scuffle_http::service::service_clone_factory(service))
253 .build()
254 .run()
255 .await
256 .context("server run")?;
257 } else {
258 ctx.done().await;
259 }
260
261 #[cfg(feature = "opentelemetry")]
262 if let Some(opentelemetry) = global.opentelemetry().cloned() {
263 if opentelemetry.is_enabled() {
264 tokio::task::spawn_blocking(move || opentelemetry.shutdown())
265 .await
266 .context("opentelemetry shutdown spawn")?
267 .context("opentelemetry shutdown")?;
268 }
269 }
270
271 Ok(())
272 }
273}
274
275async fn health_check<G: TelemetryConfig>(
276 global: &std::sync::Arc<G>,
277 _: http::Request<scuffle_http::body::IncomingBody>,
278) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
279 if let Err(err) = global.health_check().await {
280 tracing::error!("health check failed: {err}");
281 Ok(http::Response::builder()
282 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
283 .body(http_body_util::Full::new(format!("{err:#}").into()))?)
284 } else {
285 Ok(http::Response::builder()
286 .status(http::StatusCode::OK)
287 .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
288 }
289}
290
/// Handles `/metrics`.
///
/// Encodes the registry returned by
/// `TelemetryConfig::prometheus_metrics_registry` with the `prometheus_client`
/// text encoder. Answers `404` when no registry is configured and `500` when
/// encoding fails.
#[cfg(feature = "prometheus")]
async fn metrics<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
    _: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let Some(registry) = global.prometheus_metrics_registry() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    let mut buf = String::new();
    if prometheus_client::encoding::text::encode(&mut buf, registry).is_err() {
        tracing::error!("metrics encode failed");
        return http::Response::builder()
            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
            .body(http_body_util::Full::new(Bytes::from_static(b"metrics encode failed")));
    }

    http::Response::builder()
        .status(http::StatusCode::OK)
        // Scrapers pick their parser from the Content-Type, so declare the
        // OpenMetrics text format explicitly.
        .header(
            http::header::CONTENT_TYPE,
            "application/openmetrics-text; version=1.0.0; charset=utf-8",
        )
        .body(http_body_util::Full::new(Bytes::from(buf)))
}
314
/// Handles `/pprof/cpu`.
///
/// Recognised query parameters:
/// - `freq`: sampling frequency passed to `scuffle_pprof::Cpu::new` (default `100`)
/// - `duration`: capture length in whole seconds (default `5`)
/// - `ignore`: may be repeated; every value is forwarded to `Cpu::new`
///
/// An unparsable `freq` or `duration` is answered with `400`. The capture runs
/// on a blocking thread; a failed capture or a failed blocking task is logged
/// and answered with `500`.
#[cfg(unix)]
#[cfg(feature = "pprof")]
async fn pprof<G: TelemetryConfig>(
    _: &std::sync::Arc<G>,
    req: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    // Builds a response with the given status and an owned text body.
    let text_response = |status: http::StatusCode, message: String| {
        http::Response::builder()
            .status(status)
            .body(http_body_util::Full::new(Bytes::from(message)))
    };

    let mut frequency = 100;
    let mut capture_for = std::time::Duration::from_secs(5);
    let mut blocklist = Vec::new();

    let pairs = req.uri().query().map(querystring::querify).into_iter().flatten();

    for (key, value) in pairs {
        match key {
            "freq" => match value.parse() {
                Ok(parsed) => frequency = parsed,
                Err(err) => {
                    return text_response(http::StatusCode::BAD_REQUEST, format!("invalid freq: {err:#}"));
                }
            },
            "duration" => match value.parse().map(std::time::Duration::from_secs) {
                Ok(parsed) => capture_for = parsed,
                Err(err) => {
                    return text_response(http::StatusCode::BAD_REQUEST, format!("invalid duration: {err:#}"));
                }
            },
            "ignore" => blocklist.push(value),
            // Unknown parameters are ignored.
            _ => {}
        }
    }

    let profiler = scuffle_pprof::Cpu::new(frequency, &blocklist);

    // The capture blocks for the whole duration, so it runs off the async executor.
    match tokio::task::spawn_blocking(move || profiler.capture(capture_for)).await {
        Ok(Ok(data)) => http::Response::builder()
            .status(http::StatusCode::OK)
            .body(http_body_util::Full::new(Bytes::from(data))),
        Ok(Err(err)) => {
            tracing::error!("cpu capture failed: {err:#}");
            text_response(http::StatusCode::INTERNAL_SERVER_ERROR, format!("{err:#}"))
        }
        Err(err) => {
            tracing::error!("cpu capture failed: {err:#}");
            text_response(http::StatusCode::INTERNAL_SERVER_ERROR, format!("{err:#}"))
        }
    }
}
372
/// Handles `/opentelemetry/flush`.
///
/// - no OpenTelemetry handle configured: `404`
/// - handle present but not enabled: `200 ok` without flushing
/// - otherwise the flush runs on a blocking thread: `200 ok` on success, `500`
///   with the formatted error when the flush or the blocking task fails
#[cfg(feature = "opentelemetry")]
async fn opentelemetry_flush<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    // Builds a response with the given status and body.
    let respond = |status: http::StatusCode, body: Bytes| {
        http::Response::builder()
            .status(status)
            .body(http_body_util::Full::new(body))
    };

    let Some(otel) = global.opentelemetry().cloned() else {
        return respond(http::StatusCode::NOT_FOUND, Bytes::from_static(b"not found"));
    };

    if !otel.is_enabled() {
        return respond(http::StatusCode::OK, Bytes::from_static(b"ok"));
    }

    match tokio::task::spawn_blocking(move || otel.flush()).await {
        Ok(Ok(())) => respond(http::StatusCode::OK, Bytes::from_static(b"ok")),
        Ok(Err(err)) => {
            tracing::error!("opentelemetry flush failed: {err:#}");
            respond(http::StatusCode::INTERNAL_SERVER_ERROR, Bytes::from(format!("{err:#}")))
        }
        Err(err) => {
            tracing::error!("opentelemetry flush spawn failed: {err:#}");
            respond(http::StatusCode::INTERNAL_SERVER_ERROR, Bytes::from(format!("{err:#}")))
        }
    }
}
407
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
#[cfg(all(
    feature = "opentelemetry-metrics",
    feature = "opentelemetry-traces",
    feature = "opentelemetry-logs"
))]
mod tests {
    use std::net::SocketAddr;
    use std::sync::Arc;

    // Only the pprof helper needs `Bytes`; it is gated exactly like the
    // `/pprof/cpu` route itself.
    #[cfg(all(unix, feature = "pprof"))]
    use bytes::Bytes;
    use scuffle_bootstrap::{GlobalWithoutConfig, Service};

    use crate::{TelemetryConfig, TelemetrySvc};

    /// Installs the aws-lc rustls crypto provider once per process.
    fn install_provider() {
        static ONCE: std::sync::Once = std::sync::Once::new();

        ONCE.call_once(|| {
            rustls::crypto::aws_lc_rs::default_provider()
                .install_default()
                .expect("failed to install aws lc provider");
        });
    }

    /// Fetches `/metrics`; a non-success status is returned as an error.
    async fn request_metrics(addr: SocketAddr) -> reqwest::Result<String> {
        reqwest::get(format!("http://{addr}/metrics"))
            .await
            .unwrap()
            .error_for_status()?
            .text()
            .await
    }

    /// Fetches `/health` and returns the body, panicking on any failure.
    async fn request_health(addr: SocketAddr) -> String {
        reqwest::get(format!("http://{addr}/health"))
            .await
            .unwrap()
            .error_for_status()
            .expect("health check failed")
            .text()
            .await
            .expect("health check text")
    }

    /// Fetches `/pprof/cpu` with the given raw `freq` and `duration` values.
    #[cfg(all(unix, feature = "pprof"))]
    async fn request_pprof(addr: SocketAddr, freq: &str, duration: &str) -> reqwest::Result<Bytes> {
        reqwest::get(format!("http://{addr}/pprof/cpu?freq={freq}&duration={duration}"))
            .await
            .unwrap()
            .error_for_status()?
            .bytes()
            .await
    }

    /// Requests `/opentelemetry/flush`; a non-success status is returned as an error.
    async fn flush_opentelemetry(addr: SocketAddr) -> reqwest::Result<reqwest::Response> {
        reqwest::get(format!("http://{addr}/opentelemetry/flush"))
            .await
            .unwrap()
            .error_for_status()
    }

    // This test relies on `prometheus_client` and on
    // `TelemetryConfig::prometheus_metrics_registry`, both of which only exist
    // with the `prometheus` feature, so the whole test is gated on it.
    #[cfg(all(not(valgrind), feature = "prometheus"))]
    #[tokio::test]
    async fn telemetry_http_server() {
        use opentelemetry_sdk::logs::SdkLoggerProvider;
        use opentelemetry_sdk::metrics::SdkMeterProvider;
        use opentelemetry_sdk::trace::SdkTracerProvider;

        install_provider();

        struct TestGlobal {
            bind_addr: SocketAddr,
            prometheus: prometheus_client::registry::Registry,
            open_telemetry: crate::opentelemetry::OpenTelemetry,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Bind to port 0 to learn a free port; the listener is dropped
                // when this function returns so the server can bind to it.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                let mut prometheus = prometheus_client::registry::Registry::default();

                let exporter = scuffle_metrics::prometheus::exporter().build();
                prometheus.register_collector(exporter.collector());

                let metrics = SdkMeterProvider::builder().with_reader(exporter).build();
                opentelemetry::global::set_meter_provider(metrics.clone());

                let tracer = SdkTracerProvider::default();
                opentelemetry::global::set_tracer_provider(tracer.clone());

                let logger = SdkLoggerProvider::builder().build();

                let open_telemetry = crate::opentelemetry::OpenTelemetry::new()
                    .with_metrics(metrics)
                    .with_traces(tracer)
                    .with_logs(logger);

                Ok(Arc::new(TestGlobal {
                    bind_addr,
                    prometheus,
                    open_telemetry,
                }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }

            fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
                Some(&self.prometheus)
            }

            fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
                Some(&self.open_telemetry)
            }
        }

        #[scuffle_metrics::metrics]
        mod example {
            use scuffle_metrics::{CounterU64, MetricEnum};

            #[derive(MetricEnum)]
            pub enum Kind {
                Http,
                Grpc,
            }

            #[metrics(unit = "requests")]
            pub fn request(kind: Kind) -> CounterU64;
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        let task_handle = tokio::spawn(TelemetrySvc.run(global, scuffle_context::Context::global()));

        // Give the server a moment to start listening.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.starts_with("# HELP target Information about the target\n"));
        assert!(metrics.contains("# TYPE target info\n"));
        assert!(metrics.contains("service_name=\"unknown_service\""));
        assert!(metrics.contains("telemetry_sdk_language=\"rust\""));
        assert!(metrics.contains("telemetry_sdk_name=\"opentelemetry\""));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 1\n"));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 2\n"));
        assert!(metrics.ends_with("# EOF\n"));

        // The route only exists with `all(feature = "pprof", unix)`, so the
        // requests against it use the same gate.
        #[cfg(all(unix, feature = "pprof"))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));

            let res = request_pprof(bind_addr, "invalid", "2").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

            let res = request_pprof(bind_addr, "100", "invalid").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));
        }

        assert!(flush_opentelemetry(bind_addr).await.is_ok());

        let res = reqwest::get(format!("http://{bind_addr}/not_found")).await.unwrap();
        assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);

        scuffle_context::Handler::global().shutdown().await;

        task_handle.await.unwrap().unwrap();
    }

    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn empty_telemetry_http_server() {
        install_provider();

        struct TestGlobal {
            bind_addr: SocketAddr,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                Ok(Arc::new(TestGlobal { bind_addr }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        let task_handle = tokio::spawn(TelemetrySvc.run(global, scuffle_context::Context::global()));
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        // No registry is configured, so `/metrics` answers 404.
        let res = request_metrics(bind_addr).await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::NOT_FOUND));

        #[cfg(all(unix, feature = "pprof"))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));
        }

        // No OpenTelemetry handle is configured, so the flush endpoint answers 404.
        let err = flush_opentelemetry(bind_addr).await.expect_err("error expected");
        assert!(err.is_status());
        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));

        scuffle_context::Handler::global().shutdown().await;

        task_handle.await.unwrap().unwrap();
    }
}
678
/// Release history of this crate.
///
/// The module body is empty in source; its contents are supplied by the
/// `#[scuffle_changelog::changelog]` attribute (presumably expanded from the
/// crate's changelog file; confirm against `scuffle_changelog`).
#[cfg(feature = "docs")]
#[scuffle_changelog::changelog]
pub mod changelog {}