1use crate::{
9 Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either, Either3, Either7},
12 error::{BoxError, ErrorContext},
13 extensions::ExtensionsRef,
14 http::{
15 Request, Response, Version,
16 body::util::BodyExt,
17 convert::curl,
18 core::h2::frame::EarlyFrameCapture,
19 header::USER_AGENT,
20 headers::exotic::XClacksOverhead,
21 headers::forwarded::{CFConnectingIp, ClientIp, TrueClientIp, XClientIp, XRealIp},
22 layer::set_header::SetResponseHeaderLayer,
23 layer::{
24 forwarded::GetForwardedHeaderLayer, required_header::AddRequiredResponseHeadersLayer,
25 trace::TraceLayer,
26 },
27 proto::h1::Http1HeaderMap,
28 proto::h2::PseudoHeaderOrder,
29 server::{HttpServer, layer::upgrade::UpgradeLayer},
30 service::web::{extract::Json, response::IntoResponse},
31 ws::handshake::server::{WebSocketAcceptor, WebSocketEchoService, WebSocketMatcher},
32 },
33 layer::limit::policy::UnlimitedPolicy,
34 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
35 net::fingerprint::{AkamaiH2, Ja4H},
36 net::forwarded::Forwarded,
37 net::http::RequestContext,
38 net::stream::{SocketInfo, layer::http::BodyLimitLayer},
39 proxy::haproxy::server::HaProxyLayer,
40 rt::Executor,
41 tcp::TcpStream,
42 telemetry::tracing,
43 ua::{UserAgent, layer::classifier::UserAgentClassifierLayer, profile::UserAgentDatabase},
44};
45
46use rama_core::error::ErrorExt as _;
47use serde::Serialize;
48use serde_json::json;
49use std::{convert::Infallible, sync::Arc, time::Duration};
50
51#[cfg(all(feature = "rustls", not(feature = "boring")))]
52use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
53
54#[cfg(any(feature = "rustls", feature = "boring"))]
55use crate::{
56 net::fingerprint::{Ja3, Ja4, PeetPrint},
57 net::tls::{
58 SecureTransport,
59 client::ClientHelloExtension,
60 client::{ECHClientHello, NegotiatedTlsParameters},
61 },
62};
63#[cfg(feature = "boring")]
64use crate::{
65 net::tls::server::ServerConfig,
66 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
67};
68
/// TLS server configuration type accepted by [`EchoServiceBuilder`] when the
/// `boring` feature is enabled (this alias is chosen even if `rustls` is enabled as well).
///
/// `EchoServiceBuilder::build` converts it into `TlsAcceptorData` via `try_into`.
#[cfg(feature = "boring")]
type TlsConfig = ServerConfig;

