diff --git a/Cargo.lock b/Cargo.lock index 8ac198c364..ed11d7c8a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5161,6 +5161,7 @@ dependencies = [ "itertools 0.10.5", "metrics", "once_cell", + "pageserver_api", "parking_lot 0.12.1", "postgres", "postgres-protocol", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 85561e4aff..97b7311317 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -28,6 +28,7 @@ hyper0.workspace = true futures.workspace = true once_cell.workspace = true parking_lot.workspace = true +pageserver_api.workspace = true postgres.workspace = true postgres-protocol.workspace = true rand.workspace = true diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 3f00b69cde..1784a9428c 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -2,11 +2,14 @@ //! protocol commands. use anyhow::Context; +use pageserver_api::shard::{ShardIdentity, ShardStripeSize}; use std::future::Future; use std::str::{self, FromStr}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, info, info_span, Instrument}; +use utils::postgres_client::PostgresClientProtocol; +use utils::shard::{ShardCount, ShardNumber}; use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; @@ -35,6 +38,8 @@ pub struct SafekeeperPostgresHandler { pub tenant_id: Option, pub timeline_id: Option, pub ttid: TenantTimelineId, + pub shard: Option, + pub protocol: Option, /// Unique connection id is logged in spans for observability. pub conn_id: ConnectionId, /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured. @@ -107,11 +112,28 @@ impl postgres_backend::Handler ) -> Result<(), QueryError> { if let FeStartupPacket::StartupMessage { params, .. } = sm { if let Some(options) = params.options_raw() { + let mut shard_count: Option = None; + let mut shard_number: Option = None; + let mut shard_stripe_size: Option = None; + for opt in options { // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy, // remove these after the PR gets deployed: // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064 match opt.split_once('=') { + Some(("protocol", value)) => { + let raw_value = value + .parse::() + .with_context(|| format!("Failed to parse {value} as protocol"))?; + + self.protocol = Some( + PostgresClientProtocol::try_from(raw_value).map_err(|_| { + QueryError::Other(anyhow::anyhow!( + "Unexpected client protocol type: {raw_value}" + )) + })?, + ); + } Some(("ztenantid", value)) | Some(("tenant_id", value)) => { self.tenant_id = Some(value.parse().with_context(|| { format!("Failed to parse {value} as tenant id") @@ -127,9 +149,47 @@ impl postgres_backend::Handler metrics.set_client_az(client_az) } } + Some(("shard_count", value)) => { + shard_count = Some(value.parse::().with_context(|| { + format!("Failed to parse {value} as shard count") + })?); + } + Some(("shard_number", value)) => { + shard_number = Some(value.parse::().with_context(|| { + format!("Failed to parse {value} as shard number") + })?); + } + Some(("shard_stripe_size", value)) => { + shard_stripe_size = Some(value.parse::().with_context(|| { + format!("Failed to parse {value} as shard stripe size") + })?); + } _ => continue, } } + + if self.protocol == Some(PostgresClientProtocol::Interpreted) { + match (shard_count, shard_number, shard_stripe_size) { + (Some(0), _, _) => { + self.shard = Some(ShardIdentity::unsharded()); + } + (Some(count), Some(number), Some(stripe_size)) => { + self.shard = Some( + ShardIdentity::new( + ShardNumber(number), + ShardCount(count), + ShardStripeSize(stripe_size), + ) + .with_context(|| "Failed to create shard identity")?, + ); + } + _ => { + return Err(QueryError::Other(anyhow::anyhow!( + "Shard params were not specified" + ))); + } + } + } } if let Some(app_name) = params.get("application_name") { @@ -150,6 +210,11 @@ impl postgres_backend::Handler tracing::field::debug(self.appname.clone()), ); + if let Some(shard) = self.shard.as_ref() { + tracing::Span::current() + .record("shard", tracing::field::display(shard.shard_slug())); + } + Ok(()) } else { Err(QueryError::Other(anyhow::anyhow!( @@ -258,6 +323,8 @@ impl SafekeeperPostgresHandler { tenant_id: None, timeline_id: None, ttid: TenantTimelineId::empty(), + shard: None, + protocol: None, conn_id, claims: None, auth, @@ -265,6 +332,10 @@ impl SafekeeperPostgresHandler { } } + pub fn protocol(&self) -> PostgresClientProtocol { + self.protocol.unwrap_or(PostgresClientProtocol::Vanilla) + } + // when accessing management api supply None as an argument // when using to authorize tenant pass corresponding tenant id fn check_permission(&self, tenant_id: Option) -> Result<(), QueryError> {