1use crate::{
9 Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either, Either3, Either7},
12 error::{BoxError, ErrorContext},
13 extensions::ExtensionsRef,
14 http::{
15 Request, Response, Version,
16 body::util::BodyExt,
17 convert::curl,
18 core::h2::frame::EarlyFrameCapture,
19 header::USER_AGENT,
20 headers::exotic::XClacksOverhead,
21 headers::forwarded::{CFConnectingIp, ClientIp, TrueClientIp, XClientIp, XRealIp},
22 layer::set_header::SetResponseHeaderLayer,
23 layer::{
24 forwarded::GetForwardedHeaderLayer, required_header::AddRequiredResponseHeadersLayer,
25 trace::TraceLayer,
26 },
27 proto::h1::Http1HeaderMap,
28 proto::h2::PseudoHeaderOrder,
29 server::HttpServer,
30 service::web::{extract::Json, response::IntoResponse},
31 ws::handshake::{
32 matcher::WebSocketMatcher,
33 server::{WebSocketAcceptor, WebSocketEchoService},
34 },
35 },
36 layer::limit::policy::UnlimitedPolicy,
37 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
38 net::fingerprint::{AkamaiH2, Ja4H},
39 net::forwarded::Forwarded,
40 net::http::RequestContext,
41 net::stream::{SocketInfo, layer::http::BodyLimitLayer},
42 proxy::haproxy::server::HaProxyLayer,
43 rt::Executor,
44 tcp::TcpStream,
45 telemetry::tracing,
46 ua::{UserAgent, layer::classifier::UserAgentClassifierLayer, profile::UserAgentDatabase},
47};
48
49use rama_core::error::{ErrorExt as _, extra::OpaqueError};
50use rama_http::layer::upgrade::UpgradeLayer;
51use serde::Serialize;
52use serde_json::json;
53use std::{convert::Infallible, sync::Arc, time::Duration};
54
55#[cfg(all(feature = "rustls", not(feature = "boring")))]
56use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
57
58#[cfg(any(feature = "rustls", feature = "boring"))]
59use crate::{
60 net::fingerprint::{Ja3, Ja4, PeetPrint},
61 net::tls::{
62 SecureTransport,
63 client::ClientHelloExtension,
64 client::{ECHClientHello, NegotiatedTlsParameters},
65 },
66};
67#[cfg(feature = "boring")]
68use crate::{
69 net::tls::server::ServerConfig,
70 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
71};
72
73#[cfg(feature = "boring")]
74type TlsConfig = ServerConfig;
75
76#[cfg(all(feature = "rustls", not(feature = "boring")))]
77type TlsConfig = TlsAcceptorData;
78
79#[derive(Debug, Clone)]
80pub struct EchoServiceBuilder<H> {
83 concurrent_limit: usize,
84 body_limit: usize,
85 timeout: Duration,
86 forward: Option<ForwardKind>,
87
88 #[cfg(any(feature = "rustls", feature = "boring"))]
89 tls_server_config: Option<TlsConfig>,
90
91 http_version: Option<Version>,
92
93 ws_support: bool,
94
95 http_service_builder: H,
96
97 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
98}
99
100impl Default for EchoServiceBuilder<()> {
101 fn default() -> Self {
102 Self {
103 concurrent_limit: 0,
104 body_limit: 1024 * 1024,
105 timeout: Duration::ZERO,
106 forward: None,
107
108 #[cfg(any(feature = "rustls", feature = "boring"))]
109 tls_server_config: None,
110
111 http_version: None,
112
113 ws_support: false,
114
115 http_service_builder: (),
116
117 uadb: None,
118 }
119 }
120}
121
122impl EchoServiceBuilder<()> {
123 #[must_use]
125 pub fn new() -> Self {
126 Self::default()
127 }
128}
129
130impl<H> EchoServiceBuilder<H> {
131 crate::utils::macros::generate_set_and_with! {
132 pub fn concurrent(mut self, limit: usize) -> Self {
136 self.concurrent_limit = limit;
137 self
138 }
139 }
140
141 crate::utils::macros::generate_set_and_with! {
142 pub fn body_limit(mut self, limit: usize) -> Self {
144 self.body_limit = limit;
145 self
146 }
147 }
148
149 crate::utils::macros::generate_set_and_with! {
150 pub fn timeout(mut self, timeout: Duration) -> Self {
154 self.timeout = timeout;
155 self
156 }
157 }
158
159 crate::utils::macros::generate_set_and_with! {
160 pub fn forward(mut self, kind: Option<ForwardKind>) -> Self {
172 self.forward = kind;
173 self
174 }
175 }
176
177 crate::utils::macros::generate_set_and_with! {
178 #[cfg(any(feature = "rustls", feature = "boring"))]
179 pub fn tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
182 self.tls_server_config = cfg;
183 self
184 }
185 }
186
187 crate::utils::macros::generate_set_and_with! {
188 pub fn http_version(mut self, version: Option<Version>) -> Self {
190 self.http_version = version;
191 self
192 }
193 }
194
195 pub fn with_http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
197 EchoServiceBuilder {
198 concurrent_limit: self.concurrent_limit,
199 body_limit: self.body_limit,
200 timeout: self.timeout,
201 forward: self.forward,
202
203 #[cfg(any(feature = "rustls", feature = "boring"))]
204 tls_server_config: self.tls_server_config,
205
206 http_version: self.http_version,
207
208 ws_support: self.ws_support,
209
210 http_service_builder: (self.http_service_builder, layer),
211
212 uadb: self.uadb,
213 }
214 }
215
216 crate::utils::macros::generate_set_and_with! {
217 pub fn user_agent_database(
220 mut self,
221 db: Option<std::sync::Arc<UserAgentDatabase>>,
222 ) -> Self {
223 self.uadb = db;
224 self
225 }
226 }
227
228 crate::utils::macros::generate_set_and_with! {
229 pub fn ws_support(
231 mut self,
232 support: bool,
233 ) -> Self {
234 self.ws_support = support;
235 self
236 }
237 }
238}
239
240impl<H> EchoServiceBuilder<H>
241where
242 H: Layer<EchoService, Service: Service<Request, Output = Response, Error = BoxError>>,
243{
244 #[expect(unused_mut)]
245 pub fn build(
247 mut self,
248 exec: Executor,
249 ) -> Result<impl Service<TcpStream, Output = (), Error = Infallible>, BoxError> {
250 let tcp_forwarded_layer = match &self.forward {
251 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
252 _ => None,
253 };
254
255 let http_service = Arc::new(self.build_http(exec.clone()));
256
257 #[cfg(all(feature = "rustls", not(feature = "boring")))]
258 let tls_cfg = self.tls_server_config;
259
260 #[cfg(feature = "boring")]
261 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
262 Some(cfg) => Some(cfg.try_into()?),
263 None => None,
264 };
265
266 let tcp_service_builder = (
267 ConsumeErrLayer::trace_as(tracing::Level::DEBUG),
268 LimitLayer::new(if self.concurrent_limit > 0 {
269 Either::A(ConcurrentPolicy::max(self.concurrent_limit))
270 } else {
271 Either::B(UnlimitedPolicy::new())
272 }),
273 if !self.timeout.is_zero() {
274 TimeoutLayer::new(self.timeout)
275 } else {
276 TimeoutLayer::never()
277 },
278 tcp_forwarded_layer,
279 BodyLimitLayer::request_only(self.body_limit),
280 #[cfg(any(feature = "rustls", feature = "boring"))]
281 tls_cfg.map(|cfg| TlsAcceptorLayer::new(cfg).with_store_client_hello(true)),
282 );
283
284 let http_transport_service = match self.http_version {
285 Some(Version::HTTP_2) => Either3::A({
286 let mut http = HttpServer::new_h2(exec);
287 if self.ws_support {
288 http.h2_mut().set_enable_connect_protocol();
289 }
290 http.service(http_service)
291 }),
292 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
293 Either3::B(HttpServer::new_http1(exec).service(http_service))
294 }
295 Some(version) => {
296 return Err(OpaqueError::from_static_str("unsupported http version")
297 .context_debug_field("version", version));
298 }
299 None => Either3::C({
300 let mut http = HttpServer::auto(exec);
301 if self.ws_support {
302 http.h2_mut().set_enable_connect_protocol();
303 }
304 http.service(http_service)
305 }),
306 };
307
308 Ok(tcp_service_builder.into_layer(http_transport_service))
309 }
310
311 pub fn build_http(
313 &self,
314 exec: Executor,
315 ) -> impl Service<Request, Output: IntoResponse, Error = Infallible> + use<H> {
316 let http_forwarded_layer = match &self.forward {
317 None | Some(ForwardKind::HaProxy) => None,
318 Some(ForwardKind::Forwarded) => Some(Either7::A(GetForwardedHeaderLayer::forwarded())),
319 Some(ForwardKind::XForwardedFor) => {
320 Some(Either7::B(GetForwardedHeaderLayer::x_forwarded_for()))
321 }
322 Some(ForwardKind::XClientIp) => {
323 Some(Either7::C(GetForwardedHeaderLayer::<XClientIp>::new()))
324 }
325 Some(ForwardKind::ClientIp) => {
326 Some(Either7::D(GetForwardedHeaderLayer::<ClientIp>::new()))
327 }
328 Some(ForwardKind::XRealIp) => {
329 Some(Either7::E(GetForwardedHeaderLayer::<XRealIp>::new()))
330 }
331 Some(ForwardKind::CFConnectingIp) => {
332 Some(Either7::F(GetForwardedHeaderLayer::<CFConnectingIp>::new()))
333 }
334 Some(ForwardKind::TrueClientIp) => {
335 Some(Either7::G(GetForwardedHeaderLayer::<TrueClientIp>::new()))
336 }
337 };
338
339 (
340 TraceLayer::new_for_http(),
341 SetResponseHeaderLayer::<XClacksOverhead>::if_not_present_default_typed(),
342 AddRequiredResponseHeadersLayer::default(),
343 UserAgentClassifierLayer::new(),
344 ConsumeErrLayer::default(),
345 http_forwarded_layer,
346 self.ws_support.then(|| {
347 UpgradeLayer::new(
348 exec,
349 WebSocketMatcher::default(),
350 {
351 let acceptor = WebSocketAcceptor::default()
352 .with_protocols_flex(true)
353 .with_echo_protocols();
354
355 #[cfg(feature = "compression")]
356 {
357 acceptor.with_per_message_deflate_overwrite_extensions()
358 }
359 #[cfg(not(feature = "compression"))]
360 {
361 acceptor
362 }
363 },
364 ConsumeErrLayer::trace_as(tracing::Level::DEBUG)
365 .into_layer(WebSocketEchoService::default()),
366 )
367 }),
368 )
369 .into_layer(self.http_service_builder.layer(EchoService {
370 uadb: self.uadb.clone(),
371 }))
372 }
373}
374
375#[derive(Debug, Clone)]
376#[non_exhaustive]
377pub struct EchoService {
379 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
380}
381
382impl Service<Request> for EchoService {
383 type Output = Response;
384 type Error = BoxError;
385
386 async fn serve(&self, req: Request) -> Result<Self::Output, Self::Error> {
387 let user_agent_info = req
388 .extensions()
389 .get_ref()
390 .map(|ua: &UserAgent| {
391 json!({
392 "user_agent": ua.header_str().to_owned(),
393 "kind": ua.info().map(|info| info.kind.to_string()),
394 "version": ua.info().and_then(|info| info.version),
395 "platform": ua.platform().map(|v| v.to_string()),
396 })
397 })
398 .unwrap_or_default();
399
400 let request_context = RequestContext::try_from(&req)?;
401
402 let authority = request_context.authority.to_string();
403 let scheme = request_context.protocol.to_string();
404
405 let ua_str = req
406 .headers()
407 .get(USER_AGENT)
408 .and_then(|h| h.to_str().ok())
409 .map(ToOwned::to_owned);
410 tracing::debug!(
411 user_agent.original = ua_str,
412 "echo request received from ua with ua header",
413 );
414
415 #[derive(Debug, Serialize)]
416 struct FingerprintProfileData {
417 hash: String,
418 verbose: String,
419 matched: bool,
420 }
421
422 let ja4h = Ja4H::compute(&req)
423 .inspect_err(|err| tracing::error!("ja4h compute failure: {err:?}"))
424 .ok()
425 .map(|ja4h| {
426 let mut profile_ja4h: Option<FingerprintProfileData> = None;
427
428 if let Some(uadb) = self.uadb.as_deref()
429 && let Some(profile) =
430 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
431 {
432 let matched_ja4h = match req.version() {
433 Version::HTTP_10 | Version::HTTP_11 => profile
434 .http
435 .ja4h_h1_navigate(Some(req.method().clone()))
436 .inspect_err(|err| {
437 tracing::trace!(
438 "ja4h computation of matched profile for incoming h1 req: {err:?}"
439 )
440 })
441 .ok(),
442 Version::HTTP_2 => profile
443 .http
444 .ja4h_h2_navigate(Some(req.method().clone()))
445 .inspect_err(|err| {
446 tracing::trace!(
447 "ja4h computation of matched profile for incoming h2 req: {err:?}"
448 )
449 })
450 .ok(),
451 _ => None,
452 };
453 if let Some(tgt) = matched_ja4h {
454 let hash = format!("{tgt}");
455 let matched = format!("{ja4h}") == hash;
456 profile_ja4h = Some(FingerprintProfileData {
457 hash,
458 verbose: format!("{tgt:?}"),
459 matched,
460 });
461 }
462 }
463
464 json!({
465 "hash": format!("{ja4h}"),
466 "verbose": format!("{ja4h:?}"),
467 "profile": profile_ja4h,
468 })
469 });
470
471 let (parts, body) = req.into_parts();
472
473 let body = body
474 .collect()
475 .await
476 .context("collect request body for echo purposes")?
477 .to_bytes();
478
479 let curl_request = curl::cmd_string_for_request_parts_and_payload(&parts, &body);
480
481 let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&parts.extensions))
482 .into_iter()
483 .map(|(name, value)| {
484 (
485 name,
486 std::str::from_utf8(value.as_bytes())
487 .map(|s| s.to_owned())
488 .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
489 )
490 })
491 .collect();
492
493 let body = hex::encode(body.as_ref());
494
495 #[cfg(any(feature = "rustls", feature = "boring"))]
496 let tls_info = parts
497 .extensions
498 .get_ref::<SecureTransport>()
499 .and_then(|st| st.client_hello())
500 .map(|hello| {
501 let ja4 = Ja4::compute(parts.extensions.extensions())
502 .inspect_err(|err| tracing::trace!("ja4 computation: {err:?}"))
503 .ok();
504
505 let mut profile_ja4: Option<FingerprintProfileData> = None;
506
507 if let Some(uadb) = self.uadb.as_deref()
508 && let Some(profile) =
509 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
510 {
511 let matched_ja4 = profile
512 .tls
513 .compute_ja4(
514 parts
515 .extensions
516 .get_ref::<NegotiatedTlsParameters>()
517 .map(|param| param.protocol_version),
518 )
519 .inspect_err(|err| {
520 tracing::trace!("ja4 computation of matched profile: {err:?}")
521 })
522 .ok();
523 if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
524 let hash = format!("{tgt}");
525 let matched = format!("{src}") == hash;
526 profile_ja4 = Some(FingerprintProfileData {
527 hash,
528 verbose: format!("{tgt:?}"),
529 matched,
530 });
531 }
532 }
533
534 let ja4 = ja4.map(|ja4| {
535 json!({
536 "hash": format!("{ja4}"),
537 "verbose": format!("{ja4:?}"),
538 "profile": profile_ja4,
539 })
540 });
541
542 let ja3 = Ja3::compute(parts.extensions.extensions())
543 .inspect_err(|err| tracing::trace!("ja3 computation: {err:?}"))
544 .ok();
545
546 let mut profile_ja3: Option<FingerprintProfileData> = None;
547
548 if let Some(uadb) = self.uadb.as_deref()
549 && let Some(profile) =
550 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
551 {
552 let matched_ja3 = profile
553 .tls
554 .compute_ja3(
555 parts
556 .extensions
557 .get_ref::<NegotiatedTlsParameters>()
558 .map(|param| param.protocol_version),
559 )
560 .inspect_err(|err| {
561 tracing::trace!("ja3 computation of matched profile: {err:?}")
562 })
563 .ok();
564 if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
565 let hash = format!("{tgt:x}");
566 let matched = format!("{src:x}") == hash;
567 profile_ja3 = Some(FingerprintProfileData {
568 hash,
569 verbose: format!("{tgt}"),
570 matched,
571 });
572 }
573 }
574
575 let ja3 = ja3.map(|ja3| {
576 json!({
577 "hash": format!("{ja3:x}"),
578 "verbose": format!("{ja3}"),
579 "profile": profile_ja3,
580 })
581 });
582
583 let peet = PeetPrint::compute(parts.extensions.extensions())
584 .inspect_err(|err| tracing::trace!("peet computation: {err:?}"))
585 .ok();
586
587 let mut profile_peet: Option<FingerprintProfileData> = None;
588
589 if let Some(uadb) = self.uadb.as_deref()
590 && let Some(profile) =
591 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
592 {
593 let matched_peet = profile
594 .tls
595 .compute_peet()
596 .inspect_err(|err| {
597 tracing::trace!("peetprint computation of matched profile: {err:?}")
598 })
599 .ok();
600 if let (Some(src), Some(tgt)) = (peet.as_ref(), matched_peet) {
601 let hash = format!("{tgt}");
602 let matched = format!("{src}") == hash;
603 profile_peet = Some(FingerprintProfileData {
604 hash,
605 verbose: format!("{tgt:?}"),
606 matched,
607 });
608 }
609 }
610
611 let peet = peet.map(|peet| {
612 json!({
613 "hash": format!("{peet}"),
614 "verbose": format!("{peet:?}"),
615 "profile": profile_peet,
616 })
617 });
618
619 json!({
620 "header": {
621 "version": hello.protocol_version().to_string(),
622 "cipher_suites": hello
623 .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
624 "compression_algorithms": hello
625 .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
626 },
627 "extensions": hello.extensions().iter().map(|extension| match extension {
628 ClientHelloExtension::ServerName(domain) => json!({
629 "id": extension.id().to_string(),
630 "data": domain,
631 }),
632 ClientHelloExtension::SignatureAlgorithms(v) => json!({
633 "id": extension.id().to_string(),
634 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
635 }),
636 ClientHelloExtension::SupportedVersions(v) => json!({
637 "id": extension.id().to_string(),
638 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
639 }),
640 ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
641 "id": extension.id().to_string(),
642 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
643 }),
644 ClientHelloExtension::ApplicationSettings{ protocols, .. } => json!({
645 "id": extension.id().to_string(),
646 "data": protocols.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
647 }),
648 ClientHelloExtension::SupportedGroups(v) => json!({
649 "id": extension.id().to_string(),
650 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
651 }),
652 ClientHelloExtension::ECPointFormats(v) => json!({
653 "id": extension.id().to_string(),
654 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
655 }),
656 ClientHelloExtension::CertificateCompression(v) => json!({
657 "id": extension.id().to_string(),
658 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
659 }),
660 ClientHelloExtension::DelegatedCredentials(v) => json!({
661 "id": extension.id().to_string(),
662 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
663 }),
664 ClientHelloExtension::RecordSizeLimit(v) => json!({
665 "id": extension.id().to_string(),
666 "data": v.to_string(),
667 }),
668 ClientHelloExtension::EncryptedClientHello(ech) => match ech {
669 ECHClientHello::Outer(ech) => json!({
670 "id": extension.id().to_string(),
671 "data": {
672 "type": "outer",
673 "cipher_suite": {
674 "aead_id": ech.cipher_suite.aead_id.to_string(),
675 "kdf_id": ech.cipher_suite.kdf_id.to_string(),
676 },
677 "config_id": ech.config_id,
678 "enc": format!("0x{}", hex::encode(&ech.enc)),
679 "payload": format!("0x{}", hex::encode(&ech.payload)),
680 },
681 }),
682 ECHClientHello::Inner => json!({
683 "id": extension.id().to_string(),
684 "data": {
685 "type": "inner",
686 },
687 })
688
689 }
690 ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
691 json!({
692 "id": id.to_string()
693 })
694 } else {
695 json!({
696 "id": id.to_string(),
697 "data": format!("0x{}", hex::encode(data))
698 })
699 },
700 }).collect::<Vec<_>>(),
701 "ja3": ja3,
702 "ja4": ja4,
703 "peet": peet
704 })
705 });
706
707 #[cfg(not(any(feature = "rustls", feature = "boring")))]
708 let tls_info: Option<()> = None;
709
710 let mut h2 = None;
711 if parts.version == Version::HTTP_2 {
712 let early_frames = parts.extensions.get_ref::<EarlyFrameCapture>();
713 let pseudo_headers = parts.extensions.get_ref::<PseudoHeaderOrder>();
714 let akamai_h2 = AkamaiH2::compute(&parts.extensions)
715 .inspect_err(|err| tracing::trace!("akamai h2 compute failure: {err:?}"))
716 .ok()
717 .map(|akamai| {
718 json!({
719 "hash": format!("{akamai}"),
720 "verbose": format!("{akamai:?}"),
721 })
722 });
723
724 h2 = Some(json!({
725 "early_frames": early_frames,
726 "pseudo_headers": pseudo_headers,
727 "akamai_h2": akamai_h2,
728 }));
729 }
730
731 Ok(Json(json!({
732 "ua": user_agent_info,
733 "http": {
734 "version": format!("{:?}", parts.version),
735 "scheme": scheme,
736 "method": format!("{:?}", parts.method),
737 "authority": authority,
738 "path": parts.uri.path().to_owned(),
739 "query": parts.uri.query().map(str::to_owned),
740 "h2": h2,
741 "headers": headers,
742 "payload": body,
743 "ja4h": ja4h,
744 "curl": curl_request,
745 },
746 "tls": tls_info,
747 "socket_addr": parts.extensions.get_ref::<Forwarded>()
748 .and_then(|f|
749 f.client_socket_addr().map(|addr| addr.to_string())
750 .or_else(|| f.client_ip().map(|ip| ip.to_string()))
751 ).or_else(|| parts.extensions.get_ref::<SocketInfo>().map(|v| v.peer_addr().to_string())),
752 }))
753 .into_response())
754 }
755}