1use crate::{
9 Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either, Either3, Either7},
12 error::{BoxError, ErrorContext},
13 extensions::ExtensionsRef,
14 http::{
15 Request, Response, Version,
16 body::util::BodyExt,
17 convert::curl,
18 core::h2::frame::EarlyFrameCapture,
19 header::USER_AGENT,
20 headers::exotic::XClacksOverhead,
21 headers::forwarded::{CFConnectingIp, ClientIp, TrueClientIp, XClientIp, XRealIp},
22 layer::set_header::SetResponseHeaderLayer,
23 layer::{
24 forwarded::GetForwardedHeaderLayer, required_header::AddRequiredResponseHeadersLayer,
25 trace::TraceLayer,
26 },
27 proto::h1::Http1HeaderMap,
28 proto::h2::PseudoHeaderOrder,
29 server::{HttpServer, layer::upgrade::UpgradeLayer},
30 service::web::{extract::Json, response::IntoResponse},
31 ws::handshake::{
32 matcher::WebSocketMatcher,
33 server::{WebSocketAcceptor, WebSocketEchoService},
34 },
35 },
36 layer::limit::policy::UnlimitedPolicy,
37 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
38 net::fingerprint::{AkamaiH2, Ja4H},
39 net::forwarded::Forwarded,
40 net::http::RequestContext,
41 net::stream::{SocketInfo, layer::http::BodyLimitLayer},
42 proxy::haproxy::server::HaProxyLayer,
43 rt::Executor,
44 tcp::TcpStream,
45 telemetry::tracing,
46 ua::{UserAgent, layer::classifier::UserAgentClassifierLayer, profile::UserAgentDatabase},
47};
48
49use rama_core::error::ErrorExt as _;
50use serde::Serialize;
51use serde_json::json;
52use std::{convert::Infallible, sync::Arc, time::Duration};
53
54#[cfg(all(feature = "rustls", not(feature = "boring")))]
55use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
56
57#[cfg(any(feature = "rustls", feature = "boring"))]
58use crate::{
59 net::fingerprint::{Ja3, Ja4, PeetPrint},
60 net::tls::{
61 SecureTransport,
62 client::ClientHelloExtension,
63 client::{ECHClientHello, NegotiatedTlsParameters},
64 },
65};
66#[cfg(feature = "boring")]
67use crate::{
68 net::tls::server::ServerConfig,
69 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
70};
71
// With the `boring` feature the builder stores a `ServerConfig`; `build`
// converts it into `TlsAcceptorData` via `try_into`.
#[cfg(feature = "boring")]
type TlsConfig = ServerConfig;

