1use crate::{
9 Context, Layer, Service,
10 cli::ForwardKind,
11 combinators::{Either3, Either7},
12 error::{BoxError, OpaqueError},
13 http::{
14 Request, Response, Version,
15 dep::http_body_util::BodyExt,
16 header::USER_AGENT,
17 headers::forwarded::{CFConnectingIp, ClientIp, TrueClientIp, XClientIp, XRealIp},
18 layer::{
19 forwarded::GetForwardedHeaderLayer,
20 required_header::AddRequiredResponseHeadersLayer,
21 trace::TraceLayer,
22 ua::{UserAgent, UserAgentClassifierLayer},
23 },
24 proto::h1::Http1HeaderMap,
25 proto::h2::PseudoHeaderOrder,
26 server::HttpServer,
27 },
28 layer::{ConsumeErrLayer, LimitLayer, TimeoutLayer, limit::policy::ConcurrentPolicy},
29 net::fingerprint::Ja4H,
30 net::forwarded::Forwarded,
31 net::http::RequestContext,
32 net::stream::{SocketInfo, layer::http::BodyLimitLayer},
33 proxy::haproxy::server::HaProxyLayer,
34 rt::Executor,
35 telemetry::tracing,
36 ua::profile::UserAgentDatabase,
37};
38#[cfg(any(feature = "rustls", feature = "boring"))]
39use crate::{
40 net::fingerprint::{Ja3, Ja4, PeetPrint},
41 net::tls::{
42 SecureTransport,
43 client::ClientHelloExtension,
44 client::{ECHClientHello, NegotiatedTlsParameters},
45 },
46};
47#[cfg(feature = "boring")]
48use crate::{
49 net::tls::server::ServerConfig,
50 tls::boring::server::{TlsAcceptorData, TlsAcceptorLayer},
51};
52use rama_http::service::web::{extract::Json, response::IntoResponse};
53use rama_http_backend::server::layer::upgrade::UpgradeLayer;
54use rama_http_core::h2::frame::EarlyFrameCapture;
55use rama_ws::handshake::server::{WebSocketAcceptor, WebSocketEchoService, WebSocketMatcher};
56use serde::Serialize;
57use serde_json::json;
58use std::{convert::Infallible, time::Duration};
59use tokio::net::TcpStream;
60
61#[cfg(all(feature = "rustls", not(feature = "boring")))]
62use crate::tls::rustls::server::{TlsAcceptorData, TlsAcceptorLayer};
63
/// TLS configuration type accepted by the builder when `boring` is enabled.
///
/// `build` converts this [`ServerConfig`] into `TlsAcceptorData` via `try_into`,
/// which is why building can fail for this backend.
#[cfg(feature = "boring")]
type TlsConfig = ServerConfig;

/// TLS configuration type accepted by the builder when only `rustls` is enabled.
///
/// `build` passes this acceptor data to the TLS acceptor layer as-is.
#[cfg(all(feature = "rustls", not(feature = "boring")))]
type TlsConfig = TlsAcceptorData;
69
/// Builder for an echo service: a server that answers every HTTP request
/// with a JSON description of that request (and, when available, of the
/// TLS client hello it arrived on).
///
/// `H` is the (possibly nested tuple of) HTTP layer(s) wrapped around the
/// inner [`EchoService`]; it starts out as `()` and grows through
/// `with_http_layer`.
#[derive(Debug, Clone)]
pub struct EchoServiceBuilder<H> {
    /// Maximum number of concurrent connections; `0` means no limit layer is added.
    concurrent_limit: usize,
    /// Limit passed to the request-only body limit layer.
    body_limit: usize,
    /// Timeout applied at the transport level; a zero duration means no timeout layer is added.
    timeout: Duration,
    /// How the client address is recovered when running behind a proxy, if at all.
    forward: Option<ForwardKind>,

    /// TLS configuration; when set, a TLS acceptor layer (storing the client hello) is added.
    #[cfg(any(feature = "rustls", feature = "boring"))]
    tls_server_config: Option<TlsConfig>,

    /// HTTP version to serve; `None` lets the server auto-detect the version.
    http_version: Option<Version>,

