1use crate::{
9 Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either, Either3},
12 error::{BoxError, BoxErrorExt, ErrorContext},
13 extensions::ExtensionsRef,
14 http::{
15 BodyLimitLayer, Request, Response, Version,
16 body::util::BodyExt,
17 convert::curl,
18 core::h2::frame::EarlyFrameCapture,
19 fingerprint::{AkamaiH2, Ja4H},
20 header::USER_AGENT,
21 headers::exotic::XClacksOverhead,
22 layer::set_header::SetResponseHeaderLayer,
23 layer::{required_header::AddRequiredResponseHeadersLayer, trace::TraceLayer},
24 proto::h1::Http1HeaderMap,
25 proto::h2::PseudoHeaderOrder,
26 server::HttpServer,
27 service::web::{extract::Json, response::IntoResponse},
28 ws::handshake::{
29 matcher::WebSocketMatcher,
30 server::{WebSocketAcceptor, WebSocketEchoService},
31 },
32 },
33 layer::limit::policy::UnlimitedPolicy,
34 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
35 net::address::ip::geo::IpGeoDb,
36 net::forwarded::Forwarded,
37 net::stream::SocketInfo,
38 net::{AuthorityInputExt, Protocol, ProtocolInputExt},
39 proxy::haproxy::server::HaProxyLayer,
40 rt::Executor,
41 tcp::TcpStream,
42 telemetry::tracing,
43 ua::{UserAgent, layer::classifier::UserAgentClassifierLayer, profile::UserAgentDatabase},
44 utils::octets::mib,
45};
46
47use rama_core::error::ErrorExt as _;
48use rama_http::layer::upgrade::UpgradeLayer;
49use serde::Serialize;
50use serde_json::json;
51use std::{convert::Infallible, sync::Arc, time::Duration};
52
53#[cfg(all(feature = "rustls", not(feature = "boring")))]
54use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
55
56#[cfg(any(feature = "rustls", feature = "boring"))]
57use crate::{
58 net::fingerprint::{Ja3, Ja4, PeetPrint},
59 net::tls::{
60 SecureTransport,
61 client::ClientHelloExtension,
62 client::{ECHClientHello, NegotiatedTlsParameters},
63 },
64};
65#[cfg(feature = "boring")]
66use crate::{
67 net::tls::server::ServerConfig,
68 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
69};
70
/// TLS server configuration type accepted by the builder when the `boring`
/// feature is enabled: a `ServerConfig`, which `build` converts into
/// `TlsAcceptorData` (failing the build if that conversion fails).
#[cfg(feature = "boring")]
type TlsConfig = ServerConfig;

