From 7738254f83c86e46795b34db834d18af97197d8d Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Thu, 17 Mar 2022 13:21:00 +0400 Subject: [PATCH] refactor timeline memory state management --- control_plane/src/storage.rs | 16 +- pageserver/src/bin/pageserver.rs | 49 +- pageserver/src/http/models.rs | 96 +++- pageserver/src/http/routes.rs | 188 +++++-- pageserver/src/layered_repository.rs | 471 ++++++++---------- pageserver/src/page_service.rs | 16 +- pageserver/src/remote_storage.rs | 37 +- pageserver/src/remote_storage/storage_sync.rs | 274 +++++----- .../remote_storage/storage_sync/download.rs | 73 +-- .../src/remote_storage/storage_sync/index.rs | 126 ++++- .../src/remote_storage/storage_sync/upload.rs | 110 ++-- pageserver/src/repository.rs | 257 ++++++---- pageserver/src/tenant_mgr.rs | 169 +++---- pageserver/src/timelines.rs | 348 +++++++------ pageserver/src/walreceiver.rs | 56 ++- .../batch_others/test_remote_storage.py | 39 +- .../batch_others/test_tenant_relocation.py | 81 ++- test_runner/fixtures/zenith_fixtures.py | 89 ++++ zenith/src/main.rs | 105 ++-- zenith_utils/src/http/error.rs | 6 + 20 files changed, 1484 insertions(+), 1122 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index f6b7173067..ef43ba3c1e 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,4 +1,3 @@ -use std::convert::TryFrom; use std::io::Write; use std::net::TcpStream; use std::path::PathBuf; @@ -10,7 +9,7 @@ use anyhow::{bail, Context}; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; -use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse}; +use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest}; use pageserver::timelines::TimelineInfo; use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -358,7 +357,7 @@ impl PageServerNode { } pub fn timeline_list(&self, tenant_id: &ZTenantId) -> anyhow::Result> { - let timeline_infos: Vec = self + let timeline_infos: Vec = self .http_request( Method::GET, format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id), @@ -367,10 +366,7 @@ impl PageServerNode { .error_from_body()? .json()?; - timeline_infos - .into_iter() - .map(TimelineInfo::try_from) - .collect() + Ok(timeline_infos) } pub fn timeline_create( @@ -392,10 +388,8 @@ impl PageServerNode { }) .send()? .error_from_body()? - .json::>()?; + .json::>()?; - timeline_info_response - .map(TimelineInfo::try_from) - .transpose() + Ok(timeline_info_response) } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d37ba0cece..05fb14daca 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -18,7 +18,10 @@ use daemonize::Daemonize; use pageserver::{ config::{defaults::*, PageServerConf}, - http, page_cache, page_service, remote_storage, tenant_mgr, thread_mgr, + http, page_cache, page_service, + remote_storage::{self, SyncStartupData}, + repository::TimelineSyncStatusUpdate, + tenant_mgr, thread_mgr, thread_mgr::ThreadKind, timelines, virtual_file, LOG_FILE_NAME, }; @@ -227,11 +230,47 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() } let signals = signals::install_shutdown_handlers()?; - let sync_startup = remote_storage::start_local_timeline_sync(conf) + + // Initialize repositories with locally available timelines. 
+ // Timelines that are only partially available locally (remote storage has more data than this pageserver) + // are scheduled for download and added to the repository once download is completed. + let SyncStartupData { + remote_index, + local_timeline_init_statuses, + } = remote_storage::start_local_timeline_sync(conf) .context("Failed to set up local files sync with external storage")?; - // Initialize tenant manager. - tenant_mgr::set_timeline_states(conf, sync_startup.initial_timeline_states); + for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses { + // initialize local tenant + let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index); + for (timeline_id, init_status) in local_timeline_init_statuses { + match init_status { + remote_storage::LocalTimelineInitStatus::LocallyComplete => { + debug!("timeline {} for tenant {} is locally complete, registering it in repository", tenant_id, timeline_id); + // Lets fail here loudly to be on the safe side. + // XXX: It may be a better api to actually distinguish between repository startup + // and processing of newly downloaded timelines. + repo.apply_timeline_remote_sync_status_update( + timeline_id, + TimelineSyncStatusUpdate::Downloaded, + ) + .with_context(|| { + format!( + "Failed to bootstrap timeline {} for tenant {}", + timeline_id, tenant_id + ) + })? + } + remote_storage::LocalTimelineInitStatus::NeedsSync => { + debug!( + "timeline {} for tenant {} needs sync, \ + so skipped for adding into repository until sync is finished", + tenant_id, timeline_id + ); + } + } + } + } // initialize authentication for incoming connections let auth = match &conf.auth_type { @@ -253,7 +292,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() None, "http_endpoint_thread", move || { - let router = http::make_router(conf, auth_cloned); + let router = http::make_router(conf, auth_cloned, remote_index); endpoint::serve_thread_main(router, http_listener, thread_mgr::shutdown_watcher()) }, )?; diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 9844e7ea82..8827713f11 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -1,11 +1,12 @@ -use crate::timelines::TimelineInfo; -use anyhow::{anyhow, bail, Context}; +use anyhow::Context; use serde::{Deserialize, Serialize}; use zenith_utils::{ lsn::Lsn, zid::{HexZTenantId, HexZTimelineId, ZNodeId, ZTenantId, ZTimelineId}, }; +use crate::timelines::{LocalTimelineInfo, TimelineInfo}; + #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { pub new_timeline_id: Option, @@ -18,8 +19,28 @@ pub struct TenantCreateRequest { pub new_tenant_id: Option, } +#[derive(Clone)] +pub enum TimelineInfoV1 { + Local { + timeline_id: ZTimelineId, + tenant_id: ZTenantId, + last_record_lsn: Lsn, + prev_record_lsn: Option, + ancestor_timeline_id: Option, + ancestor_lsn: Option, + disk_consistent_lsn: Lsn, + current_logical_size: Option, + current_logical_size_non_incremental: Option, + }, + Remote { + timeline_id: ZTimelineId, + tenant_id: ZTenantId, + disk_consistent_lsn: Lsn, + }, +} + #[derive(Serialize, Deserialize)] -pub struct TimelineInfoResponse { +pub struct TimelineInfoResponseV1 { pub kind: String, #[serde(with = "hex")] timeline_id: ZTimelineId, @@ -34,10 +55,10 @@ pub struct TimelineInfoResponse { current_logical_size_non_incremental: Option, } -impl From for TimelineInfoResponse { - fn from(other: TimelineInfo) -> Self { +impl From for TimelineInfoResponseV1 { + fn from(other: 
TimelineInfoV1) -> Self { match other { - TimelineInfo::Local { + TimelineInfoV1::Local { timeline_id, tenant_id, last_record_lsn, @@ -47,23 +68,23 @@ impl From for TimelineInfoResponse { disk_consistent_lsn, current_logical_size, current_logical_size_non_incremental, - } => TimelineInfoResponse { + } => TimelineInfoResponseV1 { kind: "Local".to_owned(), timeline_id, tenant_id, disk_consistent_lsn: disk_consistent_lsn.to_string(), last_record_lsn: Some(last_record_lsn.to_string()), - prev_record_lsn: Some(prev_record_lsn.to_string()), + prev_record_lsn: prev_record_lsn.map(|lsn| lsn.to_string()), ancestor_timeline_id: ancestor_timeline_id.map(HexZTimelineId::from), ancestor_lsn: ancestor_lsn.map(|lsn| lsn.to_string()), - current_logical_size: Some(current_logical_size), + current_logical_size, current_logical_size_non_incremental, }, - TimelineInfo::Remote { + TimelineInfoV1::Remote { timeline_id, tenant_id, disk_consistent_lsn, - } => TimelineInfoResponse { + } => TimelineInfoResponseV1 { kind: "Remote".to_owned(), timeline_id, tenant_id, @@ -79,10 +100,10 @@ impl From for TimelineInfoResponse { } } -impl TryFrom for TimelineInfo { +impl TryFrom for TimelineInfoV1 { type Error = anyhow::Error; - fn try_from(other: TimelineInfoResponse) -> anyhow::Result { + fn try_from(other: TimelineInfoResponseV1) -> anyhow::Result { let parse_lsn_hex_string = |lsn_string: String| { lsn_string .parse::() @@ -91,33 +112,68 @@ impl TryFrom for TimelineInfo { let disk_consistent_lsn = parse_lsn_hex_string(other.disk_consistent_lsn)?; Ok(match other.kind.as_str() { - "Local" => TimelineInfo::Local { + "Local" => TimelineInfoV1::Local { timeline_id: other.timeline_id, tenant_id: other.tenant_id, last_record_lsn: other .last_record_lsn - .ok_or(anyhow!("Local timeline should have last_record_lsn")) + .ok_or(anyhow::anyhow!( + "Local timeline should have last_record_lsn" + )) .and_then(parse_lsn_hex_string)?, prev_record_lsn: other .prev_record_lsn - .ok_or(anyhow!("Local timeline should have prev_record_lsn")) - .and_then(parse_lsn_hex_string)?, + .map(parse_lsn_hex_string) + .transpose()?, ancestor_timeline_id: other.ancestor_timeline_id.map(ZTimelineId::from), ancestor_lsn: other.ancestor_lsn.map(parse_lsn_hex_string).transpose()?, disk_consistent_lsn, - current_logical_size: other.current_logical_size.ok_or(anyhow!("No "))?, + current_logical_size: other.current_logical_size, current_logical_size_non_incremental: other.current_logical_size_non_incremental, }, - "Remote" => TimelineInfo::Remote { + "Remote" => TimelineInfoV1::Remote { timeline_id: other.timeline_id, tenant_id: other.tenant_id, disk_consistent_lsn, }, - unknown => bail!("Unknown timeline kind: {}", unknown), + unknown => anyhow::bail!("Unknown timeline kind: {}", unknown), }) } } +fn from_local( + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + local: &LocalTimelineInfo, +) -> TimelineInfoV1 { + TimelineInfoV1::Local { + timeline_id, + tenant_id, + last_record_lsn: local.last_record_lsn, + prev_record_lsn: local.prev_record_lsn, + ancestor_timeline_id: local.ancestor_timeline_id.map(ZTimelineId::from), + ancestor_lsn: local.ancestor_lsn, + disk_consistent_lsn: local.disk_consistent_lsn, + current_logical_size: local.current_logical_size, + current_logical_size_non_incremental: local.current_logical_size_non_incremental, + } +} + +impl From for TimelineInfoV1 { + fn from(t: TimelineInfo) -> Self { + match (t.local.as_ref(), t.remote.as_ref()) { + (None, None) => unreachable!(), + (None, Some(remote)) => TimelineInfoV1::Remote { + 
timeline_id: t.timeline_id, + tenant_id: t.tenant_id, + disk_consistent_lsn: remote.remote_consistent_lsn.unwrap_or(Lsn(0)), + }, + (Some(local), None) => from_local(t.tenant_id, t.timeline_id, local), + (Some(local), Some(_)) => from_local(t.tenant_id, t.timeline_id, local), + } + } +} + #[derive(Serialize)] pub struct StatusResponse { pub id: ZNodeId, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8365601042..2d913afe4e 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use anyhow::Result; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; +use tokio::sync::RwLock; use tracing::*; use zenith_utils::auth::JwtAuth; use zenith_utils::http::endpoint::attach_openapi_ui; @@ -16,24 +17,32 @@ use zenith_utils::http::{ request::parse_request_param, }; use zenith_utils::http::{RequestExt, RouterBuilder}; -use zenith_utils::zid::{HexZTenantId, ZTimelineId}; +use zenith_utils::zid::{HexZTenantId, ZTenantTimelineId, ZTimelineId}; use super::models::{ - StatusResponse, TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse, + StatusResponse, TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponseV1, + TimelineInfoV1, +}; +use crate::remote_storage::{schedule_timeline_download, RemoteTimelineIndex}; +use crate::timelines::{ + extract_remote_timeline_info, LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo, }; -use crate::repository::RepositoryTimeline; -use crate::timelines::TimelineInfo; use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId}; #[derive(Debug)] struct State { conf: &'static PageServerConf, auth: Option>, + remote_index: Arc>, allowlist_routes: Vec, } impl State { - fn new(conf: &'static PageServerConf, auth: Option>) -> Self { + fn new( + conf: &'static PageServerConf, + auth: Option>, + remote_index: Arc>, + ) -> Self { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] .iter() .map(|v| v.parse().unwrap()) @@ -42,6 +51,7 @@ impl State { conf, auth, allowlist_routes, + remote_index, } } } @@ -88,7 +98,7 @@ async fn timeline_create_handler(mut request: Request) -> Result json_response(StatusCode::CREATED, TimelineInfoResponse::from(info))?, + Some(info) => json_response(StatusCode::CREATED, info)?, None => json_response(StatusCode::CONFLICT, ())?, }) } @@ -97,15 +107,24 @@ async fn timeline_list_handler(request: Request) -> Result, let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request); - let response_data: Vec = tokio::task::spawn_blocking(move || { + let local_timeline_infos = tokio::task::spawn_blocking(move || { let _enter = info_span!("timeline_list", tenant = %tenant_id).entered(); - crate::timelines::get_timelines(tenant_id, include_non_incremental_logical_size) + crate::timelines::get_local_timelines(tenant_id, include_non_incremental_logical_size) }) .await - .map_err(ApiError::from_err)?? 
- .into_iter() - .map(TimelineInfoResponse::from) - .collect(); + .map_err(ApiError::from_err)??; + + let remote_index = get_state(&request).remote_index.read().await; + let mut response_data = Vec::with_capacity(local_timeline_infos.len()); + for (timeline_id, local_timeline_info) in local_timeline_infos { + response_data.push(TimelineInfo { + tenant_id, + timeline_id, + local: Some(local_timeline_info), + remote: extract_remote_timeline_info(tenant_id, timeline_id, &remote_index), + }) + } + Ok(json_response(StatusCode::OK, response_data)?) } @@ -124,30 +143,76 @@ fn get_include_non_incremental_logical_size(request: &Request) -> bool { .unwrap_or(false) } -async fn timeline_detail_handler(request: Request) -> Result, ApiError> { +// common part for v1 and v2 handlers +async fn timeline_detail_common(request: Request) -> Result { let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; + let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request); - let response_data = tokio::task::spawn_blocking(move || { - let _enter = - info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id) - .entered(); + let span = info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id); + + let (local_timeline_info, span) = tokio::task::spawn_blocking(move || { + let entered = span.entered(); let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; - let include_non_incremental_logical_size = - get_include_non_incremental_logical_size(&request); - Ok::<_, anyhow::Error>(TimelineInfo::from_repo_timeline( - tenant_id, - repo.get_timeline(timeline_id)?, - include_non_incremental_logical_size, - )) + let local_timeline = { + repo.get_timeline(timeline_id) + .map(|timeline| { + LocalTimelineInfo::from_repo_timeline( + timeline, + include_non_incremental_logical_size, + ) + }) + .transpose()? + }; + Ok::<_, anyhow::Error>((local_timeline, entered.exit())) }) .await - .map_err(ApiError::from_err)? - .map(TimelineInfoResponse::from)?; + .map_err(ApiError::from_err)??; - Ok(json_response(StatusCode::OK, response_data)?) + let remote_timeline_info = { + let remote_index_read = get_state(&request).remote_index.read().await; + remote_index_read + .timeline_entry(&ZTenantTimelineId { + tenant_id, + timeline_id, + }) + .map(|remote_entry| RemoteTimelineInfo { + remote_consistent_lsn: remote_entry.disk_consistent_lsn(), + awaits_download: remote_entry.get_awaits_download(), + }) + }; + + let _enter = span.entered(); + + if local_timeline_info.is_none() && remote_timeline_info.is_none() { + return Err(ApiError::NotFound( + "Timeline is not found neither locally nor remotely".to_string(), + )); + } + + Ok(TimelineInfo { + tenant_id, + timeline_id, + local: local_timeline_info, + remote: remote_timeline_info, + }) +} + +// TODO remove when console adopts v2 +async fn timeline_detail_handler_v1(request: Request) -> Result, ApiError> { + let timeline_info = timeline_detail_common(request).await?; + Ok(json_response( + StatusCode::OK, + TimelineInfoResponseV1::from(TimelineInfoV1::from(timeline_info)), + )?) +} + +async fn timeline_detail_handler_v2(request: Request) -> Result, ApiError> { + let timeline_info = timeline_detail_common(request).await?; + + Ok(json_response(StatusCode::OK, timeline_info)?) 
} async fn timeline_attach_handler(request: Request) -> Result, ApiError> { @@ -155,31 +220,37 @@ async fn timeline_attach_handler(request: Request) -> Result { - anyhow::bail!("Timeline with id {} is already local", timeline_id) - } - RepositoryTimeline::Remote { - id: _, - disk_consistent_lsn: _, - } => { - // FIXME (rodionov) get timeline already schedules timeline for download, and duplicate tasks can cause errors - // first should be fixed in https://github.com/zenithdb/zenith/issues/997 - // TODO (rodionov) change timeline state to awaits download (incapsulate it somewhere in the repo) - // TODO (rodionov) can we safely request replication on the timeline before sync is completed? (can be implemented on top of the #997) - Ok(()) - } - } + let span = tokio::task::spawn_blocking(move || { + let entered = span.entered(); + if tenant_mgr::get_timeline_for_tenant_load(tenant_id, timeline_id).is_ok() { + anyhow::bail!("Timeline is already present locally") + }; + Ok(entered.exit()) }) .await .map_err(ApiError::from_err)??; + let mut remote_index_write = get_state(&request).remote_index.write().await; + + let _enter = span.entered(); // entered guard cannot live across awaits (non Send) + let index_entry = remote_index_write + .timeline_entry_mut(&ZTenantTimelineId { + tenant_id, + timeline_id, + }) + .ok_or_else(|| ApiError::BadRequest("Unknown remote timeline".to_string()))?; + + if index_entry.get_awaits_download() { + return Err(ApiError::NotFound( + "Timeline download is already in progress".to_string(), + )); + } + + index_entry.set_awaits_download(true); + schedule_timeline_download(tenant_id, timeline_id); + Ok(json_response(StatusCode::ACCEPTED, ())?) } @@ -221,13 +292,17 @@ async fn tenant_create_handler(mut request: Request) -> Result) -> Result, ApiError> { pub fn make_router( conf: &'static PageServerConf, auth: Option>, + remote_index: Arc>, ) -> RouterBuilder { let spec = include_bytes!("openapi_spec.yml"); let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc"); @@ -263,7 +339,7 @@ pub fn make_router( } router - .data(Arc::new(State::new(conf, auth))) + .data(Arc::new(State::new(conf, auth, remote_index))) .get("/v1/status", status_handler) .get("/v1/tenant", tenant_list_handler) .post("/v1/tenant", tenant_create_handler) @@ -271,7 +347,11 @@ pub fn make_router( .post("/v1/tenant/:tenant_id/timeline", timeline_create_handler) .get( "/v1/tenant/:tenant_id/timeline/:timeline_id", - timeline_detail_handler, + timeline_detail_handler_v1, + ) + .get( + "/v2/tenant/:tenant_id/timeline/:timeline_id", + timeline_detail_handler_v2, ) .post( "/v1/tenant/:tenant_id/timeline/:timeline_id/attach", diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 9e0df5dab2..c17df84689 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -35,9 +35,9 @@ use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use crate::config::PageServerConf; use crate::page_cache; use crate::relish::*; -use crate::remote_storage::{schedule_timeline_checkpoint_upload, schedule_timeline_download}; +use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteTimelineIndex}; use crate::repository::{ - BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncState, + BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter, ZenithWalRecord, }; use crate::thread_mgr; @@ -129,27 +129,46 @@ pub struct 
LayeredRepository { // timeout... gc_cs: Mutex<()>, walredo_mgr: Arc, + + // provides access to timeline data sitting in the remote storage + // supposed to be used for retrieval of remote consistent lsn in walreceiver + remote_index: Arc>, + /// Makes every timeline to backup their files to remote storage. upload_relishes: bool, } /// Public interface impl Repository for LayeredRepository { - fn get_timeline(&self, timelineid: ZTimelineId) -> Result { - Ok(RepositoryTimeline::from(self.get_or_init_timeline( - timelineid, - &mut self.timelines.lock().unwrap(), - )?)) + fn get_timeline(&self, timelineid: ZTimelineId) -> Option { + let timelines = self.timelines.lock().unwrap(); + self.get_timeline_internal(timelineid, &timelines) + .map(RepositoryTimeline::from) } - fn list_timelines(&self) -> Result> { - Ok(self - .timelines + fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result> { + let mut timelines = self.timelines.lock().unwrap(); + match self.get_timeline_load_internal(timelineid, &mut timelines)? { + Some(local_loaded_timeline) => Ok(local_loaded_timeline as _), + None => anyhow::bail!( + "cannot get local timeline: unknown timeline id: {}", + timelineid + ), + } + } + + fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)> { + self.timelines .lock() .unwrap() - .values() - .map(|timeline_entry| RepositoryTimeline::from(timeline_entry.clone())) - .collect()) + .iter() + .map(|(timeline_id, timeline_entry)| { + ( + *timeline_id, + RepositoryTimeline::from(timeline_entry.clone()), + ) + }) + .collect() } fn create_empty_timeline( @@ -176,10 +195,16 @@ impl Repository for LayeredRepository { self.upload_relishes, ); - let timeline_rc = Arc::new(timeline); - let r = timelines.insert(timelineid, LayeredTimelineEntry::Local(timeline_rc.clone())); - assert!(r.is_none()); - Ok(timeline_rc) + let timeline = Arc::new(timeline); + let r = timelines.insert( + timelineid, + LayeredTimelineEntry::Loaded(Arc::clone(&timeline)), + ); + ensure!( + r.is_none(), + "assertion failure, inserted duplicate timeline" + ); + Ok(timeline) } /// Branch a timeline @@ -190,14 +215,12 @@ impl Repository for LayeredRepository { let _gc_cs = self.gc_cs.lock().unwrap(); let mut timelines = self.timelines.lock().unwrap(); - let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? { - LayeredTimelineEntry::Local(timeline) => timeline, - LayeredTimelineEntry::Remote { .. } => { - bail!("Cannot branch off the timeline {} that's not local", src) - } - }; + let src_timeline = self + .get_timeline_load_internal(src, &mut timelines) + // message about timeline being remote is one .context up in the stack + .context("failed to load timeline for branching")? + .ok_or_else(|| anyhow::anyhow!("unknown timeline id: {}", &src))?; let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn(); - src_timeline .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn) .context("invalid branch start lsn")?; @@ -232,6 +255,7 @@ impl Repository for LayeredRepository { ); crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?; Self::save_metadata(self.conf, dst, self.tenantid, &metadata, true)?; + timelines.insert(dst, LayeredTimelineEntry::Unloaded { id: dst, metadata }); info!("branched timeline {} from {} at {}", dst, src, start_lsn); @@ -261,11 +285,19 @@ impl Repository for LayeredRepository { fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. 
Then drop the lock and actually perform the - // checkpoints. We don't want to block everything else while the + // checkpoints. We don't want to block everything else while the // checkpoint runs. let timelines = self.timelines.lock().unwrap(); let timelines_to_checkpoint = timelines .iter() + // filter to get only loaded timelines + .filter_map(|(timelineid, entry)| match entry { + LayeredTimelineEntry::Loaded(timeline) => Some((timelineid, timeline)), + LayeredTimelineEntry::Unloaded { .. } => { + debug!("Skipping checkpoint for unloaded timeline {}", timelineid); + None + } + }) .map(|(timelineid, timeline)| (*timelineid, timeline.clone())) .collect::>(); drop(timelines); @@ -273,13 +305,7 @@ impl Repository for LayeredRepository { for (timelineid, timeline) in &timelines_to_checkpoint { let _entered = info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered(); - match timeline { - LayeredTimelineEntry::Local(timeline) => timeline.checkpoint(cconf)?, - LayeredTimelineEntry::Remote { .. } => debug!( - "Cannot run the checkpoint for remote timeline {}", - timelineid - ), - } + timeline.checkpoint(cconf)?; } Ok(()) @@ -288,32 +314,10 @@ impl Repository for LayeredRepository { // Detaches the timeline from the repository. fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> { let mut timelines = self.timelines.lock().unwrap(); - match timelines.entry(timeline_id) { - Entry::Vacant(_) => { - bail!("cannot detach non existing timeline"); - } - Entry::Occupied(mut entry) => { - let timeline_entry = entry.get_mut(); + if timelines.remove(&timeline_id).is_none() { + bail!("cannot detach timeline that is not available locally"); + } - let timeline = match timeline_entry { - LayeredTimelineEntry::Remote { .. } => { - bail!("cannot detach remote timeline {}", timeline_id); - } - LayeredTimelineEntry::Local(timeline) => timeline, - }; - - // TODO (rodionov) keep local state in timeline itself (refactoring related to https://github.com/zenithdb/zenith/issues/997 and #1104) - - // FIXME this is local disk consistent lsn, need to keep the latest succesfully uploaded checkpoint lsn in timeline (metadata?) - // https://github.com/zenithdb/zenith/issues/1104 - let remote_disk_consistent_lsn = timeline.disk_consistent_lsn.load(); - // reference to timeline is dropped here - entry.insert(LayeredTimelineEntry::Remote { - id: timeline_id, - disk_consistent_lsn: remote_disk_consistent_lsn, - }); - } - }; // Release the lock to shutdown and remove the files without holding it drop(timelines); // shutdown the timeline (this shuts down the walreceiver) @@ -324,158 +328,142 @@ impl Repository for LayeredRepository { Ok(()) } - // TODO this method currentlly does not do anything to prevent (or react to) state updates between a sync task schedule and a sync task end (that causes this update). - // Sync task is enqueued and can error and be rescheduled, so some significant time may pass between the events. - // - /// Reacts on the timeline sync state change, changing pageserver's memory state for this timeline (unload or load of the timeline files). 
- fn set_timeline_state( + fn apply_timeline_remote_sync_status_update( &self, timeline_id: ZTimelineId, - new_state: TimelineSyncState, + timeline_sync_status_update: TimelineSyncStatusUpdate, ) -> Result<()> { debug!( - "set_timeline_state: timeline_id: {}, new_state: {:?}", - timeline_id, new_state + "apply_timeline_remote_sync_status_update timeline_id: {} update: {:?}", + timeline_id, timeline_sync_status_update ); - let mut timelines_accessor = self.timelines.lock().unwrap(); - - match new_state { - TimelineSyncState::Ready(_) => { - let reloaded_timeline = - self.init_local_timeline(timeline_id, &mut timelines_accessor)?; - timelines_accessor - .insert(timeline_id, LayeredTimelineEntry::Local(reloaded_timeline)); - None + match timeline_sync_status_update { + TimelineSyncStatusUpdate::Uploaded => { /* nothing to do, remote consistent lsn is managed by the remote storage */ } - TimelineSyncState::Evicted(_) => timelines_accessor.remove(&timeline_id), - TimelineSyncState::AwaitsDownload(disk_consistent_lsn) - | TimelineSyncState::CloudOnly(disk_consistent_lsn) => timelines_accessor.insert( - timeline_id, - LayeredTimelineEntry::Remote { - id: timeline_id, - disk_consistent_lsn, - }, - ), - }; - // NOTE we do not delete local data in case timeline became cloud only, this is performed in detach_timeline - drop(timelines_accessor); - + TimelineSyncStatusUpdate::Downloaded => { + match self.timelines.lock().unwrap().entry(timeline_id) { + Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."), + Entry::Vacant(entry) => { + // we need to get metadata of a timeline, another option is to pass it along with Downloaded status + let metadata = Self::load_metadata(self.conf, timeline_id, self.tenantid).context("failed to load local metadata")?; + // finally we make newly downloaded timeline visible to repository + entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, }) + }, + }; + } + } Ok(()) } - /// Layered repo does not store anything but - /// * local, fully loaded timelines, ready for usage - /// * remote timelines, that need a download task scheduled first before they can be used - /// - /// [`TimelineSyncState::Evicted`] and other non-local and non-remote states are not stored in the layered repo at all, - /// hence their statuses cannot be returned by the repo. - fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option { - let timelines_accessor = self.timelines.lock().unwrap(); - let timeline_entry = timelines_accessor.get(&timeline_id)?; - Some( - if timeline_entry - .local_or_schedule_download(self.tenantid) - .is_some() - { - TimelineSyncState::Ready(timeline_entry.disk_consistent_lsn()) - } else { - TimelineSyncState::CloudOnly(timeline_entry.disk_consistent_lsn()) - }, - ) + fn get_remote_index(&self) -> &tokio::sync::RwLock { + self.remote_index.as_ref() } } #[derive(Clone)] enum LayeredTimelineEntry { - Local(Arc), - Remote { + Loaded(Arc), + Unloaded { id: ZTimelineId, - /// metadata contents of the latest successfully uploaded checkpoint - disk_consistent_lsn: Lsn, + metadata: TimelineMetadata, }, } impl LayeredTimelineEntry { fn timeline_id(&self) -> ZTimelineId { match self { - LayeredTimelineEntry::Local(timeline) => timeline.timelineid, - LayeredTimelineEntry::Remote { id, .. } => *id, + LayeredTimelineEntry::Loaded(timeline) => timeline.timelineid, + LayeredTimelineEntry::Unloaded { id, .. } => *id, } } - /// Gets local timeline data, if it's present. 
Otherwise schedules a download fot the remote timeline and returns `None`. - fn local_or_schedule_download(&self, tenant_id: ZTenantId) -> Option<&LayeredTimeline> { + fn ancestor_timeline_id(&self) -> Option { match self { - Self::Local(local) => Some(local.as_ref()), - Self::Remote { - id: timeline_id, .. - } => { - debug!( - "Accessed a remote timeline {} for tenant {}, scheduling a timeline download", - timeline_id, tenant_id - ); - schedule_timeline_download(tenant_id, *timeline_id); - None + LayeredTimelineEntry::Loaded(timeline) => { + timeline.ancestor_timeline.as_ref().map(|t| t.timeline_id()) } + LayeredTimelineEntry::Unloaded { metadata, .. } => metadata.ancestor_timeline(), } } - /// Gets a current (latest for the remote case) disk consistent Lsn for the timeline. - fn disk_consistent_lsn(&self) -> Lsn { + fn ancestor_lsn(&self) -> Lsn { match self { - Self::Local(local) => local.disk_consistent_lsn.load(), - Self::Remote { - disk_consistent_lsn, - .. - } => *disk_consistent_lsn, + LayeredTimelineEntry::Loaded(timeline) => timeline.ancestor_lsn, + LayeredTimelineEntry::Unloaded { metadata, .. } => metadata.ancestor_lsn(), + } + } + + fn ensure_loaded(&self) -> anyhow::Result<&Arc> { + match self { + LayeredTimelineEntry::Loaded(timeline) => Ok(timeline), + LayeredTimelineEntry::Unloaded { .. } => { + anyhow::bail!("timeline is unloaded") + } } } } impl From for RepositoryTimeline { - fn from(layered_timeline: LayeredTimelineEntry) -> Self { - match layered_timeline { - LayeredTimelineEntry::Local(timeline) => RepositoryTimeline::Local { - id: timeline.timelineid, - timeline, - }, - LayeredTimelineEntry::Remote { - id, - disk_consistent_lsn, - } => RepositoryTimeline::Remote { - id, - disk_consistent_lsn, - }, + fn from(entry: LayeredTimelineEntry) -> Self { + match entry { + LayeredTimelineEntry::Loaded(timeline) => RepositoryTimeline::Loaded(timeline as _), + LayeredTimelineEntry::Unloaded { metadata, .. } => { + RepositoryTimeline::Unloaded { metadata } + } } } } /// Private functions impl LayeredRepository { - // Implementation of the public `get_timeline` function. This differs from the public - // interface in that the caller must already hold the mutex on the 'timelines' hashmap. - fn get_or_init_timeline( + // Implementation of the public `get_timeline` function. + // Differences from the public: + // * interface in that the caller must already hold the mutex on the 'timelines' hashmap. + fn get_timeline_internal( + &self, + timelineid: ZTimelineId, + timelines: &HashMap, + ) -> Option { + timelines.get(&timelineid).cloned() + } + + // Implementation of the public `get_timeline_load` function. + // Differences from the public: + // * interface in that the caller must already hold the mutex on the 'timelines' hashmap. + fn get_timeline_load_internal( &self, timelineid: ZTimelineId, timelines: &mut HashMap, - ) -> Result { + ) -> anyhow::Result>> { match timelines.get(&timelineid) { - Some(timeline_entry) => { - let _ = timeline_entry.local_or_schedule_download(self.tenantid); - Ok(timeline_entry.clone()) - } + Some(entry) => match entry { + LayeredTimelineEntry::Loaded(local_timeline) => { + trace!("timeline {} found loaded", &timelineid); + return Ok(Some(Arc::clone(local_timeline))); + } + LayeredTimelineEntry::Unloaded { .. 
} => { + trace!("timeline {} found unloaded", &timelineid) + } + }, None => { - let timeline = self.init_local_timeline(timelineid, timelines)?; - timelines.insert( - timelineid, - LayeredTimelineEntry::Local(Arc::clone(&timeline)), - ); - Ok(LayeredTimelineEntry::Local(timeline)) + trace!("timeline {} not found", &timelineid); + return Ok(None); } - } + }; + let timeline = self.load_local_timeline(timelineid, timelines)?; + let was_loaded = timelines.insert( + timelineid, + LayeredTimelineEntry::Loaded(Arc::clone(&timeline)), + ); + ensure!( + was_loaded.is_none() + || matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. })), + "assertion failure, inserted wrong timeline in an incorrect state" + ); + Ok(Some(timeline)) } - fn init_local_timeline( + fn load_local_timeline( &self, timelineid: ZTimelineId, timelines: &mut HashMap, @@ -486,8 +474,18 @@ impl LayeredRepository { let ancestor = metadata .ancestor_timeline() - .map(|ancestor_timelineid| self.get_or_init_timeline(ancestor_timelineid, timelines)) - .transpose()?; + .map(|ancestor_timeline_id| { + trace!( + "loading {}'s ancestor {}", + timelineid, + &ancestor_timeline_id + ); + self.get_timeline_load_internal(ancestor_timeline_id, timelines) + }) + .transpose() + .context("cannot load ancestor timeline")? + .flatten() + .map(LayeredTimelineEntry::Loaded); let _enter = info_span!("loading timeline", timeline = %timelineid, tenant = %self.tenantid) .entered(); @@ -513,6 +511,7 @@ impl LayeredRepository { conf: &'static PageServerConf, walredo_mgr: Arc, tenantid: ZTenantId, + remote_index: Arc>, upload_relishes: bool, ) -> LayeredRepository { LayeredRepository { @@ -521,6 +520,7 @@ impl LayeredRepository { timelines: Mutex::new(HashMap::new()), gc_cs: Mutex::new(()), walredo_mgr, + remote_index, upload_relishes, } } @@ -608,86 +608,46 @@ impl LayeredRepository { // grab mutex to prevent new timelines from being created here. let _gc_cs = self.gc_cs.lock().unwrap(); - let mut timelines = self.timelines.lock().unwrap(); - // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. - // - let mut timelineids: Vec = Vec::new(); - - // We scan the directory, not the in-memory hash table, because the hash - // table only contains entries for timelines that have been accessed. We - // need to take all timelines into account, not only the active ones. - let timelines_path = self.conf.timelines_path(&self.tenantid); - - for direntry in fs::read_dir(timelines_path)? { - let direntry = direntry?; - if let Some(fname) = direntry.file_name().to_str() { - if let Ok(timelineid) = fname.parse::() { - timelineids.push(timelineid); - } - } - } - - // Now collect info about branchpoints let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); - for &timelineid in &timelineids { - let timeline = match self.get_or_init_timeline(timelineid, &mut timelines)? { - LayeredTimelineEntry::Local(timeline) => timeline, - LayeredTimelineEntry::Remote { .. 
} => { - warn!( - "Timeline {} is not local, cannot proceed with gc", - timelineid - ); - return Ok(totals); - } - }; + let mut timeline_ids = Vec::new(); + let mut timelines = self.timelines.lock().unwrap(); - if let Some(ancestor_timeline) = &timeline.ancestor_timeline { - let ancestor_timeline = - match ancestor_timeline.local_or_schedule_download(self.tenantid) { - Some(timeline) => timeline, - None => { - warn!( - "Timeline {} has ancestor {} is not local, cannot proceed with gc", - timelineid, - ancestor_timeline.timeline_id() - ); - return Ok(totals); - } - }; + for (timeline_id, timeline_entry) in timelines.iter() { + timeline_ids.push(*timeline_id); + + // This is unresolved question for now, how to do gc in presense of remote timelines + // especially when this is combined with branching. + // Somewhat related: https://github.com/zenithdb/zenith/issues/999 + if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() { // If target_timeline is specified, we only need to know branchpoints of its children if let Some(timelineid) = target_timelineid { - if ancestor_timeline.timelineid == timelineid { + if ancestor_timeline_id == &timelineid { all_branchpoints - .insert((ancestor_timeline.timelineid, timeline.ancestor_lsn)); + .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); } } // Collect branchpoints for all timelines else { - all_branchpoints.insert((ancestor_timeline.timelineid, timeline.ancestor_lsn)); + all_branchpoints.insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); } } } // Ok, we now know all the branch points. // Perform GC for each timeline. - for timelineid in timelineids { + for timelineid in timeline_ids.into_iter() { if thread_mgr::is_shutdown_requested() { // We were requested to shut down. Stop and return with the progress we // made. break; } - // We have already loaded all timelines above - // so this operation is just a quick map lookup. - let timeline = match self.get_or_init_timeline(timelineid, &mut *timelines)? { - LayeredTimelineEntry::Local(timeline) => timeline, - LayeredTimelineEntry::Remote { .. } => { - debug!("Skipping GC for non-local timeline {}", timelineid); - continue; - } - }; + // Timeline is known to be local and loaded. + let timeline = self + .get_timeline_load_internal(timelineid, &mut *timelines)? 
+ .expect("checked above that timeline is local and loaded"); // If target_timeline is specified, only GC it if let Some(target_timelineid) = target_timelineid { @@ -989,13 +949,13 @@ impl Timeline for LayeredTimeline { match &timeline.ancestor_timeline { None => break, Some(ancestor_entry) => { - match ancestor_entry.local_or_schedule_download(self.tenantid) { - Some(ancestor) => { - timeline = ancestor; - continue; - } - None => bail!("Cannot list relishes for timeline {} tenant {} due to its ancestor being remote only", self.timelineid, self.tenantid), - } + timeline = ancestor_entry.ensure_loaded().with_context( + || format!( + "cannot list relishes for timeline {} tenant {} due to its ancestor {} being either unloaded", + self.timelineid, self.tenantid, ancestor_entry.timeline_id(), + ) + )?; + continue; } } } @@ -1313,19 +1273,15 @@ impl LayeredTimeline { while lsn < timeline.ancestor_lsn { trace!("going into ancestor {} ", timeline.ancestor_lsn); - timeline = match timeline - .ancestor_timeline - .as_ref() - .and_then(|ancestor_entry| ancestor_entry.local_or_schedule_download(self.tenantid)) - { - Some(timeline) => timeline, - None => { - bail!( - "Cannot get the whole layer for read locked: timeline {} is not present locally", - self.timelineid - ) - } - }; + timeline = timeline + .ancestor_timeline + .as_ref() + .expect("there should be an ancestor") + .ensure_loaded() + .with_context(|| format!( + "Cannot get the whole layer for read locked: timeline {} is not present locally", + self.get_ancestor_timeline_id().unwrap()) + )?; } // Now we have the right starting timeline for our search. @@ -1366,18 +1322,13 @@ impl LayeredTimeline { // If not, check if there's a layer on the ancestor timeline match &timeline.ancestor_timeline { Some(ancestor_entry) => { - match ancestor_entry.local_or_schedule_download(self.tenantid) { - Some(ancestor) => { - lsn = timeline.ancestor_lsn; - timeline = ancestor; - trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn); - continue; - } - None => bail!( - "Cannot get a layer for read from remote ancestor timeline {}", - self.timelineid - ), - } + let ancestor = ancestor_entry + .ensure_loaded() + .context("cannot get a layer for read from ancestor because it is either remote or unloaded")?; + lsn = timeline.ancestor_lsn; + timeline = ancestor; + trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn); + continue; } None => return Ok(None), } @@ -1501,7 +1452,6 @@ impl LayeredTimeline { fn checkpoint_internal(&self, checkpoint_distance: u64, reconstruct_pages: bool) -> Result<()> { // Prevent concurrent checkpoints let _checkpoint_cs = self.checkpoint_cs.lock().unwrap(); - let write_guard = self.write_lock.lock().unwrap(); let mut layers = self.layers.lock().unwrap(); @@ -1862,10 +1812,10 @@ impl LayeredTimeline { ); } // Now check ancestor timelines, if any are present locally - else if let Some(ancestor) = - self.ancestor_timeline.as_ref().and_then(|timeline_entry| { - timeline_entry.local_or_schedule_download(self.tenantid) - }) + else if let Some(ancestor) = self + .ancestor_timeline + .as_ref() + .and_then(|timeline_entry| timeline_entry.ensure_loaded().ok()) { let prior_lsn = ancestor.get_last_record_lsn(); if seg.rel.is_blocky() { @@ -2435,9 +2385,8 @@ mod tests { metadata_bytes[512 - 4 - 2] ^= 1; std::fs::write(metadata_path, metadata_bytes)?; - let new_repo = harness.load(); - let err = new_repo.get_timeline(TIMELINE_ID).err().unwrap(); - assert_eq!(err.to_string(), "failed to load metadata"); + let err = 
harness.try_load().err().expect("should fail"); + assert_eq!(err.to_string(), "failed to load local metadata"); assert_eq!( err.source().unwrap().to_string(), "metadata checksum mismatch" @@ -2527,7 +2476,7 @@ mod tests { // Load the timeline. This will cause the files in the "future" to be renamed // away. let new_repo = harness.load(); - new_repo.get_timeline(TIMELINE_ID).unwrap(); + new_repo.get_timeline_load(TIMELINE_ID).unwrap(); drop(new_repo); for filename in future_filenames.iter() { @@ -2544,7 +2493,7 @@ mod tests { } let new_repo = harness.load(); - new_repo.get_timeline(TIMELINE_ID).unwrap(); + new_repo.get_timeline_load(TIMELINE_ID).unwrap(); drop(new_repo); for filename in future_filenames.iter() { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 42a099cca5..6e6b6415f3 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -322,8 +322,8 @@ impl PageServerHandler { let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered(); // Check that the timeline exists - let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid) - .context("Cannot handle pagerequests for a remote timeline")?; + let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid) + .context("Cannot load local timeline")?; /* switch client to COPYBOTH */ pgb.write_message(&BeMessage::CopyBothResponse)?; @@ -520,8 +520,8 @@ impl PageServerHandler { let _enter = span.enter(); // check that the timeline exists - let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid) - .context("Cannot handle basebackup request for a remote timeline")?; + let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid) + .context("Cannot load local timeline")?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { timeline @@ -655,8 +655,8 @@ impl postgres_backend::Handler for PageServerHandler { info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered(); // Check that the timeline exists - tenant_mgr::get_timeline_for_tenant(tenantid, timelineid) - .context("Failed to fetch local timeline for callmemaybe requests")?; + tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid) + .context("Cannot load local timeline")?; walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?; @@ -778,8 +778,8 @@ impl postgres_backend::Handler for PageServerHandler { let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; - let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid) - .context("Failed to fetch local timeline for checkpoint request")?; + let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid) + .context("Cannot load local timeline")?; timeline.checkpoint(CheckpointConfig::Forced)?; pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? 
diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index 4af1f8ed56..08fb16a679 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -89,32 +89,38 @@ use std::{ collections::HashMap, ffi, fs, path::{Path, PathBuf}, + sync::Arc, }; use anyhow::{bail, Context}; -use tokio::io; +use tokio::{io, sync::RwLock}; use tracing::{error, info}; use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; +pub use self::storage_sync::index::{RemoteTimelineIndex, TimelineIndexEntry}; pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download}; use self::{local_fs::LocalFs, rust_s3::S3}; use crate::{ config::{PageServerConf, RemoteStorageKind}, layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME}, - repository::TimelineSyncState, }; pub use storage_sync::compression; +#[derive(Clone, Copy, Debug)] +pub enum LocalTimelineInitStatus { + LocallyComplete, + NeedsSync, +} + +type LocalTimelineInitStatuses = HashMap>; + /// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization. /// Successful initialization includes a case when sync loop is not started, in which case the startup data is returned still, /// to simplify the received code. pub struct SyncStartupData { - /// A sync state, derived from initial comparison of local timeline files and the remote archives, - /// before any sync tasks are executed. - /// To reuse the local file scan logic, the timeline states are returned even if no sync loop get started during init: - /// in this case, no remote files exist and all local timelines with correct metadata files are considered ready. - pub initial_timeline_states: HashMap>, + pub remote_index: Arc>, + pub local_timeline_init_statuses: LocalTimelineInitStatuses, } /// Based on the config, initiates the remote storage connection and starts a separate thread @@ -154,23 +160,18 @@ pub fn start_local_timeline_sync( .context("Failed to spawn the storage sync thread"), None => { info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled"); - let mut initial_timeline_states: HashMap< - ZTenantId, - HashMap, - > = HashMap::new(); - for (ZTenantTimelineId{tenant_id, timeline_id}, (timeline_metadata, _)) in + let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); + for (ZTenantTimelineId { tenant_id, timeline_id }, _) in local_timeline_files { - initial_timeline_states + local_timeline_init_statuses .entry(tenant_id) .or_default() - .insert( - timeline_id, - TimelineSyncState::Ready(timeline_metadata.disk_consistent_lsn()), - ); + .insert(timeline_id, LocalTimelineInitStatus::LocallyComplete); } Ok(SyncStartupData { - initial_timeline_states, + local_timeline_init_statuses, + remote_index: Arc::new(RwLock::new(RemoteTimelineIndex::empty())), }) } } diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index d14f849e15..f1483375cb 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -58,7 +58,7 @@ //! Synchronization never removes any local from pageserver workdir or remote files from the remote storage, yet there could be overwrites of the same files (metadata file updates; future checksum mismatch fixes). //! NOTE: No real contents or checksum check happens right now and is a subject to improve later. //! -//! 
After the whole timeline is downloaded, [`crate::tenant_mgr::set_timeline_states`] function is used to update pageserver memory stage for the timeline processed. +//! After the whole timeline is downloaded, [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function is used to update pageserver memory stage for the timeline processed. //! //! When pageserver signals shutdown, current sync task gets finished and the loop exists. @@ -93,17 +93,25 @@ use self::{ download::{download_timeline, DownloadedTimeline}, index::{ ArchiveDescription, ArchiveId, RemoteTimeline, RemoteTimelineIndex, TimelineIndexEntry, + TimelineIndexEntryInner, }, upload::upload_timeline_checkpoint, }; -use super::{RemoteStorage, SyncStartupData, ZTenantTimelineId}; +use super::{ + LocalTimelineInitStatus, LocalTimelineInitStatuses, RemoteStorage, SyncStartupData, + ZTenantTimelineId, +}; use crate::{ config::PageServerConf, layered_repository::metadata::TimelineMetadata, - remote_storage::storage_sync::compression::read_archive_header, repository::TimelineSyncState, - tenant_mgr::set_timeline_states, thread_mgr, thread_mgr::ThreadKind, + remote_storage::storage_sync::compression::read_archive_header, + repository::TimelineSyncStatusUpdate, tenant_mgr::apply_timeline_sync_status_updates, + thread_mgr, thread_mgr::ThreadKind, }; -use zenith_metrics::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge}; +use zenith_metrics::{ + register_histogram_vec, register_int_counter, register_int_gauge, HistogramVec, IntCounter, + IntGauge, +}; use zenith_utils::zid::{ZTenantId, ZTimelineId}; lazy_static! { @@ -112,6 +120,11 @@ lazy_static! { "Number of storage sync items left in the queue" ) .expect("failed to register pageserver remote storage remaining sync items int gauge"); + static ref FATAL_TASK_FAILURES: IntCounter = register_int_counter!( + "pageserver_remote_storage_fatal_task_failures", + "Number of critically failed tasks" + ) + .expect("failed to register pageserver remote storage remaining sync items int gauge"); static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!( "pageserver_remote_storage_image_sync_time", "Time took to synchronize (download or upload) a whole pageserver image. 
\ @@ -379,10 +392,13 @@ pub(super) fn spawn_storage_sync_thread< None } }); - let remote_index = RemoteTimelineIndex::try_parse_descriptions_from_paths(conf, download_paths); - - let initial_timeline_states = schedule_first_sync_tasks(&remote_index, local_timeline_files); + let mut remote_index = + RemoteTimelineIndex::try_parse_descriptions_from_paths(conf, download_paths); + let local_timeline_init_statuses = + schedule_first_sync_tasks(&mut remote_index, local_timeline_files); + let remote_index = Arc::new(RwLock::new(remote_index)); + let remote_index_cloned = Arc::clone(&remote_index); thread_mgr::spawn( ThreadKind::StorageSync, None, @@ -393,7 +409,7 @@ pub(super) fn spawn_storage_sync_thread< runtime, conf, receiver, - remote_index, + remote_index_cloned, storage, max_concurrent_sync, max_sync_errors, @@ -402,12 +418,13 @@ pub(super) fn spawn_storage_sync_thread< ) .context("Failed to spawn remote storage sync thread")?; Ok(SyncStartupData { - initial_timeline_states, + remote_index, + local_timeline_init_statuses, }) } enum LoopStep { - NewStates(HashMap>), + SyncStatusUpdates(HashMap>), Shutdown, } @@ -419,13 +436,14 @@ fn storage_sync_loop< runtime: Runtime, conf: &'static PageServerConf, mut receiver: UnboundedReceiver, - index: RemoteTimelineIndex, + index: Arc>, storage: S, max_concurrent_sync: NonZeroUsize, max_sync_errors: NonZeroU32, ) -> anyhow::Result<()> { - let remote_assets = Arc::new((storage, RwLock::new(index))); + let remote_assets = Arc::new((storage, Arc::clone(&index))); loop { + let index = Arc::clone(&index); let loop_step = runtime.block_on(async { tokio::select! { new_timeline_states = loop_step( @@ -435,15 +453,15 @@ fn storage_sync_loop< max_concurrent_sync, max_sync_errors, ) - .instrument(debug_span!("storage_sync_loop_step")) => LoopStep::NewStates(new_timeline_states), + .instrument(debug_span!("storage_sync_loop_step")) => LoopStep::SyncStatusUpdates(new_timeline_states), _ = thread_mgr::shutdown_watcher() => LoopStep::Shutdown, } }); match loop_step { - LoopStep::NewStates(new_timeline_states) => { + LoopStep::SyncStatusUpdates(new_timeline_states) => { // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. 
- set_timeline_states(conf, new_timeline_states); + apply_timeline_sync_status_updates(conf, index, new_timeline_states); debug!("Sync loop step completed"); } LoopStep::Shutdown => { @@ -462,10 +480,10 @@ async fn loop_step< >( conf: &'static PageServerConf, receiver: &mut UnboundedReceiver, - remote_assets: Arc<(S, RwLock)>, + remote_assets: Arc<(S, Arc>)>, max_concurrent_sync: NonZeroUsize, max_sync_errors: NonZeroU32, -) -> HashMap> { +) -> HashMap> { let max_concurrent_sync = max_concurrent_sync.get(); let mut next_tasks = BTreeSet::new(); @@ -516,8 +534,10 @@ async fn loop_step< }) .collect::>(); - let mut new_timeline_states: HashMap> = - HashMap::with_capacity(max_concurrent_sync); + let mut new_timeline_states: HashMap< + ZTenantId, + HashMap, + > = HashMap::with_capacity(max_concurrent_sync); while let Some((sync_id, state_update)) = task_batch.next().await { debug!("Finished storage sync task for sync id {}", sync_id); if let Some(state_update) = state_update { @@ -540,24 +560,19 @@ async fn process_task< S: RemoteStorage + Send + Sync + 'static, >( conf: &'static PageServerConf, - remote_assets: Arc<(S, RwLock)>, + remote_assets: Arc<(S, Arc>)>, task: SyncTask, max_sync_errors: NonZeroU32, -) -> Option { +) -> Option { if task.retries > max_sync_errors.get() { error!( "Evicting task {:?} that failed {} times, exceeding the error threshold", task.kind, task.retries ); - return Some(TimelineSyncState::Evicted( - remote_assets - .as_ref() - .1 - .read() - .await - .timeline_entry(&task.sync_id) - .and_then(TimelineIndexEntry::disk_consistent_lsn), - )); + FATAL_TASK_FAILURES.inc(); + // FIXME (rodionov) this can potentially leave holes in timeline uploads + // planneed to be fixed as part of https://github.com/zenithdb/zenith/issues/977 + return None; } if task.retries > 0 { @@ -569,6 +584,8 @@ async fn process_task< tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; } + let remote_index = Arc::clone(&remote_assets.1); + let sync_start = Instant::now(); let sync_name = task.kind.sync_name(); match task.kind { @@ -585,19 +602,25 @@ async fn process_task< match download_result { DownloadedTimeline::Abort => { register_sync_status(sync_start, sync_name, None); + remote_index + .write() + .await + .set_awaits_download(&task.sync_id, false) + .expect("timeline should be present in remote index"); None } - DownloadedTimeline::FailedAndRescheduled { - disk_consistent_lsn, - } => { + DownloadedTimeline::FailedAndRescheduled => { register_sync_status(sync_start, sync_name, Some(false)); - Some(TimelineSyncState::AwaitsDownload(disk_consistent_lsn)) + None } - DownloadedTimeline::Successful { - disk_consistent_lsn, - } => { + DownloadedTimeline::Successful => { register_sync_status(sync_start, sync_name, Some(true)); - Some(TimelineSyncState::Ready(disk_consistent_lsn)) + remote_index + .write() + .await + .set_awaits_download(&task.sync_id, false) + .expect("timeline should be present in remote index"); + Some(TimelineSyncStatusUpdate::Downloaded) } } } @@ -617,45 +640,45 @@ async fn process_task< } fn schedule_first_sync_tasks( - index: &RemoteTimelineIndex, + index: &mut RemoteTimelineIndex, local_timeline_files: HashMap)>, -) -> HashMap> { - let mut initial_timeline_statuses: HashMap> = - HashMap::new(); +) -> LocalTimelineInitStatuses { + let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); let mut new_sync_tasks = VecDeque::with_capacity(local_timeline_files.len().max(local_timeline_files.len())); for (sync_id, (local_metadata, local_files)) in 
local_timeline_files { - let local_disk_consistent_lsn = local_metadata.disk_consistent_lsn(); - let ZTenantTimelineId { tenant_id, timeline_id, } = sync_id; - match index.timeline_entry(&sync_id) { + match index.timeline_entry_mut(&sync_id) { Some(index_entry) => { - let timeline_status = compare_local_and_remote_timeline( + let (timeline_status, awaits_download) = compare_local_and_remote_timeline( &mut new_sync_tasks, sync_id, local_metadata, local_files, index_entry, ); - match timeline_status { - Some(timeline_status) => { - initial_timeline_statuses - .entry(tenant_id) - .or_default() - .insert(timeline_id, timeline_status); - } - None => error!( - "Failed to compare local and remote timeline for task {}", - sync_id - ), + let was_there = local_timeline_init_statuses + .entry(tenant_id) + .or_default() + .insert(timeline_id, timeline_status); + + if was_there.is_some() { + // defensive check + warn!( + "Overwriting timeline init sync status. Status {:?} Timeline {}", + timeline_status, timeline_id + ); } + index_entry.set_awaits_download(awaits_download); } None => { + // TODO (rodionov) does this mean that we've crashed during tenant creation? + // is it safe to upload this checkpoint? could it be half broken? new_sync_tasks.push_back(SyncTask::new( sync_id, 0, @@ -664,56 +687,18 @@ fn schedule_first_sync_tasks( metadata: local_metadata, }), )); - initial_timeline_statuses + local_timeline_init_statuses .entry(tenant_id) .or_default() - .insert( - timeline_id, - TimelineSyncState::Ready(local_disk_consistent_lsn), - ); + .insert(timeline_id, LocalTimelineInitStatus::LocallyComplete); } } } - let unprocessed_remote_ids = |remote_id: &ZTenantTimelineId| { - initial_timeline_statuses - .get(&remote_id.tenant_id) - .and_then(|timelines| timelines.get(&remote_id.timeline_id)) - .is_none() - }; - for unprocessed_remote_id in index - .all_sync_ids() - .filter(unprocessed_remote_ids) - .collect::>() - { - let ZTenantTimelineId { - tenant_id: cloud_only_tenant_id, - timeline_id: cloud_only_timeline_id, - } = unprocessed_remote_id; - match index - .timeline_entry(&unprocessed_remote_id) - .and_then(TimelineIndexEntry::disk_consistent_lsn) - { - Some(remote_disk_consistent_lsn) => { - initial_timeline_statuses - .entry(cloud_only_tenant_id) - .or_default() - .insert( - cloud_only_timeline_id, - TimelineSyncState::CloudOnly(remote_disk_consistent_lsn), - ); - } - None => error!( - "Failed to find disk consistent LSN for remote timeline {}", - unprocessed_remote_id - ), - } - } - new_sync_tasks.into_iter().for_each(|task| { sync_queue::push(task); }); - initial_timeline_statuses + local_timeline_init_statuses } fn compare_local_and_remote_timeline( @@ -722,10 +707,21 @@ fn compare_local_and_remote_timeline( local_metadata: TimelineMetadata, local_files: Vec, remote_entry: &TimelineIndexEntry, -) -> Option { +) -> (LocalTimelineInitStatus, bool) { let local_lsn = local_metadata.disk_consistent_lsn(); let uploads = remote_entry.uploaded_checkpoints(); + let mut initial_timeline_status = LocalTimelineInitStatus::LocallyComplete; + + let mut awaits_download = false; + // TODO probably here we need more sophisticated logic, + // if more data is available remotely can we just download whats there? + // without trying to upload something. It may be tricky, needs further investigation. + // For now looks strange that we can request upload + // and dowload for the same timeline simultaneously. + // (upload needs to be only for previously unsynced files, not whole timeline dir). 
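compare_local_and_remote_timeline above boils down to two comparisons between the local disk_consistent_lsn and the set of checkpoint LSNs already uploaded. A stripped-down sketch of that decision, treating LSNs as plain u64 rather than the real Lsn/ArchiveId types:

use std::collections::BTreeSet;

/// Decide what sync work a timeline needs by comparing the local
/// disk_consistent_lsn with the set of checkpoint LSNs already uploaded.
fn plan_sync(local_lsn: u64, uploaded: &BTreeSet<u64>) -> (bool, bool) {
    // Upload if the remote side does not yet have our latest checkpoint.
    let needs_upload = !uploaded.contains(&local_lsn);
    // Download if the remote side has checkpoints newer than anything local;
    // older remote archives are skipped, they are already covered locally.
    let needs_download = uploaded.iter().any(|&remote_lsn| remote_lsn > local_lsn);
    (needs_upload, needs_download)
}

fn main() {
    let uploaded: BTreeSet<u64> = [0x10, 0x20, 0x40].into_iter().collect();
    assert_eq!(plan_sync(0x20, &uploaded), (false, true)); // behind remote: download
    assert_eq!(plan_sync(0x50, &uploaded), (true, false)); // ahead of remote: upload
}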
+ // If one of the tasks fails they will be reordered in the queue which can lead + // to timeline being stuck in evicted state if !uploads.contains(&local_lsn) { new_sync_tasks.push_back(SyncTask::new( sync_id, @@ -735,6 +731,7 @@ fn compare_local_and_remote_timeline( metadata: local_metadata, }), )); + // Note that status here doesnt change. } let uploads_count = uploads.len(); @@ -743,7 +740,7 @@ fn compare_local_and_remote_timeline( .filter(|upload_lsn| upload_lsn <= &local_lsn) .map(ArchiveId) .collect(); - Some(if archives_to_skip.len() != uploads_count { + if archives_to_skip.len() != uploads_count { new_sync_tasks.push_back(SyncTask::new( sync_id, 0, @@ -752,10 +749,12 @@ fn compare_local_and_remote_timeline( archives_to_skip, }), )); - TimelineSyncState::AwaitsDownload(remote_entry.disk_consistent_lsn()?) - } else { - TimelineSyncState::Ready(remote_entry.disk_consistent_lsn().unwrap_or(local_lsn)) - }) + initial_timeline_status = LocalTimelineInitStatus::NeedsSync; + awaits_download = true; + // we do not need to manupulate with remote consistent lsn here + // because it will be updated when sync will be completed + } + (initial_timeline_status, awaits_download) } fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option) { @@ -769,21 +768,23 @@ fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Optio .observe(secs_elapsed) } -async fn update_index_description< +async fn fetch_full_index< P: Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, >( - (storage, index): &(S, RwLock), + (storage, index): &(S, Arc>), timeline_dir: &Path, id: ZTenantTimelineId, ) -> anyhow::Result { - let mut index_write = index.write().await; - let full_index = match index_write.timeline_entry(&id) { + let index_read = index.read().await; + let full_index = match index_read.timeline_entry(&id).map(|e| e.inner()) { None => bail!("Timeline not found for sync id {}", id), - Some(TimelineIndexEntry::Full(_)) => bail!("Index is already populated for sync id {}", id), - Some(TimelineIndexEntry::Description(description)) => { + Some(TimelineIndexEntryInner::Full(_)) => { + bail!("Index is already populated for sync id {}", id) + } + Some(TimelineIndexEntryInner::Description(description)) => { let mut archive_header_downloads = FuturesUnordered::new(); - for (&archive_id, description) in description { + for (archive_id, description) in description { archive_header_downloads.push(async move { let header = download_archive_header(storage, timeline_dir, description) .await @@ -795,18 +796,22 @@ async fn update_index_description< let mut full_index = RemoteTimeline::empty(); while let Some(header_data) = archive_header_downloads.next().await { match header_data { - Ok((archive_id, header_size, header)) => full_index.update_archive_contents(archive_id.0, header, header_size), - Err((e, archive_id)) => bail!( - "Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}", - id.tenant_id, id.timeline_id, archive_id.0, - e - ), - } + Ok((archive_id, header_size, header)) => full_index.update_archive_contents(archive_id.0, header, header_size), + Err((e, archive_id)) => bail!( + "Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}", + id.tenant_id, id.timeline_id, archive_id.0, + e + ), + } } full_index } }; - index_write.add_timeline_entry(id, TimelineIndexEntry::Full(full_index.clone())); + drop(index_read); // tokio rw lock is not upgradeable + let mut index_write = index.write().await; + 
index_write + .upgrade_timeline_entry(&id, full_index.clone()) + .context("cannot upgrade timeline entry in remote index")?; Ok(full_index) } @@ -850,7 +855,7 @@ mod test_utils { #[track_caller] pub async fn ensure_correct_timeline_upload( harness: &RepoHarness, - remote_assets: Arc<(LocalFs, RwLock)>, + remote_assets: Arc<(LocalFs, Arc>)>, timeline_id: ZTimelineId, new_upload: NewCheckpoint, ) { @@ -909,11 +914,14 @@ mod test_utils { } pub async fn expect_timeline( - index: &RwLock, + index: &Arc>, sync_id: ZTenantTimelineId, ) -> RemoteTimeline { - if let Some(TimelineIndexEntry::Full(remote_timeline)) = - index.read().await.timeline_entry(&sync_id) + if let Some(TimelineIndexEntryInner::Full(remote_timeline)) = index + .read() + .await + .timeline_entry(&sync_id) + .map(|e| e.inner()) { remote_timeline.clone() } else { @@ -926,7 +934,7 @@ mod test_utils { #[track_caller] pub async fn assert_index_descriptions( - index: &RwLock, + index: &Arc>, expected_index_with_descriptions: RemoteTimelineIndex, ) { let index_read = index.read().await; @@ -965,26 +973,26 @@ mod test_utils { sync_id ) }); - let expected_timeline_description = match expected_timeline_description { - TimelineIndexEntry::Description(description) => description, - TimelineIndexEntry::Full(_) => panic!("Expected index entry for sync id {} is a full entry, while a description was expected", sync_id), + let expected_timeline_description = match expected_timeline_description.inner() { + TimelineIndexEntryInner::Description(description) => description, + TimelineIndexEntryInner::Full(_) => panic!("Expected index entry for sync id {} is a full entry, while a description was expected", sync_id), }; - match actual_timeline_entry { - TimelineIndexEntry::Description(actual_descriptions) => { + match actual_timeline_entry.inner() { + TimelineIndexEntryInner::Description(description) => { assert_eq!( - actual_descriptions, expected_timeline_description, + description, expected_timeline_description, "Index contains unexpected descriptions entry for sync id {}", sync_id ) } - TimelineIndexEntry::Full(actual_full_entry) => { + TimelineIndexEntryInner::Full(remote_timeline) => { let expected_lsns = expected_timeline_description .values() .map(|description| description.disk_consistent_lsn) .collect::>(); assert_eq!( - actual_full_entry.checkpoints().collect::>(), + remote_timeline.checkpoints().collect::>(), expected_lsns, "Timeline {} should have the same checkpoints uploaded", sync_id, diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs index 00115ba8d5..e5362b2973 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/remote_storage/storage_sync/download.rs @@ -5,14 +5,14 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; use anyhow::{ensure, Context}; use tokio::{fs, sync::RwLock}; use tracing::{debug, error, trace, warn}; -use zenith_utils::{lsn::Lsn, zid::ZTenantId}; +use zenith_utils::zid::ZTenantId; use crate::{ config::PageServerConf, layered_repository::metadata::{metadata_path, TimelineMetadata}, remote_storage::{ storage_sync::{ - compression, index::TimelineIndexEntry, sync_queue, update_index_description, SyncKind, + compression, fetch_full_index, index::TimelineIndexEntryInner, sync_queue, SyncKind, SyncTask, }, RemoteStorage, ZTenantTimelineId, @@ -30,10 +30,10 @@ pub(super) enum DownloadedTimeline { Abort, /// Remote timeline data is found, its latest checkpoint's metadata contents 
(disk_consistent_lsn) is known. /// Initial download failed due to some error, the download task is rescheduled for another retry. - FailedAndRescheduled { disk_consistent_lsn: Lsn }, + FailedAndRescheduled, /// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known. /// Initial download successful. - Successful { disk_consistent_lsn: Lsn }, + Successful, } /// Attempts to download and uncompress files from all remote archives for the timeline given. @@ -47,7 +47,7 @@ pub(super) async fn download_timeline< S: RemoteStorage + Send + Sync + 'static, >( conf: &'static PageServerConf, - remote_assets: Arc<(S, RwLock)>, + remote_assets: Arc<(S, Arc>)>, sync_id: ZTenantTimelineId, mut download: TimelineDownload, retries: u32, @@ -58,19 +58,26 @@ pub(super) async fn download_timeline< tenant_id, timeline_id, } = sync_id; - let index_read = remote_assets.1.read().await; + let index = &remote_assets.1; + + let index_read = index.read().await; let remote_timeline = match index_read.timeline_entry(&sync_id) { None => { - error!("Cannot download: no timeline is present in the index for given ids"); + error!("Cannot download: no timeline is present in the index for given id"); return DownloadedTimeline::Abort; } - Some(index_entry) => match index_entry { - TimelineIndexEntry::Full(remote_timeline) => Cow::Borrowed(remote_timeline), - TimelineIndexEntry::Description(_) => { + + Some(index_entry) => match index_entry.inner() { + TimelineIndexEntryInner::Full(remote_timeline) => Cow::Borrowed(remote_timeline), + TimelineIndexEntryInner::Description(_) => { + // we do not check here for awaits_download because it is ok + // to call this function while the download is in progress + // so it is not a concurrent download, it is the same one + let remote_disk_consistent_lsn = index_entry.disk_consistent_lsn(); drop(index_read); debug!("Found timeline description for the given ids, downloading the full index"); - match update_index_description( + match fetch_full_index( remote_assets.as_ref(), &conf.timeline_path(&timeline_id, &tenant_id), sync_id, @@ -80,16 +87,15 @@ pub(super) async fn download_timeline< Ok(remote_timeline) => Cow::Owned(remote_timeline), Err(e) => { error!("Failed to download full timeline index: {:?}", e); + return match remote_disk_consistent_lsn { - Some(disk_consistent_lsn) => { + Some(_) => { sync_queue::push(SyncTask::new( sync_id, retries, SyncKind::Download(download), )); - DownloadedTimeline::FailedAndRescheduled { - disk_consistent_lsn, - } + DownloadedTimeline::FailedAndRescheduled } None => { error!("Cannot download: no disk consistent Lsn is present for the index entry"); @@ -101,12 +107,9 @@ pub(super) async fn download_timeline< } }, }; - let disk_consistent_lsn = match remote_timeline.checkpoints().max() { - Some(lsn) => lsn, - None => { - debug!("Cannot download: no disk consistent Lsn is present for the remote timeline"); - return DownloadedTimeline::Abort; - } + if remote_timeline.checkpoints().max().is_none() { + debug!("Cannot download: no disk consistent Lsn is present for the remote timeline"); + return DownloadedTimeline::Abort; }; debug!("Downloading timeline archives"); @@ -125,7 +128,7 @@ pub(super) async fn download_timeline< conf, sync_id, Arc::clone(&remote_assets), - remote_timeline.as_ref(), + &remote_timeline, archive_id, Arc::clone(&download.files_to_skip), ) @@ -142,9 +145,7 @@ pub(super) async fn download_timeline< retries, SyncKind::Download(download), )); - return DownloadedTimeline::FailedAndRescheduled { 
- disk_consistent_lsn, - }; + return DownloadedTimeline::FailedAndRescheduled; } Ok(()) => { debug!("Successfully downloaded archive {:?}", archive_id); @@ -154,9 +155,7 @@ pub(super) async fn download_timeline< } debug!("Finished downloading all timeline's archives"); - DownloadedTimeline::Successful { - disk_consistent_lsn, - } + DownloadedTimeline::Successful } async fn try_download_archive< @@ -168,7 +167,7 @@ async fn try_download_archive< tenant_id, timeline_id, }: ZTenantTimelineId, - remote_assets: Arc<(S, RwLock)>, + remote_assets: Arc<(S, Arc>)>, remote_timeline: &RemoteTimeline, archive_id: ArchiveId, files_to_skip: Arc>, @@ -256,13 +255,15 @@ mod tests { let repo_harness = RepoHarness::create("test_download_timeline")?; let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths( - repo_harness.conf, - storage - .list() - .await? - .into_iter() - .map(|storage_path| storage.local_path(&storage_path).unwrap()), + let index = Arc::new(RwLock::new( + RemoteTimelineIndex::try_parse_descriptions_from_paths( + repo_harness.conf, + storage + .list() + .await? + .into_iter() + .map(|storage_path| storage.local_path(&storage_path).unwrap()), + ), )); let remote_assets = Arc::new((storage, index)); let storage = &remote_assets.0; diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs index 81c99754c9..7d6b4881f7 100644 --- a/pageserver/src/remote_storage/storage_sync/index.rs +++ b/pageserver/src/remote_storage/storage_sync/index.rs @@ -11,7 +11,7 @@ use std::{ use anyhow::{bail, ensure, Context}; use serde::{Deserialize, Serialize}; -use tracing::debug; +use tracing::*; use zenith_utils::{ lsn::Lsn, zid::{ZTenantId, ZTimelineId}, @@ -52,10 +52,16 @@ impl RelativePath { /// Currently, timeline archive files are tracked only. #[derive(Debug, Clone)] pub struct RemoteTimelineIndex { - timeline_files: HashMap, + timeline_entries: HashMap, } impl RemoteTimelineIndex { + pub fn empty() -> Self { + Self { + timeline_entries: HashMap::new(), + } + } + /// Attempts to parse file paths (not checking the file contents) and find files /// that can be tracked wiht the index. /// On parse falures, logs the error and continues, so empty index can be created from not suitable paths. 
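try_parse_descriptions_from_paths builds the index best-effort: unparseable paths are logged and skipped instead of failing the whole scan. A self-contained sketch of that tolerant-parsing loop; the path layout and the helper below are hypothetical and much simpler than the real archive naming scheme:

use std::collections::HashMap;
use std::path::Path;

// Hypothetical layout: ".../<timeline>/<lsn_hex>.zst"
fn try_parse_entry(path: &Path) -> Result<(String, u64), String> {
    let timeline = path
        .parent()
        .and_then(|p| p.file_name())
        .and_then(|s| s.to_str())
        .ok_or_else(|| format!("no timeline dir in {}", path.display()))?
        .to_owned();
    let lsn_hex = path
        .file_stem()
        .and_then(|s| s.to_str())
        .ok_or_else(|| format!("no file stem in {}", path.display()))?;
    let lsn = u64::from_str_radix(lsn_hex, 16)
        .map_err(|e| format!("bad lsn in {}: {}", path.display(), e))?;
    Ok((timeline, lsn))
}

fn main() {
    let paths = ["tenant1/tl1/16.zst", "tenant1/tl1/garbage", "tenant1/tl2/2A.zst"];
    let mut index: HashMap<String, Vec<u64>> = HashMap::new();
    for p in paths {
        // Parse failures are logged and skipped, so an index can still be
        // built from whatever paths are recognisable.
        match try_parse_entry(Path::new(p)) {
            Ok((timeline, lsn)) => index.entry(timeline).or_default().push(lsn),
            Err(e) => eprintln!("skipping unparseable path: {}", e),
        }
    }
    assert_eq!(index["tl1"], vec![0x16]);
}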
@@ -63,9 +69,7 @@ impl RemoteTimelineIndex { conf: &'static PageServerConf, paths: impl Iterator, ) -> Self { - let mut index = Self { - timeline_files: HashMap::new(), - }; + let mut index = Self::empty(); for path in paths { if let Err(e) = try_parse_index_entry(&mut index, conf, path.as_ref()) { debug!( @@ -79,40 +83,100 @@ impl RemoteTimelineIndex { } pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&TimelineIndexEntry> { - self.timeline_files.get(id) + self.timeline_entries.get(id) } pub fn timeline_entry_mut( &mut self, id: &ZTenantTimelineId, ) -> Option<&mut TimelineIndexEntry> { - self.timeline_files.get_mut(id) + self.timeline_entries.get_mut(id) } pub fn add_timeline_entry(&mut self, id: ZTenantTimelineId, entry: TimelineIndexEntry) { - self.timeline_files.insert(id, entry); + self.timeline_entries.insert(id, entry); + } + + pub fn upgrade_timeline_entry( + &mut self, + id: &ZTenantTimelineId, + remote_timeline: RemoteTimeline, + ) -> anyhow::Result<()> { + let mut entry = self.timeline_entries.get_mut(id).ok_or(anyhow::anyhow!( + "timeline is unexpectedly missing from remote index" + ))?; + + if !matches!(entry.inner, TimelineIndexEntryInner::Description(_)) { + anyhow::bail!("timeline entry is not a description entry") + }; + + entry.inner = TimelineIndexEntryInner::Full(remote_timeline); + + Ok(()) } pub fn all_sync_ids(&self) -> impl Iterator + '_ { - self.timeline_files.keys().copied() + self.timeline_entries.keys().copied() + } + + pub fn set_awaits_download( + &mut self, + id: &ZTenantTimelineId, + awaits_download: bool, + ) -> anyhow::Result<()> { + self.timeline_entry_mut(id) + .ok_or_else(|| anyhow::anyhow!("unknown timeline sync {}", id))? + .set_awaits_download(awaits_download); + Ok(()) } } +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct DescriptionTimelineIndexEntry { + pub description: BTreeMap, + pub awaits_download: bool, +} + #[derive(Debug, Clone, PartialEq, Eq)] -pub enum TimelineIndexEntry { - /// An archive found on the remote storage, but not yet downloaded, only a metadata from its storage path is available, without archive contents. +pub struct FullTimelineIndexEntry { + pub remote_timeline: RemoteTimeline, + pub awaits_download: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TimelineIndexEntryInner { Description(BTreeMap), - /// Full archive metadata, including the file list, parsed from the archive header. Full(RemoteTimeline), } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TimelineIndexEntry { + inner: TimelineIndexEntryInner, + awaits_download: bool, +} + impl TimelineIndexEntry { + pub fn new(inner: TimelineIndexEntryInner, awaits_download: bool) -> Self { + Self { + inner, + awaits_download, + } + } + + pub fn inner(&self) -> &TimelineIndexEntryInner { + &self.inner + } + + pub fn inner_mut(&mut self) -> &mut TimelineIndexEntryInner { + &mut self.inner + } + pub fn uploaded_checkpoints(&self) -> BTreeSet { - match self { - Self::Description(description) => { + match &self.inner { + TimelineIndexEntryInner::Description(description) => { description.keys().map(|archive_id| archive_id.0).collect() } - Self::Full(remote_timeline) => remote_timeline + TimelineIndexEntryInner::Full(remote_timeline) => remote_timeline .checkpoint_archives .keys() .map(|archive_id| archive_id.0) @@ -122,17 +186,25 @@ impl TimelineIndexEntry { /// Gets latest uploaded checkpoint's disk consisten Lsn for the corresponding timeline. 
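For both entry kinds the "latest disk consistent LSN" is simply the maximum key of the uploaded checkpoint archives. A sketch with a plain BTreeMap standing in for the archive map (keys are LSNs as u64 here, not ArchiveId, and the values are placeholder names):

use std::collections::BTreeMap;

fn latest_disk_consistent_lsn(archives: &BTreeMap<u64, &'static str>) -> Option<u64> {
    // BTreeMap keys are ordered, so the last key is the maximum LSN.
    archives.keys().next_back().copied()
}

fn main() {
    let mut archives = BTreeMap::new();
    assert_eq!(latest_disk_consistent_lsn(&archives), None); // nothing uploaded yet
    archives.insert(0x10, "archive_10.zst");
    archives.insert(0x40, "archive_40.zst");
    archives.insert(0x20, "archive_20.zst");
    assert_eq!(latest_disk_consistent_lsn(&archives), Some(0x40));
}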
pub fn disk_consistent_lsn(&self) -> Option { - match self { - Self::Description(description) => { + match &self.inner { + TimelineIndexEntryInner::Description(description) => { description.keys().map(|archive_id| archive_id.0).max() } - Self::Full(remote_timeline) => remote_timeline + TimelineIndexEntryInner::Full(remote_timeline) => remote_timeline .checkpoint_archives .keys() .map(|archive_id| archive_id.0) .max(), } } + + pub fn get_awaits_download(&self) -> bool { + self.awaits_download + } + + pub fn set_awaits_download(&mut self, awaits_download: bool) { + self.awaits_download = awaits_download; + } } /// Checkpoint archive's id, corresponding to the `disk_consistent_lsn` from the timeline's metadata file during checkpointing. @@ -331,13 +403,15 @@ fn try_parse_index_entry( tenant_id, timeline_id, }; - let timeline_index_entry = index - .timeline_files - .entry(sync_id) - .or_insert_with(|| TimelineIndexEntry::Description(BTreeMap::new())); - match timeline_index_entry { - TimelineIndexEntry::Description(descriptions) => { - descriptions.insert( + let timeline_index_entry = index.timeline_entries.entry(sync_id).or_insert_with(|| { + TimelineIndexEntry::new( + TimelineIndexEntryInner::Description(BTreeMap::default()), + false, + ) + }); + match timeline_index_entry.inner_mut() { + TimelineIndexEntryInner::Description(description) => { + description.insert( ArchiveId(disk_consistent_lsn), ArchiveDescription { header_size, @@ -346,7 +420,7 @@ fn try_parse_index_entry( }, ); } - TimelineIndexEntry::Full(_) => { + TimelineIndexEntryInner::Full(_) => { bail!("Cannot add parsed archive description to its full context in index with sync id {}", sync_id) } } diff --git a/pageserver/src/remote_storage/storage_sync/upload.rs b/pageserver/src/remote_storage/storage_sync/upload.rs index d064039ecc..8fdd91dd18 100644 --- a/pageserver/src/remote_storage/storage_sync/upload.rs +++ b/pageserver/src/remote_storage/storage_sync/upload.rs @@ -10,9 +10,9 @@ use crate::{ config::PageServerConf, remote_storage::{ storage_sync::{ - compression, - index::{RemoteTimeline, TimelineIndexEntry}, - sync_queue, update_index_description, SyncKind, SyncTask, + compression, fetch_full_index, + index::{RemoteTimeline, TimelineIndexEntry, TimelineIndexEntryInner}, + sync_queue, SyncKind, SyncTask, }, RemoteStorage, ZTenantTimelineId, }, @@ -30,7 +30,7 @@ pub(super) async fn upload_timeline_checkpoint< S: RemoteStorage + Send + Sync + 'static, >( config: &'static PageServerConf, - remote_assets: Arc<(S, RwLock)>, + remote_assets: Arc<(S, Arc>)>, sync_id: ZTenantTimelineId, new_checkpoint: NewCheckpoint, retries: u32, @@ -49,22 +49,24 @@ pub(super) async fn upload_timeline_checkpoint< let index_read = index.read().await; let remote_timeline = match index_read.timeline_entry(&sync_id) { None => None, - Some(TimelineIndexEntry::Full(remote_timeline)) => Some(Cow::Borrowed(remote_timeline)), - Some(TimelineIndexEntry::Description(_)) => { - debug!("Found timeline description for the given ids, downloading the full index"); - match update_index_description(remote_assets.as_ref(), &timeline_dir, sync_id).await { - Ok(remote_timeline) => Some(Cow::Owned(remote_timeline)), - Err(e) => { - error!("Failed to download full timeline index: {:?}", e); - sync_queue::push(SyncTask::new( - sync_id, - retries, - SyncKind::Upload(new_checkpoint), - )); - return Some(false); + Some(entry) => match entry.inner() { + TimelineIndexEntryInner::Full(remote_timeline) => Some(Cow::Borrowed(remote_timeline)), + 
TimelineIndexEntryInner::Description(_) => { + debug!("Found timeline description for the given ids, downloading the full index"); + match fetch_full_index(remote_assets.as_ref(), &timeline_dir, sync_id).await { + Ok(remote_timeline) => Some(Cow::Owned(remote_timeline)), + Err(e) => { + error!("Failed to download full timeline index: {:?}", e); + sync_queue::push(SyncTask::new( + sync_id, + retries, + SyncKind::Upload(new_checkpoint), + )); + return Some(false); + } } } - } + }, }; let already_contains_upload_lsn = remote_timeline @@ -95,22 +97,40 @@ pub(super) async fn upload_timeline_checkpoint< { Ok((archive_header, header_size)) => { let mut index_write = index.write().await; - match index_write.timeline_entry_mut(&sync_id) { - Some(TimelineIndexEntry::Full(remote_timeline)) => { - remote_timeline.update_archive_contents( - new_checkpoint.metadata.disk_consistent_lsn(), - archive_header, - header_size, - ); - } - None | Some(TimelineIndexEntry::Description(_)) => { + match index_write + .timeline_entry_mut(&sync_id) + .map(|e| e.inner_mut()) + { + None => { let mut new_timeline = RemoteTimeline::empty(); new_timeline.update_archive_contents( new_checkpoint.metadata.disk_consistent_lsn(), archive_header, header_size, ); - index_write.add_timeline_entry(sync_id, TimelineIndexEntry::Full(new_timeline)); + index_write.add_timeline_entry( + sync_id, + TimelineIndexEntry::new(TimelineIndexEntryInner::Full(new_timeline), false), + ) + } + Some(TimelineIndexEntryInner::Full(remote_timeline)) => { + remote_timeline.update_archive_contents( + new_checkpoint.metadata.disk_consistent_lsn(), + archive_header, + header_size, + ); + } + Some(TimelineIndexEntryInner::Description(_)) => { + let mut new_timeline = RemoteTimeline::empty(); + new_timeline.update_archive_contents( + new_checkpoint.metadata.disk_consistent_lsn(), + archive_header, + header_size, + ); + index_write.add_timeline_entry( + sync_id, + TimelineIndexEntry::new(TimelineIndexEntryInner::Full(new_timeline), false), + ) } } debug!("Checkpoint uploaded successfully"); @@ -136,7 +156,7 @@ async fn try_upload_checkpoint< S: RemoteStorage + Send + Sync + 'static, >( config: &'static PageServerConf, - remote_assets: Arc<(S, RwLock)>, + remote_assets: Arc<(S, Arc>)>, sync_id: ZTenantTimelineId, new_checkpoint: &NewCheckpoint, files_to_skip: BTreeSet, @@ -209,13 +229,15 @@ mod tests { let repo_harness = RepoHarness::create("reupload_timeline")?; let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths( - repo_harness.conf, - storage - .list() - .await? - .into_iter() - .map(|storage_path| storage.local_path(&storage_path).unwrap()), + let index = Arc::new(RwLock::new( + RemoteTimelineIndex::try_parse_descriptions_from_paths( + repo_harness.conf, + storage + .list() + .await? + .into_iter() + .map(|storage_path| storage.local_path(&storage_path).unwrap()), + ), )); let remote_assets = Arc::new((storage, index)); let index = &remote_assets.1; @@ -405,13 +427,15 @@ mod tests { let repo_harness = RepoHarness::create("reupload_timeline_rejected")?; let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths( - repo_harness.conf, - storage - .list() - .await? 
- .into_iter() - .map(|storage_path| storage.local_path(&storage_path).unwrap()), + let index = Arc::new(RwLock::new( + RemoteTimelineIndex::try_parse_descriptions_from_paths( + repo_harness.conf, + storage + .list() + .await? + .into_iter() + .map(|storage_path| storage.local_path(&storage_path).unwrap()), + ), )); let remote_assets = Arc::new((storage, index)); let storage = &remote_assets.0; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index be937b8d26..e335f42519 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,4 +1,6 @@ +use crate::layered_repository::metadata::TimelineMetadata; use crate::relish::*; +use crate::remote_storage::RemoteTimelineIndex; use crate::walrecord::MultiXactMember; use crate::CheckpointConfig; use anyhow::Result; @@ -6,6 +8,7 @@ use bytes::Bytes; use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; +use std::fmt::Display; use std::ops::{AddAssign, Deref}; use std::sync::{Arc, RwLockReadGuard}; use std::time::Duration; @@ -15,30 +18,43 @@ use zenith_utils::zid::ZTimelineId; /// Block number within a relish. This matches PostgreSQL's BlockNumber type. pub type BlockNumber = u32; +#[derive(Clone, Copy, Debug)] +pub enum TimelineSyncStatusUpdate { + Uploaded, + Downloaded, +} + +impl Display for TimelineSyncStatusUpdate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match self { + TimelineSyncStatusUpdate::Uploaded => "Uploaded", + TimelineSyncStatusUpdate::Downloaded => "Downloaded", + }; + f.write_str(s) + } +} /// /// A repository corresponds to one .zenith directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. pub trait Repository: Send + Sync { - fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; - - /// Updates timeline based on the new sync state, received from the remote storage synchronization. + /// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization. /// See [`crate::remote_storage`] for more details about the synchronization. - fn set_timeline_state( + fn apply_timeline_remote_sync_status_update( &self, timeline_id: ZTimelineId, - new_state: TimelineSyncState, + timeline_sync_status_update: TimelineSyncStatusUpdate, ) -> Result<()>; - /// Gets current synchronization state of the timeline. - /// See [`crate::remote_storage`] for more details about the synchronization. - fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option; - /// Get Timeline handle for given zenith timeline ID. - fn get_timeline(&self, timelineid: ZTimelineId) -> Result; + /// This function is idempotent. It doesnt change internal state in any way. + fn get_timeline(&self, timelineid: ZTimelineId) -> Option; + + /// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded. + fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result>; /// Lists timelines the repository contains. /// Up to repository's implementation to omit certain timelines that ar not considered ready for use. - fn list_timelines(&self) -> Result>; + fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)>; /// Create a new, empty timeline. The caller is responsible for loading data into it /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it. 
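The reworked Repository trait separates a read-only lookup (get_timeline, which never loads anything) from get_timeline_load, which pulls an unloaded timeline into memory on first use. A toy sketch of that contract, with heavily simplified stand-in types (no layer map, no locking, and "loading" is just wrapping the metadata string):

use std::collections::HashMap;
use std::sync::Arc;

enum Entry {
    Loaded(Arc<String>),           // stand-in for Arc<dyn Timeline>
    Unloaded { metadata: String }, // stand-in for TimelineMetadata
}

struct Repo {
    timelines: HashMap<u32, Entry>,
}

impl Repo {
    /// Idempotent: reports what is there without loading anything.
    fn get_timeline(&self, id: u32) -> Option<&Entry> {
        self.timelines.get(&id)
    }

    /// Loads the timeline into memory on first use and returns a handle.
    fn get_timeline_load(&mut self, id: u32) -> Option<Arc<String>> {
        let entry = self.timelines.get_mut(&id)?;
        if let Entry::Unloaded { metadata } = entry {
            // "Loading" here is just wrapping the metadata; the real code
            // rebuilds in-memory state from files on disk.
            let loaded = Arc::new(format!("loaded from {}", metadata));
            *entry = Entry::Loaded(loaded);
        }
        match entry {
            Entry::Loaded(t) => Some(Arc::clone(t)),
            Entry::Unloaded { .. } => unreachable!(),
        }
    }
}

fn main() {
    let mut repo = Repo {
        timelines: HashMap::from([(1, Entry::Unloaded { metadata: "meta".into() })]),
    };
    assert!(matches!(repo.get_timeline(1), Some(Entry::Unloaded { .. })));
    let handle = repo.get_timeline_load(1).unwrap();
    assert!(matches!(repo.get_timeline(1), Some(Entry::Loaded(_))));
    assert_eq!(handle.as_str(), "loaded from meta");
}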
@@ -70,72 +86,44 @@ pub trait Repository: Send + Sync { /// perform one checkpoint iteration, flushing in-memory data on disk. /// this function is periodically called by checkponter thread. fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()>; + + /// detaches locally available timeline by stopping all threads and removing all the data. + fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; + + // Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn. + fn get_remote_index(&self) -> &tokio::sync::RwLock; } /// A timeline, that belongs to the current repository. pub enum RepositoryTimeline { /// Timeline, with its files present locally in pageserver's working directory. /// Loaded into pageserver's memory and ready to be used. - Local { - id: ZTimelineId, - timeline: Arc, - }, - /// Timeline, found on the pageserver's remote storage, but not yet downloaded locally. - Remote { - id: ZTimelineId, - /// metadata contents of the latest successfully uploaded checkpoint - disk_consistent_lsn: Lsn, + Loaded(Arc), + + /// All the data is available locally, but not loaded into memory, so loading have to be done before actually using the timeline + Unloaded { + // It is ok to keep metadata here, because it is not changed when timeline is unloaded. + // FIXME can s3 sync actually change it? It can change it when timeline is in awaiting download state. + // but we currently do not download something for the timeline once it is local (even if there are new checkpoints) is it correct? + // also it is not that good to keep TimelineMetadata here, because it is layered repo implementation detail + metadata: TimelineMetadata, }, } -impl RepositoryTimeline { - pub fn local_timeline(&self) -> Option> { - if let Self::Local { timeline, .. } = self { - Some(Arc::clone(timeline)) - } else { - None - } - } - - pub fn id(&self) -> ZTimelineId { - match self { - Self::Local { id, .. } => *id, - Self::Remote { id, .. } => *id, - } - } -} - -/// A state of the timeline synchronization with the remote storage. -/// Contains `disk_consistent_lsn` of the corresponding remote timeline (latest checkpoint's disk_consistent_lsn). #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -pub enum TimelineSyncState { - /// No further downloads from the remote storage are needed. - /// The timeline state is up-to-date or ahead of the remote storage one, - /// ready to be used in any pageserver operation. - Ready(Lsn), - /// Timeline is scheduled for downloading, but its current local state is not up to date with the remote storage. - /// The timeline is not ready to be used in any pageserver operations, otherwise it might diverge its local state from the remote version, - /// making it impossible to sync it further. - AwaitsDownload(Lsn), - /// Timeline was not in the pageserver's local working directory, but was found on the remote storage, ready to be downloaded. - /// Cannot be used in any pageserver operations due to complete absence locally. - CloudOnly(Lsn), - /// Timeline was evicted from the pageserver's local working directory due to conflicting remote and local states or too many errors during the synchronization. - /// Such timelines cannot have their state synchronized further and may not have the data about remote timeline's disk_consistent_lsn, since eviction may happen - /// due to errors before the remote timeline contents is known. 
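The new LocalTimelineState keeps the Serialize/Deserialize derives of the enum it replaces, so the state can be reported over the HTTP API as a plain string. A quick round-trip sketch, assuming serde's derive feature and serde_json are available in the workspace:

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
enum LocalTimelineState {
    Loaded,
    Unloaded,
}

fn main() -> serde_json::Result<()> {
    // A unit variant of an externally tagged enum serializes as its name.
    let json = serde_json::to_string(&LocalTimelineState::Unloaded)?;
    assert_eq!(json, "\"Unloaded\"");
    let back: LocalTimelineState = serde_json::from_str(&json)?;
    assert_eq!(back, LocalTimelineState::Unloaded);
    Ok(())
}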
- Evicted(Option), +pub enum LocalTimelineState { + // timeline is loaded into memory (with layer map and all the bits), + Loaded, + // timeline is on disk locally and ready to be loaded into memory. + Unloaded, } -impl TimelineSyncState { - pub fn remote_disk_consistent_lsn(&self) -> Option { - Some(match self { - TimelineSyncState::Evicted(None) => return None, - TimelineSyncState::Ready(lsn) => lsn, - TimelineSyncState::AwaitsDownload(lsn) => lsn, - TimelineSyncState::CloudOnly(lsn) => lsn, - TimelineSyncState::Evicted(Some(lsn)) => lsn, - }) - .copied() +impl<'a> From<&'a RepositoryTimeline> for LocalTimelineState { + fn from(local_timeline_entry: &'a RepositoryTimeline) -> Self { + match local_timeline_entry { + RepositoryTimeline::Loaded(_) => LocalTimelineState::Loaded, + RepositoryTimeline::Unloaded { .. } => LocalTimelineState::Unloaded, + } } } @@ -362,7 +350,7 @@ pub mod repo_harness { use crate::{ config::PageServerConf, - layered_repository::{LayeredRepository, TIMELINES_SEGMENT_NAME}, + layered_repository::LayeredRepository, walredo::{WalRedoError, WalRedoManager}, }; @@ -395,7 +383,6 @@ pub mod repo_harness { let repo_dir = PageServerConf::test_repo_dir(test_name); let _ = fs::remove_dir_all(&repo_dir); fs::create_dir_all(&repo_dir)?; - fs::create_dir_all(&repo_dir.join(TIMELINES_SEGMENT_NAME))?; let conf = PageServerConf::dummy_conf(repo_dir); // Make a static copy of the config. This can never be free'd, but that's @@ -404,19 +391,45 @@ pub mod repo_harness { let tenant_id = ZTenantId::generate(); fs::create_dir_all(conf.tenant_path(&tenant_id))?; + fs::create_dir_all(conf.timelines_path(&tenant_id))?; Ok(Self { conf, tenant_id }) } pub fn load(&self) -> Box { + self.try_load().expect("failed to load test repo") + } + + pub fn try_load(&self) -> Result> { let walredo_mgr = Arc::new(TestRedoManager); - Box::new(LayeredRepository::new( + let repo = Box::new(LayeredRepository::new( self.conf, walredo_mgr, self.tenant_id, + Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty())), false, - )) + )); + // populate repo with locally available timelines + for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id)) + .expect("should be able to read timelines dir") + { + let timeline_dir_entry = timeline_dir_entry.unwrap(); + let timeline_id: ZTimelineId = timeline_dir_entry + .path() + .file_name() + .unwrap() + .to_string_lossy() + .parse() + .unwrap(); + + repo.apply_timeline_remote_sync_status_update( + timeline_id, + TimelineSyncStatusUpdate::Downloaded, + )?; + } + + Ok(repo) } pub fn timeline_path(&self, timeline_id: &ZTimelineId) -> PathBuf { @@ -835,10 +848,9 @@ mod tests { // Create a branch, check that the relation is visible there repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?; - let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() { - Some(timeline) => timeline, - None => panic!("Should have a local timeline"), - }; + let newtline = repo + .get_timeline_load(NEW_TIMELINE_ID) + .expect("Should have a local timeline"); let new_writer = newtline.writer(); assert!(newtline @@ -896,10 +908,9 @@ mod tests { // Branch the history, modify relation differently on the new timeline repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?; - let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() { - Some(timeline) => timeline, - None => panic!("Should have a local timeline"), - }; + let newtline = repo + .get_timeline_load(NEW_TIMELINE_ID) + .expect("Should have a local timeline"); let 
new_writer = newtline.writer(); new_writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("bar blk 0 at 4"))?; @@ -1046,11 +1057,9 @@ mod tests { make_some_layers(&tline, Lsn(0x20))?; repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?; - let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() { - Some(timeline) => timeline, - None => panic!("Should have a local timeline"), - }; - + let newtline = repo + .get_timeline_load(NEW_TIMELINE_ID) + .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?; assert!(newtline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x25)).is_ok()); @@ -1067,10 +1076,9 @@ mod tests { make_some_layers(&tline, Lsn(0x20))?; repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?; - let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() { - Some(timeline) => timeline, - None => panic!("Should have a local timeline"), - }; + let newtline = repo + .get_timeline_load(NEW_TIMELINE_ID) + .expect("Should have a local timeline"); make_some_layers(&newtline, Lsn(0x60))?; @@ -1143,4 +1151,81 @@ mod tests { Ok(()) } + + #[test] + fn timeline_load() -> Result<()> { + const TEST_NAME: &str = "timeline_load"; + let harness = RepoHarness::create(TEST_NAME)?; + { + let repo = harness.load(); + let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?; + make_some_layers(&tline, Lsn(0x8000))?; + tline.checkpoint(CheckpointConfig::Forced)?; + } + + let repo = harness.load(); + let tline = repo + .get_timeline(TIMELINE_ID) + .expect("cannot load timeline"); + assert!(matches!(tline, RepositoryTimeline::Unloaded { .. })); + + assert!(repo.get_timeline_load(TIMELINE_ID).is_ok()); + + let tline = repo + .get_timeline(TIMELINE_ID) + .expect("cannot load timeline"); + assert!(matches!(tline, RepositoryTimeline::Loaded(_))); + + Ok(()) + } + + #[test] + fn timeline_load_with_ancestor() -> Result<()> { + const TEST_NAME: &str = "timeline_load"; + let harness = RepoHarness::create(TEST_NAME)?; + // create two timelines + { + let repo = harness.load(); + let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + + make_some_layers(&tline, Lsn(0x20))?; + tline.checkpoint(CheckpointConfig::Forced)?; + + repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?; + + let newtline = repo + .get_timeline_load(NEW_TIMELINE_ID) + .expect("Should have a local timeline"); + + make_some_layers(&newtline, Lsn(0x60))?; + tline.checkpoint(CheckpointConfig::Forced)?; + } + + // check that both of them are initially unloaded + let repo = harness.load(); + { + let tline = repo.get_timeline(TIMELINE_ID).expect("cannot get timeline"); + assert!(matches!(tline, RepositoryTimeline::Unloaded { .. })); + + let tline = repo + .get_timeline(NEW_TIMELINE_ID) + .expect("cannot get timeline"); + assert!(matches!(tline, RepositoryTimeline::Unloaded { .. 
})); + } + // load only child timeline + let _ = repo + .get_timeline_load(NEW_TIMELINE_ID) + .expect("cannot load timeline"); + + // check that both, child and ancestor are loaded + let tline = repo + .get_timeline(NEW_TIMELINE_ID) + .expect("cannot get timeline"); + assert!(matches!(tline, RepositoryTimeline::Loaded(_))); + + let tline = repo.get_timeline(TIMELINE_ID).expect("cannot get timeline"); + assert!(matches!(tline, RepositoryTimeline::Loaded(_))); + + Ok(()) + } } diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 568088fc1d..8584bdd424 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -3,16 +3,19 @@ use crate::config::PageServerConf; use crate::layered_repository::LayeredRepository; -use crate::repository::{Repository, Timeline, TimelineSyncState}; +use crate::remote_storage::RemoteTimelineIndex; +use crate::repository::{Repository, Timeline, TimelineSyncStatusUpdate}; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::timelines; +use crate::timelines::CreateRepo; use crate::walredo::PostgresRedoManager; use crate::CheckpointConfig; use anyhow::{Context, Result}; use lazy_static::lazy_static; use log::*; use serde::{Deserialize, Serialize}; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex, MutexGuard}; @@ -57,79 +60,67 @@ fn access_tenants() -> MutexGuard<'static, HashMap> { TENANTS.lock().unwrap() } -/// Updates tenants' repositories, changing their timelines state in memory. -pub fn set_timeline_states( +// Sets up wal redo manager and repository for tenant. Reduces code duplocation. +// Used during pageserver startup, or when new tenant is attached to pageserver. +pub fn load_local_repo( conf: &'static PageServerConf, - timeline_states: HashMap>, -) { - if timeline_states.is_empty() { - debug!("no timeline state updates to perform"); - return; - } - - info!("Updating states for {} timelines", timeline_states.len()); - trace!("States: {:?}", timeline_states); - + tenant_id: ZTenantId, + remote_index: &Arc>, +) -> Arc { let mut m = access_tenants(); - for (tenant_id, timeline_states) in timeline_states { - let tenant = m.entry(tenant_id).or_insert_with(|| { - // TODO (rodionov) reuse one of the initialisation routines - // Set up a WAL redo manager, for applying WAL records. - let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); + let tenant = m.entry(tenant_id).or_insert_with(|| { + // Set up a WAL redo manager, for applying WAL records. + let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); - // Set up an object repository, for actual data storage. - let repo: Arc = Arc::new(LayeredRepository::new( - conf, - Arc::new(walredo_mgr), - tenant_id, - conf.remote_storage_config.is_some(), - )); - Tenant { - state: TenantState::Idle, - repo, - } - }); - if let Err(e) = put_timelines_into_tenant(tenant, tenant_id, timeline_states) { - error!( - "Failed to update timeline states for tenant {}: {:?}", - tenant_id, e - ); + // Set up an object repository, for actual data storage. 
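load_local_repo above relies on the entry()/or_insert_with() pattern so that every caller asking for the same tenant gets a clone of one shared repository, and the expensive setup runs at most once. A minimal sketch with toy types (a Mutex'd map and a String standing in for the repository):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Tenant {
    repo: Arc<String>, // stand-in for Arc<dyn Repository>
}

fn load_local_repo(tenants: &Mutex<HashMap<u32, Tenant>>, tenant_id: u32) -> Arc<String> {
    let mut map = tenants.lock().unwrap();
    let tenant = map.entry(tenant_id).or_insert_with(|| Tenant {
        // Expensive setup (wal redo manager, repository) runs only on first access.
        repo: Arc::new(format!("repo for tenant {}", tenant_id)),
    });
    Arc::clone(&tenant.repo)
}

fn main() {
    let tenants = Mutex::new(HashMap::new());
    let a = load_local_repo(&tenants, 7);
    let b = load_local_repo(&tenants, 7);
    // Both callers share the same repository instance.
    assert!(Arc::ptr_eq(&a, &b));
}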
+ let repo: Arc = Arc::new(LayeredRepository::new( + conf, + Arc::new(walredo_mgr), + tenant_id, + Arc::clone(remote_index), + conf.remote_storage_config.is_some(), + )); + Tenant { + state: TenantState::Idle, + repo, } - } + }); + Arc::clone(&tenant.repo) } -fn put_timelines_into_tenant( - tenant: &mut Tenant, - tenant_id: ZTenantId, - timeline_states: HashMap, -) -> anyhow::Result<()> { - for (timeline_id, timeline_state) in timeline_states { - // If the timeline is being put into any other state than Ready, - // stop any threads operating on it. - // - // FIXME: This is racy. A page service thread could just get - // handle on the Timeline, before we call set_timeline_state() - if !matches!(timeline_state, TimelineSyncState::Ready(_)) { - thread_mgr::shutdown_threads(None, Some(tenant_id), Some(timeline_id)); - - // Should we run a final checkpoint to flush all the data to - // disk? Doesn't seem necessary; all of the states other than - // Ready imply that the data on local disk is corrupt or incomplete, - // and we don't want to flush that to disk. - } - - tenant - .repo - .set_timeline_state(timeline_id, timeline_state) - .with_context(|| { - format!( - "Failed to update timeline {} state to {:?}", - timeline_id, timeline_state - ) - })?; +/// Updates tenants' repositories, changing their timelines state in memory. +pub fn apply_timeline_sync_status_updates( + conf: &'static PageServerConf, + remote_index: Arc>, + sync_status_updates: HashMap>, +) { + if sync_status_updates.is_empty() { + debug!("no sync status updates to apply"); + return; } + info!( + "Applying sync status updates for {} timelines", + sync_status_updates.len() + ); + trace!("Sync status updates: {:?}", sync_status_updates); - Ok(()) + for (tenant_id, tenant_timelines_sync_status_updates) in sync_status_updates { + let repo = load_local_repo(conf, tenant_id, &remote_index); + + for (timeline_id, timeline_sync_status_update) in tenant_timelines_sync_status_updates { + match repo.apply_timeline_remote_sync_status_update(timeline_id, timeline_sync_status_update) + { + Ok(_) => debug!( + "successfully applied timeline sync status update: {} -> {}", + timeline_id, timeline_sync_status_update + ), + Err(e) => error!( + "Failed to apply timeline sync status update for tenant {}. timeline {} update {} Error: {:#}", + tenant_id, timeline_id, timeline_sync_status_update, e + ), + } + } + } } /// @@ -179,24 +170,30 @@ pub fn shutdown_all_tenants() { pub fn create_tenant_repository( conf: &'static PageServerConf, - new_tenant_id: Option, + tenantid: ZTenantId, + remote_index: Arc>, ) -> Result> { - let new_tenant_id = new_tenant_id.unwrap_or_else(ZTenantId::generate); - let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, new_tenant_id)); - match timelines::create_repo(conf, new_tenant_id, wal_redo_manager)? 
{ - Some(repo) => { - access_tenants() - .entry(new_tenant_id) - .or_insert_with(|| Tenant { - state: TenantState::Idle, - repo, - }); - Ok(Some(new_tenant_id)) - } - None => { - debug!("repository already exists for tenant {}", new_tenant_id); + match access_tenants().entry(tenantid) { + Entry::Occupied(_) => { + debug!("tenant {} already exists", tenantid); Ok(None) } + Entry::Vacant(v) => { + let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); + let repo = timelines::create_repo( + conf, + tenantid, + CreateRepo::Real { + wal_redo_manager, + remote_index, + }, + )?; + v.insert(Tenant { + state: TenantState::Idle, + repo, + }); + Ok(Some(tenantid)) + } } } @@ -255,19 +252,19 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result Result> { get_repository_for_tenant(tenantid)? - .get_timeline(timelineid)? - .local_timeline() - .with_context(|| format!("cannot fetch timeline {}", timelineid)) + .get_timeline_load(timelineid) + .with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid)) } #[derive(Serialize, Deserialize, Clone)] diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 4de131ef70..9cfc21b413 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -2,8 +2,9 @@ //! Timeline management code // -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use postgres_ffi::ControlFileData; +use serde::{Deserialize, Serialize}; use std::{ fs, path::Path, @@ -12,135 +13,126 @@ use std::{ }; use tracing::*; -use zenith_utils::lsn::Lsn; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; use zenith_utils::{crashsafe_dir, logging}; +use zenith_utils::{lsn::Lsn, zid::HexZTimelineId}; -use crate::{config::PageServerConf, repository::Repository}; +use crate::{ + config::PageServerConf, + layered_repository::metadata::TimelineMetadata, + remote_storage::RemoteTimelineIndex, + repository::{LocalTimelineState, Repository}, +}; use crate::{import_datadir, LOG_FILE_NAME}; use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager}; use crate::{repository::RepositoryTimeline, tenant_mgr}; use crate::{repository::Timeline, CheckpointConfig}; -#[derive(Clone)] -pub enum TimelineInfo { - Local { - timeline_id: ZTimelineId, - tenant_id: ZTenantId, - last_record_lsn: Lsn, - prev_record_lsn: Lsn, - ancestor_timeline_id: Option, - ancestor_lsn: Option, - disk_consistent_lsn: Lsn, - current_logical_size: usize, - current_logical_size_non_incremental: Option, - }, - Remote { - timeline_id: ZTimelineId, - tenant_id: ZTenantId, - disk_consistent_lsn: Lsn, - }, +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct LocalTimelineInfo { + pub ancestor_timeline_id: Option, + pub ancestor_lsn: Option, + pub last_record_lsn: Lsn, + pub prev_record_lsn: Option, + pub disk_consistent_lsn: Lsn, + pub current_logical_size: Option, // is None when timeline is Unloaded + pub current_logical_size_non_incremental: Option, + pub timeline_state: LocalTimelineState, } -impl TimelineInfo { - pub fn from_repo_timeline( - tenant_id: ZTenantId, - repo_timeline: RepositoryTimeline, - include_non_incremental_logical_size: bool, - ) -> Self { - match repo_timeline { - RepositoryTimeline::Local { id, timeline } => { - let ancestor_timeline_id = timeline.get_ancestor_timeline_id(); - let ancestor_lsn = if ancestor_timeline_id.is_some() { - Some(timeline.get_ancestor_lsn()) - } else { - None - }; - - Self::Local { - 
timeline_id: id, - tenant_id, - last_record_lsn: timeline.get_last_record_lsn(), - prev_record_lsn: timeline.get_prev_record_lsn(), - ancestor_timeline_id, - ancestor_lsn, - disk_consistent_lsn: timeline.get_disk_consistent_lsn(), - current_logical_size: timeline.get_current_logical_size(), - current_logical_size_non_incremental: get_current_logical_size_non_incremental( - include_non_incremental_logical_size, - timeline.as_ref(), - ), - } - } - RepositoryTimeline::Remote { - id, - disk_consistent_lsn, - } => Self::Remote { - timeline_id: id, - tenant_id, - disk_consistent_lsn, - }, - } - } - - pub fn from_dyn_timeline( - tenant_id: ZTenantId, - timeline_id: ZTimelineId, +impl LocalTimelineInfo { + pub fn from_loaded_timeline( timeline: &dyn Timeline, include_non_incremental_logical_size: bool, - ) -> Self { - let ancestor_timeline_id = timeline.get_ancestor_timeline_id(); - let ancestor_lsn = if ancestor_timeline_id.is_some() { - Some(timeline.get_ancestor_lsn()) - } else { - None - }; - - Self::Local { - timeline_id, - tenant_id, - last_record_lsn: timeline.get_last_record_lsn(), - prev_record_lsn: timeline.get_prev_record_lsn(), - ancestor_timeline_id, - ancestor_lsn, + ) -> anyhow::Result { + let last_record_lsn = timeline.get_last_record_lsn(); + let info = LocalTimelineInfo { + ancestor_timeline_id: timeline + .get_ancestor_timeline_id() + .map(HexZTimelineId::from), + ancestor_lsn: { + match timeline.get_ancestor_lsn() { + Lsn(0) => None, + lsn @ Lsn(_) => Some(lsn), + } + }, disk_consistent_lsn: timeline.get_disk_consistent_lsn(), - current_logical_size: timeline.get_current_logical_size(), - current_logical_size_non_incremental: get_current_logical_size_non_incremental( - include_non_incremental_logical_size, - timeline, - ), + last_record_lsn, + prev_record_lsn: Some(timeline.get_prev_record_lsn()), + timeline_state: LocalTimelineState::Loaded, + current_logical_size: Some(timeline.get_current_logical_size()), + current_logical_size_non_incremental: if include_non_incremental_logical_size { + Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?) + } else { + None + }, + }; + Ok(info) + } + + pub fn from_unloaded_timeline(metadata: &TimelineMetadata) -> Self { + LocalTimelineInfo { + ancestor_timeline_id: metadata.ancestor_timeline().map(HexZTimelineId::from), + ancestor_lsn: { + match metadata.ancestor_lsn() { + Lsn(0) => None, + lsn @ Lsn(_) => Some(lsn), + } + }, + disk_consistent_lsn: metadata.disk_consistent_lsn(), + last_record_lsn: metadata.disk_consistent_lsn(), + prev_record_lsn: metadata.prev_record_lsn(), + timeline_state: LocalTimelineState::Unloaded, + current_logical_size: None, + current_logical_size_non_incremental: None, } } - pub fn timeline_id(&self) -> ZTimelineId { - match *self { - TimelineInfo::Local { timeline_id, .. } => timeline_id, - TimelineInfo::Remote { timeline_id, .. } => timeline_id, - } - } - - pub fn tenant_id(&self) -> ZTenantId { - match *self { - TimelineInfo::Local { tenant_id, .. } => tenant_id, - TimelineInfo::Remote { tenant_id, .. 
} => tenant_id, + pub fn from_repo_timeline( + repo_timeline: RepositoryTimeline, + include_non_incremental_logical_size: bool, + ) -> anyhow::Result { + match repo_timeline { + RepositoryTimeline::Loaded(timeline) => { + Self::from_loaded_timeline(timeline.as_ref(), include_non_incremental_logical_size) + } + RepositoryTimeline::Unloaded { metadata } => { + Ok(Self::from_unloaded_timeline(&metadata)) + } } } } -fn get_current_logical_size_non_incremental( - include_non_incremental_logical_size: bool, - timeline: &dyn Timeline, -) -> Option { - if !include_non_incremental_logical_size { - return None; - } - match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) { - Ok(size) => Some(size), - Err(e) => { - error!("Failed to get non-incremental logical size: {:?}", e); - None - } - } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct RemoteTimelineInfo { + pub remote_consistent_lsn: Option, + pub awaits_download: bool, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct TimelineInfo { + #[serde(with = "hex")] + pub tenant_id: ZTenantId, + #[serde(with = "hex")] + pub timeline_id: ZTimelineId, + pub local: Option, + pub remote: Option, +} + +pub fn extract_remote_timeline_info( + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + remote_index: &RemoteTimelineIndex, +) -> Option { + remote_index + .timeline_entry(&ZTenantTimelineId { + tenant_id, + timeline_id, + }) + .map(|remote_entry| RemoteTimelineInfo { + remote_consistent_lsn: remote_entry.disk_consistent_lsn(), + awaits_download: remote_entry.get_awaits_download(), + }) } #[derive(Debug, Clone, Copy)] @@ -158,25 +150,12 @@ pub fn init_pageserver( // use true as daemonize parameter because otherwise we pollute zenith cli output with a few pages long output of info messages let _log_file = logging::init(LOG_FILE_NAME, true)?; - // We don't use the real WAL redo manager, because we don't want to spawn the WAL redo - // process during repository initialization. - // - // FIXME: That caused trouble, because the WAL redo manager spawned a thread that launched - // initdb in the background, and it kept running even after the "zenith init" had exited. - // In tests, we started the page server immediately after that, so that initdb was still - // running in the background, and we failed to run initdb again in the same directory. This - // has been solved for the rapid init+start case now, but the general race condition remains - // if you restart the server quickly. The WAL redo manager doesn't use a separate thread - // anymore, but I think that could still happen. - let dummy_redo_mgr = Arc::new(crate::walredo::DummyRedoManager {}); - crashsafe_dir::create_dir_all(conf.tenants_path())?; if let Some(tenant_id) = create_tenant { println!("initializing tenantid {}", tenant_id); - let repo = create_repo(conf, tenant_id, dummy_redo_mgr) - .context("failed to create repo")? 
- .ok_or_else(|| anyhow!("For newely created pageserver, found already existing repository for tenant {}", tenant_id))?; + let repo = + create_repo(conf, tenant_id, CreateRepo::Dummy).context("failed to create repo")?; let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate); bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref()) .context("failed to create initial timeline")?; @@ -189,15 +168,45 @@ pub fn init_pageserver( Ok(()) } +pub enum CreateRepo { + Real { + wal_redo_manager: Arc, + remote_index: Arc>, + }, + Dummy, +} + pub fn create_repo( conf: &'static PageServerConf, tenant_id: ZTenantId, - wal_redo_manager: Arc, -) -> Result>> { + create_repo: CreateRepo, +) -> Result> { + let (wal_redo_manager, remote_index) = match create_repo { + CreateRepo::Real { + wal_redo_manager, + remote_index, + } => (wal_redo_manager, remote_index), + CreateRepo::Dummy => { + // We don't use the real WAL redo manager, because we don't want to spawn the WAL redo + // process during repository initialization. + // + // FIXME: That caused trouble, because the WAL redo manager spawned a thread that launched + // initdb in the background, and it kept running even after the "zenith init" had exited. + // In tests, we started the page server immediately after that, so that initdb was still + // running in the background, and we failed to run initdb again in the same directory. This + // has been solved for the rapid init+start case now, but the general race condition remains + // if you restart the server quickly. The WAL redo manager doesn't use a separate thread + // anymore, but I think that could still happen. + let wal_redo_manager = Arc::new(crate::walredo::DummyRedoManager {}); + + let remote_index = Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty())); + (wal_redo_manager as _, remote_index) + } + }; + let repo_dir = conf.tenant_path(&tenant_id); if repo_dir.exists() { - debug!("repo for {} already exists", tenant_id); - return Ok(None); + bail!("tenant {} directory already exists", tenant_id); } // top-level dir may exist if we are creating it through CLI @@ -206,12 +215,13 @@ pub fn create_repo( crashsafe_dir::create_dir(conf.timelines_path(&tenant_id))?; info!("created directory structure in {}", repo_dir.display()); - Ok(Some(Arc::new(LayeredRepository::new( + Ok(Arc::new(LayeredRepository::new( conf, wal_redo_manager, tenant_id, + remote_index, conf.remote_storage_config.is_some(), - )))) + ))) } // Returns checkpoint LSN from controlfile @@ -299,30 +309,25 @@ fn bootstrap_timeline( Ok(timeline) } -pub(crate) fn get_timelines( +pub(crate) fn get_local_timelines( tenant_id: ZTenantId, include_non_incremental_logical_size: bool, -) -> Result> { +) -> Result> { let repo = tenant_mgr::get_repository_for_tenant(tenant_id) .with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?; + let repo_timelines = repo.list_timelines(); - Ok(repo - .list_timelines() - .with_context(|| format!("Failed to list timelines for tenant {}", tenant_id))? - .into_iter() - .filter_map(|timeline| match timeline { - RepositoryTimeline::Local { timeline, id } => Some((id, timeline)), - RepositoryTimeline::Remote { .. 
} => None, - }) - .map(|(timeline_id, timeline)| { - TimelineInfo::from_dyn_timeline( - tenant_id, - timeline_id, - timeline.as_ref(), + let mut local_timeline_info = Vec::with_capacity(repo_timelines.len()); + for (timeline_id, repository_timeline) in repo_timelines { + local_timeline_info.push(( + timeline_id, + LocalTimelineInfo::from_repo_timeline( + repository_timeline, include_non_incremental_logical_size, - ) - }) - .collect()) + )?, + )) + } + Ok(local_timeline_info) } pub(crate) fn create_timeline( @@ -336,16 +341,8 @@ pub(crate) fn create_timeline( let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; if conf.timeline_path(&new_timeline_id, &tenant_id).exists() { - match repo.get_timeline(new_timeline_id)? { - RepositoryTimeline::Local { id, .. } => { - debug!("timeline {} already exists", id); - return Ok(None); - } - RepositoryTimeline::Remote { id, .. } => bail!( - "timeline {} already exists in pageserver's remote storage", - id - ), - } + debug!("timeline {} already exists", new_timeline_id); + return Ok(None); } let mut start_lsn = ancestor_start_lsn.unwrap_or(Lsn(0)); @@ -353,15 +350,8 @@ pub(crate) fn create_timeline( let new_timeline_info = match ancestor_timeline_id { Some(ancestor_timeline_id) => { let ancestor_timeline = repo - .get_timeline(ancestor_timeline_id) - .with_context(|| format!("Cannot get ancestor timeline {}", ancestor_timeline_id))? - .local_timeline() - .with_context(|| { - format!( - "Cannot branch off the timeline {} that's not present locally", - ancestor_timeline_id - ) - })?; + .get_timeline_load(ancestor_timeline_id) + .context("Cannot branch off the timeline that's not present locally")?; if start_lsn == Lsn(0) { // Find end of WAL on the old timeline @@ -391,18 +381,20 @@ pub(crate) fn create_timeline( } repo.branch_timeline(ancestor_timeline_id, new_timeline_id, start_lsn)?; // load the timeline into memory - let loaded_timeline = repo.get_timeline(new_timeline_id)?; - TimelineInfo::from_repo_timeline(tenant_id, loaded_timeline, false) + let loaded_timeline = repo.get_timeline_load(new_timeline_id)?; + LocalTimelineInfo::from_loaded_timeline(loaded_timeline.as_ref(), false) + .context("cannot fill timeline info")? } None => { let new_timeline = bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?; - TimelineInfo::from_dyn_timeline( - tenant_id, - new_timeline_id, - new_timeline.as_ref(), - false, - ) + LocalTimelineInfo::from_loaded_timeline(new_timeline.as_ref(), false) + .context("cannot fill timeline info")? 
} }; - Ok(Some(new_timeline_info)) + Ok(Some(TimelineInfo { + tenant_id, + timeline_id: new_timeline_id, + local: Some(new_timeline_info), + remote: None, + })) } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 6fff1d062d..305dd4b3a2 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -31,6 +31,7 @@ use tracing::*; use zenith_utils::lsn::Lsn; use zenith_utils::pq_proto::ZenithFeedback; use zenith_utils::zid::ZTenantId; +use zenith_utils::zid::ZTenantTimelineId; use zenith_utils::zid::ZTimelineId; // @@ -111,18 +112,18 @@ fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> Str // fn thread_main( conf: &'static PageServerConf, - tenantid: ZTenantId, - timelineid: ZTimelineId, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, ) -> Result<()> { - let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered(); + let _enter = info_span!("WAL receiver", timeline = %timeline_id, tenant = %tenant_id).entered(); info!("WAL receiver thread started"); // Look up the current WAL producer address - let wal_producer_connstr = get_wal_producer_connstr(tenantid, timelineid); + let wal_producer_connstr = get_wal_producer_connstr(tenant_id, timeline_id); // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server, // and start streaming WAL from it. - let res = walreceiver_main(conf, tenantid, timelineid, &wal_producer_connstr); + let res = walreceiver_main(conf, tenant_id, timeline_id, &wal_producer_connstr); // TODO cleanup info messages if let Err(e) = res { @@ -130,20 +131,20 @@ fn thread_main( } else { info!( "walreceiver disconnected tenant {}, timelineid {}", - tenantid, timelineid + tenant_id, timeline_id ); } // Drop it from list of active WAL_RECEIVERS // so that next callmemaybe request launched a new thread - drop_wal_receiver(tenantid, timelineid); + drop_wal_receiver(tenant_id, timeline_id); Ok(()) } fn walreceiver_main( _conf: &PageServerConf, - tenantid: ZTenantId, - timelineid: ZTimelineId, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, wal_producer_connstr: &str, ) -> Result<(), Error> { // Connect to the database in replication mode. @@ -182,13 +183,16 @@ fn walreceiver_main( let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; - let timeline = - tenant_mgr::get_timeline_for_tenant(tenantid, timelineid).with_context(|| { - format!( - "Can not start the walrecever for a remote tenant {}, timeline {}", - tenantid, timelineid, - ) - })?; + let repo = tenant_mgr::get_repository_for_tenant(tenant_id) + .with_context(|| format!("no repository found for tenant {}", tenant_id))?; + let timeline = repo.get_timeline_load(timeline_id).with_context(|| { + format!( + "local timeline {} not found for tenant {}", + timeline_id, tenant_id + ) + })?; + + let remote_index = repo.get_remote_index(); // // Start streaming the WAL, from where we left off previously. @@ -292,11 +296,19 @@ fn walreceiver_main( }; if let Some(last_lsn) = status_update { - let timeline_synced_disk_consistent_lsn = - tenant_mgr::get_repository_for_tenant(tenantid)? 
- .get_timeline_state(timelineid) - .and_then(|state| state.remote_disk_consistent_lsn()) - .unwrap_or(Lsn(0)); + let timeline_remote_consistent_lsn = runtime.block_on(async { + remote_index + .read() + .await + // here we either do not have this timeline in remote index + // or there were no checkpoints for it yet + .timeline_entry(&ZTenantTimelineId { + tenant_id, + timeline_id, + }) + .and_then(|e| e.disk_consistent_lsn()) + .unwrap_or(Lsn(0)) // no checkpoint was uploaded + }); // The last LSN we processed. It is not guaranteed to survive pageserver crash. let write_lsn = u64::from(last_lsn); @@ -304,7 +316,7 @@ fn walreceiver_main( let flush_lsn = u64::from(timeline.get_disk_consistent_lsn()); // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. - let apply_lsn = u64::from(timeline_synced_disk_consistent_lsn); + let apply_lsn = u64::from(timeline_remote_consistent_lsn); let ts = SystemTime::now(); // Send zenith feedback message. diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py index edcc768819..8689838089 100644 --- a/test_runner/batch_others/test_remote_storage.py +++ b/test_runner/batch_others/test_remote_storage.py @@ -5,7 +5,7 @@ import time, shutil, os from contextlib import closing from pathlib import Path from uuid import UUID -from fixtures.zenith_fixtures import ZenithEnvBuilder +from fixtures.zenith_fixtures import ZenithEnvBuilder, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload from fixtures.log_helper import log import pytest @@ -26,7 +26,6 @@ import pytest # * queries the specific data, ensuring that it matches the one stored before # # The tests are done for all types of remote storage pageserver supports. 
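# A minimal sketch (not part of the tests in this patch) of the polling pattern the tests below
# switch to instead of fixed sleeps. It leans on the helpers this patch adds to
# test_runner/fixtures/zenith_fixtures.py (wait_for_last_record_lsn, wait_for_upload, wait_for,
# assert_local); the wrapper function defined here and its arguments are illustrative only.
from contextlib import closing
from uuid import UUID

from fixtures.zenith_fixtures import (Postgres, ZenithEnv, wait_for_last_record_lsn,
                                      wait_for_upload)


def wait_until_backed_up(env: ZenithEnv, pg: Postgres, tenant_id: str, timeline_id: str):
    """Block until everything written to `pg` so far has been ingested by the pageserver
    and a checkpoint covering it has been uploaded to remote storage."""
    client = env.pageserver.http_client()
    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT pg_current_wal_flush_lsn()")
            # compare only the low 32 bits of the LSN, the same shortcut the tests below take
            current_lsn = int(cur.fetchone()[0].split('/')[1], base=16)
    # wait until the pageserver has ingested the WAL written so far ...
    wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
    # ... then force a checkpoint and wait until it reaches remote storage
    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor() as pscur:
            pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
    wait_for_upload(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
    # after re-attaching the timeline on another pageserver, the same polling helpers
    # (wait_for + assert_local, also from zenith_fixtures) are used to wait for the download:
    #   wait_for(number_of_iterations=10, interval=1,
    #            func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id)))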
-@pytest.mark.skip(reason="will be fixed with https://github.com/zenithdb/zenith/issues/1193") @pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3']) def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, storage_type: str): zenith_env_builder.rust_log_override = 'debug' @@ -45,6 +44,8 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, env = zenith_env_builder.init_start() pg = env.postgres.create_start('main') + client = env.pageserver.http_client() + tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0] timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0] @@ -54,13 +55,21 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, CREATE TABLE t1(id int primary key, secret text); INSERT INTO t1 VALUES ({data_id}, '{data_secret}'); ''') + cur.execute("SELECT pg_current_wal_flush_lsn()") + current_lsn = int(cur.fetchone()[0].split('/')[1], base=16) + + # wait until pageserver receives that data + wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn) # run checkpoint manually to be sure that data landed in remote storage with closing(env.pageserver.connect()) as psconn: with psconn.cursor() as pscur: - pscur.execute(f"do_gc {tenant_id} {timeline_id}") - log.info("waiting for upload") # TODO api to check if upload is done - time.sleep(2) + pscur.execute(f"checkpoint {tenant_id} {timeline_id}") + + log.info("waiting for upload") + # wait until pageserver successfully uploaded a checkpoint to remote storage + wait_for_upload(client, UUID(tenant_id), UUID(timeline_id), current_lsn) + log.info("upload is done") ##### Stop the first pageserver instance, erase all its data env.postgres.stop_all() @@ -73,26 +82,12 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, ##### Second start, restore the data and ensure it's the same env.pageserver.start() - client = env.pageserver.http_client() client.timeline_attach(UUID(tenant_id), UUID(timeline_id)) - # FIXME cannot handle duplicate download requests (which might be caused by repeated timeline detail calls) - # subject to fix in https://github.com/zenithdb/zenith/issues/997 - time.sleep(5) log.info("waiting for timeline redownload") - attempts = 0 - while True: - timeline_details = client.timeline_detail(UUID(tenant_id), UUID(timeline_id)) - assert timeline_details['timeline_id'] == timeline_id - assert timeline_details['tenant_id'] == tenant_id - if timeline_details['kind'] == 'Local': - log.info("timeline downloaded, checking its data") - break - attempts += 1 - if attempts > 10: - raise Exception("timeline redownload failed") - log.debug("still waiting") - time.sleep(1) + wait_for(number_of_iterations=10, + interval=1, + func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id))) pg = env.postgres.create_start('main') with closing(pg.connect()) as conn: diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index 7a9d478f16..e4492e5393 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -3,17 +3,19 @@ import os import pathlib import subprocess import threading +from typing import Dict from uuid import UUID from fixtures.log_helper import log import time import signal import pytest -from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath, pg_distrib_dir +from 
fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float): - assert abs(a - b) / a < margin_ratio, (a, b, margin_ratio) + print("!" * 100, abs(a - b) / a) + assert abs(a - b) / a < margin_ratio, abs(a - b) / a @contextmanager @@ -34,6 +36,7 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path, f"-c listen_pg_addr='localhost:{pg_port}'", f"-c listen_http_addr='localhost:{http_port}'", f"-c pg_distrib_dir='{pg_distrib_dir}'", + f"-c id=2", f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}", ] @@ -57,20 +60,6 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path, os.kill(pid, signal.SIGQUIT) -def wait_for(number_of_iterations: int, interval: int, func): - last_exception = None - for i in range(number_of_iterations): - try: - res = func() - except Exception as e: - log.info("waiting for %s iteration %s failed", func, i + 1) - last_exception = e - time.sleep(interval) - continue - return res - raise Exception("timed out while waiting for %s" % func) from last_exception - - @contextmanager def pg_cur(pg): with closing(pg.connect()) as conn: @@ -108,13 +97,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve log.info('load thread stopped') -def assert_local(pageserver_http_client: ZenithPageserverHttpClient, tenant: UUID, timeline: str): - timeline_detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline)) - assert timeline_detail.get('type') == "Local", timeline_detail - return timeline_detail - - -@pytest.mark.skip(reason="will be fixed with https://github.com/zenithdb/zenith/issues/1193") @pytest.mark.parametrize('with_load', ['with_load', 'without_load']) def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, port_distributor: PortDistributor, @@ -129,7 +111,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, tenant = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209")) log.info("tenant to relocate %s", tenant) - + env.zenith_cli.create_root_branch('main', tenant_id=tenant) env.zenith_cli.create_branch('test_tenant_relocation', tenant_id=tenant) tenant_pg = env.postgres.create_start(branch_name='main', @@ -141,8 +123,8 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, with conn.cursor() as cur: # save timeline for later gc call cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] - log.info("timeline to relocate %s", timeline) + timeline = UUID(cur.fetchone()[0]) + log.info("timeline to relocate %s", timeline.hex) # we rely upon autocommit after each statement # as waiting for acceptors happens there @@ -150,6 +132,15 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'") cur.execute("SELECT sum(key) FROM t") assert cur.fetchone() == (500500, ) + cur.execute("SELECT pg_current_wal_flush_lsn()") + + current_lsn = int(cur.fetchone()[0].split('/')[1], base=16) + + pageserver_http = env.pageserver.http_client() + + # wait until pageserver receives that data + wait_for_last_record_lsn(pageserver_http, tenant, timeline, current_lsn) + timeline_detail = pageserver_http.timeline_detail_v2(tenant, timeline) if with_load == 'with_load': # create load table @@ -165,12 +156,10 @@ def test_tenant_relocation(zenith_env_builder: 
ZenithEnvBuilder, # run checkpoint manually to be sure that data landed in remote storage with closing(env.pageserver.connect()) as psconn: with psconn.cursor() as pscur: - pscur.execute(f"do_gc {tenant.hex} {timeline}") + pscur.execute(f"checkpoint {tenant.hex} {timeline.hex}") - # ensure upload is completed - pageserver_http_client = env.pageserver.http_client() - timeline_detail = pageserver_http_client.timeline_detail(tenant, UUID(timeline)) - assert timeline_detail['disk_consistent_lsn'] == timeline_detail['timeline_state']['Ready'] + # wait until pageserver successfully uploaded a checkpoint to remote storage + wait_for_upload(pageserver_http, tenant, timeline, current_lsn) log.info("inititalizing new pageserver") # bootstrap second pageserver @@ -182,8 +171,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port) pageserver_bin = pathlib.Path(zenith_binpath) / 'pageserver' - new_pageserver_http_client = ZenithPageserverHttpClient(port=new_pageserver_http_port, - auth_token=None) + new_pageserver_http = ZenithPageserverHttpClient(port=new_pageserver_http_port, auth_token=None) with new_pageserver_helper(new_pageserver_dir, pageserver_bin, @@ -192,25 +180,18 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, new_pageserver_http_port): # call to attach timeline to new pageserver - new_pageserver_http_client.timeline_attach(tenant, UUID(timeline)) - # FIXME cannot handle duplicate download requests, subject to fix in https://github.com/zenithdb/zenith/issues/997 - time.sleep(5) - # new pageserver should in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint + new_pageserver_http.timeline_attach(tenant, timeline) + # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint new_timeline_detail = wait_for( number_of_iterations=5, interval=1, - func=lambda: assert_local(new_pageserver_http_client, tenant, timeline)) - assert new_timeline_detail['timeline_state'].get('Ready'), new_timeline_detail + func=lambda: assert_local(new_pageserver_http, tenant, timeline)) + # when load is active these checks can break because lsns are not static # so lets check with some margin - if with_load == 'without_load': - # TODO revisit this once https://github.com/zenithdb/zenith/issues/1049 is fixed - assert_abs_margin_ratio(new_timeline_detail['disk_consistent_lsn'], - timeline_detail['disk_consistent_lsn'], - 0.01) - assert_abs_margin_ratio(new_timeline_detail['timeline_state']['Ready'], - timeline_detail['timeline_state']['Ready'], - 0.01) + assert_abs_margin_ratio(new_timeline_detail['local']['disk_consistent_lsn'], + timeline_detail['local']['disk_consistent_lsn'], + 0.03) # callmemaybe to start replication from safekeeper to the new pageserver # when there is no load there is a clean checkpoint and no wal delta @@ -219,7 +200,9 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur: # "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'" safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'" - cur.execute("callmemaybe {} {} {}".format(tenant, timeline, safekeeper_connstring)) + 
cur.execute("callmemaybe {} {} {}".format(tenant.hex, + timeline.hex, + safekeeper_connstring)) tenant_pg.stop() @@ -239,7 +222,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, # detach tenant from old pageserver before we check # that all the data is there to be sure that old pageserver # is no longer involved, and if it is, we will see the errors - pageserver_http_client.timeline_detach(tenant, UUID(timeline)) + pageserver_http.timeline_detach(tenant, timeline) with pg_cur(tenant_pg) as cur: # check that data is still there diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index ec570a7dac..c44a6e431f 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -783,6 +783,15 @@ class ZenithPageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def timeline_detail_v2(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]: + res = self.get( + f"http://localhost:{self.port}/v2/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1" + ) + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def get_metrics(self) -> str: res = self.get(f"http://localhost:{self.port}/metrics") self.verbose_error(res) @@ -866,6 +875,30 @@ class ZenithCli: return uuid.UUID(created_timeline_id) + def create_root_branch(self, branch_name: str, tenant_id: Optional[uuid.UUID] = None): + cmd = [ + 'timeline', + 'create', + '--branch-name', + branch_name, + '--tenant-id', + (tenant_id or self.env.initial_tenant).hex, + ] + + res = self.raw_cli(cmd) + res.check_returncode() + + matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout) + + created_timeline_id = None + if matches is not None: + created_timeline_id = matches.group('timeline_id') + + if created_timeline_id is None: + raise Exception('could not find timeline id after `zenith timeline create` invocation') + else: + return uuid.UUID(created_timeline_id) + def create_branch(self, new_branch_name: str = DEFAULT_BRANCH_NAME, ancestor_branch_name: Optional[str] = None, @@ -1839,3 +1872,59 @@ def check_restored_datadir_content(test_output_dir: str, env: ZenithEnv, pg: Pos subprocess.run([cmd], stdout=stdout_f, shell=True) assert (mismatch, error) == ([], []) + + +def wait_for(number_of_iterations: int, interval: int, func): + last_exception = None + for i in range(number_of_iterations): + try: + res = func() + except Exception as e: + log.info("waiting for %s iteration %s failed", func, i + 1) + last_exception = e + time.sleep(interval) + continue + return res + raise Exception("timed out while waiting for %s" % func) from last_exception + + +def assert_local(pageserver_http_client: ZenithPageserverHttpClient, + tenant: uuid.UUID, + timeline: uuid.UUID): + timeline_detail = pageserver_http_client.timeline_detail_v2(tenant, timeline) + assert timeline_detail.get('local', {}).get("disk_consistent_lsn"), timeline_detail + return timeline_detail + + +def remote_consistent_lsn(pageserver_http_client: ZenithPageserverHttpClient, + tenant: uuid.UUID, + timeline: uuid.UUID) -> int: + detail = pageserver_http_client.timeline_detail_v2(tenant, timeline) + assert isinstance(detail['remote']['remote_consistent_lsn'], int) + return detail['remote']['remote_consistent_lsn'] + + +def wait_for_upload(pageserver_http_client: ZenithPageserverHttpClient, + tenant: uuid.UUID, + timeline: uuid.UUID, + lsn: int): + """waits for local timeline 
upload up to specified lsn""" + + wait_for(10, 1, lambda: remote_consistent_lsn(pageserver_http_client, tenant, timeline) >= lsn) + + +def last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient, + tenant: uuid.UUID, + timeline: uuid.UUID) -> int: + detail = pageserver_http_client.timeline_detail_v2(tenant, timeline) + assert isinstance(detail['local']['last_record_lsn'], int) + return detail['local']['last_record_lsn'] + + +def wait_for_last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient, + tenant: uuid.UUID, + timeline: uuid.UUID, + lsn: int): + """waits for pageserver to catch up to a certain lsn""" + + wait_for(10, 1, lambda: last_record_lsn(pageserver_http_client, tenant, timeline) >= lsn) diff --git a/zenith/src/main.rs b/zenith/src/main.rs index dd35427d5d..389c394103 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -299,42 +299,40 @@ fn print_timelines_tree( .iter() .map(|t| { ( - t.timeline_id(), + t.timeline_id, TimelineTreeEl { info: t.clone(), children: BTreeSet::new(), name: timeline_name_mappings - .remove(&ZTenantTimelineId::new(t.tenant_id(), t.timeline_id())), + .remove(&ZTenantTimelineId::new(t.tenant_id, t.timeline_id)), }, ) }) .collect::>(); // Memorize all direct children of each timeline. - for timeline in &timelines { - if let TimelineInfo::Local { - ancestor_timeline_id: Some(tid), - .. - } = timeline + for timeline in timelines.iter() { + if let Some(ancestor_timeline_id) = + timeline.local.as_ref().and_then(|l| l.ancestor_timeline_id) { timelines_hash - .get_mut(tid) + .get_mut(&ZTimelineId::from(ancestor_timeline_id)) .context("missing timeline info in the HashMap")? .children - .insert(timeline.timeline_id()); + .insert(timeline.timeline_id); } } for timeline in timelines_hash.values() { // Start with root local timelines (no ancestors) first. - if let TimelineInfo::Local { - ancestor_timeline_id, - .. - } = &timeline.info + if timeline + .info + .local + .as_ref() + .and_then(|l| l.ancestor_timeline_id) + .is_none() { - if ancestor_timeline_id.is_none() { - print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?; - } + print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?; } } @@ -350,20 +348,21 @@ fn print_timeline( timeline: &TimelineTreeEl, timelines: &HashMap, ) -> Result<()> { - let local_or_remote = match timeline.info { - TimelineInfo::Local { .. } => "(L)", - TimelineInfo::Remote { .. } => "(R)", + let local_remote = match (timeline.info.local.as_ref(), timeline.info.remote.as_ref()) { + (None, None) => unreachable!("in this case no info for a timeline is found"), + (None, Some(_)) => "(R)", + (Some(_), None) => "(L)", + (Some(_), Some(_)) => "(L+R)", }; // Draw main padding - print!("{} ", local_or_remote); + print!("{} ", local_remote); if nesting_level > 0 { - let lsn_string = match &timeline.info { - TimelineInfo::Local { ancestor_lsn, .. } => ancestor_lsn - .map(|lsn| lsn.to_string()) - .unwrap_or_else(|| "Unknown local Lsn".to_string()), - TimelineInfo::Remote { .. 
} => "unknown Lsn (remote)".to_string(), + let ancestor_lsn = match timeline.info.local.as_ref().and_then(|i| i.ancestor_lsn) { + Some(lsn) => lsn.to_string(), + None => "Unknown Lsn".to_string(), }; + let mut br_sym = "┣━"; // Draw each nesting padding with proper style @@ -383,14 +382,14 @@ fn print_timeline( br_sym = "┗━"; } - print!("{} @{}: ", br_sym, lsn_string); + print!("{} @{}: ", br_sym, ancestor_lsn); } // Finally print a timeline id and name with new line println!( "{} [{}]", timeline.name.as_deref().unwrap_or("_no_name_"), - timeline.info.timeline_id() + timeline.info.timeline_id ); let len = timeline.children.len(); @@ -430,7 +429,7 @@ fn get_timeline_infos( Ok(PageServerNode::from_env(env) .timeline_list(tenant_id)? .into_iter() - .map(|timeline_info| (timeline_info.timeline_id(), timeline_info)) + .map(|timeline_info| (timeline_info.timeline_id, timeline_info)) .collect()) } @@ -555,26 +554,17 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - let timeline = pageserver .timeline_create(tenant_id, None, None, None)? .ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?; - let new_timeline_id = timeline.timeline_id(); + let new_timeline_id = timeline.timeline_id; - let last_record_lsn = match timeline { - TimelineInfo::Local { - last_record_lsn, .. - } => last_record_lsn, - TimelineInfo::Remote { .. } => { - bail!( - "Timeline {} was created as remote, not local", - new_timeline_id - ) - } - }; + let last_record_lsn = timeline + .local + .expect("no local timeline info") + .last_record_lsn; env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?; println!( "Created timeline '{}' at Lsn {} for tenant: {}", - timeline.timeline_id(), - last_record_lsn, - tenant_id, + timeline.timeline_id, last_record_lsn, tenant_id, ); } Some(("branch", branch_match)) => { @@ -602,26 +592,18 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - let timeline = pageserver .timeline_create(tenant_id, None, start_lsn, Some(ancestor_timeline_id))? .ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?; - let new_timeline_id = timeline.timeline_id(); + let new_timeline_id = timeline.timeline_id; - let last_record_lsn = match timeline { - TimelineInfo::Local { - last_record_lsn, .. - } => last_record_lsn, - TimelineInfo::Remote { .. } => bail!( - "Timeline {} was created as remote, not local", - new_timeline_id - ), - }; + let last_record_lsn = timeline + .local + .expect("no local timeline info") + .last_record_lsn; env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?; println!( "Created timeline '{}' at Lsn {} for tenant: {}. Ancestor timeline: '{}'", - timeline.timeline_id(), - last_record_lsn, - tenant_id, - ancestor_branch_name, + timeline.timeline_id, last_record_lsn, tenant_id, ancestor_branch_name, ); } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), @@ -662,13 +644,8 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { // older point in time, or following but lagging behind the primary. let lsn_str = timeline_infos .get(&node.timeline_id) - .map(|bi| match bi { - TimelineInfo::Local { - last_record_lsn, .. - } => last_record_lsn.to_string(), - TimelineInfo::Remote { .. } => "? 
(remote)".to_string(), - }) - .unwrap_or_else(|| '?'.to_string()); + .and_then(|bi| bi.local.as_ref().map(|l| l.last_record_lsn.to_string())) + .unwrap_or_else(|| "?".to_string()); let branch_name = timeline_name_mappings .get(&ZTenantTimelineId::new(tenant_id, node.timeline_id)) diff --git a/zenith_utils/src/http/error.rs b/zenith_utils/src/http/error.rs index 3262c33a51..b23fa029d4 100644 --- a/zenith_utils/src/http/error.rs +++ b/zenith_utils/src/http/error.rs @@ -14,6 +14,9 @@ pub enum ApiError { #[error("Unauthorized: {0}")] Unauthorized(String), + #[error("NotFound: {0}")] + NotFound(String), + #[error(transparent)] InternalServerError(#[from] anyhow::Error), } @@ -36,6 +39,9 @@ impl ApiError { self.to_string(), StatusCode::UNAUTHORIZED, ), + ApiError::NotFound(_) => { + HttpErrorBody::response_from_msg_and_status(self.to_string(), StatusCode::NOT_FOUND) + } ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status( err.to_string(), StatusCode::INTERNAL_SERVER_ERROR,