    /// Whether WebSocket upgrades (served by a WebSocket echo service) are enabled.
    ws_support: bool,

    /// Layer(s) wrapped directly around the inner [`EchoService`].
    http_service_builder: H,

    /// Optional user-agent database used to compare incoming fingerprints
    /// against the profile registered for the request's `User-Agent`.
    uadb: Option<std::sync::Arc<UserAgentDatabase>>,
}
90
91impl Default for EchoServiceBuilder<()> {
92 fn default() -> Self {
93 Self {
94 concurrent_limit: 0,
95 body_limit: 1024 * 1024,
96 timeout: Duration::ZERO,
97 forward: None,
98
99 #[cfg(any(feature = "rustls", feature = "boring"))]
100 tls_server_config: None,
101
102 http_version: None,
103
104 ws_support: false,
105
106 http_service_builder: (),
107
108 uadb: None,
109 }
110 }
111}
112
113impl EchoServiceBuilder<()> {
114 #[must_use]
116 pub fn new() -> Self {
117 Self::default()
118 }
119}
120
121impl<H> EchoServiceBuilder<H> {
122 crate::utils::macros::generate_set_and_with! {
123 pub fn concurrent(mut self, limit: usize) -> Self {
127 self.concurrent_limit = limit;
128 self
129 }
130 }
131
132 crate::utils::macros::generate_set_and_with! {
133 pub fn body_limit(mut self, limit: usize) -> Self {
135 self.body_limit = limit;
136 self
137 }
138 }
139
140 crate::utils::macros::generate_set_and_with! {
141 pub fn timeout(mut self, timeout: Duration) -> Self {
145 self.timeout = timeout;
146 self
147 }
148 }
149
150 crate::utils::macros::generate_set_and_with! {
151 pub fn forward(mut self, kind: Option<ForwardKind>) -> Self {
163 self.forward = kind;
164 self
165 }
166 }
167
168 crate::utils::macros::generate_set_and_with! {
169 #[cfg(any(feature = "rustls", feature = "boring"))]
170 pub fn tls_server_config(mut self, cfg: Option<TlsConfig>) -> Self {
173 self.tls_server_config = cfg;
174 self
175 }
176 }
177
178 crate::utils::macros::generate_set_and_with! {
179 pub fn http_version(mut self, version: Option<Version>) -> Self {
181 self.http_version = version;
182 self
183 }
184 }
185
186 pub fn with_http_layer<H2>(self, layer: H2) -> EchoServiceBuilder<(H, H2)> {
188 EchoServiceBuilder {
189 concurrent_limit: self.concurrent_limit,
190 body_limit: self.body_limit,
191 timeout: self.timeout,
192 forward: self.forward,
193
194 #[cfg(any(feature = "rustls", feature = "boring"))]
195 tls_server_config: self.tls_server_config,
196
197 http_version: self.http_version,
198
199 ws_support: self.ws_support,
200
201 http_service_builder: (self.http_service_builder, layer),
202
203 uadb: self.uadb,
204 }
205 }
206
207 crate::utils::macros::generate_set_and_with! {
208 pub fn user_agent_database(
211 mut self,
212 db: Option<std::sync::Arc<UserAgentDatabase>>,
213 ) -> Self {
214 self.uadb = db;
215 self
216 }
217 }
218
219 crate::utils::macros::generate_set_and_with! {
220 pub fn ws_support(
222 mut self,
223 support: bool,
224 ) -> Self {
225 self.ws_support = support;
226 self
227 }
228 }
229}
230
231impl<H> EchoServiceBuilder<H>
232where
233 H: Layer<EchoService, Service: Service<Request, Response = Response, Error = BoxError>>,
234{
235 #[allow(unused_mut)]
236 pub fn build(
238 mut self,
239 executor: Executor,
240 ) -> Result<impl Service<TcpStream, Response = (), Error = Infallible>, BoxError> {
241 let tcp_forwarded_layer = match &self.forward {
242 Some(ForwardKind::HaProxy) => Some(HaProxyLayer::default()),
243 _ => None,
244 };
245
246 let http_service = self.build_http();
247
248 #[cfg(all(feature = "rustls", not(feature = "boring")))]
249 let tls_cfg = self.tls_server_config;
250
251 #[cfg(feature = "boring")]
252 let tls_cfg: Option<TlsAcceptorData> = match self.tls_server_config {
253 Some(cfg) => Some(cfg.try_into()?),
254 None => None,
255 };
256
257 let tcp_service_builder = (
258 ConsumeErrLayer::trace(tracing::Level::DEBUG),
259 (self.concurrent_limit > 0)
260 .then(|| LimitLayer::new(ConcurrentPolicy::max(self.concurrent_limit))),
261 (!self.timeout.is_zero()).then(|| TimeoutLayer::new(self.timeout)),
262 tcp_forwarded_layer,
263 BodyLimitLayer::request_only(self.body_limit),
264 #[cfg(any(feature = "rustls", feature = "boring"))]
265 tls_cfg.map(|cfg| {
266 #[cfg(feature = "boring")]
267 return TlsAcceptorLayer::new(cfg).with_store_client_hello(true);
268 #[cfg(all(feature = "rustls", not(feature = "boring")))]
269 TlsAcceptorLayer::new(cfg).with_store_client_hello(true)
270 }),
271 );
272
273 let http_transport_service = match self.http_version {
274 Some(Version::HTTP_2) => Either3::A({
275 let mut http = HttpServer::h2(executor);
276 if self.ws_support {
277 http.h2_mut().enable_connect_protocol();
278 }
279 http.service(http_service)
280 }),
281 Some(Version::HTTP_11 | Version::HTTP_10 | Version::HTTP_09) => {
282 Either3::B(HttpServer::http1().service(http_service))
283 }
284 Some(_) => {
285 return Err(OpaqueError::from_display("unsupported http version").into_boxed());
286 }
287 None => Either3::C({
288 let mut http = HttpServer::auto(executor);
289 if self.ws_support {
290 http.h2_mut().enable_connect_protocol();
291 }
292 http.service(http_service)
293 }),
294 };
295
296 Ok(tcp_service_builder.into_layer(http_transport_service))
297 }
298
299 pub fn build_http(
301 &self,
302 ) -> impl Service<Request, Response: IntoResponse, Error = Infallible> + use<H> {
303 let http_forwarded_layer = match &self.forward {
304 None | Some(ForwardKind::HaProxy) => None,
305 Some(ForwardKind::Forwarded) => Some(Either7::A(GetForwardedHeaderLayer::forwarded())),
306 Some(ForwardKind::XForwardedFor) => {
307 Some(Either7::B(GetForwardedHeaderLayer::x_forwarded_for()))
308 }
309 Some(ForwardKind::XClientIp) => {
310 Some(Either7::C(GetForwardedHeaderLayer::<XClientIp>::new()))
311 }
312 Some(ForwardKind::ClientIp) => {
313 Some(Either7::D(GetForwardedHeaderLayer::<ClientIp>::new()))
314 }
315 Some(ForwardKind::XRealIp) => {
316 Some(Either7::E(GetForwardedHeaderLayer::<XRealIp>::new()))
317 }
318 Some(ForwardKind::CFConnectingIp) => {
319 Some(Either7::F(GetForwardedHeaderLayer::<CFConnectingIp>::new()))
320 }
321 Some(ForwardKind::TrueClientIp) => {
322 Some(Either7::G(GetForwardedHeaderLayer::<TrueClientIp>::new()))
323 }
324 };
325
326 (
327 TraceLayer::new_for_http(),
328 AddRequiredResponseHeadersLayer::default(),
329 UserAgentClassifierLayer::new(),
330 ConsumeErrLayer::default(),
331 http_forwarded_layer,
332 self.ws_support.then(|| {
333 UpgradeLayer::new(
334 WebSocketMatcher::default(),
335 {
336 let acceptor = WebSocketAcceptor::default()
337 .with_protocols_flex(true)
338 .with_echo_protocols();
339
340 #[cfg(feature = "compression")]
341 {
342 acceptor.with_per_message_deflate_overwrite_extensions()
343 }
344 #[cfg(not(feature = "compression"))]
345 {
346 acceptor
347 }
348 },
349 ConsumeErrLayer::trace(tracing::Level::DEBUG)
350 .into_layer(WebSocketEchoService::default()),
351 )
352 }),
353 )
354 .into_layer(self.http_service_builder.layer(EchoService {
355 uadb: self.uadb.clone(),
356 }))
357 }
358}
359
/// The inner HTTP service of the echo server: it answers each request with
/// a JSON document describing that request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct EchoService {
    /// Optional user-agent database; when set, fingerprints computed for the
    /// request are compared with those of the profile matching its `User-Agent`.
    uadb: Option<std::sync::Arc<UserAgentDatabase>>,
}
366
367impl Service<Request> for EchoService {
368 type Response = Response;
369 type Error = BoxError;
370
371 async fn serve(&self, mut ctx: Context, req: Request) -> Result<Self::Response, Self::Error> {
372 let user_agent_info = ctx
373 .get()
374 .map(|ua: &UserAgent| {
375 json!({
376 "user_agent": ua.header_str().to_owned(),
377 "kind": ua.info().map(|info| info.kind.to_string()),
378 "version": ua.info().and_then(|info| info.version),
379 "platform": ua.platform().map(|v| v.to_string()),
380 })
381 })
382 .unwrap_or_default();
383
384 let request_context =
385 ctx.get_or_try_insert_with_ctx::<RequestContext, _>(|ctx| (ctx, &req).try_into())?;
386 let authority = request_context.authority.to_string();
387 let scheme = request_context.protocol.to_string();
388
389 let ua_str = req
390 .headers()
391 .get(USER_AGENT)
392 .and_then(|h| h.to_str().ok())
393 .map(ToOwned::to_owned);
394 tracing::debug!(
395 user_agent.original = ua_str,
396 "echo request received from ua with ua header",
397 );
398
399 #[derive(Debug, Serialize)]
400 struct FingerprintProfileData {
401 hash: String,
402 verbose: String,
403 matched: bool,
404 }
405
406 let ja4h = Ja4H::compute(&req)
407 .inspect_err(|err| tracing::error!("ja4h compute failure: {err:?}"))
408 .ok()
409 .map(|ja4h| {
410 let mut profile_ja4h: Option<FingerprintProfileData> = None;
411
412 if let Some(uadb) = self.uadb.as_deref()
413 && let Some(profile) =
414 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
415 {
416 let matched_ja4h = match req.version() {
417 Version::HTTP_10 | Version::HTTP_11 => profile
418 .http
419 .ja4h_h1_navigate(Some(req.method().clone()))
420 .inspect_err(|err| {
421 tracing::trace!(
422 "ja4h computation of matched profile for incoming h1 req: {err:?}"
423 )
424 })
425 .ok(),
426 Version::HTTP_2 => profile
427 .http
428 .ja4h_h2_navigate(Some(req.method().clone()))
429 .inspect_err(|err| {
430 tracing::trace!(
431 "ja4h computation of matched profile for incoming h2 req: {err:?}"
432 )
433 })
434 .ok(),
435 _ => None,
436 };
437 if let Some(tgt) = matched_ja4h {
438 let hash = format!("{tgt}");
439 let matched = format!("{ja4h}") == hash;
440 profile_ja4h = Some(FingerprintProfileData {
441 hash,
442 verbose: format!("{tgt:?}"),
443 matched,
444 });
445 }
446 }
447
448 json!({
449 "hash": format!("{ja4h}"),
450 "verbose": format!("{ja4h:?}"),
451 "profile": profile_ja4h,
452 })
453 });
454
455 let (mut parts, body) = req.into_parts();
456
457 let headers: Vec<_> = Http1HeaderMap::new(parts.headers, Some(&mut parts.extensions))
458 .into_iter()
459 .map(|(name, value)| {
460 (
461 name,
462 std::str::from_utf8(value.as_bytes())
463 .map(|s| s.to_owned())
464 .unwrap_or_else(|_| format!("0x{:x?}", value.as_bytes())),
465 )
466 })
467 .collect();
468
469 let body = body.collect().await.unwrap().to_bytes();
470 let body = hex::encode(body.as_ref());
471
472 #[cfg(any(feature = "rustls", feature = "boring"))]
473 let tls_info = ctx
474 .get::<SecureTransport>()
475 .and_then(|st| st.client_hello())
476 .map(|hello| {
477 let ja4 = Ja4::compute(ctx.extensions())
478 .inspect_err(|err| tracing::trace!("ja4 computation: {err:?}"))
479 .ok();
480
481 let mut profile_ja4: Option<FingerprintProfileData> = None;
482
483 if let Some(uadb) = self.uadb.as_deref()
484 && let Some(profile) =
485 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
486 {
487 let matched_ja4 = profile
488 .tls
489 .compute_ja4(
490 ctx.get::<NegotiatedTlsParameters>()
491 .map(|param| param.protocol_version),
492 )
493 .inspect_err(|err| {
494 tracing::trace!("ja4 computation of matched profile: {err:?}")
495 })
496 .ok();
497 if let (Some(src), Some(tgt)) = (ja4.as_ref(), matched_ja4) {
498 let hash = format!("{tgt}");
499 let matched = format!("{src}") == hash;
500 profile_ja4 = Some(FingerprintProfileData {
501 hash,
502 verbose: format!("{tgt:?}"),
503 matched,
504 });
505 }
506 }
507
508 let ja4 = ja4.map(|ja4| {
509 json!({
510 "hash": format!("{ja4}"),
511 "verbose": format!("{ja4:?}"),
512 "profile": profile_ja4,
513 })
514 });
515
516 let ja3 = Ja3::compute(ctx.extensions())
517 .inspect_err(|err| tracing::trace!("ja3 computation: {err:?}"))
518 .ok();
519
520 let mut profile_ja3: Option<FingerprintProfileData> = None;
521
522 if let Some(uadb) = self.uadb.as_deref()
523 && let Some(profile) =
524 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
525 {
526 let matched_ja3 = profile
527 .tls
528 .compute_ja3(
529 ctx.get::<NegotiatedTlsParameters>()
530 .map(|param| param.protocol_version),
531 )
532 .inspect_err(|err| {
533 tracing::trace!("ja3 computation of matched profile: {err:?}")
534 })
535 .ok();
536 if let (Some(src), Some(tgt)) = (ja3.as_ref(), matched_ja3) {
537 let hash = format!("{tgt:x}");
538 let matched = format!("{src:x}") == hash;
539 profile_ja3 = Some(FingerprintProfileData {
540 hash,
541 verbose: format!("{tgt}"),
542 matched,
543 });
544 }
545 }
546
547 let ja3 = ja3.map(|ja3| {
548 json!({
549 "hash": format!("{ja3:x}"),
550 "verbose": format!("{ja3}"),
551 "profile": profile_ja3,
552 })
553 });
554
555 let peet = PeetPrint::compute(ctx.extensions())
556 .inspect_err(|err| tracing::trace!("peet computation: {err:?}"))
557 .ok();
558
559 let mut profile_peet: Option<FingerprintProfileData> = None;
560
561 if let Some(uadb) = self.uadb.as_deref()
562 && let Some(profile) =
563 ua_str.as_deref().and_then(|s| uadb.get_exact_header_str(s))
564 {
565 let matched_peet = profile
566 .tls
567 .compute_peet()
568 .inspect_err(|err| {
569 tracing::trace!("peetprint computation of matched profile: {err:?}")
570 })
571 .ok();
572 if let (Some(src), Some(tgt)) = (peet.as_ref(), matched_peet) {
573 let hash = format!("{tgt}");
574 let matched = format!("{src}") == hash;
575 profile_peet = Some(FingerprintProfileData {
576 hash,
577 verbose: format!("{tgt:?}"),
578 matched,
579 });
580 }
581 }
582
583 let peet = peet.map(|peet| {
584 json!({
585 "hash": format!("{peet}"),
586 "verbose": format!("{peet:?}"),
587 "profile": profile_peet,
588 })
589 });
590
591 json!({
592 "header": {
593 "version": hello.protocol_version().to_string(),
594 "cipher_suites": hello
595 .cipher_suites().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
596 "compression_algorithms": hello
597 .compression_algorithms().iter().map(|s| s.to_string()).collect::<Vec<_>>(),
598 },
599 "extensions": hello.extensions().iter().map(|extension| match extension {
600 ClientHelloExtension::ServerName(domain) => json!({
601 "id": extension.id().to_string(),
602 "data": domain,
603 }),
604 ClientHelloExtension::SignatureAlgorithms(v) => json!({
605 "id": extension.id().to_string(),
606 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
607 }),
608 ClientHelloExtension::SupportedVersions(v) => json!({
609 "id": extension.id().to_string(),
610 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
611 }),
612 ClientHelloExtension::ApplicationLayerProtocolNegotiation(v) => json!({
613 "id": extension.id().to_string(),
614 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
615 }),
616 ClientHelloExtension::ApplicationSettings(v) => json!({
617 "id": extension.id().to_string(),
618 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
619 }),
620 ClientHelloExtension::SupportedGroups(v) => json!({
621 "id": extension.id().to_string(),
622 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
623 }),
624 ClientHelloExtension::ECPointFormats(v) => json!({
625 "id": extension.id().to_string(),
626 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
627 }),
628 ClientHelloExtension::CertificateCompression(v) => json!({
629 "id": extension.id().to_string(),
630 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
631 }),
632 ClientHelloExtension::DelegatedCredentials(v) => json!({
633 "id": extension.id().to_string(),
634 "data": v.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
635 }),
636 ClientHelloExtension::RecordSizeLimit(v) => json!({
637 "id": extension.id().to_string(),
638 "data": v.to_string(),
639 }),
640 ClientHelloExtension::EncryptedClientHello(ech) => match ech {
641 ECHClientHello::Outer(ech) => json!({
642 "id": extension.id().to_string(),
643 "data": {
644 "type": "outer",
645 "cipher_suite": {
646 "aead_id": ech.cipher_suite.aead_id.to_string(),
647 "kdf_id": ech.cipher_suite.kdf_id.to_string(),
648 },
649 "config_id": ech.config_id,
650 "enc": format!("0x{}", hex::encode(&ech.enc)),
651 "payload": format!("0x{}", hex::encode(&ech.payload)),
652 },
653 }),
654 ECHClientHello::Inner => json!({
655 "id": extension.id().to_string(),
656 "data": {
657 "type": "inner",
658 },
659 })
660
661 }
662 ClientHelloExtension::Opaque { id, data } => if data.is_empty() {
663 json!({
664 "id": id.to_string()
665 })
666 } else {
667 json!({
668 "id": id.to_string(),
669 "data": format!("0x{}", hex::encode(data))
670 })
671 },
672 }).collect::<Vec<_>>(),
673 "ja3": ja3,
674 "ja4": ja4,
675 "peet": peet
676 })
677 });
678
679 #[cfg(not(any(feature = "rustls", feature = "boring")))]
680 let tls_info: Option<()> = None;
681
682 let mut h2 = None;
683 if parts.version == Version::HTTP_2 {
684 let early_frames = parts.extensions.get::<EarlyFrameCapture>();
685 let pseudo_headers = parts.extensions.get::<PseudoHeaderOrder>();
686
687 h2 = Some(json!({
688 "early_frames": early_frames,
689 "pseudo_headers": pseudo_headers,
690 }));
691 }
692
693 Ok(Json(json!({
694 "ua": user_agent_info,
695 "http": {
696 "version": format!("{:?}", parts.version),
697 "scheme": scheme,
698 "method": format!("{:?}", parts.method),
699 "authority": authority,
700 "path": parts.uri.path().to_owned(),
701 "query": parts.uri.query().map(str::to_owned),
702 "h2": h2,
703 "headers": headers,
704 "payload": body,
705 "ja4h": ja4h,
706 },
707 "tls": tls_info,
708 "socket_addr": ctx.get::<Forwarded>()
709 .and_then(|f|
710 f.client_socket_addr().map(|addr| addr.to_string())
711 .or_else(|| f.client_ip().map(|ip| ip.to_string()))
712 ).or_else(|| ctx.get::<SocketInfo>().map(|v| v.peer_addr().to_string())),
713 }))
714 .into_response())
715 }
716}