1use crate::{
9 Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either, Either3, Either7},
12 error::{BoxError, ErrorContext, OpaqueError},
13 extensions::ExtensionsRef,
14 http::{
15 Request, Response, Version,
16 body::util::BodyExt,
17 convert::curl,
18 core::h2::frame::EarlyFrameCapture,
19 header::USER_AGENT,
20 headers::forwarded::{CFConnectingIp, ClientIp, TrueClientIp, XClientIp, XRealIp},
21 headers::{HeaderEncode as _, TypedHeader as _, exotic::XClacksOverhead},
22 layer::set_header::SetResponseHeaderLayer,
23 layer::{
24 forwarded::GetForwardedHeaderLayer, required_header::AddRequiredResponseHeadersLayer,
25 trace::TraceLayer,
26 },
27 proto::h1::Http1HeaderMap,
28 proto::h2::PseudoHeaderOrder,
29 server::{HttpServer, layer::upgrade::UpgradeLayer},
30 service::web::{extract::Json, response::IntoResponse},
31 ws::handshake::server::{WebSocketAcceptor, WebSocketEchoService, WebSocketMatcher},
32 },
33 layer::limit::policy::UnlimitedPolicy,
34 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
35 net::fingerprint::{AkamaiH2, Ja4H},
36 net::forwarded::Forwarded,
37 net::http::RequestContext,
38 net::stream::{SocketInfo, layer::http::BodyLimitLayer},
39 proxy::haproxy::server::HaProxyLayer,
40 rt::Executor,
41 tcp::TcpStream,
42 telemetry::tracing,
43 ua::{UserAgent, layer::classifier::UserAgentClassifierLayer, profile::UserAgentDatabase},
44};
45
46use serde::Serialize;
47use serde_json::json;
48use std::{convert::Infallible, time::Duration};
49
50#[cfg(all(feature = "rustls", not(feature = "boring")))]
51use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
52
53#[cfg(any(feature = "rustls", feature = "boring"))]
54use crate::{
55 net::fingerprint::{Ja3, Ja4, PeetPrint},
56 net::tls::{
57 SecureTransport,
58 client::ClientHelloExtension,
59 client::{ECHClientHello, NegotiatedTlsParameters},
60 },
61};
62#[cfg(feature = "boring")]
63use crate::{
64 net::tls::server::ServerConfig,
65 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
66};
67
68#[cfg(feature = "boring")]
69type TlsConfig = ServerConfig;
70
71#[cfg(all(feature = "rustls", not(feature = "boring")))]
72type TlsConfig = TlsAcceptorData;
73
74#[derive(Debug, Clone)]
75pub struct EchoServiceBuilder<H> {
78 concurrent_limit: usize,
79 body_limit: usize,
80 timeout: Duration,
81 forward: Option<ForwardKind>,
82
83 #[cfg(any(feature = "rustls", feature = "boring"))]
84 tls_server_config: Option<TlsConfig>,
85
86 http_version: Option<Version>,
87
88 ws_support: bool,
89
90 http_service_builder: H,
91
92 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
93}
94
95impl Default for EchoServiceBuilder<()> {
96 fn default() -> Self {
97 Self {
98 concurrent_limit: 0,
99 body_limit: 1024 * 1024,
100 timeout: Duration::ZERO,
101 forward: None,
102
103 #[cfg(any(feature = "rustls", feature = "boring"))]
104 tls_server_config: None,
105
106 http_version: None,
107
108 ws_support: false,
109
110 http_service_builder: (),
111
112 uadb: None,
113 }
114 }
115}
116
117impl EchoServiceBuilder<()> {
118 #[must_use]
120 pub fn new() -> Self {
121 Self::default()
122 }
123}
124
125impl<H> EchoServiceBuilder<H> {
126 crate::utils::macros::generate_set_and_with! {
127 pub fn concurrent(mut self, limit: usize) -> Self {
131 self.concurrent_limit = limit;
132 self
133 }
134 }
135
136 crate::utils::macros::generate_set_and_with! {
137 pub fn body_limit(mut self, limit: usize) -> Self {
139 self.body_limit = limit;
140 self
141 }
142 }
143
144 crate::utils::macros::generate_set_and_with! {
145 pub fn timeout(mut self, timeout: Duration) -> Self {
149 self.timeout = timeout;
150 self
151 }
152 }
153
154 crate::utils::macros::generate_set_and_with! {
155 pub fn forward(mut self, kind: Option<ForwardKind>) -> Self {
167 self.forward = kind;
168 self
169 }
170 }
171
172 crate::utils::macros::generate_set_and_with! {
173 #[cfg(any(feature = "rustls", feature = "boring"))]
174 pub fn tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
177 self.tls_server_config = cfg;
178 self
179 }
180 }
181
182 crate::utils::macros::generate_set_and_with! {
183 pub fn http_version(mut self, version: Option<Version>) -> Self {
185 self.http_version = version;
186 self
187 }
188 }
189
190 pub fn with_http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
192 EchoServiceBuilder {
193 concurrent_limit: self.concurrent_limit,
194 body_limit: self.body_limit,
195 timeout: self.timeout,
196 forward: self.forward,
197
198 #[cfg(any(feature = "rustls", feature = "boring"))]
199 tls_server_config: self.tls_server_config,
200
201 http_version: self.http_version,
202
203 ws_support: self.ws_support,
204
205 http_service_builder: (self.http_service_builder, layer),
206
207 uadb: self.uadb,
208 }
209 }
210
211 crate::utils::macros::generate_set_and_with! {
212 pub fn user_agent_database(
215 mut self,
216 db: Option<std::sync::Arc<UserAgentDatabase>>,
217 ) -> Self {
218 self.uadb = db;
219 self
220 }
221 }
222
223 crate::utils::macros::generate_set_and_with! {
224 pub fn ws_support(
226 mut self,
227 support: bool,
228 ) -> Self {
229 self.ws_support = support;
230 self
231 }
232 }
233}
234
235impl<H> EchoServiceBuilder<H>
236where
237 H: Layer<EchoService, Service: Service<Request, Response = Response, Error = BoxError>>,
238{
239 #[allow(unused_mut)]
240 pub fn build(
242 mut self,
243 executor: Executor,
244 ) -> Result<impl Service<TcpStream, Response = (), Error = Infallible>, BoxError> {
245 let tcp_forwarded_layer = match &self.forward {
246 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
247 _ => None,
248 };
249
250 let http_service = self.build_http();
251
252 #[cfg(all(feature = "rustls", not(feature = "boring")))]
253 let tls_cfg = self.tls_server_config;
254
255 #[cfg(feature = "boring")]
256 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
257 Some(cfg) => Some(cfg.try_into()?),
258 None => None,
259 };
260
261 let tcp_service_builder = (
262 ConsumeErrLayer::trace(tracing::Level::DEBUG),
263 LimitLayer::new(if self.concurrent_limit > 0 {
264 Either::A(ConcurrentPolicy::max(self.concurrent_limit))
265 } else {
266 Either::B(UnlimitedPolicy::new())
267 }),
268 if !self.timeout.is_zero() {
269 TimeoutLayer::new(self.timeout)
270 } else {
271 TimeoutLayer::never()
272 },
273 tcp_forwarded_layer,
274 BodyLimitLayer::request_only(self.body_limit),
275 #[cfg(any(feature = "rustls", feature = "boring"))]
276 tls_cfg.map(|cfg| TlsAcceptorLayer::new(cfg).with_store_client_hello(true)),
277 );
278
279 let http_transport_service = match self.http_version {
280 Some(Version::HTTP_2) => Either3::A({
281 let mut http = HttpServer::h2(executor);
282 if self.ws_support {
283 http.h2_mut().enable_connect_protocol();
284 }
285 http.service(http_service)
286 }),
287 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
288 Either3::B(HttpServer::http1().service(http_service))
289 }
290 Some(_) => {
291 return Err(OpaqueError::from_display("unsupported http version").into_boxed());
292 }
293 None => Either3::C({
294 let mut http = HttpServer::auto(executor);
295 if self.ws_support {
296 http.h2_mut().enable_connect_protocol();
297 }
298 http.service(http_service)
299 }),
300 };
301
302 Ok(tcp_service_builder.into_layer(http_transport_service))
303 }
304
305 pub fn build_http(
307 &self,
308 ) -> impl Service<Request, Response: IntoResponse, Error = Infallible> + use<H> {
309 let http_forwarded_layer = match &self.forward {
310 None | Some(ForwardKind::HaProxy) => None,
311 Some(ForwardKind::Forwarded) => Some(Either7::A(GetForwardedHeaderLayer::forwarded())),
312 Some(ForwardKind::XForwardedFor) => {
313 Some(Either7::B(GetForwardedHeaderLayer::x_forwarded_for()))
314 }
315 Some(ForwardKind::XClientIp) => {
316 Some(Either7::C(GetForwardedHeaderLayer::<XClientIp>::new()))
317 }
318 Some(ForwardKind::ClientIp) => {
319 Some(Either7::D(GetForwardedHeaderLayer::<ClientIp>::new()))
320 }
321 Some(ForwardKind::XRealIp) => {
322 Some(Either7::E(GetForwardedHeaderLayer::<XRealIp>::new()))
323 }
324 Some(ForwardKind::CFConnectingIp) => {
325 Some(Either7::F(GetForwardedHeaderLayer::<CFConnectingIp>::new()))
326 }
327 Some(ForwardKind::TrueClientIp) => {
328 Some(Either7::G(GetForwardedHeaderLayer::<TrueClientIp>::new()))
329 }
330 };
331
332 (
333 TraceLayer::new_for_http(),
334 SetResponseHeaderLayer::if_not_present_fn(XClacksOverhead::name().clone(), || {
335 std::future::ready(XClacksOverhead::new().encode_to_value())
336 }),
337 AddRequiredResponseHeadersLayer::default(),
338 UserAgentClassifierLayer::new(),
339 ConsumeErrLayer::default(),
340 http_forwarded_layer,
341 self.ws_support.then(|| {
342 UpgradeLayer::new(
343 WebSocketMatcher::default(),
344 {
345 let acceptor = WebSocketAcceptor::default()
346 .with_protocols_flex(true)
347 .with_echo_protocols();
348
349 #[cfg(feature = "compression")]
350 {
351 acceptor.with_per_message_deflate_overwrite_extensions()
352 }
353 #[cfg(not(feature = "compression"))]
354 {
355 acceptor
356 }
357 },
358 ConsumeErrLayer::trace(tracing::Level::DEBUG)
359 .into_layer(WebSocketEchoService::default()),
360 )
361 }),
362 )
363 .into_layer(self.http_service_builder.layer(EchoService {
364 uadb: self.uadb.clone(),
365 }))
366 }
367}
368
369#[derive(Debug, Clone)]
370#[non_exhaustive]
371pub struct EchoService {
373 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
374}
375
376impl Service<Request> for EchoService {
377 type Response = Response;
378 type Error = BoxError;
379
380 async fn serve(&self, req: Request) -> Result<Self::Response, Self::Error> {
381 let user_agent_info = req
382 .extensions()
383 .get()
384 .map(|ua: &UserAgent| {
385 json!({
386 "user_agent": ua.header_str().to_owned(),
387 "kind": ua.info().map(|info| info.kind.to_string()),
388 "version": ua.info().and_then(|info| info.version),
389 "platform": ua.platform().map(|v| v.to_string()),
390 })
391 })
392 .unwrap_or_default();
393
394 let request_context = RequestContext::try_from(&req)?;
395
396 let authority = request_context.authority.to_string();
397 let scheme = request_context.protocol.to_string();
398
399 let ua_str = req
400 .headers()
401 .get(USER_AGENT)
402 .and_then(|h| h.to_str().ok())
403 .map(ToOwned::to_owned);
404 tracing::debug!(
405 user_agent.original = ua_str,
406 "echo request received from ua with ua header",
407 );
408
409 #[derive(Debug, Serialize)]
410 struct FingerprintProfileData {
411 hash: String,
412 verbose: String,
413 matched: bool,
414 }
415
416 let ja4h = Ja4H::compute(&req)
417 .inspect_err(|err| tracing::error!("ja4h compute failure: {err:?}"))
418 .ok()
419 .map(|ja4h| {
420 let mut profile_ja4h: Option<FingerprintProfileData> = None;
421
422 if let Some(uadb) = self.uadb.as_deref()
423 && let Some(profile) =
424 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
425 {
426 let matched_ja4h = match req.version() {
427 Version::HTTP_10 | Version::HTTP_11 => profile
428 .http
429 .ja4h_h1_navigate(Some(req.method().clone()))
430 .inspect_err(|err| {
431 tracing::trace!(
432 "ja4h computation of matched profile for incoming h1 req: {err:?}"
433 )
434 })
435 .ok(),
436 Version::HTTP_2 => profile
437 .http
438 .ja4h_h2_navigate(Some(req.method().clone()))
439 .inspect_err(|err| {
440 tracing::trace!(
441 "ja4h computation of matched profile for incoming h2 req: {err:?}"
442 )
443 })
444 .ok(),
445 _ => None,
446 };
447 if let Some(tgt) = matched_ja4h {
448 let hash = format!("{tgt}");
449 let matched = format!("{ja4h}") == hash;
450 profile_ja4h = Some(FingerprintProfileData {
451 hash,
452 verbose: format!("{tgt:?}"),
453 matched,
454 });
455 }
456 }
457
458 json!({
459 "hash": format!("{ja4h}"),
460 "verbose": format!("{ja4h:?}"),
461 "profile": profile_ja4h,
462 })
463 });
464
465 let (parts, body) = req.into_parts();
466
467 let body = body
468 .collect()
469 .await
470 .context("collect request body for echo purposes")?
471 .to_bytes();
472
473 let curl_request = curl::cmd_string_for_request_parts_and_payload(&parts, &body);
474
475 let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&parts.extensions))
476 .into_iter()
477 .map(|(name, value)| {
478 (
479 name,
480 std::str::from_utf8(value.as_bytes())
481 .map(|s| s.to_owned())
482 .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
483 )
484 })
485 .collect();
486
487 let body = hex::encode(body.as_ref());
488
489 #[cfg(any(feature = "rustls", feature = "boring"))]
490 let tls_info = parts
491 .extensions
492 .get::<SecureTransport>()
493 .and_then(|st| st.client_hello())
494 .map(|hello| {
495 let ja4 = Ja4::compute(parts.extensions.extensions())
496 .inspect_err(|err| tracing::trace!("ja4 computation: {err:?}"))
497 .ok();
498
499 let mut profile_ja4: Option<FingerprintProfileData> = None;
500
501 if let Some(uadb) = self.uadb.as_deref()
502 && let Some(profile) =
503 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
504 {
505 let matched_ja4 = profile
506 .tls
507 .compute_ja4(
508 parts
509 .extensions
510 .get::<NegotiatedTlsParameters>()
511 .map(|param| param.protocol_version),
512 )
513 .inspect_err(|err| {
514 tracing::trace!("ja4 computation of matched profile: {err:?}")
515 })
516 .ok();
517 if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
518 let hash = format!("{tgt}");
519 let matched = format!("{src}") == hash;
520 profile_ja4 = Some(FingerprintProfileData {
521 hash,
522 verbose: format!("{tgt:?}"),
523 matched,
524 });
525 }
526 }
527
528 let ja4 = ja4.map(|ja4| {
529 json!({
530 "hash": format!("{ja4}"),
531 "verbose": format!("{ja4:?}"),
532 "profile": profile_ja4,
533 })
534 });
535
536 let ja3 = Ja3::compute(parts.extensions.extensions())
537 .inspect_err(|err| tracing::trace!("ja3 computation: {err:?}"))
538 .ok();
539
540 let mut profile_ja3: Option<FingerprintProfileData> = None;
541
542 if let Some(uadb) = self.uadb.as_deref()
543 && let Some(profile) =
544 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
545 {
546 let matched_ja3 = profile
547 .tls
548 .compute_ja3(
549 parts
550 .extensions
551 .get::<NegotiatedTlsParameters>()
552 .map(|param| param.protocol_version),
553 )
554 .inspect_err(|err| {
555 tracing::trace!("ja3 computation of matched profile: {err:?}")
556 })
557 .ok();
558 if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
559 let hash = format!("{tgt:x}");
560 let matched = format!("{src:x}") == hash;
561 profile_ja3 = Some(FingerprintProfileData {
562 hash,
563 verbose: format!("{tgt}"),
564 matched,
565 });
566 }
567 }
568
569 let ja3 = ja3.map(|ja3| {
570 json!({
571 "hash": format!("{ja3:x}"),
572 "verbose": format!("{ja3}"),
573 "profile": profile_ja3,
574 })
575 });
576
577 let peet = PeetPrint::compute(parts.extensions.extensions())
578 .inspect_err(|err| tracing::trace!("peet computation: {err:?}"))
579 .ok();
580
581 let mut profile_peet: Option<FingerprintProfileData> = None;
582
583 if let Some(uadb) = self.uadb.as_deref()
584 && let Some(profile) =
585 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
586 {
587 let matched_peet = profile
588 .tls
589 .compute_peet()
590 .inspect_err(|err| {
591 tracing::trace!("peetprint computation of matched profile: {err:?}")
592 })
593 .ok();
594 if let (Some(src), Some(tgt)) = (peet.as_ref(), matched_peet) {
595 let hash = format!("{tgt}");
596 let matched = format!("{src}") == hash;
597 profile_peet = Some(FingerprintProfileData {
598 hash,
599 verbose: format!("{tgt:?}"),
600 matched,
601 });
602 }
603 }
604
605 let peet = peet.map(|peet| {
606 json!({
607 "hash": format!("{peet}"),
608 "verbose": format!("{peet:?}"),
609 "profile": profile_peet,
610 })
611 });
612
613 json!({
614 "header": {
615 "version": hello.protocol_version().to_string(),
616 "cipher_suites": hello
617 .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
618 "compression_algorithms": hello
619 .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
620 },
621 "extensions": hello.extensions().iter().map(|extension| match extension {
622 ClientHelloExtension::ServerName(domain) => json!({
623 "id": extension.id().to_string(),
624 "data": domain,
625 }),
626 ClientHelloExtension::SignatureAlgorithms(v) => json!({
627 "id": extension.id().to_string(),
628 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
629 }),
630 ClientHelloExtension::SupportedVersions(v) => json!({
631 "id": extension.id().to_string(),
632 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
633 }),
634 ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
635 "id": extension.id().to_string(),
636 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
637 }),
638 ClientHelloExtension::ApplicationSettings(v) => json!({
639 "id": extension.id().to_string(),
640 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
641 }),
642 ClientHelloExtension::SupportedGroups(v) => json!({
643 "id": extension.id().to_string(),
644 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
645 }),
646 ClientHelloExtension::ECPointFormats(v) => json!({
647 "id": extension.id().to_string(),
648 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
649 }),
650 ClientHelloExtension::CertificateCompression(v) => json!({
651 "id": extension.id().to_string(),
652 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
653 }),
654 ClientHelloExtension::DelegatedCredentials(v) => json!({
655 "id": extension.id().to_string(),
656 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
657 }),
658 ClientHelloExtension::RecordSizeLimit(v) => json!({
659 "id": extension.id().to_string(),
660 "data": v.to_string(),
661 }),
662 ClientHelloExtension::EncryptedClientHello(ech) => match ech {
663 ECHClientHello::Outer(ech) => json!({
664 "id": extension.id().to_string(),
665 "data": {
666 "type": "outer",
667 "cipher_suite": {
668 "aead_id": ech.cipher_suite.aead_id.to_string(),
669 "kdf_id": ech.cipher_suite.kdf_id.to_string(),
670 },
671 "config_id": ech.config_id,
672 "enc": format!("0x{}", hex::encode(&ech.enc)),
673 "payload": format!("0x{}", hex::encode(&ech.payload)),
674 },
675 }),
676 ECHClientHello::Inner => json!({
677 "id": extension.id().to_string(),
678 "data": {
679 "type": "inner",
680 },
681 })
682
683 }
684 ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
685 json!({
686 "id": id.to_string()
687 })
688 } else {
689 json!({
690 "id": id.to_string(),
691 "data": format!("0x{}", hex::encode(data))
692 })
693 },
694 }).collect::<Vec<_>>(),
695 "ja3": ja3,
696 "ja4": ja4,
697 "peet": peet
698 })
699 });
700
701 #[cfg(not(any(feature = "rustls", feature = "boring")))]
702 let tls_info: Option<()> = None;
703
704 let mut h2 = None;
705 if parts.version == Version::HTTP_2 {
706 let early_frames = parts.extensions.get::<EarlyFrameCapture>();
707 let pseudo_headers = parts.extensions.get::<PseudoHeaderOrder>();
708 let akamai_h2 = AkamaiH2::compute(&parts.extensions)
709 .inspect_err(|err| tracing::trace!("akamai h2 compute failure: {err:?}"))
710 .ok()
711 .map(|akamai| {
712 json!({
713 "hash": format!("{akamai}"),
714 "verbose": format!("{akamai:?}"),
715 })
716 });
717
718 h2 = Some(json!({
719 "early_frames": early_frames,
720 "pseudo_headers": pseudo_headers,
721 "akamai_h2": akamai_h2,
722 }));
723 }
724
725 Ok(Json(json!({
726 "ua": user_agent_info,
727 "http": {
728 "version": format!("{:?}", parts.version),
729 "scheme": scheme,
730 "method": format!("{:?}", parts.method),
731 "authority": authority,
732 "path": parts.uri.path().to_owned(),
733 "query": parts.uri.query().map(str::to_owned),
734 "h2": h2,
735 "headers": headers,
736 "payload": body,
737 "ja4h": ja4h,
738 "curl": curl_request,
739 },
740 "tls": tls_info,
741 "socket_addr": parts.extensions.get::<Forwarded>()
742 .and_then(|f|
743 f.client_socket_addr().map(|addr| addr.to_string())
744 .or_else(|| f.client_ip().map(|ip| ip.to_string()))
745 ).or_else(|| parts.extensions.get::<SocketInfo>().map(|v| v.peer_addr().to_string())),
746 }))
747 .into_response())
748 }
749}