refactor timeline memory state management

Dmitry Rodionov
2022-03-17 13:21:00 +04:00
committed by Dmitry Rodionov
parent a7544eead5
commit 7738254f83
20 changed files with 1484 additions and 1122 deletions

View File

@@ -1,4 +1,3 @@
use std::convert::TryFrom;
use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
@@ -10,7 +9,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse};
use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest};
use pageserver::timelines::TimelineInfo;
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -358,7 +357,7 @@ impl PageServerNode {
}
pub fn timeline_list(&self, tenant_id: &ZTenantId) -> anyhow::Result<Vec<TimelineInfo>> {
let timeline_infos: Vec<TimelineInfoResponse> = self
let timeline_infos: Vec<TimelineInfo> = self
.http_request(
Method::GET,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
@@ -367,10 +366,7 @@ impl PageServerNode {
.error_from_body()?
.json()?;
timeline_infos
.into_iter()
.map(TimelineInfo::try_from)
.collect()
Ok(timeline_infos)
}
pub fn timeline_create(
@@ -392,10 +388,8 @@ impl PageServerNode {
})
.send()?
.error_from_body()?
.json::<Option<TimelineInfoResponse>>()?;
.json::<Option<TimelineInfo>>()?;
timeline_info_response
.map(TimelineInfo::try_from)
.transpose()
Ok(timeline_info_response)
}
}
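
With the response-conversion layer gone, the control plane deserializes the pageserver's `TimelineInfo` directly. A minimal usage sketch, assuming the `local`/`remote` field split that the pageserver-side hunks later in this commit introduce (the field names are taken from those hunks, not from this file):

// Sketch only: assumes TimelineInfo exposes timeline_id plus optional local/remote parts.
fn print_timelines(node: &PageServerNode, tenant_id: &ZTenantId) -> anyhow::Result<()> {
    for info in node.timeline_list(tenant_id)? {
        match (&info.local, &info.remote) {
            (Some(local), _) => println!(
                "{}: local, disk_consistent_lsn={}",
                info.timeline_id, local.disk_consistent_lsn
            ),
            (None, Some(remote)) => println!(
                "{}: remote only, awaits_download={}",
                info.timeline_id, remote.awaits_download
            ),
            (None, None) => println!("{}: no local or remote data", info.timeline_id),
        }
    }
    Ok(())
}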

View File

@@ -18,7 +18,10 @@ use daemonize::Daemonize;
use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, remote_storage, tenant_mgr, thread_mgr,
http, page_cache, page_service,
remote_storage::{self, SyncStartupData},
repository::TimelineSyncStatusUpdate,
tenant_mgr, thread_mgr,
thread_mgr::ThreadKind,
timelines, virtual_file, LOG_FILE_NAME,
};
@@ -227,11 +230,47 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
}
let signals = signals::install_shutdown_handlers()?;
let sync_startup = remote_storage::start_local_timeline_sync(conf)
// Initialize repositories with locally available timelines.
// Timelines that are only partially available locally (remote storage has more data than this pageserver)
// are scheduled for download and added to the repository once download is completed.
let SyncStartupData {
remote_index,
local_timeline_init_statuses,
} = remote_storage::start_local_timeline_sync(conf)
.context("Failed to set up local files sync with external storage")?;
// Initialize tenant manager.
tenant_mgr::set_timeline_states(conf, sync_startup.initial_timeline_states);
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
// initialize local tenant
let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index);
for (timeline_id, init_status) in local_timeline_init_statuses {
match init_status {
remote_storage::LocalTimelineInitStatus::LocallyComplete => {
debug!("timeline {} for tenant {} is locally complete, registering it in repository", tenant_id, timeline_id);
// Lets fail here loudly to be on the safe side.
// XXX: It may be a better api to actually distinguish between repository startup
// and processing of newly downloaded timelines.
repo.apply_timeline_remote_sync_status_update(
timeline_id,
TimelineSyncStatusUpdate::Downloaded,
)
.with_context(|| {
format!(
"Failed to bootstrap timeline {} for tenant {}",
timeline_id, tenant_id
)
})?
}
remote_storage::LocalTimelineInitStatus::NeedsSync => {
debug!(
"timeline {} for tenant {} needs sync, \
so skipped for adding into repository until sync is finished",
tenant_id, timeline_id
);
}
}
}
}
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
@@ -253,7 +292,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
None,
"http_endpoint_thread",
move || {
let router = http::make_router(conf, auth_cloned);
let router = http::make_router(conf, auth_cloned, remote_index);
endpoint::serve_thread_main(router, http_listener, thread_mgr::shutdown_watcher())
},
)?;

View File

@@ -1,11 +1,12 @@
use crate::timelines::TimelineInfo;
use anyhow::{anyhow, bail, Context};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use zenith_utils::{
lsn::Lsn,
zid::{HexZTenantId, HexZTimelineId, ZNodeId, ZTenantId, ZTimelineId},
};
use crate::timelines::{LocalTimelineInfo, TimelineInfo};
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
pub new_timeline_id: Option<HexZTimelineId>,
@@ -18,8 +19,28 @@ pub struct TenantCreateRequest {
pub new_tenant_id: Option<HexZTenantId>,
}
#[derive(Clone)]
pub enum TimelineInfoV1 {
Local {
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
last_record_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_lsn: Option<Lsn>,
disk_consistent_lsn: Lsn,
current_logical_size: Option<usize>,
current_logical_size_non_incremental: Option<usize>,
},
Remote {
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
disk_consistent_lsn: Lsn,
},
}
#[derive(Serialize, Deserialize)]
pub struct TimelineInfoResponse {
pub struct TimelineInfoResponseV1 {
pub kind: String,
#[serde(with = "hex")]
timeline_id: ZTimelineId,
@@ -34,10 +55,10 @@ pub struct TimelineInfoResponse {
current_logical_size_non_incremental: Option<usize>,
}
impl From<TimelineInfo> for TimelineInfoResponse {
fn from(other: TimelineInfo) -> Self {
impl From<TimelineInfoV1> for TimelineInfoResponseV1 {
fn from(other: TimelineInfoV1) -> Self {
match other {
TimelineInfo::Local {
TimelineInfoV1::Local {
timeline_id,
tenant_id,
last_record_lsn,
@@ -47,23 +68,23 @@ impl From<TimelineInfo> for TimelineInfoResponse {
disk_consistent_lsn,
current_logical_size,
current_logical_size_non_incremental,
} => TimelineInfoResponse {
} => TimelineInfoResponseV1 {
kind: "Local".to_owned(),
timeline_id,
tenant_id,
disk_consistent_lsn: disk_consistent_lsn.to_string(),
last_record_lsn: Some(last_record_lsn.to_string()),
prev_record_lsn: Some(prev_record_lsn.to_string()),
prev_record_lsn: prev_record_lsn.map(|lsn| lsn.to_string()),
ancestor_timeline_id: ancestor_timeline_id.map(HexZTimelineId::from),
ancestor_lsn: ancestor_lsn.map(|lsn| lsn.to_string()),
current_logical_size: Some(current_logical_size),
current_logical_size,
current_logical_size_non_incremental,
},
TimelineInfo::Remote {
TimelineInfoV1::Remote {
timeline_id,
tenant_id,
disk_consistent_lsn,
} => TimelineInfoResponse {
} => TimelineInfoResponseV1 {
kind: "Remote".to_owned(),
timeline_id,
tenant_id,
@@ -79,10 +100,10 @@ impl From<TimelineInfo> for TimelineInfoResponse {
}
}
impl TryFrom<TimelineInfoResponse> for TimelineInfo {
impl TryFrom<TimelineInfoResponseV1> for TimelineInfoV1 {
type Error = anyhow::Error;
fn try_from(other: TimelineInfoResponse) -> anyhow::Result<Self> {
fn try_from(other: TimelineInfoResponseV1) -> anyhow::Result<Self> {
let parse_lsn_hex_string = |lsn_string: String| {
lsn_string
.parse::<Lsn>()
@@ -91,33 +112,68 @@ impl TryFrom<TimelineInfoResponse> for TimelineInfo {
let disk_consistent_lsn = parse_lsn_hex_string(other.disk_consistent_lsn)?;
Ok(match other.kind.as_str() {
"Local" => TimelineInfo::Local {
"Local" => TimelineInfoV1::Local {
timeline_id: other.timeline_id,
tenant_id: other.tenant_id,
last_record_lsn: other
.last_record_lsn
.ok_or(anyhow!("Local timeline should have last_record_lsn"))
.ok_or(anyhow::anyhow!(
"Local timeline should have last_record_lsn"
))
.and_then(parse_lsn_hex_string)?,
prev_record_lsn: other
.prev_record_lsn
.ok_or(anyhow!("Local timeline should have prev_record_lsn"))
.and_then(parse_lsn_hex_string)?,
.map(parse_lsn_hex_string)
.transpose()?,
ancestor_timeline_id: other.ancestor_timeline_id.map(ZTimelineId::from),
ancestor_lsn: other.ancestor_lsn.map(parse_lsn_hex_string).transpose()?,
disk_consistent_lsn,
current_logical_size: other.current_logical_size.ok_or(anyhow!("No "))?,
current_logical_size: other.current_logical_size,
current_logical_size_non_incremental: other.current_logical_size_non_incremental,
},
"Remote" => TimelineInfo::Remote {
"Remote" => TimelineInfoV1::Remote {
timeline_id: other.timeline_id,
tenant_id: other.tenant_id,
disk_consistent_lsn,
},
unknown => bail!("Unknown timeline kind: {}", unknown),
unknown => anyhow::bail!("Unknown timeline kind: {}", unknown),
})
}
}
fn from_local(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
local: &LocalTimelineInfo,
) -> TimelineInfoV1 {
TimelineInfoV1::Local {
timeline_id,
tenant_id,
last_record_lsn: local.last_record_lsn,
prev_record_lsn: local.prev_record_lsn,
ancestor_timeline_id: local.ancestor_timeline_id.map(ZTimelineId::from),
ancestor_lsn: local.ancestor_lsn,
disk_consistent_lsn: local.disk_consistent_lsn,
current_logical_size: local.current_logical_size,
current_logical_size_non_incremental: local.current_logical_size_non_incremental,
}
}
impl From<TimelineInfo> for TimelineInfoV1 {
fn from(t: TimelineInfo) -> Self {
match (t.local.as_ref(), t.remote.as_ref()) {
(None, None) => unreachable!(),
(None, Some(remote)) => TimelineInfoV1::Remote {
timeline_id: t.timeline_id,
tenant_id: t.tenant_id,
disk_consistent_lsn: remote.remote_consistent_lsn.unwrap_or(Lsn(0)),
},
(Some(local), None) => from_local(t.tenant_id, t.timeline_id, local),
(Some(local), Some(_)) => from_local(t.tenant_id, t.timeline_id, local),
}
}
}
#[derive(Serialize)]
pub struct StatusResponse {
pub id: ZNodeId,

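The `TimelineInfo`, `LocalTimelineInfo`, and `RemoteTimelineInfo` types that this file now builds on live in `crate::timelines`, which is not part of this excerpt. A rough sketch of their assumed shape, reconstructed from how `from_local` and the HTTP handlers below use them (exact field types and serde attributes are assumptions):

// Assumed shape only; the real definitions are in the timelines module of this commit.
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
    pub tenant_id: ZTenantId,
    pub timeline_id: ZTimelineId,
    pub local: Option<LocalTimelineInfo>,   // present when the timeline has local files
    pub remote: Option<RemoteTimelineInfo>, // present when the remote index knows about it
}

#[derive(Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
    pub last_record_lsn: Lsn,
    pub prev_record_lsn: Option<Lsn>,
    pub ancestor_timeline_id: Option<HexZTimelineId>,
    pub ancestor_lsn: Option<Lsn>,
    pub disk_consistent_lsn: Lsn,
    pub current_logical_size: Option<usize>,
    pub current_logical_size_non_incremental: Option<usize>,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
    pub remote_consistent_lsn: Option<Lsn>,
    pub awaits_download: bool,
}
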
View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::Result;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use tokio::sync::RwLock;
use tracing::*;
use zenith_utils::auth::JwtAuth;
use zenith_utils::http::endpoint::attach_openapi_ui;
@@ -16,24 +17,32 @@ use zenith_utils::http::{
request::parse_request_param,
};
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::zid::{HexZTenantId, ZTimelineId};
use zenith_utils::zid::{HexZTenantId, ZTenantTimelineId, ZTimelineId};
use super::models::{
StatusResponse, TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse,
StatusResponse, TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponseV1,
TimelineInfoV1,
};
use crate::remote_storage::{schedule_timeline_download, RemoteTimelineIndex};
use crate::timelines::{
extract_remote_timeline_info, LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo,
};
use crate::repository::RepositoryTimeline;
use crate::timelines::TimelineInfo;
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
#[derive(Debug)]
struct State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_index: Arc<RwLock<RemoteTimelineIndex>>,
allowlist_routes: Vec<Uri>,
}
impl State {
fn new(conf: &'static PageServerConf, auth: Option<Arc<JwtAuth>>) -> Self {
fn new(
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_index: Arc<RwLock<RemoteTimelineIndex>>,
) -> Self {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
.iter()
.map(|v| v.parse().unwrap())
@@ -42,6 +51,7 @@ impl State {
conf,
auth,
allowlist_routes,
remote_index,
}
}
}
@@ -88,7 +98,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
.map_err(ApiError::from_err)??;
Ok(match new_timeline_info {
Some(info) => json_response(StatusCode::CREATED, TimelineInfoResponse::from(info))?,
Some(info) => json_response(StatusCode::CREATED, info)?,
None => json_response(StatusCode::CONFLICT, ())?,
})
}
@@ -97,15 +107,24 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
let response_data: Vec<TimelineInfoResponse> = tokio::task::spawn_blocking(move || {
let local_timeline_infos = tokio::task::spawn_blocking(move || {
let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
crate::timelines::get_timelines(tenant_id, include_non_incremental_logical_size)
crate::timelines::get_local_timelines(tenant_id, include_non_incremental_logical_size)
})
.await
.map_err(ApiError::from_err)??
.into_iter()
.map(TimelineInfoResponse::from)
.collect();
.map_err(ApiError::from_err)??;
let remote_index = get_state(&request).remote_index.read().await;
let mut response_data = Vec::with_capacity(local_timeline_infos.len());
for (timeline_id, local_timeline_info) in local_timeline_infos {
response_data.push(TimelineInfo {
tenant_id,
timeline_id,
local: Some(local_timeline_info),
remote: extract_remote_timeline_info(tenant_id, timeline_id, &remote_index),
})
}
Ok(json_response(StatusCode::OK, response_data)?)
}
@@ -124,30 +143,76 @@ fn get_include_non_incremental_logical_size(request: &Request<Body>) -> bool {
.unwrap_or(false)
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
// common part for v1 and v2 handlers
async fn timeline_detail_common(request: Request<Body>) -> Result<TimelineInfo, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
let response_data = tokio::task::spawn_blocking(move || {
let _enter =
info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let span = info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id);
let (local_timeline_info, span) = tokio::task::spawn_blocking(move || {
let entered = span.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let include_non_incremental_logical_size =
get_include_non_incremental_logical_size(&request);
Ok::<_, anyhow::Error>(TimelineInfo::from_repo_timeline(
tenant_id,
repo.get_timeline(timeline_id)?,
include_non_incremental_logical_size,
))
let local_timeline = {
repo.get_timeline(timeline_id)
.map(|timeline| {
LocalTimelineInfo::from_repo_timeline(
timeline,
include_non_incremental_logical_size,
)
})
.transpose()?
};
Ok::<_, anyhow::Error>((local_timeline, entered.exit()))
})
.await
.map_err(ApiError::from_err)?
.map(TimelineInfoResponse::from)?;
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::OK, response_data)?)
let remote_timeline_info = {
let remote_index_read = get_state(&request).remote_index.read().await;
remote_index_read
.timeline_entry(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_entry| RemoteTimelineInfo {
remote_consistent_lsn: remote_entry.disk_consistent_lsn(),
awaits_download: remote_entry.get_awaits_download(),
})
};
let _enter = span.entered();
if local_timeline_info.is_none() && remote_timeline_info.is_none() {
return Err(ApiError::NotFound(
"Timeline is not found neither locally nor remotely".to_string(),
));
}
Ok(TimelineInfo {
tenant_id,
timeline_id,
local: local_timeline_info,
remote: remote_timeline_info,
})
}
// TODO remove when console adopts v2
async fn timeline_detail_handler_v1(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let timeline_info = timeline_detail_common(request).await?;
Ok(json_response(
StatusCode::OK,
TimelineInfoResponseV1::from(TimelineInfoV1::from(timeline_info)),
)?)
}
async fn timeline_detail_handler_v2(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let timeline_info = timeline_detail_common(request).await?;
Ok(json_response(StatusCode::OK, timeline_info)?)
}
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -155,31 +220,37 @@ async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let span = info_span!("timeline_attach_handler", tenant = %tenant_id, timeline = %timeline_id);
tokio::task::spawn_blocking(move || {
let _enter =
info_span!("timeline_attach_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local { .. } => {
anyhow::bail!("Timeline with id {} is already local", timeline_id)
}
RepositoryTimeline::Remote {
id: _,
disk_consistent_lsn: _,
} => {
// FIXME (rodionov) get timeline already schedules timeline for download, and duplicate tasks can cause errors
// first should be fixed in https://github.com/zenithdb/zenith/issues/997
// TODO (rodionov) change timeline state to awaits download (encapsulate it somewhere in the repo)
// TODO (rodionov) can we safely request replication on the timeline before sync is completed? (can be implemented on top of the #997)
Ok(())
}
}
let span = tokio::task::spawn_blocking(move || {
let entered = span.entered();
if tenant_mgr::get_timeline_for_tenant_load(tenant_id, timeline_id).is_ok() {
anyhow::bail!("Timeline is already present locally")
};
Ok(entered.exit())
})
.await
.map_err(ApiError::from_err)??;
let mut remote_index_write = get_state(&request).remote_index.write().await;
let _enter = span.entered(); // the entered guard cannot live across awaits (it is not Send)
let index_entry = remote_index_write
.timeline_entry_mut(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.ok_or_else(|| ApiError::BadRequest("Unknown remote timeline".to_string()))?;
if index_entry.get_awaits_download() {
return Err(ApiError::NotFound(
"Timeline download is already in progress".to_string(),
));
}
index_entry.set_awaits_download(true);
schedule_timeline_download(tenant_id, timeline_id);
Ok(json_response(StatusCode::ACCEPTED, ())?)
}
@@ -221,13 +292,17 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
check_permission(&request, None)?;
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let remote_index = Arc::clone(&get_state(&request).remote_index);
let target_tenant_id = request_data
.new_tenant_id
.map(ZTenantId::from)
.unwrap_or_else(ZTenantId::generate);
let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?request_data.new_tenant_id).entered();
tenant_mgr::create_tenant_repository(
get_config(&request),
request_data.new_tenant_id.map(ZTenantId::from),
)
let _enter = info_span!("tenant_create", tenant = ?target_tenant_id).entered();
tenant_mgr::create_tenant_repository(get_config(&request), target_tenant_id, remote_index)
})
.await
.map_err(ApiError::from_err)??;
@@ -248,6 +323,7 @@ async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
pub fn make_router(
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_index: Arc<RwLock<RemoteTimelineIndex>>,
) -> RouterBuilder<hyper::Body, ApiError> {
let spec = include_bytes!("openapi_spec.yml");
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
@@ -263,7 +339,7 @@ pub fn make_router(
}
router
.data(Arc::new(State::new(conf, auth)))
.data(Arc::new(State::new(conf, auth, remote_index)))
.get("/v1/status", status_handler)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
@@ -271,7 +347,11 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
timeline_detail_handler_v1,
)
.get(
"/v2/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler_v2,
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/attach",

View File

@@ -35,9 +35,9 @@ use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config::PageServerConf;
use crate::page_cache;
use crate::relish::*;
use crate::remote_storage::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteTimelineIndex};
use crate::repository::{
BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncState,
BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate,
TimelineWriter, ZenithWalRecord,
};
use crate::thread_mgr;
@@ -129,27 +129,46 @@ pub struct LayeredRepository {
// timeout...
gc_cs: Mutex<()>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
// Provides access to timeline data sitting in the remote storage.
// Used to retrieve the remote consistent LSN in the WAL receiver.
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
/// Makes every timeline back up its files to remote storage.
upload_relishes: bool,
}
/// Public interface
impl Repository for LayeredRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline> {
Ok(RepositoryTimeline::from(self.get_or_init_timeline(
timelineid,
&mut self.timelines.lock().unwrap(),
)?))
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline> {
let timelines = self.timelines.lock().unwrap();
self.get_timeline_internal(timelineid, &timelines)
.map(RepositoryTimeline::from)
}
fn list_timelines(&self) -> Result<Vec<RepositoryTimeline>> {
Ok(self
.timelines
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
match self.get_timeline_load_internal(timelineid, &mut timelines)? {
Some(local_loaded_timeline) => Ok(local_loaded_timeline as _),
None => anyhow::bail!(
"cannot get local timeline: unknown timeline id: {}",
timelineid
),
}
}
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)> {
self.timelines
.lock()
.unwrap()
.values()
.map(|timeline_entry| RepositoryTimeline::from(timeline_entry.clone()))
.collect())
.iter()
.map(|(timeline_id, timeline_entry)| {
(
*timeline_id,
RepositoryTimeline::from(timeline_entry.clone()),
)
})
.collect()
}
fn create_empty_timeline(
@@ -176,10 +195,16 @@ impl Repository for LayeredRepository {
self.upload_relishes,
);
let timeline_rc = Arc::new(timeline);
let r = timelines.insert(timelineid, LayeredTimelineEntry::Local(timeline_rc.clone()));
assert!(r.is_none());
Ok(timeline_rc)
let timeline = Arc::new(timeline);
let r = timelines.insert(
timelineid,
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
);
ensure!(
r.is_none(),
"assertion failure, inserted duplicate timeline"
);
Ok(timeline)
}
/// Branch a timeline
@@ -190,14 +215,12 @@ impl Repository for LayeredRepository {
let _gc_cs = self.gc_cs.lock().unwrap();
let mut timelines = self.timelines.lock().unwrap();
let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote { .. } => {
bail!("Cannot branch off the timeline {} that's not local", src)
}
};
let src_timeline = self
.get_timeline_load_internal(src, &mut timelines)
// message about timeline being remote is one .context up in the stack
.context("failed to load timeline for branching")?
.ok_or_else(|| anyhow::anyhow!("unknown timeline id: {}", &src))?;
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
src_timeline
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
.context("invalid branch start lsn")?;
@@ -232,6 +255,7 @@ impl Repository for LayeredRepository {
);
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?;
Self::save_metadata(self.conf, dst, self.tenantid, &metadata, true)?;
timelines.insert(dst, LayeredTimelineEntry::Unloaded { id: dst, metadata });
info!("branched timeline {} from {} at {}", dst, src, start_lsn);
@@ -261,11 +285,19 @@ impl Repository for LayeredRepository {
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_checkpoint = timelines
.iter()
// filter to get only loaded timelines
.filter_map(|(timelineid, entry)| match entry {
LayeredTimelineEntry::Loaded(timeline) => Some((timelineid, timeline)),
LayeredTimelineEntry::Unloaded { .. } => {
debug!("Skipping checkpoint for unloaded timeline {}", timelineid);
None
}
})
.map(|(timelineid, timeline)| (*timelineid, timeline.clone()))
.collect::<Vec<_>>();
drop(timelines);
@@ -273,13 +305,7 @@ impl Repository for LayeredRepository {
for (timelineid, timeline) in &timelines_to_checkpoint {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered();
match timeline {
LayeredTimelineEntry::Local(timeline) => timeline.checkpoint(cconf)?,
LayeredTimelineEntry::Remote { .. } => debug!(
"Cannot run the checkpoint for remote timeline {}",
timelineid
),
}
timeline.checkpoint(cconf)?;
}
Ok(())
@@ -288,32 +314,10 @@ impl Repository for LayeredRepository {
// Detaches the timeline from the repository.
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> {
let mut timelines = self.timelines.lock().unwrap();
match timelines.entry(timeline_id) {
Entry::Vacant(_) => {
bail!("cannot detach non existing timeline");
}
Entry::Occupied(mut entry) => {
let timeline_entry = entry.get_mut();
if timelines.remove(&timeline_id).is_none() {
bail!("cannot detach timeline that is not available locally");
}
let timeline = match timeline_entry {
LayeredTimelineEntry::Remote { .. } => {
bail!("cannot detach remote timeline {}", timeline_id);
}
LayeredTimelineEntry::Local(timeline) => timeline,
};
// TODO (rodionov) keep local state in timeline itself (refactoring related to https://github.com/zenithdb/zenith/issues/997 and #1104)
// FIXME this is the local disk consistent lsn; we need to keep the latest successfully uploaded checkpoint lsn in the timeline (metadata?)
// https://github.com/zenithdb/zenith/issues/1104
let remote_disk_consistent_lsn = timeline.disk_consistent_lsn.load();
// reference to timeline is dropped here
entry.insert(LayeredTimelineEntry::Remote {
id: timeline_id,
disk_consistent_lsn: remote_disk_consistent_lsn,
});
}
};
// Release the lock to shutdown and remove the files without holding it
drop(timelines);
// shutdown the timeline (this shuts down the walreceiver)
@@ -324,158 +328,142 @@ impl Repository for LayeredRepository {
Ok(())
}
// TODO this method currently does not do anything to prevent (or react to) state updates between a sync task schedule and a sync task end (that causes this update).
// Sync task is enqueued and can error and be rescheduled, so some significant time may pass between the events.
//
/// Reacts to a timeline sync state change, changing the pageserver's memory state for this timeline (load or unload of the timeline files).
fn set_timeline_state(
fn apply_timeline_remote_sync_status_update(
&self,
timeline_id: ZTimelineId,
new_state: TimelineSyncState,
timeline_sync_status_update: TimelineSyncStatusUpdate,
) -> Result<()> {
debug!(
"set_timeline_state: timeline_id: {}, new_state: {:?}",
timeline_id, new_state
"apply_timeline_remote_sync_status_update timeline_id: {} update: {:?}",
timeline_id, timeline_sync_status_update
);
let mut timelines_accessor = self.timelines.lock().unwrap();
match new_state {
TimelineSyncState::Ready(_) => {
let reloaded_timeline =
self.init_local_timeline(timeline_id, &mut timelines_accessor)?;
timelines_accessor
.insert(timeline_id, LayeredTimelineEntry::Local(reloaded_timeline));
None
match timeline_sync_status_update {
TimelineSyncStatusUpdate::Uploaded => { /* nothing to do, remote consistent lsn is managed by the remote storage */
}
TimelineSyncState::Evicted(_) => timelines_accessor.remove(&timeline_id),
TimelineSyncState::AwaitsDownload(disk_consistent_lsn)
| TimelineSyncState::CloudOnly(disk_consistent_lsn) => timelines_accessor.insert(
timeline_id,
LayeredTimelineEntry::Remote {
id: timeline_id,
disk_consistent_lsn,
},
),
};
// NOTE we do not delete local data in case timeline became cloud only, this is performed in detach_timeline
drop(timelines_accessor);
TimelineSyncStatusUpdate::Downloaded => {
match self.timelines.lock().unwrap().entry(timeline_id) {
Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."),
Entry::Vacant(entry) => {
// We need the timeline's metadata here; another option is to pass it along with the Downloaded status.
let metadata = Self::load_metadata(self.conf, timeline_id, self.tenantid).context("failed to load local metadata")?;
// Finally, make the newly downloaded timeline visible to the repository.
entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, })
},
};
}
}
Ok(())
}
/// Layered repo does not store anything but
/// * local, fully loaded timelines, ready for usage
/// * remote timelines, that need a download task scheduled first before they can be used
///
/// [`TimelineSyncState::Evicted`] and other non-local and non-remote states are not stored in the layered repo at all,
/// hence their statuses cannot be returned by the repo.
fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option<TimelineSyncState> {
let timelines_accessor = self.timelines.lock().unwrap();
let timeline_entry = timelines_accessor.get(&timeline_id)?;
Some(
if timeline_entry
.local_or_schedule_download(self.tenantid)
.is_some()
{
TimelineSyncState::Ready(timeline_entry.disk_consistent_lsn())
} else {
TimelineSyncState::CloudOnly(timeline_entry.disk_consistent_lsn())
},
)
fn get_remote_index(&self) -> &tokio::sync::RwLock<RemoteTimelineIndex> {
self.remote_index.as_ref()
}
}
#[derive(Clone)]
enum LayeredTimelineEntry {
Local(Arc<LayeredTimeline>),
Remote {
Loaded(Arc<LayeredTimeline>),
Unloaded {
id: ZTimelineId,
/// metadata contents of the latest successfully uploaded checkpoint
disk_consistent_lsn: Lsn,
metadata: TimelineMetadata,
},
}
impl LayeredTimelineEntry {
fn timeline_id(&self) -> ZTimelineId {
match self {
LayeredTimelineEntry::Local(timeline) => timeline.timelineid,
LayeredTimelineEntry::Remote { id, .. } => *id,
LayeredTimelineEntry::Loaded(timeline) => timeline.timelineid,
LayeredTimelineEntry::Unloaded { id, .. } => *id,
}
}
/// Gets local timeline data, if it's present. Otherwise schedules a download for the remote timeline and returns `None`.
fn local_or_schedule_download(&self, tenant_id: ZTenantId) -> Option<&LayeredTimeline> {
fn ancestor_timeline_id(&self) -> Option<ZTimelineId> {
match self {
Self::Local(local) => Some(local.as_ref()),
Self::Remote {
id: timeline_id, ..
} => {
debug!(
"Accessed a remote timeline {} for tenant {}, scheduling a timeline download",
timeline_id, tenant_id
);
schedule_timeline_download(tenant_id, *timeline_id);
None
LayeredTimelineEntry::Loaded(timeline) => {
timeline.ancestor_timeline.as_ref().map(|t| t.timeline_id())
}
LayeredTimelineEntry::Unloaded { metadata, .. } => metadata.ancestor_timeline(),
}
}
/// Gets a current (latest for the remote case) disk consistent Lsn for the timeline.
fn disk_consistent_lsn(&self) -> Lsn {
fn ancestor_lsn(&self) -> Lsn {
match self {
Self::Local(local) => local.disk_consistent_lsn.load(),
Self::Remote {
disk_consistent_lsn,
..
} => *disk_consistent_lsn,
LayeredTimelineEntry::Loaded(timeline) => timeline.ancestor_lsn,
LayeredTimelineEntry::Unloaded { metadata, .. } => metadata.ancestor_lsn(),
}
}
fn ensure_loaded(&self) -> anyhow::Result<&Arc<LayeredTimeline>> {
match self {
LayeredTimelineEntry::Loaded(timeline) => Ok(timeline),
LayeredTimelineEntry::Unloaded { .. } => {
anyhow::bail!("timeline is unloaded")
}
}
}
}
impl From<LayeredTimelineEntry> for RepositoryTimeline {
fn from(layered_timeline: LayeredTimelineEntry) -> Self {
match layered_timeline {
LayeredTimelineEntry::Local(timeline) => RepositoryTimeline::Local {
id: timeline.timelineid,
timeline,
},
LayeredTimelineEntry::Remote {
id,
disk_consistent_lsn,
} => RepositoryTimeline::Remote {
id,
disk_consistent_lsn,
},
fn from(entry: LayeredTimelineEntry) -> Self {
match entry {
LayeredTimelineEntry::Loaded(timeline) => RepositoryTimeline::Loaded(timeline as _),
LayeredTimelineEntry::Unloaded { metadata, .. } => {
RepositoryTimeline::Unloaded { metadata }
}
}
}
}
/// Private functions
impl LayeredRepository {
// Implementation of the public `get_timeline` function. This differs from the public
// interface in that the caller must already hold the mutex on the 'timelines' hashmap.
fn get_or_init_timeline(
// Implementation of the public `get_timeline` function.
// Differences from the public interface:
// * the caller must already hold the mutex on the 'timelines' hashmap.
fn get_timeline_internal(
&self,
timelineid: ZTimelineId,
timelines: &HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> Option<LayeredTimelineEntry> {
timelines.get(&timelineid).cloned()
}
// Implementation of the public `get_timeline_load` function.
// Differences from the public interface:
// * the caller must already hold the mutex on the 'timelines' hashmap.
fn get_timeline_load_internal(
&self,
timelineid: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> Result<LayeredTimelineEntry> {
) -> anyhow::Result<Option<Arc<LayeredTimeline>>> {
match timelines.get(&timelineid) {
Some(timeline_entry) => {
let _ = timeline_entry.local_or_schedule_download(self.tenantid);
Ok(timeline_entry.clone())
}
Some(entry) => match entry {
LayeredTimelineEntry::Loaded(local_timeline) => {
trace!("timeline {} found loaded", &timelineid);
return Ok(Some(Arc::clone(local_timeline)));
}
LayeredTimelineEntry::Unloaded { .. } => {
trace!("timeline {} found unloaded", &timelineid)
}
},
None => {
let timeline = self.init_local_timeline(timelineid, timelines)?;
timelines.insert(
timelineid,
LayeredTimelineEntry::Local(Arc::clone(&timeline)),
);
Ok(LayeredTimelineEntry::Local(timeline))
trace!("timeline {} not found", &timelineid);
return Ok(None);
}
}
};
let timeline = self.load_local_timeline(timelineid, timelines)?;
let was_loaded = timelines.insert(
timelineid,
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
);
ensure!(
was_loaded.is_none()
|| matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })),
"assertion failure, inserted wrong timeline in an incorrect state"
);
Ok(Some(timeline))
}
fn init_local_timeline(
fn load_local_timeline(
&self,
timelineid: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
@@ -486,8 +474,18 @@ impl LayeredRepository {
let ancestor = metadata
.ancestor_timeline()
.map(|ancestor_timelineid| self.get_or_init_timeline(ancestor_timelineid, timelines))
.transpose()?;
.map(|ancestor_timeline_id| {
trace!(
"loading {}'s ancestor {}",
timelineid,
&ancestor_timeline_id
);
self.get_timeline_load_internal(ancestor_timeline_id, timelines)
})
.transpose()
.context("cannot load ancestor timeline")?
.flatten()
.map(LayeredTimelineEntry::Loaded);
let _enter =
info_span!("loading timeline", timeline = %timelineid, tenant = %self.tenantid)
.entered();
@@ -513,6 +511,7 @@ impl LayeredRepository {
conf: &'static PageServerConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenantid: ZTenantId,
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
upload_relishes: bool,
) -> LayeredRepository {
LayeredRepository {
@@ -521,6 +520,7 @@ impl LayeredRepository {
timelines: Mutex::new(HashMap::new()),
gc_cs: Mutex::new(()),
walredo_mgr,
remote_index,
upload_relishes,
}
}
@@ -608,86 +608,46 @@ impl LayeredRepository {
// grab mutex to prevent new timelines from being created here.
let _gc_cs = self.gc_cs.lock().unwrap();
let mut timelines = self.timelines.lock().unwrap();
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
//
let mut timelineids: Vec<ZTimelineId> = Vec::new();
// We scan the directory, not the in-memory hash table, because the hash
// table only contains entries for timelines that have been accessed. We
// need to take all timelines into account, not only the active ones.
let timelines_path = self.conf.timelines_path(&self.tenantid);
for direntry in fs::read_dir(timelines_path)? {
let direntry = direntry?;
if let Some(fname) = direntry.file_name().to_str() {
if let Ok(timelineid) = fname.parse::<ZTimelineId>() {
timelineids.push(timelineid);
}
}
}
// Now collect info about branchpoints
let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new();
for &timelineid in &timelineids {
let timeline = match self.get_or_init_timeline(timelineid, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote { .. } => {
warn!(
"Timeline {} is not local, cannot proceed with gc",
timelineid
);
return Ok(totals);
}
};
let mut timeline_ids = Vec::new();
let mut timelines = self.timelines.lock().unwrap();
if let Some(ancestor_timeline) = &timeline.ancestor_timeline {
let ancestor_timeline =
match ancestor_timeline.local_or_schedule_download(self.tenantid) {
Some(timeline) => timeline,
None => {
warn!(
"Timeline {} has ancestor {} is not local, cannot proceed with gc",
timelineid,
ancestor_timeline.timeline_id()
);
return Ok(totals);
}
};
for (timeline_id, timeline_entry) in timelines.iter() {
timeline_ids.push(*timeline_id);
// It is an unresolved question for now how to do GC in the presence of remote timelines,
// especially when this is combined with branching.
// Somewhat related: https://github.com/zenithdb/zenith/issues/999
if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() {
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timelineid) = target_timelineid {
if ancestor_timeline.timelineid == timelineid {
if ancestor_timeline_id == &timelineid {
all_branchpoints
.insert((ancestor_timeline.timelineid, timeline.ancestor_lsn));
.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn()));
}
}
// Collect branchpoints for all timelines
else {
all_branchpoints.insert((ancestor_timeline.timelineid, timeline.ancestor_lsn));
all_branchpoints.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn()));
}
}
}
// Ok, we now know all the branch points.
// Perform GC for each timeline.
for timelineid in timelineids {
for timelineid in timeline_ids.into_iter() {
if thread_mgr::is_shutdown_requested() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
}
// We have already loaded all timelines above
// so this operation is just a quick map lookup.
let timeline = match self.get_or_init_timeline(timelineid, &mut *timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote { .. } => {
debug!("Skipping GC for non-local timeline {}", timelineid);
continue;
}
};
// Timeline is known to be local and loaded.
let timeline = self
.get_timeline_load_internal(timelineid, &mut *timelines)?
.expect("checked above that timeline is local and loaded");
// If target_timeline is specified, only GC it
if let Some(target_timelineid) = target_timelineid {
@@ -989,13 +949,13 @@ impl Timeline for LayeredTimeline {
match &timeline.ancestor_timeline {
None => break,
Some(ancestor_entry) => {
match ancestor_entry.local_or_schedule_download(self.tenantid) {
Some(ancestor) => {
timeline = ancestor;
continue;
}
None => bail!("Cannot list relishes for timeline {} tenant {} due to its ancestor being remote only", self.timelineid, self.tenantid),
}
timeline = ancestor_entry.ensure_loaded().with_context(
|| format!(
"cannot list relishes for timeline {} tenant {} due to its ancestor {} being either unloaded",
self.timelineid, self.tenantid, ancestor_entry.timeline_id(),
)
)?;
continue;
}
}
}
@@ -1313,19 +1273,15 @@ impl LayeredTimeline {
while lsn < timeline.ancestor_lsn {
trace!("going into ancestor {} ", timeline.ancestor_lsn);
timeline = match timeline
.ancestor_timeline
.as_ref()
.and_then(|ancestor_entry| ancestor_entry.local_or_schedule_download(self.tenantid))
{
Some(timeline) => timeline,
None => {
bail!(
"Cannot get the whole layer for read locked: timeline {} is not present locally",
self.timelineid
)
}
};
timeline = timeline
.ancestor_timeline
.as_ref()
.expect("there should be an ancestor")
.ensure_loaded()
.with_context(|| format!(
"Cannot get the whole layer for read locked: timeline {} is not present locally",
self.get_ancestor_timeline_id().unwrap())
)?;
}
// Now we have the right starting timeline for our search.
@@ -1366,18 +1322,13 @@ impl LayeredTimeline {
// If not, check if there's a layer on the ancestor timeline
match &timeline.ancestor_timeline {
Some(ancestor_entry) => {
match ancestor_entry.local_or_schedule_download(self.tenantid) {
Some(ancestor) => {
lsn = timeline.ancestor_lsn;
timeline = ancestor;
trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn);
continue;
}
None => bail!(
"Cannot get a layer for read from remote ancestor timeline {}",
self.timelineid
),
}
let ancestor = ancestor_entry
.ensure_loaded()
.context("cannot get a layer for read from ancestor because it is either remote or unloaded")?;
lsn = timeline.ancestor_lsn;
timeline = ancestor;
trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn);
continue;
}
None => return Ok(None),
}
@@ -1501,7 +1452,6 @@ impl LayeredTimeline {
fn checkpoint_internal(&self, checkpoint_distance: u64, reconstruct_pages: bool) -> Result<()> {
// Prevent concurrent checkpoints
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
let write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
@@ -1862,10 +1812,10 @@ impl LayeredTimeline {
);
}
// Now check ancestor timelines, if any are present locally
else if let Some(ancestor) =
self.ancestor_timeline.as_ref().and_then(|timeline_entry| {
timeline_entry.local_or_schedule_download(self.tenantid)
})
else if let Some(ancestor) = self
.ancestor_timeline
.as_ref()
.and_then(|timeline_entry| timeline_entry.ensure_loaded().ok())
{
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
@@ -2435,9 +2385,8 @@ mod tests {
metadata_bytes[512 - 4 - 2] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
let new_repo = harness.load();
let err = new_repo.get_timeline(TIMELINE_ID).err().unwrap();
assert_eq!(err.to_string(), "failed to load metadata");
let err = harness.try_load().err().expect("should fail");
assert_eq!(err.to_string(), "failed to load local metadata");
assert_eq!(
err.source().unwrap().to_string(),
"metadata checksum mismatch"
@@ -2527,7 +2476,7 @@ mod tests {
// Load the timeline. This will cause the files in the "future" to be renamed
// away.
let new_repo = harness.load();
new_repo.get_timeline(TIMELINE_ID).unwrap();
new_repo.get_timeline_load(TIMELINE_ID).unwrap();
drop(new_repo);
for filename in future_filenames.iter() {
@@ -2544,7 +2493,7 @@ mod tests {
}
let new_repo = harness.load();
new_repo.get_timeline(TIMELINE_ID).unwrap();
new_repo.get_timeline_load(TIMELINE_ID).unwrap();
drop(new_repo);
for filename in future_filenames.iter() {

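The repository methods used above rely on the reworked `RepositoryTimeline` and the new `TimelineSyncStatusUpdate` from `crate::repository`, whose definitions are in another file of this commit. A sketch of their assumed shape, based on the `From<LayeredTimelineEntry>` impl and the status-update handling in this file:

// Assumed shape only; the real definitions live in the repository module of this commit.
#[derive(Debug, Clone, Copy)]
pub enum TimelineSyncStatusUpdate {
    /// A checkpoint for this timeline finished uploading to remote storage.
    Uploaded,
    /// The timeline finished downloading and its files are now complete locally.
    Downloaded,
}

pub enum RepositoryTimeline {
    /// In-memory structures are built and the timeline is ready for reads and writes.
    Loaded(Arc<dyn Timeline>),
    /// Files (at least the metadata) are on local disk, but the timeline has not been loaded yet.
    Unloaded { metadata: TimelineMetadata },
}
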
View File

@@ -322,8 +322,8 @@ impl PageServerHandler {
let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered();
// Check that the timeline exists
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Cannot handle pagerequests for a remote timeline")?;
let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
/* switch client to COPYBOTH */
pgb.write_message(&BeMessage::CopyBothResponse)?;
@@ -520,8 +520,8 @@ impl PageServerHandler {
let _enter = span.enter();
// check that the timeline exists
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Cannot handle basebackup request for a remote timeline")?;
let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
timeline
@@ -655,8 +655,8 @@ impl postgres_backend::Handler for PageServerHandler {
info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered();
// Check that the timeline exists
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Failed to fetch local timeline for callmemaybe requests")?;
tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?;
@@ -778,8 +778,8 @@ impl postgres_backend::Handler for PageServerHandler {
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Failed to fetch local timeline for checkpoint request")?;
let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
timeline.checkpoint(CheckpointConfig::Forced)?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
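
page_service now resolves timelines through `tenant_mgr::get_timeline_for_tenant_load`, and the startup code earlier uses `tenant_mgr::load_local_repo`; the tenant_mgr changes themselves live in a file not shown in this excerpt. A sketch of the signatures these call sites appear to assume (names come from the call sites, return types are inferred and may differ):

// Assumed signatures only, inferred from call sites in this commit.
pub fn get_timeline_for_tenant_load(
    tenant_id: ZTenantId,
    timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<dyn Timeline>> {
    // Looks up the tenant's repository and calls Repository::get_timeline_load,
    // failing if the timeline is unknown or has no local data yet.
    todo!("see the tenant_mgr changes elsewhere in this commit")
}

pub fn load_local_repo(
    conf: &'static PageServerConf,
    tenant_id: ZTenantId,
    remote_index: &Arc<RwLock<RemoteTimelineIndex>>,
) -> Arc<dyn Repository> {
    // Creates (or returns) the tenant's repository, wiring in the shared remote index.
    todo!("see the tenant_mgr changes elsewhere in this commit")
}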

View File

@@ -89,32 +89,38 @@ use std::{
collections::HashMap,
ffi, fs,
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::{bail, Context};
use tokio::io;
use tokio::{io, sync::RwLock};
use tracing::{error, info};
use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
pub use self::storage_sync::index::{RemoteTimelineIndex, TimelineIndexEntry};
pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
use self::{local_fs::LocalFs, rust_s3::S3};
use crate::{
config::{PageServerConf, RemoteStorageKind},
layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME},
repository::TimelineSyncState,
};
pub use storage_sync::compression;
#[derive(Clone, Copy, Debug)]
pub enum LocalTimelineInitStatus {
LocallyComplete,
NeedsSync,
}
type LocalTimelineInitStatuses = HashMap<ZTenantId, HashMap<ZTimelineId, LocalTimelineInitStatus>>;
/// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization.
/// Successful initialization includes the case when the sync loop is not started; the startup data is still returned then,
/// to simplify the receiving code.
pub struct SyncStartupData {
/// A sync state, derived from initial comparison of local timeline files and the remote archives,
/// before any sync tasks are executed.
/// To reuse the local file scan logic, the timeline states are returned even if no sync loop get started during init:
/// in this case, no remote files exist and all local timelines with correct metadata files are considered ready.
pub initial_timeline_states: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>,
pub remote_index: Arc<RwLock<RemoteTimelineIndex>>,
pub local_timeline_init_statuses: LocalTimelineInitStatuses,
}
/// Based on the config, initiates the remote storage connection and starts a separate thread
@@ -154,23 +160,18 @@ pub fn start_local_timeline_sync(
.context("Failed to spawn the storage sync thread"),
None => {
info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled");
let mut initial_timeline_states: HashMap<
ZTenantId,
HashMap<ZTimelineId, TimelineSyncState>,
> = HashMap::new();
for (ZTenantTimelineId{tenant_id, timeline_id}, (timeline_metadata, _)) in
let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new();
for (ZTenantTimelineId { tenant_id, timeline_id }, _) in
local_timeline_files
{
initial_timeline_states
local_timeline_init_statuses
.entry(tenant_id)
.or_default()
.insert(
timeline_id,
TimelineSyncState::Ready(timeline_metadata.disk_consistent_lsn()),
);
.insert(timeline_id, LocalTimelineInitStatus::LocallyComplete);
}
Ok(SyncStartupData {
initial_timeline_states,
local_timeline_init_statuses,
remote_index: Arc::new(RwLock::new(RemoteTimelineIndex::empty())),
})
}
}
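
`RemoteTimelineIndex` becomes the shared, `RwLock`-protected view of what exists in remote storage, replacing the per-timeline `TimelineSyncState` tracking. A sketch of the slice of its API that this commit leans on (method names come from the call sites; the signatures and collection types are assumptions):

// Assumed API surface only, inferred from call sites elsewhere in this commit.
impl RemoteTimelineIndex {
    pub fn empty() -> Self { todo!() }
    pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&TimelineIndexEntry> { todo!() }
    pub fn timeline_entry_mut(&mut self, id: &ZTenantTimelineId) -> Option<&mut TimelineIndexEntry> { todo!() }
    pub fn set_awaits_download(&mut self, id: &ZTenantTimelineId, awaits: bool) -> anyhow::Result<()> { todo!() }
}

impl TimelineIndexEntry {
    pub fn disk_consistent_lsn(&self) -> Option<Lsn> { todo!() }
    pub fn get_awaits_download(&self) -> bool { todo!() }
    pub fn set_awaits_download(&mut self, awaits: bool) { todo!() }
    // The concrete collection type here is a guess; only `contains` is used in this diff.
    pub fn uploaded_checkpoints(&self) -> BTreeSet<Lsn> { todo!() }
}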

View File

@@ -58,7 +58,7 @@
//! Synchronization never removes any local files from the pageserver workdir or remote files from the remote storage, yet the same files could be overwritten (metadata file updates; future checksum mismatch fixes).
//! NOTE: No real contents or checksum check happens right now and is a subject to improve later.
//!
//! After the whole timeline is downloaded, the [`crate::tenant_mgr::set_timeline_states`] function is used to update the pageserver's memory state for the processed timeline.
//! After the whole timeline is downloaded, the [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function is used to update the pageserver's memory state for the processed timeline.
//!
//! When the pageserver signals shutdown, the current sync task is finished and the loop exits.
@@ -93,17 +93,25 @@ use self::{
download::{download_timeline, DownloadedTimeline},
index::{
ArchiveDescription, ArchiveId, RemoteTimeline, RemoteTimelineIndex, TimelineIndexEntry,
TimelineIndexEntryInner,
},
upload::upload_timeline_checkpoint,
};
use super::{RemoteStorage, SyncStartupData, ZTenantTimelineId};
use super::{
LocalTimelineInitStatus, LocalTimelineInitStatuses, RemoteStorage, SyncStartupData,
ZTenantTimelineId,
};
use crate::{
config::PageServerConf, layered_repository::metadata::TimelineMetadata,
remote_storage::storage_sync::compression::read_archive_header, repository::TimelineSyncState,
tenant_mgr::set_timeline_states, thread_mgr, thread_mgr::ThreadKind,
remote_storage::storage_sync::compression::read_archive_header,
repository::TimelineSyncStatusUpdate, tenant_mgr::apply_timeline_sync_status_updates,
thread_mgr, thread_mgr::ThreadKind,
};
use zenith_metrics::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};
use zenith_metrics::{
register_histogram_vec, register_int_counter, register_int_gauge, HistogramVec, IntCounter,
IntGauge,
};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
@@ -112,6 +120,11 @@ lazy_static! {
"Number of storage sync items left in the queue"
)
.expect("failed to register pageserver remote storage remaining sync items int gauge");
static ref FATAL_TASK_FAILURES: IntCounter = register_int_counter!(
"pageserver_remote_storage_fatal_task_failures",
"Number of critically failed tasks"
)
.expect("failed to register pageserver remote storage remaining sync items int gauge");
static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!(
"pageserver_remote_storage_image_sync_time",
"Time took to synchronize (download or upload) a whole pageserver image. \
@@ -379,10 +392,13 @@ pub(super) fn spawn_storage_sync_thread<
None
}
});
let remote_index = RemoteTimelineIndex::try_parse_descriptions_from_paths(conf, download_paths);
let initial_timeline_states = schedule_first_sync_tasks(&remote_index, local_timeline_files);
let mut remote_index =
RemoteTimelineIndex::try_parse_descriptions_from_paths(conf, download_paths);
let local_timeline_init_statuses =
schedule_first_sync_tasks(&mut remote_index, local_timeline_files);
let remote_index = Arc::new(RwLock::new(remote_index));
let remote_index_cloned = Arc::clone(&remote_index);
thread_mgr::spawn(
ThreadKind::StorageSync,
None,
@@ -393,7 +409,7 @@ pub(super) fn spawn_storage_sync_thread<
runtime,
conf,
receiver,
remote_index,
remote_index_cloned,
storage,
max_concurrent_sync,
max_sync_errors,
@@ -402,12 +418,13 @@ pub(super) fn spawn_storage_sync_thread<
)
.context("Failed to spawn remote storage sync thread")?;
Ok(SyncStartupData {
initial_timeline_states,
remote_index,
local_timeline_init_statuses,
})
}
enum LoopStep {
NewStates(HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>),
SyncStatusUpdates(HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>),
Shutdown,
}
@@ -419,13 +436,14 @@ fn storage_sync_loop<
runtime: Runtime,
conf: &'static PageServerConf,
mut receiver: UnboundedReceiver<SyncTask>,
index: RemoteTimelineIndex,
index: Arc<RwLock<RemoteTimelineIndex>>,
storage: S,
max_concurrent_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
) -> anyhow::Result<()> {
let remote_assets = Arc::new((storage, RwLock::new(index)));
let remote_assets = Arc::new((storage, Arc::clone(&index)));
loop {
let index = Arc::clone(&index);
let loop_step = runtime.block_on(async {
tokio::select! {
new_timeline_states = loop_step(
@@ -435,15 +453,15 @@ fn storage_sync_loop<
max_concurrent_sync,
max_sync_errors,
)
.instrument(debug_span!("storage_sync_loop_step")) => LoopStep::NewStates(new_timeline_states),
.instrument(debug_span!("storage_sync_loop_step")) => LoopStep::SyncStatusUpdates(new_timeline_states),
_ = thread_mgr::shutdown_watcher() => LoopStep::Shutdown,
}
});
match loop_step {
LoopStep::NewStates(new_timeline_states) => {
LoopStep::SyncStatusUpdates(new_timeline_states) => {
// Batch timeline download registration to ensure that the external registration code won't block any already running tasks.
set_timeline_states(conf, new_timeline_states);
apply_timeline_sync_status_updates(conf, index, new_timeline_states);
debug!("Sync loop step completed");
}
LoopStep::Shutdown => {
@@ -462,10 +480,10 @@ async fn loop_step<
>(
conf: &'static PageServerConf,
receiver: &mut UnboundedReceiver<SyncTask>,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
max_concurrent_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> {
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>> {
let max_concurrent_sync = max_concurrent_sync.get();
let mut next_tasks = BTreeSet::new();
@@ -516,8 +534,10 @@ async fn loop_step<
})
.collect::<FuturesUnordered<_>>();
let mut new_timeline_states: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> =
HashMap::with_capacity(max_concurrent_sync);
let mut new_timeline_states: HashMap<
ZTenantId,
HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
> = HashMap::with_capacity(max_concurrent_sync);
while let Some((sync_id, state_update)) = task_batch.next().await {
debug!("Finished storage sync task for sync id {}", sync_id);
if let Some(state_update) = state_update {
@@ -540,24 +560,19 @@ async fn process_task<
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
conf: &'static PageServerConf,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
task: SyncTask,
max_sync_errors: NonZeroU32,
) -> Option<TimelineSyncState> {
) -> Option<TimelineSyncStatusUpdate> {
if task.retries > max_sync_errors.get() {
error!(
"Evicting task {:?} that failed {} times, exceeding the error threshold",
task.kind, task.retries
);
return Some(TimelineSyncState::Evicted(
remote_assets
.as_ref()
.1
.read()
.await
.timeline_entry(&task.sync_id)
.and_then(TimelineIndexEntry::disk_consistent_lsn),
));
FATAL_TASK_FAILURES.inc();
// FIXME (rodionov) this can potentially leave holes in timeline uploads
// planned to be fixed as part of https://github.com/zenithdb/zenith/issues/977
return None;
}
if task.retries > 0 {
@@ -569,6 +584,8 @@ async fn process_task<
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
}
let remote_index = Arc::clone(&remote_assets.1);
let sync_start = Instant::now();
let sync_name = task.kind.sync_name();
match task.kind {
@@ -585,19 +602,25 @@ async fn process_task<
match download_result {
DownloadedTimeline::Abort => {
register_sync_status(sync_start, sync_name, None);
remote_index
.write()
.await
.set_awaits_download(&task.sync_id, false)
.expect("timeline should be present in remote index");
None
}
DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
} => {
DownloadedTimeline::FailedAndRescheduled => {
register_sync_status(sync_start, sync_name, Some(false));
Some(TimelineSyncState::AwaitsDownload(disk_consistent_lsn))
None
}
DownloadedTimeline::Successful {
disk_consistent_lsn,
} => {
DownloadedTimeline::Successful => {
register_sync_status(sync_start, sync_name, Some(true));
Some(TimelineSyncState::Ready(disk_consistent_lsn))
remote_index
.write()
.await
.set_awaits_download(&task.sync_id, false)
.expect("timeline should be present in remote index");
Some(TimelineSyncStatusUpdate::Downloaded)
}
}
}
@@ -617,45 +640,45 @@ async fn process_task<
}
fn schedule_first_sync_tasks(
index: &RemoteTimelineIndex,
index: &mut RemoteTimelineIndex,
local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>,
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> {
let mut initial_timeline_statuses: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> =
HashMap::new();
) -> LocalTimelineInitStatuses {
let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new();
let mut new_sync_tasks =
VecDeque::with_capacity(local_timeline_files.len().max(local_timeline_files.len()));
for (sync_id, (local_metadata, local_files)) in local_timeline_files {
let local_disk_consistent_lsn = local_metadata.disk_consistent_lsn();
let ZTenantTimelineId {
tenant_id,
timeline_id,
} = sync_id;
match index.timeline_entry(&sync_id) {
match index.timeline_entry_mut(&sync_id) {
Some(index_entry) => {
let timeline_status = compare_local_and_remote_timeline(
let (timeline_status, awaits_download) = compare_local_and_remote_timeline(
&mut new_sync_tasks,
sync_id,
local_metadata,
local_files,
index_entry,
);
match timeline_status {
Some(timeline_status) => {
initial_timeline_statuses
.entry(tenant_id)
.or_default()
.insert(timeline_id, timeline_status);
}
None => error!(
"Failed to compare local and remote timeline for task {}",
sync_id
),
let was_there = local_timeline_init_statuses
.entry(tenant_id)
.or_default()
.insert(timeline_id, timeline_status);
if was_there.is_some() {
// defensive check
warn!(
"Overwriting timeline init sync status. Status {:?} Timeline {}",
timeline_status, timeline_id
);
}
index_entry.set_awaits_download(awaits_download);
}
None => {
// TODO (rodionov) does this mean that we've crashed during tenant creation?
// is it safe to upload this checkpoint? could it be half broken?
new_sync_tasks.push_back(SyncTask::new(
sync_id,
0,
@@ -664,56 +687,18 @@ fn schedule_first_sync_tasks(
metadata: local_metadata,
}),
));
initial_timeline_statuses
local_timeline_init_statuses
.entry(tenant_id)
.or_default()
.insert(
timeline_id,
TimelineSyncState::Ready(local_disk_consistent_lsn),
);
.insert(timeline_id, LocalTimelineInitStatus::LocallyComplete);
}
}
}
let unprocessed_remote_ids = |remote_id: &ZTenantTimelineId| {
initial_timeline_statuses
.get(&remote_id.tenant_id)
.and_then(|timelines| timelines.get(&remote_id.timeline_id))
.is_none()
};
for unprocessed_remote_id in index
.all_sync_ids()
.filter(unprocessed_remote_ids)
.collect::<Vec<_>>()
{
let ZTenantTimelineId {
tenant_id: cloud_only_tenant_id,
timeline_id: cloud_only_timeline_id,
} = unprocessed_remote_id;
match index
.timeline_entry(&unprocessed_remote_id)
.and_then(TimelineIndexEntry::disk_consistent_lsn)
{
Some(remote_disk_consistent_lsn) => {
initial_timeline_statuses
.entry(cloud_only_tenant_id)
.or_default()
.insert(
cloud_only_timeline_id,
TimelineSyncState::CloudOnly(remote_disk_consistent_lsn),
);
}
None => error!(
"Failed to find disk consistent LSN for remote timeline {}",
unprocessed_remote_id
),
}
}
new_sync_tasks.into_iter().for_each(|task| {
sync_queue::push(task);
});
initial_timeline_statuses
local_timeline_init_statuses
}
fn compare_local_and_remote_timeline(
@@ -722,10 +707,21 @@ fn compare_local_and_remote_timeline(
local_metadata: TimelineMetadata,
local_files: Vec<PathBuf>,
remote_entry: &TimelineIndexEntry,
) -> Option<TimelineSyncState> {
) -> (LocalTimelineInitStatus, bool) {
let local_lsn = local_metadata.disk_consistent_lsn();
let uploads = remote_entry.uploaded_checkpoints();
let mut initial_timeline_status = LocalTimelineInitStatus::LocallyComplete;
let mut awaits_download = false;
// TODO: we probably need more sophisticated logic here.
// If more data is available remotely, can we just download what's there
// without trying to upload anything? It may be tricky and needs further investigation.
// For now it looks strange that we can request an upload
// and a download for the same timeline simultaneously
// (the upload should cover only previously unsynced files, not the whole timeline dir).
// If one of the tasks fails, they will be reordered in the queue, which can lead
// to the timeline being stuck in the evicted state.
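// If the latest local checkpoint is not among the uploaded ones, schedule a task to sync it to
// remote storage; if the remote side has archives past the local LSN, schedule a download and
// mark the timeline as awaiting it.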
if !uploads.contains(&local_lsn) {
new_sync_tasks.push_back(SyncTask::new(
sync_id,
@@ -735,6 +731,7 @@ fn compare_local_and_remote_timeline(
metadata: local_metadata,
}),
));
// Note that the status does not change here.
}
let uploads_count = uploads.len();
@@ -743,7 +740,7 @@ fn compare_local_and_remote_timeline(
.filter(|upload_lsn| upload_lsn <= &local_lsn)
.map(ArchiveId)
.collect();
Some(if archives_to_skip.len() != uploads_count {
if archives_to_skip.len() != uploads_count {
new_sync_tasks.push_back(SyncTask::new(
sync_id,
0,
@@ -752,10 +749,12 @@ fn compare_local_and_remote_timeline(
archives_to_skip,
}),
));
TimelineSyncState::AwaitsDownload(remote_entry.disk_consistent_lsn()?)
} else {
TimelineSyncState::Ready(remote_entry.disk_consistent_lsn().unwrap_or(local_lsn))
})
initial_timeline_status = LocalTimelineInitStatus::NeedsSync;
awaits_download = true;
// We do not need to manipulate the remote consistent LSN here
// because it will be updated when the sync completes.
}
(initial_timeline_status, awaits_download)
}
fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option<bool>) {
@@ -769,21 +768,23 @@ fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Optio
.observe(secs_elapsed)
}
async fn update_index_description<
async fn fetch_full_index<
P: Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
(storage, index): &(S, RwLock<RemoteTimelineIndex>),
(storage, index): &(S, Arc<RwLock<RemoteTimelineIndex>>),
timeline_dir: &Path,
id: ZTenantTimelineId,
) -> anyhow::Result<RemoteTimeline> {
let mut index_write = index.write().await;
let full_index = match index_write.timeline_entry(&id) {
let index_read = index.read().await;
let full_index = match index_read.timeline_entry(&id).map(|e| e.inner()) {
None => bail!("Timeline not found for sync id {}", id),
Some(TimelineIndexEntry::Full(_)) => bail!("Index is already populated for sync id {}", id),
Some(TimelineIndexEntry::Description(description)) => {
Some(TimelineIndexEntryInner::Full(_)) => {
bail!("Index is already populated for sync id {}", id)
}
Some(TimelineIndexEntryInner::Description(description)) => {
let mut archive_header_downloads = FuturesUnordered::new();
for (&archive_id, description) in description {
for (archive_id, description) in description {
archive_header_downloads.push(async move {
let header = download_archive_header(storage, timeline_dir, description)
.await
@@ -795,18 +796,22 @@ async fn update_index_description<
let mut full_index = RemoteTimeline::empty();
while let Some(header_data) = archive_header_downloads.next().await {
match header_data {
Ok((archive_id, header_size, header)) => full_index.update_archive_contents(archive_id.0, header, header_size),
Err((e, archive_id)) => bail!(
"Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}",
id.tenant_id, id.timeline_id, archive_id.0,
e
),
}
Ok((archive_id, header_size, header)) => full_index.update_archive_contents(archive_id.0, header, header_size),
Err((e, archive_id)) => bail!(
"Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}",
id.tenant_id, id.timeline_id, archive_id.0,
e
),
}
}
full_index
}
};
index_write.add_timeline_entry(id, TimelineIndexEntry::Full(full_index.clone()));
drop(index_read); // tokio rw lock is not upgradeable
let mut index_write = index.write().await;
index_write
.upgrade_timeline_entry(&id, full_index.clone())
.context("cannot upgrade timeline entry in remote index")?;
Ok(full_index)
}
@@ -850,7 +855,7 @@ mod test_utils {
#[track_caller]
pub async fn ensure_correct_timeline_upload(
harness: &RepoHarness,
remote_assets: Arc<(LocalFs, RwLock<RemoteTimelineIndex>)>,
remote_assets: Arc<(LocalFs, Arc<RwLock<RemoteTimelineIndex>>)>,
timeline_id: ZTimelineId,
new_upload: NewCheckpoint,
) {
@@ -909,11 +914,14 @@ mod test_utils {
}
pub async fn expect_timeline(
index: &RwLock<RemoteTimelineIndex>,
index: &Arc<RwLock<RemoteTimelineIndex>>,
sync_id: ZTenantTimelineId,
) -> RemoteTimeline {
if let Some(TimelineIndexEntry::Full(remote_timeline)) =
index.read().await.timeline_entry(&sync_id)
if let Some(TimelineIndexEntryInner::Full(remote_timeline)) = index
.read()
.await
.timeline_entry(&sync_id)
.map(|e| e.inner())
{
remote_timeline.clone()
} else {
@@ -926,7 +934,7 @@ mod test_utils {
#[track_caller]
pub async fn assert_index_descriptions(
index: &RwLock<RemoteTimelineIndex>,
index: &Arc<RwLock<RemoteTimelineIndex>>,
expected_index_with_descriptions: RemoteTimelineIndex,
) {
let index_read = index.read().await;
@@ -965,26 +973,26 @@ mod test_utils {
sync_id
)
});
let expected_timeline_description = match expected_timeline_description {
TimelineIndexEntry::Description(description) => description,
TimelineIndexEntry::Full(_) => panic!("Expected index entry for sync id {} is a full entry, while a description was expected", sync_id),
let expected_timeline_description = match expected_timeline_description.inner() {
TimelineIndexEntryInner::Description(description) => description,
TimelineIndexEntryInner::Full(_) => panic!("Expected index entry for sync id {} is a full entry, while a description was expected", sync_id),
};
match actual_timeline_entry {
TimelineIndexEntry::Description(actual_descriptions) => {
match actual_timeline_entry.inner() {
TimelineIndexEntryInner::Description(description) => {
assert_eq!(
actual_descriptions, expected_timeline_description,
description, expected_timeline_description,
"Index contains unexpected descriptions entry for sync id {}",
sync_id
)
}
TimelineIndexEntry::Full(actual_full_entry) => {
TimelineIndexEntryInner::Full(remote_timeline) => {
let expected_lsns = expected_timeline_description
.values()
.map(|description| description.disk_consistent_lsn)
.collect::<BTreeSet<_>>();
assert_eq!(
actual_full_entry.checkpoints().collect::<BTreeSet<_>>(),
remote_timeline.checkpoints().collect::<BTreeSet<_>>(),
expected_lsns,
"Timeline {} should have the same checkpoints uploaded",
sync_id,

View File

@@ -5,14 +5,14 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
use anyhow::{ensure, Context};
use tokio::{fs, sync::RwLock};
use tracing::{debug, error, trace, warn};
use zenith_utils::{lsn::Lsn, zid::ZTenantId};
use zenith_utils::zid::ZTenantId;
use crate::{
config::PageServerConf,
layered_repository::metadata::{metadata_path, TimelineMetadata},
remote_storage::{
storage_sync::{
compression, index::TimelineIndexEntry, sync_queue, update_index_description, SyncKind,
compression, fetch_full_index, index::TimelineIndexEntryInner, sync_queue, SyncKind,
SyncTask,
},
RemoteStorage, ZTenantTimelineId,
@@ -30,10 +30,10 @@ pub(super) enum DownloadedTimeline {
Abort,
/// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known.
/// Initial download failed due to some error, the download task is rescheduled for another retry.
FailedAndRescheduled { disk_consistent_lsn: Lsn },
FailedAndRescheduled,
/// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known.
/// Initial download successful.
Successful { disk_consistent_lsn: Lsn },
Successful,
}
/// Attempts to download and uncompress files from all remote archives for the timeline given.
@@ -47,7 +47,7 @@ pub(super) async fn download_timeline<
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
conf: &'static PageServerConf,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
sync_id: ZTenantTimelineId,
mut download: TimelineDownload,
retries: u32,
@@ -58,19 +58,26 @@ pub(super) async fn download_timeline<
tenant_id,
timeline_id,
} = sync_id;
let index_read = remote_assets.1.read().await;
let index = &remote_assets.1;
let index_read = index.read().await;
let remote_timeline = match index_read.timeline_entry(&sync_id) {
None => {
error!("Cannot download: no timeline is present in the index for given ids");
error!("Cannot download: no timeline is present in the index for given id");
return DownloadedTimeline::Abort;
}
Some(index_entry) => match index_entry {
TimelineIndexEntry::Full(remote_timeline) => Cow::Borrowed(remote_timeline),
TimelineIndexEntry::Description(_) => {
Some(index_entry) => match index_entry.inner() {
TimelineIndexEntryInner::Full(remote_timeline) => Cow::Borrowed(remote_timeline),
TimelineIndexEntryInner::Description(_) => {
// We do not check awaits_download here because it is ok
// to call this function while the download is in progress:
// it is not a concurrent download, it is the same one.
let remote_disk_consistent_lsn = index_entry.disk_consistent_lsn();
drop(index_read);
debug!("Found timeline description for the given ids, downloading the full index");
match update_index_description(
match fetch_full_index(
remote_assets.as_ref(),
&conf.timeline_path(&timeline_id, &tenant_id),
sync_id,
@@ -80,16 +87,15 @@ pub(super) async fn download_timeline<
Ok(remote_timeline) => Cow::Owned(remote_timeline),
Err(e) => {
error!("Failed to download full timeline index: {:?}", e);
return match remote_disk_consistent_lsn {
Some(disk_consistent_lsn) => {
Some(_) => {
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Download(download),
));
DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
}
DownloadedTimeline::FailedAndRescheduled
}
None => {
error!("Cannot download: no disk consistent Lsn is present for the index entry");
@@ -101,12 +107,9 @@ pub(super) async fn download_timeline<
}
},
};
let disk_consistent_lsn = match remote_timeline.checkpoints().max() {
Some(lsn) => lsn,
None => {
debug!("Cannot download: no disk consistent Lsn is present for the remote timeline");
return DownloadedTimeline::Abort;
}
if remote_timeline.checkpoints().max().is_none() {
debug!("Cannot download: no disk consistent Lsn is present for the remote timeline");
return DownloadedTimeline::Abort;
};
debug!("Downloading timeline archives");
@@ -125,7 +128,7 @@ pub(super) async fn download_timeline<
conf,
sync_id,
Arc::clone(&remote_assets),
remote_timeline.as_ref(),
&remote_timeline,
archive_id,
Arc::clone(&download.files_to_skip),
)
@@ -142,9 +145,7 @@ pub(super) async fn download_timeline<
retries,
SyncKind::Download(download),
));
return DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
};
return DownloadedTimeline::FailedAndRescheduled;
}
Ok(()) => {
debug!("Successfully downloaded archive {:?}", archive_id);
@@ -154,9 +155,7 @@ pub(super) async fn download_timeline<
}
debug!("Finished downloading all timeline's archives");
DownloadedTimeline::Successful {
disk_consistent_lsn,
}
DownloadedTimeline::Successful
}
async fn try_download_archive<
@@ -168,7 +167,7 @@ async fn try_download_archive<
tenant_id,
timeline_id,
}: ZTenantTimelineId,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
remote_timeline: &RemoteTimeline,
archive_id: ArchiveId,
files_to_skip: Arc<BTreeSet<PathBuf>>,
@@ -256,13 +255,15 @@ mod tests {
let repo_harness = RepoHarness::create("test_download_timeline")?;
let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
let index = Arc::new(RwLock::new(
RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
),
));
let remote_assets = Arc::new((storage, index));
let storage = &remote_assets.0;

View File

@@ -11,7 +11,7 @@ use std::{
use anyhow::{bail, ensure, Context};
use serde::{Deserialize, Serialize};
use tracing::debug;
use tracing::*;
use zenith_utils::{
lsn::Lsn,
zid::{ZTenantId, ZTimelineId},
@@ -52,10 +52,16 @@ impl RelativePath {
/// Currently, timeline archive files are tracked only.
#[derive(Debug, Clone)]
pub struct RemoteTimelineIndex {
timeline_files: HashMap<ZTenantTimelineId, TimelineIndexEntry>,
timeline_entries: HashMap<ZTenantTimelineId, TimelineIndexEntry>,
}
impl RemoteTimelineIndex {
pub fn empty() -> Self {
Self {
timeline_entries: HashMap::new(),
}
}
/// Attempts to parse file paths (without checking the file contents) and find files
/// that can be tracked with the index.
/// On parse failures, logs the error and continues, so an empty index can be created from unsuitable paths.
@@ -63,9 +69,7 @@ impl RemoteTimelineIndex {
conf: &'static PageServerConf,
paths: impl Iterator<Item = P>,
) -> Self {
let mut index = Self {
timeline_files: HashMap::new(),
};
let mut index = Self::empty();
for path in paths {
if let Err(e) = try_parse_index_entry(&mut index, conf, path.as_ref()) {
debug!(
@@ -79,40 +83,100 @@ impl RemoteTimelineIndex {
}
pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&TimelineIndexEntry> {
self.timeline_files.get(id)
self.timeline_entries.get(id)
}
pub fn timeline_entry_mut(
&mut self,
id: &ZTenantTimelineId,
) -> Option<&mut TimelineIndexEntry> {
self.timeline_files.get_mut(id)
self.timeline_entries.get_mut(id)
}
pub fn add_timeline_entry(&mut self, id: ZTenantTimelineId, entry: TimelineIndexEntry) {
self.timeline_files.insert(id, entry);
self.timeline_entries.insert(id, entry);
}
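/// Replaces a `Description` entry with its fully parsed `Full` counterpart once all archive headers
/// have been downloaded; fails if the entry is missing or already full.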
pub fn upgrade_timeline_entry(
&mut self,
id: &ZTenantTimelineId,
remote_timeline: RemoteTimeline,
) -> anyhow::Result<()> {
let mut entry = self.timeline_entries.get_mut(id).ok_or(anyhow::anyhow!(
"timeline is unexpectedly missing from remote index"
))?;
if !matches!(entry.inner, TimelineIndexEntryInner::Description(_)) {
anyhow::bail!("timeline entry is not a description entry")
};
entry.inner = TimelineIndexEntryInner::Full(remote_timeline);
Ok(())
}
pub fn all_sync_ids(&self) -> impl Iterator<Item = ZTenantTimelineId> + '_ {
self.timeline_files.keys().copied()
self.timeline_entries.keys().copied()
}
pub fn set_awaits_download(
&mut self,
id: &ZTenantTimelineId,
awaits_download: bool,
) -> anyhow::Result<()> {
self.timeline_entry_mut(id)
.ok_or_else(|| anyhow::anyhow!("unknown timeline sync {}", id))?
.set_awaits_download(awaits_download);
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DescriptionTimelineIndexEntry {
pub description: BTreeMap<ArchiveId, ArchiveDescription>,
pub awaits_download: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimelineIndexEntry {
/// An archive found on the remote storage, but not yet downloaded: only metadata from its storage path is available, without archive contents.
pub struct FullTimelineIndexEntry {
pub remote_timeline: RemoteTimeline,
pub awaits_download: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimelineIndexEntryInner {
Description(BTreeMap<ArchiveId, ArchiveDescription>),
/// Full archive metadata, including the file list, parsed from the archive header.
Full(RemoteTimeline),
}
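/// Index entry for a single remote timeline: the archive data known so far (`inner`) plus a flag
/// marking whether a download has been scheduled for it.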
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineIndexEntry {
inner: TimelineIndexEntryInner,
awaits_download: bool,
}
impl TimelineIndexEntry {
pub fn new(inner: TimelineIndexEntryInner, awaits_download: bool) -> Self {
Self {
inner,
awaits_download,
}
}
pub fn inner(&self) -> &TimelineIndexEntryInner {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut TimelineIndexEntryInner {
&mut self.inner
}
pub fn uploaded_checkpoints(&self) -> BTreeSet<Lsn> {
match self {
Self::Description(description) => {
match &self.inner {
TimelineIndexEntryInner::Description(description) => {
description.keys().map(|archive_id| archive_id.0).collect()
}
Self::Full(remote_timeline) => remote_timeline
TimelineIndexEntryInner::Full(remote_timeline) => remote_timeline
.checkpoint_archives
.keys()
.map(|archive_id| archive_id.0)
@@ -122,17 +186,25 @@ impl TimelineIndexEntry {
/// Gets the latest uploaded checkpoint's disk consistent Lsn for the corresponding timeline.
pub fn disk_consistent_lsn(&self) -> Option<Lsn> {
match self {
Self::Description(description) => {
match &self.inner {
TimelineIndexEntryInner::Description(description) => {
description.keys().map(|archive_id| archive_id.0).max()
}
Self::Full(remote_timeline) => remote_timeline
TimelineIndexEntryInner::Full(remote_timeline) => remote_timeline
.checkpoint_archives
.keys()
.map(|archive_id| archive_id.0)
.max(),
}
}
pub fn get_awaits_download(&self) -> bool {
self.awaits_download
}
pub fn set_awaits_download(&mut self, awaits_download: bool) {
self.awaits_download = awaits_download;
}
}
/// Checkpoint archive's id, corresponding to the `disk_consistent_lsn` from the timeline's metadata file during checkpointing.
@@ -331,13 +403,15 @@ fn try_parse_index_entry(
tenant_id,
timeline_id,
};
let timeline_index_entry = index
.timeline_files
.entry(sync_id)
.or_insert_with(|| TimelineIndexEntry::Description(BTreeMap::new()));
match timeline_index_entry {
TimelineIndexEntry::Description(descriptions) => {
descriptions.insert(
let timeline_index_entry = index.timeline_entries.entry(sync_id).or_insert_with(|| {
TimelineIndexEntry::new(
TimelineIndexEntryInner::Description(BTreeMap::default()),
false,
)
});
match timeline_index_entry.inner_mut() {
TimelineIndexEntryInner::Description(description) => {
description.insert(
ArchiveId(disk_consistent_lsn),
ArchiveDescription {
header_size,
@@ -346,7 +420,7 @@ fn try_parse_index_entry(
},
);
}
TimelineIndexEntry::Full(_) => {
TimelineIndexEntryInner::Full(_) => {
bail!("Cannot add parsed archive description to its full context in index with sync id {}", sync_id)
}
}

View File

@@ -10,9 +10,9 @@ use crate::{
config::PageServerConf,
remote_storage::{
storage_sync::{
compression,
index::{RemoteTimeline, TimelineIndexEntry},
sync_queue, update_index_description, SyncKind, SyncTask,
compression, fetch_full_index,
index::{RemoteTimeline, TimelineIndexEntry, TimelineIndexEntryInner},
sync_queue, SyncKind, SyncTask,
},
RemoteStorage, ZTenantTimelineId,
},
@@ -30,7 +30,7 @@ pub(super) async fn upload_timeline_checkpoint<
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
config: &'static PageServerConf,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
sync_id: ZTenantTimelineId,
new_checkpoint: NewCheckpoint,
retries: u32,
@@ -49,22 +49,24 @@ pub(super) async fn upload_timeline_checkpoint<
let index_read = index.read().await;
let remote_timeline = match index_read.timeline_entry(&sync_id) {
None => None,
Some(TimelineIndexEntry::Full(remote_timeline)) => Some(Cow::Borrowed(remote_timeline)),
Some(TimelineIndexEntry::Description(_)) => {
debug!("Found timeline description for the given ids, downloading the full index");
match update_index_description(remote_assets.as_ref(), &timeline_dir, sync_id).await {
Ok(remote_timeline) => Some(Cow::Owned(remote_timeline)),
Err(e) => {
error!("Failed to download full timeline index: {:?}", e);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Upload(new_checkpoint),
));
return Some(false);
Some(entry) => match entry.inner() {
TimelineIndexEntryInner::Full(remote_timeline) => Some(Cow::Borrowed(remote_timeline)),
TimelineIndexEntryInner::Description(_) => {
debug!("Found timeline description for the given ids, downloading the full index");
match fetch_full_index(remote_assets.as_ref(), &timeline_dir, sync_id).await {
Ok(remote_timeline) => Some(Cow::Owned(remote_timeline)),
Err(e) => {
error!("Failed to download full timeline index: {:?}", e);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Upload(new_checkpoint),
));
return Some(false);
}
}
}
}
},
};
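// At this point `remote_timeline` is either the full (possibly freshly fetched) index entry,
// or `None` when nothing has been uploaded for this timeline yet.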
let already_contains_upload_lsn = remote_timeline
@@ -95,22 +97,40 @@ pub(super) async fn upload_timeline_checkpoint<
{
Ok((archive_header, header_size)) => {
let mut index_write = index.write().await;
match index_write.timeline_entry_mut(&sync_id) {
Some(TimelineIndexEntry::Full(remote_timeline)) => {
remote_timeline.update_archive_contents(
new_checkpoint.metadata.disk_consistent_lsn(),
archive_header,
header_size,
);
}
None | Some(TimelineIndexEntry::Description(_)) => {
match index_write
.timeline_entry_mut(&sync_id)
.map(|e| e.inner_mut())
{
None => {
let mut new_timeline = RemoteTimeline::empty();
new_timeline.update_archive_contents(
new_checkpoint.metadata.disk_consistent_lsn(),
archive_header,
header_size,
);
index_write.add_timeline_entry(sync_id, TimelineIndexEntry::Full(new_timeline));
index_write.add_timeline_entry(
sync_id,
TimelineIndexEntry::new(TimelineIndexEntryInner::Full(new_timeline), false),
)
}
Some(TimelineIndexEntryInner::Full(remote_timeline)) => {
remote_timeline.update_archive_contents(
new_checkpoint.metadata.disk_consistent_lsn(),
archive_header,
header_size,
);
}
Some(TimelineIndexEntryInner::Description(_)) => {
let mut new_timeline = RemoteTimeline::empty();
new_timeline.update_archive_contents(
new_checkpoint.metadata.disk_consistent_lsn(),
archive_header,
header_size,
);
index_write.add_timeline_entry(
sync_id,
TimelineIndexEntry::new(TimelineIndexEntryInner::Full(new_timeline), false),
)
}
}
debug!("Checkpoint uploaded successfully");
@@ -136,7 +156,7 @@ async fn try_upload_checkpoint<
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
config: &'static PageServerConf,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
remote_assets: Arc<(S, Arc<RwLock<RemoteTimelineIndex>>)>,
sync_id: ZTenantTimelineId,
new_checkpoint: &NewCheckpoint,
files_to_skip: BTreeSet<PathBuf>,
@@ -209,13 +229,15 @@ mod tests {
let repo_harness = RepoHarness::create("reupload_timeline")?;
let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
let index = Arc::new(RwLock::new(
RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
),
));
let remote_assets = Arc::new((storage, index));
let index = &remote_assets.1;
@@ -405,13 +427,15 @@ mod tests {
let repo_harness = RepoHarness::create("reupload_timeline_rejected")?;
let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
let index = Arc::new(RwLock::new(
RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
),
));
let remote_assets = Arc::new((storage, index));
let storage = &remote_assets.0;

View File

@@ -1,4 +1,6 @@
use crate::layered_repository::metadata::TimelineMetadata;
use crate::relish::*;
use crate::remote_storage::RemoteTimelineIndex;
use crate::walrecord::MultiXactMember;
use crate::CheckpointConfig;
use anyhow::Result;
@@ -6,6 +8,7 @@ use bytes::Bytes;
use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt::Display;
use std::ops::{AddAssign, Deref};
use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
@@ -15,30 +18,43 @@ use zenith_utils::zid::ZTimelineId;
/// Block number within a relish. This matches PostgreSQL's BlockNumber type.
pub type BlockNumber = u32;
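/// Kind of remote-storage sync operation that has completed for a timeline; repositories use it to
/// update their in-memory timeline state.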
#[derive(Clone, Copy, Debug)]
pub enum TimelineSyncStatusUpdate {
Uploaded,
Downloaded,
}
impl Display for TimelineSyncStatusUpdate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
TimelineSyncStatusUpdate::Uploaded => "Uploaded",
TimelineSyncStatusUpdate::Downloaded => "Downloaded",
};
f.write_str(s)
}
}
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Updates timeline based on the new sync state, received from the remote storage synchronization.
/// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn set_timeline_state(
fn apply_timeline_remote_sync_status_update(
&self,
timeline_id: ZTimelineId,
new_state: TimelineSyncState,
timeline_sync_status_update: TimelineSyncStatusUpdate,
) -> Result<()>;
/// Gets current synchronization state of the timeline.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option<TimelineSyncState>;
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline>;
/// This function is idempotent. It does not change internal state in any way.
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline>;
/// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded.
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
/// Lists timelines the repository contains.
/// It is up to the repository's implementation to omit certain timelines that are not considered ready for use.
fn list_timelines(&self) -> Result<Vec<RepositoryTimeline>>;
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)>;
/// Create a new, empty timeline. The caller is responsible for loading data into it
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
@@ -70,72 +86,44 @@ pub trait Repository: Send + Sync {
/// Perform one checkpoint iteration, flushing in-memory data to disk.
/// This function is periodically called by the checkpointer thread.
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()>;
/// Detaches a locally available timeline by stopping all its threads and removing all the data.
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
// Allows retrieving the remote timeline index from the repo. Used in walreceiver to grab the remote consistent lsn.
fn get_remote_index(&self) -> &tokio::sync::RwLock<RemoteTimelineIndex>;
}
/// A timeline, that belongs to the current repository.
pub enum RepositoryTimeline {
/// Timeline, with its files present locally in pageserver's working directory.
/// Loaded into pageserver's memory and ready to be used.
Local {
id: ZTimelineId,
timeline: Arc<dyn Timeline>,
},
/// Timeline, found on the pageserver's remote storage, but not yet downloaded locally.
Remote {
id: ZTimelineId,
/// metadata contents of the latest successfully uploaded checkpoint
disk_consistent_lsn: Lsn,
Loaded(Arc<dyn Timeline>),
/// All the data is available locally, but not loaded into memory, so loading has to be done before actually using the timeline
Unloaded {
// It is ok to keep the metadata here, because it does not change while the timeline is unloaded.
// FIXME: can s3 sync actually change it? It can change it while the timeline is awaiting download,
// but we currently do not download anything for a timeline once it is local (even if there are new checkpoints). Is that correct?
// Also, it is not great to keep TimelineMetadata here, because it is a layered repo implementation detail.
metadata: TimelineMetadata,
},
}
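// Illustrative sketch (not part of this change): callers inspect the variant via
// `Repository::get_timeline` and force a load with `get_timeline_load` when needed:
//
//     match repo.get_timeline(timeline_id) {
//         Some(RepositoryTimeline::Loaded(timeline)) => { /* ready to use */ }
//         Some(RepositoryTimeline::Unloaded { .. }) => {
//             let _timeline = repo.get_timeline_load(timeline_id)?;
//         }
//         None => { /* timeline unknown to this repository */ }
//     }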
impl RepositoryTimeline {
pub fn local_timeline(&self) -> Option<Arc<dyn Timeline>> {
if let Self::Local { timeline, .. } = self {
Some(Arc::clone(timeline))
} else {
None
}
}
pub fn id(&self) -> ZTimelineId {
match self {
Self::Local { id, .. } => *id,
Self::Remote { id, .. } => *id,
}
}
}
/// A state of the timeline synchronization with the remote storage.
/// Contains `disk_consistent_lsn` of the corresponding remote timeline (latest checkpoint's disk_consistent_lsn).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TimelineSyncState {
/// No further downloads from the remote storage are needed.
/// The timeline state is up-to-date or ahead of the remote storage one,
/// ready to be used in any pageserver operation.
Ready(Lsn),
/// Timeline is scheduled for downloading, but its current local state is not up to date with the remote storage.
/// The timeline is not ready to be used in any pageserver operations, otherwise it might diverge its local state from the remote version,
/// making it impossible to sync it further.
AwaitsDownload(Lsn),
/// Timeline was not in the pageserver's local working directory, but was found on the remote storage, ready to be downloaded.
/// Cannot be used in any pageserver operations due to complete absence locally.
CloudOnly(Lsn),
/// Timeline was evicted from the pageserver's local working directory due to conflicting remote and local states or too many errors during the synchronization.
/// Such timelines cannot have their state synchronized further and may not have the data about remote timeline's disk_consistent_lsn, since eviction may happen
/// due to errors before the remote timeline contents is known.
Evicted(Option<Lsn>),
pub enum LocalTimelineState {
// Timeline is loaded into memory (with the layer map and all the bits).
Loaded,
// Timeline is on disk locally and ready to be loaded into memory.
Unloaded,
}
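// A lightweight, serializable view of whether a timeline is loaded; exposed through
// `LocalTimelineInfo` in timeline listings.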
impl TimelineSyncState {
pub fn remote_disk_consistent_lsn(&self) -> Option<Lsn> {
Some(match self {
TimelineSyncState::Evicted(None) => return None,
TimelineSyncState::Ready(lsn) => lsn,
TimelineSyncState::AwaitsDownload(lsn) => lsn,
TimelineSyncState::CloudOnly(lsn) => lsn,
TimelineSyncState::Evicted(Some(lsn)) => lsn,
})
.copied()
impl<'a> From<&'a RepositoryTimeline> for LocalTimelineState {
fn from(local_timeline_entry: &'a RepositoryTimeline) -> Self {
match local_timeline_entry {
RepositoryTimeline::Loaded(_) => LocalTimelineState::Loaded,
RepositoryTimeline::Unloaded { .. } => LocalTimelineState::Unloaded,
}
}
}
@@ -362,7 +350,7 @@ pub mod repo_harness {
use crate::{
config::PageServerConf,
layered_repository::{LayeredRepository, TIMELINES_SEGMENT_NAME},
layered_repository::LayeredRepository,
walredo::{WalRedoError, WalRedoManager},
};
@@ -395,7 +383,6 @@ pub mod repo_harness {
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);
fs::create_dir_all(&repo_dir)?;
fs::create_dir_all(&repo_dir.join(TIMELINES_SEGMENT_NAME))?;
let conf = PageServerConf::dummy_conf(repo_dir);
// Make a static copy of the config. This can never be free'd, but that's
@@ -404,19 +391,45 @@ pub mod repo_harness {
let tenant_id = ZTenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenant_id))?;
fs::create_dir_all(conf.timelines_path(&tenant_id))?;
Ok(Self { conf, tenant_id })
}
pub fn load(&self) -> Box<dyn Repository> {
self.try_load().expect("failed to load test repo")
}
pub fn try_load(&self) -> Result<Box<dyn Repository>> {
let walredo_mgr = Arc::new(TestRedoManager);
Box::new(LayeredRepository::new(
let repo = Box::new(LayeredRepository::new(
self.conf,
walredo_mgr,
self.tenant_id,
Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty())),
false,
))
));
// populate repo with locally available timelines
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
.expect("should be able to read timelines dir")
{
let timeline_dir_entry = timeline_dir_entry.unwrap();
let timeline_id: ZTimelineId = timeline_dir_entry
.path()
.file_name()
.unwrap()
.to_string_lossy()
.parse()
.unwrap();
repo.apply_timeline_remote_sync_status_update(
timeline_id,
TimelineSyncStatusUpdate::Downloaded,
)?;
}
Ok(repo)
}
pub fn timeline_path(&self, timeline_id: &ZTimelineId) -> PathBuf {
@@ -835,10 +848,9 @@ mod tests {
// Create a branch, check that the relation is visible there
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
let newtline = repo
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
let new_writer = newtline.writer();
assert!(newtline
@@ -896,10 +908,9 @@ mod tests {
// Branch the history, modify relation differently on the new timeline
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
let newtline = repo
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
let new_writer = newtline.writer();
new_writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("bar blk 0 at 4"))?;
@@ -1046,11 +1057,9 @@ mod tests {
make_some_layers(&tline, Lsn(0x20))?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
let newtline = repo
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
assert!(newtline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x25)).is_ok());
@@ -1067,10 +1076,9 @@ mod tests {
make_some_layers(&tline, Lsn(0x20))?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
let newtline = repo
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
make_some_layers(&newtline, Lsn(0x60))?;
@@ -1143,4 +1151,81 @@ mod tests {
Ok(())
}
#[test]
fn timeline_load() -> Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = RepoHarness::create(TEST_NAME)?;
{
let repo = harness.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
make_some_layers(&tline, Lsn(0x8000))?;
tline.checkpoint(CheckpointConfig::Forced)?;
}
let repo = harness.load();
let tline = repo
.get_timeline(TIMELINE_ID)
.expect("cannot load timeline");
assert!(matches!(tline, RepositoryTimeline::Unloaded { .. }));
assert!(repo.get_timeline_load(TIMELINE_ID).is_ok());
let tline = repo
.get_timeline(TIMELINE_ID)
.expect("cannot load timeline");
assert!(matches!(tline, RepositoryTimeline::Loaded(_)));
Ok(())
}
#[test]
fn timeline_load_with_ancestor() -> Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = RepoHarness::create(TEST_NAME)?;
// create two timelines
{
let repo = harness.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(&tline, Lsn(0x20))?;
tline.checkpoint(CheckpointConfig::Forced)?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?;
let newtline = repo
.get_timeline_load(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
make_some_layers(&newtline, Lsn(0x60))?;
tline.checkpoint(CheckpointConfig::Forced)?;
}
// check that both of them are initially unloaded
let repo = harness.load();
{
let tline = repo.get_timeline(TIMELINE_ID).expect("cannot get timeline");
assert!(matches!(tline, RepositoryTimeline::Unloaded { .. }));
let tline = repo
.get_timeline(NEW_TIMELINE_ID)
.expect("cannot get timeline");
assert!(matches!(tline, RepositoryTimeline::Unloaded { .. }));
}
// load only child timeline
let _ = repo
.get_timeline_load(NEW_TIMELINE_ID)
.expect("cannot load timeline");
// check that both the child and the ancestor are loaded
let tline = repo
.get_timeline(NEW_TIMELINE_ID)
.expect("cannot get timeline");
assert!(matches!(tline, RepositoryTimeline::Loaded(_)));
let tline = repo.get_timeline(TIMELINE_ID).expect("cannot get timeline");
assert!(matches!(tline, RepositoryTimeline::Loaded(_)));
Ok(())
}
}

View File

@@ -3,16 +3,19 @@
use crate::config::PageServerConf;
use crate::layered_repository::LayeredRepository;
use crate::repository::{Repository, Timeline, TimelineSyncState};
use crate::remote_storage::RemoteTimelineIndex;
use crate::repository::{Repository, Timeline, TimelineSyncStatusUpdate};
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::CheckpointConfig;
use anyhow::{Context, Result};
use lazy_static::lazy_static;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex, MutexGuard};
@@ -57,79 +60,67 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
TENANTS.lock().unwrap()
}
/// Updates tenants' repositories, changing their timelines state in memory.
pub fn set_timeline_states(
// Sets up the WAL redo manager and repository for a tenant. Reduces code duplication.
// Used during pageserver startup, or when new tenant is attached to pageserver.
pub fn load_local_repo(
conf: &'static PageServerConf,
timeline_states: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>,
) {
if timeline_states.is_empty() {
debug!("no timeline state updates to perform");
return;
}
info!("Updating states for {} timelines", timeline_states.len());
trace!("States: {:?}", timeline_states);
tenant_id: ZTenantId,
remote_index: &Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
) -> Arc<dyn Repository> {
let mut m = access_tenants();
for (tenant_id, timeline_states) in timeline_states {
let tenant = m.entry(tenant_id).or_insert_with(|| {
// TODO (rodionov) reuse one of the initialisation routines
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
let tenant = m.entry(tenant_id).or_insert_with(|| {
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<dyn Repository> = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
conf.remote_storage_config.is_some(),
));
Tenant {
state: TenantState::Idle,
repo,
}
});
if let Err(e) = put_timelines_into_tenant(tenant, tenant_id, timeline_states) {
error!(
"Failed to update timeline states for tenant {}: {:?}",
tenant_id, e
);
// Set up an object repository, for actual data storage.
let repo: Arc<dyn Repository> = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
Arc::clone(remote_index),
conf.remote_storage_config.is_some(),
));
Tenant {
state: TenantState::Idle,
repo,
}
}
});
Arc::clone(&tenant.repo)
}
fn put_timelines_into_tenant(
tenant: &mut Tenant,
tenant_id: ZTenantId,
timeline_states: HashMap<ZTimelineId, TimelineSyncState>,
) -> anyhow::Result<()> {
for (timeline_id, timeline_state) in timeline_states {
// If the timeline is being put into any other state than Ready,
// stop any threads operating on it.
//
// FIXME: This is racy. A page service thread could just get
// handle on the Timeline, before we call set_timeline_state()
if !matches!(timeline_state, TimelineSyncState::Ready(_)) {
thread_mgr::shutdown_threads(None, Some(tenant_id), Some(timeline_id));
// Should we run a final checkpoint to flush all the data to
// disk? Doesn't seem necessary; all of the states other than
// Ready imply that the data on local disk is corrupt or incomplete,
// and we don't want to flush that to disk.
}
tenant
.repo
.set_timeline_state(timeline_id, timeline_state)
.with_context(|| {
format!(
"Failed to update timeline {} state to {:?}",
timeline_id, timeline_state
)
})?;
/// Updates tenants' repositories, changing their timelines' state in memory.
pub fn apply_timeline_sync_status_updates(
conf: &'static PageServerConf,
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
sync_status_updates: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>,
) {
if sync_status_updates.is_empty() {
debug!("no sync status updates to apply");
return;
}
info!(
"Applying sync status updates for {} timelines",
sync_status_updates.len()
);
trace!("Sync status updates: {:?}", sync_status_updates);
Ok(())
for (tenant_id, tenant_timelines_sync_status_updates) in sync_status_updates {
let repo = load_local_repo(conf, tenant_id, &remote_index);
for (timeline_id, timeline_sync_status_update) in tenant_timelines_sync_status_updates {
match repo.apply_timeline_remote_sync_status_update(timeline_id, timeline_sync_status_update)
{
Ok(_) => debug!(
"successfully applied timeline sync status update: {} -> {}",
timeline_id, timeline_sync_status_update
),
Err(e) => error!(
"Failed to apply timeline sync status update for tenant {}. timeline {} update {} Error: {:#}",
tenant_id, timeline_id, timeline_sync_status_update, e
),
}
}
}
}
///
@@ -179,24 +170,30 @@ pub fn shutdown_all_tenants() {
pub fn create_tenant_repository(
conf: &'static PageServerConf,
new_tenant_id: Option<ZTenantId>,
tenantid: ZTenantId,
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
) -> Result<Option<ZTenantId>> {
let new_tenant_id = new_tenant_id.unwrap_or_else(ZTenantId::generate);
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, new_tenant_id));
match timelines::create_repo(conf, new_tenant_id, wal_redo_manager)? {
Some(repo) => {
access_tenants()
.entry(new_tenant_id)
.or_insert_with(|| Tenant {
state: TenantState::Idle,
repo,
});
Ok(Some(new_tenant_id))
}
None => {
debug!("repository already exists for tenant {}", new_tenant_id);
match access_tenants().entry(tenantid) {
Entry::Occupied(_) => {
debug!("tenant {} already exists", tenantid);
Ok(None)
}
Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = timelines::create_repo(
conf,
tenantid,
CreateRepo::Real {
wal_redo_manager,
remote_index,
},
)?;
v.insert(Tenant {
state: TenantState::Idle,
repo,
});
Ok(Some(tenantid))
}
}
}
@@ -255,19 +252,19 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Reposito
let m = access_tenants();
let tenant = m
.get(&tenantid)
.with_context(|| format!("Tenant not found for tenant {}", tenantid))?;
.with_context(|| format!("Tenant {} not found", tenantid))?;
Ok(Arc::clone(&tenant.repo))
}
pub fn get_timeline_for_tenant(
// Retrieves the timeline for a tenant, loading it into memory if it is not already loaded.
pub fn get_timeline_for_tenant_load(
tenantid: ZTenantId,
timelineid: ZTimelineId,
) -> Result<Arc<dyn Timeline>> {
get_repository_for_tenant(tenantid)?
.get_timeline(timelineid)?
.local_timeline()
.with_context(|| format!("cannot fetch timeline {}", timelineid))
.get_timeline_load(timelineid)
.with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))
}
#[derive(Serialize, Deserialize, Clone)]

View File

@@ -2,8 +2,9 @@
//! Timeline management code
//
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{bail, Context, Result};
use postgres_ffi::ControlFileData;
use serde::{Deserialize, Serialize};
use std::{
fs,
path::Path,
@@ -12,135 +13,126 @@ use std::{
};
use tracing::*;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
use zenith_utils::{crashsafe_dir, logging};
use zenith_utils::{lsn::Lsn, zid::HexZTimelineId};
use crate::{config::PageServerConf, repository::Repository};
use crate::{
config::PageServerConf,
layered_repository::metadata::TimelineMetadata,
remote_storage::RemoteTimelineIndex,
repository::{LocalTimelineState, Repository},
};
use crate::{import_datadir, LOG_FILE_NAME};
use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
use crate::{repository::RepositoryTimeline, tenant_mgr};
use crate::{repository::Timeline, CheckpointConfig};
#[derive(Clone)]
pub enum TimelineInfo {
Local {
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_lsn: Option<Lsn>,
disk_consistent_lsn: Lsn,
current_logical_size: usize,
current_logical_size_non_incremental: Option<usize>,
},
Remote {
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
disk_consistent_lsn: Lsn,
},
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
pub ancestor_timeline_id: Option<HexZTimelineId>,
pub ancestor_lsn: Option<Lsn>,
pub last_record_lsn: Lsn,
pub prev_record_lsn: Option<Lsn>,
pub disk_consistent_lsn: Lsn,
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<usize>,
pub timeline_state: LocalTimelineState,
}
impl TimelineInfo {
pub fn from_repo_timeline(
tenant_id: ZTenantId,
repo_timeline: RepositoryTimeline,
include_non_incremental_logical_size: bool,
) -> Self {
match repo_timeline {
RepositoryTimeline::Local { id, timeline } => {
let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
let ancestor_lsn = if ancestor_timeline_id.is_some() {
Some(timeline.get_ancestor_lsn())
} else {
None
};
Self::Local {
timeline_id: id,
tenant_id,
last_record_lsn: timeline.get_last_record_lsn(),
prev_record_lsn: timeline.get_prev_record_lsn(),
ancestor_timeline_id,
ancestor_lsn,
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
current_logical_size: timeline.get_current_logical_size(),
current_logical_size_non_incremental: get_current_logical_size_non_incremental(
include_non_incremental_logical_size,
timeline.as_ref(),
),
}
}
RepositoryTimeline::Remote {
id,
disk_consistent_lsn,
} => Self::Remote {
timeline_id: id,
tenant_id,
disk_consistent_lsn,
},
}
}
pub fn from_dyn_timeline(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
impl LocalTimelineInfo {
pub fn from_loaded_timeline(
timeline: &dyn Timeline,
include_non_incremental_logical_size: bool,
) -> Self {
let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
let ancestor_lsn = if ancestor_timeline_id.is_some() {
Some(timeline.get_ancestor_lsn())
} else {
None
};
Self::Local {
timeline_id,
tenant_id,
last_record_lsn: timeline.get_last_record_lsn(),
prev_record_lsn: timeline.get_prev_record_lsn(),
ancestor_timeline_id,
ancestor_lsn,
) -> anyhow::Result<Self> {
let last_record_lsn = timeline.get_last_record_lsn();
let info = LocalTimelineInfo {
ancestor_timeline_id: timeline
.get_ancestor_timeline_id()
.map(HexZTimelineId::from),
ancestor_lsn: {
match timeline.get_ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
current_logical_size: timeline.get_current_logical_size(),
current_logical_size_non_incremental: get_current_logical_size_non_incremental(
include_non_incremental_logical_size,
timeline,
),
last_record_lsn,
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
timeline_state: LocalTimelineState::Loaded,
current_logical_size: Some(timeline.get_current_logical_size()),
current_logical_size_non_incremental: if include_non_incremental_logical_size {
Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
} else {
None
},
};
Ok(info)
}
pub fn from_unloaded_timeline(metadata: &TimelineMetadata) -> Self {
LocalTimelineInfo {
ancestor_timeline_id: metadata.ancestor_timeline().map(HexZTimelineId::from),
ancestor_lsn: {
match metadata.ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
disk_consistent_lsn: metadata.disk_consistent_lsn(),
last_record_lsn: metadata.disk_consistent_lsn(),
prev_record_lsn: metadata.prev_record_lsn(),
timeline_state: LocalTimelineState::Unloaded,
current_logical_size: None,
current_logical_size_non_incremental: None,
}
}
pub fn timeline_id(&self) -> ZTimelineId {
match *self {
TimelineInfo::Local { timeline_id, .. } => timeline_id,
TimelineInfo::Remote { timeline_id, .. } => timeline_id,
}
}
pub fn tenant_id(&self) -> ZTenantId {
match *self {
TimelineInfo::Local { tenant_id, .. } => tenant_id,
TimelineInfo::Remote { tenant_id, .. } => tenant_id,
pub fn from_repo_timeline(
repo_timeline: RepositoryTimeline,
include_non_incremental_logical_size: bool,
) -> anyhow::Result<Self> {
match repo_timeline {
RepositoryTimeline::Loaded(timeline) => {
Self::from_loaded_timeline(timeline.as_ref(), include_non_incremental_logical_size)
}
RepositoryTimeline::Unloaded { metadata } => {
Ok(Self::from_unloaded_timeline(&metadata))
}
}
}
}
fn get_current_logical_size_non_incremental(
include_non_incremental_logical_size: bool,
timeline: &dyn Timeline,
) -> Option<usize> {
if !include_non_incremental_logical_size {
return None;
}
match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) {
Ok(size) => Some(size),
Err(e) => {
error!("Failed to get non-incremental logical size: {:?}", e);
None
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
pub remote_consistent_lsn: Option<Lsn>,
pub awaits_download: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub local: Option<LocalTimelineInfo>,
pub remote: Option<RemoteTimelineInfo>,
}
pub fn extract_remote_timeline_info(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
remote_index: &RemoteTimelineIndex,
) -> Option<RemoteTimelineInfo> {
remote_index
.timeline_entry(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_entry| RemoteTimelineInfo {
remote_consistent_lsn: remote_entry.disk_consistent_lsn(),
awaits_download: remote_entry.get_awaits_download(),
})
}
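// Illustrative sketch (not part of this change, assumes a caller in a function returning
// anyhow::Result that holds a read guard on the remote index): the two halves can be combined
// into a full `TimelineInfo` like this:
//
//     let info = TimelineInfo {
//         tenant_id,
//         timeline_id,
//         local: Some(LocalTimelineInfo::from_repo_timeline(repo_timeline, false)?),
//         remote: extract_remote_timeline_info(tenant_id, timeline_id, &remote_index),
//     };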
#[derive(Debug, Clone, Copy)]
@@ -158,25 +150,12 @@ pub fn init_pageserver(
// Use `true` as the daemonize parameter, because otherwise we pollute the zenith CLI output with several pages of info messages.
let _log_file = logging::init(LOG_FILE_NAME, true)?;
// We don't use the real WAL redo manager, because we don't want to spawn the WAL redo
// process during repository initialization.
//
// FIXME: That caused trouble, because the WAL redo manager spawned a thread that launched
// initdb in the background, and it kept running even after the "zenith init" had exited.
// In tests, we started the page server immediately after that, so that initdb was still
// running in the background, and we failed to run initdb again in the same directory. This
// has been solved for the rapid init+start case now, but the general race condition remains
// if you restart the server quickly. The WAL redo manager doesn't use a separate thread
// anymore, but I think that could still happen.
let dummy_redo_mgr = Arc::new(crate::walredo::DummyRedoManager {});
crashsafe_dir::create_dir_all(conf.tenants_path())?;
if let Some(tenant_id) = create_tenant {
println!("initializing tenantid {}", tenant_id);
let repo = create_repo(conf, tenant_id, dummy_redo_mgr)
.context("failed to create repo")?
.ok_or_else(|| anyhow!("For newely created pageserver, found already existing repository for tenant {}", tenant_id))?;
let repo =
create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?;
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
.context("failed to create initial timeline")?;
@@ -189,15 +168,45 @@ pub fn init_pageserver(
Ok(())
}
pub enum CreateRepo {
Real {
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
},
Dummy,
}
pub fn create_repo(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
) -> Result<Option<Arc<dyn Repository>>> {
create_repo: CreateRepo,
) -> Result<Arc<dyn Repository>> {
let (wal_redo_manager, remote_index) = match create_repo {
CreateRepo::Real {
wal_redo_manager,
remote_index,
} => (wal_redo_manager, remote_index),
CreateRepo::Dummy => {
// We don't use the real WAL redo manager, because we don't want to spawn the WAL redo
// process during repository initialization.
//
// FIXME: That caused trouble, because the WAL redo manager spawned a thread that launched
// initdb in the background, and it kept running even after the "zenith init" had exited.
// In tests, we started the page server immediately after that, so that initdb was still
// running in the background, and we failed to run initdb again in the same directory. This
// has been solved for the rapid init+start case now, but the general race condition remains
// if you restart the server quickly. The WAL redo manager doesn't use a separate thread
// anymore, but I think that could still happen.
let wal_redo_manager = Arc::new(crate::walredo::DummyRedoManager {});
let remote_index = Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty()));
(wal_redo_manager as _, remote_index)
}
};
let repo_dir = conf.tenant_path(&tenant_id);
if repo_dir.exists() {
debug!("repo for {} already exists", tenant_id);
return Ok(None);
bail!("tenant {} directory already exists", tenant_id);
}
// top-level dir may exist if we are creating it through CLI
@@ -206,12 +215,13 @@ pub fn create_repo(
crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?;
info!("created directory structure in {}", repo_dir.display());
Ok(Some(Arc::new(LayeredRepository::new(
Ok(Arc::new(LayeredRepository::new(
conf,
wal_redo_manager,
tenant_id,
remote_index,
conf.remote_storage_config.is_some(),
))))
)))
}
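To make the new API shape concrete, here is a minimal sketch of the two call paths the `CreateRepo` argument distinguishes, using only the signatures visible in this hunk. Crate-internal imports (PageServerConf, Repository, WalRedoManager, RemoteTimelineIndex) are omitted, and the `runtime_repo` call site is illustrative, not the actual tenant_mgr code.

use std::sync::Arc;
use anyhow::Context;

// `zenith init` path: no WAL redo process and no remote index exist yet.
fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> anyhow::Result<Arc<dyn Repository>> {
    create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")
}

// Runtime path: the caller already owns the real redo manager and the shared remote index.
fn runtime_repo(
    conf: &'static PageServerConf,
    tenant_id: ZTenantId,
    wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
    remote_index: Arc<tokio::sync::RwLock<RemoteTimelineIndex>>,
) -> anyhow::Result<Arc<dyn Repository>> {
    create_repo(conf, tenant_id, CreateRepo::Real { wal_redo_manager, remote_index })
}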
// Returns checkpoint LSN from controlfile
@@ -299,30 +309,25 @@ fn bootstrap_timeline(
Ok(timeline)
}
pub(crate) fn get_timelines(
pub(crate) fn get_local_timelines(
tenant_id: ZTenantId,
include_non_incremental_logical_size: bool,
) -> Result<Vec<TimelineInfo>> {
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
let repo_timelines = repo.list_timelines();
Ok(repo
.list_timelines()
.with_context(|| format!("Failed to list timelines for tenant {}", tenant_id))?
.into_iter()
.filter_map(|timeline| match timeline {
RepositoryTimeline::Local { timeline, id } => Some((id, timeline)),
RepositoryTimeline::Remote { .. } => None,
})
.map(|(timeline_id, timeline)| {
TimelineInfo::from_dyn_timeline(
tenant_id,
timeline_id,
timeline.as_ref(),
let mut local_timeline_info = Vec::with_capacity(repo_timelines.len());
for (timeline_id, repository_timeline) in repo_timelines {
local_timeline_info.push((
timeline_id,
LocalTimelineInfo::from_repo_timeline(
repository_timeline,
include_non_incremental_logical_size,
)
})
.collect())
)?,
))
}
Ok(local_timeline_info)
}
pub(crate) fn create_timeline(
@@ -336,16 +341,8 @@ pub(crate) fn create_timeline(
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
if conf.timeline_path(&new_timeline_id, &tenant_id).exists() {
match repo.get_timeline(new_timeline_id)? {
RepositoryTimeline::Local { id, .. } => {
debug!("timeline {} already exists", id);
return Ok(None);
}
RepositoryTimeline::Remote { id, .. } => bail!(
"timeline {} already exists in pageserver's remote storage",
id
),
}
debug!("timeline {} already exists", new_timeline_id);
return Ok(None);
}
let mut start_lsn = ancestor_start_lsn.unwrap_or(Lsn(0));
@@ -353,15 +350,8 @@ pub(crate) fn create_timeline(
let new_timeline_info = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = repo
.get_timeline(ancestor_timeline_id)
.with_context(|| format!("Cannot get ancestor timeline {}", ancestor_timeline_id))?
.local_timeline()
.with_context(|| {
format!(
"Cannot branch off the timeline {} that's not present locally",
ancestor_timeline_id
)
})?;
.get_timeline_load(ancestor_timeline_id)
.context("Cannot branch off the timeline that's not present locally")?;
if start_lsn == Lsn(0) {
// Find end of WAL on the old timeline
@@ -391,18 +381,20 @@ pub(crate) fn create_timeline(
}
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, start_lsn)?;
// load the timeline into memory
let loaded_timeline = repo.get_timeline(new_timeline_id)?;
TimelineInfo::from_repo_timeline(tenant_id, loaded_timeline, false)
let loaded_timeline = repo.get_timeline_load(new_timeline_id)?;
LocalTimelineInfo::from_loaded_timeline(loaded_timeline.as_ref(), false)
.context("cannot fill timeline info")?
}
None => {
let new_timeline = bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?;
TimelineInfo::from_dyn_timeline(
tenant_id,
new_timeline_id,
new_timeline.as_ref(),
false,
)
LocalTimelineInfo::from_loaded_timeline(new_timeline.as_ref(), false)
.context("cannot fill timeline info")?
}
};
Ok(Some(new_timeline_info))
Ok(Some(TimelineInfo {
tenant_id,
timeline_id: new_timeline_id,
local: Some(new_timeline_info),
remote: None,
}))
}
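Since `TimelineInfo` is now a plain struct with optional `local` and `remote` parts (as assembled above), downstream code matches on those options instead of on enum variants. A small consumer-side sketch, with field names taken from the struct literal above and from the CLI code later in this commit; everything else is illustrative.

// Sketch: reading the split local/remote timeline info.
fn describe_timeline(info: &TimelineInfo) -> String {
    match (info.local.as_ref(), info.remote.as_ref()) {
        (Some(local), _) => format!(
            "timeline {} is available locally, last_record_lsn={}",
            info.timeline_id, local.last_record_lsn
        ),
        (None, Some(_)) => format!("timeline {} exists only in remote storage", info.timeline_id),
        // The CLI tree printer treats this combination as impossible.
        (None, None) => unreachable!("no info for timeline {}", info.timeline_id),
    }
}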

View File

@@ -31,6 +31,7 @@ use tracing::*;
use zenith_utils::lsn::Lsn;
use zenith_utils::pq_proto::ZenithFeedback;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::ZTenantTimelineId;
use zenith_utils::zid::ZTimelineId;
//
@@ -111,18 +112,18 @@ fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> Str
//
fn thread_main(
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> Result<()> {
let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
let _enter = info_span!("WAL receiver", timeline = %timeline_id, tenant = %tenant_id).entered();
info!("WAL receiver thread started");
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(tenantid, timelineid);
let wal_producer_connstr = get_wal_producer_connstr(tenant_id, timeline_id);
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it.
let res = walreceiver_main(conf, tenantid, timelineid, &wal_producer_connstr);
let res = walreceiver_main(conf, tenant_id, timeline_id, &wal_producer_connstr);
// TODO cleanup info messages
if let Err(e) = res {
@@ -130,20 +131,20 @@ fn thread_main(
} else {
info!(
"walreceiver disconnected tenant {}, timelineid {}",
tenantid, timelineid
tenant_id, timeline_id
);
}
// Drop it from the list of active WAL_RECEIVERS
// so that the next callmemaybe request launches a new thread
drop_wal_receiver(tenantid, timelineid);
drop_wal_receiver(tenant_id, timeline_id);
Ok(())
}
fn walreceiver_main(
_conf: &PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
wal_producer_connstr: &str,
) -> Result<(), Error> {
// Connect to the database in replication mode.
@@ -182,13 +183,16 @@ fn walreceiver_main(
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let timeline =
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid).with_context(|| {
format!(
"Can not start the walrecever for a remote tenant {}, timeline {}",
tenantid, timelineid,
)
})?;
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("no repository found for tenant {}", tenant_id))?;
let timeline = repo.get_timeline_load(timeline_id).with_context(|| {
format!(
"local timeline {} not found for tenant {}",
timeline_id, tenant_id
)
})?;
let remote_index = repo.get_remote_index();
//
// Start streaming the WAL, from where we left off previously.
@@ -292,11 +296,19 @@ fn walreceiver_main(
};
if let Some(last_lsn) = status_update {
let timeline_synced_disk_consistent_lsn =
tenant_mgr::get_repository_for_tenant(tenantid)?
.get_timeline_state(timelineid)
.and_then(|state| state.remote_disk_consistent_lsn())
.unwrap_or(Lsn(0));
let timeline_remote_consistent_lsn = runtime.block_on(async {
remote_index
.read()
.await
// here we either do not have this timeline in the remote index
// or no checkpoints have been uploaded for it yet
.timeline_entry(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.and_then(|e| e.disk_consistent_lsn())
.unwrap_or(Lsn(0)) // no checkpoint was uploaded
});
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = u64::from(last_lsn);
@@ -304,7 +316,7 @@ fn walreceiver_main(
let flush_lsn = u64::from(timeline.get_disk_consistent_lsn());
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let apply_lsn = u64::from(timeline_synced_disk_consistent_lsn);
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
let ts = SystemTime::now();
// Send zenith feedback message.
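For readers following the LSN plumbing: the hunk above derives the safekeeper feedback's `apply_lsn` from the remote index rather than from per-timeline repository state. A compact restatement of that lookup as a standalone helper, using only the method names shown in this diff; everything else is illustrative.

// Sketch: resolve the remote_consistent_lsn for one timeline. Lsn(0) means the timeline has no
// remote entry or no uploaded checkpoint yet, so safekeepers must keep all of its WAL.
async fn remote_consistent_lsn(
    remote_index: &tokio::sync::RwLock<RemoteTimelineIndex>,
    tenant_id: ZTenantId,
    timeline_id: ZTimelineId,
) -> Lsn {
    remote_index
        .read()
        .await
        .timeline_entry(&ZTenantTimelineId { tenant_id, timeline_id })
        .and_then(|entry| entry.disk_consistent_lsn())
        .unwrap_or(Lsn(0))
}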

View File

@@ -5,7 +5,7 @@ import time, shutil, os
from contextlib import closing
from pathlib import Path
from uuid import UUID
from fixtures.zenith_fixtures import ZenithEnvBuilder
from fixtures.zenith_fixtures import ZenithEnvBuilder, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
import pytest
@@ -26,7 +26,6 @@ import pytest
# * queries the specific data, ensuring that it matches the one stored before
#
# The tests are done for all types of remote storage pageserver supports.
@pytest.mark.skip(reason="will be fixed with https://github.com/zenithdb/zenith/issues/1193")
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
zenith_env_builder.rust_log_override = 'debug'
@@ -45,6 +44,8 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
env = zenith_env_builder.init_start()
pg = env.postgres.create_start('main')
client = env.pageserver.http_client()
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
@@ -54,13 +55,21 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
CREATE TABLE t1(id int primary key, secret text);
INSERT INTO t1 VALUES ({data_id}, '{data_secret}');
''')
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = int(cur.fetchone()[0].split('/')[1], base=16)
# wait until pageserver receives that data
wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"do_gc {tenant_id} {timeline_id}")
log.info("waiting for upload") # TODO api to check if upload is done
time.sleep(2)
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
log.info("waiting for upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
log.info("upload is done")
##### Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
@@ -73,26 +82,12 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
client = env.pageserver.http_client()
client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
# FIXME cannot handle duplicate download requests (which might be caused by repeated timeline detail calls)
# subject to fix in https://github.com/zenithdb/zenith/issues/997
time.sleep(5)
log.info("waiting for timeline redownload")
attempts = 0
while True:
timeline_details = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
assert timeline_details['timeline_id'] == timeline_id
assert timeline_details['tenant_id'] == tenant_id
if timeline_details['kind'] == 'Local':
log.info("timeline downloaded, checking its data")
break
attempts += 1
if attempts > 10:
raise Exception("timeline redownload failed")
log.debug("still waiting")
time.sleep(1)
wait_for(number_of_iterations=10,
interval=1,
func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id)))
pg = env.postgres.create_start('main')
with closing(pg.connect()) as conn:

View File

@@ -3,17 +3,19 @@ import os
import pathlib
import subprocess
import threading
from typing import Dict
from uuid import UUID
from fixtures.log_helper import log
import time
import signal
import pytest
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath, pg_distrib_dir
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
assert abs(a - b) / a < margin_ratio, (a, b, margin_ratio)
print("!" * 100, abs(a - b) / a)
assert abs(a - b) / a < margin_ratio, abs(a - b) / a
@contextmanager
@@ -34,6 +36,7 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
f"-c listen_pg_addr='localhost:{pg_port}'",
f"-c listen_http_addr='localhost:{http_port}'",
f"-c pg_distrib_dir='{pg_distrib_dir}'",
f"-c id=2",
f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
]
@@ -57,20 +60,6 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
os.kill(pid, signal.SIGQUIT)
def wait_for(number_of_iterations: int, interval: int, func):
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
@contextmanager
def pg_cur(pg):
with closing(pg.connect()) as conn:
@@ -108,13 +97,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')
def assert_local(pageserver_http_client: ZenithPageserverHttpClient, tenant: UUID, timeline: str):
timeline_detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline))
assert timeline_detail.get('type') == "Local", timeline_detail
return timeline_detail
@pytest.mark.skip(reason="will be fixed with https://github.com/zenithdb/zenith/issues/1193")
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
port_distributor: PortDistributor,
@@ -129,7 +111,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
tenant = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209"))
log.info("tenant to relocate %s", tenant)
env.zenith_cli.create_root_branch('main', tenant_id=tenant)
env.zenith_cli.create_branch('test_tenant_relocation', tenant_id=tenant)
tenant_pg = env.postgres.create_start(branch_name='main',
@@ -141,8 +123,8 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
with conn.cursor() as cur:
# save timeline for later gc call
cur.execute("SHOW zenith.zenith_timeline")
timeline = cur.fetchone()[0]
log.info("timeline to relocate %s", timeline)
timeline = UUID(cur.fetchone()[0])
log.info("timeline to relocate %s", timeline.hex)
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -150,6 +132,15 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (500500, )
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = int(cur.fetchone()[0].split('/')[1], base=16)
pageserver_http = env.pageserver.http_client()
# wait until pageserver receives that data
wait_for_last_record_lsn(pageserver_http, tenant, timeline, current_lsn)
timeline_detail = pageserver_http.timeline_detail_v2(tenant, timeline)
if with_load == 'with_load':
# create load table
@@ -165,12 +156,10 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"do_gc {tenant.hex} {timeline}")
pscur.execute(f"checkpoint {tenant.hex} {timeline.hex}")
# ensure upload is completed
pageserver_http_client = env.pageserver.http_client()
timeline_detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline))
assert timeline_detail['disk_consistent_lsn'] == timeline_detail['timeline_state']['Ready']
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(pageserver_http, tenant, timeline, current_lsn)
log.info("inititalizing new pageserver")
# bootstrap second pageserver
@@ -182,8 +171,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
pageserver_bin = pathlib.Path(zenith_binpath) / 'pageserver'
new_pageserver_http_client = ZenithPageserverHttpClient(port=new_pageserver_http_port,
auth_token=None)
new_pageserver_http = ZenithPageserverHttpClient(port=new_pageserver_http_port, auth_token=None)
with new_pageserver_helper(new_pageserver_dir,
pageserver_bin,
@@ -192,25 +180,18 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
new_pageserver_http_port):
# call to attach timeline to new pageserver
new_pageserver_http_client.timeline_attach(tenant, UUID(timeline))
# FIXME cannot handle duplicate download requests, subject to fix in https://github.com/zenithdb/zenith/issues/997
time.sleep(5)
# new pageserver should in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_pageserver_http.timeline_attach(tenant, timeline)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there were no new writes since the checkpoint
new_timeline_detail = wait_for(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http_client, tenant, timeline))
assert new_timeline_detail['timeline_state'].get('Ready'), new_timeline_detail
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# when load is active these checks can break because lsns are not static
# so let's check with some margin
if with_load == 'without_load':
# TODO revisit this once https://github.com/zenithdb/zenith/issues/1049 is fixed
assert_abs_margin_ratio(new_timeline_detail['disk_consistent_lsn'],
timeline_detail['disk_consistent_lsn'],
0.01)
assert_abs_margin_ratio(new_timeline_detail['timeline_state']['Ready'],
timeline_detail['timeline_state']['Ready'],
0.01)
assert_abs_margin_ratio(new_timeline_detail['local']['disk_consistent_lsn'],
timeline_detail['local']['disk_consistent_lsn'],
0.03)
# callmemaybe to start replication from safekeeper to the new pageserver
# when there is no load there is a clean checkpoint and no wal delta
@@ -219,7 +200,9 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
cur.execute("callmemaybe {} {} {}".format(tenant, timeline, safekeeper_connstring))
cur.execute("callmemaybe {} {} {}".format(tenant.hex,
timeline.hex,
safekeeper_connstring))
tenant_pg.stop()
@@ -239,7 +222,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# detach tenant from old pageserver before we check
# that all the data is there to be sure that old pageserver
# is no longer involved, and if it is, we will see the errors
pageserver_http_client.timeline_detach(tenant, UUID(timeline))
pageserver_http.timeline_detach(tenant, timeline)
with pg_cur(tenant_pg) as cur:
# check that data is still there

View File

@@ -783,6 +783,15 @@ class ZenithPageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_detail_v2(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v2/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1"
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
@@ -866,6 +875,30 @@ class ZenithCli:
return uuid.UUID(created_timeline_id)
def create_root_branch(self, branch_name: str, tenant_id: Optional[uuid.UUID] = None):
cmd = [
'timeline',
'create',
'--branch-name',
branch_name,
'--tenant-id',
(tenant_id or self.env.initial_tenant).hex,
]
res = self.raw_cli(cmd)
res.check_returncode()
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group('timeline_id')
if created_timeline_id is None:
raise Exception('could not find timeline id after `zenith timeline create` invocation')
else:
return uuid.UUID(created_timeline_id)
def create_branch(self,
new_branch_name: str = DEFAULT_BRANCH_NAME,
ancestor_branch_name: Optional[str] = None,
@@ -1839,3 +1872,59 @@ def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Pos
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, error) == ([], [])
def wait_for(number_of_iterations: int, interval: int, func):
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
def assert_local(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID):
timeline_detail = pageserver_http_client.timeline_detail_v2(tenant, timeline)
assert timeline_detail.get('local', {}).get("disk_consistent_lsn"), timeline_detail
return timeline_detail
def remote_consistent_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID) -> int:
detail = pageserver_http_client.timeline_detail_v2(tenant, timeline)
assert isinstance(detail['remote']['remote_consistent_lsn'], int)
return detail['remote']['remote_consistent_lsn']
def wait_for_upload(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID,
lsn: int):
"""waits for local timeline upload up to specified lsn"""
wait_for(10, 1, lambda: remote_consistent_lsn(pageserver_http_client, tenant, timeline) >= lsn)
def last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID) -> int:
detail = pageserver_http_client.timeline_detail_v2(tenant, timeline)
assert isinstance(detail['local']['last_record_lsn'], int)
return detail['local']['last_record_lsn']
def wait_for_last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID,
lsn: int):
"""waits for pageserver to catch up to a certain lsn"""
wait_for(10, 1, lambda: last_record_lsn(pageserver_http_client, tenant, timeline) >= lsn)

View File

@@ -299,42 +299,40 @@ fn print_timelines_tree(
.iter()
.map(|t| {
(
t.timeline_id(),
t.timeline_id,
TimelineTreeEl {
info: t.clone(),
children: BTreeSet::new(),
name: timeline_name_mappings
.remove(&ZTenantTimelineId::new(t.tenant_id(), t.timeline_id())),
.remove(&ZTenantTimelineId::new(t.tenant_id, t.timeline_id)),
},
)
})
.collect::<HashMap<_, _>>();
// Memorize all direct children of each timeline.
for timeline in &timelines {
if let TimelineInfo::Local {
ancestor_timeline_id: Some(tid),
..
} = timeline
for timeline in timelines.iter() {
if let Some(ancestor_timeline_id) =
timeline.local.as_ref().and_then(|l| l.ancestor_timeline_id)
{
timelines_hash
.get_mut(tid)
.get_mut(&ZTimelineId::from(ancestor_timeline_id))
.context("missing timeline info in the HashMap")?
.children
.insert(timeline.timeline_id());
.insert(timeline.timeline_id);
}
}
for timeline in timelines_hash.values() {
// Start with root local timelines (no ancestors) first.
if let TimelineInfo::Local {
ancestor_timeline_id,
..
} = &timeline.info
if timeline
.info
.local
.as_ref()
.and_then(|l| l.ancestor_timeline_id)
.is_none()
{
if ancestor_timeline_id.is_none() {
print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
}
print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
}
}
@@ -350,20 +348,21 @@ fn print_timeline(
timeline: &TimelineTreeEl,
timelines: &HashMap<ZTimelineId, TimelineTreeEl>,
) -> Result<()> {
let local_or_remote = match timeline.info {
TimelineInfo::Local { .. } => "(L)",
TimelineInfo::Remote { .. } => "(R)",
let local_remote = match (timeline.info.local.as_ref(), timeline.info.remote.as_ref()) {
(None, None) => unreachable!("in this case no info for a timeline is found"),
(None, Some(_)) => "(R)",
(Some(_), None) => "(L)",
(Some(_), Some(_)) => "(L+R)",
};
// Draw main padding
print!("{} ", local_or_remote);
print!("{} ", local_remote);
if nesting_level > 0 {
let lsn_string = match &timeline.info {
TimelineInfo::Local { ancestor_lsn, .. } => ancestor_lsn
.map(|lsn| lsn.to_string())
.unwrap_or_else(|| "Unknown local Lsn".to_string()),
TimelineInfo::Remote { .. } => "unknown Lsn (remote)".to_string(),
let ancestor_lsn = match timeline.info.local.as_ref().and_then(|i| i.ancestor_lsn) {
Some(lsn) => lsn.to_string(),
None => "Unknown Lsn".to_string(),
};
let mut br_sym = "┣━";
// Draw each nesting padding with proper style
@@ -383,14 +382,14 @@ fn print_timeline(
br_sym = "┗━";
}
print!("{} @{}: ", br_sym, lsn_string);
print!("{} @{}: ", br_sym, ancestor_lsn);
}
// Finally print a timeline id and name with new line
println!(
"{} [{}]",
timeline.name.as_deref().unwrap_or("_no_name_"),
timeline.info.timeline_id()
timeline.info.timeline_id
);
let len = timeline.children.len();
@@ -430,7 +429,7 @@ fn get_timeline_infos(
Ok(PageServerNode::from_env(env)
.timeline_list(tenant_id)?
.into_iter()
.map(|timeline_info| (timeline_info.timeline_id(), timeline_info))
.map(|timeline_info| (timeline_info.timeline_id, timeline_info))
.collect())
}
@@ -555,26 +554,17 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let timeline = pageserver
.timeline_create(tenant_id, None, None, None)?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id();
let new_timeline_id = timeline.timeline_id;
let last_record_lsn = match timeline {
TimelineInfo::Local {
last_record_lsn, ..
} => last_record_lsn,
TimelineInfo::Remote { .. } => {
bail!(
"Timeline {} was created as remote, not local",
new_timeline_id
)
}
};
let last_record_lsn = timeline
.local
.expect("no local timeline info")
.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {} for tenant: {}",
timeline.timeline_id(),
last_record_lsn,
tenant_id,
timeline.timeline_id, last_record_lsn, tenant_id,
);
}
Some(("branch", branch_match)) => {
@@ -602,26 +592,18 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let timeline = pageserver
.timeline_create(tenant_id, None, start_lsn, Some(ancestor_timeline_id))?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id();
let new_timeline_id = timeline.timeline_id;
let last_record_lsn = match timeline {
TimelineInfo::Local {
last_record_lsn, ..
} => last_record_lsn,
TimelineInfo::Remote { .. } => bail!(
"Timeline {} was created as remote, not local",
new_timeline_id
),
};
let last_record_lsn = timeline
.local
.expect("no local timeline info")
.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {} for tenant: {}. Ancestor timeline: '{}'",
timeline.timeline_id(),
last_record_lsn,
tenant_id,
ancestor_branch_name,
timeline.timeline_id, last_record_lsn, tenant_id, ancestor_branch_name,
);
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
@@ -662,13 +644,8 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// older point in time, or following but lagging behind the primary.
let lsn_str = timeline_infos
.get(&node.timeline_id)
.map(|bi| match bi {
TimelineInfo::Local {
last_record_lsn, ..
} => last_record_lsn.to_string(),
TimelineInfo::Remote { .. } => "? (remote)".to_string(),
})
.unwrap_or_else(|| '?'.to_string());
.and_then(|bi| bi.local.as_ref().map(|l| l.last_record_lsn.to_string()))
.unwrap_or_else(|| "?".to_string());
let branch_name = timeline_name_mappings
.get(&ZTenantTimelineId::new(tenant_id, node.timeline_id))

View File

@@ -14,6 +14,9 @@ pub enum ApiError {
#[error("Unauthorized: {0}")]
Unauthorized(String),
#[error("NotFound: {0}")]
NotFound(String),
#[error(transparent)]
InternalServerError(#[from] anyhow::Error),
}
@@ -36,6 +39,9 @@ impl ApiError {
self.to_string(),
StatusCode::UNAUTHORIZED,
),
ApiError::NotFound(_) => {
HttpErrorBody::response_from_msg_and_status(self.to_string(), StatusCode::NOT_FOUND)
}
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,