/// TLS server configuration type accepted by the builder when only `rustls`
/// is enabled (no `boring`): the `TlsAcceptorData` is handed to the TLS
/// acceptor layer as-is.
#[cfg(all(feature = "rustls", not(feature = "boring")))]
type TlsConfig = TlsAcceptorData;
76
77#[derive(Debug, Clone)]
78pub struct EchoServiceBuilder<H> {
81 concurrent_limit: usize,
82 body_limit: usize,
83 timeout: Duration,
84 forward: Option<ForwardKind>,
85
86 #[cfg(any(feature = "rustls", feature = "boring"))]
87 tls_server_config: Option<TlsConfig>,
88
89 http_version: Option<Version>,
90
91 ws_support: bool,
92
93 http_service_builder: H,
94
95 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
96
97 geo_db: Option<std::sync::Arc<IpGeoDb>>,
98}
99
100impl Default for EchoServiceBuilder<()> {
101 fn default() -> Self {
102 Self {
103 concurrent_limit: 0,
104 body_limit: mib(1),
105 timeout: Duration::ZERO,
106 forward: None,
107
108 #[cfg(any(feature = "rustls", feature = "boring"))]
109 tls_server_config: None,
110
111 http_version: None,
112
113 ws_support: false,
114
115 http_service_builder: (),
116
117 uadb: None,
118
119 geo_db: None,
120 }
121 }
122}
123
124impl EchoServiceBuilder<()> {
125 #[must_use]
127 pub fn new() -> Self {
128 Self::default()
129 }
130}
131
// NOTE(review): `generate_set_and_with!` presumably expands every function
// below into a `set_*` / `with_*` pair; the bodies are kept in the exact
// `mut self … self` shape the macro is invoked with — confirm against the
// macro definition before reshaping them.
impl<H> EchoServiceBuilder<H> {
    crate::utils::macros::generate_set_and_with! {
        /// Sets the maximum number of concurrent connections.
        ///
        /// `0` (the default) means that no concurrency limit is applied.
        pub fn concurrent(mut self, limit: usize) -> Self {
            self.concurrent_limit = limit;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Sets the limit that is passed to the request body-limit layer.
        pub fn body_limit(mut self, limit: usize) -> Self {
            self.body_limit = limit;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Sets the timeout used by the timeout layer of the TCP stack.
        ///
        /// A zero duration (the default) disables the timeout.
        pub fn timeout(mut self, timeout: Duration) -> Self {
            self.timeout = timeout;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Selects the forwarding mechanism to support, if any.
        ///
        /// `Some(ForwardKind::HaProxy)` adds a HAProxy layer to the TCP stack;
        /// the value is also used to pick the HTTP "forwarded" layer when the
        /// HTTP stack is built.
        pub fn forward(mut self, kind: Option<ForwardKind>) -> Self {
            self.forward = kind;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Sets the TLS server configuration.
        ///
        /// When set, the built service puts a TLS acceptor (which also stores
        /// the client hello) in front of the HTTP server.
        #[cfg(any(feature = "rustls", feature = "boring"))]
        pub fn tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
            self.tls_server_config = cfg;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Forces the HTTP version served.
        ///
        /// `None` (the default) uses the auto HTTP server. HTTP/2 and
        /// HTTP/0.9, 1.0, 1.1 select the h2 and http1 server respectively;
        /// any other version makes `build` return an error.
        pub fn http_version(mut self, version: Option<Version>) -> Self {
            self.http_version = version;
            self
        }
    }

    /// Adds a custom HTTP layer to the builder.
    ///
    /// The layer is stored next to the already registered layer(s) as the
    /// tuple `(previous, layer)`, which is why the builder's type parameter
    /// changes. All other settings are carried over unchanged.
    pub fn with_http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
        EchoServiceBuilder {
            concurrent_limit: self.concurrent_limit,
            body_limit: self.body_limit,
            timeout: self.timeout,
            forward: self.forward,

            #[cfg(any(feature = "rustls", feature = "boring"))]
            tls_server_config: self.tls_server_config,

            http_version: self.http_version,

            ws_support: self.ws_support,

            http_service_builder: (self.http_service_builder, layer),

            uadb: self.uadb,

            geo_db: self.geo_db,
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Sets the user-agent database.
        ///
        /// When set, the echo service looks up the profile matching the exact
        /// user-agent header string of a request and reports whether the
        /// fingerprints computed for that profile equal the observed ones.
        pub fn user_agent_database(
            mut self,
            db: Option<std::sync::Arc<UserAgentDatabase>>,
        ) -> Self {
            self.uadb = db;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Sets the IP geolocation database.
        ///
        /// When set, the echo service resolves the client IP with it, and an
        /// attribution layer is added if the database reports attributions.
        pub fn geo_db(mut self, db: Option<std::sync::Arc<IpGeoDb>>) -> Self {
            self.geo_db = db;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Enables or disables WebSocket support.
        ///
        /// When enabled, an upgrade layer backed by a WebSocket echo service
        /// is added to the HTTP stack, and the connect protocol is enabled on
        /// the h2 side of the HTTP/2 and auto servers.
        pub fn ws_support(
            mut self,
            support: bool,
        ) -> Self {
            self.ws_support = support;
            self
        }
    }
}
252
253impl<H> EchoServiceBuilder<H>
254where
255 H: Layer<EchoService, Service: Service<Request, Output = Response, Error = BoxError>>,
256{
257 #[expect(unused_mut)]
258 pub fn build(
260 mut self,
261 exec: Executor,
262 ) -> Result<impl Service<TcpStream, Output = (), Error = Infallible>, BoxError> {
263 let tcp_forwarded_layer = match &self.forward {
264 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
265 _ => None,
266 };
267
268 let http_service = Arc::new(self.build_http(exec.clone()));
269
270 #[cfg(all(feature = "rustls", not(feature = "boring")))]
271 let tls_cfg = self.tls_server_config;
272
273 #[cfg(feature = "boring")]
274 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
275 Some(cfg) => Some(cfg.try_into()?),
276 None => None,
277 };
278
279 let tcp_service_builder = (
280 ConsumeErrLayer::trace_as(tracing::Level::DEBUG),
281 LimitLayer::new(if self.concurrent_limit > 0 {
282 Either::A(ConcurrentPolicy::max(self.concurrent_limit))
283 } else {
284 Either::B(UnlimitedPolicy::new())
285 }),
286 if !self.timeout.is_zero() {
287 TimeoutLayer::new(self.timeout)
288 } else {
289 TimeoutLayer::never()
290 },
291 tcp_forwarded_layer,
292 BodyLimitLayer::request_only(self.body_limit),
293 #[cfg(any(feature = "rustls", feature = "boring"))]
294 tls_cfg.map(|cfg| TlsAcceptorLayer::new(cfg).with_store_client_hello(true)),
295 );
296
297 let http_transport_service = match self.http_version {
298 Some(Version::HTTP_2) => Either3::A({
299 let mut http = HttpServer::new_h2(exec);
300 if self.ws_support {
301 http.h2_mut().set_enable_connect_protocol();
302 }
303 http.service(http_service)
304 }),
305 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
306 Either3::B(HttpServer::new_http1(exec).service(http_service))
307 }
308 Some(version) => {
309 return Err(BoxError::from_static_str("unsupported http version")
310 .context_debug_field("version", version));
311 }
312 None => Either3::C({
313 let mut http = HttpServer::auto(exec);
314 if self.ws_support {
315 http.h2_mut().set_enable_connect_protocol();
316 }
317 http.service(http_service)
318 }),
319 };
320
321 Ok(tcp_service_builder.into_layer(http_transport_service))
322 }
323
324 pub fn build_http(
326 &self,
327 exec: Executor,
328 ) -> impl Service<Request, Output: IntoResponse, Error = Infallible> + use<H> {
329 let http_forwarded_layer = super::http_forwarded_layer(self.forward.as_ref());
330
331 let geo_attribution = self.geo_db.as_ref().and_then(|db| {
333 let notices: Vec<_> = db.attributions().collect();
334 (!notices.is_empty()).then(|| crate::cli::service::geo::geo_attribution_layer(notices))
335 });
336
337 (
338 TraceLayer::new_for_http(),
339 SetResponseHeaderLayer::<XClacksOverhead>::if_not_present_default_typed(),
340 AddRequiredResponseHeadersLayer::default(),
341 geo_attribution,
342 UserAgentClassifierLayer::new(),
343 ConsumeErrLayer::default(),
344 http_forwarded_layer,
345 self.ws_support.then(|| {
346 UpgradeLayer::new(
347 exec,
348 WebSocketMatcher::default(),
349 {
350 let acceptor = WebSocketAcceptor::default()
351 .with_protocols_flex(true)
352 .with_echo_protocols();
353
354 #[cfg(feature = "compression")]
355 {
356 acceptor.with_per_message_deflate_overwrite_extensions()
357 }
358 #[cfg(not(feature = "compression"))]
359 {
360 acceptor
361 }
362 },
363 ConsumeErrLayer::trace_as(tracing::Level::DEBUG)
364 .into_layer(WebSocketEchoService::default()),
365 )
366 }),
367 )
368 .into_layer(self.http_service_builder.layer(EchoService {
369 uadb: self.uadb.clone(),
370 geo_db: self.geo_db.clone(),
371 }))
372 }
373}
374
375#[derive(Debug, Clone)]
376#[non_exhaustive]
377pub struct EchoService {
379 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
380 geo_db: Option<std::sync::Arc<IpGeoDb>>,
381}
382
/// Echo implementation: describes the incoming request as a JSON response.
impl Service<Request> for EchoService {
    type Output = Response;
    type Error = BoxError;

    /// Serves a single request by echoing everything known about it.
    ///
    /// The JSON response contains the sections `ua`, `geo`, `http`, `tls` and
    /// `socket_addr`.
    ///
    /// # Errors
    ///
    /// Fails when the request authority cannot be resolved or when the
    /// request body cannot be collected.
    async fn serve(&self, req: Request) -> Result<Self::Output, Self::Error> {
        // Parsed user-agent information, taken from the `UserAgent` request
        // extension; the default JSON value is used when it is absent.
        let user_agent_info = req
            .extensions()
            .get_ref()
            .map(|ua: &UserAgent| {
                json!({
                    "user_agent": ua.header_str().to_owned(),
                    "kind": ua.info().map(|info| info.kind.to_string()),
                    "version": ua.info().and_then(|info| info.version),
                    "platform": ua.platform().map(|v| v.to_string()),
                })
            })
            .unwrap_or_default();

        let authority = req
            .authority()
            .context("echo: resolve request authority")?
            .to_string();
        // Falls back to plain HTTP when no protocol can be determined.
        let scheme = req.protocol().unwrap_or(Protocol::HTTP).to_string();

        // Raw user-agent header value (only when `to_str` accepts it); used
        // below as the exact lookup key into the user-agent database.
        let ua_str = req
            .headers()
            .get(USER_AGENT)
            .and_then(|h| h.to_str().ok())
            .map(ToOwned::to_owned);
        tracing::debug!(
            user_agent.original = ua_str,
            "echo request received from ua with ua header",
        );

        // Fingerprint computed for the database profile that matches the
        // request's user-agent, plus whether it equals the observed one.
        #[derive(Debug, Serialize)]
        struct FingerprintProfileData {
            hash: String,
            verbose: String,
            matched: bool,
        }

        // JA4H is computed while the request is still in one piece; a
        // failure is logged and the fingerprint is simply left out.
        let ja4h = Ja4H::compute(&req)
            .inspect_err(|err| tracing::error!("ja4h compute failure: {err:?}"))
            .ok()
            .map(|ja4h| {
                let mut profile_ja4h: Option<FingerprintProfileData> = None;

                if let Some(uadb) = self.uadb.as_deref()
                    && let Some(profile) =
                        ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
                {
                    // The profile fingerprint is only computed for HTTP/1.0,
                    // HTTP/1.1 and HTTP/2 requests.
                    let matched_ja4h = match req.version() {
                        Version::HTTP_10 | Version::HTTP_11 => profile
                            .http
                            .ja4h_h1_navigate(Some(req.method().clone()))
                            .inspect_err(|err| {
                                tracing::trace!(
                                    "ja4h computation of matched profile for incoming h1 req: {err:?}"
                                )
                            })
                            .ok(),
                        Version::HTTP_2 => profile
                            .http
                            .ja4h_h2_navigate(Some(req.method().clone()))
                            .inspect_err(|err| {
                                tracing::trace!(
                                    "ja4h computation of matched profile for incoming h2 req: {err:?}"
                                )
                            })
                            .ok(),
                        _ => None,
                    };
                    if let Some(tgt) = matched_ja4h {
                        let hash = format!("{tgt}");
                        let matched = format!("{ja4h}") == hash;
                        profile_ja4h = Some(FingerprintProfileData {
                            hash,
                            verbose: format!("{tgt:?}"),
                            matched,
                        });
                    }
                }

                json!({
                    "hash": format!("{ja4h}"),
                    "verbose": format!("{ja4h:?}"),
                    "profile": profile_ja4h,
                })
            });

        let (parts, body) = req.into_parts();

        // The complete body is buffered in memory before it is echoed.
        let body = body
            .collect()
            .await
            .context("collect request body for echo purposes")?
            .to_bytes();

        // Needs the intact `parts`, so this has to run before
        // `parts.headers` is moved out below.
        let curl_request = curl::cmd_string_for_request_parts_and_payload(&parts, &body);

        // Header values that are not valid UTF-8 are rendered with the
        // lower-hex debug format of their bytes, prefixed with `0x`.
        let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&parts.extensions))
            .into_iter()
            .map(|(name, value)| {
                (
                    name,
                    std::str::from_utf8(value.as_bytes())
                        .map(|s| s.to_owned())
                        .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
                )
            })
            .collect();

        // The payload is echoed hex-encoded.
        let body = hex::encode(body.as_ref());

        // TLS details are only available when the request carries a
        // `SecureTransport` extension that exposes the client hello.
        #[cfg(any(feature = "rustls", feature = "boring"))]
        let tls_info = parts
            .extensions
            .get_ref::<SecureTransport>()
            .and_then(|st| st.client_hello())
            .map(|hello| {
                // --- JA4: observed fingerprint and profile comparison ---
                let ja4 = Ja4::compute(parts.extensions.extensions())
                    .inspect_err(|err| tracing::trace!("ja4 computation: {err:?}"))
                    .ok();

                let mut profile_ja4: Option<FingerprintProfileData> = None;

                if let Some(uadb) = self.uadb.as_deref()
                    && let Some(profile) =
                        ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
                {
                    let matched_ja4 = profile
                        .tls
                        .compute_ja4(
                            parts
                                .extensions
                                .get_ref::<NegotiatedTlsParameters>()
                                .map(|param| param.protocol_version),
                        )
                        .inspect_err(|err| {
                            tracing::trace!("ja4 computation of matched profile: {err:?}")
                        })
                        .ok();
                    if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
                        let hash = format!("{tgt}");
                        let matched = format!("{src}") == hash;
                        profile_ja4 = Some(FingerprintProfileData {
                            hash,
                            verbose: format!("{tgt:?}"),
                            matched,
                        });
                    }
                }

                let ja4 = ja4.map(|ja4| {
                    json!({
                        "hash": format!("{ja4}"),
                        "verbose": format!("{ja4:?}"),
                        "profile": profile_ja4,
                    })
                });

                // --- JA3: the hash is the lower-hex format, the verbose
                // form is the display format ---
                let ja3 = Ja3::compute(parts.extensions.extensions())
                    .inspect_err(|err| tracing::trace!("ja3 computation: {err:?}"))
                    .ok();

                let mut profile_ja3: Option<FingerprintProfileData> = None;

                if let Some(uadb) = self.uadb.as_deref()
                    && let Some(profile) =
                        ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
                {
                    let matched_ja3 = profile
                        .tls
                        .compute_ja3(
                            parts
                                .extensions
                                .get_ref::<NegotiatedTlsParameters>()
                                .map(|param| param.protocol_version),
                        )
                        .inspect_err(|err| {
                            tracing::trace!("ja3 computation of matched profile: {err:?}")
                        })
                        .ok();
                    if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
                        let hash = format!("{tgt:x}");
                        let matched = format!("{src:x}") == hash;
                        profile_ja3 = Some(FingerprintProfileData {
                            hash,
                            verbose: format!("{tgt}"),
                            matched,
                        });
                    }
                }

                let ja3 = ja3.map(|ja3| {
                    json!({
                        "hash": format!("{ja3:x}"),
                        "verbose": format!("{ja3}"),
                        "profile": profile_ja3,
                    })
                });

                // --- PeetPrint: observed fingerprint and profile comparison ---
                let peet = PeetPrint::compute(parts.extensions.extensions())
                    .inspect_err(|err| tracing::trace!("peet computation: {err:?}"))
                    .ok();

                let mut profile_peet: Option<FingerprintProfileData> = None;

                if let Some(uadb) = self.uadb.as_deref()
                    && let Some(profile) =
                        ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
                {
                    let matched_peet = profile
                        .tls
                        .compute_peet()
                        .inspect_err(|err| {
                            tracing::trace!("peetprint computation of matched profile: {err:?}")
                        })
                        .ok();
                    if let (Some(src), Some(tgt)) = (peet.as_ref(), matched_peet) {
                        let hash = format!("{tgt}");
                        let matched = format!("{src}") == hash;
                        profile_peet = Some(FingerprintProfileData {
                            hash,
                            verbose: format!("{tgt:?}"),
                            matched,
                        });
                    }
                }

                let peet = peet.map(|peet| {
                    json!({
                        "hash": format!("{peet}"),
                        "verbose": format!("{peet:?}"),
                        "profile": profile_peet,
                    })
                });

                // Client hello header, one JSON object per extension, and the
                // fingerprints computed above.
                json!({
                    "header": {
                        "version": hello.protocol_version().to_string(),
                        "cipher_suites": hello
                        .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        "compression_algorithms": hello
                        .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                    },
                    "extensions": hello.extensions().iter().map(|extension| match extension {
                        ClientHelloExtension::ServerName(domain) => json!({
                            "id": extension.id().to_string(),
                            "data": domain,
                        }),
                        ClientHelloExtension::SignatureAlgorithms(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::SupportedVersions(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::ApplicationSettings{ protocols, .. } => json!({
                            "id": extension.id().to_string(),
                            "data": protocols.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::SupportedGroups(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::ECPointFormats(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::CertificateCompression(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::DelegatedCredentials(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
                        }),
                        ClientHelloExtension::RecordSizeLimit(v) => json!({
                            "id": extension.id().to_string(),
                            "data": v.to_string(),
                        }),
                        ClientHelloExtension::EncryptedClientHello(ech) => match ech {
                            ECHClientHello::Outer(ech) => json!({
                                "id": extension.id().to_string(),
                                "data": {
                                    "type": "outer",
                                    "cipher_suite": {
                                        "aead_id": ech.cipher_suite.aead_id.to_string(),
                                        "kdf_id": ech.cipher_suite.kdf_id.to_string(),
                                    },
                                    "config_id": ech.config_id,
                                    "enc": format!("0x{}", hex::encode(&ech.enc)),
                                    "payload": format!("0x{}", hex::encode(&ech.payload)),
                                },
                            }),
                            ECHClientHello::Inner => json!({
                                "id": extension.id().to_string(),
                                "data": {
                                    "type": "inner",
                                },
                            })

                        }
                        // Opaque extensions are echoed as hex; the `data` key
                        // is left out entirely when there is no data.
                        ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
                            json!({
                                "id": id.to_string()
                            })
                        } else {
                            json!({
                                "id": id.to_string(),
                                "data": format!("0x{}", hex::encode(data))
                            })
                        },
                    }).collect::<Vec<_>>(),
                    "ja3": ja3,
                    "ja4": ja4,
                    "peet": peet
                })
            });

        // Without any TLS backend compiled in there is never TLS info.
        #[cfg(not(any(feature = "rustls", feature = "boring")))]
        let tls_info: Option<()> = None;

        // HTTP/2 specific details are only reported for HTTP/2 requests.
        let mut h2 = None;
        if parts.version == Version::HTTP_2 {
            let early_frames = parts.extensions.get_ref::<EarlyFrameCapture>();
            let pseudo_headers = parts.extensions.get_ref::<PseudoHeaderOrder>();
            let akamai_h2 = AkamaiH2::compute(&parts.extensions)
                .inspect_err(|err| tracing::trace!("akamai h2 compute failure: {err:?}"))
                .ok()
                .map(|akamai| {
                    json!({
                        "hash": format!("{akamai}"),
                        "verbose": format!("{akamai:?}"),
                    })
                });

            h2 = Some(json!({
                "early_frames": early_frames,
                "pseudo_headers": pseudo_headers,
                "akamai_h2": akamai_h2,
            }));
        }

        // Geo lookup: prefer the client IP from the `Forwarded` extension and
        // fall back to the peer address of the socket. Anything missing (no
        // database, no IP, no match, serialization failure) results in the
        // default JSON value or `null`.
        let geo = self
            .geo_db
            .as_ref()
            .and_then(|db| {
                parts
                    .extensions
                    .get_ref::<Forwarded>()
                    .and_then(|f| f.client_ip())
                    .or_else(|| {
                        parts
                            .extensions
                            .get_ref::<SocketInfo>()
                            .map(|s| s.peer_addr().ip_addr)
                    })
                    .and_then(|ip| db.resolve(ip))
            })
            .map(|info| serde_json::to_value(&info).unwrap_or_default())
            .unwrap_or(serde_json::Value::Null);

        // `socket_addr` uses the forwarded client socket address, then the
        // forwarded client IP, then the socket's peer address.
        Ok(Json(json!({
            "ua": user_agent_info,
            "geo": geo,
            "http": {
                "version": format!("{:?}", parts.version),
                "scheme": scheme,
                "method": format!("{:?}", parts.method),
                "authority": authority,
                "path": parts.uri.path_or_root().to_owned(),
                "query": parts.uri.query().map(|q| q.as_raw_str().to_owned()),
                "h2": h2,
                "headers": headers,
                "payload": body,
                "ja4h": ja4h,
                "curl": curl_request,
            },
            "tls": tls_info,
            "socket_addr": parts.extensions.get_ref::<Forwarded>()
                .and_then(|f|
                    f.client_socket_addr().map(|addr| addr.to_string())
                        .or_else(|| f.client_ip().map(|ip| ip.to_string()))
                ).or_else(|| parts.extensions.get_ref::<SocketInfo>().map(|v| v.peer_addr().to_string())),
        }))
        .into_response())
    }
}