// With only the `rustls` feature the acceptor data is stored directly.
#[cfg(all(feature = "rustls", not(feature = "boring")))]
type TlsConfig = TlsAcceptorData;
77
78#[derive(Debug, Clone)]
79pub struct EchoServiceBuilder<H> {
82 concurrent_limit: usize,
83 body_limit: usize,
84 timeout: Duration,
85 forward: Option<ForwardKind>,
86
87 #[cfg(any(feature = "rustls", feature = "boring"))]
88 tls_server_config: Option<TlsConfig>,
89
90 http_version: Option<Version>,
91
92 ws_support: bool,
93
94 http_service_builder: H,
95
96 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
97}
98
99impl Default for EchoServiceBuilder<()> {
100 fn default() -> Self {
101 Self {
102 concurrent_limit: 0,
103 body_limit: 1024 * 1024,
104 timeout: Duration::ZERO,
105 forward: None,
106
107 #[cfg(any(feature = "rustls", feature = "boring"))]
108 tls_server_config: None,
109
110 http_version: None,
111
112 ws_support: false,
113
114 http_service_builder: (),
115
116 uadb: None,
117 }
118 }
119}
120
121impl EchoServiceBuilder<()> {
122 #[must_use]
124 pub fn new() -> Self {
125 Self::default()
126 }
127}
128
// Configuration setters shared by every builder, regardless of the HTTP
// layers already stacked in `H`.
//
// NOTE(review): each setter is wrapped in `generate_set_and_with!`, which
// presumably derives `set_*` / `with_*` variants from it; confirm against
// the macro definition.
impl<H> EchoServiceBuilder<H> {
    crate::utils::macros::generate_set_and_with! {
        /// Set the maximum number of concurrent connections.
        ///
        /// `0` (the default) disables the limit.
        pub fn concurrent(mut self, limit: usize) -> Self {
            self.concurrent_limit = limit;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Set the limit handed to `BodyLimitLayer::request_only`
        /// (default: `1024 * 1024`).
        pub fn body_limit(mut self, limit: usize) -> Self {
            self.body_limit = limit;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Set the connection timeout.
        ///
        /// A zero duration (the default) means the service never times out.
        pub fn timeout(mut self, timeout: Duration) -> Self {
            self.timeout = timeout;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Choose how forwarded client information is obtained.
        ///
        /// `ForwardKind::HaProxy` is handled on the TCP level (`build`),
        /// every other kind adds a forwarded-header layer on the HTTP level
        /// (`build_http`). `None` disables both.
        pub fn forward(mut self, kind: Option<ForwardKind>) -> Self {
            self.forward = kind;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Set the TLS server config; `build` only adds a TLS acceptor layer
        /// when this is `Some`.
        #[cfg(any(feature = "rustls", feature = "boring"))]
        pub fn tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
            self.tls_server_config = cfg;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Pin the HTTP version to serve.
        ///
        /// `None` (the default) makes `build` use an auto-detecting server;
        /// versions other than HTTP/0.9, 1.0, 1.1 and 2 make `build` fail.
        pub fn http_version(mut self, version: Option<Version>) -> Self {
            self.http_version = version;
            self
        }
    }

    /// Add an HTTP layer around the echo service.
    ///
    /// The layer is appended to the ones already present, which is why the
    /// builder's type parameter changes from `H` to `(H, H2)`. All other
    /// settings are carried over unchanged.
    pub fn with_http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
        EchoServiceBuilder {
            concurrent_limit: self.concurrent_limit,
            body_limit: self.body_limit,
            timeout: self.timeout,
            forward: self.forward,

            #[cfg(any(feature = "rustls", feature = "boring"))]
            tls_server_config: self.tls_server_config,

            http_version: self.http_version,

            ws_support: self.ws_support,

            http_service_builder: (self.http_service_builder, layer),

            uadb: self.uadb,
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Set the user-agent database used to look up the profile matching
        /// the request's `User-Agent` header when reporting fingerprints.
        pub fn user_agent_database(
            mut self,
            db: Option<std::sync::Arc<UserAgentDatabase>>,
        ) -> Self {
            self.uadb = db;
            self
        }
    }

    crate::utils::macros::generate_set_and_with! {
        /// Enable or disable WebSocket support.
        ///
        /// When enabled, `build_http` adds an upgrade layer backed by
        /// `WebSocketEchoService` and `build` enables the connect protocol
        /// on h2-capable servers.
        pub fn ws_support(
            mut self,
            support: bool,
        ) -> Self {
            self.ws_support = support;
            self
        }
    }
}
238
239impl<H> EchoServiceBuilder<H>
240where
241 H: Layer<EchoService, Service: Service<Request, Output = Response, Error = BoxError>>,
242{
243 #[allow(unused_mut)]
244 pub fn build(
246 mut self,
247 exec: Executor,
248 ) -> Result<impl Service<TcpStream, Output = (), Error = Infallible>, BoxError> {
249 let tcp_forwarded_layer = match &self.forward {
250 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
251 _ => None,
252 };
253
254 let http_service = Arc::new(self.build_http(exec.clone()));
255
256 #[cfg(all(feature = "rustls", not(feature = "boring")))]
257 let tls_cfg = self.tls_server_config;
258
259 #[cfg(feature = "boring")]
260 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
261 Some(cfg) => Some(cfg.try_into()?),
262 None => None,
263 };
264
265 let tcp_service_builder = (
266 ConsumeErrLayer::trace_as(tracing::Level::DEBUG),
267 LimitLayer::new(if self.concurrent_limit > 0 {
268 Either::A(ConcurrentPolicy::max(self.concurrent_limit))
269 } else {
270 Either::B(UnlimitedPolicy::new())
271 }),
272 if !self.timeout.is_zero() {
273 TimeoutLayer::new(self.timeout)
274 } else {
275 TimeoutLayer::never()
276 },
277 tcp_forwarded_layer,
278 BodyLimitLayer::request_only(self.body_limit),
279 #[cfg(any(feature = "rustls", feature = "boring"))]
280 tls_cfg.map(|cfg| TlsAcceptorLayer::new(cfg).with_store_client_hello(true)),
281 );
282
283 let http_transport_service = match self.http_version {
284 Some(Version::HTTP_2) => Either3::A({
285 let mut http = HttpServer::new_h2(exec);
286 if self.ws_support {
287 http.h2_mut().set_enable_connect_protocol();
288 }
289 http.service(http_service)
290 }),
291 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
292 Either3::B(HttpServer::new_http1(exec).service(http_service))
293 }
294 Some(version) => {
295 return Err(BoxError::from("unsupported http version")
296 .context_debug_field("version", version));
297 }
298 None => Either3::C({
299 let mut http = HttpServer::auto(exec);
300 if self.ws_support {
301 http.h2_mut().set_enable_connect_protocol();
302 }
303 http.service(http_service)
304 }),
305 };
306
307 Ok(tcp_service_builder.into_layer(http_transport_service))
308 }
309
310 pub fn build_http(
312 &self,
313 exec: Executor,
314 ) -> impl Service<Request, Output: IntoResponse, Error = Infallible> + use<H> {
315 let http_forwarded_layer = match &self.forward {
316 None | Some(ForwardKind::HaProxy) => None,
317 Some(ForwardKind::Forwarded) => Some(Either7::A(GetForwardedHeaderLayer::forwarded())),
318 Some(ForwardKind::XForwardedFor) => {
319 Some(Either7::B(GetForwardedHeaderLayer::x_forwarded_for()))
320 }
321 Some(ForwardKind::XClientIp) => {
322 Some(Either7::C(GetForwardedHeaderLayer::<XClientIp>::new()))
323 }
324 Some(ForwardKind::ClientIp) => {
325 Some(Either7::D(GetForwardedHeaderLayer::<ClientIp>::new()))
326 }
327 Some(ForwardKind::XRealIp) => {
328 Some(Either7::E(GetForwardedHeaderLayer::<XRealIp>::new()))
329 }
330 Some(ForwardKind::CFConnectingIp) => {
331 Some(Either7::F(GetForwardedHeaderLayer::<CFConnectingIp>::new()))
332 }
333 Some(ForwardKind::TrueClientIp) => {
334 Some(Either7::G(GetForwardedHeaderLayer::<TrueClientIp>::new()))
335 }
336 };
337
338 (
339 TraceLayer::new_for_http(),
340 SetResponseHeaderLayer::<XClacksOverhead>::if_not_present_default_typed(),
341 AddRequiredResponseHeadersLayer::default(),
342 UserAgentClassifierLayer::new(),
343 ConsumeErrLayer::default(),
344 http_forwarded_layer,
345 self.ws_support.then(|| {
346 UpgradeLayer::new(
347 exec,
348 WebSocketMatcher::default(),
349 {
350 let acceptor = WebSocketAcceptor::default()
351 .with_protocols_flex(true)
352 .with_echo_protocols();
353
354 #[cfg(feature = "compression")]
355 {
356 acceptor.with_per_message_deflate_overwrite_extensions()
357 }
358 #[cfg(not(feature = "compression"))]
359 {
360 acceptor
361 }
362 },
363 ConsumeErrLayer::trace_as(tracing::Level::DEBUG)
364 .into_layer(WebSocketEchoService::default()),
365 )
366 }),
367 )
368 .into_layer(self.http_service_builder.layer(EchoService {
369 uadb: self.uadb.clone(),
370 }))
371 }
372}
373
374#[derive(Debug, Clone)]
375#[non_exhaustive]
376pub struct EchoService {
378 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
379}
380
381impl Service<Request> for EchoService {
382 type Output = Response;
383 type Error = BoxError;
384
385 async fn serve(&self, req: Request) -> Result<Self::Output, Self::Error> {
386 let user_agent_info = req
387 .extensions()
388 .get()
389 .map(|ua: &UserAgent| {
390 json!({
391 "user_agent": ua.header_str().to_owned(),
392 "kind": ua.info().map(|info| info.kind.to_string()),
393 "version": ua.info().and_then(|info| info.version),
394 "platform": ua.platform().map(|v| v.to_string()),
395 })
396 })
397 .unwrap_or_default();
398
399 let request_context = RequestContext::try_from(&req)?;
400
401 let authority = request_context.authority.to_string();
402 let scheme = request_context.protocol.to_string();
403
404 let ua_str = req
405 .headers()
406 .get(USER_AGENT)
407 .and_then(|h| h.to_str().ok())
408 .map(ToOwned::to_owned);
409 tracing::debug!(
410 user_agent.original = ua_str,
411 "echo request received from ua with ua header",
412 );
413
414 #[derive(Debug, Serialize)]
415 struct FingerprintProfileData {
416 hash: String,
417 verbose: String,
418 matched: bool,
419 }
420
421 let ja4h = Ja4H::compute(&req)
422 .inspect_err(|err| tracing::error!("ja4h compute failure: {err:?}"))
423 .ok()
424 .map(|ja4h| {
425 let mut profile_ja4h: Option<FingerprintProfileData> = None;
426
427 if let Some(uadb) = self.uadb.as_deref()
428 && let Some(profile) =
429 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
430 {
431 let matched_ja4h = match req.version() {
432 Version::HTTP_10 | Version::HTTP_11 => profile
433 .http
434 .ja4h_h1_navigate(Some(req.method().clone()))
435 .inspect_err(|err| {
436 tracing::trace!(
437 "ja4h computation of matched profile for incoming h1 req: {err:?}"
438 )
439 })
440 .ok(),
441 Version::HTTP_2 => profile
442 .http
443 .ja4h_h2_navigate(Some(req.method().clone()))
444 .inspect_err(|err| {
445 tracing::trace!(
446 "ja4h computation of matched profile for incoming h2 req: {err:?}"
447 )
448 })
449 .ok(),
450 _ => None,
451 };
452 if let Some(tgt) = matched_ja4h {
453 let hash = format!("{tgt}");
454 let matched = format!("{ja4h}") == hash;
455 profile_ja4h = Some(FingerprintProfileData {
456 hash,
457 verbose: format!("{tgt:?}"),
458 matched,
459 });
460 }
461 }
462
463 json!({
464 "hash": format!("{ja4h}"),
465 "verbose": format!("{ja4h:?}"),
466 "profile": profile_ja4h,
467 })
468 });
469
470 let (parts, body) = req.into_parts();
471
472 let body = body
473 .collect()
474 .await
475 .context("collect request body for echo purposes")?
476 .to_bytes();
477
478 let curl_request = curl::cmd_string_for_request_parts_and_payload(&parts, &body);
479
480 let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&parts.extensions))
481 .into_iter()
482 .map(|(name, value)| {
483 (
484 name,
485 std::str::from_utf8(value.as_bytes())
486 .map(|s| s.to_owned())
487 .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
488 )
489 })
490 .collect();
491
492 let body = hex::encode(body.as_ref());
493
494 #[cfg(any(feature = "rustls", feature = "boring"))]
495 let tls_info = parts
496 .extensions
497 .get::<SecureTransport>()
498 .and_then(|st| st.client_hello())
499 .map(|hello| {
500 let ja4 = Ja4::compute(parts.extensions.extensions())
501 .inspect_err(|err| tracing::trace!("ja4 computation: {err:?}"))
502 .ok();
503
504 let mut profile_ja4: Option<FingerprintProfileData> = None;
505
506 if let Some(uadb) = self.uadb.as_deref()
507 && let Some(profile) =
508 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
509 {
510 let matched_ja4 = profile
511 .tls
512 .compute_ja4(
513 parts
514 .extensions
515 .get::<NegotiatedTlsParameters>()
516 .map(|param| param.protocol_version),
517 )
518 .inspect_err(|err| {
519 tracing::trace!("ja4 computation of matched profile: {err:?}")
520 })
521 .ok();
522 if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
523 let hash = format!("{tgt}");
524 let matched = format!("{src}") == hash;
525 profile_ja4 = Some(FingerprintProfileData {
526 hash,
527 verbose: format!("{tgt:?}"),
528 matched,
529 });
530 }
531 }
532
533 let ja4 = ja4.map(|ja4| {
534 json!({
535 "hash": format!("{ja4}"),
536 "verbose": format!("{ja4:?}"),
537 "profile": profile_ja4,
538 })
539 });
540
541 let ja3 = Ja3::compute(parts.extensions.extensions())
542 .inspect_err(|err| tracing::trace!("ja3 computation: {err:?}"))
543 .ok();
544
545 let mut profile_ja3: Option<FingerprintProfileData> = None;
546
547 if let Some(uadb) = self.uadb.as_deref()
548 && let Some(profile) =
549 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
550 {
551 let matched_ja3 = profile
552 .tls
553 .compute_ja3(
554 parts
555 .extensions
556 .get::<NegotiatedTlsParameters>()
557 .map(|param| param.protocol_version),
558 )
559 .inspect_err(|err| {
560 tracing::trace!("ja3 computation of matched profile: {err:?}")
561 })
562 .ok();
563 if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
564 let hash = format!("{tgt:x}");
565 let matched = format!("{src:x}") == hash;
566 profile_ja3 = Some(FingerprintProfileData {
567 hash,
568 verbose: format!("{tgt}"),
569 matched,
570 });
571 }
572 }
573
574 let ja3 = ja3.map(|ja3| {
575 json!({
576 "hash": format!("{ja3:x}"),
577 "verbose": format!("{ja3}"),
578 "profile": profile_ja3,
579 })
580 });
581
582 let peet = PeetPrint::compute(parts.extensions.extensions())
583 .inspect_err(|err| tracing::trace!("peet computation: {err:?}"))
584 .ok();
585
586 let mut profile_peet: Option<FingerprintProfileData> = None;
587
588 if let Some(uadb) = self.uadb.as_deref()
589 && let Some(profile) =
590 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
591 {
592 let matched_peet = profile
593 .tls
594 .compute_peet()
595 .inspect_err(|err| {
596 tracing::trace!("peetprint computation of matched profile: {err:?}")
597 })
598 .ok();
599 if let (Some(src), Some(tgt)) = (peet.as_ref(), matched_peet) {
600 let hash = format!("{tgt}");
601 let matched = format!("{src}") == hash;
602 profile_peet = Some(FingerprintProfileData {
603 hash,
604 verbose: format!("{tgt:?}"),
605 matched,
606 });
607 }
608 }
609
610 let peet = peet.map(|peet| {
611 json!({
612 "hash": format!("{peet}"),
613 "verbose": format!("{peet:?}"),
614 "profile": profile_peet,
615 })
616 });
617
618 json!({
619 "header": {
620 "version": hello.protocol_version().to_string(),
621 "cipher_suites": hello
622 .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
623 "compression_algorithms": hello
624 .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
625 },
626 "extensions": hello.extensions().iter().map(|extension| match extension {
627 ClientHelloExtension::ServerName(domain) => json!({
628 "id": extension.id().to_string(),
629 "data": domain,
630 }),
631 ClientHelloExtension::SignatureAlgorithms(v) => json!({
632 "id": extension.id().to_string(),
633 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
634 }),
635 ClientHelloExtension::SupportedVersions(v) => json!({
636 "id": extension.id().to_string(),
637 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
638 }),
639 ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
640 "id": extension.id().to_string(),
641 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
642 }),
643 ClientHelloExtension::ApplicationSettings{ protocols, .. } => json!({
644 "id": extension.id().to_string(),
645 "data": protocols.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
646 }),
647 ClientHelloExtension::SupportedGroups(v) => json!({
648 "id": extension.id().to_string(),
649 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
650 }),
651 ClientHelloExtension::ECPointFormats(v) => json!({
652 "id": extension.id().to_string(),
653 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
654 }),
655 ClientHelloExtension::CertificateCompression(v) => json!({
656 "id": extension.id().to_string(),
657 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
658 }),
659 ClientHelloExtension::DelegatedCredentials(v) => json!({
660 "id": extension.id().to_string(),
661 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
662 }),
663 ClientHelloExtension::RecordSizeLimit(v) => json!({
664 "id": extension.id().to_string(),
665 "data": v.to_string(),
666 }),
667 ClientHelloExtension::EncryptedClientHello(ech) => match ech {
668 ECHClientHello::Outer(ech) => json!({
669 "id": extension.id().to_string(),
670 "data": {
671 "type": "outer",
672 "cipher_suite": {
673 "aead_id": ech.cipher_suite.aead_id.to_string(),
674 "kdf_id": ech.cipher_suite.kdf_id.to_string(),
675 },
676 "config_id": ech.config_id,
677 "enc": format!("0x{}", hex::encode(&ech.enc)),
678 "payload": format!("0x{}", hex::encode(&ech.payload)),
679 },
680 }),
681 ECHClientHello::Inner => json!({
682 "id": extension.id().to_string(),
683 "data": {
684 "type": "inner",
685 },
686 })
687
688 }
689 ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
690 json!({
691 "id": id.to_string()
692 })
693 } else {
694 json!({
695 "id": id.to_string(),
696 "data": format!("0x{}", hex::encode(data))
697 })
698 },
699 }).collect::<Vec<_>>(),
700 "ja3": ja3,
701 "ja4": ja4,
702 "peet": peet
703 })
704 });
705
706 #[cfg(not(any(feature = "rustls", feature = "boring")))]
707 let tls_info: Option<()> = None;
708
709 let mut h2 = None;
710 if parts.version == Version::HTTP_2 {
711 let early_frames = parts.extensions.get::<EarlyFrameCapture>();
712 let pseudo_headers = parts.extensions.get::<PseudoHeaderOrder>();
713 let akamai_h2 = AkamaiH2::compute(&parts.extensions)
714 .inspect_err(|err| tracing::trace!("akamai h2 compute failure: {err:?}"))
715 .ok()
716 .map(|akamai| {
717 json!({
718 "hash": format!("{akamai}"),
719 "verbose": format!("{akamai:?}"),
720 })
721 });
722
723 h2 = Some(json!({
724 "early_frames": early_frames,
725 "pseudo_headers": pseudo_headers,
726 "akamai_h2": akamai_h2,
727 }));
728 }
729
730 Ok(Json(json!({
731 "ua": user_agent_info,
732 "http": {
733 "version": format!("{:?}", parts.version),
734 "scheme": scheme,
735 "method": format!("{:?}", parts.method),
736 "authority": authority,
737 "path": parts.uri.path().to_owned(),
738 "query": parts.uri.query().map(str::to_owned),
739 "h2": h2,
740 "headers": headers,
741 "payload": body,
742 "ja4h": ja4h,
743 "curl": curl_request,
744 },
745 "tls": tls_info,
746 "socket_addr": parts.extensions.get::<Forwarded>()
747 .and_then(|f|
748 f.client_socket_addr().map(|addr| addr.to_string())
749 .or_else(|| f.client_ip().map(|ip| ip.to_string()))
750 ).or_else(|| parts.extensions.get::<SocketInfo>().map(|v| v.peer_addr().to_string())),
751 }))
752 .into_response())
753 }
754}