1use crate::{
9 Context, Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either3, Either7},
12 error::{BoxError, OpaqueError},
13 http::{
14 IntoResponse, Request, Response, Version,
15 conn::LastPeerPriorityParams,
16 dep::http_body_util::BodyExt,
17 header::USER_AGENT,
18 headers::{CFConnectingIp, ClientIp, TrueClientIp, XClientIp, XRealIp},
19 layer::{
20 forwarded::GetForwardedHeadersLayer,
21 required_header::AddRequiredResponseHeadersLayer,
22 trace::TraceLayer,
23 ua::{UserAgent, UserAgentClassifierLayer},
24 },
25 proto::h1::Http1HeaderMap,
26 proto::h2::PseudoHeaderOrder,
27 proto::h2::frame::InitialPeerSettings,
28 response::Json,
29 server::HttpServer,
30 },
31 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
32 net::fingerprint::Ja4H,
33 net::forwarded::Forwarded,
34 net::http::RequestContext,
35 net::stream::{SocketInfo, layer::http::BodyLimitLayer},
36 proxy::haproxy::server::HaProxyLayer,
37 rt::Executor,
38 ua::profile::UserAgentDatabase,
39};
40#[cfg(any(feature = "rustls", feature = "boring"))]
41use crate::{
42 net::fingerprint::{Ja3, Ja4},
43 net::tls::client::{ECHClientHello, NegotiatedTlsParameters},
44 tls::types::{SecureTransport, client::ClientHelloExtension},
45};
46use serde::Serialize;
47use serde_json::json;
48use std::{convert::Infallible, time::Duration};
49use tokio::net::TcpStream;
50
51#[cfg(feature = "boring")]
52use crate::{
53 net::tls::server::ServerConfig,
54 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
55};
56
57#[cfg(all(feature = "rustls", not(feature = "boring")))]
58use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
59
60#[cfg(feature = "boring")]
61type TlsConfig = ServerConfig;
62
63#[cfg(all(feature = "rustls", not(feature = "boring")))]
64type TlsConfig = TlsAcceptorData;
65
66#[derive(Debug, Clone)]
67pub struct EchoServiceBuilder<H> {
70 concurrent_limit: usize,
71 body_limit: usize,
72 timeout: Duration,
73 forward: Option<ForwardKind>,
74
75 #[cfg(any(feature = "rustls", feature = "boring"))]
76 tls_server_config: Option<TlsConfig>,
77
78 http_version: Option<Version>,
79
80 http_service_builder: H,
81
82 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
83}
84
85impl Default for EchoServiceBuilder<()> {
86 fn default() -> Self {
87 Self {
88 concurrent_limit: 0,
89 body_limit: 1024 * 1024,
90 timeout: Duration::ZERO,
91 forward: None,
92
93 #[cfg(any(feature = "rustls", feature = "boring"))]
94 tls_server_config: None,
95
96 http_version: None,
97
98 http_service_builder: (),
99
100 uadb: None,
101 }
102 }
103}
104
105impl EchoServiceBuilder<()> {
106 pub fn new() -> Self {
108 Self::default()
109 }
110}
111
112impl<H> EchoServiceBuilder<H> {
113 pub fn concurrent(mut self, limit: usize) -> Self {
117 self.concurrent_limit = limit;
118 self
119 }
120
121 pub fn set_concurrent(&mut self, limit: usize) -> &mut Self {
125 self.concurrent_limit = limit;
126 self
127 }
128
129 pub fn body_limit(mut self, limit: usize) -> Self {
131 self.body_limit = limit;
132 self
133 }
134
135 pub fn set_body_limit(&mut self, limit: usize) -> &mut Self {
137 self.body_limit = limit;
138 self
139 }
140
141 pub fn timeout(mut self, timeout: Duration) -> Self {
145 self.timeout = timeout;
146 self
147 }
148
149 pub fn set_timeout(&mut self, timeout: Duration) -> &mut Self {
153 self.timeout = timeout;
154 self
155 }
156
157 pub fn forward(self, kind: ForwardKind) -> Self {
169 self.maybe_forward(Some(kind))
170 }
171
172 pub fn set_forward(&mut self, kind: ForwardKind) -> &mut Self {
176 self.forward = Some(kind);
177 self
178 }
179
180 pub fn maybe_forward(mut self, maybe_kind: Option<ForwardKind>) -> Self {
184 self.forward = maybe_kind;
185 self
186 }
187
188 #[cfg(any(feature = "rustls", feature = "boring"))]
189 pub fn tls_server_config(mut self, cfg: TlsConfig) -> Self {
192 self.tls_server_config = Some(cfg);
193 self
194 }
195
196 #[cfg(any(feature = "rustls", feature = "boring"))]
197 pub fn set_tls_server_config(&mut self, cfg: TlsConfig) -> &mut Self {
200 self.tls_server_config = Some(cfg);
201 self
202 }
203
204 #[cfg(any(feature = "rustls", feature = "boring"))]
205 pub fn maybe_tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
208 self.tls_server_config = cfg;
209 self
210 }
211
212 pub fn http_version(mut self, version: Version) -> Self {
214 self.http_version = Some(version);
215 self
216 }
217
218 pub fn maybe_http_version(mut self, version: Option<Version>) -> Self {
220 self.http_version = version;
221 self
222 }
223
224 pub fn set_http_version(&mut self, version: Version) -> &mut Self {
226 self.http_version = Some(version);
227 self
228 }
229
230 pub fn http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
232 EchoServiceBuilder {
233 concurrent_limit: self.concurrent_limit,
234 body_limit: self.body_limit,
235 timeout: self.timeout,
236 forward: self.forward,
237
238 #[cfg(any(feature = "rustls", feature = "boring"))]
239 tls_server_config: self.tls_server_config,
240
241 http_version: self.http_version,
242
243 http_service_builder: (self.http_service_builder, layer),
244
245 uadb: self.uadb,
246 }
247 }
248
249 pub fn with_user_agent_database(mut self, db: std::sync::Arc<UserAgentDatabase>) -> Self {
252 self.uadb = Some(db);
253 self
254 }
255
256 pub fn maybe_with_user_agent_database(
259 mut self,
260 db: Option<std::sync::Arc<UserAgentDatabase>>,
261 ) -> Self {
262 self.uadb = db;
263 self
264 }
265
266 pub fn set_user_agent_database(&mut self, db: std::sync::Arc<UserAgentDatabase>) -> &mut Self {
269 self.uadb = Some(db);
270 self
271 }
272}
273
274impl<H> EchoServiceBuilder<H>
275where
276 H: Layer<EchoService, Service: Service<(), Request, Response = Response, Error = BoxError>>,
277{
278 #[allow(unused_mut)]
279 pub fn build(
281 mut self,
282 executor: Executor,
283 ) -> Result<impl Service<(), TcpStream, Response = (), Error = Infallible>, BoxError> {
284 let tcp_forwarded_layer = match &self.forward {
285 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
286 _ => None,
287 };
288
289 let http_service = self.build_http();
290
291 #[cfg(all(feature = "rustls", not(feature = "boring")))]
292 let tls_cfg = self.tls_server_config;
293
294 #[cfg(feature = "boring")]
295 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
296 Some(cfg) => Some(cfg.try_into()?),
297 None => None,
298 };
299
300 let tcp_service_builder = (
301 ConsumeErrLayer::trace(tracing::Level::DEBUG),
302 (self.concurrent_limit > 0)
303 .then(|| LimitLayer::new(ConcurrentPolicy::max(self.concurrent_limit))),
304 (!self.timeout.is_zero()).then(|| TimeoutLayer::new(self.timeout)),
305 tcp_forwarded_layer,
306 BodyLimitLayer::request_only(self.body_limit),
307 #[cfg(any(feature = "rustls", feature = "boring"))]
308 tls_cfg.map(|cfg| {
309 #[cfg(feature = "boring")]
310 return TlsAcceptorLayer::new(cfg).with_store_client_hello(true);
311 #[cfg(all(feature = "rustls", not(feature = "boring")))]
312 TlsAcceptorLayer::new(cfg).with_store_client_hello(true)
313 }),
314 );
315
316 let http_transport_service = match self.http_version {
317 Some(Version::HTTP_2) => Either3::A(HttpServer::h2(executor).service(http_service)),
318 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
319 Either3::B(HttpServer::http1().service(http_service))
320 }
321 Some(_) => {
322 return Err(OpaqueError::from_display("unsupported http version").into_boxed());
323 }
324 None => Either3::C(HttpServer::auto(executor).service(http_service)),
325 };
326
327 Ok(tcp_service_builder.into_layer(http_transport_service))
328 }
329
330 pub fn build_http(
332 &self,
333 ) -> impl Service<(), Request, Response: IntoResponse, Error = Infallible> + use<H> {
334 let http_forwarded_layer = match &self.forward {
335 None | Some(ForwardKind::HaProxy) => None,
336 Some(ForwardKind::Forwarded) => Some(Either7::A(GetForwardedHeadersLayer::forwarded())),
337 Some(ForwardKind::XForwardedFor) => {
338 Some(Either7::B(GetForwardedHeadersLayer::x_forwarded_for()))
339 }
340 Some(ForwardKind::XClientIp) => {
341 Some(Either7::C(GetForwardedHeadersLayer::<XClientIp>::new()))
342 }
343 Some(ForwardKind::ClientIp) => {
344 Some(Either7::D(GetForwardedHeadersLayer::<ClientIp>::new()))
345 }
346 Some(ForwardKind::XRealIp) => {
347 Some(Either7::E(GetForwardedHeadersLayer::<XRealIp>::new()))
348 }
349 Some(ForwardKind::CFConnectingIp) => {
350 Some(Either7::F(GetForwardedHeadersLayer::<CFConnectingIp>::new()))
351 }
352 Some(ForwardKind::TrueClientIp) => {
353 Some(Either7::G(GetForwardedHeadersLayer::<TrueClientIp>::new()))
354 }
355 };
356
357 (
358 TraceLayer::new_for_http(),
359 AddRequiredResponseHeadersLayer::default(),
360 UserAgentClassifierLayer::new(),
361 ConsumeErrLayer::default(),
362 http_forwarded_layer,
363 )
364 .into_layer(self.http_service_builder.layer(EchoService {
365 uadb: self.uadb.clone(),
366 }))
367 }
368}
369
370#[derive(Debug, Clone)]
371#[non_exhaustive]
372pub struct EchoService {
374 uadb: Option<std::sync::Arc<UserAgentDatabase>>,
375}
376
377impl Service<(), Request> for EchoService {
378 type Response = Response;
379 type Error = BoxError;
380
381 async fn serve(
382 &self,
383 mut ctx: Context<()>,
384 req: Request,
385 ) -> Result<Self::Response, Self::Error> {
386 let user_agent_info = ctx
387 .get()
388 .map(|ua: &UserAgent| {
389 json!({
390 "user_agent": ua.header_str().to_owned(),
391 "kind": ua.info().map(|info| info.kind.to_string()),
392 "version": ua.info().and_then(|info| info.version),
393 "platform": ua.platform().map(|v| v.to_string()),
394 })
395 })
396 .unwrap_or_default();
397
398 let request_context =
399 ctx.get_or_try_insert_with_ctx::<RequestContext, _>(|ctx| (ctx, &req).try_into())?;
400 let authority = request_context.authority.to_string();
401 let scheme = request_context.protocol.to_string();
402
403 let ua_str = req
404 .headers()
405 .get(USER_AGENT)
406 .and_then(|h| h.to_str().ok())
407 .map(ToOwned::to_owned);
408 tracing::debug!(?ua_str, "echo request received from ua with ua header");
409
410 #[derive(Debug, Serialize)]
411 struct FingerprintProfileData {
412 hash: String,
413 verbose: String,
414 matched: bool,
415 }
416
417 let ja4h = Ja4H::compute(&req)
418 .inspect_err(|err| tracing::error!(?err, "ja4h compute failure"))
419 .ok()
420 .map(|ja4h| {
421 let mut profile_ja4h: Option<FingerprintProfileData> = None;
422
423 if let Some(uadb) = self.uadb.as_deref() {
424 if let Some(profile) =
425 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
426 {
427 let matched_ja4h = match req.version() {
428 Version::HTTP_10 | Version::HTTP_11 => profile
429 .http
430 .ja4h_h1_navigate(Some(req.method().clone()))
431 .inspect_err(|err| {
432 tracing::trace!(
433 ?err,
434 "ja4h computation of matched profile for incoming h1 req"
435 )
436 })
437 .ok(),
438 Version::HTTP_2 => profile
439 .http
440 .ja4h_h2_navigate(Some(req.method().clone()))
441 .inspect_err(|err| {
442 tracing::trace!(
443 ?err,
444 "ja4h computation of matched profile for incoming h2 req"
445 )
446 })
447 .ok(),
448 _ => None,
449 };
450 if let Some(tgt) = matched_ja4h {
451 let hash = format!("{tgt}");
452 let matched = format!("{ja4h}") == hash;
453 profile_ja4h = Some(FingerprintProfileData {
454 hash,
455 verbose: format!("{tgt:?}"),
456 matched,
457 });
458 }
459 }
460 }
461
462 json!({
463 "hash": format!("{ja4h}"),
464 "verbose": format!("{ja4h:?}"),
465 "profile": profile_ja4h,
466 })
467 });
468
469 let (mut parts, body) = req.into_parts();
470
471 let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&mut parts.extensions))
472 .into_iter()
473 .map(|(name, value)| {
474 (
475 name,
476 std::str::from_utf8(value.as_bytes())
477 .map(|s| s.to_owned())
478 .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
479 )
480 })
481 .collect();
482
483 let body = body.collect().await.unwrap().to_bytes();
484 let body = hex::encode(body.as_ref());
485
486 #[cfg(any(feature = "rustls", feature = "boring"))]
487 let tls_info = ctx
488 .get::<SecureTransport>()
489 .and_then(|st| st.client_hello())
490 .map(|hello| {
491 let ja4 = Ja4::compute(ctx.extensions())
492 .inspect_err(|err| tracing::trace!(?err, "ja4 computation"))
493 .ok();
494
495 let mut profile_ja4: Option<FingerprintProfileData> = None;
496
497 if let Some(uadb) = self.uadb.as_deref() {
498 if let Some(profile) =
499 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
500 {
501 let matched_ja4 = profile
502 .tls
503 .compute_ja4(
504 ctx.get::<NegotiatedTlsParameters>()
505 .map(|param| param.protocol_version),
506 )
507 .inspect_err(|err| {
508 tracing::trace!(?err, "ja4 computation of matched profile")
509 })
510 .ok();
511 if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
512 let hash = format!("{tgt}");
513 let matched = format!("{src}") == hash;
514 profile_ja4 = Some(FingerprintProfileData {
515 hash,
516 verbose: format!("{tgt:?}"),
517 matched,
518 });
519 }
520 }
521 }
522
523 let ja4 = ja4.map(|ja4| {
524 json!({
525 "hash": format!("{ja4}"),
526 "verbose": format!("{ja4:?}"),
527 "profile": profile_ja4,
528 })
529 });
530
531 let ja3 = Ja3::compute(ctx.extensions())
532 .inspect_err(|err| tracing::trace!(?err, "ja3 computation"))
533 .ok();
534
535 let mut profile_ja3: Option<FingerprintProfileData> = None;
536
537 if let Some(uadb) = self.uadb.as_deref() {
538 if let Some(profile) =
539 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
540 {
541 let matched_ja3 = profile
542 .tls
543 .compute_ja3(
544 ctx.get::<NegotiatedTlsParameters>()
545 .map(|param| param.protocol_version),
546 )
547 .inspect_err(|err| {
548 tracing::trace!(?err, "ja3 computation of matched profile")
549 })
550 .ok();
551 if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
552 let hash = format!("{tgt:x}");
553 let matched = format!("{src:x}") == hash;
554 profile_ja3 = Some(FingerprintProfileData {
555 hash,
556 verbose: format!("{tgt}"),
557 matched,
558 });
559 }
560 }
561 }
562
563 let ja3 = ja3.map(|ja3| {
564 json!({
565 "hash": format!("{ja3:x}"),
566 "verbose": format!("{ja3}"),
567 "profile": profile_ja3,
568 })
569 });
570
571 json!({
572 "header": {
573 "version": hello.protocol_version().to_string(),
574 "cipher_suites": hello
575 .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
576 "compression_algorithms": hello
577 .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
578 },
579 "extensions": hello.extensions().iter().map(|extension| match extension {
580 ClientHelloExtension::ServerName(domain) => json!({
581 "id": extension.id().to_string(),
582 "data": domain,
583 }),
584 ClientHelloExtension::SignatureAlgorithms(v) => json!({
585 "id": extension.id().to_string(),
586 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
587 }),
588 ClientHelloExtension::SupportedVersions(v) => json!({
589 "id": extension.id().to_string(),
590 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
591 }),
592 ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
593 "id": extension.id().to_string(),
594 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
595 }),
596 ClientHelloExtension::SupportedGroups(v) => json!({
597 "id": extension.id().to_string(),
598 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
599 }),
600 ClientHelloExtension::ECPointFormats(v) => json!({
601 "id": extension.id().to_string(),
602 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
603 }),
604 ClientHelloExtension::CertificateCompression(v) => json!({
605 "id": extension.id().to_string(),
606 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
607 }),
608 ClientHelloExtension::DelegatedCredentials(v) => json!({
609 "id": extension.id().to_string(),
610 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
611 }),
612 ClientHelloExtension::RecordSizeLimit(v) => json!({
613 "id": extension.id().to_string(),
614 "data": v.to_string(),
615 }),
616 ClientHelloExtension::EncryptedClientHello(ech) => match ech {
617 ECHClientHello::Outer(ech) => json!({
618 "id": extension.id().to_string(),
619 "data": {
620 "type": "outer",
621 "cipher_suite": {
622 "aead_id": ech.cipher_suite.aead_id.to_string(),
623 "kdf_id": ech.cipher_suite.kdf_id.to_string(),
624 },
625 "config_id": ech.config_id,
626 "enc": format!("0x{}", hex::encode(&ech.enc)),
627 "payload": format!("0x{}", hex::encode(&ech.payload)),
628 },
629 }),
630 ECHClientHello::Inner => json!({
631 "id": extension.id().to_string(),
632 "data": {
633 "type": "inner",
634 },
635 })
636
637 }
638 ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
639 json!({
640 "id": id.to_string()
641 })
642 } else {
643 json!({
644 "id": id.to_string(),
645 "data": format!("0x{}", hex::encode(data))
646 })
647 },
648 }).collect::<Vec<_>>(),
649 "ja3": ja3,
650 "ja4": ja4,
651 })
652 });
653
654 #[cfg(not(any(feature = "rustls", feature = "boring")))]
655 let tls_info: Option<()> = None;
656
657 let mut h2 = None;
658 if parts.version == Version::HTTP_2 {
659 let initial_peer_settings = parts
660 .extensions
661 .get::<InitialPeerSettings>()
662 .map(|p| p.0.as_ref());
663
664 let pseudo_headers = parts.extensions.get::<PseudoHeaderOrder>();
665
666 let last_priority_params = parts
667 .extensions
668 .get::<LastPeerPriorityParams>()
669 .map(|p| p.0.dependency.clone());
670
671 h2 = Some(json!({
672 "settings": initial_peer_settings,
673 "pseudo_headers": pseudo_headers,
674 "last_priority_params": last_priority_params,
675 }));
676 }
677
678 Ok(Json(json!({
679 "ua": user_agent_info,
680 "http": {
681 "version": format!("{:?}", parts.version),
682 "scheme": scheme,
683 "method": format!("{:?}", parts.method),
684 "authority": authority,
685 "path": parts.uri.path().to_owned(),
686 "query": parts.uri.query().map(str::to_owned),
687 "h2": h2,
688 "headers": headers,
689 "payload": body,
690 "ja4h": ja4h,
691 },
692 "tls": tls_info,
693 "socket_addr": ctx.get::<Forwarded>()
694 .and_then(|f|
695 f.client_socket_addr().map(|addr| addr.to_string())
696 .or_else(|| f.client_ip().map(|ip| ip.to_string()))
697 ).or_else(|| ctx.get::<SocketInfo>().map(|v| v.peer_addr().to_string())),
698 }))
699 .into_response())
700 }
701}