From f49990ed433616270a7db33c3d554d9ed4cf4135 Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Mon, 14 Feb 2022 00:53:00 +0200
Subject: [PATCH] Allow creating timelines by branching off ancestors

---
 control_plane/src/compute.rs                  |  81 +----
 control_plane/src/local_env.rs                |  16 +-
 control_plane/src/storage.rs                  |   4 +-
 pageserver/src/http/models.rs                 |   9 +-
 pageserver/src/http/routes.rs                 |  58 +---
 .../src/remote_storage/storage_sync/index.rs  |   2 +-
 pageserver/src/repository.rs                  |   2 +-
 pageserver/src/tenant_mgr.rs                  |   6 +-
 pageserver/src/timelines.rs                   | 297 ++++++++++------
 test_runner/batch_others/test_auth.py         |  19 +-
 .../batch_others/test_branch_behind.py        |  33 +-
 .../batch_others/test_clog_truncate.py        |  15 +-
 test_runner/batch_others/test_config.py       |   8 +-
 test_runner/batch_others/test_createdropdb.py |  27 +-
 test_runner/batch_others/test_createuser.py   |  11 +-
 .../batch_others/test_gc_aggressive.py        |   5 +-
 test_runner/batch_others/test_multixact.py    |  10 +-
 .../batch_others/test_old_request_lsn.py      |   4 +-
 .../batch_others/test_pageserver_api.py       |  14 +-
 .../batch_others/test_pageserver_catchup.py   |   8 +-
 .../batch_others/test_pageserver_restart.py   |   4 +-
 .../batch_others/test_parallel_copy.py        |   6 +-
 test_runner/batch_others/test_pgbench.py      |   4 +-
 .../batch_others/test_readonly_node.py        |  18 +-
 .../batch_others/test_restart_compute.py      |  11 +-
 test_runner/batch_others/test_snapfiles_gc.py |   4 +-
 test_runner/batch_others/test_subxacts.py     |   4 +-
 .../batch_others/test_tenant_relocation.py    |  12 +-
 test_runner/batch_others/test_tenants.py      |  22 +-
 .../batch_others/test_timeline_size.py        |  17 +-
 test_runner/batch_others/test_twophase.py     |  11 +-
 test_runner/batch_others/test_vm_bits.py      |  10 +-
 test_runner/batch_others/test_wal_acceptor.py |  86 ++---
 .../batch_others/test_wal_acceptor_async.py   |   5 +-
 test_runner/batch_others/test_zenith_cli.py   |  60 ++--
 .../batch_pg_regress/test_isolation.py        |   6 +-
 .../batch_pg_regress/test_pg_regress.py       |   4 +-
 .../batch_pg_regress/test_zenith_regress.py   |   4 +-
 test_runner/fixtures/compare_fixtures.py      |   5 +-
 test_runner/fixtures/zenith_fixtures.py       | 243 +++++++------
 .../performance/test_bulk_tenant_create.py    |  11 +-
 .../performance/test_parallel_copy_to.py      |   1 -
 test_runner/test_broken.py                    |   4 +-
 zenith/src/main.rs                            | 327 ++++++++++++------
 44 files changed, 855 insertions(+), 653 deletions(-)

diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index 3381ca4a04..5d225a67fa 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -10,7 +10,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 
-use anyhow::{bail, Context, Result};
+use anyhow::{Context, Result};
 use zenith_utils::connstring::connection_host_port;
 use zenith_utils::lsn::Lsn;
 use zenith_utils::postgres_backend::AuthType;
@@ -37,7 +37,7 @@ impl ComputeControlPlane {
     // pgdatadirs
     // |- tenants
     // |  |- <tenant_id>
     // |  |  |- <node name>
     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
         let pageserver = Arc::new(PageServerNode::from_env(&env));
@@ -52,7 +52,7 @@ impl ComputeControlPlane {
                 .with_context(|| format!("failed to list {}", tenant_dir.path().display()))?
            {
                let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?;
-                nodes.insert((node.tenantid, node.name.clone()), Arc::new(node));
+                nodes.insert((node.tenant_id, node.name.clone()), Arc::new(node));
             }
         }
 
@@ -75,17 +75,12 @@ impl ComputeControlPlane {
     pub fn new_node(
         &mut self,
-        tenantid: ZTenantId,
+        tenant_id: ZTenantId,
         name: &str,
-        timeline_spec: Option<&str>,
+        timeline_id: ZTimelineId,
+        lsn: Option<Lsn>,
         port: Option<u16>,
     ) -> Result<Arc<PostgresNode>> {
-        // Resolve the human-readable timeline spec into timeline ID and LSN
-        let (timelineid, lsn) = match timeline_spec {
-            Some(timeline_spec) => parse_point_in_time(timeline_spec)?,
-            None => (ZTimelineId::generate(), None),
-        };
-
         let port = port.unwrap_or_else(|| self.get_port());
         let node = Arc::new(PostgresNode {
             name: name.to_owned(),
@@ -93,9 +88,9 @@ impl ComputeControlPlane {
             env: self.env.clone(),
             pageserver: Arc::clone(&self.pageserver),
             is_test: false,
-            timelineid,
+            timeline_id,
             lsn,
-            tenantid,
+            tenant_id,
             uses_wal_proposer: false,
         });
 
@@ -103,50 +98,12 @@ impl ComputeControlPlane {
         node.setup_pg_conf(self.env.pageserver.auth_type)?;
 
         self.nodes
-            .insert((tenantid, node.name.clone()), Arc::clone(&node));
+            .insert((tenant_id, node.name.clone()), Arc::clone(&node));
 
         Ok(node)
     }
 }
 
-// Parse user-given string that represents a point-in-time.
-//
-// Variants suported:
-//
-// Raw timeline id in hex, meaning the end of that timeline:
-// bc62e7d612d0e6fe8f99a6dd2f281f9d
-//
-// A specific LSN on a timeline:
-// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
-//
-fn parse_point_in_time(timeline_spec: &str) -> anyhow::Result<(ZTimelineId, Option<Lsn>)> {
-    let mut strings = timeline_spec.split('@');
-
-    let name = match strings.next() {
-        Some(n) => n,
-        None => bail!("invalid timeline specification: {}", timeline_spec),
-    };
-    let timeline_id = ZTimelineId::from_str(name).with_context(|| {
-        format!(
-            "failed to parse the timeline id from specification: {}",
-            timeline_spec
-        )
-    })?;
-
-    let lsn = strings
-        .next()
-        .map(Lsn::from_str)
-        .transpose()
-        .with_context(|| {
-            format!(
-                "failed to parse the Lsn from timeline specification: {}",
-                timeline_spec
-            )
-        })?;
-
-    Ok((timeline_id, lsn))
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 
 #[derive(Debug)]
@@ -156,9 +113,9 @@ pub struct PostgresNode {
     pub env: LocalEnv,
     pageserver: Arc<PageServerNode>,
     is_test: bool,
-    pub timelineid: ZTimelineId,
+    pub timeline_id: ZTimelineId,
     pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
-    pub tenantid: ZTenantId,
+    pub tenant_id: ZTenantId,
     uses_wal_proposer: bool,
 }
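The removal of parse_point_in_time above replaces the human-readable "timeline@lsn" spec with explicit, separately typed arguments. A minimal sketch of the before and after, using the Python test fixtures that this patch reworks (branch_timeline, create_start); the parent_timeline_id variable and the concrete LSN are illustrative only:

    # Before: one spec string, parsed by parse_point_in_time() on the Rust side
    # env.zenith_cli.create_branch("new_branch", "bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8")

    # After: the ancestor timeline and the LSN travel as separate arguments
    new_timeline_id = env.zenith_cli.branch_timeline(
        ancestor_timeline_id=parent_timeline_id,   # an id, not a branch name
        ancestor_start_lsn='2/15D3DD8')            # optional, defaults to end of WAL
    pg = env.postgres.create_start('new_branch',
                                   timeline_id=new_timeline_id,
                                   lsn='2/15D3DD8')  # read-only node pinned to an LSN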
@@ -191,7 +148,7 @@ impl PostgresNode {
         let context = format!("in config file {}", cfg_path_str);
         let port: u16 = conf.parse_field("port", &context)?;
         let timelineid: ZTimelineId = conf.parse_field("zenith.zenith_timeline", &context)?;
-        let tenantid: ZTenantId = conf.parse_field("zenith.zenith_tenant", &context)?;
+        let tenant_id: ZTenantId = conf.parse_field("zenith.zenith_tenant", &context)?;
 
         let uses_wal_proposer = conf.get("wal_acceptors").is_some();
 
         // parse recovery_target_lsn, if any
@@ -205,9 +162,9 @@ impl PostgresNode {
             env: env.clone(),
             pageserver: Arc::clone(pageserver),
             is_test: false,
-            timelineid,
+            timeline_id: timelineid,
             lsn: recovery_target_lsn,
-            tenantid,
+            tenant_id,
             uses_wal_proposer,
         })
     }
@@ -258,9 +215,9 @@ impl PostgresNode {
         );
 
         let sql = if let Some(lsn) = lsn {
-            format!("basebackup {} {} {}", self.tenantid, self.timelineid, lsn)
+            format!("basebackup {} {} {}", self.tenant_id, self.timeline_id, lsn)
         } else {
-            format!("basebackup {} {}", self.tenantid, self.timelineid)
+            format!("basebackup {} {}", self.tenant_id, self.timeline_id)
         };
 
         let mut client = self
@@ -346,8 +303,8 @@ impl PostgresNode {
         conf.append("shared_preload_libraries", "zenith");
         conf.append_line("");
         conf.append("zenith.page_server_connstring", &pageserver_connstr);
-        conf.append("zenith.zenith_tenant", &self.tenantid.to_string());
-        conf.append("zenith.zenith_timeline", &self.timelineid.to_string());
+        conf.append("zenith.zenith_tenant", &self.tenant_id.to_string());
+        conf.append("zenith.zenith_timeline", &self.timeline_id.to_string());
         if let Some(lsn) = self.lsn {
             conf.append("recovery_target_lsn", &lsn.to_string());
         }
@@ -425,7 +382,7 @@ impl PostgresNode {
     }
 
     pub fn pgdata(&self) -> PathBuf {
-        self.env.pg_data_dir(&self.tenantid, &self.name)
+        self.env.pg_data_dir(&self.tenant_id, &self.name)
     }
 
     pub fn status(&self) -> &str {
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 238c78821e..98b6379106 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -5,6 +5,7 @@
 use anyhow::{bail, Context};
 use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
 use std::env;
 use std::fmt::Write;
 use std::fs;
@@ -12,7 +13,7 @@ use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
 use zenith_utils::postgres_backend::AuthType;
-use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId};
+use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId, ZTimelineId};
 
 use crate::safekeeper::SafekeeperNode;
 
@@ -48,7 +49,7 @@ pub struct LocalEnv {
     // Default tenant ID to use with the 'zenith' command line utility, when
     // --tenantid is not explicitly specified.
     #[serde(default)]
-    pub default_tenantid: Option<HexZTenantId>,
+    pub default_tenant_id: Option<HexZTenantId>,
 
     // used to issue tokens during e.g pg start
     #[serde(default)]
@@ -58,6 +59,13 @@ pub struct LocalEnv {
 
     #[serde(default)]
     pub safekeepers: Vec<SafekeeperConf>,
+
+    /// Every tenant has a first timeline created for it, currently the only ancestor-less timeline for that tenant.
+    /// It is used as the default timeline for branching, if no ancestor timeline is specified.
+    #[serde(default)]
+    // TODO kb this does not survive calls between invocations, so will have to persist it.
+    // Then it comes back to names again?
+    pub initial_timelines: HashMap<ZTenantId, ZTimelineId>,
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug)]
@@ -183,8 +191,8 @@ impl LocalEnv {
         }
 
         // If no initial tenant ID was given, generate it.
-        if env.default_tenantid.is_none() {
-            env.default_tenantid = Some(HexZTenantId::from(ZTenantId::generate()));
+        if env.default_tenant_id.is_none() {
+            env.default_tenant_id = Some(HexZTenantId::from(ZTenantId::generate()));
         }
 
         env.base_data_dir = base_path();
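The initial_timelines map records, per tenant, the first timeline created for it. Combined with the CLI changes later in the patch, it lets branch_timeline fall back to that timeline when no ancestor is given. A sketch under those assumptions, mirroring test_tenants.py below:

    # create_tenant returns the generated tenant id and its initial timeline
    (tenant_id, initial_timeline_id) = env.zenith_cli.create_tenant()

    # explicit ancestor
    t1 = env.zenith_cli.branch_timeline(tenant_id=tenant_id,
                                        ancestor_timeline_id=initial_timeline_id)

    # no ancestor given: the tenant's initial timeline is used as the default
    t2 = env.zenith_cli.branch_timeline(tenant_id=tenant_id)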
diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs
index d550bfc064..9d5a88784d 100644
--- a/control_plane/src/storage.rs
+++ b/control_plane/src/storage.rs
@@ -325,7 +325,7 @@ impl PageServerNode {
             .json()?)
     }
 
-    pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<()> {
+    pub fn tenant_create(&self, tenantid: ZTenantId) -> Result<ZTimelineId> {
         Ok(self
             .http_request(Method::POST, format!("{}/{}", self.http_base_url, "tenant"))
             .json(&TenantCreateRequest {
@@ -352,6 +352,7 @@ impl PageServerNode {
         tenant_id: ZTenantId,
         timeline_id: ZTimelineId,
         start_lsn: Option<Lsn>,
+        ancestor_timeline_id: Option<ZTimelineId>,
     ) -> Result<TimelineInfo> {
         Ok(self
             .http_request(Method::POST, format!("{}/timeline", self.http_base_url))
             .json(&TimelineCreateRequest {
                 tenant_id,
                 timeline_id,
                 start_lsn,
+                ancestor_timeline_id,
             })
             .send()?
             .error_from_body()?
             .json()?)
diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs
index bc0d46a96c..7f95c64527 100644
--- a/pageserver/src/http/models.rs
+++ b/pageserver/src/http/models.rs
@@ -1,8 +1,9 @@
 use serde::{Deserialize, Serialize};
-use zenith_utils::{lsn::Lsn, zid::ZTimelineId};
-
-use crate::ZTenantId;
 use zenith_utils::zid::ZNodeId;
+use zenith_utils::{
+    lsn::Lsn,
+    zid::{opt_display_serde, ZTenantId, ZTimelineId},
+};
 
 #[derive(Serialize, Deserialize)]
 pub struct TimelineCreateRequest {
@@ -10,6 +11,8 @@ pub struct TimelineCreateRequest {
     pub tenant_id: ZTenantId,
     #[serde(with = "hex")]
     pub timeline_id: ZTimelineId,
+    #[serde(with = "opt_display_serde")]
+    pub ancestor_timeline_id: Option<ZTimelineId>,
     pub start_lsn: Option<Lsn>,
 }
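On the wire, TimelineCreateRequest serializes both ids as hex strings and the new ancestor_timeline_id through opt_display_serde. A hedged sketch of the resulting POST body, sent with the plain requests library; http_base_url and the concrete ids are placeholders, and the exact LSN encoding follows Lsn's serde implementation:

    import requests

    body = {
        "tenant_id": "74ee8b079a0e437eb0afea7d26a07209",    # hex-encoded ZTenantId
        "timeline_id": "bc62e7d612d0e6fe8f99a6dd2f281f9d",  # id for the new timeline
        "ancestor_timeline_id": None,  # or a hex timeline id to branch off
        "start_lsn": None,             # or an LSN to branch at a point in time
    }
    requests.post(f"{http_base_url}/timeline", json=body)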
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 34a61cab9c..f332e59135 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -3,7 +3,6 @@ use std::sync::Arc;
 use anyhow::Result;
 use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
-use serde::Serialize;
 use tracing::*;
 use zenith_utils::auth::JwtAuth;
 use zenith_utils::http::endpoint::attach_openapi_ui;
@@ -17,15 +16,13 @@ use zenith_utils::http::{
     request::parse_request_param,
 };
 use zenith_utils::http::{RequestExt, RouterBuilder};
-use zenith_utils::lsn::Lsn;
-use zenith_utils::zid::HexZTimelineId;
-use zenith_utils::zid::ZTimelineId;
+use zenith_utils::zid::{HexZTimelineId, ZTimelineId};
 
 use super::models::StatusResponse;
 use super::models::TenantCreateRequest;
 use super::models::TimelineCreateRequest;
 use crate::repository::RepositoryTimeline;
-use crate::repository::TimelineSyncState;
+use crate::timelines::TimelineInfo;
 use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
 
 #[derive(Debug)]
@@ -82,6 +79,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
 ) -> bool {
         .unwrap_or(false)
 }
 
-#[derive(Debug, Serialize)]
-#[serde(tag = "type")]
-enum TimelineInfo {
-    Local {
-        #[serde(with = "hex")]
-        timeline_id: ZTimelineId,
-        #[serde(with = "hex")]
-        tenant_id: ZTenantId,
-        ancestor_timeline_id: Option<HexZTimelineId>,
-        last_record_lsn: Lsn,
-        prev_record_lsn: Lsn,
-        disk_consistent_lsn: Lsn,
-        timeline_state: Option<TimelineSyncState>,
-    },
-    Remote {
-        #[serde(with = "hex")]
-        timeline_id: ZTimelineId,
-        #[serde(with = "hex")]
-        tenant_id: ZTenantId,
-    },
-}
-
 async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
@@ -151,23 +127,13 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
-        Ok::<_, anyhow::Error>(match repo.get_timeline(timeline_id)?.local_timeline() {
-            None => TimelineInfo::Remote {
-                timeline_id,
-                tenant_id,
-            },
-            Some(timeline) => TimelineInfo::Local {
-                timeline_id,
-                tenant_id,
-                ancestor_timeline_id: timeline
-                    .get_ancestor_timeline_id()
-                    .map(HexZTimelineId::from),
-                disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
-                last_record_lsn: timeline.get_last_record_lsn(),
-                prev_record_lsn: timeline.get_prev_record_lsn(),
-                timeline_state: repo.get_timeline_state(timeline_id),
-            },
-        })
+        let include_non_incremental_logical_size =
+            get_include_non_incremental_logical_size(&request);
+        Ok::<_, anyhow::Error>(TimelineInfo::from_repo_timeline(
+            tenant_id,
+            repo.get_timeline(timeline_id)?,
+            include_non_incremental_logical_size,
+        ))
     })
     .await
     .map_err(ApiError::from_err)??;
@@ -247,13 +213,13 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs
index 8ff92ed55e..81c99754c9 100644
--- a/pageserver/src/remote_storage/storage_sync/index.rs
+++ b/pageserver/src/remote_storage/storage_sync/index.rs
@@ -49,7 +49,7 @@ impl RelativePath {
 }
 
 /// An index to track tenant files that exist on the remote storage.
-/// Currently, timeline archives files are tracked only.
+/// Currently, timeline archive files are tracked only.
 #[derive(Debug, Clone)]
 pub struct RemoteTimelineIndex {
     timeline_files: HashMap,
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 674d447624..be937b8d26 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -107,7 +107,7 @@ impl RepositoryTimeline {
 
 /// A state of the timeline synchronization with the remote storage.
 /// Contains `disk_consistent_lsn` of the corresponding remote timeline (latest checkpoint's disk_consistent_lsn).
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 pub enum TimelineSyncState {
     /// No further downloads from the remote storage are needed.
    /// The timeline state is up-to-date or ahead of the remote storage one,
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index 98777e5e4b..f7f694d833 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -180,9 +180,9 @@ pub fn shutdown_all_tenants() {
 pub fn create_repository_for_tenant(
     conf: &'static PageServerConf,
     tenantid: ZTenantId,
-) -> Result<()> {
+) -> Result<ZTimelineId> {
     let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
-    let repo = timelines::create_repo(conf, tenantid, wal_redo_manager)?;
+    let (initial_timeline_id, repo) = timelines::create_repo(conf, tenantid, wal_redo_manager)?;
 
     match access_tenants().entry(tenantid) {
         hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid),
@@ -194,7 +194,7 @@ pub fn create_repository_for_tenant(
         }
     }
 
-    Ok(())
+    Ok(initial_timeline_id)
 }
 
 pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs
index 1e54fe3897..fc29767ddd 100644
--- a/pageserver/src/timelines.rs
+++ b/pageserver/src/timelines.rs
@@ -17,24 +17,133 @@ use std::{
 };
 use tracing::*;
 use zenith_utils::lsn::Lsn;
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
+use zenith_utils::zid::{opt_display_serde, ZTenantId, ZTimelineId};
 use zenith_utils::{crashsafe_dir, logging};
 
 use crate::walredo::WalRedoManager;
-use crate::CheckpointConfig;
 use crate::{config::PageServerConf, repository::Repository};
 use crate::{import_datadir, LOG_FILE_NAME};
 use crate::{repository::RepositoryTimeline, tenant_mgr};
+use crate::{repository::Timeline, CheckpointConfig};
 
 #[derive(Serialize, Deserialize, Clone)]
-pub struct TimelineInfo {
-    #[serde(with = "hex")]
-    pub timeline_id: ZTimelineId,
-    pub latest_valid_lsn: Lsn,
-    pub ancestor_id: Option<String>,
-    pub ancestor_lsn: Option<String>,
-    pub current_logical_size: usize,
-    pub current_logical_size_non_incremental: Option<usize>,
+#[serde(tag = "type")]
+pub enum TimelineInfo {
+    Local {
+        #[serde(with = "hex")]
+        timeline_id: ZTimelineId,
+        #[serde(with = "hex")]
+        tenant_id: ZTenantId,
+        last_record_lsn: Lsn,
+        prev_record_lsn: Lsn,
+        #[serde(with = "opt_display_serde")]
+        ancestor_timeline_id: Option<ZTimelineId>,
+        ancestor_lsn: Option<Lsn>,
+        disk_consistent_lsn: Lsn,
+        current_logical_size: usize,
+        current_logical_size_non_incremental: Option<usize>,
+    },
+    Remote {
+        #[serde(with = "hex")]
+        timeline_id: ZTimelineId,
+        #[serde(with = "hex")]
+        tenant_id: ZTenantId,
+        disk_consistent_lsn: Lsn,
+    },
+}
+
+impl TimelineInfo {
+    pub fn from_repo_timeline(
+        tenant_id: ZTenantId,
+        repo_timeline: RepositoryTimeline,
+        include_non_incremental_logical_size: bool,
+    ) -> Self {
+        match repo_timeline {
+            RepositoryTimeline::Local { id, timeline } => {
+                let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
+                let ancestor_lsn = if ancestor_timeline_id.is_some() {
+                    Some(timeline.get_ancestor_lsn())
+                } else {
+                    None
+                };
+
+                Self::Local {
+                    timeline_id: id,
+                    tenant_id,
+                    last_record_lsn: timeline.get_last_record_lsn(),
+                    prev_record_lsn: timeline.get_prev_record_lsn(),
+                    ancestor_timeline_id,
+                    ancestor_lsn,
+                    disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
+                    current_logical_size: timeline.get_current_logical_size(),
+                    current_logical_size_non_incremental: get_current_logical_size_non_incremental(
+                        include_non_incremental_logical_size,
+                        timeline.as_ref(),
+                    ),
+                }
+            }
+            RepositoryTimeline::Remote {
+                id,
+                disk_consistent_lsn,
+            } => Self::Remote {
+                timeline_id: id,
+                tenant_id,
+                disk_consistent_lsn,
+            },
+        }
+    }
+
+    pub fn from_dyn_timeline(
+        tenant_id: ZTenantId,
+        timeline_id: ZTimelineId,
+        timeline: &dyn Timeline,
+        include_non_incremental_logical_size: bool,
+    ) -> Self {
+        let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
+        let ancestor_lsn = if ancestor_timeline_id.is_some() {
+            Some(timeline.get_ancestor_lsn())
+        } else {
+            None
+        };
+
+        Self::Local {
+            timeline_id,
+            tenant_id,
+            last_record_lsn: timeline.get_last_record_lsn(),
+            prev_record_lsn: timeline.get_prev_record_lsn(),
+            ancestor_timeline_id,
+            ancestor_lsn,
+            disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
+            current_logical_size: timeline.get_current_logical_size(),
+            current_logical_size_non_incremental: get_current_logical_size_non_incremental(
+                include_non_incremental_logical_size,
+                timeline,
+            ),
+        }
+    }
+
+    pub fn timeline_id(&self) -> ZTimelineId {
+        match *self {
+            TimelineInfo::Local { timeline_id, .. } => timeline_id,
+            TimelineInfo::Remote { timeline_id, .. } => timeline_id,
+        }
+    }
+}
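Because of serde(tag = "type"), both TimelineInfo variants serialize to a flat JSON object whose "type" field is "Local" or "Remote"; test_pageserver_api.py below keys off exactly that field. A sketch of what a caller of the fixture HTTP client can expect, with tenant_id and timeline_id as placeholders:

    client = env.pageserver.http_client()
    detail = client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
    if detail['type'] == 'Local':
        # locally present timeline: full set of LSNs and size counters
        assert {'timeline_id', 'tenant_id', 'last_record_lsn', 'prev_record_lsn',
                'disk_consistent_lsn', 'current_logical_size'} <= detail.keys()
    else:
        # remote-only timeline: ids plus disk_consistent_lsn
        assert detail['type'] == 'Remote'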
+fn get_current_logical_size_non_incremental(
+    include_non_incremental_logical_size: bool,
+    timeline: &dyn Timeline,
+) -> Option<usize> {
+    if !include_non_incremental_logical_size {
+        return None;
+    }
+    match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) {
+        Ok(size) => Some(size),
+        Err(e) => {
+            error!("Failed to get non-incremental logical size: {:?}", e);
+            None
+        }
+    }
+}
 
 #[derive(Debug, Clone, Copy)]
@@ -75,7 +184,7 @@ pub fn create_repo(
     conf: &'static PageServerConf,
     tenantid: ZTenantId,
     wal_redo_manager: Arc<dyn WalRedoManager>,
-) -> Result<Arc<dyn Repository>> {
+) -> Result<(ZTimelineId, Arc<dyn Repository>)> {
     let repo_dir = conf.tenant_path(&tenantid);
     if repo_dir.exists() {
         bail!("repo for {} already exists", tenantid)
@@ -107,7 +216,7 @@ pub fn create_repo(
     // move data loading out of create_repo()
     bootstrap_timeline(conf, tenantid, timeline_id, repo.as_ref())?;
 
-    Ok(repo)
+    Ok((timeline_id, repo))
 }
 
 // Returns checkpoint LSN from controlfile
@@ -160,7 +269,7 @@ fn bootstrap_timeline(
     tenantid: ZTenantId,
     tli: ZTimelineId,
     repo: &dyn Repository,
-) -> Result<()> {
+) -> Result<Arc<dyn Timeline>> {
     let _enter = info_span!("bootstrapping", timeline = %tli, tenant = %tenantid).entered();
 
     let initdb_path = conf.tenant_path(&tenantid).join("tmp");
@@ -192,7 +301,7 @@ fn bootstrap_timeline(
     // Remove temp dir. We don't need it anymore
     fs::remove_dir_all(pgdata_path)?;
 
-    Ok(())
+    Ok(timeline)
 }
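The create_timeline hunk below keeps both guards from the old branching path: a start LSN behind the ancestor's own ancestor_lsn is rejected, and a start LSN ahead of the processed WAL is waited for. From the test side this behaves as sketched here, assuming the fixture API (compare test_branch_behind.py below); parent_id is a placeholder:

    import pytest

    # branching before the ancestor's own branch point is refused
    with pytest.raises(Exception, match="less than timeline ancestor lsn"):
        env.zenith_cli.branch_timeline(ancestor_timeline_id=parent_id,
                                       ancestor_start_lsn="0/42")

    # with no LSN given, the branch is taken at the ancestor's end of WAL
    new_id = env.zenith_cli.branch_timeline(ancestor_timeline_id=parent_id)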
 
 pub(crate) fn get_timelines(
@@ -211,110 +320,86 @@ pub(crate) fn get_timelines(
             RepositoryTimeline::Remote { .. } => None,
         })
         .map(|(timeline_id, timeline)| {
-            let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() {
-                Some(ancestor_id) => (
-                    Some(ancestor_id.to_string()),
-                    Some(timeline.get_ancestor_lsn().to_string()),
-                ),
-                None => (None, None),
-            };
-
-            let current_logical_size_non_incremental = if include_non_incremental_logical_size {
-                match timeline
-                    .get_current_logical_size_non_incremental(timeline.get_last_record_lsn())
-                {
-                    Ok(size) => Some(size),
-                    Err(e) => {
-                        error!(
-                            "Failed to get current logical size for timeline {}: {:?}",
-                            timeline_id, e
-                        );
-                        None
-                    }
-                }
-            } else {
-                None
-            };
-
-            TimelineInfo {
+            TimelineInfo::from_dyn_timeline(
+                tenant_id,
                 timeline_id,
-                latest_valid_lsn: timeline.get_last_record_lsn(),
-                ancestor_id,
-                ancestor_lsn,
-                current_logical_size: timeline.get_current_logical_size(),
-                // non incremental size calculation can be heavy, so let it be optional
-                // needed for tests to check size calculation
-                current_logical_size_non_incremental,
-            }
+                timeline.as_ref(),
+                include_non_incremental_logical_size,
+            )
         })
         .collect())
 }
 
 pub(crate) fn create_timeline(
-    conf: &PageServerConf,
+    conf: &'static PageServerConf,
     tenant_id: ZTenantId,
-    timeline_id: ZTimelineId,
-    start_lsn: Option<Lsn>,
+    new_timeline_id: ZTimelineId,
+    ancestor_timeline_id: Option<ZTimelineId>,
+    ancestor_start_lsn: Option<Lsn>,
 ) -> Result<TimelineInfo> {
-    if conf.timeline_path(&timeline_id, &tenant_id).exists() {
-        bail!("timeline {} already exists", timeline_id);
+    if conf.timeline_path(&new_timeline_id, &tenant_id).exists() {
+        bail!("timeline {} already exists", new_timeline_id);
     }
 
     let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
+    let mut start_lsn = ancestor_start_lsn.unwrap_or(Lsn(0));
 
-    let mut startpoint = PointInTime {
-        timeline_id,
-        lsn: start_lsn.unwrap_or(Lsn(0)),
-    };
+    match ancestor_timeline_id {
+        Some(ancestor_timeline_id) => {
+            let ancestor_timeline = repo
+                .get_timeline(ancestor_timeline_id)
+                .with_context(|| format!("Cannot get ancestor timeline {}", ancestor_timeline_id))?
+                .local_timeline()
+                .with_context(|| {
+                    format!(
+                        "Cannot branch off the timeline {} that's not present locally",
+                        ancestor_timeline_id
+                    )
+                })?;
 
-    let timeline = repo
-        .get_timeline(startpoint.timeline_id)?
-        .local_timeline()
-        .context("Cannot branch off the timeline that's not present locally")?;
-    if startpoint.lsn == Lsn(0) {
-        // Find end of WAL on the old timeline
-        let end_of_wal = timeline.get_last_record_lsn();
-        info!("branching at end of WAL: {}", end_of_wal);
-        startpoint.lsn = end_of_wal;
-    } else {
-        // Wait for the WAL to arrive and be processed on the parent branch up
-        // to the requested branch point. The repository code itself doesn't
-        // require it, but if we start to receive WAL on the new timeline,
-        // decoding the new WAL might need to look up previous pages, relation
-        // sizes etc. and that would get confused if the previous page versions
-        // are not in the repository yet.
-        timeline.wait_lsn(startpoint.lsn)?;
+            if start_lsn == Lsn(0) {
+                // Find end of WAL on the old timeline
+                let end_of_wal = ancestor_timeline.get_last_record_lsn();
+                info!("branching at end of WAL: {}", end_of_wal);
+                start_lsn = end_of_wal;
+            } else {
+                // Wait for the WAL to arrive and be processed on the parent branch up
+                // to the requested branch point. The repository code itself doesn't
+                // require it, but if we start to receive WAL on the new timeline,
+                // decoding the new WAL might need to look up previous pages, relation
+                // sizes etc.
and that would get confused if the previous page versions + // are not in the repository yet. + ancestor_timeline.wait_lsn(start_lsn)?; + } + start_lsn = start_lsn.align(); + + let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn(); + if ancestor_ancestor_lsn > start_lsn { + // can we safely just branch from the ancestor instead? + anyhow::bail!( + "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}", + start_lsn, + ancestor_timeline_id, + ancestor_ancestor_lsn, + ); + } + repo.branch_timeline(ancestor_timeline_id, new_timeline_id, start_lsn)?; + // load the timeline into memory + let loaded_timeline = repo.get_timeline(new_timeline_id)?; + Ok(TimelineInfo::from_repo_timeline( + tenant_id, + loaded_timeline, + false, + )) + } + None => { + let new_timeline = bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?; + Ok(TimelineInfo::from_dyn_timeline( + tenant_id, + new_timeline_id, + new_timeline.as_ref(), + false, + )) + } } - startpoint.lsn = startpoint.lsn.align(); - if timeline.get_ancestor_lsn() > startpoint.lsn { - // can we safely just branch from the ancestor instead? - bail!( - "invalid startpoint {} for the timeline {}: less than timeline ancestor lsn {:?}", - startpoint.lsn, - timeline_id, - timeline.get_ancestor_lsn() - ); - } - - let new_timeline_id = ZTimelineId::generate(); - - // Forward entire timeline creation routine to repository - // backend, so it can do all needed initialization - repo.branch_timeline(startpoint.timeline_id, new_timeline_id, startpoint.lsn)?; - - // Remember the human-readable branch name for the new timeline. - // FIXME: there's a race condition, if you create a branch with the same - // name concurrently. - // TODO kb timeline creation needs more - let data = new_timeline_id.to_string(); - fs::write(conf.timeline_path(&timeline_id, &tenant_id), data)?; - - Ok(TimelineInfo { - timeline_id: new_timeline_id, - latest_valid_lsn: startpoint.lsn, - ancestor_id: Some(startpoint.timeline_id.to_string()), - ancestor_lsn: Some(startpoint.lsn.to_string()), - current_logical_size: 0, - current_logical_size_non_incremental: Some(0), - }) } diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py index ee1a09c917..4d1d0847ed 100644 --- a/test_runner/batch_others/test_auth.py +++ b/test_runner/batch_others/test_auth.py @@ -1,8 +1,8 @@ from contextlib import closing from typing import Iterator from uuid import UUID, uuid4 -import psycopg2 from fixtures.zenith_fixtures import ZenithEnvBuilder, ZenithPageserverApiException +from requests.exceptions import HTTPError import pytest @@ -26,14 +26,20 @@ def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder): ps.safe_psql("set FOO", password=management_token) # tenant can create branches - tenant_http_client.branch_create(env.initial_tenant, 'new1', 'main') + tenant_http_client.timeline_create(timeline_id=uuid4(), + tenant_id=env.initial_tenant, + ancestor_timeline_id=env.initial_timeline) # console can create branches for tenant - management_http_client.branch_create(env.initial_tenant, 'new2', 'main') + management_http_client.timeline_create(timeline_id=uuid4(), + tenant_id=env.initial_tenant, + ancestor_timeline_id=env.initial_timeline) # fail to create branch using token with different tenant_id with pytest.raises(ZenithPageserverApiException, match='Forbidden: Tenant id mismatch. 
Permission denied'): - invalid_tenant_http_client.branch_create(env.initial_tenant, "new3", "main") + invalid_tenant_http_client.timeline_create(timeline_id=uuid4(), + tenant_id=env.initial_tenant, + ancestor_timeline_id=env.initial_timeline) # create tenant using management token management_http_client.tenant_create(uuid4()) @@ -54,9 +60,8 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w env = zenith_env_builder.init_start() branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}" - env.zenith_cli.create_branch(branch, "main") - - pg = env.postgres.create_start(branch) + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start(branch, timeline_id=new_timeline_id) with closing(pg.connect()) as conn: with conn.cursor() as cur: diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py index 509c46975e..f8ff1741b4 100644 --- a/test_runner/batch_others/test_branch_behind.py +++ b/test_runner/batch_others/test_branch_behind.py @@ -22,9 +22,9 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder): env = zenith_env_builder.init_start() # Branch at the point where only 100 rows were inserted - env.zenith_cli.create_branch("test_branch_behind", "main") - - pgmain = env.postgres.create_start('test_branch_behind') + test_branch_behind_timeline_id = env.zenith_cli.branch_timeline() + pgmain = env.postgres.create_start('test_branch_behind', + timeline_id=test_branch_behind_timeline_id) log.info("postgres is running on 'test_branch_behind' branch") main_pg_conn = pgmain.connect() @@ -60,7 +60,8 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder): log.info(f'LSN after 200100 rows: {lsn_b}') # Branch at the point where only 100 rows were inserted - env.zenith_cli.create_branch("test_branch_behind_hundred", "test_branch_behind@" + lsn_a) + test_branch_behind_hundred_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_branch_behind_timeline_id, ancestor_start_lsn=lsn_a) # Insert many more rows. This generates enough WAL to fill a few segments. 
main_cur.execute(''' @@ -75,10 +76,13 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder): log.info(f'LSN after 400100 rows: {lsn_c}') # Branch at the point where only 200100 rows were inserted - env.zenith_cli.create_branch("test_branch_behind_more", "test_branch_behind@" + lsn_b) + test_branch_behind_more_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_branch_behind_timeline_id, ancestor_start_lsn=lsn_b) - pg_hundred = env.postgres.create_start("test_branch_behind_hundred") - pg_more = env.postgres.create_start("test_branch_behind_more") + pg_hundred = env.postgres.create_start("test_branch_behind_hundred", + timeline_id=test_branch_behind_hundred_timeline_id) + pg_more = env.postgres.create_start("test_branch_behind_more", + timeline_id=test_branch_behind_more_timeline_id) # On the 'hundred' branch, we should see only 100 rows hundred_pg_conn = pg_hundred.connect() @@ -99,19 +103,23 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder): # Check bad lsn's for branching # branch at segment boundary - env.zenith_cli.create_branch("test_branch_segment_boundary", "test_branch_behind@0/3000000") - pg = env.postgres.create_start("test_branch_segment_boundary") + test_branch_segment_boundary_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_branch_behind_timeline_id, ancestor_start_lsn="0/3000000") + pg = env.postgres.create_start("test_branch_segment_boundary", + timeline_id=test_branch_segment_boundary_timeline_id) cur = pg.connect().cursor() cur.execute('SELECT 1') assert cur.fetchone() == (1, ) # branch at pre-initdb lsn with pytest.raises(Exception, match="invalid branch start lsn"): - env.zenith_cli.create_branch("test_branch_preinitdb", "main@0/42") + env.zenith_cli.branch_timeline(ancestor_timeline_id=env.initial_timeline, + ancestor_start_lsn="0/42") # branch at pre-ancestor lsn with pytest.raises(Exception, match="less than timeline ancestor lsn"): - env.zenith_cli.create_branch("test_branch_preinitdb", "test_branch_behind@0/42") + env.zenith_cli.branch_timeline(ancestor_timeline_id=test_branch_behind_timeline_id, + ancestor_start_lsn="0/42") # check that we cannot create branch based on garbage collected data with closing(env.pageserver.connect()) as psconn: @@ -123,7 +131,8 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder): with pytest.raises(Exception, match="invalid branch start lsn"): # this gced_lsn is pretty random, so if gc is disabled this woudln't fail - env.zenith_cli.create_branch("test_branch_create_fail", f"test_branch_behind@{gced_lsn}") + env.zenith_cli.branch_timeline(ancestor_timeline_id=test_branch_behind_timeline_id, + ancestor_start_lsn=gced_lsn) # check that after gc everything is still there hundred_cur.execute('SELECT count(*) FROM foo') diff --git a/test_runner/batch_others/test_clog_truncate.py b/test_runner/batch_others/test_clog_truncate.py index 504f455936..9d3927aa84 100644 --- a/test_runner/batch_others/test_clog_truncate.py +++ b/test_runner/batch_others/test_clog_truncate.py @@ -12,7 +12,7 @@ from fixtures.log_helper import log # def test_clog_truncate(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_clog_truncate", "empty") + test_clog_truncate_timeline_id = env.zenith_cli.branch_timeline() # set agressive autovacuum to make sure that truncation will happen config = [ @@ -25,7 +25,9 @@ def test_clog_truncate(zenith_simple_env: ZenithEnv): 'autovacuum_freeze_max_age=100000' ] - pg = 
env.postgres.create_start('test_clog_truncate', config_lines=config) + pg = env.postgres.create_start('test_clog_truncate', + config_lines=config, + timeline_id=test_clog_truncate_timeline_id) log.info('postgres is running on test_clog_truncate branch') # Install extension containing function needed for test @@ -62,10 +64,11 @@ def test_clog_truncate(zenith_simple_env: ZenithEnv): # create new branch after clog truncation and start a compute node on it log.info(f'create branch at lsn_after_truncation {lsn_after_truncation}') - env.zenith_cli.create_branch("test_clog_truncate_new", - "test_clog_truncate@" + lsn_after_truncation) - - pg2 = env.postgres.create_start('test_clog_truncate_new') + test_clog_truncate_new_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_clog_truncate_timeline_id, + ancestor_start_lsn=lsn_after_truncation) + pg2 = env.postgres.create_start('test_clog_truncate_new', + timeline_id=test_clog_truncate_new_timeline_id) log.info('postgres is running on test_clog_truncate_new branch') # check that new node doesn't contain truncated segment diff --git a/test_runner/batch_others/test_config.py b/test_runner/batch_others/test_config.py index fd2b3b4e99..bd1f8b487f 100644 --- a/test_runner/batch_others/test_config.py +++ b/test_runner/batch_others/test_config.py @@ -9,10 +9,10 @@ from fixtures.log_helper import log # def test_config(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_config", "empty") - - # change config - pg = env.postgres.create_start('test_config', config_lines=['log_min_messages=debug1']) + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_config', + config_lines=['log_min_messages=debug1'], + timeline_id=new_timeline_id) log.info('postgres is running on test_config branch') with closing(pg.connect()) as conn: diff --git a/test_runner/batch_others/test_createdropdb.py b/test_runner/batch_others/test_createdropdb.py index 38243b298b..e77e1928b8 100644 --- a/test_runner/batch_others/test_createdropdb.py +++ b/test_runner/batch_others/test_createdropdb.py @@ -11,9 +11,9 @@ from fixtures.log_helper import log # def test_createdb(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_createdb", "empty") + test_createdb_timeline_id = env.zenith_cli.branch_timeline() - pg = env.postgres.create_start('test_createdb') + pg = env.postgres.create_start('test_createdb', timeline_id=test_createdb_timeline_id) log.info("postgres is running on 'test_createdb' branch") with closing(pg.connect()) as conn: @@ -27,9 +27,9 @@ def test_createdb(zenith_simple_env: ZenithEnv): lsn = cur.fetchone()[0] # Create a branch - env.zenith_cli.create_branch("test_createdb2", "test_createdb@" + lsn) - - pg2 = env.postgres.create_start('test_createdb2') + test_createdb2_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_createdb_timeline_id, ancestor_start_lsn=lsn) + pg2 = env.postgres.create_start('test_createdb2', timeline_id=test_createdb2_timeline_id) # Test that you can connect to the new database on both branches for db in (pg, pg2): @@ -41,9 +41,8 @@ def test_createdb(zenith_simple_env: ZenithEnv): # def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir): env = zenith_simple_env - env.zenith_cli.create_branch("test_dropdb", "empty") - - pg = env.postgres.create_start('test_dropdb') + test_dropdb_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_dropdb', 
timeline_id=test_dropdb_timeline_id) log.info("postgres is running on 'test_dropdb' branch") with closing(pg.connect()) as conn: @@ -66,11 +65,15 @@ def test_dropdb(zenith_simple_env: ZenithEnv, test_output_dir): lsn_after_drop = cur.fetchone()[0] # Create two branches before and after database drop. - env.zenith_cli.create_branch("test_before_dropdb", "test_dropdb@" + lsn_before_drop) - pg_before = env.postgres.create_start('test_before_dropdb') + test_before_dropdb_timeline_db = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_dropdb_timeline_id, ancestor_start_lsn=lsn_before_drop) + pg_before = env.postgres.create_start('test_before_dropdb', + timeline_id=test_before_dropdb_timeline_db) - env.zenith_cli.create_branch("test_after_dropdb", "test_dropdb@" + lsn_after_drop) - pg_after = env.postgres.create_start('test_after_dropdb') + test_after_dropdb_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_dropdb_timeline_id, ancestor_start_lsn=lsn_after_drop) + pg_after = env.postgres.create_start('test_after_dropdb', + timeline_id=test_after_dropdb_timeline_id) # Test that database exists on the branch before drop pg_before.connect(dbname='foodb').close() diff --git a/test_runner/batch_others/test_createuser.py b/test_runner/batch_others/test_createuser.py index 1959b47dcc..8f825a0a1a 100644 --- a/test_runner/batch_others/test_createuser.py +++ b/test_runner/batch_others/test_createuser.py @@ -9,9 +9,8 @@ from fixtures.log_helper import log # def test_createuser(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_createuser", "empty") - - pg = env.postgres.create_start('test_createuser') + test_createuser_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_createuser', timeline_id=test_createuser_timeline_id) log.info("postgres is running on 'test_createuser' branch") with closing(pg.connect()) as conn: @@ -25,9 +24,9 @@ def test_createuser(zenith_simple_env: ZenithEnv): lsn = cur.fetchone()[0] # Create a branch - env.zenith_cli.create_branch("test_createuser2", "test_createuser@" + lsn) - - pg2 = env.postgres.create_start('test_createuser2') + test_createuser2_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_createuser_timeline_id, ancestor_start_lsn=lsn) + pg2 = env.postgres.create_start('test_createuser2', timeline_id=test_createuser2_timeline_id) # Test that you can connect to new branch as a new user assert pg2.safe_psql('select current_user', username='testuser') == [('testuser', )] diff --git a/test_runner/batch_others/test_gc_aggressive.py b/test_runner/batch_others/test_gc_aggressive.py index 9de6ba9f59..7dd38a5799 100644 --- a/test_runner/batch_others/test_gc_aggressive.py +++ b/test_runner/batch_others/test_gc_aggressive.py @@ -1,7 +1,6 @@ from contextlib import closing import asyncio -import asyncpg import random from fixtures.zenith_fixtures import ZenithEnv, Postgres, Safekeeper @@ -55,8 +54,8 @@ async def update_and_gc(env: ZenithEnv, pg: Postgres, timeline: str): # def test_gc_aggressive(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_gc_aggressive", "empty") - pg = env.postgres.create_start('test_gc_aggressive') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_gc_aggressive', timeline_id=new_timeline_id) log.info('postgres is running on test_gc_aggressive branch') conn = pg.connect() diff --git a/test_runner/batch_others/test_multixact.py 
b/test_runner/batch_others/test_multixact.py index 6a2afd2ede..11f8000226 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -10,8 +10,8 @@ from fixtures.log_helper import log # def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir): env = zenith_simple_env - env.zenith_cli.create_branch("test_multixact", "empty") - pg = env.postgres.create_start('test_multixact') + test_multixact_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_multixact', timeline_id=test_multixact_timeline_id) log.info("postgres is running on 'test_multixact' branch") pg_conn = pg.connect() @@ -60,8 +60,10 @@ def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir): assert int(next_multixact_id) > int(next_multixact_id_old) # Branch at this point - env.zenith_cli.create_branch("test_multixact_new", "test_multixact@" + lsn) - pg_new = env.postgres.create_start('test_multixact_new') + test_multixact_new_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_multixact_timeline_id, ancestor_start_lsn=lsn) + pg_new = env.postgres.create_start('test_multixact_new', + timeline_id=test_multixact_new_timeline_id) log.info("postgres is running on 'test_multixact_new' branch") pg_new_conn = pg_new.connect() diff --git a/test_runner/batch_others/test_old_request_lsn.py b/test_runner/batch_others/test_old_request_lsn.py index d09fb24913..f0701dfe4f 100644 --- a/test_runner/batch_others/test_old_request_lsn.py +++ b/test_runner/batch_others/test_old_request_lsn.py @@ -16,8 +16,8 @@ from fixtures.log_helper import log # def test_old_request_lsn(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_old_request_lsn", "empty") - pg = env.postgres.create_start('test_old_request_lsn') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_old_request_lsn', timeline_id=new_timeline_id) log.info('postgres is running on test_old_request_lsn branch') pg_conn = pg.connect() diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index ba1f106c4b..4c3b98e838 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -26,18 +26,20 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID): # check its timelines timelines = client.timeline_list(tenant_id) assert len(timelines) > 0 - for timeline_id_str in timelines: - timeline_details = client.timeline_detail(tenant_id, UUID(timeline_id_str)) + for timeline in timelines: + timeline_id_str = str(timeline['timeline_id']) + timeline_details = client.timeline_detail(tenant_id=tenant_id, + timeline_id=UUID(timeline_id_str)) assert timeline_details['type'] == 'Local' assert timeline_details['tenant_id'] == tenant_id.hex assert timeline_details['timeline_id'] == timeline_id_str - # create branch - branch_name = uuid4().hex - client.branch_create(tenant_id, branch_name, "main") + # create timeline + timeline_id = uuid4() + client.timeline_create(tenant_id=tenant_id, timeline_id=timeline_id) # check it is there - assert branch_name in {b['name'] for b in client.branch_list(tenant_id)} + assert timeline_id.hex in {b['timeline_id'] for b in client.timeline_list(tenant_id)} def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv): diff --git a/test_runner/batch_others/test_pageserver_catchup.py b/test_runner/batch_others/test_pageserver_catchup.py index 
985d1a3af0..ba77a4a321 100644 --- a/test_runner/batch_others/test_pageserver_catchup.py +++ b/test_runner/batch_others/test_pageserver_catchup.py @@ -16,8 +16,9 @@ def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuil zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_pageserver_catchup_while_compute_down", "main") - pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down', + timeline_id=new_timeline_id) pg_conn = pg.connect() cur = pg_conn.cursor() @@ -59,7 +60,8 @@ def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuil env.safekeepers[2].start() # restart compute node - pg.stop_and_destroy().create_start('test_pageserver_catchup_while_compute_down') + pg.stop_and_destroy().create_start('test_pageserver_catchup_while_compute_down', + timeline_id=new_timeline_id) # Ensure that basebackup went correct and pageserver returned all data pg_conn = pg.connect() diff --git a/test_runner/batch_others/test_pageserver_restart.py b/test_runner/batch_others/test_pageserver_restart.py index ec93c2cf5b..f1d154408c 100644 --- a/test_runner/batch_others/test_pageserver_restart.py +++ b/test_runner/batch_others/test_pageserver_restart.py @@ -15,8 +15,8 @@ def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_pageserver_restart", "main") - pg = env.postgres.create_start('test_pageserver_restart') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_pageserver_restart', timeline_id=new_timeline_id) pg_conn = pg.connect() cur = pg_conn.cursor() diff --git a/test_runner/batch_others/test_parallel_copy.py b/test_runner/batch_others/test_parallel_copy.py index 6f87bc4a36..8e954a8e51 100644 --- a/test_runner/batch_others/test_parallel_copy.py +++ b/test_runner/batch_others/test_parallel_copy.py @@ -1,7 +1,5 @@ from io import BytesIO import asyncio -import asyncpg -import subprocess from fixtures.zenith_fixtures import ZenithEnv, Postgres from fixtures.log_helper import log @@ -37,8 +35,8 @@ async def parallel_load_same_table(pg: Postgres, n_parallel: int): # Load data into one table with COPY TO from 5 parallel connections def test_parallel_copy(zenith_simple_env: ZenithEnv, n_parallel=5): env = zenith_simple_env - env.zenith_cli.create_branch("test_parallel_copy", "empty") - pg = env.postgres.create_start('test_parallel_copy') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_parallel_copy', timeline_id=new_timeline_id) log.info("postgres is running on 'test_parallel_copy' branch") # Create test table diff --git a/test_runner/batch_others/test_pgbench.py b/test_runner/batch_others/test_pgbench.py index 09713023bc..207f1e1e2c 100644 --- a/test_runner/batch_others/test_pgbench.py +++ b/test_runner/batch_others/test_pgbench.py @@ -4,8 +4,8 @@ from fixtures.log_helper import log def test_pgbench(zenith_simple_env: ZenithEnv, pg_bin): env = zenith_simple_env - env.zenith_cli.create_branch("test_pgbench", "empty") - pg = env.postgres.create_start('test_pgbench') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_pgbench', timeline_id=new_timeline_id) log.info("postgres is running on 'test_pgbench' branch") 
connstr = pg.connstr() diff --git a/test_runner/batch_others/test_readonly_node.py b/test_runner/batch_others/test_readonly_node.py index ba256e71f7..2998ea7528 100644 --- a/test_runner/batch_others/test_readonly_node.py +++ b/test_runner/batch_others/test_readonly_node.py @@ -11,9 +11,9 @@ from fixtures.zenith_fixtures import ZenithEnv # def test_readonly_node(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_readonly_node", "empty") - - pgmain = env.postgres.create_start('test_readonly_node') + test_readonly_node_timeline_id = env.zenith_cli.branch_timeline() + pgmain = env.postgres.create_start('test_readonly_node', + timeline_id=test_readonly_node_timeline_id) log.info("postgres is running on 'test_readonly_node' branch") main_pg_conn = pgmain.connect() @@ -54,11 +54,13 @@ def test_readonly_node(zenith_simple_env: ZenithEnv): # Create first read-only node at the point where only 100 rows were inserted pg_hundred = env.postgres.create_start("test_readonly_node_hundred", - branch=f'test_readonly_node@{lsn_a}') + timeline_id=test_readonly_node_timeline_id, + lsn=lsn_a) # And another at the point where 200100 rows were inserted pg_more = env.postgres.create_start("test_readonly_node_more", - branch=f'test_readonly_node@{lsn_b}') + timeline_id=test_readonly_node_timeline_id, + lsn=lsn_b) # On the 'hundred' node, we should see only 100 rows hundred_pg_conn = pg_hundred.connect() @@ -78,7 +80,8 @@ def test_readonly_node(zenith_simple_env: ZenithEnv): # Check creating a node at segment boundary pg = env.postgres.create_start("test_branch_segment_boundary", - branch="test_readonly_node@0/3000000") + timeline_id=test_readonly_node_timeline_id, + lsn='0/3000000') cur = pg.connect().cursor() cur.execute('SELECT 1') assert cur.fetchone() == (1, ) @@ -87,4 +90,5 @@ def test_readonly_node(zenith_simple_env: ZenithEnv): with pytest.raises(Exception, match="invalid basebackup lsn"): # compute node startup with invalid LSN should fail env.zenith_cli.pg_start("test_readonly_node_preinitdb", - timeline_spec="test_readonly_node@0/42") + timeline_id=test_readonly_node_timeline_id, + lsn="0/42") diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index d4dd3fb9e2..baa1f787df 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -15,9 +15,8 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_restart_compute", "main") - - pg = env.postgres.create_start('test_restart_compute') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_restart_compute', timeline_id=new_timeline_id) log.info("postgres is running on 'test_restart_compute' branch") with closing(pg.connect()) as conn: @@ -30,7 +29,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor log.info(f"res = {r}") # Remove data directory and restart - pg.stop_and_destroy().create_start('test_restart_compute') + pg.stop_and_destroy().create_start('test_restart_compute', timeline_id=new_timeline_id) with closing(pg.connect()) as conn: with conn.cursor() as cur: @@ -49,7 +48,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor log.info(f"res = {r}") # Again remove data directory and restart - pg.stop_and_destroy().create_start('test_restart_compute') + 
pg.stop_and_destroy().create_start('test_restart_compute', timeline_id=new_timeline_id) # That select causes lots of FPI's and increases probability of wakeepers # lagging behind after query completion @@ -63,7 +62,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor log.info(f"res = {r}") # And again remove data directory and restart - pg.stop_and_destroy().create_start('test_restart_compute') + pg.stop_and_destroy().create_start('test_restart_compute', timeline_id=new_timeline_id) with closing(pg.connect()) as conn: with conn.cursor() as cur: diff --git a/test_runner/batch_others/test_snapfiles_gc.py b/test_runner/batch_others/test_snapfiles_gc.py index c6d4512bc9..fb02e54be2 100644 --- a/test_runner/batch_others/test_snapfiles_gc.py +++ b/test_runner/batch_others/test_snapfiles_gc.py @@ -14,8 +14,8 @@ from fixtures.log_helper import log # def test_layerfiles_gc(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_layerfiles_gc", "empty") - pg = env.postgres.create_start('test_layerfiles_gc') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_layerfiles_gc', timeline_id=new_timeline_id) with closing(pg.connect()) as conn: with conn.cursor() as cur: diff --git a/test_runner/batch_others/test_subxacts.py b/test_runner/batch_others/test_subxacts.py index bed1c4be63..6153bd1fe2 100644 --- a/test_runner/batch_others/test_subxacts.py +++ b/test_runner/batch_others/test_subxacts.py @@ -10,8 +10,8 @@ from fixtures.log_helper import log # CLOG. def test_subxacts(zenith_simple_env: ZenithEnv, test_output_dir): env = zenith_simple_env - env.zenith_cli.create_branch("test_subxacts", "empty") - pg = env.postgres.create_start('test_subxacts') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_subxacts', timeline_id=new_timeline_id) log.info("postgres is running on 'test_subxacts' branch") pg_conn = pg.connect() diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index acff3ef62c..429aee8488 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -127,16 +127,14 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, # create folder for remote storage mock remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage' - tenant = env.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209")) + (tenant, _) = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209")) log.info("tenant to relocate %s", tenant) - env.zenith_cli.create_branch("test_tenant_relocation", "main", tenant_id=tenant) + new_timeline_id = env.zenith_cli.branch_timeline(tenant_id=tenant) - tenant_pg = env.postgres.create_start( - "test_tenant_relocation", - "main", # branch name, None means same as node name - tenant_id=tenant, - ) + tenant_pg = env.postgres.create_start("test_tenant_relocation", + tenant_id=tenant, + timeline_id=new_timeline_id) # insert some data with closing(tenant_pg.connect()) as conn: diff --git a/test_runner/batch_others/test_tenants.py b/test_runner/batch_others/test_tenants.py index b665ae9022..20a910e9ce 100644 --- a/test_runner/batch_others/test_tenants.py +++ b/test_runner/batch_others/test_tenants.py @@ -12,25 +12,23 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce env = zenith_env_builder.init_start() """Tests tenants with and without wal acceptors""" - tenant_1 = 
env.create_tenant()
-    tenant_2 = env.create_tenant()
+    (tenant_1, initial_timeline_1) = env.zenith_cli.create_tenant()
+    (tenant_2, initial_timeline_2) = env.zenith_cli.create_tenant()
 
-    env.zenith_cli.create_branch(f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
-                                 "main",
-                                 tenant_id=tenant_1)
-    env.zenith_cli.create_branch(f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
-                                 "main",
-                                 tenant_id=tenant_2)
+    new_timeline_tenant_1 = env.zenith_cli.branch_timeline(tenant_id=tenant_1,
+                                                           ancestor_timeline_id=initial_timeline_1)
+    new_timeline_tenant_2 = env.zenith_cli.branch_timeline(tenant_id=tenant_2,
+                                                           ancestor_timeline_id=initial_timeline_2)
 
     pg_tenant1 = env.postgres.create_start(
         f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
-        None,  # branch name, None means same as node name
-        tenant_1,
+        tenant_id=tenant_1,
+        timeline_id=new_timeline_tenant_1,
     )
     pg_tenant2 = env.postgres.create_start(
         f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
-        None,  # branch name, None means same as node name
-        tenant_2,
+        tenant_id=tenant_2,
+        timeline_id=new_timeline_tenant_2,
     )
 
     for pg in [pg_tenant1, pg_tenant2]:
diff --git a/test_runner/batch_others/test_timeline_size.py b/test_runner/batch_others/test_timeline_size.py
index 2c31267922..49143d0000 100644
--- a/test_runner/batch_others/test_timeline_size.py
+++ b/test_runner/batch_others/test_timeline_size.py
@@ -10,13 +10,13 @@ import time
 def test_timeline_size(zenith_simple_env: ZenithEnv):
     env = zenith_simple_env
     # Branch at the point where only 100 rows were inserted
-    env.zenith_cli.create_branch("test_timeline_size", "empty")
+    new_timeline_id = env.zenith_cli.branch_timeline()
 
     client = env.pageserver.http_client()
-    res = client.branch_detail(env.initial_tenant, "test_timeline_size")
+    res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
     assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
 
-    pgmain = env.postgres.create_start("test_timeline_size")
+    pgmain = env.postgres.create_start("test_timeline_size", timeline_id=new_timeline_id)
     log.info("postgres is running on 'test_timeline_size' branch")
 
     with closing(pgmain.connect()) as conn:
@@ -31,11 +31,11 @@ def test_timeline_size(zenith_simple_env: ZenithEnv):
                 FROM generate_series(1, 10) g
             """)
 
-            res = client.branch_detail(env.initial_tenant, "test_timeline_size")
+            res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
             assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
             cur.execute("TRUNCATE foo")
 
-            res = client.branch_detail(env.initial_tenant, "test_timeline_size")
+            res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
             assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
 
 
@@ -68,17 +68,17 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
 def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
     zenith_env_builder.num_safekeepers = 1
     env = zenith_env_builder.init_start()
-    env.zenith_cli.create_branch("test_timeline_size_quota", "main")
+    new_timeline_id = env.zenith_cli.branch_timeline()
 
     client = env.pageserver.http_client()
-    res = client.branch_detail(env.initial_tenant, "test_timeline_size_quota")
+    res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
     assert res["current_logical_size"] ==
res["current_logical_size_non_incremental"] pgmain = env.postgres.create_start( "test_timeline_size_quota", # Set small limit for the test config_lines=['zenith.max_cluster_size=30MB'], - ) + timeline_id=new_timeline_id) log.info("postgres is running on 'test_timeline_size_quota' branch") with closing(pgmain.connect()) as conn: diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py index d6a1cd01e8..b479e9de22 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -9,9 +9,10 @@ from fixtures.log_helper import log # def test_twophase(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_twophase", "empty") - - pg = env.postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) + test_twophase_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_twophase', + config_lines=['max_prepared_transactions=5'], + timeline_id=test_twophase_timeline_id) log.info("postgres is running on 'test_twophase' branch") conn = pg.connect() @@ -56,12 +57,14 @@ def test_twophase(zenith_simple_env: ZenithEnv): assert len(twophase_files) == 2 # Create a branch with the transaction in prepared state - env.zenith_cli.create_branch("test_twophase_prepared", "test_twophase") + test_twophase_prepared_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_twophase_timeline_id) # Start compute on the new branch pg2 = env.postgres.create_start( 'test_twophase_prepared', config_lines=['max_prepared_transactions=5'], + timeline_id=test_twophase_prepared_timeline_id, ) # Check that we restored only needed twophase files diff --git a/test_runner/batch_others/test_vm_bits.py b/test_runner/batch_others/test_vm_bits.py index 49e48dd450..a657b3e3fd 100644 --- a/test_runner/batch_others/test_vm_bits.py +++ b/test_runner/batch_others/test_vm_bits.py @@ -9,8 +9,8 @@ from fixtures.log_helper import log def test_vm_bit_clear(zenith_simple_env: ZenithEnv): env = zenith_simple_env - env.zenith_cli.create_branch("test_vm_bit_clear", "empty") - pg = env.postgres.create_start('test_vm_bit_clear') + test_vm_bit_clear_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_vm_bit_clear', timeline_id=test_vm_bit_clear_timeline_id) log.info("postgres is running on 'test_vm_bit_clear' branch") pg_conn = pg.connect() @@ -33,7 +33,8 @@ def test_vm_bit_clear(zenith_simple_env: ZenithEnv): cur.execute('UPDATE vmtest_update SET id = 5000 WHERE id = 1') # Branch at this point, to test that later - env.zenith_cli.create_branch("test_vm_bit_clear_new", "test_vm_bit_clear") + test_vm_bit_clear_new_timeline_id = env.zenith_cli.branch_timeline( + ancestor_timeline_id=test_vm_bit_clear_timeline_id) # Clear the buffer cache, to force the VM page to be re-fetched from # the page server @@ -61,7 +62,8 @@ def test_vm_bit_clear(zenith_simple_env: ZenithEnv): # a dirty VM page is evicted. If the VM bit was not correctly cleared by the # earlier WAL record, the full-page image hides the problem. Starting a new # server at the right point-in-time avoids that full-page image. 
- pg_new = env.postgres.create_start('test_vm_bit_clear_new') + pg_new = env.postgres.create_start('test_vm_bit_clear_new', + timeline_id=test_vm_bit_clear_new_timeline_id) log.info("postgres is running on 'test_vm_bit_clear_new' branch") pg_new_conn = pg_new.connect() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index c375c9626a..3e39228494 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -24,9 +24,8 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_wal_acceptors_normal_work", "main") - - pg = env.postgres.create_start('test_wal_acceptors_normal_work') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_wal_acceptors_normal_work', timeline_id=new_timeline_id) with closing(pg.connect()) as conn: with conn.cursor() as cur: @@ -39,9 +38,9 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder): @dataclass -class BranchMetrics: - name: str - latest_valid_lsn: int +class TimelineMetrics: + timeline_id: str + last_record_lsn: int # One entry per each Safekeeper, order is the same flush_lsns: List[int] = field(default_factory=list) commit_lsns: List[int] = field(default_factory=list) @@ -55,21 +54,26 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder): n_timelines = 3 - branches = ["test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)] + branch_names = [ + "test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines) + ] + branch_names_to_timeline_ids = {} # start postgres on each timeline pgs = [] - for branch in branches: - env.zenith_cli.create_branch(branch, "main") - pgs.append(env.postgres.create_start(branch)) + for branch_name in branch_names: + new_timeline_id = env.zenith_cli.branch_timeline() + pgs.append(env.postgres.create_start(branch_name, timeline_id=new_timeline_id)) + branch_names_to_timeline_ids[branch_name] = new_timeline_id tenant_id = env.initial_tenant - def collect_metrics(message: str) -> List[BranchMetrics]: + def collect_metrics(message: str) -> List[TimelineMetrics]: with env.pageserver.http_client() as pageserver_http: - branch_details = [ - pageserver_http.branch_detail(tenant_id=tenant_id, name=branch) - for branch in branches + timeline_details = [ + pageserver_http.timeline_detail( + tenant_id=tenant_id, timeline_id=branch_names_to_timeline_ids[branch_name]) + for branch_name in branch_names ] # All changes visible to pageserver (latest_valid_lsn) should be # confirmed by safekeepers first. As we cannot atomically get @@ -80,14 +84,15 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder): # safekeepers' state, it will look contradictory. 
        sk_metrics = [sk.http_client().get_metrics() for sk in env.safekeepers]
 
-        branch_metrics = []
+        timeline_metrics = []
         with env.pageserver.http_client() as pageserver_http:
-            for branch_detail in branch_details:
-                timeline_id: str = branch_detail["timeline_id"]
+            for timeline_detail in timeline_details:
+                timeline_id: str = timeline_detail["timeline_id"]
 
-                m = BranchMetrics(
-                    name=branch_detail["name"],
-                    latest_valid_lsn=branch_detail["latest_valid_lsn"],
+                m = TimelineMetrics(
+                    timeline_id=timeline_id,
+                    last_record_lsn=timeline_detail["last_record_lsn"],
                 )
                 for sk_m in sk_metrics:
                     m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)])
@@ -99,13 +104,13 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
             # We only call collect_metrics() after a transaction is confirmed by
             # the compute node, which only happens after a consensus of safekeepers
             # has confirmed the transaction. We assume majority consensus here.
-            assert (2 * sum(m.latest_valid_lsn <= lsn
+            assert (2 * sum(m.last_record_lsn <= lsn
                             for lsn in m.flush_lsns) > zenith_env_builder.num_safekeepers)
-            assert (2 * sum(m.latest_valid_lsn <= lsn
+            assert (2 * sum(m.last_record_lsn <= lsn
                             for lsn in m.commit_lsns) > zenith_env_builder.num_safekeepers)
-            branch_metrics.append(m)
-        log.info(f"{message}: {branch_metrics}")
-        return branch_metrics
+            timeline_metrics.append(m)
+        log.info(f"{message}: {timeline_metrics}")
+        return timeline_metrics
 
     # TODO: https://github.com/zenithdb/zenith/issues/809
     # collect_metrics("before CREATE TABLE")
@@ -117,7 +122,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
         pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
     init_m = collect_metrics("after CREATE TABLE")
 
-    # Populate data for 2/3 branches
+    # Populate data for 2/3 timelines
     class MetricsChecker(threading.Thread):
         def __init__(self) -> None:
             super().__init__(daemon=True)
@@ -155,15 +160,15 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
 
     collect_metrics("after INSERT INTO")
 
-    # Check data for 2/3 branches
+    # Check data for 2/3 timelines
     for pg in pgs[:-1]:
         res = pg.safe_psql("SELECT sum(key) FROM t")
         assert res[0] == (5000050000, )
 
     final_m = collect_metrics("after SELECT")
 
-    # Assume that LSNs (a) behave similarly in all branches; and (b) INSERT INTO alters LSN significantly.
+    # Assume that LSNs (a) behave similarly in all timelines; and (b) INSERT INTO alters LSN significantly.
     # Also assume that safekeepers will not be significantly out of sync in this test.
- middle_lsn = (init_m[0].latest_valid_lsn + final_m[0].latest_valid_lsn) // 2 + middle_lsn = (init_m[0].last_record_lsn + final_m[0].last_record_lsn) // 2 assert max(init_m[0].flush_lsns) < middle_lsn < min(final_m[0].flush_lsns) assert max(init_m[0].commit_lsns) < middle_lsn < min(final_m[0].commit_lsns) assert max(init_m[1].flush_lsns) < middle_lsn < min(final_m[1].flush_lsns) @@ -183,8 +188,8 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = n_acceptors env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_wal_acceptors_restarts", "main") - pg = env.postgres.create_start('test_wal_acceptors_restarts') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_wal_acceptors_restarts', timeline_id=new_timeline_id) # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -220,8 +225,8 @@ def test_unavailability(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 2 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_wal_acceptors_unavailability", "main") - pg = env.postgres.create_start('test_wal_acceptors_unavailability') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_wal_acceptors_unavailability', timeline_id=new_timeline_id) # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -291,8 +296,9 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): zenith_env_builder.num_safekeepers = 3 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_wal_acceptors_race_conditions", "main") - pg = env.postgres.create_start('test_wal_acceptors_race_conditions') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_wal_acceptors_race_conditions', + timeline_id=new_timeline_id) # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -456,8 +462,8 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_timeline_status", "main") - pg = env.postgres.create_start('test_timeline_status') + new_timeline_id = env.zenith_cli.branch_timeline() + pg = env.postgres.create_start('test_timeline_status', timeline_id=new_timeline_id) wa = env.safekeepers[0] wa_http_cli = wa.http_client() @@ -630,12 +636,12 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 4 env = zenith_env_builder.init_start() - env.zenith_cli.create_branch("test_replace_safekeeper", "main") + new_timeline_id = env.zenith_cli.branch_timeline() log.info("Use only first 3 safekeepers") env.safekeepers[3].stop() active_safekeepers = [1, 2, 3] - pg = env.postgres.create('test_replace_safekeeper') + pg = env.postgres.create('test_replace_safekeeper', timeline_id=new_timeline_id) pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers)) pg.start() @@ -673,7 +679,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): show_statuses(env.safekeepers, tenant_id, timeline_id) log.info("Recreate postgres to replace failed sk1 with new sk4") - pg.stop_and_destroy().create('test_replace_safekeeper') + pg.stop_and_destroy().create('test_replace_safekeeper', timeline_id=uuid.UUID(timeline_id)) active_safekeepers = [2, 3, 4] env.safekeepers[3].start() 
 pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py
index 4b6a27f73d..719e8c163f 100644
--- a/test_runner/batch_others/test_wal_acceptor_async.py
+++ b/test_runner/batch_others/test_wal_acceptor_async.py
@@ -202,8 +202,9 @@ def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
     zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init_start()
 
-    env.zenith_cli.create_branch("test_wal_acceptors_restarts_under_load", "main")
-    pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')
+    new_timeline_id = env.zenith_cli.branch_timeline()
+    pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load',
+                                   timeline_id=new_timeline_id)
 
     asyncio.run(run_restarts_under_load(pg, env.safekeepers))
 
diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py
index f1897e4b6f..4f089d4354 100644
--- a/test_runner/batch_others/test_zenith_cli.py
+++ b/test_runner/batch_others/test_zenith_cli.py
@@ -7,52 +7,47 @@ from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserv
 from typing import cast
 
-def helper_compare_branch_list(pageserver_http_client: ZenithPageserverHttpClient,
-                               env: ZenithEnv,
-                               initial_tenant: uuid.UUID):
+def helper_compare_timeline_list(pageserver_http_client: ZenithPageserverHttpClient,
+                                 env: ZenithEnv,
+                                 initial_tenant: uuid.UUID):
     """
-    Compare branches list returned by CLI and directly via API.
-    Filters out branches created by other tests.
+    Compare timeline lists returned by the CLI with and without an explicit tenant argument.
+    Filters out timelines created by other tests.
     """
-    branches = pageserver_http_client.branch_list(initial_tenant)
-    branches_api = sorted(map(lambda b: cast(str, b['name']), branches))
-    branches_api = [b for b in branches_api if b.startswith('test_cli_') or b in ('empty', 'main')]
-    res = env.zenith_cli.list_branches()
-    branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
-    branches_cli = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')]
-
-    res = env.zenith_cli.list_branches(tenant_id=initial_tenant)
-    branches_cli_with_tenant_arg = sorted(
-        map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
-    branches_cli_with_tenant_arg = [
-        b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')
+    timelines_cli = env.zenith_cli.list_timelines()
+    timelines_cli = [
+        b for b in timelines_cli if b.startswith('test_cli_') or b in ('empty', 'main')
     ]
 
-    assert branches_api == branches_cli == branches_cli_with_tenant_arg
+    timelines_cli_with_tenant_arg = env.zenith_cli.list_timelines(initial_tenant)
+    timelines_cli_with_tenant_arg = [
+        b for b in timelines_cli_with_tenant_arg
+        if b.startswith('test_cli_') or b in ('empty', 'main')
+    ]
+
+    assert timelines_cli == timelines_cli_with_tenant_arg
 
-def test_cli_branch_list(zenith_simple_env: ZenithEnv):
+def test_cli_timeline_list(zenith_simple_env: ZenithEnv):
     env = zenith_simple_env
     pageserver_http_client = env.pageserver.http_client()
 
     # Initial sanity check
-    helper_compare_branch_list(pageserver_http_client, env, env.initial_tenant)
-    env.zenith_cli.create_branch("test_cli_branch_list_main", "empty")
-    helper_compare_branch_list(pageserver_http_client, env, env.initial_tenant)
+    helper_compare_timeline_list(pageserver_http_client, env, env.initial_tenant)
+
+    # Create a branch for us
+    main_timeline_id = 
env.zenith_cli.branch_timeline() + helper_compare_timeline_list(pageserver_http_client, env, env.initial_tenant) # Create a nested branch - res = env.zenith_cli.create_branch("test_cli_branch_list_nested", "test_cli_branch_list_main") - assert res.stderr == '' - helper_compare_branch_list(pageserver_http_client, env, env.initial_tenant) + nested_timeline_id = env.zenith_cli.branch_timeline(ancestor_timeline_id=main_timeline_id) + helper_compare_timeline_list(pageserver_http_client, env, env.initial_tenant) # Check that all new branches are visible via CLI - res = env.zenith_cli.list_branches() - assert res.stderr == '' - branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n"))) + timelines_cli = env.zenith_cli.list_timelines() - assert 'test_cli_branch_list_main' in branches_cli - assert 'test_cli_branch_list_nested' in branches_cli + assert main_timeline_id.hex in timelines_cli + assert nested_timeline_id.hex in timelines_cli def helper_compare_tenant_list(pageserver_http_client: ZenithPageserverHttpClient, env: ZenithEnv): @@ -60,7 +55,6 @@ def helper_compare_tenant_list(pageserver_http_client: ZenithPageserverHttpClien tenants_api = sorted(map(lambda t: cast(str, t['id']), tenants)) res = env.zenith_cli.list_tenants() - assert res.stderr == '' tenants_cli = sorted(map(lambda t: t.split()[0], res.stdout.splitlines())) assert tenants_api == tenants_cli @@ -74,14 +68,14 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv): # Create new tenant tenant1 = uuid.uuid4() - env.zenith_cli.create_tenant(tenant1) + env.zenith_cli.create_tenant(tenant_id=tenant1) # check tenant1 appeared helper_compare_tenant_list(pageserver_http_client, env) # Create new tenant tenant2 = uuid.uuid4() - env.zenith_cli.create_tenant(tenant2) + env.zenith_cli.create_tenant(tenant_id=tenant2) # check tenant2 appeared helper_compare_tenant_list(pageserver_http_client, env) diff --git a/test_runner/batch_pg_regress/test_isolation.py b/test_runner/batch_pg_regress/test_isolation.py index ddafc3815b..8dce020dc0 100644 --- a/test_runner/batch_pg_regress/test_isolation.py +++ b/test_runner/batch_pg_regress/test_isolation.py @@ -7,10 +7,12 @@ from fixtures.zenith_fixtures import ZenithEnv, base_dir, pg_distrib_dir def test_isolation(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys): env = zenith_simple_env - env.zenith_cli.create_branch("test_isolation", "empty") + new_timeline_id = env.zenith_cli.branch_timeline() # Connect to postgres and create a database called "regression". # isolation tests use prepared transactions, so enable them - pg = env.postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100']) + pg = env.postgres.create_start('test_isolation', + config_lines=['max_prepared_transactions=100'], + timeline_id=new_timeline_id) pg.safe_psql('CREATE DATABASE isolation_regression') # Create some local directories for pg_isolation_regress to run in. 
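The same migration pattern repeats throughout the test suite: instead of naming a branch and a start point, a test asks the CLI fixture for a new timeline id and pins its compute node to it explicitly. A minimal sketch of the migrated shape, using only the fixtures introduced by this patch (the test name `test_example` is illustrative, not part of the patch):

    from fixtures.zenith_fixtures import ZenithEnv

    def test_example(zenith_simple_env: ZenithEnv):
        env = zenith_simple_env
        # Branch off the tenant's initial timeline; the helper returns the new timeline's UUID.
        new_timeline_id = env.zenith_cli.branch_timeline()
        # Start a compute node pinned to that timeline rather than to a named branch.
        pg = env.postgres.create_start('test_example', timeline_id=new_timeline_id)
        # Timelines are now listed by hex id, not by name.
        assert new_timeline_id.hex in env.zenith_cli.list_timelines()
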
diff --git a/test_runner/batch_pg_regress/test_pg_regress.py b/test_runner/batch_pg_regress/test_pg_regress.py
index 5199f65216..efeb63fce3 100644
--- a/test_runner/batch_pg_regress/test_pg_regress.py
+++ b/test_runner/batch_pg_regress/test_pg_regress.py
@@ -7,9 +7,9 @@ from fixtures.zenith_fixtures import ZenithEnv, check_restored_datadir_content,
 def test_pg_regress(zenith_simple_env: ZenithEnv, test_output_dir: str, pg_bin, capsys):
     env = zenith_simple_env
-    env.zenith_cli.create_branch("test_pg_regress", "empty")
+    new_timeline_id = env.zenith_cli.branch_timeline()
     # Connect to postgres and create a database called "regression".
-    pg = env.postgres.create_start('test_pg_regress')
+    pg = env.postgres.create_start('test_pg_regress', timeline_id=new_timeline_id)
     pg.safe_psql('CREATE DATABASE regression')
 
     # Create some local directories for pg_regress to run in.
diff --git a/test_runner/batch_pg_regress/test_zenith_regress.py b/test_runner/batch_pg_regress/test_zenith_regress.py
index 31d5b07093..2ccbafccfd 100644
--- a/test_runner/batch_pg_regress/test_zenith_regress.py
+++ b/test_runner/batch_pg_regress/test_zenith_regress.py
@@ -11,9 +11,9 @@ from fixtures.log_helper import log
 def test_zenith_regress(zenith_simple_env: ZenithEnv, test_output_dir, pg_bin, capsys):
     env = zenith_simple_env
-    env.zenith_cli.create_branch("test_zenith_regress", "empty")
+    new_timeline_id = env.zenith_cli.branch_timeline()
     # Connect to postgres and create a database called "regression".
-    pg = env.postgres.create_start('test_zenith_regress')
+    pg = env.postgres.create_start('test_zenith_regress', timeline_id=new_timeline_id)
     pg.safe_psql('CREATE DATABASE regression')
 
     # Create some local directories for pg_regress to run in.
diff --git a/test_runner/fixtures/compare_fixtures.py b/test_runner/fixtures/compare_fixtures.py
index 570c787184..66b9fe54ea 100644
--- a/test_runner/fixtures/compare_fixtures.py
+++ b/test_runner/fixtures/compare_fixtures.py
@@ -64,9 +64,8 @@ class ZenithCompare(PgCompare):
         self._pg_bin = pg_bin
 
         # We only use one branch and one timeline
-        self.branch = branch_name
-        self.env.zenith_cli.create_branch(self.branch, "empty")
-        self._pg = self.env.postgres.create_start(self.branch)
+        timeline_id = self.env.zenith_cli.branch_timeline()
+        self._pg = self.env.postgres.create_start("branch", timeline_id=timeline_id)
         self.timeline = self.pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]
 
         # Long-lived cursor, useful for flushing
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 06f75aa604..7c4d178a3f 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -548,8 +548,7 @@ class ZenithEnv:
         self.s3_mock_server = config.s3_mock_server
         self.zenith_cli = ZenithCli(env=self)
-        self.postgres = PostgresFactory(self)
-
         self.safekeepers: List[Safekeeper] = []
 
         # generate initial tenant ID here instead of letting 'zenith init' generate it,
@@ -558,7 +557,7 @@ class ZenithEnv:
 
         # Create a config file corresponding to the options
         toml = textwrap.dedent(f"""
-            default_tenantid = '{self.initial_tenant.hex}'
+            default_tenant_id = '{self.initial_tenant.hex}'
         """)
 
         # Create config for pageserver
@@ -600,8 +599,9 @@ class ZenithEnv:
             self.safekeepers.append(safekeeper)
 
         log.info(f"Config: {toml}")
-
-        self.zenith_cli.init(toml)
+        # TODO: a single initial timeline per environment will break tests that use multiple tenants
+        self.initial_timeline = self.zenith_cli.init(toml)
+        self.postgres = PostgresFactory(self)
 
     def start(self):
         # Start up the page server and all the safekeepers
@@ -614,12 +614,6 @@ class ZenithEnv:
         """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """
         return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
 
-    def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
-        if tenant_id is None:
-            tenant_id = uuid.uuid4()
-        self.zenith_cli.create_tenant(tenant_id)
-        return tenant_id
-
     @cached_property
     def auth_keys(self) -> AuthKeys:
         pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
@@ -643,14 +637,7 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
         shutil.rmtree(repo_dir, ignore_errors=True)
 
     with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
-
-        env = builder.init_start()
-
-        # For convenience in tests, create a branch from the freshly-initialized cluster.
-        env.zenith_cli.create_branch("empty", "main")
-
-        # Return the builder to the caller
-        yield env
+        yield builder.init_start()
 
 @pytest.fixture(scope='function')
@@ -729,34 +716,27 @@ class ZenithPageserverHttpClient(requests.Session):
             f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}/{timeline_id.hex}/detach",
         )
         self.verbose_error(res)
 
-    def branch_list(self, tenant_id: uuid.UUID) -> List[Dict[Any, Any]]:
-        res = self.get(f"http://localhost:{self.port}/v1/branch/{tenant_id.hex}")
-        self.verbose_error(res)
-        res_json = res.json()
-        assert isinstance(res_json, list)
-        return res_json
-
-    def branch_create(self, tenant_id: uuid.UUID, name: str, start_point: str) -> Dict[Any, Any]:
-        res = self.post(f"http://localhost:{self.port}/v1/branch",
+    def timeline_create(self,
+                        tenant_id: uuid.UUID,
+                        timeline_id: uuid.UUID,
+                        start_lsn: Optional[str] = None,
+                        ancestor_timeline_id: Optional[uuid.UUID] = None) -> Dict[Any, Any]:
+        res = self.post(f"http://localhost:{self.port}/v1/timeline",
                         json={
-                            'tenant_id': tenant_id.hex,
-                            'name': name,
-                            'start_point': start_point,
+                            'tenant_id': tenant_id.hex,
+                            'timeline_id': timeline_id.hex,
+                            'start_lsn': start_lsn,
+                            'ancestor_timeline_id': ancestor_timeline_id.hex if ancestor_timeline_id else None,
                         })
         self.verbose_error(res)
         res_json = res.json()
         assert isinstance(res_json, dict)
         return res_json
 
-    def branch_detail(self, tenant_id: uuid.UUID, name: str) -> Dict[Any, Any]:
-        res = self.get(
-            f"http://localhost:{self.port}/v1/branch/{tenant_id.hex}/{name}?include-non-incremental-logical-size=1",
-        )
-        self.verbose_error(res)
-        res_json = res.json()
-        assert isinstance(res_json, dict)
-        return res_json
-
     def tenant_list(self) -> List[Dict[Any, Any]]:
         res = self.get(f"http://localhost:{self.port}/v1/tenant")
         self.verbose_error(res)
@@ -774,7 +754,7 @@ class ZenithPageserverHttpClient(requests.Session):
         self.verbose_error(res)
         return res.json()
 
-    def timeline_list(self, tenant_id: uuid.UUID) -> List[str]:
+    def timeline_list(self, tenant_id: uuid.UUID) -> List[Dict[Any, Any]]:
         res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}")
         self.verbose_error(res)
         res_json = res.json()
@@ -783,7 +763,8 @@ class ZenithPageserverHttpClient(requests.Session):
     def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
         res = self.get(
-            f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}/{timeline_id.hex}")
f"http://localhost:{self.port}/v1/timeline/{tenant_id.hex}/{timeline_id.hex}?include-non-incremental-logical-size=1" + ) self.verbose_error(res) res_json = res.json() assert isinstance(res_json, dict) @@ -827,34 +808,76 @@ class ZenithCli: self.env = env pass - def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID: + def create_tenant(self, tenant_id: Optional[uuid.UUID] = None) -> tuple[uuid.UUID, uuid.UUID]: + """ + Creates a new tenant, returns its id and its initial timeline's id. + """ if tenant_id is None: tenant_id = uuid.uuid4() - self.raw_cli(['tenant', 'create', tenant_id.hex]) - return tenant_id + res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex]) + + initial_timeline_id_extractor = re.compile(r"initial timeline: '(?P[^']+)'", + re.MULTILINE) + matches = initial_timeline_id_extractor.search(res.stdout) + + created_timeline_id = None + if matches is not None: + created_timeline_id = matches.group('timeline_id') + + if created_timeline_id is None: + raise Exception('could not find timeline id after `zenith tenant create` invocation') + else: + return (tenant_id, uuid.UUID(created_timeline_id)) def list_tenants(self) -> 'subprocess.CompletedProcess[str]': - return self.raw_cli(['tenant', 'list']) + res = self.raw_cli(['tenant', 'list']) + res.check_returncode() + return res - def create_branch(self, - branch_name: str, - starting_point: str, - tenant_id: Optional[uuid.UUID] = None) -> 'subprocess.CompletedProcess[str]': - args = ['branch'] - if tenant_id is not None: - args.extend(['--tenantid', tenant_id.hex]) - args.extend([branch_name, starting_point]) + def branch_timeline(self, + tenant_id: Optional[uuid.UUID] = None, + new_timeline_id: Optional[uuid.UUID] = None, + ancestor_timeline_id: Optional[uuid.UUID] = None, + ancestor_start_lsn: Optional[str] = None) -> uuid.UUID: + cmd = [ + 'timeline', + 'branch', + '--tenant-id', + (tenant_id or self.env.initial_tenant).hex, + '--ancestor-timeline-id', + (ancestor_timeline_id or self.env.initial_timeline).hex, + ] + if ancestor_start_lsn is not None: + cmd.extend(['--ancestor-start-lsn', ancestor_start_lsn]) + if new_timeline_id is not None: + cmd.extend(['--timeline-id', new_timeline_id.hex]) - return self.raw_cli(args) + completed_process = self.raw_cli(cmd) + completed_process.check_returncode() + create_timeline_id_extractor = re.compile(r"^Created timeline '(?P[^']+)'", + re.MULTILINE) + matches = create_timeline_id_extractor.search(completed_process.stdout) - def list_branches(self, - tenant_id: Optional[uuid.UUID] = None) -> 'subprocess.CompletedProcess[str]': - args = ['branch'] - if tenant_id is not None: - args.extend(['--tenantid', tenant_id.hex]) - return self.raw_cli(args) + created_timeline_id = None + if matches is not None: + created_timeline_id = matches.group('timeline_id') + + if created_timeline_id is None: + raise Exception('could not find timeline id after `zenith timeline create` invocation') + else: + return uuid.UUID(created_timeline_id) + + def list_timelines(self, tenant_id: Optional[uuid.UUID] = None) -> List[str]: + res = self.raw_cli( + ['timeline', 'list', '--tenant-id', (tenant_id or self.env.initial_tenant).hex]) + branches_cli = sorted( + map(lambda b: b.split(') ')[-1].strip().split(':')[-1].strip(), + res.stdout.strip().split("\n"))) + return branches_cli + + def init(self, config_toml: str) -> uuid.UUID: + initial_timeline = None - def init(self, config_toml: str) -> 'subprocess.CompletedProcess[str]': with tempfile.NamedTemporaryFile(mode='w+') as tmp: 
            tmp.write(config_toml)
             tmp.flush()
@@ -864,7 +887,18 @@ class ZenithCli:
                                self.env.pageserver.remote_storage,
                                self.env.pageserver.config_override)
 
-            return self.raw_cli(cmd)
+            completed_process = self.raw_cli(cmd)
+            completed_process.check_returncode()
+            init_timeline_id_extractor = re.compile(
+                r'^created initial timeline (?P<timeline_id>[^\s]+)\s', re.MULTILINE)
+            matches = init_timeline_id_extractor.search(completed_process.stdout)
+            if matches is not None:
+                initial_timeline = matches.group('timeline_id')
+
+        if initial_timeline is None:
+            raise Exception('could not find timeline id after `zenith init` invocation')
+        else:
+            return uuid.UUID(initial_timeline)
 
     def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
         start_args = ['pageserver', 'start', *overrides]
@@ -898,36 +932,50 @@ class ZenithCli:
         self,
         node_name: str,
         tenant_id: Optional[uuid.UUID] = None,
-        timeline_spec: Optional[str] = None,
+        timeline_id: Optional[uuid.UUID] = None,
+        lsn: Optional[str] = None,
         port: Optional[int] = None,
     ) -> 'subprocess.CompletedProcess[str]':
-        args = ['pg', 'create']
-        if tenant_id is not None:
-            args.extend(['--tenantid', tenant_id.hex])
+        args = [
+            'pg',
+            'create',
+            '--tenant-id', (tenant_id or self.env.initial_tenant).hex,
+            '--timeline-id', (timeline_id or self.env.initial_timeline).hex
+        ]
+        if lsn is not None:
+            args.append(f'--lsn={lsn}')
         if port is not None:
             args.append(f'--port={port}')
         args.append(node_name)
-        if timeline_spec is not None:
-            args.append(timeline_spec)
-        return self.raw_cli(args)
+
+        res = self.raw_cli(args)
+        res.check_returncode()
+        return res
 
     def pg_start(
         self,
         node_name: str,
         tenant_id: Optional[uuid.UUID] = None,
-        timeline_spec: Optional[str] = None,
+        timeline_id: Optional[uuid.UUID] = None,
+        lsn: Optional[str] = None,
         port: Optional[int] = None,
    ) -> 'subprocess.CompletedProcess[str]':
-        args = ['pg', 'start']
-        if tenant_id is not None:
-            args.extend(['--tenantid', tenant_id.hex])
+        args = [
+            'pg',
+            'start',
+            '--tenant-id',
+            (tenant_id or self.env.initial_tenant).hex,
+            '--timeline-id',
+            (timeline_id or self.env.initial_timeline).hex,
+        ]
+        if lsn is not None:
+            args.append(f'--lsn={lsn}')
         if port is not None:
             args.append(f'--port={port}')
         args.append(node_name)
-        if timeline_spec is not None:
-            args.append(timeline_spec)
-        return self.raw_cli(args)
+
+        res = self.raw_cli(args)
+        res.check_returncode()
+        return res
 
     def pg_stop(
         self,
@@ -935,9 +983,7 @@ class ZenithCli:
         tenant_id: Optional[uuid.UUID] = None,
         destroy=False,
     ) -> 'subprocess.CompletedProcess[str]':
-        args = ['pg', 'stop']
-        if tenant_id is not None:
-            args.extend(['--tenantid', tenant_id.hex])
+        args = ['pg', 'stop', f'--tenant-id={(tenant_id or self.env.initial_tenant).hex}']
         if destroy:
             args.append('--destroy')
         args.append(node_name)
@@ -1044,7 +1090,6 @@ class ZenithPageserver(PgProtocol):
         if self.running:
             self.env.zenith_cli.pageserver_stop(immediate)
             self.running = False
-
         return self
 
     def __enter__(self):
@@ -1261,7 +1306,8 @@ class Postgres(PgProtocol):
     def create(
         self,
         node_name: str,
-        branch: Optional[str] = None,
+        timeline_id: uuid.UUID,
+        lsn: Optional[str] = None,
         config_lines: Optional[List[str]] = None,
     ) -> 'Postgres':
         """
@@ -1272,13 +1318,11 @@ class Postgres(PgProtocol):
         if not config_lines:
             config_lines = []
 
-        if branch is None:
-            branch = node_name
-
         self.env.zenith_cli.pg_create(node_name,
+                                      timeline_id=timeline_id,
                                       tenant_id=self.tenant_id,
-                                      port=self.port,
-                                      timeline_spec=branch)
+                                      lsn=lsn,
+                                      port=self.port)
         self.node_name = node_name
         path = 
pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id.hex / self.node_name self.pgdata_dir = os.path.join(self.env.repo_dir, path) @@ -1375,7 +1419,7 @@ class Postgres(PgProtocol): if self.running: assert self.node_name is not None - self.env.zenith_cli.pg_stop(self.node_name, tenant_id=self.tenant_id) + self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id) self.running = False return self @@ -1387,7 +1431,7 @@ class Postgres(PgProtocol): """ assert self.node_name is not None - self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id, destroy=True) + self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id, True) self.node_name = None return self @@ -1395,7 +1439,8 @@ class Postgres(PgProtocol): def create_start( self, node_name: str, - branch: Optional[str] = None, + timeline_id: uuid.UUID, + lsn: Optional[str] = None, config_lines: Optional[List[str]] = None, ) -> 'Postgres': """ @@ -1406,8 +1451,9 @@ class Postgres(PgProtocol): self.create( node_name=node_name, - branch=branch, + timeline_id=timeline_id, config_lines=config_lines, + lsn=lsn, ).start() return self @@ -1428,8 +1474,9 @@ class PostgresFactory: def create_start(self, node_name: str = "main", - branch: Optional[str] = None, tenant_id: Optional[uuid.UUID] = None, + timeline_id: Optional[uuid.UUID] = None, + lsn: Optional[str] = None, config_lines: Optional[List[str]] = None) -> Postgres: pg = Postgres( @@ -1442,14 +1489,16 @@ class PostgresFactory: return pg.create_start( node_name=node_name, - branch=branch, + timeline_id=timeline_id or self.env.initial_timeline, config_lines=config_lines, + lsn=lsn, ) def create(self, node_name: str = "main", - branch: Optional[str] = None, tenant_id: Optional[uuid.UUID] = None, + timeline_id: Optional[uuid.UUID] = None, + lsn: Optional[str] = None, config_lines: Optional[List[str]] = None) -> Postgres: pg = Postgres( @@ -1463,7 +1512,8 @@ class PostgresFactory: return pg.create( node_name=node_name, - branch=branch, + timeline_id=timeline_id or self.env.initial_timeline, + lsn=lsn, config_lines=config_lines, ) @@ -1683,8 +1733,7 @@ def list_files_to_compare(pgdata_dir: str): # pg is the existing and running compute node, that we want to compare with a basebackup def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Postgres): - - # Get the timeline ID of our branch. We need it for the 'basebackup' command + # Get the timeline ID. We need it for the 'basebackup' command with closing(pg.connect()) as conn: with conn.cursor() as cur: cur.execute("SHOW zenith.zenith_timeline") diff --git a/test_runner/performance/test_bulk_tenant_create.py b/test_runner/performance/test_bulk_tenant_create.py index 0247385211..dda31ba692 100644 --- a/test_runner/performance/test_bulk_tenant_create.py +++ b/test_runner/performance/test_bulk_tenant_create.py @@ -30,11 +30,9 @@ def test_bulk_tenant_create( for i in range(tenants_count): start = timeit.default_timer() - tenant = env.create_tenant() - env.zenith_cli.create_branch( - f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}", - "main", - tenant_id=tenant) + (tenant, tenant_initial_timeline_id) = env.zenith_cli.create_tenant() + new_timeline_id = env.zenith_cli.branch_timeline( + tenant_id=tenant, ancestor_timeline_id=tenant_initial_timeline_id) # FIXME: We used to start new safekeepers here. Did that make sense? Should we do it now? 
        #if use_wal_acceptors == 'with_wa':
@@ -42,9 +40,8 @@ def test_bulk_tenant_create(
 
         pg_tenant = env.postgres.create_start(
             f"test_bulk_tenant_create_{tenants_count}_{i}_{use_wal_acceptors}",
-            None,  # branch name, None means same as node name
             tenant,
-        )
+            timeline_id=new_timeline_id)
 
         end = timeit.default_timer()
         time_slices.append(end - start)
diff --git a/test_runner/performance/test_parallel_copy_to.py b/test_runner/performance/test_parallel_copy_to.py
index e4388ce8e2..0ee0a37ebb 100644
--- a/test_runner/performance/test_parallel_copy_to.py
+++ b/test_runner/performance/test_parallel_copy_to.py
@@ -1,6 +1,5 @@
 from io import BytesIO
 import asyncio
-import asyncpg
 from fixtures.zenith_fixtures import ZenithEnv, Postgres, PgProtocol
 from fixtures.log_helper import log
 from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
diff --git a/test_runner/test_broken.py b/test_runner/test_broken.py
index 56c735e87c..994544666b 100644
--- a/test_runner/test_broken.py
+++ b/test_runner/test_broken.py
@@ -21,8 +21,8 @@ run_broken = pytest.mark.skipif(os.environ.get('RUN_BROKEN') is None,
 def test_broken(zenith_simple_env: ZenithEnv, pg_bin):
     env = zenith_simple_env
-    env.zenith_cli.create_branch("test_broken", "empty")
-    env.postgres.create_start("test_broken")
+    new_timeline_id = env.zenith_cli.branch_timeline()
+    env.postgres.create_start("test_broken", timeline_id=new_timeline_id)
 
     log.info('postgres is running')
     log.info('THIS NEXT COMMAND WILL FAIL:')
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index 7170653754..dcfeb63309 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -9,7 +9,7 @@ use pageserver::config::defaults::{
     DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
     DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
 };
-use std::collections::HashMap;
+use std::collections::{BTreeSet, HashMap};
 use std::process::exit;
 use std::str::FromStr;
 use walkeeper::defaults::{
@@ -60,7 +60,7 @@ struct TimelineTreeEl {
     /// `TimelineInfo` received from the `pageserver` via the `timeline_list` libpq API call.
     pub info: TimelineInfo,
     /// Holds all direct children of this timeline referenced using `timeline_id`.
-    pub children: Vec<String>,
+    pub children: BTreeSet<ZTimelineId>,
 }
 
 // Main entry point for the 'zenith' CLI utility
@@ -71,25 +71,18 @@ struct TimelineTreeEl {
 // * Providing CLI api to the pageserver
 // * TODO: export/import to/from usual postgres
 fn main() -> Result<()> {
-    #[rustfmt::skip] // rustfmt squashes these into a single line otherwise
-    let pg_node_arg = Arg::new("node")
-        .index(1)
-        .help("Node name")
-        .required(true);
+    let pg_node_arg = Arg::new("node").help("Node name").required(true);
 
-    #[rustfmt::skip]
-    let safekeeper_id_arg = Arg::new("id")
-        .index(1)
-        .help("safekeeper id")
+    let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
+
+    let timeline_id_arg = Arg::new("timeline-id")
+        .long("timeline-id")
+        .help("Timeline id. Represented as a 32-symbol hexadecimal string")
+        .takes_value(true)
         .required(false);
 
-    let timeline_arg = Arg::new("timeline")
-        .index(2)
-        .help("Timeline id or a point-in time specification")
-        .required(false);
-
-    let tenantid_arg = Arg::new("tenantid")
-        .long("tenantid")
+    let tenant_id_arg = Arg::new("tenant-id")
+        .long("tenant-id")
        .help("Tenant id. Represented as a 32-symbol hexadecimal string")
        .takes_value(true)
         .required(false);
 
@@ -115,6 +108,12 @@ fn main() -> Result<()> {
         .help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
         .required(false);
 
+    let lsn_arg = Arg::new("lsn")
+        .long("lsn")
+        .help("Specify the Lsn on the timeline to start from. By default, the end of the timeline is used.")
+        .takes_value(true)
+        .required(false);
+
     let matches = App::new("Zenith CLI")
         .setting(AppSettings::ArgRequiredElseHelp)
         .version(GIT_VERSION)
@@ -131,16 +130,28 @@ fn main() -> Result<()> {
         )
         .subcommand(
             App::new("timeline")
-            .about("Create a new timeline")
-            .arg(Arg::new("timeline-name").required(false).index(1))
-            .arg(Arg::new("start-point").required(false).index(2))
-            .arg(tenantid_arg.clone()),
+            .about("Manage timelines")
+            .subcommand(App::new("list")
+                .about("List all timelines available to this pageserver")
+                .arg(tenant_id_arg.clone()))
+            .subcommand(App::new("branch")
+                .about("Create a new timeline, using another timeline as a base and copying its data")
+                .arg(tenant_id_arg.clone())
+                .arg(timeline_id_arg.clone().help("Id of the new timeline, optional. If not specified, it will be generated randomly"))
+                .arg(Arg::new("ancestor-timeline-id").long("ancestor-timeline-id").takes_value(true)
+                    .help("Use the last Lsn of another timeline (and its data) as the base when creating the new timeline").required(false))
+                .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn").takes_value(true)
+                    .help("When using another timeline as the base, use a specific Lsn in it instead of the latest one").required(false)))
+            .subcommand(App::new("create")
+                .about("Create a new blank timeline")
+                .arg(tenant_id_arg.clone())
+                .arg(timeline_id_arg.clone().help("Id of the new timeline, optional. If not specified, it will be generated randomly")))
        ).subcommand(
            App::new("tenant")
            .setting(AppSettings::ArgRequiredElseHelp)
            .about("Manage tenants")
            .subcommand(App::new("list"))
-            .subcommand(App::new("create").arg(Arg::new("tenantid").required(false).index(1)))
+            .subcommand(App::new("create").arg(tenant_id_arg.clone()))
        )
        .subcommand(
            App::new("pageserver")
@@ -175,12 +186,13 @@ fn main() -> Result<()> {
             App::new("pg")
                 .setting(AppSettings::ArgRequiredElseHelp)
                 .about("Manage postgres instances")
-                .subcommand(App::new("list").arg(tenantid_arg.clone()))
+                .subcommand(App::new("list").arg(tenant_id_arg.clone()))
                 .subcommand(App::new("create")
                     .about("Create a postgres compute node")
                     .arg(pg_node_arg.clone())
-                    .arg(timeline_arg.clone())
-                    .arg(tenantid_arg.clone())
+                    .arg(timeline_id_arg.clone())
+                    .arg(tenant_id_arg.clone())
+                    .arg(lsn_arg.clone())
                     .arg(port_arg.clone())
                     .arg(
                         Arg::new("config-only")
@@ -191,14 +203,14 @@ fn main() -> Result<()> {
                 .subcommand(App::new("start")
                     .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
                     .arg(pg_node_arg.clone())
-                    .arg(timeline_arg.clone())
-                    .arg(tenantid_arg.clone())
+                    .arg(timeline_id_arg.clone())
+                    .arg(tenant_id_arg.clone())
+                    .arg(lsn_arg.clone())
                     .arg(port_arg.clone()))
                 .subcommand(
                     App::new("stop")
                         .arg(pg_node_arg.clone())
-                        .arg(timeline_arg.clone())
-                        .arg(tenantid_arg.clone())
+                        .arg(tenant_id_arg.clone())
                         .arg(
                             Arg::new("destroy")
                                 .help("Also delete data directory (now optional, should be default in future)")
@@ -230,7 +242,7 @@ fn main() -> Result<()> {
         handle_init(sub_args)
     } else {
         // all other commands need an existing config
-        let env = match LocalEnv::load_config() {
+        let mut env = match LocalEnv::load_config() {
             Ok(conf) => conf,
             Err(e) => {
                 eprintln!("Error loading config: {}", e);
@@ -239,7 +251,7 @@ fn main() -> Result<()> {
         };
 
         match sub_name {
-            "tenant" => handle_tenant(sub_args, &env),
+            "tenant" => handle_tenant(sub_args, &mut env),
             "timeline" => handle_timeline(sub_args, &env),
             "start" => handle_start_all(sub_args, &env),
             "stop" => handle_stop_all(sub_args, &env),
@@ -261,39 +273,44 @@ fn main() -> Result<()> {
 /// Prints timelines list as a tree-like structure.
 ///
 fn print_timelines_tree(timelines: Vec<TimelineInfo>) -> Result<()> {
-    let mut timelines_hash: HashMap<String, TimelineTreeEl> = timelines
+    let mut timelines_hash = timelines
         .iter()
         .map(|t| {
             (
-                t.timeline_id.to_string(),
+                t.timeline_id(),
                 TimelineTreeEl {
                     info: t.clone(),
-                    children: Vec::new(),
+                    children: BTreeSet::new(),
                 },
             )
         })
-        .collect();
+        .collect::<HashMap<ZTimelineId, TimelineTreeEl>>();
 
     // Memorize all direct children of each timeline.
     for timeline in &timelines {
-        if let Some(tid) = &timeline.ancestor_id {
+        if let TimelineInfo::Local {
+            ancestor_timeline_id: Some(tid),
+            ..
+        } = timeline
+        {
             timelines_hash
                 .get_mut(tid)
                 .context("missing timeline info in the HashMap")?
                 .children
-                .push(timeline.timeline_id.to_string());
+                .insert(timeline.timeline_id());
         }
     }
 
-    // Sort children by tid to bring some minimal order.
-    for timeline in &mut timelines_hash.values_mut() {
-        timeline.children.sort();
-    }
-
     for timeline in timelines_hash.values() {
-        // Start with root timelines (no ancestors) first.
-        if timeline.info.ancestor_id.is_none() {
-            print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
+        // Start with root local timelines (no ancestors) first.
+        if let TimelineInfo::Local {
+            ancestor_timeline_id,
+            ..
+        } = &timeline.info
+        {
+            if ancestor_timeline_id.is_none() {
+                print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
+            }
         }
     }
 
@@ -307,17 +324,22 @@ fn print_timeline(
     nesting_level: usize,
     is_last: &[bool],
     timeline: &TimelineTreeEl,
-    timelines: &HashMap<String, TimelineTreeEl>,
+    timelines: &HashMap<ZTimelineId, TimelineTreeEl>,
 ) -> Result<()> {
+    let local_or_remote = match timeline.info {
+        TimelineInfo::Local { .. } => "(L)",
+        TimelineInfo::Remote { .. } => "(R)",
+    };
     // Draw main padding
-    print!(" ");
+    print!("{} ", local_or_remote);
 
     if nesting_level > 0 {
-        let lsn = timeline
-            .info
-            .ancestor_lsn
-            .as_ref()
-            .context("missing timeline info in the HashMap")?;
+        let lsn_string = match timeline.info {
+            TimelineInfo::Local { ancestor_lsn, .. } => ancestor_lsn
+                .map(|lsn| lsn.to_string())
+                .unwrap_or_else(|| "Unknown local Lsn".to_string()),
+            TimelineInfo::Remote { .. } => "unknown Lsn (remote)".to_string(),
+        };
 
         let mut br_sym = "┣━";
 
         // Draw each nesting padding with proper style
@@ -337,11 +359,11 @@ fn print_timeline(
             br_sym = "┗━";
         }
 
-        print!("{} @{}: ", br_sym, lsn);
+        print!("{} @{}: ", br_sym, lsn_string);
     }
 
     // Finally print a timeline name with new line
-    println!("{}", timeline.info.timeline_id);
+    println!("{}", timeline.info.timeline_id());
 
     let len = timeline.children.len();
     let mut i: usize = 0;
@@ -375,26 +397,44 @@ fn print_timeline(
 /// Connects to the pageserver to query this information.
 fn get_timeline_infos(
     env: &local_env::LocalEnv,
-    tenantid: &ZTenantId,
+    tenant_id: &ZTenantId,
 ) -> Result<HashMap<ZTimelineId, TimelineInfo>> {
     let page_server = PageServerNode::from_env(env);
-    let timeline_infos: Vec<TimelineInfo> = page_server.timeline_list(tenantid)?;
+    let timeline_infos: Vec<TimelineInfo> = page_server.timeline_list(tenant_id)?;
     let timeline_infos: HashMap<ZTimelineId, TimelineInfo> = timeline_infos
         .into_iter()
-        .map(|timeline_info| (timeline_info.timeline_id, timeline_info))
+        .map(|timeline_info| (timeline_info.timeline_id(), timeline_info))
         .collect();
 
     Ok(timeline_infos)
 }
 
-// Helper function to parse --tenantid option, or get the default from config file
-fn get_tenantid(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<ZTenantId> {
-    if let Some(tenantid_cmd) = sub_match.value_of("tenantid") {
-        Ok(ZTenantId::from_str(tenantid_cmd)?)
-    } else if let Some(tenantid_conf) = env.default_tenantid {
+// Helper function to parse --tenant-id option, or get the default from config file
+fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<ZTenantId> {
+    if let Some(tenantid_cmd) = sub_match.value_of("tenant-id") {
+        Ok(
+            ZTenantId::from_str(tenantid_cmd)
+                .context("Failed to parse tenant id from arguments")?,
+        )
+    } else if let Some(tenantid_conf) = env.default_tenant_id {
         Ok(ZTenantId::from(tenantid_conf))
     } else {
-        bail!("No tenantid. Use --tenantid, or set 'default_tenantid' in the config file");
+        bail!("No tenant id. Use --tenant-id, or set 'default_tenant_id' in the config file");
+    }
+}
+
+fn get_timeline_id(
+    sub_match: &ArgMatches,
+    tenant_id: ZTenantId,
+    env: &local_env::LocalEnv,
+) -> anyhow::Result<ZTimelineId> {
+    if let Some(timeline_id) = sub_match.value_of("timeline-id") {
+        Ok(ZTimelineId::from_str(timeline_id)
+            .context("Failed to parse timeline id from arguments")?)
+ } else if let Some(&initial_timeline_id) = env.initial_timelines.get(&tenant_id) { + Ok(initial_timeline_id) + } else { + bail!("No timeline id, specify one in the subcommand's arguments"); } } @@ -418,7 +458,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> { let pageserver = PageServerNode::from_env(&env); if let Err(e) = pageserver.init( // default_tenantid was generated by the `env.init()` call above - Some(&ZTenantId::from(env.default_tenantid.unwrap()).to_string()), + Some(&ZTenantId::from(env.default_tenant_id.unwrap()).to_string()), &pageserver_config_overrides(init_match), ) { eprintln!("pageserver init failed: {}", e); @@ -436,7 +476,7 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> { .collect() } -fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> { let pageserver = PageServerNode::from_env(env); match tenant_match.subcommand() { Some(("list", _)) => { @@ -445,13 +485,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result } } Some(("create", create_match)) => { - let tenantid = match create_match.value_of("tenantid") { - Some(tenantid) => ZTenantId::from_str(tenantid)?, + let tenant_id = match create_match.value_of("tenant-id") { + Some(id) => ZTenantId::from_str(id)?, None => ZTenantId::generate(), }; - println!("using tenant id {}", tenantid); - pageserver.tenant_create(tenantid)?; - println!("tenant successfully created on the pageserver"); + println!("using tenant id {}", tenant_id); + let initial_timeline_id = pageserver.tenant_create(tenant_id)?; + env.initial_timelines.insert(tenant_id, initial_timeline_id); + println!( + "tenant {} successfully created on the pageserver, initial timeline: '{}'", + tenant_id, initial_timeline_id + ); } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), @@ -462,26 +506,77 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let pageserver = PageServerNode::from_env(env); - let tenant_id = get_tenantid(timeline_match, env)?; + match timeline_match.subcommand() { + Some(("list", list_match)) => { + let tenant_id = get_tenant_id(list_match, env)?; + let timelines = pageserver.timeline_list(&tenant_id)?; + print_timelines_tree(timelines)?; + } + Some(("create", create_match)) => { + let tenant_id = get_tenant_id(create_match, env)?; + let timeline_id = get_timeline_id(create_match, tenant_id, env) + .unwrap_or_else(|_| ZTimelineId::generate()); + let timeline = pageserver.timeline_create(tenant_id, timeline_id, None, None)?; - if let Some(timeline_id) = timeline_match.value_of("timeline-id") { - let start_lsn = timeline_match - .value_of("start-lsn") - .map(|lsn| lsn.parse::()) - .transpose() - .context("Failed to parse start Lsn from the request")?; - let timeline_id = timeline_id - .parse::() - .context("Failed to parse timeline id from the request")?; - let timeline = pageserver.timeline_create(tenant_id, timeline_id, start_lsn)?; - println!( - "Created timeline '{}' at {:?} for tenant: {}", - timeline.timeline_id, timeline.latest_valid_lsn, tenant_id, - ); - } else { - // No arguments, list timelines for tenant - let timelines = pageserver.timeline_list(&tenant_id)?; - print_timelines_tree(timelines)?; + let last_record_lsn = match timeline { + 
TimelineInfo::Local {
+                    last_record_lsn, ..
+                } => last_record_lsn,
+                TimelineInfo::Remote { .. } => {
+                    bail!("Timeline {} was created as remote, not local", timeline_id)
+                }
+            };
+            println!(
+                "Created timeline '{}' at Lsn {} for tenant: {}",
+                timeline.timeline_id(),
+                last_record_lsn,
+                tenant_id,
+            );
+        }
+        Some(("branch", branch_match)) => {
+            let tenant_id = get_tenant_id(branch_match, env)?;
+            let timeline_id = get_timeline_id(branch_match, tenant_id, env)
+                .unwrap_or_else(|_| ZTimelineId::generate());
+            let ancestor_timeline_id = match branch_match
+                .value_of("ancestor-timeline-id")
+                .map(ZTimelineId::from_str)
+                .transpose()
+                .context("Failed to parse ancestor timeline id from the request")?
+                .or_else(|| env.initial_timelines.get(&tenant_id).copied())
+            {
+                Some(id) => id,
+                None => bail!("No ancestor timeline id provided"),
+            };
+            let start_lsn = branch_match
+                .value_of("ancestor-start-lsn")
+                .map(Lsn::from_str)
+                .transpose()
+                .context("Failed to parse ancestor start Lsn from the request")?;
+            let timeline = pageserver.timeline_create(
+                tenant_id,
+                timeline_id,
+                start_lsn,
+                Some(ancestor_timeline_id),
+            )?;
+
+            let last_record_lsn = match timeline {
+                TimelineInfo::Local {
+                    last_record_lsn, ..
+                } => last_record_lsn,
+                TimelineInfo::Remote { .. } => {
+                    bail!("Timeline {} was created as remote, not local", timeline_id)
+                }
+            };
+            println!(
+                "Created timeline '{}' at Lsn {} for tenant: {}. Ancestor timeline: '{}'",
+                timeline.timeline_id(),
+                last_record_lsn,
+                tenant_id,
+                ancestor_timeline_id,
+            );
+        }
+        Some((sub_name, _)) => bail!("Unexpected timeline subcommand '{}'", sub_name),
+        None => bail!("no timeline subcommand provided"),
     }
 
     Ok(())
@@ -495,12 +590,12 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
 
     let mut cplane = ComputeControlPlane::load(env.clone())?;
 
-    // All subcommands take an optional --tenantid option
-    let tenantid = get_tenantid(sub_args, env)?;
+    // All subcommands take an optional --tenant-id option
+    let tenant_id = get_tenant_id(sub_args, env)?;
 
     match sub_name {
         "list" => {
-            let timeline_infos = get_timeline_infos(env, &tenantid).unwrap_or_else(|e| {
+            let timeline_infos = get_timeline_infos(env, &tenant_id).unwrap_or_else(|e| {
                 eprintln!("Failed to load timeline info: {}", e);
                 HashMap::new()
             });
@@ -509,21 +604,26 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
             for ((_, node_name), node) in cplane
                 .nodes
                 .iter()
-                .filter(|((node_tenantid, _), _)| node_tenantid == &tenantid)
+                .filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
             {
                 // FIXME: This shows the LSN at the end of the timeline. It's not the
                 // right thing to do for read-only nodes that might be anchored at an
                 // older point in time, or following but lagging behind the primary.
                 let lsn_str = timeline_infos
-                    .get(&node.timelineid)
-                    .map(|bi| bi.latest_valid_lsn.to_string())
-                    .unwrap_or_else(|| "?".to_string());
+                    .get(&node.timeline_id)
+                    .map(|bi| match bi {
+                        TimelineInfo::Local {
+                            last_record_lsn, ..
+                        } => last_record_lsn.to_string(),
+                        TimelineInfo::Remote { .. } => "? (remote)".to_string(),
(remote)".to_string(), + }) + .unwrap_or_else(|| '?'.to_string()); println!( "{}\t{}\t{}\t{}\t{}", node_name, node.address, - node.timelineid, + node.timeline_id, lsn_str, node.status(), ); @@ -531,27 +631,31 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { } "create" => { let node_name = sub_args.value_of("node").unwrap_or("main"); - let timeline_spec = sub_args.value_of("timeline"); + let lsn = sub_args + .value_of("lsn") + .map(Lsn::from_str) + .transpose() + .context("Failed to parse Lsn from the request")?; + let timeline_id = get_timeline_id(sub_args, tenant_id, env)?; let port: Option = match sub_args.value_of("port") { Some(p) => Some(p.parse()?), None => None, }; - cplane.new_node(tenantid, node_name, timeline_spec, port)?; + cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?; } "start" => { let node_name = sub_args.value_of("node").unwrap_or("main"); - let timeline_spec = sub_args.value_of("timeline"); let port: Option = match sub_args.value_of("port") { Some(p) => Some(p.parse()?), None => None, }; - let node = cplane.nodes.get(&(tenantid, node_name.to_owned())); + let node = cplane.nodes.get(&(tenant_id, node_name.to_owned())); let auth_token = if matches!(env.pageserver.auth_type, AuthType::ZenithJWT) { - let claims = Claims::new(Some(tenantid), Scope::Tenant); + let claims = Claims::new(Some(tenant_id), Scope::Tenant); Some(env.generate_auth_token(&claims)?) } else { @@ -559,22 +663,25 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { }; if let Some(node) = node { - if timeline_spec.is_some() { - println!("timeline spec ignored because its node exists already"); - } println!("Starting existing postgres {}...", node_name); node.start(&auth_token)?; } else { + let timeline_id = get_timeline_id(sub_args, tenant_id, env)?; + let lsn = sub_args + .value_of("lsn") + .map(Lsn::from_str) + .transpose() + .context("Failed to parse Lsn from the request")?; // when used with custom port this results in non obvious behaviour // port is remembered from first start command, i e // start --port X // stop // start <-- will also use port X even without explicit port argument println!( - "Starting new postgres {} on timeline {:?} ...", - node_name, timeline_spec + "Starting new postgres {} on timeline {} ...", + node_name, timeline_id ); - let node = cplane.new_node(tenantid, node_name, timeline_spec, port)?; + let node = cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?; node.start(&auth_token)?; } } @@ -584,7 +691,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let node = cplane .nodes - .get(&(tenantid, node_name.to_owned())) + .get(&(tenant_id, node_name.to_owned())) .with_context(|| format!("postgres {} is not found", node_name))?; node.stop(destroy)?; }