From 10f811e886292e258adec931945f7f6bdce4b412 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 4 Feb 2022 10:37:39 -0500 Subject: [PATCH] Use `timeline` instead of `branch` in pageserver's API --- control_plane/src/compute.rs | 38 ++-- control_plane/src/storage.rs | 45 ++-- pageserver/src/bin/pageserver.rs | 5 +- pageserver/src/config.rs | 8 - pageserver/src/http/models.rs | 6 +- pageserver/src/http/openapi_spec.yml | 141 +++---------- pageserver/src/http/routes.rs | 123 +++-------- pageserver/src/layered_repository.rs | 45 ++-- pageserver/src/lib.rs | 2 +- pageserver/src/remote_storage/README.md | 8 - pageserver/src/remote_storage/storage_sync.rs | 62 +----- .../remote_storage/storage_sync/download.rs | 100 +-------- .../src/remote_storage/storage_sync/index.rs | 37 +--- .../src/remote_storage/storage_sync/upload.rs | 94 +-------- pageserver/src/repository.rs | 21 +- pageserver/src/tenant_mgr.rs | 4 +- pageserver/src/{branches.rs => timelines.rs} | 194 ++++++------------ zenith/src/main.rs | 178 ++++++++-------- 18 files changed, 311 insertions(+), 800 deletions(-) rename pageserver/src/{branches.rs => timelines.rs} (70%) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index a61191e7a4..3569cc1dbb 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -73,39 +73,43 @@ impl ComputeControlPlane { .unwrap_or(self.base_port) } - // FIXME: see also parse_point_in_time in branches.rs. + // FIXME: see also parse_point_in_time in timelines.rs. 
fn parse_point_in_time( &self, - tenantid: ZTenantId, + tenant_id: ZTenantId, s: &str, ) -> Result<(ZTimelineId, Option)> { - let mut strings = s.split('@'); - let name = strings.next().unwrap(); + let _strings = s.split('@'); + // let name = strings.next().unwrap(); - let lsn = strings - .next() - .map(Lsn::from_str) - .transpose() - .context("invalid LSN in point-in-time specification")?; + // let lsn = strings + // .next() + // .map(Lsn::from_str) + // .transpose() + // .context("invalid LSN in point-in-time specification")?; - // Resolve the timeline ID, given the human-readable branch name - let timeline_id = self - .pageserver - .branch_get_by_name(&tenantid, name)? - .timeline_id; + // // Resolve the timeline ID, given the human-readable branch name + // let timeline_id = self + // .pageserver + // .branch_get_by_name(&tenant_id, name)? + // .timeline_id; - Ok((timeline_id, lsn)) + // Ok((timeline_id, lsn)) + todo!("TODO kb check more about the '@name' format") } pub fn new_node( &mut self, tenantid: ZTenantId, name: &str, - timeline_spec: &str, + timeline_spec: Option<&str>, port: Option, ) -> Result> { // Resolve the human-readable timeline spec into timeline ID and LSN - let (timelineid, lsn) = self.parse_point_in_time(tenantid, timeline_spec)?; + let (timelineid, lsn) = match timeline_spec { + Some(timeline_spec) => self.parse_point_in_time(tenantid, timeline_spec)?, + None => (ZTimelineId::generate(), None), + }; let port = port.unwrap_or_else(|| self.get_port()); let node = Arc::new(PostgresNode { diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index cd429e3f7a..aed9a757d4 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -9,18 +9,18 @@ use anyhow::bail; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; -use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest}; +use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest}; +use 
pageserver::timelines::TimelineInfo; use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use zenith_utils::http::error::HttpErrorBody; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::local_env::LocalEnv; use crate::{fill_rust_env_vars, read_pidfile}; -use pageserver::branches::BranchInfo; use pageserver::tenant_mgr::TenantInfo; use zenith_utils::connstring::connection_address; @@ -335,47 +335,32 @@ impl PageServerNode { .json()?) } - pub fn branch_list(&self, tenantid: &ZTenantId) -> Result> { + pub fn timeline_list(&self, tenantid: &ZTenantId) -> Result> { Ok(self .http_request( Method::GET, - format!("{}/branch/{}", self.http_base_url, tenantid), + format!("{}/timeline/{}", self.http_base_url, tenantid), ) .send()? .error_from_body()? .json()?) } - pub fn branch_create( + pub fn timeline_create( &self, - branch_name: &str, - startpoint: &str, - tenantid: &ZTenantId, - ) -> Result { + timeline_id: ZTimelineId, + start_point: String, + tenant_id: ZTenantId, + ) -> Result { Ok(self - .http_request(Method::POST, format!("{}/branch", self.http_base_url)) - .json(&BranchCreateRequest { - tenant_id: tenantid.to_owned(), - name: branch_name.to_owned(), - start_point: startpoint.to_owned(), + .http_request(Method::POST, format!("{}/timeline", self.http_base_url)) + .json(&TimelineCreateRequest { + tenant_id, + timeline_id, + start_point, }) .send()? .error_from_body()? .json()?) } - - pub fn branch_get_by_name( - &self, - tenantid: &ZTenantId, - branch_name: &str, - ) -> Result { - Ok(self - .http_request( - Method::GET, - format!("{}/branch/{}/{}", self.http_base_url, tenantid, branch_name), - ) - .send()? - .error_for_status()? - .json()?) 
- } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d8d4033340..2fa772af58 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -10,11 +10,10 @@ use clap::{App, Arg}; use daemonize::Daemonize; use pageserver::{ - branches, config::{defaults::*, PageServerConf}, http, page_cache, page_service, remote_storage, tenant_mgr, thread_mgr, thread_mgr::ThreadKind, - virtual_file, LOG_FILE_NAME, + timelines, virtual_file, LOG_FILE_NAME, }; use zenith_utils::http::endpoint; use zenith_utils::postgres_backend; @@ -143,7 +142,7 @@ fn main() -> Result<()> { // Create repo and exit if init was requested if init { - branches::init_pageserver(conf, create_tenant).context("Failed to init pageserver")?; + timelines::init_pageserver(conf, create_tenant).context("Failed to init pageserver")?; // write the config file std::fs::write(&cfg_file_path, toml.to_string()).with_context(|| { format!( diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 3deabb7521..5a9c7557cc 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -400,14 +400,6 @@ impl PageServerConf { self.tags_path(tenantid).join(tag_name) } - pub fn branches_path(&self, tenantid: &ZTenantId) -> PathBuf { - self.tenant_path(tenantid).join("refs").join("branches") - } - - pub fn branch_path(&self, branch_name: &str, tenantid: &ZTenantId) -> PathBuf { - self.branches_path(tenantid).join(branch_name) - } - pub fn timelines_path(&self, tenantid: &ZTenantId) -> PathBuf { self.tenant_path(tenantid).join(TIMELINES_SEGMENT_NAME) } diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 5d7398ef03..a6dce33c03 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -1,13 +1,15 @@ use serde::{Deserialize, Serialize}; +use zenith_utils::zid::ZTimelineId; use crate::ZTenantId; use zenith_utils::zid::ZNodeId; #[derive(Serialize, Deserialize)] -pub struct BranchCreateRequest { +pub 
struct TimelineCreateRequest { #[serde(with = "hex")] pub tenant_id: ZTenantId, - pub name: String, + #[serde(with = "hex")] + pub timeline_id: ZTimelineId, pub start_point: String, } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index baf81fcf21..7f3bf97bfe 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -30,19 +30,22 @@ paths: schema: type: string format: hex + - name: include-non-incremental-logical-size + in: query + schema: + type: string + description: Controls calculation of current_logical_size_non_incremental get: - description: List tenant timelines + description: Get timelines for tenant responses: "200": - description: array of brief timeline descriptions + description: TimelineInfo content: application/json: schema: type: array items: - # currently, just a timeline id string, but when remote index gets to be accessed - # remote/local timeline field would be added at least - type: string + $ref: "#/components/schemas/TimelineInfo" "400": description: Error when no tenant id found in path content: @@ -81,8 +84,13 @@ paths: schema: type: string format: hex + - name: include-non-incremental-logical-size + in: query + schema: + type: string + description: Controls calculation of current_logical_size_non_incremental get: - description: Get timeline info for tenant's remote timeline + description: Get timelines for tenant responses: "200": description: TimelineInfo @@ -91,7 +99,7 @@ paths: schema: $ref: "#/components/schemas/TimelineInfo" "400": - description: Error when no tenant id found in path or no branch name + description: Error when no tenant id found in path or no timeline id content: application/json: schema: @@ -114,108 +122,9 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" - /v1/branch/{tenant_id}: - parameters: - - name: tenant_id - in: path - required: true - schema: - type: string - format: hex - - name: 
include-non-incremental-logical-size - in: query - schema: - type: string - description: Controls calculation of current_logical_size_non_incremental - get: - description: Get branches for tenant - responses: - "200": - description: BranchInfo - content: - application/json: - schema: - type: array - items: - $ref: "#/components/schemas/BranchInfo" - "400": - description: Error when no tenant id found in path - content: - application/json: - schema: - $ref: "#/components/schemas/Error" - "401": - description: Unauthorized Error - content: - application/json: - schema: - $ref: "#/components/schemas/UnauthorizedError" - "403": - description: Forbidden Error - content: - application/json: - schema: - $ref: "#/components/schemas/ForbiddenError" - "500": - description: Generic operation error - content: - application/json: - schema: - $ref: "#/components/schemas/Error" - /v1/branch/{tenant_id}/{branch_name}: - parameters: - - name: tenant_id - in: path - required: true - schema: - type: string - format: hex - - name: branch_name - in: path - required: true - schema: - type: string - - name: include-non-incremental-logical-size - in: query - schema: - type: string - description: Controls calculation of current_logical_size_non_incremental - get: - description: Get branches for tenant - responses: - "200": - description: BranchInfo - content: - application/json: - schema: - $ref: "#/components/schemas/BranchInfo" - "400": - description: Error when no tenant id found in path or no branch name - content: - application/json: - schema: - $ref: "#/components/schemas/Error" - "401": - description: Unauthorized Error - content: - application/json: - schema: - $ref: "#/components/schemas/UnauthorizedError" - "403": - description: Forbidden Error - content: - application/json: - schema: - $ref: "#/components/schemas/ForbiddenError" - "500": - description: Generic operation error - content: - application/json: - schema: - $ref: "#/components/schemas/Error" - /v1/branch/: + 
/v1/timeline/: post: - description: Create branch + description: Create timeline requestBody: content: application/json: @@ -223,25 +132,26 @@ paths: type: object required: - "tenant_id" - - "name" + - "timeline_id" - "start_point" properties: tenant_id: type: string format: hex - name: + timeline_id: type: string + format: hex start_point: type: string responses: "201": - description: BranchInfo + description: TimelineInfo content: application/json: schema: - $ref: "#/components/schemas/BranchInfo" + $ref: "#/components/schemas/TimelineInfo" "400": - description: Malformed branch create request + description: Malformed timeline create request content: application/json: schema: @@ -358,16 +268,13 @@ components: type: string state: type: string - BranchInfo: + TimelineInfo: type: object required: - - name - timeline_id - latest_valid_lsn - current_logical_size properties: - name: - type: string timeline_id: type: string format: hex diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 26d473efaf..5ab1576aa6 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use anyhow::{Context, Result}; +use anyhow::Result; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use serde::Serialize; @@ -14,7 +14,6 @@ use zenith_utils::http::{ endpoint, error::HttpErrorBody, json::{json_request, json_response}, - request::get_request_param, request::parse_request_param, }; use zenith_utils::http::{RequestExt, RouterBuilder}; @@ -22,13 +21,12 @@ use zenith_utils::lsn::Lsn; use zenith_utils::zid::HexZTimelineId; use zenith_utils::zid::ZTimelineId; -use super::models::BranchCreateRequest; use super::models::StatusResponse; use super::models::TenantCreateRequest; -use crate::branches::BranchInfo; +use super::models::TimelineCreateRequest; use crate::repository::RepositoryTimeline; use crate::repository::TimelineSyncState; -use crate::{branches, config::PageServerConf, tenant_mgr, 
ZTenantId}; +use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId}; #[derive(Debug)] struct State { @@ -73,18 +71,18 @@ async fn status_handler(request: Request) -> Result, ApiErr )?) } -async fn branch_create_handler(mut request: Request) -> Result, ApiError> { - let request_data: BranchCreateRequest = json_request(&mut request).await?; +async fn timeline_create_handler(mut request: Request) -> Result, ApiError> { + let request_data: TimelineCreateRequest = json_request(&mut request).await?; check_permission(&request, Some(request_data.tenant_id))?; let response_data = tokio::task::spawn_blocking(move || { - let _enter = info_span!("/branch_create", name = %request_data.name, tenant = %request_data.tenant_id, startpoint=%request_data.start_point).entered(); - branches::create_branch( + let _enter = info_span!("/timeline_create", timeline = %request_data.timeline_id, tenant = %request_data.tenant_id, startpoint=%request_data.start_point).entered(); + timelines::create_timeline( get_config(&request), - &request_data.name, &request_data.start_point, - &request_data.tenant_id, + request_data.tenant_id, + request_data.timeline_id, ) }) .await @@ -92,6 +90,19 @@ async fn branch_create_handler(mut request: Request) -> Result) -> Result, ApiError> { + let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; + check_permission(&request, Some(tenant_id))?; + let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request); + let response_data = tokio::task::spawn_blocking(move || { + let _enter = info_span!("timeline_list", tenant = %tenant_id).entered(); + crate::timelines::get_timelines(tenant_id, include_non_incremental_logical_size) + }) + .await + .map_err(ApiError::from_err)??; + Ok(json_response(StatusCode::OK, response_data)?) 
+} + // Gate non incremental logical size calculation behind a flag // after pgbench -i -s100 calculation took 28ms so if multiplied by the number of timelines // and tenants it can take noticeable amount of time. Also the value currently used only in tests @@ -107,90 +118,6 @@ fn get_include_non_incremental_logical_size(request: &Request) -> bool { .unwrap_or(false) } -async fn branch_list_handler(request: Request) -> Result, ApiError> { - let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?; - - let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request); - - check_permission(&request, Some(tenantid))?; - - let response_data = tokio::task::spawn_blocking(move || { - let _enter = info_span!("branch_list", tenant = %tenantid).entered(); - crate::branches::get_branches( - get_config(&request), - &tenantid, - include_non_incremental_logical_size, - ) - }) - .await - .map_err(ApiError::from_err)??; - Ok(json_response(StatusCode::OK, response_data)?) -} - -async fn branch_detail_handler(request: Request) -> Result, ApiError> { - let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?; - let branch_name: String = get_request_param(&request, "branch_name")?.to_string(); - let conf = get_state(&request).conf; - let path = conf.branch_path(&branch_name, &tenantid); - - let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request); - - let response_data = tokio::task::spawn_blocking(move || { - let _enter = info_span!("branch_detail", tenant = %tenantid, branch=%branch_name).entered(); - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - BranchInfo::from_path(path, &repo, include_non_incremental_logical_size) - }) - .await - .map_err(ApiError::from_err)??; - - Ok(json_response(StatusCode::OK, response_data)?) 
-} - -async fn timeline_list_handler(request: Request) -> Result, ApiError> { - let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; - - let conf = get_state(&request).conf; - let timelines_dir = conf.timelines_path(&tenant_id); - - let mut timelines_dir_contents = - tokio::fs::read_dir(&timelines_dir).await.with_context(|| { - format!( - "Failed to list timelines dir '{}' contents", - timelines_dir.display() - ) - })?; - - let mut local_timelines = Vec::new(); - while let Some(entry) = timelines_dir_contents.next_entry().await.with_context(|| { - format!( - "Failed to list timelines dir '{}' contents", - timelines_dir.display() - ) - })? { - let entry_path = entry.path(); - let entry_type = entry.file_type().await.with_context(|| { - format!( - "Failed to get file type of timeline dirs' entry '{}'", - entry_path.display() - ) - })?; - - if entry_type.is_dir() { - match entry.file_name().to_string_lossy().parse::() { - Ok(timeline_id) => local_timelines.push(timeline_id.to_string()), - Err(e) => error!( - "Failed to get parse timeline id from timeline dirs' entry '{}': {}", - entry_path.display(), - e - ), - } - } - } - - Ok(json_response(StatusCode::OK, local_timelines)?) -} - #[derive(Debug, Serialize)] #[serde(tag = "type")] enum TimelineInfo { @@ -260,7 +187,7 @@ async fn timeline_attach_handler(request: Request) -> Result { + RepositoryTimeline::Local { .. 
} => { anyhow::bail!("Timeline with id {} is already local", timeline_id) } RepositoryTimeline::Remote { @@ -369,9 +296,7 @@ pub fn make_router( "/v1/timeline/:tenant_id/:timeline_id/detach", timeline_detach_handler, ) - .get("/v1/branch/:tenant_id", branch_list_handler) - .get("/v1/branch/:tenant_id/:branch_name", branch_detail_handler) - .post("/v1/branch", branch_create_handler) + .post("/v1/timeline", timeline_create_handler) .get("/v1/tenant", tenant_list_handler) .post("/v1/tenant", tenant_create_handler) .any(handler_404) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 975b2f5d2b..c3d42d1829 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -137,19 +137,20 @@ pub struct LayeredRepository { /// Public interface impl Repository for LayeredRepository { fn get_timeline(&self, timelineid: ZTimelineId) -> Result { - let mut timelines = self.timelines.lock().unwrap(); - Ok( - match self.get_or_init_timeline(timelineid, &mut timelines)? 
{ - LayeredTimelineEntry::Local(local) => RepositoryTimeline::Local(local), - LayeredTimelineEntry::Remote { - id, - disk_consistent_lsn, - } => RepositoryTimeline::Remote { - id, - disk_consistent_lsn, - }, - }, - ) + Ok(RepositoryTimeline::from(self.get_or_init_timeline( + timelineid, + &mut self.timelines.lock().unwrap(), + )?)) + } + + fn list_timelines(&self) -> Result> { + Ok(self + .timelines + .lock() + .unwrap() + .values() + .map(|timeline_entry| RepositoryTimeline::from(timeline_entry.clone())) + .collect()) } fn create_empty_timeline( @@ -428,6 +429,24 @@ impl LayeredTimelineEntry { } } +impl From for RepositoryTimeline { + fn from(layered_timeline: LayeredTimelineEntry) -> Self { + match layered_timeline { + LayeredTimelineEntry::Local(timeline) => RepositoryTimeline::Local { + id: timeline.timelineid, + timeline, + }, + LayeredTimelineEntry::Remote { + id, + disk_consistent_lsn, + } => RepositoryTimeline::Remote { + id, + disk_consistent_lsn, + }, + } + } +} + /// Private functions impl LayeredRepository { // Implementation of the public `get_timeline` function. 
This differs from the public diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3a68f56187..3d66192c80 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,5 +1,4 @@ pub mod basebackup; -pub mod branches; pub mod config; pub mod http; pub mod import_datadir; @@ -12,6 +11,7 @@ pub mod repository; pub mod tenant_mgr; pub mod tenant_threads; pub mod thread_mgr; +pub mod timelines; pub mod virtual_file; pub mod walingest; pub mod walreceiver; diff --git a/pageserver/src/remote_storage/README.md b/pageserver/src/remote_storage/README.md index 1c718acf06..3c77275da8 100644 --- a/pageserver/src/remote_storage/README.md +++ b/pageserver/src/remote_storage/README.md @@ -62,11 +62,3 @@ Based on previous evaluation, even `rusoto-s3` could be a better choice over thi So far, we don't adjust the remote storage based on GC thread loop results, only checkpointer loop affects the remote storage. Index module could be used as a base to implement a deferred GC mechanism, a "defragmentation" that repacks archives into new ones after GC is done removing the files from the archives. - -* bracnhes implementaion could be improved - -Currently, there's a code to sync the branches along with the timeline files: on upload, every local branch files that are missing remotely are uploaded, -on the timeline download, missing remote branch files are downlaoded. - -A branch is a per-tenant entity, yet a current implementaion requires synchronizing a timeline first to get the branch files locally. -Currently, there's no other way to know about the remote branch files, neither the file contents is verified and updated. diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index 6b588c8e5f..d14f849e15 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -14,13 +14,6 @@ //! 
Only GC removes local timeline files, the GC support is not added to sync currently, //! yet downloading extra files is not critically bad at this stage, GC can remove those again. //! -//! Along the timeline files, branch files are uploaded and downloaded every time a corresponding sync task is processed. -//! For simplicity, branch files are also treated as immutable: only missing files are uploaded or downloaded, no removals, amendments or file contents checks are done. -//! Also, the branches are copied as separate files, with no extra compressions done. -//! Despite branches information currently belonging to tenants, a tenants' timeline sync is required to upload or download the branch files, also, there's no way to know -//! the branch sync state outside of the sync loop. -//! This implementation is currently considered as temporary and is a subjec to change later. -//! //! During the loop startup, an initial [`RemoteTimelineIndex`] state is constructed via listing the remote storage contents. //! It's enough to poll the remote state once on startup only, due to agreement that the pageserver has //! an exclusive write access to the remote storage: new files appear in the storage only after the same @@ -66,7 +59,6 @@ //! NOTE: No real contents or checksum check happens right now and is a subject to improve later. //! //! After the whole timeline is downloaded, [`crate::tenant_mgr::set_timeline_states`] function is used to update pageserver memory stage for the timeline processed. -//! No extra branch registration is done. //! //! When pageserver signals shutdown, current sync task gets finished and the loop exists. 
@@ -77,7 +69,7 @@ pub mod index; mod upload; use std::{ - collections::{BTreeSet, HashMap, HashSet, VecDeque}, + collections::{BTreeSet, HashMap, VecDeque}, num::{NonZeroU32, NonZeroUsize}, path::{Path, PathBuf}, sync::Arc, @@ -87,7 +79,6 @@ use anyhow::{bail, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use lazy_static::lazy_static; use tokio::{ - fs, runtime::Runtime, sync::{ mpsc::{self, UnboundedReceiver}, @@ -101,8 +92,7 @@ use self::{ compression::ArchiveHeader, download::{download_timeline, DownloadedTimeline}, index::{ - ArchiveDescription, ArchiveId, RelativePath, RemoteTimeline, RemoteTimelineIndex, - TimelineIndexEntry, + ArchiveDescription, ArchiveId, RemoteTimeline, RemoteTimelineIndex, TimelineIndexEntry, }, upload::upload_timeline_checkpoint, }; @@ -843,28 +833,6 @@ async fn download_archive_header< Ok(header) } -async fn tenant_branch_files( - conf: &'static PageServerConf, - tenant_id: ZTenantId, -) -> anyhow::Result> { - let branches_dir = conf.branches_path(&tenant_id); - if !branches_dir.exists() { - return Ok(HashSet::new()); - } - - let mut branch_entries = fs::read_dir(&branches_dir) - .await - .context("Failed to list tenant branches dir contents")?; - - let mut branch_files = HashSet::new(); - while let Some(branch_entry) = branch_entries.next_entry().await? 
{ - if branch_entry.file_type().await?.is_file() { - branch_files.insert(RelativePath::new(&branches_dir, branch_entry.path())?); - } - } - Ok(branch_files) -} - #[cfg(test)] mod test_utils { use std::{ @@ -971,30 +939,9 @@ mod test_utils { "Index contains unexpected sync ids" ); - let mut actual_branches = BTreeMap::new(); - let mut expected_branches = BTreeMap::new(); let mut actual_timeline_entries = BTreeMap::new(); let mut expected_timeline_entries = BTreeMap::new(); for sync_id in actual_sync_ids { - actual_branches.insert( - sync_id.tenant_id, - index_read - .branch_files(sync_id.tenant_id) - .into_iter() - .flat_map(|branch_paths| branch_paths.iter()) - .cloned() - .collect::>(), - ); - expected_branches.insert( - sync_id.tenant_id, - expected_index_with_descriptions - .branch_files(sync_id.tenant_id) - .into_iter() - .flat_map(|branch_paths| branch_paths.iter()) - .cloned() - .collect::>(), - ); - actual_timeline_entries.insert( sync_id, index_read.timeline_entry(&sync_id).unwrap().clone(), @@ -1009,11 +956,6 @@ mod test_utils { } drop(index_read); - assert_eq!( - actual_branches, expected_branches, - "Index contains unexpected branches" - ); - for (sync_id, actual_timeline_entry) in actual_timeline_entries { let expected_timeline_description = expected_timeline_entries .remove(&sync_id) diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs index f268fc442a..00115ba8d5 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/remote_storage/storage_sync/download.rs @@ -1,10 +1,8 @@ //! Timeline synchrnonization logic to put files from archives on remote storage into pageserver's local directory. -//! Currently, tenant branch files are also downloaded, but this does not appear final. 
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; use anyhow::{ensure, Context}; -use futures::{stream::FuturesUnordered, StreamExt}; use tokio::{fs, sync::RwLock}; use tracing::{debug, error, trace, warn}; use zenith_utils::{lsn::Lsn, zid::ZTenantId}; @@ -14,8 +12,8 @@ use crate::{ layered_repository::metadata::{metadata_path, TimelineMetadata}, remote_storage::{ storage_sync::{ - compression, index::TimelineIndexEntry, sync_queue, tenant_branch_files, - update_index_description, SyncKind, SyncTask, + compression, index::TimelineIndexEntry, sync_queue, update_index_description, SyncKind, + SyncTask, }, RemoteStorage, ZTenantTimelineId, }, @@ -42,8 +40,6 @@ pub(super) enum DownloadedTimeline { /// Timeline files that already exist locally are skipped during the download, but the local metadata file is /// updated in the end of every checkpoint archive extraction. /// -/// Before any archives are considered, the branch files are checked locally and remotely, all remote-only files are downloaded. -/// /// On an error, bumps the retries count and reschedules the download, with updated archive skip list /// (for any new successful archive downloads and extractions). pub(super) async fn download_timeline< @@ -113,22 +109,6 @@ pub(super) async fn download_timeline< } }; - if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.tenant_id).await - { - error!( - "Failed to download missing branches for sync id {}: {:?}", - sync_id, e - ); - sync_queue::push(SyncTask::new( - sync_id, - retries, - SyncKind::Download(download), - )); - return DownloadedTimeline::FailedAndRescheduled { - disk_consistent_lsn, - }; - } - debug!("Downloading timeline archives"); let archives_to_download = remote_timeline .checkpoints() @@ -250,82 +230,6 @@ async fn read_local_metadata( .context("Failed to read local metadata files bytes")?) 
} -async fn download_missing_branches< - P: std::fmt::Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, ->( - conf: &'static PageServerConf, - (storage, index): &(S, RwLock), - tenant_id: ZTenantId, -) -> anyhow::Result<()> { - let local_branches = tenant_branch_files(conf, tenant_id) - .await - .context("Failed to list local branch files for the tenant")?; - let local_branches_dir = conf.branches_path(&tenant_id); - if !local_branches_dir.exists() { - fs::create_dir_all(&local_branches_dir) - .await - .with_context(|| { - format!( - "Failed to create local branches directory at path '{}'", - local_branches_dir.display() - ) - })?; - } - - if let Some(remote_branches) = index.read().await.branch_files(tenant_id) { - let mut remote_only_branches_downloads = remote_branches - .difference(&local_branches) - .map(|remote_only_branch| async move { - let branches_dir = conf.branches_path(&tenant_id); - let remote_branch_path = remote_only_branch.as_path(&branches_dir); - let storage_path = - storage.storage_path(&remote_branch_path).with_context(|| { - format!( - "Failed to derive a storage path for branch with local path '{}'", - remote_branch_path.display() - ) - })?; - let mut target_file = fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(&remote_branch_path) - .await - .with_context(|| { - format!( - "Failed to create local branch file at '{}'", - remote_branch_path.display() - ) - })?; - storage - .download(&storage_path, &mut target_file) - .await - .with_context(|| { - format!( - "Failed to download branch file from the remote path {:?}", - storage_path - ) - })?; - Ok::<_, anyhow::Error>(()) - }) - .collect::>(); - - let mut branch_downloads_failed = false; - while let Some(download_result) = remote_only_branches_downloads.next().await { - if let Err(e) = download_result { - branch_downloads_failed = true; - error!("Failed to download a branch file: {:?}", e); - } - } - ensure!( - !branch_downloads_failed, - "Failed to 
download all branch files" - ); - } - - Ok(()) -} - #[cfg(test)] mod tests { use std::collections::BTreeSet; diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs index 3d2680948d..8ff92ed55e 100644 --- a/pageserver/src/remote_storage/storage_sync/index.rs +++ b/pageserver/src/remote_storage/storage_sync/index.rs @@ -5,7 +5,7 @@ //! This way in the future, the index could be restored fast from its serialized stored form. use std::{ - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap}, path::{Path, PathBuf}, }; @@ -49,10 +49,9 @@ impl RelativePath { } /// An index to track tenant files that exist on the remote storage. -/// Currently, timeline archives and branch files are tracked. +/// Currently, timeline archives files are tracked only. #[derive(Debug, Clone)] pub struct RemoteTimelineIndex { - branch_files: HashMap>, timeline_files: HashMap, } @@ -65,7 +64,6 @@ impl RemoteTimelineIndex { paths: impl Iterator, ) -> Self { let mut index = Self { - branch_files: HashMap::new(), timeline_files: HashMap::new(), }; for path in paths { @@ -98,17 +96,6 @@ impl RemoteTimelineIndex { pub fn all_sync_ids(&self) -> impl Iterator + '_ { self.timeline_files.keys().copied() } - - pub fn add_branch_file(&mut self, tenant_id: ZTenantId, path: RelativePath) { - self.branch_files - .entry(tenant_id) - .or_insert_with(HashSet::new) - .insert(path); - } - - pub fn branch_files(&self, tenant_id: ZTenantId) -> Option<&HashSet> { - self.branch_files.get(&tenant_id) - } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -306,20 +293,9 @@ fn try_parse_index_entry( .parse::() .with_context(|| format!("Failed to parse tenant id from path '{}'", path.display()))?; - let branches_path = conf.branches_path(&tenant_id); let timelines_path = conf.timelines_path(&tenant_id); - match ( - RelativePath::new(&branches_path, &path), - path.strip_prefix(&timelines_path), - ) { - (Ok(_), Ok(_)) => 
bail!( - "Path '{}' cannot start with both branches '{}' and the timelines '{}' prefixes", - path.display(), - branches_path.display(), - timelines_path.display() - ), - (Ok(branches_entry), Err(_)) => index.add_branch_file(tenant_id, branches_entry), - (Err(_), Ok(timelines_subpath)) => { + match path.strip_prefix(&timelines_path) { + Ok(timelines_subpath) => { let mut segments = timelines_subpath.iter(); let timeline_id = segments .next() @@ -375,11 +351,10 @@ fn try_parse_index_entry( } } } - (Err(branches_error), Err(timelines_strip_error)) => { + Err(timelines_strip_error) => { bail!( - "Path '{}' is not an index entry: it's neither parsable as a branch entry '{:#}' nor as an archive entry '{}'", + "Path '{}' is not an archive entry '{}'", path.display(), - branches_error, timelines_strip_error, ) } diff --git a/pageserver/src/remote_storage/storage_sync/upload.rs b/pageserver/src/remote_storage/storage_sync/upload.rs index 0f57d714dd..d064039ecc 100644 --- a/pageserver/src/remote_storage/storage_sync/upload.rs +++ b/pageserver/src/remote_storage/storage_sync/upload.rs @@ -1,13 +1,10 @@ //! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints. -//! Currently, tenant branch files are also uploaded, but this does not appear final. 
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; -use anyhow::{ensure, Context}; -use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::{fs, sync::RwLock}; +use anyhow::ensure; +use tokio::sync::RwLock; use tracing::{debug, error, warn}; -use zenith_utils::zid::ZTenantId; use crate::{ config::PageServerConf, @@ -15,7 +12,7 @@ use crate::{ storage_sync::{ compression, index::{RemoteTimeline, TimelineIndexEntry}, - sync_queue, tenant_branch_files, update_index_description, SyncKind, SyncTask, + sync_queue, update_index_description, SyncKind, SyncTask, }, RemoteStorage, ZTenantTimelineId, }, @@ -26,8 +23,6 @@ use super::{compression::ArchiveHeader, index::RemoteTimelineIndex, NewCheckpoin /// Attempts to compress and upload given checkpoint files. /// No extra checks for overlapping files is made: download takes care of that, ensuring no non-metadata local timeline files are overwritten. /// -/// Before the checkpoint files are uploaded, branch files are uploaded, if any local ones are missing remotely. -/// /// On an error, bumps the retries count and reschedules the entire task. /// On success, populates index data with new downloads. 
pub(super) async fn upload_timeline_checkpoint< @@ -41,19 +36,6 @@ pub(super) async fn upload_timeline_checkpoint< retries: u32, ) -> Option { debug!("Uploading checkpoint for sync id {}", sync_id); - if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.tenant_id).await - { - error!( - "Failed to upload missing branches for sync id {}: {:?}", - sync_id, e - ); - sync_queue::push(SyncTask::new( - sync_id, - retries, - SyncKind::Upload(new_checkpoint), - )); - return Some(false); - } let new_upload_lsn = new_checkpoint.metadata.disk_consistent_lsn(); let index = &remote_assets.1; @@ -201,76 +183,6 @@ async fn try_upload_checkpoint< .map(|(header, header_size, _)| (header, header_size)) } -async fn upload_missing_branches< - P: std::fmt::Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, ->( - config: &'static PageServerConf, - (storage, index): &(S, RwLock), - tenant_id: ZTenantId, -) -> anyhow::Result<()> { - let local_branches = tenant_branch_files(config, tenant_id) - .await - .context("Failed to list local branch files for the tenant")?; - let index_read = index.read().await; - let remote_branches = index_read - .branch_files(tenant_id) - .cloned() - .unwrap_or_default(); - drop(index_read); - - let mut branch_uploads = local_branches - .difference(&remote_branches) - .map(|local_only_branch| async move { - let local_branch_path = local_only_branch.as_path(&config.branches_path(&tenant_id)); - let storage_path = storage.storage_path(&local_branch_path).with_context(|| { - format!( - "Failed to derive a storage path for branch with local path '{}'", - local_branch_path.display() - ) - })?; - let local_branch_file = fs::OpenOptions::new() - .read(true) - .open(&local_branch_path) - .await - .with_context(|| { - format!( - "Failed to open local branch file {} for reading", - local_branch_path.display() - ) - })?; - storage - .upload(local_branch_file, &storage_path) - .await - .with_context(|| { - format!( - "Failed 
to upload branch file to the remote path {:?}", - storage_path - ) - })?; - Ok::<_, anyhow::Error>(local_only_branch) - }) - .collect::>(); - - let mut branch_uploads_failed = false; - while let Some(upload_result) = branch_uploads.next().await { - match upload_result { - Ok(local_only_branch) => index - .write() - .await - .add_branch_file(tenant_id, local_only_branch.clone()), - Err(e) => { - error!("Failed to upload branch file: {:?}", e); - branch_uploads_failed = true; - } - } - } - - ensure!(!branch_uploads_failed, "Failed to upload all branch files"); - - Ok(()) -} - #[cfg(test)] mod tests { use tempfile::tempdir; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 6142953a58..674d447624 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -36,6 +36,10 @@ pub trait Repository: Send + Sync { /// Get Timeline handle for given zenith timeline ID. fn get_timeline(&self, timelineid: ZTimelineId) -> Result; + /// Lists timelines the repository contains. + /// Up to repository's implementation to omit certain timelines that ar not considered ready for use. + fn list_timelines(&self) -> Result>; + /// Create a new, empty timeline. The caller is responsible for loading data into it /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it. fn create_empty_timeline( @@ -72,7 +76,10 @@ pub trait Repository: Send + Sync { pub enum RepositoryTimeline { /// Timeline, with its files present locally in pageserver's working directory. /// Loaded into pageserver's memory and ready to be used. - Local(Arc), + Local { + id: ZTimelineId, + timeline: Arc, + }, /// Timeline, found on the pageserver's remote storage, but not yet downloaded locally. 
Remote { id: ZTimelineId, @@ -83,12 +90,19 @@ pub enum RepositoryTimeline { impl RepositoryTimeline { pub fn local_timeline(&self) -> Option> { - if let Self::Local(local_timeline) = self { - Some(Arc::clone(local_timeline)) + if let Self::Local { timeline, .. } = self { + Some(Arc::clone(timeline)) } else { None } } + + pub fn id(&self) -> ZTimelineId { + match self { + Self::Local { id, .. } => *id, + Self::Remote { id, .. } => *id, + } + } } /// A state of the timeline synchronization with the remote storage. @@ -390,7 +404,6 @@ pub mod repo_harness { let tenant_id = ZTenantId::generate(); fs::create_dir_all(conf.tenant_path(&tenant_id))?; - fs::create_dir_all(conf.branches_path(&tenant_id))?; Ok(Self { conf, tenant_id }) } diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index d60b5fefd3..98777e5e4b 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -1,12 +1,12 @@ //! This module acts as a switchboard to access different repositories managed by this //! page server. 
-use crate::branches; use crate::config::PageServerConf; use crate::layered_repository::LayeredRepository; use crate::repository::{Repository, Timeline, TimelineSyncState}; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; +use crate::timelines; use crate::walredo::PostgresRedoManager; use crate::CheckpointConfig; use anyhow::{bail, Context, Result}; @@ -182,7 +182,7 @@ pub fn create_repository_for_tenant( tenantid: ZTenantId, ) -> Result<()> { let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); - let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?; + let repo = timelines::create_repo(conf, tenantid, wal_redo_manager)?; match access_tenants().entry(tenantid) { hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid), diff --git a/pageserver/src/branches.rs b/pageserver/src/timelines.rs similarity index 70% rename from pageserver/src/branches.rs rename to pageserver/src/timelines.rs index 43f27af5ea..4a84b434a9 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/timelines.rs @@ -1,5 +1,5 @@ //! -//! Branch management code +//! Timeline management code //! // TODO: move all paths construction to conf impl // @@ -27,8 +27,7 @@ use crate::{import_datadir, LOG_FILE_NAME}; use crate::{repository::RepositoryTimeline, tenant_mgr}; #[derive(Serialize, Deserialize, Clone)] -pub struct BranchInfo { - pub name: String, +pub struct TimelineInfo { #[serde(with = "hex")] pub timeline_id: ZTimelineId, pub latest_valid_lsn: Lsn, @@ -38,59 +37,6 @@ pub struct BranchInfo { pub current_logical_size_non_incremental: Option, } -impl BranchInfo { - pub fn from_path>( - path: T, - repo: &Arc, - include_non_incremental_logical_size: bool, - ) -> Result { - let path = path.as_ref(); - let name = path.file_name().unwrap().to_string_lossy().to_string(); - let timeline_id = std::fs::read_to_string(path) - .with_context(|| { - format!( - "Failed to read branch file contents at path '{}'", - path.display() - ) - })? 
- .parse::()?; - - let timeline = match repo.get_timeline(timeline_id)? { - RepositoryTimeline::Local(local_entry) => local_entry, - RepositoryTimeline::Remote { .. } => { - bail!("Timeline {} is remote, no branches to display", timeline_id) - } - }; - - // we use ancestor lsn zero if we don't have an ancestor, so turn this into an option based on timeline id - let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() { - Some(ancestor_id) => ( - Some(ancestor_id.to_string()), - Some(timeline.get_ancestor_lsn().to_string()), - ), - None => (None, None), - }; - - // non incremental size calculation can be heavy, so let it be optional - // needed for tests to check size calculation - let current_logical_size_non_incremental = include_non_incremental_logical_size - .then(|| { - timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) - }) - .transpose()?; - - Ok(BranchInfo { - name, - timeline_id, - latest_valid_lsn: timeline.get_last_record_lsn(), - ancestor_id, - ancestor_lsn, - current_logical_size: timeline.get_current_logical_size(), - current_logical_size_non_incremental, - }) - } -} - #[derive(Debug, Clone, Copy)] pub struct PointInTime { pub timelineid: ZTimelineId, @@ -140,7 +86,6 @@ pub fn create_repo( .with_context(|| format!("could not create directory {}", repo_dir.display()))?; crashsafe_dir::create_dir(conf.timelines_path(&tenantid))?; - crashsafe_dir::create_dir_all(conf.branches_path(&tenantid))?; crashsafe_dir::create_dir_all(conf.tags_path(&tenantid))?; info!("created directory structure in {}", repo_dir.display()); @@ -198,7 +143,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { .output() .context("failed to execute initdb")?; if !initdb_output.status.success() { - anyhow::bail!( + bail!( "initdb failed: '{}'", String::from_utf8_lossy(&initdb_output.stderr) ); @@ -245,65 +190,80 @@ fn bootstrap_timeline( timeline.get_last_record_lsn() ); - let data = tli.to_string(); - 
fs::write(conf.branch_path("main", &tenantid), data)?; - println!("created main branch"); - // Remove temp dir. We don't need it anymore fs::remove_dir_all(pgdata_path)?; Ok(()) } -pub(crate) fn get_branches( - conf: &PageServerConf, - tenantid: &ZTenantId, +pub(crate) fn get_timelines( + tenant_id: ZTenantId, include_non_incremental_logical_size: bool, -) -> Result> { - let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?; +) -> Result> { + let repo = tenant_mgr::get_repository_for_tenant(tenant_id) + .with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?; - // Each branch has a corresponding record (text file) in the refs/branches - // with timeline_id. - let branches_dir = conf.branches_path(tenantid); - - std::fs::read_dir(&branches_dir) - .with_context(|| { - format!( - "Found no branches directory '{}' for tenant {}", - branches_dir.display(), - tenantid - ) - })? - .map(|dir_entry_res| { - let dir_entry = dir_entry_res.with_context(|| { - format!( - "Failed to list branches directory '{}' content for tenant {}", - branches_dir.display(), - tenantid - ) - })?; - BranchInfo::from_path( - dir_entry.path(), - &repo, - include_non_incremental_logical_size, - ) + Ok(repo + .list_timelines() + .with_context(|| format!("Failed to list timelines for tenant {}", tenant_id))? + .into_iter() + .filter_map(|timeline| match timeline { + RepositoryTimeline::Local { timeline, id } => Some((id, timeline)), + RepositoryTimeline::Remote { .. 
} => None, }) - .collect() + .map(|(timeline_id, timeline)| { + let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() { + Some(ancestor_id) => ( + Some(ancestor_id.to_string()), + Some(timeline.get_ancestor_lsn().to_string()), + ), + None => (None, None), + }; + + let current_logical_size_non_incremental = if include_non_incremental_logical_size { + match timeline + .get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) + { + Ok(size) => Some(size), + Err(e) => { + error!( + "Failed to get current logical size for timeline {}: {:?}", + timeline_id, e + ); + None + } + } + } else { + None + }; + + TimelineInfo { + timeline_id, + latest_valid_lsn: timeline.get_last_record_lsn(), + ancestor_id, + ancestor_lsn, + current_logical_size: timeline.get_current_logical_size(), + // non incremental size calculation can be heavy, so let it be optional + // needed for tests to check size calculation + current_logical_size_non_incremental, + } + }) + .collect()) } -pub(crate) fn create_branch( +pub(crate) fn create_timeline( conf: &PageServerConf, - branchname: &str, startpoint_str: &str, - tenantid: &ZTenantId, -) -> Result { - let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?; + tenant_id: ZTenantId, + timeline_id: ZTimelineId, +) -> Result { + let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; - if conf.branch_path(branchname, tenantid).exists() { - anyhow::bail!("branch {} already exists", branchname); + if conf.timeline_path(&timeline_id, &tenant_id).exists() { + bail!("timeline {} already exists", timeline_id); } - let mut startpoint = parse_point_in_time(conf, startpoint_str, tenantid)?; + let mut startpoint = parse_point_in_time(conf, startpoint_str, &tenant_id)?; let timeline = repo .get_timeline(startpoint.timelineid)? 
.local_timeline() @@ -325,10 +285,10 @@ pub(crate) fn create_branch( startpoint.lsn = startpoint.lsn.align(); if timeline.get_ancestor_lsn() > startpoint.lsn { // can we safely just branch from the ancestor instead? - anyhow::bail!( - "invalid startpoint {} for the branch {}: less than timeline ancestor lsn {:?}", + bail!( + "invalid startpoint {} for the timeline {}: less than timeline ancestor lsn {:?}", startpoint.lsn, - branchname, + timeline_id, timeline.get_ancestor_lsn() ); } @@ -342,11 +302,11 @@ pub(crate) fn create_branch( // Remember the human-readable branch name for the new timeline. // FIXME: there's a race condition, if you create a branch with the same // name concurrently. + // TODO kb timeline creation needs more let data = new_timeline_id.to_string(); - fs::write(conf.branch_path(branchname, tenantid), data)?; + fs::write(conf.timeline_path(&timeline_id, &tenant_id), data)?; - Ok(BranchInfo { - name: branchname.to_string(), + Ok(TimelineInfo { timeline_id: new_timeline_id, latest_valid_lsn: startpoint.lsn, ancestor_id: Some(startpoint.timelineid.to_string()), @@ -367,14 +327,6 @@ pub(crate) fn create_branch( // A specific LSN on a timeline: // bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8 // -// Same, with a human-friendly branch name: -// main -// main@2/15D3DD8 -// -// Human-friendly tag name: -// mytag -// -// fn parse_point_in_time( conf: &PageServerConf, s: &str, @@ -399,18 +351,6 @@ fn parse_point_in_time( } } - // Check if it's a branch - // Check if it's branch @ LSN - let branchpath = conf.branch_path(name, tenantid); - if branchpath.exists() { - let pointstr = fs::read_to_string(branchpath)?; - - let mut result = parse_point_in_time(conf, &pointstr, tenantid)?; - - result.lsn = lsn.unwrap_or(Lsn(0)); - return Ok(result); - } - // Check if it's a timelineid // Check if it's timelineid @ LSN if let Ok(timelineid) = ZTimelineId::from_str(name) { diff --git a/zenith/src/main.rs b/zenith/src/main.rs index bc42af5943..9f8996a540 100644 --- 
a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -21,7 +21,7 @@ use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId}; use zenith_utils::GIT_VERSION; -use pageserver::branches::BranchInfo; +use pageserver::timelines::TimelineInfo; // Default id of a safekeeper node, if not specified on the command line. const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1); @@ -53,12 +53,12 @@ http_port = {safekeeper_http_port} } /// -/// Branches tree element used as a value in the HashMap. +/// Timelines tree element used as a value in the HashMap. /// -struct BranchTreeEl { - /// `BranchInfo` received from the `pageserver` via the `branch_list` libpq API call. - pub info: BranchInfo, - /// Holds all direct children of this branch referenced using `timeline_id`. +struct TimelineTreeEl { + /// `TimelineInfo` received from the `pageserver` via the `timeline_list` libpq API call. + pub info: TimelineInfo, + /// Holds all direct children of this timeline referenced using `timeline_id`. 
pub children: Vec, } @@ -84,7 +84,7 @@ fn main() -> Result<()> { let timeline_arg = Arg::new("timeline") .index(2) - .help("Branch name or a point-in time specification") + .help("Timeline id or a point-in time specification") .required(false); let tenantid_arg = Arg::new("tenantid") @@ -129,9 +129,9 @@ fn main() -> Result<()> { ) ) .subcommand( - App::new("branch") - .about("Create a new branch") - .arg(Arg::new("branchname").required(false).index(1)) + App::new("timeline") + .about("Create a new timeline") + .arg(Arg::new("timeline-name").required(false).index(1)) .arg(Arg::new("start-point").required(false).index(2)) .arg(tenantid_arg.clone()), ).subcommand( @@ -239,7 +239,7 @@ fn main() -> Result<()> { match sub_name { "tenant" => handle_tenant(sub_args, &env), - "branch" => handle_branch(sub_args, &env), + "timeline" => handle_timeline(sub_args, &env), "start" => handle_start_all(sub_args, &env), "stop" => handle_stop_all(sub_args, &env), "pageserver" => handle_pageserver(sub_args, &env), @@ -257,43 +257,42 @@ fn main() -> Result<()> { } /// -/// Prints branches list as a tree-like structure. +/// Prints timelines list as a tree-like structure. /// -fn print_branches_tree(branches: Vec) -> Result<()> { - let mut branches_hash: HashMap = HashMap::new(); +fn print_timelines_tree(timelines: Vec) -> Result<()> { + let mut timelines_hash: HashMap = timelines + .iter() + .map(|t| { + ( + t.timeline_id.to_string(), + TimelineTreeEl { + info: t.clone(), + children: Vec::new(), + }, + ) + }) + .collect(); - // Form a hash table of branch timeline_id -> BranchTreeEl. - for branch in &branches { - branches_hash.insert( - branch.timeline_id.to_string(), - BranchTreeEl { - info: branch.clone(), - children: Vec::new(), - }, - ); - } - - // Memorize all direct children of each branch. - for branch in &branches { - if let Some(tid) = &branch.ancestor_id { - branches_hash + // Memorize all direct children of each timeline. 
+ for timeline in &timelines { + if let Some(tid) = &timeline.ancestor_id { + timelines_hash .get_mut(tid) - .context("missing branch info in the HashMap")? + .context("missing timeline info in the HashMap")? .children - .push(branch.timeline_id.to_string()); + .push(timeline.timeline_id.to_string()); } } // Sort children by tid to bring some minimal order. - for branch in &mut branches_hash.values_mut() { - branch.children.sort(); + for timeline in &mut timelines_hash.values_mut() { + timeline.children.sort(); } - for branch in branches_hash.values() { - // Start with root branches (no ancestors) first. - // Now there is 'main' branch only, but things may change. - if branch.info.ancestor_id.is_none() { - print_branch(0, &Vec::from([true]), branch, &branches_hash)?; + for timeline in timelines_hash.values() { + // Start with root timelines (no ancestors) first. + if timeline.info.ancestor_id.is_none() { + print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?; } } @@ -301,27 +300,27 @@ fn print_branches_tree(branches: Vec) -> Result<()> { } /// -/// Recursively prints branch info with all its children. +/// Recursively prints timeline info with all its children. /// -fn print_branch( +fn print_timeline( nesting_level: usize, is_last: &[bool], - branch: &BranchTreeEl, - branches: &HashMap, + timeline: &TimelineTreeEl, + timelines: &HashMap, ) -> Result<()> { // Draw main padding print!(" "); if nesting_level > 0 { - let lsn = branch + let lsn = timeline .info .ancestor_lsn .as_ref() - .context("missing branch info in the HashMap")?; + .context("missing timeline info in the HashMap")?; let mut br_sym = "┣━"; // Draw each nesting padding with proper style - // depending on whether its branch ended or not. + // depending on whether its timeline ended or not. 
if nesting_level > 1 { for l in &is_last[1..is_last.len() - 1] { if *l { @@ -332,7 +331,7 @@ fn print_branch( } } - // We are the last in this sub-branch + // We are the last in this sub-timeline if *is_last.last().unwrap() { br_sym = "┗━"; } @@ -340,51 +339,51 @@ fn print_branch( print!("{} @{}: ", br_sym, lsn); } - // Finally print a branch name with new line - println!("{}", branch.info.name); + // Finally print a timeline name with new line + println!("{}", timeline.info.timeline_id); - let len = branch.children.len(); + let len = timeline.children.len(); let mut i: usize = 0; let mut is_last_new = Vec::from(is_last); is_last_new.push(false); - for child in &branch.children { + for child in &timeline.children { i += 1; - // Mark that the last padding is the end of the branch + // Mark that the last padding is the end of the timeline if i == len { if let Some(last) = is_last_new.last_mut() { *last = true; } } - print_branch( + print_timeline( nesting_level + 1, &is_last_new, - branches + timelines .get(child) - .context("missing branch info in the HashMap")?, - branches, + .context("missing timeline info in the HashMap")?, + timelines, )?; } Ok(()) } -/// Returns a map of timeline IDs to branch_name@lsn strings. +/// Returns a map of timeline IDs to timeline_id@lsn strings. /// Connects to the pageserver to query this information. 
-fn get_branch_infos( +fn get_timeline_infos( env: &local_env::LocalEnv, tenantid: &ZTenantId, -) -> Result> { +) -> Result> { let page_server = PageServerNode::from_env(env); - let branch_infos: Vec = page_server.branch_list(tenantid)?; - let branch_infos: HashMap = branch_infos + let timeline_infos: Vec = page_server.timeline_list(tenantid)?; + let timeline_infos: HashMap = timeline_infos .into_iter() - .map(|branch_info| (branch_info.timeline_id, branch_info)) + .map(|timeline_info| (timeline_info.timeline_id, timeline_info)) .collect(); - Ok(branch_infos) + Ok(timeline_infos) } // Helper function to parse --tenantid option, or get the default from config file @@ -459,24 +458,28 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result Ok(()) } -fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let pageserver = PageServerNode::from_env(env); - let tenantid = get_tenantid(branch_match, env)?; + let tenant_id = get_tenantid(timeline_match, env)?; - if let Some(branchname) = branch_match.value_of("branchname") { - let startpoint_str = branch_match + if let Some(timeline_id) = timeline_match.value_of("timeline-id") { + let startpoint_str = timeline_match .value_of("start-point") .context("Missing start-point")?; - let branch = pageserver.branch_create(branchname, startpoint_str, &tenantid)?; + let timeline_id = timeline_id + .parse::() + .context("Failed to parse timeline id from the request")?; + let timeline = + pageserver.timeline_create(timeline_id, startpoint_str.to_owned(), tenant_id)?; println!( - "Created branch '{}' at {:?} for tenant: {}", - branch.name, branch.latest_valid_lsn, tenantid, + "Created timeline '{}' at {:?} for tenant: {}", + timeline.timeline_id, timeline.latest_valid_lsn, tenant_id, ); } else { - // No arguments, list branches for tenant - let branches = 
pageserver.branch_list(&tenantid)?; - print_branches_tree(branches)?; + // No arguments, list timelines for tenant + let timelines = pageserver.timeline_list(&tenant_id)?; + print_timelines_tree(timelines)?; } Ok(()) @@ -495,12 +498,12 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { match sub_name { "list" => { - let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| { - eprintln!("Failed to load branch info: {}", e); + let timeline_infos = get_timeline_infos(env, &tenantid).unwrap_or_else(|e| { + eprintln!("Failed to load timeline info: {}", e); HashMap::new() }); - println!("NODE\tADDRESS\t\tBRANCH\tLSN\t\tSTATUS"); + println!("NODE\tADDRESS\t\tTIMELINE\tLSN\t\tSTATUS"); for ((_, node_name), node) in cplane .nodes .iter() @@ -509,7 +512,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { // FIXME: This shows the LSN at the end of the timeline. It's not the // right thing to do for read-only nodes that might be anchored at an // older point in time, or following but lagging behind the primary. 
- let lsn_str = branch_infos + let lsn_str = timeline_infos .get(&node.timelineid) .map(|bi| bi.latest_valid_lsn.to_string()) .unwrap_or_else(|| "?".to_string()); @@ -518,7 +521,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { "{}\t{}\t{}\t{}\t{}", node_name, node.address, - node.timelineid, // FIXME: resolve human-friendly branch name + node.timelineid, lsn_str, node.status(), ); @@ -526,17 +529,17 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { } "create" => { let node_name = sub_args.value_of("node").unwrap_or("main"); - let timeline_name = sub_args.value_of("timeline").unwrap_or(node_name); + let timeline_spec = sub_args.value_of("timeline"); let port: Option = match sub_args.value_of("port") { Some(p) => Some(p.parse()?), None => None, }; - cplane.new_node(tenantid, node_name, timeline_name, port)?; + cplane.new_node(tenantid, node_name, timeline_spec, port)?; } "start" => { let node_name = sub_args.value_of("node").unwrap_or("main"); - let timeline_name = sub_args.value_of("timeline"); + let timeline_spec = sub_args.value_of("timeline"); let port: Option = match sub_args.value_of("port") { Some(p) => Some(p.parse()?), @@ -554,8 +557,8 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { }; if let Some(node) = node { - if timeline_name.is_some() { - println!("timeline name ignored because node exists already"); + if timeline_spec.is_some() { + println!("timeline spec ignored because its node exists already"); } println!("Starting existing postgres {}...", node_name); node.start(&auth_token)?; @@ -565,12 +568,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { // start --port X // stop // start <-- will also use port X even without explicit port argument - let timeline_name = timeline_name.unwrap_or(node_name); println!( - "Starting new postgres {} on {}...", - node_name, timeline_name + "Starting new postgres {} on timeline {:?} 
...", + node_name, timeline_spec ); - let node = cplane.new_node(tenantid, node_name, timeline_name, port)?; + let node = cplane.new_node(tenantid, node_name, timeline_spec, port)?; node.start(&auth_token)?; } } @@ -585,9 +587,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { node.stop(destroy)?; } - _ => { - bail!("Unexpected pg subcommand '{}'", sub_name) - } + _ => bail!("Unexpected pg subcommand '{}'", sub_name), } Ok(())