diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index ba10fce7b4..fd21173b0d 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -279,7 +279,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig }, proxy_protocol_v2: config::ProxyProtocolV2::Rejected, handshake_timeout: Duration::from_secs(10), - region: "local".into(), wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?, connect_compute_locks, connect_to_compute: compute_config, diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index a4f517fead..45c1b0de26 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -237,7 +237,6 @@ pub(super) async fn task_main( extra: None, }, crate::metrics::Protocol::SniRouter, - "sni", ); handle_client(ctx, dest_suffix, tls_config, compute_tls_config, socket).await } diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index d5d3508333..8cd30e2ccf 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -475,6 +475,7 @@ pub async fn run() -> anyhow::Result<()> { client_tasks.spawn(crate::context::parquet::worker( cancellation_token.clone(), args.parquet_upload, + args.region, )); // maintenance tasks. these never return unless there's an error @@ -665,7 +666,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { authentication_config, proxy_protocol_v2: args.proxy_protocol_v2, handshake_timeout: args.handshake_timeout, - region: args.region.clone(), wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?, connect_compute_locks, connect_to_compute: compute_config, diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 248584a19a..cee15ac7fa 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -22,7 +22,6 @@ pub struct ProxyConfig { pub http_config: HttpConfig, pub authentication_config: AuthenticationConfig, pub proxy_protocol_v2: ProxyProtocolV2, - pub region: String, pub handshake_timeout: Duration, pub wake_compute_retry_config: RetryConfig, pub connect_compute_locks: ApiLocks, diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 324dcf5824..30e9ff3208 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -90,12 +90,7 @@ pub async fn task_main( } } - let ctx = RequestContext::new( - session_id, - conn_info, - crate::metrics::Protocol::Tcp, - &config.region, - ); + let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp); let res = handle_client( config, diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 24268997ba..df1c4e194a 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -46,7 +46,6 @@ struct RequestContextInner { pub(crate) session_id: Uuid, pub(crate) protocol: Protocol, first_packet: chrono::DateTime, - region: &'static str, pub(crate) span: Span, // filled in as they are discovered @@ -94,7 +93,6 @@ impl Clone for RequestContext { session_id: inner.session_id, protocol: inner.protocol, first_packet: inner.first_packet, - region: inner.region, span: info_span!("background_task"), project: inner.project, @@ -124,12 +122,7 @@ impl Clone for RequestContext { } impl RequestContext { - pub fn new( - session_id: Uuid, - conn_info: ConnectionInfo, - protocol: Protocol, - region: &'static str, - ) -> Self { + pub fn new(session_id: Uuid, conn_info: ConnectionInfo, protocol: Protocol) -> Self { // TODO: be careful with long lived spans let span = info_span!( "connect_request", @@ -145,7 +138,6 @@ impl RequestContext { session_id, protocol, first_packet: Utc::now(), - region, span, project: None, @@ -179,7 +171,7 @@ impl RequestContext { let ip = IpAddr::from([127, 0, 0, 1]); let addr = SocketAddr::new(ip, 5432); let conn_info = ConnectionInfo { addr, extra: None }; - RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test") + RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp) } pub(crate) fn console_application_name(&self) -> String { diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index c9d3905abd..3a9d78f531 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -74,7 +74,7 @@ pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10; #[derive(parquet_derive::ParquetRecordWriter)] pub(crate) struct RequestData { - region: &'static str, + region: String, protocol: &'static str, /// Must be UTC. The derive macro doesn't like the timezones timestamp: chrono::NaiveDateTime, @@ -147,7 +147,7 @@ impl From<&RequestContextInner> for RequestData { }), jwt_issuer: value.jwt_issuer.clone(), protocol: value.protocol.as_str(), - region: value.region, + region: String::new(), error: value.error_kind.as_ref().map(|e| e.to_metric_label()), success: value.success, cold_start_info: value.cold_start_info.as_str(), @@ -167,6 +167,7 @@ impl From<&RequestContextInner> for RequestData { pub async fn worker( cancellation_token: CancellationToken, config: ParquetUploadArgs, + region: String, ) -> anyhow::Result<()> { let Some(remote_storage_config) = config.parquet_upload_remote_storage else { tracing::warn!("parquet request upload: no s3 bucket configured"); @@ -232,12 +233,17 @@ pub async fn worker( .context("remote storage for disconnect events init")?; let parquet_config_disconnect = parquet_config.clone(); tokio::try_join!( - worker_inner(storage, rx, parquet_config), - worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect) + worker_inner(storage, rx, parquet_config, region.clone()), + worker_inner( + storage_disconnect, + rx_disconnect, + parquet_config_disconnect, + region + ) ) .map(|_| ()) } else { - worker_inner(storage, rx, parquet_config).await + worker_inner(storage, rx, parquet_config, region).await } } @@ -257,6 +263,7 @@ async fn worker_inner( storage: GenericRemoteStorage, rx: impl Stream, config: ParquetConfig, + region: String, ) -> anyhow::Result<()> { #[cfg(any(test, feature = "testing"))] let storage = if config.test_remote_failures > 0 { @@ -277,7 +284,8 @@ async fn worker_inner( let mut last_upload = time::Instant::now(); let mut len = 0; - while let Some(row) = rx.next().await { + while let Some(mut row) = rx.next().await { + row.region = region.clone(); rows.push(row); let force = last_upload.elapsed() > config.max_duration; if rows.len() == config.rows_per_group || force { @@ -533,7 +541,7 @@ mod tests { auth_method: None, jwt_issuer: None, protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)], - region: "us-east-1", + region: String::new(), error: None, success: rng.r#gen(), cold_start_info: "no", @@ -565,7 +573,9 @@ mod tests { .await .unwrap(); - worker_inner(storage, rx, config).await.unwrap(); + worker_inner(storage, rx, config, "us-east-1".to_string()) + .await + .unwrap(); let mut files = WalkDir::new(tmpdir.as_std_path()) .into_iter() diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 0e00c4f97e..cf6d3888cc 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -134,12 +134,7 @@ pub async fn task_main( } } - let ctx = RequestContext::new( - session_id, - conn_info, - crate::metrics::Protocol::Tcp, - &config.region, - ); + let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp); let res = handle_client( config, diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index f6f681ac45..bca357b830 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -417,12 +417,7 @@ async fn request_handler( if config.http_config.accept_websockets && framed_websockets::upgrade::is_upgrade_request(&request) { - let ctx = RequestContext::new( - session_id, - conn_info, - crate::metrics::Protocol::Ws, - &config.region, - ); + let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Ws); ctx.set_user_agent( request @@ -462,12 +457,7 @@ async fn request_handler( // Return the response so the spawned future can continue. Ok(response.map(|b| b.map_err(|x| match x {}).boxed())) } else if request.uri().path() == "/sql" && *request.method() == Method::POST { - let ctx = RequestContext::new( - session_id, - conn_info, - crate::metrics::Protocol::Http, - &config.region, - ); + let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http); let span = ctx.span(); let testodrome_id = request