diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index aed8a87851..f07474df6a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1,10 +1,11 @@ //! The Page Service listens for client connections and serves their GetPage@LSN //! requests. -use anyhow::Context; +use anyhow::{bail, Context}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use futures::FutureExt; +use itertools::Itertools; use once_cell::sync::OnceCell; use pageserver_api::models::TenantState; use pageserver_api::models::{ @@ -1221,6 +1222,222 @@ impl PageServerHandler { } } +/// `basebackup tenant timeline [lsn] [--gzip] [--replica]` +#[derive(Debug, Clone, Eq, PartialEq)] +struct BaseBackupCmd { + tenant_id: TenantId, + timeline_id: TimelineId, + lsn: Option<Lsn>, + gzip: bool, + replica: bool, +} + +/// `fullbackup tenant timeline [lsn] [prev_lsn]` +#[derive(Debug, Clone, Eq, PartialEq)] +struct FullBackupCmd { + tenant_id: TenantId, + timeline_id: TimelineId, + lsn: Option<Lsn>, + prev_lsn: Option<Lsn>, +} + +/// `pagestream_v2 tenant timeline` +#[derive(Debug, Clone, Eq, PartialEq)] +struct PageStreamCmd { + tenant_id: TenantId, + timeline_id: TimelineId, +} + +/// `lease lsn tenant timeline lsn` +#[derive(Debug, Clone, Eq, PartialEq)] +struct LeaseLsnCmd { + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + lsn: Lsn, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +enum PageServiceCmd { + Set, + PageStream(PageStreamCmd), + BaseBackup(BaseBackupCmd), + FullBackup(FullBackupCmd), + LeaseLsn(LeaseLsnCmd), +} + +impl PageStreamCmd { + fn parse(query: &str) -> anyhow::Result<Self> { + let parameters = query.split_whitespace().collect_vec(); + if parameters.len() != 2 { + bail!( + "invalid number of parameters for pagestream command: {}", + query + ); + } + let tenant_id = TenantId::from_str(parameters[0]) + .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?; + let timeline_id = 
TimelineId::from_str(parameters[1]) + .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?; + Ok(Self { + tenant_id, + timeline_id, + }) + } +} + +impl FullBackupCmd { + fn parse(query: &str) -> anyhow::Result<Self> { + let parameters = query.split_whitespace().collect_vec(); + if parameters.len() < 2 || parameters.len() > 4 { + bail!( + "invalid number of parameters for fullbackup command: {}", + query + ); + } + let tenant_id = TenantId::from_str(parameters[0]) + .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?; + let timeline_id = TimelineId::from_str(parameters[1]) + .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?; + // The caller is responsible for providing correct lsn and prev_lsn. + let lsn = if let Some(lsn_str) = parameters.get(2) { + Some( + Lsn::from_str(lsn_str) + .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?, + ) + } else { + None + }; + let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) { + Some( + Lsn::from_str(prev_lsn_str) + .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?, + ) + } else { + None + }; + Ok(Self { + tenant_id, + timeline_id, + lsn, + prev_lsn, + }) + } +} + +impl BaseBackupCmd { + fn parse(query: &str) -> anyhow::Result<Self> { + let parameters = query.split_whitespace().collect_vec(); + if parameters.len() < 2 { + bail!( + "invalid number of parameters for basebackup command: {}", + query + ); + } + let tenant_id = TenantId::from_str(parameters[0]) + .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?; + let timeline_id = TimelineId::from_str(parameters[1]) + .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?; + let lsn; + let flags_parse_from; + if let Some(maybe_lsn) = parameters.get(2) { + if *maybe_lsn == "latest" { + lsn = None; + flags_parse_from = 3; + } else if maybe_lsn.starts_with("--") { + lsn = None; + flags_parse_from = 2; + } else { + 
lsn = Some( + Lsn::from_str(maybe_lsn) + .with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?, + ); + flags_parse_from = 3; + } + } else { + lsn = None; + flags_parse_from = 2; + } + + let mut gzip = false; + let mut replica = false; + + for &param in &parameters[flags_parse_from..] { + match param { + "--gzip" => { + if gzip { + bail!("duplicate parameter for basebackup command: {param}") + } + gzip = true + } + "--replica" => { + if replica { + bail!("duplicate parameter for basebackup command: {param}") + } + replica = true + } + _ => bail!("invalid parameter for basebackup command: {param}"), + } + } + Ok(Self { + tenant_id, + timeline_id, + lsn, + gzip, + replica, + }) + } +} + +impl LeaseLsnCmd { + fn parse(query: &str) -> anyhow::Result<Self> { + let parameters = query.split_whitespace().collect_vec(); + if parameters.len() != 3 { + bail!( + "invalid number of parameters for lease lsn command: {}", + query + ); + } + let tenant_shard_id = TenantShardId::from_str(parameters[0]) + .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?; + let timeline_id = TimelineId::from_str(parameters[1]) + .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?; + let lsn = Lsn::from_str(parameters[2]) + .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?; + Ok(Self { + tenant_shard_id, + timeline_id, + lsn, + }) + } +} + +impl PageServiceCmd { + fn parse(query: &str) -> anyhow::Result<Self> { + let query = query.trim(); + let Some((cmd, other)) = query.split_once(' ') else { + bail!("cannot parse query: {query}") + }; + match cmd.to_ascii_lowercase().as_str() { + "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)), + "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)), + "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)), + "lease" => { + let Some((cmd2, other)) = other.split_once(' ') else { + bail!("invalid lease command: {query}"); + }; + let cmd2 = 
cmd2.to_ascii_lowercase(); + if cmd2 == "lsn" { + Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?)) + } else { + bail!("invalid lease command: {cmd2}"); + } + } + "set" => Ok(Self::Set), + _ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")), + } + } +} + impl<IO> postgres_backend::Handler<IO> for PageServerHandler where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, @@ -1277,206 +1494,137 @@ where fail::fail_point!("ps::connection-start::process-query"); let ctx = self.connection_ctx.attached_child(); - debug!("process query {query_string:?}"); - let parts = query_string.split_whitespace().collect::<Vec<_>>(); - if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) { - if params.len() != 2 { - return Err(QueryError::Other(anyhow::anyhow!( - "invalid param number for pagestream command" - ))); - } - let tenant_id = TenantId::from_str(params[0]) - .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?; - let timeline_id = TimelineId::from_str(params[1]) - .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; - - tracing::Span::current() - .record("tenant_id", field::display(tenant_id)) - .record("timeline_id", field::display(timeline_id)); - - self.check_permission(Some(tenant_id))?; - - COMPUTE_COMMANDS_COUNTERS - .for_command(ComputeCommandKind::PageStreamV2) - .inc(); - - self.handle_pagerequests( - pgb, + debug!("process query {query_string}"); + let query = PageServiceCmd::parse(query_string)?; + match query { + PageServiceCmd::PageStream(PageStreamCmd { tenant_id, timeline_id, - PagestreamProtocolVersion::V2, - ctx, - ) - .await?; - } else if let Some(params) = parts.strip_prefix(&["basebackup"]) { - if params.len() < 2 { - return Err(QueryError::Other(anyhow::anyhow!( - "invalid param number for basebackup command" - ))); + }) => { + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); + + self.check_permission(Some(tenant_id))?; + + 
COMPUTE_COMMANDS_COUNTERS + .for_command(ComputeCommandKind::PageStreamV2) + .inc(); + + self.handle_pagerequests( + pgb, + tenant_id, + timeline_id, + PagestreamProtocolVersion::V2, + ctx, + ) + .await?; } + PageServiceCmd::BaseBackup(BaseBackupCmd { + tenant_id, + timeline_id, + lsn, + gzip, + replica, + }) => { + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); - let tenant_id = TenantId::from_str(params[0]) - .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?; - let timeline_id = TimelineId::from_str(params[1]) - .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; + self.check_permission(Some(tenant_id))?; - tracing::Span::current() - .record("tenant_id", field::display(tenant_id)) - .record("timeline_id", field::display(timeline_id)); - - self.check_permission(Some(tenant_id))?; - - COMPUTE_COMMANDS_COUNTERS - .for_command(ComputeCommandKind::Basebackup) - .inc(); - - let mut lsn = None; - let mut replica = false; - let mut gzip = false; - for param in &params[2..] 
{ - if param.starts_with("--") { - match *param { - "--gzip" => gzip = true, - "--replica" => replica = true, - _ => { - return Err(QueryError::Other(anyhow::anyhow!( - "Unknown parameter {param}", - ))) - } - } - } else { - lsn = Some( - Lsn::from_str(param) - .with_context(|| format!("Failed to parse Lsn from {param}"))?, - ); + COMPUTE_COMMANDS_COUNTERS + .for_command(ComputeCommandKind::Basebackup) + .inc(); + let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx); + let res = async { + self.handle_basebackup_request( + pgb, + tenant_id, + timeline_id, + lsn, + None, + false, + gzip, + replica, + &ctx, + ) + .await?; + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + Result::<(), QueryError>::Ok(()) } + .await; + metric_recording.observe(&res); + res?; } + // same as basebackup, but result includes relational data as well + PageServiceCmd::FullBackup(FullBackupCmd { + tenant_id, + timeline_id, + lsn, + prev_lsn, + }) => { + tracing::Span::current() + .record("tenant_id", field::display(tenant_id)) + .record("timeline_id", field::display(timeline_id)); - let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx); - let res = async { + self.check_permission(Some(tenant_id))?; + + COMPUTE_COMMANDS_COUNTERS + .for_command(ComputeCommandKind::Fullbackup) + .inc(); + + // Check that the timeline exists self.handle_basebackup_request( pgb, tenant_id, timeline_id, lsn, - None, + prev_lsn, + true, + false, false, - gzip, - replica, &ctx, ) .await?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - Result::<(), QueryError>::Ok(()) } - .await; - metric_recording.observe(&res); - res?; - } - // same as basebackup, but result includes relational data as well - else if let Some(params) = parts.strip_prefix(&["fullbackup"]) { - if params.len() < 2 { - return Err(QueryError::Other(anyhow::anyhow!( - "invalid param number for fullbackup command" - ))); + PageServiceCmd::Set => { + // important 
because psycopg2 executes "SET datestyle TO 'ISO'" + // on connect + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } - - let tenant_id = TenantId::from_str(params[0]) - .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?; - let timeline_id = TimelineId::from_str(params[1]) - .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; - - tracing::Span::current() - .record("tenant_id", field::display(tenant_id)) - .record("timeline_id", field::display(timeline_id)); - - // The caller is responsible for providing correct lsn and prev_lsn. - let lsn = if let Some(lsn_str) = params.get(2) { - Some( - Lsn::from_str(lsn_str) - .with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?, - ) - } else { - None - }; - let prev_lsn = if let Some(prev_lsn_str) = params.get(3) { - Some( - Lsn::from_str(prev_lsn_str) - .with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?, - ) - } else { - None - }; - - self.check_permission(Some(tenant_id))?; - - COMPUTE_COMMANDS_COUNTERS - .for_command(ComputeCommandKind::Fullbackup) - .inc(); - - // Check that the timeline exists - self.handle_basebackup_request( - pgb, - tenant_id, + PageServiceCmd::LeaseLsn(LeaseLsnCmd { + tenant_shard_id, timeline_id, lsn, - prev_lsn, - true, - false, - false, - &ctx, - ) - .await?; - pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.to_ascii_lowercase().starts_with("set ") { - // important because psycopg2 executes "SET datestyle TO 'ISO'" - // on connect - pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with("lease lsn ") { - let params = &parts[2..]; - if params.len() != 3 { - return Err(QueryError::Other(anyhow::anyhow!( - "invalid param number {} for lease lsn command", - params.len() - ))); + }) => { + tracing::Span::current() + .record("tenant_id", field::display(tenant_shard_id)) + .record("timeline_id", 
field::display(timeline_id)); + + self.check_permission(Some(tenant_shard_id.tenant_id))?; + + COMPUTE_COMMANDS_COUNTERS + .for_command(ComputeCommandKind::LeaseLsn) + .inc(); + + match self + .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx) + .await + { + Ok(()) => { + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))? + } + Err(e) => { + error!("error obtaining lsn lease for {lsn}: {e:?}"); + pgb.write_message_noflush(&BeMessage::ErrorResponse( + &e.to_string(), + Some(e.pg_error_code()), + ))? + } + }; } - - let tenant_shard_id = TenantShardId::from_str(params[0]) - .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?; - let timeline_id = TimelineId::from_str(params[1]) - .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; - - tracing::Span::current() - .record("tenant_id", field::display(tenant_shard_id)) - .record("timeline_id", field::display(timeline_id)); - - self.check_permission(Some(tenant_shard_id.tenant_id))?; - - COMPUTE_COMMANDS_COUNTERS - .for_command(ComputeCommandKind::LeaseLsn) - .inc(); - - // The caller is responsible for providing correct lsn. - let lsn = Lsn::from_str(params[2]) - .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?; - - match self - .handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx) - .await - { - Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, - Err(e) => { - error!("error obtaining lsn lease for {lsn}: {e:?}"); - pgb.write_message_noflush(&BeMessage::ErrorResponse( - &e.to_string(), - Some(e.pg_error_code()), - ))? 
- } - }; - } else { - return Err(QueryError::Other(anyhow::anyhow!( - "unknown command {query_string}" - ))); } Ok(()) @@ -1525,3 +1673,181 @@ fn set_tracing_field_shard_id(timeline: &Timeline) { ); debug_assert_current_span_has_tenant_and_timeline_id(); } + +#[cfg(test)] +mod tests { + use utils::shard::ShardCount; + + use super::*; + + #[test] + fn pageservice_cmd_parse() { + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + let cmd = + PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap(); + assert_eq!( + cmd, + PageServiceCmd::PageStream(PageStreamCmd { + tenant_id, + timeline_id + }) + ); + let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap(); + assert_eq!( + cmd, + PageServiceCmd::BaseBackup(BaseBackupCmd { + tenant_id, + timeline_id, + lsn: None, + gzip: false, + replica: false + }) + ); + let cmd = + PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap(); + assert_eq!( + cmd, + PageServiceCmd::BaseBackup(BaseBackupCmd { + tenant_id, + timeline_id, + lsn: None, + gzip: true, + replica: false + }) + ); + let cmd = + PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap(); + assert_eq!( + cmd, + PageServiceCmd::BaseBackup(BaseBackupCmd { + tenant_id, + timeline_id, + lsn: None, + gzip: false, + replica: false + }) + ); + let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE")) + .unwrap(); + assert_eq!( + cmd, + PageServiceCmd::BaseBackup(BaseBackupCmd { + tenant_id, + timeline_id, + lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()), + gzip: false, + replica: false + }) + ); + let cmd = PageServiceCmd::parse(&format!( + "basebackup {tenant_id} {timeline_id} --replica --gzip" + )) + .unwrap(); + assert_eq!( + cmd, + PageServiceCmd::BaseBackup(BaseBackupCmd { + tenant_id, + timeline_id, + lsn: None, + gzip: true, + replica: true + }) + ); + let cmd = 
PageServiceCmd::parse(&format!( + "basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip" + )) + .unwrap(); + assert_eq!( + cmd, + PageServiceCmd::BaseBackup(BaseBackupCmd { + tenant_id, + timeline_id, + lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()), + gzip: true, + replica: true + }) + ); + let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap(); + assert_eq!( + cmd, + PageServiceCmd::FullBackup(FullBackupCmd { + tenant_id, + timeline_id, + lsn: None, + prev_lsn: None + }) + ); + let cmd = PageServiceCmd::parse(&format!( + "fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF" + )) + .unwrap(); + assert_eq!( + cmd, + PageServiceCmd::FullBackup(FullBackupCmd { + tenant_id, + timeline_id, + lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()), + prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()), + }) + ); + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + let cmd = PageServiceCmd::parse(&format!( + "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE" + )) + .unwrap(); + assert_eq!( + cmd, + PageServiceCmd::LeaseLsn(LeaseLsnCmd { + tenant_shard_id, + timeline_id, + lsn: Lsn::from_str("0/16ABCDE").unwrap(), + }) + ); + let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1]; + let cmd = PageServiceCmd::parse(&format!( + "lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE" + )) + .unwrap(); + assert_eq!( + cmd, + PageServiceCmd::LeaseLsn(LeaseLsnCmd { + tenant_shard_id, + timeline_id, + lsn: Lsn::from_str("0/16ABCDE").unwrap(), + }) + ); + let cmd = PageServiceCmd::parse("set a = b").unwrap(); + assert_eq!(cmd, PageServiceCmd::Set); + let cmd = PageServiceCmd::parse("SET foo").unwrap(); + assert_eq!(cmd, PageServiceCmd::Set); + } + + #[test] + fn pageservice_cmd_err_handling() { + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + let cmd = PageServiceCmd::parse("unknown_command"); + assert!(cmd.is_err()); + let cmd = 
PageServiceCmd::parse("pagestream_v2"); + assert!(cmd.is_err()); + let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx")); + assert!(cmd.is_err()); + let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx")); + assert!(cmd.is_err()); + let cmd = PageServiceCmd::parse(&format!( + "basebackup {tenant_id} {timeline_id} --gzip --gzip" + )); + assert!(cmd.is_err()); + let cmd = PageServiceCmd::parse(&format!( + "basebackup {tenant_id} {timeline_id} --gzip --unknown" + )); + assert!(cmd.is_err()); + let cmd = PageServiceCmd::parse(&format!( + "basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE" + )); + assert!(cmd.is_err()); + let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE")); + assert!(cmd.is_err()); + } +}