Use timeline instead of branch in pageserver's API

This commit is contained in:
Kirill Bulatov
2022-02-04 10:37:39 -05:00
committed by Kirill Bulatov
parent 2883a25761
commit 10f811e886
18 changed files with 311 additions and 800 deletions

View File

@@ -73,39 +73,43 @@ impl ComputeControlPlane {
.unwrap_or(self.base_port)
}
// FIXME: see also parse_point_in_time in branches.rs.
// FIXME: see also parse_point_in_time in timelines.rs.
fn parse_point_in_time(
&self,
tenantid: ZTenantId,
tenant_id: ZTenantId,
s: &str,
) -> Result<(ZTimelineId, Option<Lsn>)> {
let mut strings = s.split('@');
let name = strings.next().unwrap();
let _strings = s.split('@');
// let name = strings.next().unwrap();
let lsn = strings
.next()
.map(Lsn::from_str)
.transpose()
.context("invalid LSN in point-in-time specification")?;
// let lsn = strings
// .next()
// .map(Lsn::from_str)
// .transpose()
// .context("invalid LSN in point-in-time specification")?;
// Resolve the timeline ID, given the human-readable branch name
let timeline_id = self
.pageserver
.branch_get_by_name(&tenantid, name)?
.timeline_id;
// // Resolve the timeline ID, given the human-readable branch name
// let timeline_id = self
// .pageserver
// .branch_get_by_name(&tenant_id, name)?
// .timeline_id;
Ok((timeline_id, lsn))
// Ok((timeline_id, lsn))
todo!("TODO kb check more about the '@name' format")
}
pub fn new_node(
&mut self,
tenantid: ZTenantId,
name: &str,
timeline_spec: &str,
timeline_spec: Option<&str>,
port: Option<u16>,
) -> Result<Arc<PostgresNode>> {
// Resolve the human-readable timeline spec into timeline ID and LSN
let (timelineid, lsn) = self.parse_point_in_time(tenantid, timeline_spec)?;
let (timelineid, lsn) = match timeline_spec {
Some(timeline_spec) => self.parse_point_in_time(tenantid, timeline_spec)?,
None => (ZTimelineId::generate(), None),
};
let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode {

View File

@@ -9,18 +9,18 @@ use anyhow::bail;
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest};
use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest};
use pageserver::timelines::TimelineInfo;
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::local_env::LocalEnv;
use crate::{fill_rust_env_vars, read_pidfile};
use pageserver::branches::BranchInfo;
use pageserver::tenant_mgr::TenantInfo;
use zenith_utils::connstring::connection_address;
@@ -335,47 +335,32 @@ impl PageServerNode {
.json()?)
}
pub fn branch_list(&self, tenantid: &ZTenantId) -> Result<Vec<BranchInfo>> {
pub fn timeline_list(&self, tenantid: &ZTenantId) -> Result<Vec<TimelineInfo>> {
Ok(self
.http_request(
Method::GET,
format!("{}/branch/{}", self.http_base_url, tenantid),
format!("{}/timeline/{}", self.http_base_url, tenantid),
)
.send()?
.error_from_body()?
.json()?)
}
pub fn branch_create(
pub fn timeline_create(
&self,
branch_name: &str,
startpoint: &str,
tenantid: &ZTenantId,
) -> Result<BranchInfo> {
timeline_id: ZTimelineId,
start_point: String,
tenant_id: ZTenantId,
) -> Result<TimelineInfo> {
Ok(self
.http_request(Method::POST, format!("{}/branch", self.http_base_url))
.json(&BranchCreateRequest {
tenant_id: tenantid.to_owned(),
name: branch_name.to_owned(),
start_point: startpoint.to_owned(),
.http_request(Method::POST, format!("{}/timeline", self.http_base_url))
.json(&TimelineCreateRequest {
tenant_id,
timeline_id,
start_point,
})
.send()?
.error_from_body()?
.json()?)
}
/// Fetches the description of a single branch of the tenant, looked up by its name.
///
/// Issues `GET {http_base_url}/branch/{tenantid}/{branch_name}` and deserializes
/// the JSON response body into a [`BranchInfo`].
///
/// # Errors
///
/// Fails if the request cannot be sent, if the pageserver answers with an error
/// (converted via `error_from_body`, the same way the other client methods of this
/// type handle responses), or if the response body cannot be parsed as JSON.
pub fn branch_get_by_name(
    &self,
    tenantid: &ZTenantId,
    branch_name: &str,
) -> Result<BranchInfo> {
    Ok(self
        .http_request(
            Method::GET,
            format!("{}/branch/{}/{}", self.http_base_url, tenantid, branch_name),
        )
        .send()?
        // Use the same response check as the sibling methods instead of
        // `error_for_status`, which only reports the status code.
        .error_from_body()?
        .json()?)
}
}

View File

@@ -10,11 +10,10 @@ use clap::{App, Arg};
use daemonize::Daemonize;
use pageserver::{
branches,
config::{defaults::*, PageServerConf},
http, page_cache, page_service, remote_storage, tenant_mgr, thread_mgr,
thread_mgr::ThreadKind,
virtual_file, LOG_FILE_NAME,
timelines, virtual_file, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
use zenith_utils::postgres_backend;
@@ -143,7 +142,7 @@ fn main() -> Result<()> {
// Create repo and exit if init was requested
if init {
branches::init_pageserver(conf, create_tenant).context("Failed to init pageserver")?;
timelines::init_pageserver(conf, create_tenant).context("Failed to init pageserver")?;
// write the config file
std::fs::write(&cfg_file_path, toml.to_string()).with_context(|| {
format!(

View File

@@ -400,14 +400,6 @@ impl PageServerConf {
self.tags_path(tenantid).join(tag_name)
}
/// Builds the path `<tenant path>/refs/branches` for the given tenant.
pub fn branches_path(&self, tenantid: &ZTenantId) -> PathBuf {
    let refs_dir = self.tenant_path(tenantid).join("refs");
    refs_dir.join("branches")
}
/// Builds the path of the entry named `branch_name` inside the tenant's branches directory.
pub fn branch_path(&self, branch_name: &str, tenantid: &ZTenantId) -> PathBuf {
    let branches_dir = self.branches_path(tenantid);
    branches_dir.join(branch_name)
}
/// Builds the path `<tenant path>/<TIMELINES_SEGMENT_NAME>` for the given tenant.
pub fn timelines_path(&self, tenantid: &ZTenantId) -> PathBuf {
    let tenant_dir = self.tenant_path(tenantid);
    tenant_dir.join(TIMELINES_SEGMENT_NAME)
}

View File

@@ -1,13 +1,15 @@
use serde::{Deserialize, Serialize};
use zenith_utils::zid::ZTimelineId;
use crate::ZTenantId;
use zenith_utils::zid::ZNodeId;
#[derive(Serialize, Deserialize)]
pub struct BranchCreateRequest {
pub struct TimelineCreateRequest {
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
pub name: String,
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub start_point: String,
}

View File

@@ -30,19 +30,22 @@ paths:
schema:
type: string
format: hex
- name: include-non-incremental-logical-size
in: query
schema:
type: string
description: Controls calculation of current_logical_size_non_incremental
get:
description: List tenant timelines
description: Get timelines for tenant
responses:
"200":
description: array of brief timeline descriptions
description: TimelineInfo
content:
application/json:
schema:
type: array
items:
# currently, just a timeline id string, but when remote index gets to be accessed
# remote/local timeline field would be added at least
type: string
$ref: "#/components/schemas/TimelineInfo"
"400":
description: Error when no tenant id found in path
content:
@@ -81,8 +84,13 @@ paths:
schema:
type: string
format: hex
- name: include-non-incremental-logical-size
in: query
schema:
type: string
description: Controls calculation of current_logical_size_non_incremental
get:
description: Get timeline info for tenant's remote timeline
description: Get info about a single timeline of the tenant
responses:
"200":
description: TimelineInfo
@@ -91,7 +99,7 @@ paths:
schema:
$ref: "#/components/schemas/TimelineInfo"
"400":
description: Error when no tenant id found in path or no branch name
description: Error when no tenant id found in path or no timeline id
content:
application/json:
schema:
@@ -114,108 +122,9 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/branch/{tenant_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: include-non-incremental-logical-size
in: query
schema:
type: string
description: Controls calculation of current_logical_size_non_incremental
get:
description: Get branches for tenant
responses:
"200":
description: BranchInfo
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/BranchInfo"
"400":
description: Error when no tenant id found in path
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/branch/{tenant_id}/{branch_name}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: branch_name
in: path
required: true
schema:
type: string
- name: include-non-incremental-logical-size
in: query
schema:
type: string
description: Controls calculation of current_logical_size_non_incremental
get:
description: Get branches for tenant
responses:
"200":
description: BranchInfo
content:
application/json:
schema:
$ref: "#/components/schemas/BranchInfo"
"400":
description: Error when no tenant id found in path or no branch name
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/branch/:
/v1/timeline/:
post:
description: Create branch
description: Create timeline
requestBody:
content:
application/json:
@@ -223,25 +132,26 @@ paths:
type: object
required:
- "tenant_id"
- "name"
- "timeline_id"
- "start_point"
properties:
tenant_id:
type: string
format: hex
name:
timeline_id:
type: string
format: hex
start_point:
type: string
responses:
"201":
description: BranchInfo
description: TimelineInfo
content:
application/json:
schema:
$ref: "#/components/schemas/BranchInfo"
$ref: "#/components/schemas/TimelineInfo"
"400":
description: Malformed branch create request
description: Malformed timeline create request
content:
application/json:
schema:
@@ -358,16 +268,13 @@ components:
type: string
state:
type: string
BranchInfo:
TimelineInfo:
type: object
required:
- name
- timeline_id
- latest_valid_lsn
- current_logical_size
properties:
name:
type: string
timeline_id:
type: string
format: hex

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use anyhow::Result;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use serde::Serialize;
@@ -14,7 +14,6 @@ use zenith_utils::http::{
endpoint,
error::HttpErrorBody,
json::{json_request, json_response},
request::get_request_param,
request::parse_request_param,
};
use zenith_utils::http::{RequestExt, RouterBuilder};
@@ -22,13 +21,12 @@ use zenith_utils::lsn::Lsn;
use zenith_utils::zid::HexZTimelineId;
use zenith_utils::zid::ZTimelineId;
use super::models::BranchCreateRequest;
use super::models::StatusResponse;
use super::models::TenantCreateRequest;
use crate::branches::BranchInfo;
use super::models::TimelineCreateRequest;
use crate::repository::RepositoryTimeline;
use crate::repository::TimelineSyncState;
use crate::{branches, config::PageServerConf, tenant_mgr, ZTenantId};
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
#[derive(Debug)]
struct State {
@@ -73,18 +71,18 @@ async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiErr
)?)
}
async fn branch_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: BranchCreateRequest = json_request(&mut request).await?;
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(request_data.tenant_id))?;
let response_data = tokio::task::spawn_blocking(move || {
let _enter = info_span!("/branch_create", name = %request_data.name, tenant = %request_data.tenant_id, startpoint=%request_data.start_point).entered();
branches::create_branch(
let _enter = info_span!("/timeline_create", timeline = %request_data.timeline_id, tenant = %request_data.tenant_id, startpoint=%request_data.start_point).entered();
timelines::create_timeline(
get_config(&request),
&request_data.name,
&request_data.start_point,
&request_data.tenant_id,
request_data.tenant_id,
request_data.timeline_id,
)
})
.await
@@ -92,6 +90,19 @@ async fn branch_create_handler(mut request: Request<Body>) -> Result<Response<Bo
Ok(json_response(StatusCode::CREATED, response_data)?)
}
/// HTTP handler that lists the timelines of the tenant named by the `tenant_id` path parameter.
///
/// The caller must pass the permission check for that tenant. The listing itself
/// runs on the blocking thread pool inside a `timeline_list` tracing span, and the
/// result is returned as a JSON body with status `200 OK`.
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
    check_permission(&request, Some(tenant_id))?;
    let with_non_incremental_size = get_include_non_incremental_logical_size(&request);

    let list_timelines = move || {
        let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
        crate::timelines::get_timelines(tenant_id, with_non_incremental_size)
    };
    // The first `?` handles a failed join of the blocking task, the second the listing error.
    let timeline_infos = tokio::task::spawn_blocking(list_timelines)
        .await
        .map_err(ApiError::from_err)??;

    let response = json_response(StatusCode::OK, timeline_infos)?;
    Ok(response)
}
// Gate non incremental logical size calculation behind a flag
// after pgbench -i -s100 calculation took 28ms so if multiplied by the number of timelines
// and tenants it can take noticeable amount of time. Also the value currently used only in tests
@@ -107,90 +118,6 @@ fn get_include_non_incremental_logical_size(request: &Request<Body>) -> bool {
.unwrap_or(false)
}
/// HTTP handler that lists the branches of the tenant named by the `tenant_id` path parameter.
///
/// The caller must pass the permission check for that tenant. The listing itself
/// runs on the blocking thread pool inside a `branch_list` tracing span, and the
/// result is returned as a JSON body with status `200 OK`.
async fn branch_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?;
    let with_non_incremental_size = get_include_non_incremental_logical_size(&request);
    check_permission(&request, Some(tenantid))?;

    let list_branches = move || {
        let _enter = info_span!("branch_list", tenant = %tenantid).entered();
        crate::branches::get_branches(get_config(&request), &tenantid, with_non_incremental_size)
    };
    // The first `?` handles a failed join of the blocking task, the second the listing error.
    let branch_infos = tokio::task::spawn_blocking(list_branches)
        .await
        .map_err(ApiError::from_err)??;

    let response = json_response(StatusCode::OK, branch_infos)?;
    Ok(response)
}
async fn branch_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?;
let branch_name: String = get_request_param(&request, "branch_name")?.to_string();
let conf = get_state(&request).conf;
let path = conf.branch_path(&branch_name, &tenantid);
let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
let response_data = tokio::task::spawn_blocking(move || {
let _enter = info_span!("branch_detail", tenant = %tenantid, branch=%branch_name).entered();
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
BranchInfo::from_path(path, &repo, include_non_incremental_logical_size)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::OK, response_data)?)
}
/// HTTP handler that lists the ids of the tenant's timelines found on local disk.
///
/// Scans the tenant's timelines directory and collects the name of every
/// subdirectory that parses as a [`ZTimelineId`]. Subdirectories with other names
/// are logged as errors and skipped; non-directory entries are ignored. The ids
/// are returned as a JSON array of strings with status `200 OK`.
///
/// # Errors
///
/// Returns an [`ApiError`] if the `tenant_id` path parameter is missing or
/// malformed, if the permission check fails, or if the directory or one of its
/// entries cannot be read.
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
    check_permission(&request, Some(tenant_id))?;

    let conf = get_state(&request).conf;
    let timelines_dir = conf.timelines_path(&tenant_id);

    let mut timelines_dir_contents =
        tokio::fs::read_dir(&timelines_dir).await.with_context(|| {
            format!(
                "Failed to list timelines dir '{}' contents",
                timelines_dir.display()
            )
        })?;

    let mut local_timelines = Vec::new();
    while let Some(entry) = timelines_dir_contents.next_entry().await.with_context(|| {
        format!(
            "Failed to list timelines dir '{}' contents",
            timelines_dir.display()
        )
    })? {
        let entry_path = entry.path();
        let entry_type = entry.file_type().await.with_context(|| {
            format!(
                "Failed to get file type of timeline dirs' entry '{}'",
                entry_path.display()
            )
        })?;

        // Only directories are timeline candidates.
        if entry_type.is_dir() {
            match entry.file_name().to_string_lossy().parse::<ZTimelineId>() {
                Ok(timeline_id) => local_timelines.push(timeline_id.to_string()),
                // A stray directory is reported but does not fail the whole listing.
                Err(e) => error!(
                    "Failed to parse timeline id from timeline dirs' entry '{}': {}",
                    entry_path.display(),
                    e
                ),
            }
        }
    }

    Ok(json_response(StatusCode::OK, local_timelines)?)
}
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
enum TimelineInfo {
@@ -260,7 +187,7 @@ async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(_) => {
RepositoryTimeline::Local { .. } => {
anyhow::bail!("Timeline with id {} is already local", timeline_id)
}
RepositoryTimeline::Remote {
@@ -369,9 +296,7 @@ pub fn make_router(
"/v1/timeline/:tenant_id/:timeline_id/detach",
timeline_detach_handler,
)
.get("/v1/branch/:tenant_id", branch_list_handler)
.get("/v1/branch/:tenant_id/:branch_name", branch_detail_handler)
.post("/v1/branch", branch_create_handler)
.post("/v1/timeline", timeline_create_handler)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
.any(handler_404)

View File

@@ -137,19 +137,20 @@ pub struct LayeredRepository {
/// Public interface
impl Repository for LayeredRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline> {
let mut timelines = self.timelines.lock().unwrap();
Ok(
match self.get_or_init_timeline(timelineid, &mut timelines)? {
LayeredTimelineEntry::Local(local) => RepositoryTimeline::Local(local),
LayeredTimelineEntry::Remote {
id,
disk_consistent_lsn,
} => RepositoryTimeline::Remote {
id,
disk_consistent_lsn,
},
},
)
Ok(RepositoryTimeline::from(self.get_or_init_timeline(
timelineid,
&mut self.timelines.lock().unwrap(),
)?))
}
/// Returns a [`RepositoryTimeline`] for every entry currently held in the
/// repository's timelines map, converting a clone of each entry.
fn list_timelines(&self) -> Result<Vec<RepositoryTimeline>> {
    let timelines = self.timelines.lock().unwrap();
    let mut listed = Vec::new();
    for timeline_entry in timelines.values() {
        listed.push(RepositoryTimeline::from(timeline_entry.clone()));
    }
    Ok(listed)
}
fn create_empty_timeline(
@@ -428,6 +429,24 @@ impl LayeredTimelineEntry {
}
}
/// Converts the layered repository's timeline entry into the repository-agnostic
/// [`RepositoryTimeline`], keeping the local/remote distinction.
impl From<LayeredTimelineEntry> for RepositoryTimeline {
    fn from(layered_timeline: LayeredTimelineEntry) -> Self {
        match layered_timeline {
            // A local timeline carries its own id; copy it out before moving the timeline.
            LayeredTimelineEntry::Local(local) => {
                let id = local.timelineid;
                RepositoryTimeline::Local {
                    id,
                    timeline: local,
                }
            }
            // Remote entries are passed through field by field.
            LayeredTimelineEntry::Remote {
                id: remote_id,
                disk_consistent_lsn: remote_lsn,
            } => RepositoryTimeline::Remote {
                id: remote_id,
                disk_consistent_lsn: remote_lsn,
            },
        }
    }
}
/// Private functions
impl LayeredRepository {
// Implementation of the public `get_timeline` function. This differs from the public

View File

@@ -1,5 +1,4 @@
pub mod basebackup;
pub mod branches;
pub mod config;
pub mod http;
pub mod import_datadir;
@@ -12,6 +11,7 @@ pub mod repository;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;

View File

@@ -62,11 +62,3 @@ Based on previous evaluation, even `rusoto-s3` could be a better choice over thi
So far, we don't adjust the remote storage based on GC thread loop results, only checkpointer loop affects the remote storage.
Index module could be used as a base to implement a deferred GC mechanism, a "defragmentation" that repacks archives into new ones after GC is done removing the files from the archives.
* branches implementation could be improved
Currently, there's code to sync the branches along with the timeline files: on upload, all local branch files that are missing remotely are uploaded,
on the timeline download, missing remote branch files are downloaded.
A branch is a per-tenant entity, yet the current implementation requires synchronizing a timeline first to get the branch files locally.
Currently, there's no other way to know about the remote branch files, nor are the file contents verified or updated.

View File

@@ -14,13 +14,6 @@
//! Only GC removes local timeline files, the GC support is not added to sync currently,
//! yet downloading extra files is not critically bad at this stage, GC can remove those again.
//!
//! Along with the timeline files, branch files are uploaded and downloaded every time a corresponding sync task is processed.
//! For simplicity, branch files are also treated as immutable: only missing files are uploaded or downloaded, no removals, amendments or file contents checks are done.
//! Also, the branches are copied as separate files, with no extra compression done.
//! Despite branch information currently belonging to tenants, a tenant's timeline sync is required to upload or download the branch files; also, there's no way to know
//! the branch sync state outside of the sync loop.
//! This implementation is currently considered temporary and is subject to change later.
//!
//! During the loop startup, an initial [`RemoteTimelineIndex`] state is constructed via listing the remote storage contents.
//! It's enough to poll the remote state once on startup only, due to agreement that the pageserver has
//! an exclusive write access to the remote storage: new files appear in the storage only after the same
@@ -66,7 +59,6 @@
//! NOTE: No real contents or checksum check happens right now and is a subject to improve later.
//!
//! After the whole timeline is downloaded, [`crate::tenant_mgr::set_timeline_states`] function is used to update the pageserver's in-memory state for the processed timeline.
//! No extra branch registration is done.
//!
//! When pageserver signals shutdown, current sync task gets finished and the loop exits.
@@ -77,7 +69,7 @@ pub mod index;
mod upload;
use std::{
collections::{BTreeSet, HashMap, HashSet, VecDeque},
collections::{BTreeSet, HashMap, VecDeque},
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
sync::Arc,
@@ -87,7 +79,6 @@ use anyhow::{bail, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use tokio::{
fs,
runtime::Runtime,
sync::{
mpsc::{self, UnboundedReceiver},
@@ -101,8 +92,7 @@ use self::{
compression::ArchiveHeader,
download::{download_timeline, DownloadedTimeline},
index::{
ArchiveDescription, ArchiveId, RelativePath, RemoteTimeline, RemoteTimelineIndex,
TimelineIndexEntry,
ArchiveDescription, ArchiveId, RemoteTimeline, RemoteTimelineIndex, TimelineIndexEntry,
},
upload::upload_timeline_checkpoint,
};
@@ -843,28 +833,6 @@ async fn download_archive_header<
Ok(header)
}
/// Collects the regular files located directly in the tenant's local branches
/// directory, as paths relative to that directory.
///
/// Returns an empty set when the branches directory does not exist. Entries that
/// are not regular files are skipped.
async fn tenant_branch_files(
    conf: &'static PageServerConf,
    tenant_id: ZTenantId,
) -> anyhow::Result<HashSet<RelativePath>> {
    let mut found_files = HashSet::new();
    let dir = conf.branches_path(&tenant_id);

    if dir.exists() {
        let mut dir_entries = fs::read_dir(&dir)
            .await
            .context("Failed to list tenant branches dir contents")?;

        loop {
            let entry = match dir_entries.next_entry().await? {
                Some(entry) => entry,
                None => break,
            };
            let is_regular_file = entry.file_type().await?.is_file();
            if is_regular_file {
                found_files.insert(RelativePath::new(&dir, entry.path())?);
            }
        }
    }

    Ok(found_files)
}
#[cfg(test)]
mod test_utils {
use std::{
@@ -971,30 +939,9 @@ mod test_utils {
"Index contains unexpected sync ids"
);
let mut actual_branches = BTreeMap::new();
let mut expected_branches = BTreeMap::new();
let mut actual_timeline_entries = BTreeMap::new();
let mut expected_timeline_entries = BTreeMap::new();
for sync_id in actual_sync_ids {
actual_branches.insert(
sync_id.tenant_id,
index_read
.branch_files(sync_id.tenant_id)
.into_iter()
.flat_map(|branch_paths| branch_paths.iter())
.cloned()
.collect::<BTreeSet<_>>(),
);
expected_branches.insert(
sync_id.tenant_id,
expected_index_with_descriptions
.branch_files(sync_id.tenant_id)
.into_iter()
.flat_map(|branch_paths| branch_paths.iter())
.cloned()
.collect::<BTreeSet<_>>(),
);
actual_timeline_entries.insert(
sync_id,
index_read.timeline_entry(&sync_id).unwrap().clone(),
@@ -1009,11 +956,6 @@ mod test_utils {
}
drop(index_read);
assert_eq!(
actual_branches, expected_branches,
"Index contains unexpected branches"
);
for (sync_id, actual_timeline_entry) in actual_timeline_entries {
let expected_timeline_description = expected_timeline_entries
.remove(&sync_id)

View File

@@ -1,10 +1,8 @@
//! Timeline synchronization logic to put files from archives on remote storage into pageserver's local directory.
//! Currently, tenant branch files are also downloaded, but this does not appear final.
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
use anyhow::{ensure, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{fs, sync::RwLock};
use tracing::{debug, error, trace, warn};
use zenith_utils::{lsn::Lsn, zid::ZTenantId};
@@ -14,8 +12,8 @@ use crate::{
layered_repository::metadata::{metadata_path, TimelineMetadata},
remote_storage::{
storage_sync::{
compression, index::TimelineIndexEntry, sync_queue, tenant_branch_files,
update_index_description, SyncKind, SyncTask,
compression, index::TimelineIndexEntry, sync_queue, update_index_description, SyncKind,
SyncTask,
},
RemoteStorage, ZTenantTimelineId,
},
@@ -42,8 +40,6 @@ pub(super) enum DownloadedTimeline {
/// Timeline files that already exist locally are skipped during the download, but the local metadata file is
/// updated in the end of every checkpoint archive extraction.
///
/// Before any archives are considered, the branch files are checked locally and remotely, all remote-only files are downloaded.
///
/// On an error, bumps the retries count and reschedules the download, with updated archive skip list
/// (for any new successful archive downloads and extractions).
pub(super) async fn download_timeline<
@@ -113,22 +109,6 @@ pub(super) async fn download_timeline<
}
};
if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.tenant_id).await
{
error!(
"Failed to download missing branches for sync id {}: {:?}",
sync_id, e
);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Download(download),
));
return DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
};
}
debug!("Downloading timeline archives");
let archives_to_download = remote_timeline
.checkpoints()
@@ -250,82 +230,6 @@ async fn read_local_metadata(
.context("Failed to read local metadata files bytes")?)
}
/// Downloads every branch file of the tenant that the remote index lists but that is absent locally.
///
/// Steps:
/// 1. list the local branch files via `tenant_branch_files`;
/// 2. create the local branches directory if it does not exist yet;
/// 3. if the index has branch files recorded for the tenant, download each file from the
///    set difference "remote minus local" into its local path.
///
/// Every download is awaited, even after one has failed; each failure is logged.
///
/// # Errors
///
/// Fails if the local branch files cannot be listed, if the local branches directory cannot
/// be created, or with "Failed to download all branch files" if at least one download failed.
async fn download_missing_branches<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
conf: &'static PageServerConf,
(storage, index): &(S, RwLock<RemoteTimelineIndex>),
tenant_id: ZTenantId,
) -> anyhow::Result<()> {
let local_branches = tenant_branch_files(conf, tenant_id)
.await
.context("Failed to list local branch files for the tenant")?;
let local_branches_dir = conf.branches_path(&tenant_id);
// Make sure the target directory exists before any file is created in it.
if !local_branches_dir.exists() {
fs::create_dir_all(&local_branches_dir)
.await
.with_context(|| {
format!(
"Failed to create local branches directory at path '{}'",
local_branches_dir.display()
)
})?;
}
// NOTE(review): the read guard returned by `index.read().await` appears to stay alive for
// the whole `if let` body, i.e. for the duration of the downloads; confirm this is intended.
if let Some(remote_branches) = index.read().await.branch_files(tenant_id) {
// One future per remote-only branch file, all driven through a `FuturesUnordered`.
let mut remote_only_branches_downloads = remote_branches
.difference(&local_branches)
.map(|remote_only_branch| async move {
let branches_dir = conf.branches_path(&tenant_id);
let remote_branch_path = remote_only_branch.as_path(&branches_dir);
let storage_path =
storage.storage_path(&remote_branch_path).with_context(|| {
format!(
"Failed to derive a storage path for branch with local path '{}'",
remote_branch_path.display()
)
})?;
// `create_new` makes the open fail if the file already exists, so an existing
// local file is never overwritten.
let mut target_file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&remote_branch_path)
.await
.with_context(|| {
format!(
"Failed to create local branch file at '{}'",
remote_branch_path.display()
)
})?;
storage
.download(&storage_path, &mut target_file)
.await
.with_context(|| {
format!(
"Failed to download branch file from the remote path {:?}",
storage_path
)
})?;
Ok::<_, anyhow::Error>(())
})
.collect::<FuturesUnordered<_>>();
// Drain all downloads, remembering whether any of them failed.
let mut branch_downloads_failed = false;
while let Some(download_result) = remote_only_branches_downloads.next().await {
if let Err(e) = download_result {
branch_downloads_failed = true;
error!("Failed to download a branch file: {:?}", e);
}
}
ensure!(
!branch_downloads_failed,
"Failed to download all branch files"
);
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

View File

@@ -5,7 +5,7 @@
//! This way in the future, the index could be restored fast from its serialized stored form.
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap},
path::{Path, PathBuf},
};
@@ -49,10 +49,9 @@ impl RelativePath {
}
/// An index to track tenant files that exist on the remote storage.
/// Currently, timeline archives and branch files are tracked.
/// Currently, only timeline archive files are tracked.
#[derive(Debug, Clone)]
pub struct RemoteTimelineIndex {
branch_files: HashMap<ZTenantId, HashSet<RelativePath>>,
timeline_files: HashMap<ZTenantTimelineId, TimelineIndexEntry>,
}
@@ -65,7 +64,6 @@ impl RemoteTimelineIndex {
paths: impl Iterator<Item = P>,
) -> Self {
let mut index = Self {
branch_files: HashMap::new(),
timeline_files: HashMap::new(),
};
for path in paths {
@@ -98,17 +96,6 @@ impl RemoteTimelineIndex {
/// Returns an iterator over the ids of all timelines that have an entry in this index
/// (the keys of `timeline_files`), copied out of the map.
pub fn all_sync_ids(&self) -> impl Iterator<Item = ZTenantTimelineId> + '_ {
self.timeline_files.keys().copied()
}
/// Records `path` as a branch file of the given tenant, creating the tenant's
/// file set on first use. Adding a path that is already recorded has no effect.
pub fn add_branch_file(&mut self, tenant_id: ZTenantId, path: RelativePath) {
    let tenant_files = self.branch_files.entry(tenant_id).or_default();
    tenant_files.insert(path);
}
/// Returns the branch files recorded in the index for the given tenant,
/// or `None` if the index has no entry for that tenant.
pub fn branch_files(&self, tenant_id: ZTenantId) -> Option<&HashSet<RelativePath>> {
self.branch_files.get(&tenant_id)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -306,20 +293,9 @@ fn try_parse_index_entry(
.parse::<ZTenantId>()
.with_context(|| format!("Failed to parse tenant id from path '{}'", path.display()))?;
let branches_path = conf.branches_path(&tenant_id);
let timelines_path = conf.timelines_path(&tenant_id);
match (
RelativePath::new(&branches_path, &path),
path.strip_prefix(&timelines_path),
) {
(Ok(_), Ok(_)) => bail!(
"Path '{}' cannot start with both branches '{}' and the timelines '{}' prefixes",
path.display(),
branches_path.display(),
timelines_path.display()
),
(Ok(branches_entry), Err(_)) => index.add_branch_file(tenant_id, branches_entry),
(Err(_), Ok(timelines_subpath)) => {
match path.strip_prefix(&timelines_path) {
Ok(timelines_subpath) => {
let mut segments = timelines_subpath.iter();
let timeline_id = segments
.next()
@@ -375,11 +351,10 @@ fn try_parse_index_entry(
}
}
}
(Err(branches_error), Err(timelines_strip_error)) => {
Err(timelines_strip_error) => {
bail!(
"Path '{}' is not an index entry: it's neither parsable as a branch entry '{:#}' nor as an archive entry '{}'",
"Path '{}' is not an archive entry '{}'",
path.display(),
branches_error,
timelines_strip_error,
)
}

View File

@@ -1,13 +1,10 @@
//! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints.
//! Currently, tenant branch files are also uploaded, but this does not appear final.
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
use anyhow::{ensure, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{fs, sync::RwLock};
use anyhow::ensure;
use tokio::sync::RwLock;
use tracing::{debug, error, warn};
use zenith_utils::zid::ZTenantId;
use crate::{
config::PageServerConf,
@@ -15,7 +12,7 @@ use crate::{
storage_sync::{
compression,
index::{RemoteTimeline, TimelineIndexEntry},
sync_queue, tenant_branch_files, update_index_description, SyncKind, SyncTask,
sync_queue, update_index_description, SyncKind, SyncTask,
},
RemoteStorage, ZTenantTimelineId,
},
@@ -26,8 +23,6 @@ use super::{compression::ArchiveHeader, index::RemoteTimelineIndex, NewCheckpoin
/// Attempts to compress and upload given checkpoint files.
/// No extra checks for overlapping files are made: download takes care of that, ensuring no non-metadata local timeline files are overwritten.
///
/// Before the checkpoint files are uploaded, branch files are uploaded, if any local ones are missing remotely.
///
/// On an error, bumps the retries count and reschedules the entire task.
/// On success, populates index data with new downloads.
pub(super) async fn upload_timeline_checkpoint<
@@ -41,19 +36,6 @@ pub(super) async fn upload_timeline_checkpoint<
retries: u32,
) -> Option<bool> {
debug!("Uploading checkpoint for sync id {}", sync_id);
if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.tenant_id).await
{
error!(
"Failed to upload missing branches for sync id {}: {:?}",
sync_id, e
);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Upload(new_checkpoint),
));
return Some(false);
}
let new_upload_lsn = new_checkpoint.metadata.disk_consistent_lsn();
let index = &remote_assets.1;
@@ -201,76 +183,6 @@ async fn try_upload_checkpoint<
.map(|(header, header_size, _)| (header, header_size))
}
/// Uploads to the remote storage every branch file of the tenant that exists locally
/// but is not yet recorded in the remote timeline index.
///
/// Local branch files are listed with `tenant_branch_files` and compared against the set the
/// index holds for `tenant_id` (an empty set when the index has none); only the difference
/// is uploaded. Every successfully uploaded file is then registered in the index.
///
/// # Errors
///
/// Fails when the local branch files cannot be listed, or when at least one upload failed.
/// Individual upload failures are logged and do not stop the remaining uploads.
async fn upload_missing_branches<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
config: &'static PageServerConf,
(storage, index): &(S, RwLock<RemoteTimelineIndex>),
tenant_id: ZTenantId,
) -> anyhow::Result<()> {
let local_branches = tenant_branch_files(config, tenant_id)
.await
.context("Failed to list local branch files for the tenant")?;
// Take a cloned snapshot of the remote branch set while holding the read lock.
let index_read = index.read().await;
let remote_branches = index_read
.branch_files(tenant_id)
.cloned()
.unwrap_or_default();
// Release the read guard explicitly: the index is write-locked further down.
drop(index_read);
// One upload future per branch file that is present locally but absent from the snapshot.
let mut branch_uploads = local_branches
.difference(&remote_branches)
.map(|local_only_branch| async move {
let local_branch_path = local_only_branch.as_path(&config.branches_path(&tenant_id));
let storage_path = storage.storage_path(&local_branch_path).with_context(|| {
format!(
"Failed to derive a storage path for branch with local path '{}'",
local_branch_path.display()
)
})?;
let local_branch_file = fs::OpenOptions::new()
.read(true)
.open(&local_branch_path)
.await
.with_context(|| {
format!(
"Failed to open local branch file {} for reading",
local_branch_path.display()
)
})?;
storage
.upload(local_branch_file, &storage_path)
.await
.with_context(|| {
format!(
"Failed to upload branch file to the remote path {:?}",
storage_path
)
})?;
Ok::<_, anyhow::Error>(local_only_branch)
})
.collect::<FuturesUnordered<_>>();
// Handle each upload result as the stream yields it; a failure is remembered
// rather than returned immediately, so the other uploads still get processed.
let mut branch_uploads_failed = false;
while let Some(upload_result) = branch_uploads.next().await {
match upload_result {
// Record the uploaded branch file in the index under the write lock.
Ok(local_only_branch) => index
.write()
.await
.add_branch_file(tenant_id, local_only_branch.clone()),
Err(e) => {
error!("Failed to upload branch file: {:?}", e);
branch_uploads_failed = true;
}
}
}
ensure!(!branch_uploads_failed, "Failed to upload all branch files");
Ok(())
}
#[cfg(test)]
mod tests {
use tempfile::tempdir;

View File

@@ -36,6 +36,10 @@ pub trait Repository: Send + Sync {
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline>;
/// Lists timelines the repository contains.
/// Up to repository's implementation to omit certain timelines that are not considered ready for use.
fn list_timelines(&self) -> Result<Vec<RepositoryTimeline>>;
/// Create a new, empty timeline. The caller is responsible for loading data into it
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
fn create_empty_timeline(
@@ -72,7 +76,10 @@ pub trait Repository: Send + Sync {
pub enum RepositoryTimeline {
/// Timeline, with its files present locally in pageserver's working directory.
/// Loaded into pageserver's memory and ready to be used.
Local(Arc<dyn Timeline>),
Local {
id: ZTimelineId,
timeline: Arc<dyn Timeline>,
},
/// Timeline, found on the pageserver's remote storage, but not yet downloaded locally.
Remote {
id: ZTimelineId,
@@ -83,12 +90,19 @@ pub enum RepositoryTimeline {
impl RepositoryTimeline {
/// Returns a new `Arc` handle to the timeline when this entry is the `Local` variant,
/// and `None` otherwise.
pub fn local_timeline(&self) -> Option<Arc<dyn Timeline>> {
if let Self::Local(local_timeline) = self {
Some(Arc::clone(local_timeline))
if let Self::Local { timeline, .. } = self {
Some(Arc::clone(timeline))
} else {
None
}
}
/// Returns the timeline id stored in the entry, for both the `Local` and `Remote` variants.
pub fn id(&self) -> ZTimelineId {
match self {
Self::Local { id, .. } => *id,
Self::Remote { id, .. } => *id,
}
}
}
/// A state of the timeline synchronization with the remote storage.
@@ -390,7 +404,6 @@ pub mod repo_harness {
let tenant_id = ZTenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenant_id))?;
fs::create_dir_all(conf.branches_path(&tenant_id))?;
Ok(Self { conf, tenant_id })
}

View File

@@ -1,12 +1,12 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use crate::branches;
use crate::config::PageServerConf;
use crate::layered_repository::LayeredRepository;
use crate::repository::{Repository, Timeline, TimelineSyncState};
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
use crate::walredo::PostgresRedoManager;
use crate::CheckpointConfig;
use anyhow::{bail, Context, Result};
@@ -182,7 +182,7 @@ pub fn create_repository_for_tenant(
tenantid: ZTenantId,
) -> Result<()> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?;
let repo = timelines::create_repo(conf, tenantid, wal_redo_manager)?;
match access_tenants().entry(tenantid) {
hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid),

View File

@@ -1,5 +1,5 @@
//!
//! Branch management code
//! Timeline management code
//!
// TODO: move all paths construction to conf impl
//
@@ -27,8 +27,7 @@ use crate::{import_datadir, LOG_FILE_NAME};
use crate::{repository::RepositoryTimeline, tenant_mgr};
#[derive(Serialize, Deserialize, Clone)]
pub struct BranchInfo {
pub name: String,
pub struct TimelineInfo {
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub latest_valid_lsn: Lsn,
@@ -38,59 +37,6 @@ pub struct BranchInfo {
pub current_logical_size_non_incremental: Option<usize>,
}
impl BranchInfo {
pub fn from_path<T: AsRef<Path>>(
path: T,
repo: &Arc<dyn Repository>,
include_non_incremental_logical_size: bool,
) -> Result<Self> {
let path = path.as_ref();
let name = path.file_name().unwrap().to_string_lossy().to_string();
let timeline_id = std::fs::read_to_string(path)
.with_context(|| {
format!(
"Failed to read branch file contents at path '{}'",
path.display()
)
})?
.parse::<ZTimelineId>()?;
let timeline = match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(local_entry) => local_entry,
RepositoryTimeline::Remote { .. } => {
bail!("Timeline {} is remote, no branches to display", timeline_id)
}
};
// we use ancestor lsn zero if we don't have an ancestor, so turn this into an option based on timeline id
let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() {
Some(ancestor_id) => (
Some(ancestor_id.to_string()),
Some(timeline.get_ancestor_lsn().to_string()),
),
None => (None, None),
};
// non incremental size calculation can be heavy, so let it be optional
// needed for tests to check size calculation
let current_logical_size_non_incremental = include_non_incremental_logical_size
.then(|| {
timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn())
})
.transpose()?;
Ok(BranchInfo {
name,
timeline_id,
latest_valid_lsn: timeline.get_last_record_lsn(),
ancestor_id,
ancestor_lsn,
current_logical_size: timeline.get_current_logical_size(),
current_logical_size_non_incremental,
})
}
}
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
pub timelineid: ZTimelineId,
@@ -140,7 +86,6 @@ pub fn create_repo(
.with_context(|| format!("could not create directory {}", repo_dir.display()))?;
crashsafe_dir::create_dir(conf.timelines_path(&tenantid))?;
crashsafe_dir::create_dir_all(conf.branches_path(&tenantid))?;
crashsafe_dir::create_dir_all(conf.tags_path(&tenantid))?;
info!("created directory structure in {}", repo_dir.display());
@@ -198,7 +143,7 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
.output()
.context("failed to execute initdb")?;
if !initdb_output.status.success() {
anyhow::bail!(
bail!(
"initdb failed: '{}'",
String::from_utf8_lossy(&initdb_output.stderr)
);
@@ -245,65 +190,80 @@ fn bootstrap_timeline(
timeline.get_last_record_lsn()
);
let data = tli.to_string();
fs::write(conf.branch_path("main", &tenantid), data)?;
println!("created main branch");
// Remove temp dir. We don't need it anymore
fs::remove_dir_all(pgdata_path)?;
Ok(())
}
pub(crate) fn get_branches(
conf: &PageServerConf,
tenantid: &ZTenantId,
pub(crate) fn get_timelines(
tenant_id: ZTenantId,
include_non_incremental_logical_size: bool,
) -> Result<Vec<BranchInfo>> {
let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?;
) -> Result<Vec<TimelineInfo>> {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
// Each branch has a corresponding record (text file) in the refs/branches
// with timeline_id.
let branches_dir = conf.branches_path(tenantid);
std::fs::read_dir(&branches_dir)
.with_context(|| {
format!(
"Found no branches directory '{}' for tenant {}",
branches_dir.display(),
tenantid
)
})?
.map(|dir_entry_res| {
let dir_entry = dir_entry_res.with_context(|| {
format!(
"Failed to list branches directory '{}' content for tenant {}",
branches_dir.display(),
tenantid
)
})?;
BranchInfo::from_path(
dir_entry.path(),
&repo,
include_non_incremental_logical_size,
)
Ok(repo
.list_timelines()
.with_context(|| format!("Failed to list timelines for tenant {}", tenant_id))?
.into_iter()
.filter_map(|timeline| match timeline {
RepositoryTimeline::Local { timeline, id } => Some((id, timeline)),
RepositoryTimeline::Remote { .. } => None,
})
.collect()
.map(|(timeline_id, timeline)| {
let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() {
Some(ancestor_id) => (
Some(ancestor_id.to_string()),
Some(timeline.get_ancestor_lsn().to_string()),
),
None => (None, None),
};
let current_logical_size_non_incremental = if include_non_incremental_logical_size {
match timeline
.get_current_logical_size_non_incremental(timeline.get_last_record_lsn())
{
Ok(size) => Some(size),
Err(e) => {
error!(
"Failed to get current logical size for timeline {}: {:?}",
timeline_id, e
);
None
}
}
} else {
None
};
TimelineInfo {
timeline_id,
latest_valid_lsn: timeline.get_last_record_lsn(),
ancestor_id,
ancestor_lsn,
current_logical_size: timeline.get_current_logical_size(),
// non incremental size calculation can be heavy, so let it be optional
// needed for tests to check size calculation
current_logical_size_non_incremental,
}
})
.collect())
}
pub(crate) fn create_branch(
pub(crate) fn create_timeline(
conf: &PageServerConf,
branchname: &str,
startpoint_str: &str,
tenantid: &ZTenantId,
) -> Result<BranchInfo> {
let repo = tenant_mgr::get_repository_for_tenant(*tenantid)?;
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> Result<TimelineInfo> {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
if conf.branch_path(branchname, tenantid).exists() {
anyhow::bail!("branch {} already exists", branchname);
if conf.timeline_path(&timeline_id, &tenant_id).exists() {
bail!("timeline {} already exists", timeline_id);
}
let mut startpoint = parse_point_in_time(conf, startpoint_str, tenantid)?;
let mut startpoint = parse_point_in_time(conf, startpoint_str, &tenant_id)?;
let timeline = repo
.get_timeline(startpoint.timelineid)?
.local_timeline()
@@ -325,10 +285,10 @@ pub(crate) fn create_branch(
startpoint.lsn = startpoint.lsn.align();
if timeline.get_ancestor_lsn() > startpoint.lsn {
// can we safely just branch from the ancestor instead?
anyhow::bail!(
"invalid startpoint {} for the branch {}: less than timeline ancestor lsn {:?}",
bail!(
"invalid startpoint {} for the timeline {}: less than timeline ancestor lsn {:?}",
startpoint.lsn,
branchname,
timeline_id,
timeline.get_ancestor_lsn()
);
}
@@ -342,11 +302,11 @@ pub(crate) fn create_branch(
// Remember the human-readable branch name for the new timeline.
// FIXME: there's a race condition, if you create a branch with the same
// name concurrently.
// TODO kb timeline creation needs more
let data = new_timeline_id.to_string();
fs::write(conf.branch_path(branchname, tenantid), data)?;
fs::write(conf.timeline_path(&timeline_id, &tenant_id), data)?;
Ok(BranchInfo {
name: branchname.to_string(),
Ok(TimelineInfo {
timeline_id: new_timeline_id,
latest_valid_lsn: startpoint.lsn,
ancestor_id: Some(startpoint.timelineid.to_string()),
@@ -367,14 +327,6 @@ pub(crate) fn create_branch(
// A specific LSN on a timeline:
// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
//
// Same, with a human-friendly branch name:
// main
// main@2/15D3DD8
//
// Human-friendly tag name:
// mytag
//
//
fn parse_point_in_time(
conf: &PageServerConf,
s: &str,
@@ -399,18 +351,6 @@ fn parse_point_in_time(
}
}
// Check if it's a branch
// Check if it's branch @ LSN
let branchpath = conf.branch_path(name, tenantid);
if branchpath.exists() {
let pointstr = fs::read_to_string(branchpath)?;
let mut result = parse_point_in_time(conf, &pointstr, tenantid)?;
result.lsn = lsn.unwrap_or(Lsn(0));
return Ok(result);
}
// Check if it's a timelineid
// Check if it's timelineid @ LSN
if let Ok(timelineid) = ZTimelineId::from_str(name) {

View File

@@ -21,7 +21,7 @@ use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
use zenith_utils::GIT_VERSION;
use pageserver::branches::BranchInfo;
use pageserver::timelines::TimelineInfo;
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
@@ -53,12 +53,12 @@ http_port = {safekeeper_http_port}
}
///
/// Branches tree element used as a value in the HashMap.
/// Timelines tree element used as a value in the HashMap.
///
struct BranchTreeEl {
/// `BranchInfo` received from the `pageserver` via the `branch_list` libpq API call.
pub info: BranchInfo,
/// Holds all direct children of this branch referenced using `timeline_id`.
struct TimelineTreeEl {
/// `TimelineInfo` received from the `pageserver` via the `timeline_list` libpq API call.
pub info: TimelineInfo,
/// Holds all direct children of this timeline referenced using `timeline_id`.
pub children: Vec<String>,
}
@@ -84,7 +84,7 @@ fn main() -> Result<()> {
let timeline_arg = Arg::new("timeline")
.index(2)
.help("Branch name or a point-in time specification")
.help("Timeline id or a point-in time specification")
.required(false);
let tenantid_arg = Arg::new("tenantid")
@@ -129,9 +129,9 @@ fn main() -> Result<()> {
)
)
.subcommand(
App::new("branch")
.about("Create a new branch")
.arg(Arg::new("branchname").required(false).index(1))
App::new("timeline")
.about("Create a new timeline")
.arg(Arg::new("timeline-name").required(false).index(1))
.arg(Arg::new("start-point").required(false).index(2))
.arg(tenantid_arg.clone()),
).subcommand(
@@ -239,7 +239,7 @@ fn main() -> Result<()> {
match sub_name {
"tenant" => handle_tenant(sub_args, &env),
"branch" => handle_branch(sub_args, &env),
"timeline" => handle_timeline(sub_args, &env),
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
@@ -257,43 +257,42 @@ fn main() -> Result<()> {
}
///
/// Prints branches list as a tree-like structure.
/// Prints timelines list as a tree-like structure.
///
fn print_branches_tree(branches: Vec<BranchInfo>) -> Result<()> {
let mut branches_hash: HashMap<String, BranchTreeEl> = HashMap::new();
fn print_timelines_tree(timelines: Vec<TimelineInfo>) -> Result<()> {
let mut timelines_hash: HashMap<String, TimelineTreeEl> = timelines
.iter()
.map(|t| {
(
t.timeline_id.to_string(),
TimelineTreeEl {
info: t.clone(),
children: Vec::new(),
},
)
})
.collect();
// Form a hash table of branch timeline_id -> BranchTreeEl.
for branch in &branches {
branches_hash.insert(
branch.timeline_id.to_string(),
BranchTreeEl {
info: branch.clone(),
children: Vec::new(),
},
);
}
// Memorize all direct children of each branch.
for branch in &branches {
if let Some(tid) = &branch.ancestor_id {
branches_hash
// Memorize all direct children of each timeline.
for timeline in &timelines {
if let Some(tid) = &timeline.ancestor_id {
timelines_hash
.get_mut(tid)
.context("missing branch info in the HashMap")?
.context("missing timeline info in the HashMap")?
.children
.push(branch.timeline_id.to_string());
.push(timeline.timeline_id.to_string());
}
}
// Sort children by tid to bring some minimal order.
for branch in &mut branches_hash.values_mut() {
branch.children.sort();
for timeline in &mut timelines_hash.values_mut() {
timeline.children.sort();
}
for branch in branches_hash.values() {
// Start with root branches (no ancestors) first.
// Now there is 'main' branch only, but things may change.
if branch.info.ancestor_id.is_none() {
print_branch(0, &Vec::from([true]), branch, &branches_hash)?;
for timeline in timelines_hash.values() {
// Start with root timelines (no ancestors) first.
if timeline.info.ancestor_id.is_none() {
print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
}
}
@@ -301,27 +300,27 @@ fn print_branches_tree(branches: Vec<BranchInfo>) -> Result<()> {
}
///
/// Recursively prints branch info with all its children.
/// Recursively prints timeline info with all its children.
///
fn print_branch(
fn print_timeline(
nesting_level: usize,
is_last: &[bool],
branch: &BranchTreeEl,
branches: &HashMap<String, BranchTreeEl>,
timeline: &TimelineTreeEl,
timelines: &HashMap<String, TimelineTreeEl>,
) -> Result<()> {
// Draw main padding
print!(" ");
if nesting_level > 0 {
let lsn = branch
let lsn = timeline
.info
.ancestor_lsn
.as_ref()
.context("missing branch info in the HashMap")?;
.context("missing timeline info in the HashMap")?;
let mut br_sym = "┣━";
// Draw each nesting padding with proper style
// depending on whether its branch ended or not.
// depending on whether its timeline ended or not.
if nesting_level > 1 {
for l in &is_last[1..is_last.len() - 1] {
if *l {
@@ -332,7 +331,7 @@ fn print_branch(
}
}
// We are the last in this sub-branch
// We are the last in this sub-timeline
if *is_last.last().unwrap() {
br_sym = "┗━";
}
@@ -340,51 +339,51 @@ fn print_branch(
print!("{} @{}: ", br_sym, lsn);
}
// Finally print a branch name with new line
println!("{}", branch.info.name);
// Finally print a timeline name with new line
println!("{}", timeline.info.timeline_id);
let len = branch.children.len();
let len = timeline.children.len();
let mut i: usize = 0;
let mut is_last_new = Vec::from(is_last);
is_last_new.push(false);
for child in &branch.children {
for child in &timeline.children {
i += 1;
// Mark that the last padding is the end of the branch
// Mark that the last padding is the end of the timeline
if i == len {
if let Some(last) = is_last_new.last_mut() {
*last = true;
}
}
print_branch(
print_timeline(
nesting_level + 1,
&is_last_new,
branches
timelines
.get(child)
.context("missing branch info in the HashMap")?,
branches,
.context("missing timeline info in the HashMap")?,
timelines,
)?;
}
Ok(())
}
/// Returns a map of timeline IDs to branch_name@lsn strings.
/// Returns a map of timeline IDs to the corresponding `TimelineInfo` entries.
/// Connects to the pageserver to query this information.
fn get_branch_infos(
fn get_timeline_infos(
env: &local_env::LocalEnv,
tenantid: &ZTenantId,
) -> Result<HashMap<ZTimelineId, BranchInfo>> {
) -> Result<HashMap<ZTimelineId, TimelineInfo>> {
let page_server = PageServerNode::from_env(env);
let branch_infos: Vec<BranchInfo> = page_server.branch_list(tenantid)?;
let branch_infos: HashMap<ZTimelineId, BranchInfo> = branch_infos
let timeline_infos: Vec<TimelineInfo> = page_server.timeline_list(tenantid)?;
let timeline_infos: HashMap<ZTimelineId, TimelineInfo> = timeline_infos
.into_iter()
.map(|branch_info| (branch_info.timeline_id, branch_info))
.map(|timeline_info| (timeline_info.timeline_id, timeline_info))
.collect();
Ok(branch_infos)
Ok(timeline_infos)
}
// Helper function to parse --tenantid option, or get the default from config file
@@ -459,24 +458,28 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
Ok(())
}
fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn handle_timeline(timeline_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
let tenantid = get_tenantid(branch_match, env)?;
let tenant_id = get_tenantid(timeline_match, env)?;
if let Some(branchname) = branch_match.value_of("branchname") {
let startpoint_str = branch_match
if let Some(timeline_id) = timeline_match.value_of("timeline-id") {
let startpoint_str = timeline_match
.value_of("start-point")
.context("Missing start-point")?;
let branch = pageserver.branch_create(branchname, startpoint_str, &tenantid)?;
let timeline_id = timeline_id
.parse::<ZTimelineId>()
.context("Failed to parse timeline id from the request")?;
let timeline =
pageserver.timeline_create(timeline_id, startpoint_str.to_owned(), tenant_id)?;
println!(
"Created branch '{}' at {:?} for tenant: {}",
branch.name, branch.latest_valid_lsn, tenantid,
"Created timeline '{}' at {:?} for tenant: {}",
timeline.timeline_id, timeline.latest_valid_lsn, tenant_id,
);
} else {
// No arguments, list branches for tenant
let branches = pageserver.branch_list(&tenantid)?;
print_branches_tree(branches)?;
// No arguments, list timelines for tenant
let timelines = pageserver.timeline_list(&tenant_id)?;
print_timelines_tree(timelines)?;
}
Ok(())
@@ -495,12 +498,12 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_name {
"list" => {
let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| {
eprintln!("Failed to load branch info: {}", e);
let timeline_infos = get_timeline_infos(env, &tenantid).unwrap_or_else(|e| {
eprintln!("Failed to load timeline info: {}", e);
HashMap::new()
});
println!("NODE\tADDRESS\t\tBRANCH\tLSN\t\tSTATUS");
println!("NODE\tADDRESS\t\tTIMELINE\tLSN\t\tSTATUS");
for ((_, node_name), node) in cplane
.nodes
.iter()
@@ -509,7 +512,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// FIXME: This shows the LSN at the end of the timeline. It's not the
// right thing to do for read-only nodes that might be anchored at an
// older point in time, or following but lagging behind the primary.
let lsn_str = branch_infos
let lsn_str = timeline_infos
.get(&node.timelineid)
.map(|bi| bi.latest_valid_lsn.to_string())
.unwrap_or_else(|| "?".to_string());
@@ -518,7 +521,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
"{}\t{}\t{}\t{}\t{}",
node_name,
node.address,
node.timelineid, // FIXME: resolve human-friendly branch name
node.timelineid,
lsn_str,
node.status(),
);
@@ -526,17 +529,17 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
}
"create" => {
let node_name = sub_args.value_of("node").unwrap_or("main");
let timeline_name = sub_args.value_of("timeline").unwrap_or(node_name);
let timeline_spec = sub_args.value_of("timeline");
let port: Option<u16> = match sub_args.value_of("port") {
Some(p) => Some(p.parse()?),
None => None,
};
cplane.new_node(tenantid, node_name, timeline_name, port)?;
cplane.new_node(tenantid, node_name, timeline_spec, port)?;
}
"start" => {
let node_name = sub_args.value_of("node").unwrap_or("main");
let timeline_name = sub_args.value_of("timeline");
let timeline_spec = sub_args.value_of("timeline");
let port: Option<u16> = match sub_args.value_of("port") {
Some(p) => Some(p.parse()?),
@@ -554,8 +557,8 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
};
if let Some(node) = node {
if timeline_name.is_some() {
println!("timeline name ignored because node exists already");
if timeline_spec.is_some() {
println!("timeline spec ignored because its node exists already");
}
println!("Starting existing postgres {}...", node_name);
node.start(&auth_token)?;
@@ -565,12 +568,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// start --port X
// stop
// start <-- will also use port X even without explicit port argument
let timeline_name = timeline_name.unwrap_or(node_name);
println!(
"Starting new postgres {} on {}...",
node_name, timeline_name
"Starting new postgres {} on timeline {:?} ...",
node_name, timeline_spec
);
let node = cplane.new_node(tenantid, node_name, timeline_name, port)?;
let node = cplane.new_node(tenantid, node_name, timeline_spec, port)?;
node.start(&auth_token)?;
}
}
@@ -585,9 +587,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
node.stop(destroy)?;
}
_ => {
bail!("Unexpected pg subcommand '{}'", sub_name)
}
_ => bail!("Unexpected pg subcommand '{}'", sub_name),
}
Ok(())