Allow creating timelines by branching off ancestors

Kirill Bulatov
2022-02-14 00:53:00 +02:00
committed by Kirill Bulatov
parent 0c91091c63
commit f49990ed43
44 changed files with 855 additions and 653 deletions

View File

@@ -1,8 +1,9 @@
use serde::{Deserialize, Serialize};
use zenith_utils::{lsn::Lsn, zid::ZTimelineId};
use crate::ZTenantId;
use zenith_utils::zid::ZNodeId;
use zenith_utils::{
lsn::Lsn,
zid::{opt_display_serde, ZTenantId, ZTimelineId},
};
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
@@ -10,6 +11,8 @@ pub struct TimelineCreateRequest {
pub tenant_id: ZTenantId,
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
#[serde(with = "opt_display_serde")]
pub ancestor_timeline_id: Option<ZTimelineId>,
pub start_lsn: Option<Lsn>,
}
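
For orientation, a request that exercises the new field would presumably carry a JSON body along these lines. This is only a sketch: the ids are invented, start_lsn is left as null because the exact wire format of Lsn is not shown in this diff, and opt_display_serde is assumed to accept the ancestor id as the same hex string ZTimelineId displays as.

// Illustrative only: a body matching TimelineCreateRequest, built with serde_json.
fn main() {
    let body = serde_json::json!({
        "tenant_id":            "3d1f7595b468230304e0b73cecbcb081",
        "timeline_id":          "de200bd42b49cc1814412c7e592dd6e9",
        "ancestor_timeline_id": "6b576efcf7b5c48d9d98cbcb01a13cfe",
        "start_lsn":            null,
    });
    println!("{body}");
}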

View File

@@ -3,7 +3,6 @@ use std::sync::Arc;
use anyhow::Result;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use serde::Serialize;
use tracing::*;
use zenith_utils::auth::JwtAuth;
use zenith_utils::http::endpoint::attach_openapi_ui;
@@ -17,15 +16,13 @@ use zenith_utils::http::{
request::parse_request_param,
};
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::HexZTimelineId;
use zenith_utils::zid::ZTimelineId;
use zenith_utils::zid::{HexZTimelineId, ZTimelineId};
use super::models::StatusResponse;
use super::models::TenantCreateRequest;
use super::models::TimelineCreateRequest;
use crate::repository::RepositoryTimeline;
use crate::repository::TimelineSyncState;
use crate::timelines::TimelineInfo;
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
#[derive(Debug)]
@@ -82,6 +79,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
get_config(&request),
request_data.tenant_id,
request_data.timeline_id,
request_data.ancestor_timeline_id,
request_data.start_lsn,
)
})
@@ -118,28 +116,6 @@ fn get_include_non_incremental_logical_size(request: &Request<Body>) -> bool {
.unwrap_or(false)
}
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
enum TimelineInfo {
Local {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
ancestor_timeline_id: Option<HexZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
},
Remote {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
},
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -151,23 +127,13 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
Ok::<_, anyhow::Error>(match repo.get_timeline(timeline_id)?.local_timeline() {
None => TimelineInfo::Remote {
timeline_id,
tenant_id,
},
Some(timeline) => TimelineInfo::Local {
timeline_id,
tenant_id,
ancestor_timeline_id: timeline
.get_ancestor_timeline_id()
.map(HexZTimelineId::from),
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
last_record_lsn: timeline.get_last_record_lsn(),
prev_record_lsn: timeline.get_prev_record_lsn(),
timeline_state: repo.get_timeline_state(timeline_id),
},
})
let include_non_incremental_logical_size =
get_include_non_incremental_logical_size(&request);
Ok::<_, anyhow::Error>(TimelineInfo::from_repo_timeline(
tenant_id,
repo.get_timeline(timeline_id)?,
include_non_incremental_logical_size,
))
})
.await
.map_err(ApiError::from_err)??;
@@ -247,13 +213,13 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let request_data: TenantCreateRequest = json_request(&mut request).await?;
tokio::task::spawn_blocking(move || {
let initial_timeline_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = %request_data.tenant_id).entered();
tenant_mgr::create_repository_for_tenant(get_config(&request), request_data.tenant_id)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::CREATED, ())?)
Ok(json_response(StatusCode::CREATED, initial_timeline_id)?)
}
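
As an aside on the handler shape used throughout this file: the repository work is blocking, so it is pushed onto the blocking thread pool, and the double `??` unwraps first the JoinError and then the inner Result. A minimal standalone sketch of that pattern follows; the names and error mapping are illustrative, not the pageserver's actual ApiError plumbing.

use anyhow::Result;

async fn run_blocking_repo_work() -> Result<u64> {
    let value = tokio::task::spawn_blocking(move || {
        // ...blocking filesystem / repository access would go here...
        Ok::<_, anyhow::Error>(42u64)
    })
    .await // JoinError if the blocking task panicked or was cancelled
    .map_err(anyhow::Error::new)??; // then the inner Result
    Ok(value)
}

#[tokio::main]
async fn main() -> Result<()> {
    println!("{}", run_blocking_repo_work().await?);
    Ok(())
}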
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -49,7 +49,7 @@ impl RelativePath {
}
/// An index to track tenant files that exist on the remote storage.
/// Currently, timeline archives files are tracked only.
/// Currently, timeline archive files are tracked only.
#[derive(Debug, Clone)]
pub struct RemoteTimelineIndex {
timeline_files: HashMap<ZTenantTimelineId, TimelineIndexEntry>,

View File

@@ -107,7 +107,7 @@ impl RepositoryTimeline {
/// A state of the timeline synchronization with the remote storage.
/// Contains `disk_consistent_lsn` of the corresponding remote timeline (latest checkpoint's disk_consistent_lsn).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TimelineSyncState {
/// No further downloads from the remote storage are needed.
/// The timeline state is up-to-date or ahead of the remote storage one,

View File

@@ -180,9 +180,9 @@ pub fn shutdown_all_tenants() {
pub fn create_repository_for_tenant(
conf: &'static PageServerConf,
tenantid: ZTenantId,
) -> Result<()> {
) -> Result<ZTimelineId> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = timelines::create_repo(conf, tenantid, wal_redo_manager)?;
let (initial_timeline_id, repo) = timelines::create_repo(conf, tenantid, wal_redo_manager)?;
match access_tenants().entry(tenantid) {
hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid),
@@ -194,7 +194,7 @@ pub fn create_repository_for_tenant(
}
}
Ok(())
Ok(initial_timeline_id)
}
pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {

View File

@@ -17,24 +17,133 @@ use std::{
use tracing::*;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::{opt_display_serde, ZTenantId, ZTimelineId};
use zenith_utils::{crashsafe_dir, logging};
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::{config::PageServerConf, repository::Repository};
use crate::{import_datadir, LOG_FILE_NAME};
use crate::{repository::RepositoryTimeline, tenant_mgr};
use crate::{repository::Timeline, CheckpointConfig};
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub latest_valid_lsn: Lsn,
pub ancestor_id: Option<String>,
pub ancestor_lsn: Option<String>,
pub current_logical_size: usize,
pub current_logical_size_non_incremental: Option<usize>,
#[serde(tag = "type")]
pub enum TimelineInfo {
Local {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_lsn: Option<Lsn>,
disk_consistent_lsn: Lsn,
current_logical_size: usize,
current_logical_size_non_incremental: Option<usize>,
},
Remote {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
disk_consistent_lsn: Lsn,
},
}
impl TimelineInfo {
pub fn from_repo_timeline(
tenant_id: ZTenantId,
repo_timeline: RepositoryTimeline,
include_non_incremental_logical_size: bool,
) -> Self {
match repo_timeline {
RepositoryTimeline::Local { id, timeline } => {
let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
let ancestor_lsn = if ancestor_timeline_id.is_some() {
Some(timeline.get_ancestor_lsn())
} else {
None
};
Self::Local {
timeline_id: id,
tenant_id,
last_record_lsn: timeline.get_last_record_lsn(),
prev_record_lsn: timeline.get_prev_record_lsn(),
ancestor_timeline_id,
ancestor_lsn,
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
current_logical_size: timeline.get_current_logical_size(),
current_logical_size_non_incremental: get_current_logical_size_non_incremental(
include_non_incremental_logical_size,
timeline.as_ref(),
),
}
}
RepositoryTimeline::Remote {
id,
disk_consistent_lsn,
} => Self::Remote {
timeline_id: id,
tenant_id,
disk_consistent_lsn,
},
}
}
pub fn from_dyn_timeline(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
timeline: &dyn Timeline,
include_non_incremental_logical_size: bool,
) -> Self {
let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
let ancestor_lsn = if ancestor_timeline_id.is_some() {
Some(timeline.get_ancestor_lsn())
} else {
None
};
Self::Local {
timeline_id,
tenant_id,
last_record_lsn: timeline.get_last_record_lsn(),
prev_record_lsn: timeline.get_prev_record_lsn(),
ancestor_timeline_id,
ancestor_lsn,
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
current_logical_size: timeline.get_current_logical_size(),
current_logical_size_non_incremental: get_current_logical_size_non_incremental(
include_non_incremental_logical_size,
timeline,
),
}
}
pub fn timeline_id(&self) -> ZTimelineId {
match *self {
TimelineInfo::Local { timeline_id, .. } => timeline_id,
TimelineInfo::Remote { timeline_id, .. } => timeline_id,
}
}
}
fn get_current_logical_size_non_incremental(
include_non_incremental_logical_size: bool,
timeline: &dyn Timeline,
) -> Option<usize> {
if !include_non_incremental_logical_size {
return None;
}
match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) {
Ok(size) => Some(size),
Err(e) => {
error!("Failed to get non-incremental logical size: {:?}", e);
None
}
}
}
#[derive(Debug, Clone, Copy)]
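
Because TimelineInfo is now an internally tagged enum, API consumers see a "type" discriminator next to the per-variant fields. A self-contained sketch of that representation is below; it is not the real type, the id is a placeholder, and the Lsn fields are omitted.

use serde::Serialize;

#[derive(Serialize)]
#[serde(tag = "type")]
enum DemoTimelineInfo {
    Local { timeline_id: String },
    Remote { timeline_id: String },
}

fn main() {
    let remote = DemoTimelineInfo::Remote {
        timeline_id: "de200bd42b49cc1814412c7e592dd6e9".to_string(),
    };
    // Prints: {"type":"Remote","timeline_id":"de200bd42b49cc1814412c7e592dd6e9"}
    println!("{}", serde_json::to_string(&remote).unwrap());
}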
@@ -75,7 +184,7 @@ pub fn create_repo(
conf: &'static PageServerConf,
tenantid: ZTenantId,
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
) -> Result<Arc<dyn Repository>> {
) -> Result<(ZTimelineId, Arc<dyn Repository>)> {
let repo_dir = conf.tenant_path(&tenantid);
if repo_dir.exists() {
bail!("repo for {} already exists", tenantid)
@@ -107,7 +216,7 @@ pub fn create_repo(
// move data loading out of create_repo()
bootstrap_timeline(conf, tenantid, timeline_id, repo.as_ref())?;
Ok(repo)
Ok((timeline_id, repo))
}
// Returns checkpoint LSN from controlfile
@@ -160,7 +269,7 @@ fn bootstrap_timeline(
tenantid: ZTenantId,
tli: ZTimelineId,
repo: &dyn Repository,
) -> Result<()> {
) -> Result<Arc<dyn Timeline>> {
let _enter = info_span!("bootstrapping", timeline = %tli, tenant = %tenantid).entered();
let initdb_path = conf.tenant_path(&tenantid).join("tmp");
@@ -192,7 +301,7 @@ fn bootstrap_timeline(
// Remove temp dir. We don't need it anymore
fs::remove_dir_all(pgdata_path)?;
Ok(())
Ok(timeline)
}
pub(crate) fn get_timelines(
@@ -211,110 +320,86 @@ pub(crate) fn get_timelines(
RepositoryTimeline::Remote { .. } => None,
})
.map(|(timeline_id, timeline)| {
let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() {
Some(ancestor_id) => (
Some(ancestor_id.to_string()),
Some(timeline.get_ancestor_lsn().to_string()),
),
None => (None, None),
};
let current_logical_size_non_incremental = if include_non_incremental_logical_size {
match timeline
.get_current_logical_size_non_incremental(timeline.get_last_record_lsn())
{
Ok(size) => Some(size),
Err(e) => {
error!(
"Failed to get current logical size for timeline {}: {:?}",
timeline_id, e
);
None
}
}
} else {
None
};
TimelineInfo {
TimelineInfo::from_dyn_timeline(
tenant_id,
timeline_id,
latest_valid_lsn: timeline.get_last_record_lsn(),
ancestor_id,
ancestor_lsn,
current_logical_size: timeline.get_current_logical_size(),
// non incremental size calculation can be heavy, so let it be optional
// needed for tests to check size calculation
current_logical_size_non_incremental,
}
timeline.as_ref(),
include_non_incremental_logical_size,
)
})
.collect())
}
pub(crate) fn create_timeline(
conf: &PageServerConf,
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
start_lsn: Option<Lsn>,
new_timeline_id: ZTimelineId,
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_start_lsn: Option<Lsn>,
) -> Result<TimelineInfo> {
if conf.timeline_path(&timeline_id, &tenant_id).exists() {
bail!("timeline {} already exists", timeline_id);
if conf.timeline_path(&new_timeline_id, &tenant_id).exists() {
bail!("timeline {} already exists", new_timeline_id);
}
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let mut start_lsn = ancestor_start_lsn.unwrap_or(Lsn(0));
let mut startpoint = PointInTime {
timeline_id,
lsn: start_lsn.unwrap_or(Lsn(0)),
};
match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = repo
.get_timeline(ancestor_timeline_id)
.with_context(|| format!("Cannot get ancestor timeline {}", ancestor_timeline_id))?
.local_timeline()
.with_context(|| {
format!(
"Cannot branch off the timeline {} that's not present locally",
ancestor_timeline_id
)
})?;
let timeline = repo
.get_timeline(startpoint.timeline_id)?
.local_timeline()
.context("Cannot branch off the timeline that's not present locally")?;
if startpoint.lsn == Lsn(0) {
// Find end of WAL on the old timeline
let end_of_wal = timeline.get_last_record_lsn();
info!("branching at end of WAL: {}", end_of_wal);
startpoint.lsn = end_of_wal;
} else {
// Wait for the WAL to arrive and be processed on the parent branch up
// to the requested branch point. The repository code itself doesn't
// require it, but if we start to receive WAL on the new timeline,
// decoding the new WAL might need to look up previous pages, relation
// sizes etc. and that would get confused if the previous page versions
// are not in the repository yet.
timeline.wait_lsn(startpoint.lsn)?;
if start_lsn == Lsn(0) {
// Find end of WAL on the old timeline
let end_of_wal = ancestor_timeline.get_last_record_lsn();
info!("branching at end of WAL: {}", end_of_wal);
start_lsn = end_of_wal;
} else {
// Wait for the WAL to arrive and be processed on the parent branch up
// to the requested branch point. The repository code itself doesn't
// require it, but if we start to receive WAL on the new timeline,
// decoding the new WAL might need to look up previous pages, relation
// sizes etc. and that would get confused if the previous page versions
// are not in the repository yet.
ancestor_timeline.wait_lsn(start_lsn)?;
}
start_lsn = start_lsn.align();
let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
if ancestor_ancestor_lsn > start_lsn {
// can we safely just branch from the ancestor instead?
anyhow::bail!(
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
start_lsn,
ancestor_timeline_id,
ancestor_ancestor_lsn,
);
}
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, start_lsn)?;
// load the timeline into memory
let loaded_timeline = repo.get_timeline(new_timeline_id)?;
Ok(TimelineInfo::from_repo_timeline(
tenant_id,
loaded_timeline,
false,
))
}
None => {
let new_timeline = bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?;
Ok(TimelineInfo::from_dyn_timeline(
tenant_id,
new_timeline_id,
new_timeline.as_ref(),
false,
))
}
}
startpoint.lsn = startpoint.lsn.align();
if timeline.get_ancestor_lsn() > startpoint.lsn {
// can we safely just branch from the ancestor instead?
bail!(
"invalid startpoint {} for the timeline {}: less than timeline ancestor lsn {:?}",
startpoint.lsn,
timeline_id,
timeline.get_ancestor_lsn()
);
}
let new_timeline_id = ZTimelineId::generate();
// Forward entire timeline creation routine to repository
// backend, so it can do all needed initialization
repo.branch_timeline(startpoint.timeline_id, new_timeline_id, startpoint.lsn)?;
// Remember the human-readable branch name for the new timeline.
// FIXME: there's a race condition, if you create a branch with the same
// name concurrently.
// TODO kb timeline creation needs more
let data = new_timeline_id.to_string();
fs::write(conf.timeline_path(&timeline_id, &tenant_id), data)?;
Ok(TimelineInfo {
timeline_id: new_timeline_id,
latest_valid_lsn: startpoint.lsn,
ancestor_id: Some(startpoint.timeline_id.to_string()),
ancestor_lsn: Some(startpoint.lsn.to_string()),
current_logical_size: 0,
current_logical_size_non_incremental: Some(0),
})
}
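
In short, the branching path introduced above resolves the start point as follows: default to the ancestor's end of WAL, align it, and reject anything earlier than the ancestor's own branch point. A condensed standalone model of that validation, with LSNs simplified to u64 and 8-byte alignment assumed for Lsn::align():

// Simplified model of the start-LSN resolution in create_timeline above.
// The real code uses Lsn, waits for WAL via ancestor_timeline.wait_lsn(),
// and then calls repo.branch_timeline(); none of that is reproduced here.
fn resolve_branch_point(
    requested_start_lsn: Option<u64>,
    ancestor_end_of_wal: u64,
    ancestor_ancestor_lsn: u64,
) -> anyhow::Result<u64> {
    // Lsn(0) / None means "branch at the ancestor's end of WAL".
    let mut start_lsn = requested_start_lsn.unwrap_or(0);
    if start_lsn == 0 {
        start_lsn = ancestor_end_of_wal;
    }
    // Assumption: Lsn::align() rounds up to an 8-byte boundary.
    start_lsn = (start_lsn + 7) & !7;
    // Branching earlier than the ancestor's own branch point is rejected.
    anyhow::ensure!(
        start_lsn >= ancestor_ancestor_lsn,
        "invalid start lsn {}: less than ancestor timeline's ancestor lsn {}",
        start_lsn,
        ancestor_ancestor_lsn
    );
    Ok(start_lsn)
}

fn main() {
    // Branch at end of WAL (0x4000); the ancestor itself branched at 0x1000.
    println!("{:?}", resolve_branch_point(None, 0x4000, 0x1000));
}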