1use crate::{
9 Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either3, Either7},
12 error::{BoxError, ErrorContext, OpaqueError},
13 extensions::ExtensionsRef,
14 http::{
15 Request, Response, Version,
16 body::util::BodyExt,
17 convert::curl,
18 core::h2::frame::EarlyFrameCapture,
19 header::USER_AGENT,
20 headers::forwarded::{CFConnectingIp, ClientIp, TrueClientIp, XClientIp, XRealIp},
21 layer::{
22 forwarded::GetForwardedHeaderLayer,
23 required_header::AddRequiredResponseHeadersLayer,
24 trace::TraceLayer,
25 ua::{UserAgent, UserAgentClassifierLayer},
26 },
27 proto::h1::Http1HeaderMap,
28 proto::h2::PseudoHeaderOrder,
29 server::{HttpServer, layer::upgrade::UpgradeLayer},
30 service::web::{extract::Json, response::IntoResponse},
31 ws::handshake::server::{WebSocketAcceptor, WebSocketEchoService, WebSocketMatcher},
32 },
33 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
34 net::fingerprint::Ja4H,
35 net::forwarded::Forwarded,
36 net::http::RequestContext,
37 net::stream::{SocketInfo, layer::http::BodyLimitLayer},
38 proxy::haproxy::server::HaProxyLayer,
39 rt::Executor,
40 tcp::TcpStream,
41 telemetry::tracing,
42 ua::profile::UserAgentDatabase,
43};
44
45use serde::Serialize;
46use serde_json::json;
47use std::{convert::Infallible, time::Duration};
48
49#[cfg(all(feature = "rustls", not(feature = "boring")))]
50use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
51
52#[cfg(any(feature = "rustls", feature = "boring"))]
53use crate::{
54 net::fingerprint::{Ja3, Ja4, PeetPrint},
55 net::tls::{
56 SecureTransport,
57 client::ClientHelloExtension,
58 client::{ECHClientHello, NegotiatedTlsParameters},
59 },
60};
61#[cfg(feature = "boring")]
62use crate::{
63 net::tls::server::ServerConfig,
64 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
65};
66
/// TLS configuration type accepted by the builder when the `boring` feature
/// is enabled (takes precedence over `rustls` when both are enabled).
#[cfg(feature = "boring")]
type TlsConfig = ServerConfig;

/// TLS configuration type accepted by the builder when only the `rustls`
/// feature is enabled.
#[cfg(all(feature = "rustls", not(feature = "boring")))]
type TlsConfig = TlsAcceptorData;
72
/// Builder used to assemble the echo service, either as a full TCP-level
/// service (`build`) or as the HTTP-level service only (`build_http`).
///
/// `H` is the (possibly nested) HTTP layer stack that gets wrapped around the
/// inner `EchoService`.
#[derive(Debug, Clone)]
pub struct EchoServiceBuilder<H> {
    /// Maximum number of concurrent connections; `0` means no limit layer is added.
    concurrent_limit: usize,
    /// Limit handed to the request body limit layer of the TCP stack.
    body_limit: usize,
    /// Timeout applied on the TCP stack; a zero duration means no timeout layer is added.
    timeout: Duration,
    /// Which mechanism (if any) is used to learn about the forwarded client address.
    forward: Option<ForwardKind>,

    /// Optional TLS configuration; when set, a TLS acceptor layer is added to the TCP stack.
    #[cfg(any(feature = "rustls", feature = "boring"))]
    tls_server_config: Option<TlsConfig>,

    /// HTTP version to serve; `None` selects the auto (version-detecting) HTTP server.
    http_version: Option<Version>,

    /// Whether WebSocket upgrade (echo) support is enabled.
    ws_support: bool,

    /// HTTP layer(s) wrapped around the inner `EchoService`.
    http_service_builder: H,