/// TLS server configuration type accepted by [`EchoServiceBuilder`] when `rustls`
/// is enabled and `boring` is not; here the acceptor data is used directly.
#[cfg(all(feature = "rustls", not(feature = "boring")))]
type TlsConfig = TlsAcceptorData;
74
75#[derive(Debug, Clone)]
76pub struct EchoServiceBuilder<H> {
79 concurrent_limit: usize,
80 body_limit: usize,
81 timeout: Duration,
82 forward: Option<ForwardKind>,
83
84 #[cfg(any(feature = "rustls", feature = "boring"))]
85 tls_server_config: Option<TlsConfig>,
86
87 http_version: Option<Version>,
88
89 ws_support: bool,
90
91 http_service_builder: H,
92
93 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
94}
95
96impl Default for EchoServiceBuilder<()> {
97 fn default() -> Self {
98 Self {
99 concurrent_limit: 0,
100 body_limit: 1024 * 1024,
101 timeout: Duration::ZERO,
102 forward: None,
103
104 #[cfg(any(feature = "rustls", feature = "boring"))]
105 tls_server_config: None,
106
107 http_version: None,
108
109 ws_support: false,
110
111 http_service_builder: (),
112
113 uadb: None,
114 }
115 }
116}
117
118impl EchoServiceBuilder<()> {
119 #[must_use]
121 pub fn new() -> Self {
122 Self::default()
123 }
124}
125
126impl<H> EchoServiceBuilder<H> {
127 crate::utils::macros::generate_set_and_with! {
128 pub fn concurrent(mut self, limit: usize) -> Self {
132 self.concurrent_limit = limit;
133 self
134 }
135 }
136
137 crate::utils::macros::generate_set_and_with! {
138 pub fn body_limit(mut self, limit: usize) -> Self {
140 self.body_limit = limit;
141 self
142 }
143 }
144
145 crate::utils::macros::generate_set_and_with! {
146 pub fn timeout(mut self, timeout: Duration) -> Self {
150 self.timeout = timeout;
151 self
152 }
153 }
154
155 crate::utils::macros::generate_set_and_with! {
156 pub fn forward(mut self, kind: Option<ForwardKind>) -> Self {
168 self.forward = kind;
169 self
170 }
171 }
172
173 crate::utils::macros::generate_set_and_with! {
174 #[cfg(any(feature = "rustls", feature = "boring"))]
175 pub fn tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
178 self.tls_server_config = cfg;
179 self
180 }
181 }
182
183 crate::utils::macros::generate_set_and_with! {
184 pub fn http_version(mut self, version: Option<Version>) -> Self {
186 self.http_version = version;
187 self
188 }
189 }
190
191 pub fn with_http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
193 EchoServiceBuilder {
194 concurrent_limit: self.concurrent_limit,
195 body_limit: self.body_limit,
196 timeout: self.timeout,
197 forward: self.forward,
198
199 #[cfg(any(feature = "rustls", feature = "boring"))]
200 tls_server_config: self.tls_server_config,
201
202 http_version: self.http_version,
203
204 ws_support: self.ws_support,
205
206 http_service_builder: (self.http_service_builder, layer),
207
208 uadb: self.uadb,
209 }
210 }
211
212 crate::utils::macros::generate_set_and_with! {
213 pub fn user_agent_database(
216 mut self,
217 db: Option<std::sync::Arc<UserAgentDatabase>>,
218 ) -> Self {
219 self.uadb = db;
220 self
221 }
222 }
223
224 crate::utils::macros::generate_set_and_with! {
225 pub fn ws_support(
227 mut self,
228 support: bool,
229 ) -> Self {
230 self.ws_support = support;
231 self
232 }
233 }
234}
235
236impl<H> EchoServiceBuilder<H>
237where
238 H: Layer<EchoService, Service: Service<Request, Output = Response, Error = BoxError>>,
239{
240 #[allow(unused_mut)]
241 pub fn build(
243 mut self,
244 exec: Executor,
245 ) -> Result<impl Service<TcpStream, Output = (), Error = Infallible>, BoxError> {
246 let tcp_forwarded_layer = match &self.forward {
247 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
248 _ => None,
249 };
250
251 let http_service = Arc::new(self.build_http(exec.clone()));
252
253 #[cfg(all(feature = "rustls", not(feature = "boring")))]
254 let tls_cfg = self.tls_server_config;
255
256 #[cfg(feature = "boring")]
257 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
258 Some(cfg) => Some(cfg.try_into()?),
259 None => None,
260 };
261
262 let tcp_service_builder = (
263 ConsumeErrLayer::trace_as(tracing::Level::DEBUG),
264 LimitLayer::new(if self.concurrent_limit > 0 {
265 Either::A(ConcurrentPolicy::max(self.concurrent_limit))
266 } else {
267 Either::B(UnlimitedPolicy::new())
268 }),
269 if !self.timeout.is_zero() {
270 TimeoutLayer::new(self.timeout)
271 } else {
272 TimeoutLayer::never()
273 },
274 tcp_forwarded_layer,
275 BodyLimitLayer::request_only(self.body_limit),
276 #[cfg(any(feature = "rustls", feature = "boring"))]
277 tls_cfg.map(|cfg| TlsAcceptorLayer::new(cfg).with_store_client_hello(true)),
278 );
279
280 let http_transport_service = match self.http_version {
281 Some(Version::HTTP_2) => Either3::A({
282 let mut http = HttpServer::h2(exec);
283 if self.ws_support {
284 http.h2_mut().set_enable_connect_protocol();
285 }
286 http.service(http_service)
287 }),
288 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
289 Either3::B(HttpServer::http1(exec).service(http_service))
290 }
291 Some(version) => {
292 return Err(BoxError::from("unsupported http version")
293 .context_debug_field("version", version));
294 }
295 None => Either3::C({
296 let mut http = HttpServer::auto(exec);
297 if self.ws_support {
298 http.h2_mut().set_enable_connect_protocol();
299 }
300 http.service(http_service)
301 }),
302 };
303
304 Ok(tcp_service_builder.into_layer(http_transport_service))
305 }
306
307 pub fn build_http(
309 &self,
310 exec: Executor,
311 ) -> impl Service<Request, Output: IntoResponse, Error = Infallible> + use<H> {
312 let http_forwarded_layer = match &self.forward {
313 None | Some(ForwardKind::HaProxy) => None,
314 Some(ForwardKind::Forwarded) => Some(Either7::A(GetForwardedHeaderLayer::forwarded())),
315 Some(ForwardKind::XForwardedFor) => {
316 Some(Either7::B(GetForwardedHeaderLayer::x_forwarded_for()))
317 }
318 Some(ForwardKind::XClientIp) => {
319 Some(Either7::C(GetForwardedHeaderLayer::<XClientIp>::new()))
320 }
321 Some(ForwardKind::ClientIp) => {
322 Some(Either7::D(GetForwardedHeaderLayer::<ClientIp>::new()))
323 }
324 Some(ForwardKind::XRealIp) => {
325 Some(Either7::E(GetForwardedHeaderLayer::<XRealIp>::new()))
326 }
327 Some(ForwardKind::CFConnectingIp) => {
328 Some(Either7::F(GetForwardedHeaderLayer::<CFConnectingIp>::new()))
329 }
330 Some(ForwardKind::TrueClientIp) => {
331 Some(Either7::G(GetForwardedHeaderLayer::<TrueClientIp>::new()))
332 }
333 };
334
335 (
336 TraceLayer::new_for_http(),
337 SetResponseHeaderLayer::<XClacksOverhead>::if_not_present_default_typed(),
338 AddRequiredResponseHeadersLayer::default(),
339 UserAgentClassifierLayer::new(),
340 ConsumeErrLayer::default(),
341 http_forwarded_layer,
342 self.ws_support.then(|| {
343 UpgradeLayer::new(
344 exec,
345 WebSocketMatcher::default(),
346 {
347 let acceptor = WebSocketAcceptor::default()
348 .with_protocols_flex(true)
349 .with_echo_protocols();
350
351 #[cfg(feature = "compression")]
352 {
353 acceptor.with_per_message_deflate_overwrite_extensions()
354 }
355 #[cfg(not(feature = "compression"))]
356 {
357 acceptor
358 }
359 },
360 ConsumeErrLayer::trace_as(tracing::Level::DEBUG)
361 .into_layer(WebSocketEchoService::default()),
362 )
363 }),
364 )
365 .into_layer(self.http_service_builder.layer(EchoService {
366 uadb: self.uadb.clone(),
367 }))
368 }
369}
370
371#[derive(Debug, Clone)]
372#[non_exhaustive]
373pub struct EchoService {
375 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
376}
377
378impl Service<Request> for EchoService {
379 type Output = Response;
380 type Error = BoxError;
381
382 async fn serve(&self, req: Request) -> Result<Self::Output, Self::Error> {
383 let user_agent_info = req
384 .extensions()
385 .get()
386 .map(|ua: &UserAgent| {
387 json!({
388 "user_agent": ua.header_str().to_owned(),
389 "kind": ua.info().map(|info| info.kind.to_string()),
390 "version": ua.info().and_then(|info| info.version),
391 "platform": ua.platform().map(|v| v.to_string()),
392 })
393 })
394 .unwrap_or_default();
395
396 let request_context = RequestContext::try_from(&req)?;
397
398 let authority = request_context.authority.to_string();
399 let scheme = request_context.protocol.to_string();
400
401 let ua_str = req
402 .headers()
403 .get(USER_AGENT)
404 .and_then(|h| h.to_str().ok())
405 .map(ToOwned::to_owned);
406 tracing::debug!(
407 user_agent.original = ua_str,
408 "echo request received from ua with ua header",
409 );
410
411 #[derive(Debug, Serialize)]
412 struct FingerprintProfileData {
413 hash: String,
414 verbose: String,
415 matched: bool,
416 }
417
418 let ja4h = Ja4H::compute(&req)
419 .inspect_err(|err| tracing::error!("ja4h compute failure: {err:?}"))
420 .ok()
421 .map(|ja4h| {
422 let mut profile_ja4h: Option<FingerprintProfileData> = None;
423
424 if let Some(uadb) = self.uadb.as_deref()
425 && let Some(profile) =
426 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
427 {
428 let matched_ja4h = match req.version() {
429 Version::HTTP_10 | Version::HTTP_11 => profile
430 .http
431 .ja4h_h1_navigate(Some(req.method().clone()))
432 .inspect_err(|err| {
433 tracing::trace!(
434 "ja4h computation of matched profile for incoming h1 req: {err:?}"
435 )
436 })
437 .ok(),
438 Version::HTTP_2 => profile
439 .http
440 .ja4h_h2_navigate(Some(req.method().clone()))
441 .inspect_err(|err| {
442 tracing::trace!(
443 "ja4h computation of matched profile for incoming h2 req: {err:?}"
444 )
445 })
446 .ok(),
447 _ => None,
448 };
449 if let Some(tgt) = matched_ja4h {
450 let hash = format!("{tgt}");
451 let matched = format!("{ja4h}") == hash;
452 profile_ja4h = Some(FingerprintProfileData {
453 hash,
454 verbose: format!("{tgt:?}"),
455 matched,
456 });
457 }
458 }
459
460 json!({
461 "hash": format!("{ja4h}"),
462 "verbose": format!("{ja4h:?}"),
463 "profile": profile_ja4h,
464 })
465 });
466
467 let (parts, body) = req.into_parts();
468
469 let body = body
470 .collect()
471 .await
472 .context("collect request body for echo purposes")?
473 .to_bytes();
474
475 let curl_request = curl::cmd_string_for_request_parts_and_payload(&parts, &body);
476
477 let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&parts.extensions))
478 .into_iter()
479 .map(|(name, value)| {
480 (
481 name,
482 std::str::from_utf8(value.as_bytes())
483 .map(|s| s.to_owned())
484 .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
485 )
486 })
487 .collect();
488
489 let body = hex::encode(body.as_ref());
490
491 #[cfg(any(feature = "rustls", feature = "boring"))]
492 let tls_info = parts
493 .extensions
494 .get::<SecureTransport>()
495 .and_then(|st| st.client_hello())
496 .map(|hello| {
497 let ja4 = Ja4::compute(parts.extensions.extensions())
498 .inspect_err(|err| tracing::trace!("ja4 computation: {err:?}"))
499 .ok();
500
501 let mut profile_ja4: Option<FingerprintProfileData> = None;
502
503 if let Some(uadb) = self.uadb.as_deref()
504 && let Some(profile) =
505 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
506 {
507 let matched_ja4 = profile
508 .tls
509 .compute_ja4(
510 parts
511 .extensions
512 .get::<NegotiatedTlsParameters>()
513 .map(|param| param.protocol_version),
514 )
515 .inspect_err(|err| {
516 tracing::trace!("ja4 computation of matched profile: {err:?}")
517 })
518 .ok();
519 if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
520 let hash = format!("{tgt}");
521 let matched = format!("{src}") == hash;
522 profile_ja4 = Some(FingerprintProfileData {
523 hash,
524 verbose: format!("{tgt:?}"),
525 matched,
526 });
527 }
528 }
529
530 let ja4 = ja4.map(|ja4| {
531 json!({
532 "hash": format!("{ja4}"),
533 "verbose": format!("{ja4:?}"),
534 "profile": profile_ja4,
535 })
536 });
537
538 let ja3 = Ja3::compute(parts.extensions.extensions())
539 .inspect_err(|err| tracing::trace!("ja3 computation: {err:?}"))
540 .ok();
541
542 let mut profile_ja3: Option<FingerprintProfileData> = None;
543
544 if let Some(uadb) = self.uadb.as_deref()
545 && let Some(profile) =
546 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
547 {
548 let matched_ja3 = profile
549 .tls
550 .compute_ja3(
551 parts
552 .extensions
553 .get::<NegotiatedTlsParameters>()
554 .map(|param| param.protocol_version),
555 )
556 .inspect_err(|err| {
557 tracing::trace!("ja3 computation of matched profile: {err:?}")
558 })
559 .ok();
560 if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
561 let hash = format!("{tgt:x}");
562 let matched = format!("{src:x}") == hash;
563 profile_ja3 = Some(FingerprintProfileData {
564 hash,
565 verbose: format!("{tgt}"),
566 matched,
567 });
568 }
569 }
570
571 let ja3 = ja3.map(|ja3| {
572 json!({
573 "hash": format!("{ja3:x}"),
574 "verbose": format!("{ja3}"),
575 "profile": profile_ja3,
576 })
577 });
578
579 let peet = PeetPrint::compute(parts.extensions.extensions())
580 .inspect_err(|err| tracing::trace!("peet computation: {err:?}"))
581 .ok();
582
583 let mut profile_peet: Option<FingerprintProfileData> = None;
584
585 if let Some(uadb) = self.uadb.as_deref()
586 && let Some(profile) =
587 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
588 {
589 let matched_peet = profile
590 .tls
591 .compute_peet()
592 .inspect_err(|err| {
593 tracing::trace!("peetprint computation of matched profile: {err:?}")
594 })
595 .ok();
596 if let (Some(src), Some(tgt)) = (peet.as_ref(), matched_peet) {
597 let hash = format!("{tgt}");
598 let matched = format!("{src}") == hash;
599 profile_peet = Some(FingerprintProfileData {
600 hash,
601 verbose: format!("{tgt:?}"),
602 matched,
603 });
604 }
605 }
606
607 let peet = peet.map(|peet| {
608 json!({
609 "hash": format!("{peet}"),
610 "verbose": format!("{peet:?}"),
611 "profile": profile_peet,
612 })
613 });
614
615 json!({
616 "header": {
617 "version": hello.protocol_version().to_string(),
618 "cipher_suites": hello
619 .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
620 "compression_algorithms": hello
621 .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
622 },
623 "extensions": hello.extensions().iter().map(|extension| match extension {
624 ClientHelloExtension::ServerName(domain) => json!({
625 "id": extension.id().to_string(),
626 "data": domain,
627 }),
628 ClientHelloExtension::SignatureAlgorithms(v) => json!({
629 "id": extension.id().to_string(),
630 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
631 }),
632 ClientHelloExtension::SupportedVersions(v) => json!({
633 "id": extension.id().to_string(),
634 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
635 }),
636 ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
637 "id": extension.id().to_string(),
638 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
639 }),
640 ClientHelloExtension::ApplicationSettings(v) => json!({
641 "id": extension.id().to_string(),
642 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
643 }),
644 ClientHelloExtension::SupportedGroups(v) => json!({
645 "id": extension.id().to_string(),
646 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
647 }),
648 ClientHelloExtension::ECPointFormats(v) => json!({
649 "id": extension.id().to_string(),
650 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
651 }),
652 ClientHelloExtension::CertificateCompression(v) => json!({
653 "id": extension.id().to_string(),
654 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
655 }),
656 ClientHelloExtension::DelegatedCredentials(v) => json!({
657 "id": extension.id().to_string(),
658 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
659 }),
660 ClientHelloExtension::RecordSizeLimit(v) => json!({
661 "id": extension.id().to_string(),
662 "data": v.to_string(),
663 }),
664 ClientHelloExtension::EncryptedClientHello(ech) => match ech {
665 ECHClientHello::Outer(ech) => json!({
666 "id": extension.id().to_string(),
667 "data": {
668 "type": "outer",
669 "cipher_suite": {
670 "aead_id": ech.cipher_suite.aead_id.to_string(),
671 "kdf_id": ech.cipher_suite.kdf_id.to_string(),
672 },
673 "config_id": ech.config_id,
674 "enc": format!("0x{}", hex::encode(&ech.enc)),
675 "payload": format!("0x{}", hex::encode(&ech.payload)),
676 },
677 }),
678 ECHClientHello::Inner => json!({
679 "id": extension.id().to_string(),
680 "data": {
681 "type": "inner",
682 },
683 })
684
685 }
686 ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
687 json!({
688 "id": id.to_string()
689 })
690 } else {
691 json!({
692 "id": id.to_string(),
693 "data": format!("0x{}", hex::encode(data))
694 })
695 },
696 }).collect::<Vec<_>>(),
697 "ja3": ja3,
698 "ja4": ja4,
699 "peet": peet
700 })
701 });
702
703 #[cfg(not(any(feature = "rustls", feature = "boring")))]
704 let tls_info: Option<()> = None;
705
706 let mut h2 = None;
707 if parts.version == Version::HTTP_2 {
708 let early_frames = parts.extensions.get::<EarlyFrameCapture>();
709 let pseudo_headers = parts.extensions.get::<PseudoHeaderOrder>();
710 let akamai_h2 = AkamaiH2::compute(&parts.extensions)
711 .inspect_err(|err| tracing::trace!("akamai h2 compute failure: {err:?}"))
712 .ok()
713 .map(|akamai| {
714 json!({
715 "hash": format!("{akamai}"),
716 "verbose": format!("{akamai:?}"),
717 })
718 });
719
720 h2 = Some(json!({
721 "early_frames": early_frames,
722 "pseudo_headers": pseudo_headers,
723 "akamai_h2": akamai_h2,
724 }));
725 }
726
727 Ok(Json(json!({
728 "ua": user_agent_info,
729 "http": {
730 "version": format!("{:?}", parts.version),
731 "scheme": scheme,
732 "method": format!("{:?}", parts.method),
733 "authority": authority,
734 "path": parts.uri.path().to_owned(),
735 "query": parts.uri.query().map(str::to_owned),
736 "h2": h2,
737 "headers": headers,
738 "payload": body,
739 "ja4h": ja4h,
740 "curl": curl_request,
741 },
742 "tls": tls_info,
743 "socket_addr": parts.extensions.get::<Forwarded>()
744 .and_then(|f|
745 f.client_socket_addr().map(|addr| addr.to_string())
746 .or_else(|| f.client_ip().map(|ip| ip.to_string()))
747 ).or_else(|| parts.extensions.get::<SocketInfo>().map(|v| v.peer_addr().to_string())),
748 }))
749 .into_response())
750 }
751}