mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 20:20:38 +00:00
refactor(pageserver): better pageservice command parsing (#9597)
close https://github.com/neondatabase/neon/issues/9460 ## Summary of changes A full rewrite of pagestream cmdline parsing to make it more robust and readable. --------- Signed-off-by: Alex Chi Z <chi@neon.tech> Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
//! The Page Service listens for client connections and serves their GetPage@LSN
|
||||
//! requests.
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::{bail, Context};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::FutureExt;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
@@ -1221,6 +1222,222 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// `basebackup tenant timeline [lsn] [--gzip] [--replica]`
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct BaseBackupCmd {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Option<Lsn>,
|
||||
gzip: bool,
|
||||
replica: bool,
|
||||
}
|
||||
|
||||
/// `fullbackup tenant timeline [lsn] [prev_lsn]`
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct FullBackupCmd {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Option<Lsn>,
|
||||
prev_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
/// `pagestream_v2 tenant timeline`
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct PageStreamCmd {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
/// `lease lsn tenant timeline lsn`
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct LeaseLsnCmd {
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
enum PageServiceCmd {
|
||||
Set,
|
||||
PageStream(PageStreamCmd),
|
||||
BaseBackup(BaseBackupCmd),
|
||||
FullBackup(FullBackupCmd),
|
||||
LeaseLsn(LeaseLsnCmd),
|
||||
}
|
||||
|
||||
impl PageStreamCmd {
|
||||
fn parse(query: &str) -> anyhow::Result<Self> {
|
||||
let parameters = query.split_whitespace().collect_vec();
|
||||
if parameters.len() != 2 {
|
||||
bail!(
|
||||
"invalid number of parameters for pagestream command: {}",
|
||||
query
|
||||
);
|
||||
}
|
||||
let tenant_id = TenantId::from_str(parameters[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
|
||||
let timeline_id = TimelineId::from_str(parameters[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
|
||||
Ok(Self {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FullBackupCmd {
|
||||
fn parse(query: &str) -> anyhow::Result<Self> {
|
||||
let parameters = query.split_whitespace().collect_vec();
|
||||
if parameters.len() < 2 || parameters.len() > 4 {
|
||||
bail!(
|
||||
"invalid number of parameters for basebackup command: {}",
|
||||
query
|
||||
);
|
||||
}
|
||||
let tenant_id = TenantId::from_str(parameters[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
|
||||
let timeline_id = TimelineId::from_str(parameters[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
|
||||
// The caller is responsible for providing correct lsn and prev_lsn.
|
||||
let lsn = if let Some(lsn_str) = parameters.get(2) {
|
||||
Some(
|
||||
Lsn::from_str(lsn_str)
|
||||
.with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let prev_lsn = if let Some(prev_lsn_str) = parameters.get(3) {
|
||||
Some(
|
||||
Lsn::from_str(prev_lsn_str)
|
||||
.with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(Self {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
prev_lsn,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl BaseBackupCmd {
|
||||
fn parse(query: &str) -> anyhow::Result<Self> {
|
||||
let parameters = query.split_whitespace().collect_vec();
|
||||
if parameters.len() < 2 {
|
||||
bail!(
|
||||
"invalid number of parameters for basebackup command: {}",
|
||||
query
|
||||
);
|
||||
}
|
||||
let tenant_id = TenantId::from_str(parameters[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
|
||||
let timeline_id = TimelineId::from_str(parameters[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
|
||||
let lsn;
|
||||
let flags_parse_from;
|
||||
if let Some(maybe_lsn) = parameters.get(2) {
|
||||
if *maybe_lsn == "latest" {
|
||||
lsn = None;
|
||||
flags_parse_from = 3;
|
||||
} else if maybe_lsn.starts_with("--") {
|
||||
lsn = None;
|
||||
flags_parse_from = 2;
|
||||
} else {
|
||||
lsn = Some(
|
||||
Lsn::from_str(maybe_lsn)
|
||||
.with_context(|| format!("Failed to parse lsn from {maybe_lsn}"))?,
|
||||
);
|
||||
flags_parse_from = 3;
|
||||
}
|
||||
} else {
|
||||
lsn = None;
|
||||
flags_parse_from = 2;
|
||||
}
|
||||
|
||||
let mut gzip = false;
|
||||
let mut replica = false;
|
||||
|
||||
for ¶m in ¶meters[flags_parse_from..] {
|
||||
match param {
|
||||
"--gzip" => {
|
||||
if gzip {
|
||||
bail!("duplicate parameter for basebackup command: {param}")
|
||||
}
|
||||
gzip = true
|
||||
}
|
||||
"--replica" => {
|
||||
if replica {
|
||||
bail!("duplicate parameter for basebackup command: {param}")
|
||||
}
|
||||
replica = true
|
||||
}
|
||||
_ => bail!("invalid parameter for basebackup command: {param}"),
|
||||
}
|
||||
}
|
||||
Ok(Self {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
gzip,
|
||||
replica,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl LeaseLsnCmd {
|
||||
fn parse(query: &str) -> anyhow::Result<Self> {
|
||||
let parameters = query.split_whitespace().collect_vec();
|
||||
if parameters.len() != 3 {
|
||||
bail!(
|
||||
"invalid number of parameters for lease lsn command: {}",
|
||||
query
|
||||
);
|
||||
}
|
||||
let tenant_shard_id = TenantShardId::from_str(parameters[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
|
||||
let timeline_id = TimelineId::from_str(parameters[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
|
||||
let lsn = Lsn::from_str(parameters[2])
|
||||
.with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?;
|
||||
Ok(Self {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServiceCmd {
|
||||
fn parse(query: &str) -> anyhow::Result<Self> {
|
||||
let query = query.trim();
|
||||
let Some((cmd, other)) = query.split_once(' ') else {
|
||||
bail!("cannot parse query: {query}")
|
||||
};
|
||||
match cmd.to_ascii_lowercase().as_str() {
|
||||
"pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)),
|
||||
"basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)),
|
||||
"fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)),
|
||||
"lease" => {
|
||||
let Some((cmd2, other)) = other.split_once(' ') else {
|
||||
bail!("invalid lease command: {cmd}");
|
||||
};
|
||||
let cmd2 = cmd2.to_ascii_lowercase();
|
||||
if cmd2 == "lsn" {
|
||||
Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
|
||||
} else {
|
||||
bail!("invalid lease command: {cmd}");
|
||||
}
|
||||
}
|
||||
"set" => Ok(Self::Set),
|
||||
_ => Err(anyhow::anyhow!("unsupported command {cmd} in {query}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
@@ -1277,206 +1494,137 @@ where
|
||||
fail::fail_point!("ps::connection-start::process-query");
|
||||
|
||||
let ctx = self.connection_ctx.attached_child();
|
||||
debug!("process query {query_string:?}");
|
||||
let parts = query_string.split_whitespace().collect::<Vec<_>>();
|
||||
if let Some(params) = parts.strip_prefix(&["pagestream_v2"]) {
|
||||
if params.len() != 2 {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"invalid param number for pagestream command"
|
||||
)));
|
||||
}
|
||||
let tenant_id = TenantId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::PageStreamV2)
|
||||
.inc();
|
||||
|
||||
self.handle_pagerequests(
|
||||
pgb,
|
||||
debug!("process query {query_string}");
|
||||
let query = PageServiceCmd::parse(query_string)?;
|
||||
match query {
|
||||
PageServiceCmd::PageStream(PageStreamCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
PagestreamProtocolVersion::V2,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
} else if let Some(params) = parts.strip_prefix(&["basebackup"]) {
|
||||
if params.len() < 2 {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"invalid param number for basebackup command"
|
||||
)));
|
||||
}) => {
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::PageStreamV2)
|
||||
.inc();
|
||||
|
||||
self.handle_pagerequests(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
PagestreamProtocolVersion::V2,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
PageServiceCmd::BaseBackup(BaseBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
gzip,
|
||||
replica,
|
||||
}) => {
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
let tenant_id = TenantId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::Basebackup)
|
||||
.inc();
|
||||
|
||||
let mut lsn = None;
|
||||
let mut replica = false;
|
||||
let mut gzip = false;
|
||||
for param in ¶ms[2..] {
|
||||
if param.starts_with("--") {
|
||||
match *param {
|
||||
"--gzip" => gzip = true,
|
||||
"--replica" => replica = true,
|
||||
_ => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Unknown parameter {param}",
|
||||
)))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
lsn = Some(
|
||||
Lsn::from_str(param)
|
||||
.with_context(|| format!("Failed to parse Lsn from {param}"))?,
|
||||
);
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::Basebackup)
|
||||
.inc();
|
||||
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
|
||||
let res = async {
|
||||
self.handle_basebackup_request(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
None,
|
||||
false,
|
||||
gzip,
|
||||
replica,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
Result::<(), QueryError>::Ok(())
|
||||
}
|
||||
.await;
|
||||
metric_recording.observe(&res);
|
||||
res?;
|
||||
}
|
||||
// same as basebackup, but result includes relational data as well
|
||||
PageServiceCmd::FullBackup(FullBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
prev_lsn,
|
||||
}) => {
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
|
||||
let res = async {
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::Fullbackup)
|
||||
.inc();
|
||||
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
None,
|
||||
prev_lsn,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
gzip,
|
||||
replica,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
Result::<(), QueryError>::Ok(())
|
||||
}
|
||||
.await;
|
||||
metric_recording.observe(&res);
|
||||
res?;
|
||||
}
|
||||
// same as basebackup, but result includes relational data as well
|
||||
else if let Some(params) = parts.strip_prefix(&["fullbackup"]) {
|
||||
if params.len() < 2 {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"invalid param number for fullbackup command"
|
||||
)));
|
||||
PageServiceCmd::Set => {
|
||||
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||
// on connect
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
}
|
||||
|
||||
let tenant_id = TenantId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
// The caller is responsible for providing correct lsn and prev_lsn.
|
||||
let lsn = if let Some(lsn_str) = params.get(2) {
|
||||
Some(
|
||||
Lsn::from_str(lsn_str)
|
||||
.with_context(|| format!("Failed to parse Lsn from {lsn_str}"))?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let prev_lsn = if let Some(prev_lsn_str) = params.get(3) {
|
||||
Some(
|
||||
Lsn::from_str(prev_lsn_str)
|
||||
.with_context(|| format!("Failed to parse Lsn from {prev_lsn_str}"))?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::Fullbackup)
|
||||
.inc();
|
||||
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(
|
||||
pgb,
|
||||
tenant_id,
|
||||
PageServiceCmd::LeaseLsn(LeaseLsnCmd {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
prev_lsn,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.to_ascii_lowercase().starts_with("set ") {
|
||||
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||
// on connect
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("lease lsn ") {
|
||||
let params = &parts[2..];
|
||||
if params.len() != 3 {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"invalid param number {} for lease lsn command",
|
||||
params.len()
|
||||
)));
|
||||
}) => {
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_shard_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::LeaseLsn)
|
||||
.inc();
|
||||
|
||||
match self
|
||||
.handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error obtaining lsn lease for {lsn}: {e:?}");
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&e.to_string(),
|
||||
Some(e.pg_error_code()),
|
||||
))?
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let tenant_shard_id = TenantShardId::from_str(params[0])
|
||||
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
|
||||
let timeline_id = TimelineId::from_str(params[1])
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
tracing::Span::current()
|
||||
.record("tenant_id", field::display(tenant_shard_id))
|
||||
.record("timeline_id", field::display(timeline_id));
|
||||
|
||||
self.check_permission(Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
COMPUTE_COMMANDS_COUNTERS
|
||||
.for_command(ComputeCommandKind::LeaseLsn)
|
||||
.inc();
|
||||
|
||||
// The caller is responsible for providing correct lsn.
|
||||
let lsn = Lsn::from_str(params[2])
|
||||
.with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
|
||||
|
||||
match self
|
||||
.handle_make_lsn_lease(pgb, tenant_shard_id, timeline_id, lsn, &ctx)
|
||||
.await
|
||||
{
|
||||
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
|
||||
Err(e) => {
|
||||
error!("error obtaining lsn lease for {lsn}: {e:?}");
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(
|
||||
&e.to_string(),
|
||||
Some(e.pg_error_code()),
|
||||
))?
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"unknown command {query_string}"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1525,3 +1673,181 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
|
||||
);
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use utils::shard::ShardCount;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn pageservice_cmd_parse() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
let cmd =
|
||||
PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id} {timeline_id}")).unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::PageStream(PageStreamCmd {
|
||||
tenant_id,
|
||||
timeline_id
|
||||
})
|
||||
);
|
||||
let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::BaseBackup(BaseBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: None,
|
||||
gzip: false,
|
||||
replica: false
|
||||
})
|
||||
);
|
||||
let cmd =
|
||||
PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} --gzip")).unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::BaseBackup(BaseBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: None,
|
||||
gzip: true,
|
||||
replica: false
|
||||
})
|
||||
);
|
||||
let cmd =
|
||||
PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} latest")).unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::BaseBackup(BaseBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: None,
|
||||
gzip: false,
|
||||
replica: false
|
||||
})
|
||||
);
|
||||
let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id} 0/16ABCDE"))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::BaseBackup(BaseBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
|
||||
gzip: false,
|
||||
replica: false
|
||||
})
|
||||
);
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"basebackup {tenant_id} {timeline_id} --replica --gzip"
|
||||
))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::BaseBackup(BaseBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: None,
|
||||
gzip: true,
|
||||
replica: true
|
||||
})
|
||||
);
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"basebackup {tenant_id} {timeline_id} 0/16ABCDE --replica --gzip"
|
||||
))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::BaseBackup(BaseBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
|
||||
gzip: true,
|
||||
replica: true
|
||||
})
|
||||
);
|
||||
let cmd = PageServiceCmd::parse(&format!("fullbackup {tenant_id} {timeline_id}")).unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::FullBackup(FullBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: None,
|
||||
prev_lsn: None
|
||||
})
|
||||
);
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"fullbackup {tenant_id} {timeline_id} 0/16ABCDE 0/16ABCDF"
|
||||
))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::FullBackup(FullBackupCmd {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn: Some(Lsn::from_str("0/16ABCDE").unwrap()),
|
||||
prev_lsn: Some(Lsn::from_str("0/16ABCDF").unwrap()),
|
||||
})
|
||||
);
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
|
||||
))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::LeaseLsn(LeaseLsnCmd {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
lsn: Lsn::from_str("0/16ABCDE").unwrap(),
|
||||
})
|
||||
);
|
||||
let tenant_shard_id = TenantShardId::split(&tenant_shard_id, ShardCount(8))[1];
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"lease lsn {tenant_shard_id} {timeline_id} 0/16ABCDE"
|
||||
))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cmd,
|
||||
PageServiceCmd::LeaseLsn(LeaseLsnCmd {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
lsn: Lsn::from_str("0/16ABCDE").unwrap(),
|
||||
})
|
||||
);
|
||||
let cmd = PageServiceCmd::parse("set a = b").unwrap();
|
||||
assert_eq!(cmd, PageServiceCmd::Set);
|
||||
let cmd = PageServiceCmd::parse("SET foo").unwrap();
|
||||
assert_eq!(cmd, PageServiceCmd::Set);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pageservice_cmd_err_handling() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
let cmd = PageServiceCmd::parse("unknown_command");
|
||||
assert!(cmd.is_err());
|
||||
let cmd = PageServiceCmd::parse("pagestream_v2");
|
||||
assert!(cmd.is_err());
|
||||
let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx"));
|
||||
assert!(cmd.is_err());
|
||||
let cmd = PageServiceCmd::parse(&format!("pagestream_v2 {tenant_id}xxx {timeline_id}xxx"));
|
||||
assert!(cmd.is_err());
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"basebackup {tenant_id} {timeline_id} --gzip --gzip"
|
||||
));
|
||||
assert!(cmd.is_err());
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"basebackup {tenant_id} {timeline_id} --gzip --unknown"
|
||||
));
|
||||
assert!(cmd.is_err());
|
||||
let cmd = PageServiceCmd::parse(&format!(
|
||||
"basebackup {tenant_id} {timeline_id} --gzip 0/16ABCDE"
|
||||
));
|
||||
assert!(cmd.is_err());
|
||||
let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
|
||||
assert!(cmd.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user