    /// Optional user-agent database used to compare fingerprints against known profiles.
    uadb: Option<std::sync::Arc<UserAgentDatabase>>,
}
93
94impl Default for EchoServiceBuilder<()> {
95 fn default() -> Self {
96 Self {
97 concurrent_limit: 0,
98 body_limit: 1024 * 1024,
99 timeout: Duration::ZERO,
100 forward: None,
101
102 #[cfg(any(feature = "rustls", feature = "boring"))]
103 tls_server_config: None,
104
105 http_version: None,
106
107 ws_support: false,
108
109 http_service_builder: (),
110
111 uadb: None,
112 }
113 }
114}
115
116impl EchoServiceBuilder<()> {
117 #[must_use]
119 pub fn new() -> Self {
120 Self::default()
121 }
122}
123
124impl<H> EchoServiceBuilder<H> {
125 crate::utils::macros::generate_set_and_with! {
126 pub fn concurrent(mut self, limit: usize) -> Self {
130 self.concurrent_limit = limit;
131 self
132 }
133 }
134
135 crate::utils::macros::generate_set_and_with! {
136 pub fn body_limit(mut self, limit: usize) -> Self {
138 self.body_limit = limit;
139 self
140 }
141 }
142
143 crate::utils::macros::generate_set_and_with! {
144 pub fn timeout(mut self, timeout: Duration) -> Self {
148 self.timeout = timeout;
149 self
150 }
151 }
152
153 crate::utils::macros::generate_set_and_with! {
154 pub fn forward(mut self, kind: Option<ForwardKind>) -> Self {
166 self.forward = kind;
167 self
168 }
169 }
170
171 crate::utils::macros::generate_set_and_with! {
172 #[cfg(any(feature = "rustls", feature = "boring"))]
173 pub fn tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
176 self.tls_server_config = cfg;
177 self
178 }
179 }
180
181 crate::utils::macros::generate_set_and_with! {
182 pub fn http_version(mut self, version: Option<Version>) -> Self {
184 self.http_version = version;
185 self
186 }
187 }
188
189 pub fn with_http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
191 EchoServiceBuilder {
192 concurrent_limit: self.concurrent_limit,
193 body_limit: self.body_limit,
194 timeout: self.timeout,
195 forward: self.forward,
196
197 #[cfg(any(feature = "rustls", feature = "boring"))]
198 tls_server_config: self.tls_server_config,
199
200 http_version: self.http_version,
201
202 ws_support: self.ws_support,
203
204 http_service_builder: (self.http_service_builder, layer),
205
206 uadb: self.uadb,
207 }
208 }
209
210 crate::utils::macros::generate_set_and_with! {
211 pub fn user_agent_database(
214 mut self,
215 db: Option<std::sync::Arc<UserAgentDatabase>>,
216 ) -> Self {
217 self.uadb = db;
218 self
219 }
220 }
221
222 crate::utils::macros::generate_set_and_with! {
223 pub fn ws_support(
225 mut self,
226 support: bool,
227 ) -> Self {
228 self.ws_support = support;
229 self
230 }
231 }
232}
233
234impl<H> EchoServiceBuilder<H>
235where
236 H: Layer<EchoService, Service: Service<Request, Response = Response, Error = BoxError>>,
237{
238 #[allow(unused_mut)]
239 pub fn build(
241 mut self,
242 executor: Executor,
243 ) -> Result<impl Service<TcpStream, Response = (), Error = Infallible>, BoxError> {
244 let tcp_forwarded_layer = match &self.forward {
245 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
246 _ => None,
247 };
248
249 let http_service = self.build_http();
250
251 #[cfg(all(feature = "rustls", not(feature = "boring")))]
252 let tls_cfg = self.tls_server_config;
253
254 #[cfg(feature = "boring")]
255 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
256 Some(cfg) => Some(cfg.try_into()?),
257 None => None,
258 };
259
260 let tcp_service_builder = (
261 ConsumeErrLayer::trace(tracing::Level::DEBUG),
262 (self.concurrent_limit > 0)
263 .then(|| LimitLayer::new(ConcurrentPolicy::max(self.concurrent_limit))),
264 (!self.timeout.is_zero()).then(|| TimeoutLayer::new(self.timeout)),
265 tcp_forwarded_layer,
266 BodyLimitLayer::request_only(self.body_limit),
267 #[cfg(any(feature = "rustls", feature = "boring"))]
268 tls_cfg.map(|cfg| TlsAcceptorLayer::new(cfg).with_store_client_hello(true)),
269 );
270
271 let http_transport_service = match self.http_version {
272 Some(Version::HTTP_2) => Either3::A({
273 let mut http = HttpServer::h2(executor);
274 if self.ws_support {
275 http.h2_mut().enable_connect_protocol();
276 }
277 http.service(http_service)
278 }),
279 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
280 Either3::B(HttpServer::http1().service(http_service))
281 }
282 Some(_) => {
283 return Err(OpaqueError::from_display("unsupported http version").into_boxed());
284 }
285 None => Either3::C({
286 let mut http = HttpServer::auto(executor);
287 if self.ws_support {
288 http.h2_mut().enable_connect_protocol();
289 }
290 http.service(http_service)
291 }),
292 };
293
294 Ok(tcp_service_builder.into_layer(http_transport_service))
295 }
296
297 pub fn build_http(
299 &self,
300 ) -> impl Service<Request, Response: IntoResponse, Error = Infallible> + use<H> {
301 let http_forwarded_layer = match &self.forward {
302 None | Some(ForwardKind::HaProxy) => None,
303 Some(ForwardKind::Forwarded) => Some(Either7::A(GetForwardedHeaderLayer::forwarded())),
304 Some(ForwardKind::XForwardedFor) => {
305 Some(Either7::B(GetForwardedHeaderLayer::x_forwarded_for()))
306 }
307 Some(ForwardKind::XClientIp) => {
308 Some(Either7::C(GetForwardedHeaderLayer::<XClientIp>::new()))
309 }
310 Some(ForwardKind::ClientIp) => {
311 Some(Either7::D(GetForwardedHeaderLayer::<ClientIp>::new()))
312 }
313 Some(ForwardKind::XRealIp) => {
314 Some(Either7::E(GetForwardedHeaderLayer::<XRealIp>::new()))
315 }
316 Some(ForwardKind::CFConnectingIp) => {
317 Some(Either7::F(GetForwardedHeaderLayer::<CFConnectingIp>::new()))
318 }
319 Some(ForwardKind::TrueClientIp) => {
320 Some(Either7::G(GetForwardedHeaderLayer::<TrueClientIp>::new()))
321 }
322 };
323
324 (
325 TraceLayer::new_for_http(),
326 AddRequiredResponseHeadersLayer::default(),
327 UserAgentClassifierLayer::new(),
328 ConsumeErrLayer::default(),
329 http_forwarded_layer,
330 self.ws_support.then(|| {
331 UpgradeLayer::new(
332 WebSocketMatcher::default(),
333 {
334 let acceptor = WebSocketAcceptor::default()
335 .with_protocols_flex(true)
336 .with_echo_protocols();
337
338 #[cfg(feature = "compression")]
339 {
340 acceptor.with_per_message_deflate_overwrite_extensions()
341 }
342 #[cfg(not(feature = "compression"))]
343 {
344 acceptor
345 }
346 },
347 ConsumeErrLayer::trace(tracing::Level::DEBUG)
348 .into_layer(WebSocketEchoService::default()),
349 )
350 }),
351 )
352 .into_layer(self.http_service_builder.layer(EchoService {
353 uadb: self.uadb.clone(),
354 }))
355 }
356}
357
/// HTTP service that answers every request with a JSON description of that
/// request (user agent, HTTP details, fingerprints, TLS client hello and
/// client socket address).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct EchoService {
    /// Optional user-agent database used to look up the profile that matches
    /// the request's `User-Agent` header, to compare fingerprints against.
    uadb: Option<std::sync::Arc<UserAgentDatabase>>,
}
364
365impl Service<Request> for EchoService {
366 type Response = Response;
367 type Error = BoxError;
368
369 async fn serve(&self, req: Request) -> Result<Self::Response, Self::Error> {
370 let user_agent_info = req
371 .extensions()
372 .get()
373 .map(|ua: &UserAgent| {
374 json!({
375 "user_agent": ua.header_str().to_owned(),
376 "kind": ua.info().map(|info| info.kind.to_string()),
377 "version": ua.info().and_then(|info| info.version),
378 "platform": ua.platform().map(|v| v.to_string()),
379 })
380 })
381 .unwrap_or_default();
382
383 let request_context = RequestContext::try_from(&req)?;
384
385 let authority = request_context.authority.to_string();
386 let scheme = request_context.protocol.to_string();
387
388 let ua_str = req
389 .headers()
390 .get(USER_AGENT)
391 .and_then(|h| h.to_str().ok())
392 .map(ToOwned::to_owned);
393 tracing::debug!(
394 user_agent.original = ua_str,
395 "echo request received from ua with ua header",
396 );
397
398 #[derive(Debug, Serialize)]
399 struct FingerprintProfileData {
400 hash: String,
401 verbose: String,
402 matched: bool,
403 }
404
405 let ja4h = Ja4H::compute(&req)
406 .inspect_err(|err| tracing::error!("ja4h compute failure: {err:?}"))
407 .ok()
408 .map(|ja4h| {
409 let mut profile_ja4h: Option<FingerprintProfileData> = None;
410
411 if let Some(uadb) = self.uadb.as_deref()
412 && let Some(profile) =
413 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
414 {
415 let matched_ja4h = match req.version() {
416 Version::HTTP_10 | Version::HTTP_11 => profile
417 .http
418 .ja4h_h1_navigate(Some(req.method().clone()))
419 .inspect_err(|err| {
420 tracing::trace!(
421 "ja4h computation of matched profile for incoming h1 req: {err:?}"
422 )
423 })
424 .ok(),
425 Version::HTTP_2 => profile
426 .http
427 .ja4h_h2_navigate(Some(req.method().clone()))
428 .inspect_err(|err| {
429 tracing::trace!(
430 "ja4h computation of matched profile for incoming h2 req: {err:?}"
431 )
432 })
433 .ok(),
434 _ => None,
435 };
436 if let Some(tgt) = matched_ja4h {
437 let hash = format!("{tgt}");
438 let matched = format!("{ja4h}") == hash;
439 profile_ja4h = Some(FingerprintProfileData {
440 hash,
441 verbose: format!("{tgt:?}"),
442 matched,
443 });
444 }
445 }
446
447 json!({
448 "hash": format!("{ja4h}"),
449 "verbose": format!("{ja4h:?}"),
450 "profile": profile_ja4h,
451 })
452 });
453
454 let (mut parts, body) = req.into_parts();
455
456 let body = body
457 .collect()
458 .await
459 .context("collect request body for echo purposes")?
460 .to_bytes();
461
462 let curl_request = curl::cmd_string_for_request_parts_and_payload(&parts, &body);
463
464 let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&mut parts.extensions))
465 .into_iter()
466 .map(|(name, value)| {
467 (
468 name,
469 std::str::from_utf8(value.as_bytes())
470 .map(|s| s.to_owned())
471 .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
472 )
473 })
474 .collect();
475
476 let body = hex::encode(body.as_ref());
477
478 #[cfg(any(feature = "rustls", feature = "boring"))]
479 let tls_info = parts
480 .extensions
481 .get::<SecureTransport>()
482 .and_then(|st| st.client_hello())
483 .map(|hello| {
484 let ja4 = Ja4::compute(parts.extensions.extensions())
485 .inspect_err(|err| tracing::trace!("ja4 computation: {err:?}"))
486 .ok();
487
488 let mut profile_ja4: Option<FingerprintProfileData> = None;
489
490 if let Some(uadb) = self.uadb.as_deref()
491 && let Some(profile) =
492 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
493 {
494 let matched_ja4 = profile
495 .tls
496 .compute_ja4(
497 parts
498 .extensions
499 .get::<NegotiatedTlsParameters>()
500 .map(|param| param.protocol_version),
501 )
502 .inspect_err(|err| {
503 tracing::trace!("ja4 computation of matched profile: {err:?}")
504 })
505 .ok();
506 if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
507 let hash = format!("{tgt}");
508 let matched = format!("{src}") == hash;
509 profile_ja4 = Some(FingerprintProfileData {
510 hash,
511 verbose: format!("{tgt:?}"),
512 matched,
513 });
514 }
515 }
516
517 let ja4 = ja4.map(|ja4| {
518 json!({
519 "hash": format!("{ja4}"),
520 "verbose": format!("{ja4:?}"),
521 "profile": profile_ja4,
522 })
523 });
524
525 let ja3 = Ja3::compute(parts.extensions.extensions())
526 .inspect_err(|err| tracing::trace!("ja3 computation: {err:?}"))
527 .ok();
528
529 let mut profile_ja3: Option<FingerprintProfileData> = None;
530
531 if let Some(uadb) = self.uadb.as_deref()
532 && let Some(profile) =
533 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
534 {
535 let matched_ja3 = profile
536 .tls
537 .compute_ja3(
538 parts
539 .extensions
540 .get::<NegotiatedTlsParameters>()
541 .map(|param| param.protocol_version),
542 )
543 .inspect_err(|err| {
544 tracing::trace!("ja3 computation of matched profile: {err:?}")
545 })
546 .ok();
547 if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
548 let hash = format!("{tgt:x}");
549 let matched = format!("{src:x}") == hash;
550 profile_ja3 = Some(FingerprintProfileData {
551 hash,
552 verbose: format!("{tgt}"),
553 matched,
554 });
555 }
556 }
557
558 let ja3 = ja3.map(|ja3| {
559 json!({
560 "hash": format!("{ja3:x}"),
561 "verbose": format!("{ja3}"),
562 "profile": profile_ja3,
563 })
564 });
565
566 let peet = PeetPrint::compute(parts.extensions.extensions())
567 .inspect_err(|err| tracing::trace!("peet computation: {err:?}"))
568 .ok();
569
570 let mut profile_peet: Option<FingerprintProfileData> = None;
571
572 if let Some(uadb) = self.uadb.as_deref()
573 && let Some(profile) =
574 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
575 {
576 let matched_peet = profile
577 .tls
578 .compute_peet()
579 .inspect_err(|err| {
580 tracing::trace!("peetprint computation of matched profile: {err:?}")
581 })
582 .ok();
583 if let (Some(src), Some(tgt)) = (peet.as_ref(), matched_peet) {
584 let hash = format!("{tgt}");
585 let matched = format!("{src}") == hash;
586 profile_peet = Some(FingerprintProfileData {
587 hash,
588 verbose: format!("{tgt:?}"),
589 matched,
590 });
591 }
592 }
593
594 let peet = peet.map(|peet| {
595 json!({
596 "hash": format!("{peet}"),
597 "verbose": format!("{peet:?}"),
598 "profile": profile_peet,
599 })
600 });
601
602 json!({
603 "header": {
604 "version": hello.protocol_version().to_string(),
605 "cipher_suites": hello
606 .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
607 "compression_algorithms": hello
608 .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
609 },
610 "extensions": hello.extensions().iter().map(|extension| match extension {
611 ClientHelloExtension::ServerName(domain) => json!({
612 "id": extension.id().to_string(),
613 "data": domain,
614 }),
615 ClientHelloExtension::SignatureAlgorithms(v) => json!({
616 "id": extension.id().to_string(),
617 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
618 }),
619 ClientHelloExtension::SupportedVersions(v) => json!({
620 "id": extension.id().to_string(),
621 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
622 }),
623 ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
624 "id": extension.id().to_string(),
625 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
626 }),
627 ClientHelloExtension::ApplicationSettings(v) => json!({
628 "id": extension.id().to_string(),
629 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
630 }),
631 ClientHelloExtension::SupportedGroups(v) => json!({
632 "id": extension.id().to_string(),
633 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
634 }),
635 ClientHelloExtension::ECPointFormats(v) => json!({
636 "id": extension.id().to_string(),
637 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
638 }),
639 ClientHelloExtension::CertificateCompression(v) => json!({
640 "id": extension.id().to_string(),
641 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
642 }),
643 ClientHelloExtension::DelegatedCredentials(v) => json!({
644 "id": extension.id().to_string(),
645 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
646 }),
647 ClientHelloExtension::RecordSizeLimit(v) => json!({
648 "id": extension.id().to_string(),
649 "data": v.to_string(),
650 }),
651 ClientHelloExtension::EncryptedClientHello(ech) => match ech {
652 ECHClientHello::Outer(ech) => json!({
653 "id": extension.id().to_string(),
654 "data": {
655 "type": "outer",
656 "cipher_suite": {
657 "aead_id": ech.cipher_suite.aead_id.to_string(),
658 "kdf_id": ech.cipher_suite.kdf_id.to_string(),
659 },
660 "config_id": ech.config_id,
661 "enc": format!("0x{}", hex::encode(&ech.enc)),
662 "payload": format!("0x{}", hex::encode(&ech.payload)),
663 },
664 }),
665 ECHClientHello::Inner => json!({
666 "id": extension.id().to_string(),
667 "data": {
668 "type": "inner",
669 },
670 })
671
672 }
673 ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
674 json!({
675 "id": id.to_string()
676 })
677 } else {
678 json!({
679 "id": id.to_string(),
680 "data": format!("0x{}", hex::encode(data))
681 })
682 },
683 }).collect::<Vec<_>>(),
684 "ja3": ja3,
685 "ja4": ja4,
686 "peet": peet
687 })
688 });
689
690 #[cfg(not(any(feature = "rustls", feature = "boring")))]
691 let tls_info: Option<()> = None;
692
693 let mut h2 = None;
694 if parts.version == Version::HTTP_2 {
695 let early_frames = parts.extensions.get::<EarlyFrameCapture>();
696 let pseudo_headers = parts.extensions.get::<PseudoHeaderOrder>();
697
698 h2 = Some(json!({
699 "early_frames": early_frames,
700 "pseudo_headers": pseudo_headers,
701 }));
702 }
703
704 Ok(Json(json!({
705 "ua": user_agent_info,
706 "http": {
707 "version": format!("{:?}", parts.version),
708 "scheme": scheme,
709 "method": format!("{:?}", parts.method),
710 "authority": authority,
711 "path": parts.uri.path().to_owned(),
712 "query": parts.uri.query().map(str::to_owned),
713 "h2": h2,
714 "headers": headers,
715 "payload": body,
716 "ja4h": ja4h,
717 "curl": curl_request,
718 },
719 "tls": tls_info,
720 "socket_addr": parts.extensions.get::<Forwarded>()
721 .and_then(|f|
722 f.client_socket_addr().map(|addr| addr.to_string())
723 .or_else(|| f.client_ip().map(|ip| ip.to_string()))
724 ).or_else(|| parts.extensions.get::<SocketInfo>().map(|v| v.peer_addr().to_string())),
725 }))
726 .into_response())
727 }
728}