Download timelines on demand

This commit is contained in:
Kirill Bulatov
2021-12-07 16:11:07 +02:00
committed by Kirill Bulatov
parent e61732ca7c
commit 673c297949
22 changed files with 2727 additions and 1322 deletions

View File

@@ -13,7 +13,7 @@ use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantId;
use zenith_utils::zid::{opt_display_serde, ZTenantId};
//
// This data structure represents the zenith CLI config
@@ -46,7 +46,7 @@ pub struct LocalEnv {
// Default tenant ID to use with the 'zenith' command line utility, when
// --tenantid is not explicitly specified.
#[serde(with = "opt_tenantid_serde")]
#[serde(with = "opt_display_serde")]
#[serde(default)]
pub default_tenantid: Option<ZTenantId>,
@@ -325,30 +325,3 @@ fn base_path() -> PathBuf {
None => ".zenith".into(),
}
}
/// Serde routines for Option<ZTenantId>. The serialized form is a hex string.
mod opt_tenantid_serde {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::str::FromStr;
use zenith_utils::zid::ZTenantId;
pub fn serialize<S>(tenantid: &Option<ZTenantId>, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
tenantid.map(|t| t.to_string()).serialize(ser)
}
pub fn deserialize<'de, D>(des: D) -> Result<Option<ZTenantId>, D::Error>
where
D: Deserializer<'de>,
{
let s: Option<String> = Option::deserialize(des)?;
if let Some(s) = s {
return Ok(Some(
ZTenantId::from_str(&s).map_err(serde::de::Error::custom)?,
));
}
Ok(None)
}
}
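For reference, here is a rough sketch of how a shared `opt_display_serde` helper could generalize the removed module above to any `Display + FromStr` type. The actual implementation lives in `zenith_utils::zid` and is not shown in this diff, so the body below is an assumption, not the real code.

mod opt_display_serde {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::fmt::Display;
    use std::str::FromStr;

    // Serialize Option<T> as an optional string, mirroring the removed
    // ZTenantId-specific helper but for any Display/FromStr type.
    pub fn serialize<T, S>(value: &Option<T>, ser: S) -> Result<S::Ok, S::Error>
    where
        T: Display,
        S: Serializer,
    {
        value.as_ref().map(ToString::to_string).serialize(ser)
    }

    pub fn deserialize<'de, T, D>(des: D) -> Result<Option<T>, D::Error>
    where
        T: FromStr,
        T::Err: Display,
        D: Deserializer<'de>,
    {
        Ok(match Option::<String>::deserialize(des)? {
            Some(s) => Some(T::from_str(&s).map_err(serde::de::Error::custom)?),
            None => None,
        })
    }
}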

View File

@@ -559,13 +559,17 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
}
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let mut threads = Vec::new();
if let Some(handle) = remote_storage::run_storage_sync_thread(conf)? {
let sync_startup = remote_storage::start_local_timeline_sync(conf)
.context("Failed to set up local files sync with external storage")?;
if let Some(handle) = sync_startup.sync_loop_handle {
threads.push(handle);
}
// Initialize tenant manager.
tenant_mgr::init(conf);
tenant_mgr::set_timeline_states(conf, sync_startup.initial_timeline_states);
// initialize authentication for incoming connections
let auth = match &conf.auth_type {

View File

@@ -4,7 +4,7 @@
// TODO: move all paths construction to conf impl
//
use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use postgres_ffi::ControlFileData;
use serde::{Deserialize, Serialize};
use std::{
@@ -21,10 +21,10 @@ use zenith_utils::logging;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::tenant_mgr;
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::{repository::Repository, PageServerConf};
use crate::{repository::RepositoryTimeline, tenant_mgr};
use crate::{restore_local_repo, LOG_FILE_NAME};
#[derive(Serialize, Deserialize, Clone)]
@@ -54,7 +54,12 @@ impl BranchInfo {
.to_string();
let timeline_id = std::fs::read_to_string(path)?.parse::<ZTimelineId>()?;
let timeline = repo.get_timeline(timeline_id)?;
let timeline = match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(local_entry) => local_entry,
RepositoryTimeline::Remote(_) => {
bail!("Timeline {} is remote, no branches to display", timeline_id)
}
};
// we use ancestor lsn zero if we don't have an ancestor, so turn this into an option based on timeline id
let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() {
@@ -149,7 +154,7 @@ pub fn create_repo(
conf,
wal_redo_manager,
tenantid,
false,
conf.remote_storage_config.is_some(),
));
// Load data into pageserver
@@ -297,7 +302,10 @@ pub(crate) fn create_branch(
}
let mut startpoint = parse_point_in_time(conf, startpoint_str, tenantid)?;
let timeline = repo.get_timeline(startpoint.timelineid)?;
let timeline = repo
.get_timeline(startpoint.timelineid)?
.local_timeline()
.ok_or_else(|| anyhow!("Cannot branch off the timeline that's not present locally"))?;
if startpoint.lsn == Lsn(0) {
// Find end of WAL on the old timeline
let end_of_wal = timeline.get_last_record_lsn();

View File

@@ -17,6 +17,98 @@ paths:
application/json:
schema:
type: object
/v1/timeline/{tenant_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
get:
description: List tenant timelines
responses:
"200":
description: array of brief timeline descriptions
content:
application/json:
schema:
type: array
items:
# currently just a timeline id string; when the remote index gets accessed,
# at least a remote/local timeline field will be added
type: string
"400":
description: Error when no tenant id found in path
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/timeline/{tenant_id}/{timeline_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get timeline info for tenant's remote timeline
responses:
"200":
description: TimelineInfo
content:
application/json:
schema:
$ref: "#/components/schemas/TimelineInfo"
"400":
description: Error when no tenant id or timeline id found in path
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/branch/{tenant_id}:
parameters:
- name: tenant_id
@@ -284,6 +376,36 @@ components:
type: integer
current_logical_size_non_incremental:
type: integer
TimelineInfo:
type: object
required:
- timeline_id
- tenant_id
- last_record_lsn
- prev_record_lsn
- start_lsn
- disk_consistent_lsn
properties:
timeline_id:
type: string
format: hex
tenant_id:
type: string
format: hex
ancestor_timeline_id:
type: string
format: hex
last_record_lsn:
type: string
prev_record_lsn:
type: string
start_lsn:
type: string
disk_consistent_lsn:
type: string
timeline_state:
type: string
Error:
type: object
required:

View File

@@ -1,10 +1,11 @@
use std::sync::Arc;
use anyhow::Result;
use anyhow::{bail, Context, Result};
use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use routerify::{ext::RequestExt, RouterBuilder};
use serde::Serialize;
use tracing::*;
use zenith_utils::auth::JwtAuth;
use zenith_utils::http::endpoint::attach_openapi_ui;
@@ -18,10 +19,13 @@ use zenith_utils::http::{
request::get_request_param,
request::parse_request_param,
};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{opt_display_serde, ZTimelineId};
use super::models::BranchCreateRequest;
use super::models::TenantCreateRequest;
use crate::branches::BranchInfo;
use crate::repository::TimelineSyncState;
use crate::{branches, tenant_mgr, PageServerConf, ZTenantId};
#[derive(Debug)]
@@ -140,6 +144,97 @@ async fn branch_detail_handler(request: Request<Body>) -> Result<Response<Body>,
Ok(json_response(StatusCode::OK, response_data)?)
}
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let conf = get_state(&request).conf;
let timelines_dir = conf.timelines_path(&tenant_id);
let mut timelines_dir_contents =
tokio::fs::read_dir(&timelines_dir).await.with_context(|| {
format!(
"Failed to list timelines dir '{}' contents",
timelines_dir.display()
)
})?;
let mut local_timelines = Vec::new();
while let Some(entry) = timelines_dir_contents.next_entry().await.with_context(|| {
format!(
"Failed to list timelines dir '{}' contents",
timelines_dir.display()
)
})? {
let entry_path = entry.path();
let entry_type = entry.file_type().await.with_context(|| {
format!(
"Failed to get file type of timeline dirs' entry '{}'",
entry_path.display()
)
})?;
if entry_type.is_dir() {
match entry.file_name().to_string_lossy().parse::<ZTimelineId>() {
Ok(timeline_id) => local_timelines.push(timeline_id.to_string()),
Err(e) => error!(
"Failed to get parse timeline id from timeline dirs' entry '{}': {}",
entry_path.display(),
e
),
}
}
}
Ok(json_response(StatusCode::OK, local_timelines)?)
}
#[derive(Debug, Serialize)]
struct TimelineInfo {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
start_lsn: Lsn,
disk_consistent_lsn: Lsn,
timeline_state: Option<TimelineSyncState>,
}
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let response_data = tokio::task::spawn_blocking(move || {
let _enter =
info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id)
.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
match repo.get_timeline(timeline_id)?.local_timeline() {
None => bail!("Timeline with id {} is not present locally", timeline_id),
Some(timeline) => Ok::<_, anyhow::Error>(TimelineInfo {
timeline_id,
tenant_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
last_record_lsn: timeline.get_last_record_lsn(),
prev_record_lsn: timeline.get_prev_record_lsn(),
start_lsn: timeline.get_start_lsn(),
timeline_state: repo.get_timeline_state(timeline_id),
}),
}
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::OK, response_data)?)
}
async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
// check for management permission
check_permission(&request, None)?;
@@ -196,6 +291,11 @@ pub fn make_router(
router
.data(Arc::new(State::new(conf, auth)))
.get("/v1/status", status_handler)
.get("/v1/timeline/:tenant_id", timeline_list_handler)
.get(
"/v1/timeline/:tenant_id/:timeline_id",
timeline_detail_handler,
)
.get("/v1/branch/:tenant_id", branch_list_handler)
.get("/v1/branch/:tenant_id/:branch_name", branch_detail_handler)
.post("/v1/branch", branch_create_handler)

View File

@@ -27,15 +27,18 @@ use std::fs::{File, OpenOptions};
use std::io::Write;
use std::ops::{Bound::Included, Deref};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicUsize};
use std::sync::atomic::{self, AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use self::metadata::{metadata_path, TimelineMetadata};
use crate::page_cache;
use crate::relish::*;
use crate::remote_storage::schedule_timeline_checkpoint_upload;
use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
use crate::remote_storage::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
use crate::repository::{
GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncState, TimelineWriter,
WALRecord,
};
use crate::tenant_mgr;
use crate::walreceiver;
use crate::walreceiver::IS_WAL_RECEIVER;
@@ -118,7 +121,6 @@ lazy_static! {
}
/// Parts of the `.zenith/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
pub const TENANTS_SEGMENT_NAME: &str = "tenants";
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
///
@@ -127,20 +129,25 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
pub struct LayeredRepository {
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelines: Mutex<HashMap<ZTimelineId, Arc<LayeredTimeline>>>,
timelines: Mutex<HashMap<ZTimelineId, LayeredTimelineEntry>>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
/// Makes evey repo's timelines to backup their files to remote storage,
/// when they get frozen.
/// Makes every timeline back up its files to remote storage.
upload_relishes: bool,
}
/// Public interface
impl Repository for LayeredRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline> {
let mut timelines = self.timelines.lock().unwrap();
Ok(self.get_timeline_locked(timelineid, &mut timelines)?)
Ok(
match self.get_or_init_timeline(timelineid, &mut timelines)? {
LayeredTimelineEntry::Local(local) => RepositoryTimeline::Local(local),
LayeredTimelineEntry::Remote(remote_timeline_id) => {
RepositoryTimeline::Remote(remote_timeline_id)
}
},
)
}
fn create_empty_timeline(
@@ -164,11 +171,11 @@ impl Repository for LayeredRepository {
self.tenantid,
Arc::clone(&self.walredo_mgr),
0,
false,
)?;
self.upload_relishes,
);
let timeline_rc = Arc::new(timeline);
let r = timelines.insert(timelineid, timeline_rc.clone());
let r = timelines.insert(timelineid, LayeredTimelineEntry::Local(timeline_rc.clone()));
assert!(r.is_none());
Ok(timeline_rc)
}
@@ -179,7 +186,12 @@ impl Repository for LayeredRepository {
// about timelines, so otherwise a race condition is possible, where we create new timeline and GC
// concurrently removes data that is needed by the new timeline.
let mut timelines = self.timelines.lock().unwrap();
let src_timeline = self.get_timeline_locked(src, &mut timelines)?;
let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote(_) => {
bail!("Cannot branch off the timeline {} that's not local", src)
}
};
src_timeline
.check_lsn_is_in_scope(start_lsn)
@@ -243,17 +255,22 @@ impl Repository for LayeredRepository {
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_checkpoint: Vec<(ZTimelineId, Arc<LayeredTimeline>)> = timelines
let timelines_to_checkpoint = timelines
.iter()
.map(|(timelineid, timeline)| (*timelineid, timeline.clone()))
.collect();
.collect::<Vec<_>>();
drop(timelines);
for (timelineid, timeline) in timelines_to_checkpoint.iter() {
for (timelineid, timeline) in &timelines_to_checkpoint {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered();
timeline.checkpoint(cconf)?;
match timeline {
LayeredTimelineEntry::Local(timeline) => timeline.checkpoint(cconf)?,
LayeredTimelineEntry::Remote(_) => debug!(
"Cannot run the checkpoint for remote timeline {}",
timelineid
),
}
}
Ok(())
@@ -265,104 +282,177 @@ impl Repository for LayeredRepository {
let timelines = self.timelines.lock().unwrap();
for (timelineid, timeline) in timelines.iter() {
shutdown_timeline(*timelineid, timeline.as_ref())?;
shutdown_timeline(self.tenantid, *timelineid, timeline)?;
}
Ok(())
}
fn unload_timeline(&self, timeline_id: ZTimelineId) -> Result<()> {
let mut timelines = self.timelines.lock().unwrap();
let removed_timeline = match timelines.remove(&timeline_id) {
Some(timeline) => timeline,
None => {
warn!("Timeline {} not found, nothing to remove", timeline_id);
return Ok(());
// TODO this method currently does not do anything to prevent (or react to) state updates between a sync task schedule and a sync task end (that causes this update).
// Sync task is enqueued and can error and be rescheduled, so some significant time may pass between the events.
//
/// Reacts to the timeline sync state change, changing pageserver's memory state for this timeline (unload or load of the timeline files).
fn set_timeline_state(
&self,
timeline_id: ZTimelineId,
new_state: TimelineSyncState,
) -> Result<()> {
let mut timelines_accessor = self.timelines.lock().unwrap();
let timeline_to_shutdown = match new_state {
TimelineSyncState::Ready => {
let reloaded_timeline =
self.init_local_timeline(timeline_id, &mut timelines_accessor)?;
timelines_accessor
.insert(timeline_id, LayeredTimelineEntry::Local(reloaded_timeline));
None
}
TimelineSyncState::Evicted => timelines_accessor.remove(&timeline_id),
TimelineSyncState::AwaitsDownload | TimelineSyncState::CloudOnly => {
timelines_accessor.insert(timeline_id, LayeredTimelineEntry::Remote(timeline_id))
}
};
drop(timelines);
shutdown_timeline(timeline_id, removed_timeline.as_ref())?;
drop(timelines_accessor);
if let Some(timeline) = timeline_to_shutdown {
shutdown_timeline(self.tenantid, timeline_id, &timeline)?;
}
Ok(())
}
/// Layered repo does not store anything but
/// * local, fully loaded timelines, ready for usage
/// * remote timelines, that need a download task scheduled first before they can be used
///
/// [`TimelineSyncState::Evicted`] and other non-local and non-remote states are not stored in the layered repo at all,
/// hence their statuses cannot be returned by the repo.
fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option<TimelineSyncState> {
Some(
if self
.timelines
.lock()
.unwrap()
.get(&timeline_id)?
.local_or_schedule_download(self.tenantid)
.is_some()
{
TimelineSyncState::Ready
} else {
TimelineSyncState::CloudOnly
},
)
}
}
fn shutdown_timeline(
tenant_id: ZTenantId,
timelineid: ZTimelineId,
timeline: &LayeredTimeline,
timeline: &LayeredTimelineEntry,
) -> Result<(), anyhow::Error> {
walreceiver::stop_wal_receiver(timelineid);
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint(CheckpointConfig::Forced)?;
//TODO Wait for walredo process to shutdown too
if let Some(timeline) = timeline.local_or_schedule_download(tenant_id) {
timeline
.upload_relishes
.store(false, atomic::Ordering::Relaxed);
walreceiver::stop_wal_receiver(timelineid);
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint(CheckpointConfig::Forced)?;
//TODO Wait for walredo process to shutdown too
} else {
warn!("Skpping shutdown of a remote timeline");
}
Ok(())
}
#[derive(Clone)]
enum LayeredTimelineEntry {
Local(Arc<LayeredTimeline>),
Remote(ZTimelineId),
}
impl LayeredTimelineEntry {
fn timeline_id(&self) -> ZTimelineId {
match self {
LayeredTimelineEntry::Local(timeline) => timeline.timelineid,
LayeredTimelineEntry::Remote(timeline_id) => *timeline_id,
}
}
/// Gets local timeline data, if it's present. Otherwise schedules a download for the remote timeline and returns `None`.
fn local_or_schedule_download(&self, tenant_id: ZTenantId) -> Option<&LayeredTimeline> {
match self {
Self::Local(local) => Some(local.as_ref()),
Self::Remote(timeline_id) => {
debug!(
"Accessed a remote timeline {} for tenant {}, scheduling a timeline download",
timeline_id, tenant_id
);
schedule_timeline_download(tenant_id, *timeline_id);
None
}
}
}
}
/// Private functions
impl LayeredRepository {
// Implementation of the public `get_timeline` function. This differs from the public
// interface in that the caller must already hold the mutex on the 'timelines' hashmap.
fn get_timeline_locked(
fn get_or_init_timeline(
&self,
timelineid: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, Arc<LayeredTimeline>>,
) -> Result<Arc<LayeredTimeline>> {
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> Result<LayeredTimelineEntry> {
match timelines.get(&timelineid) {
Some(timeline) => Ok(timeline.clone()),
Some(timeline_entry) => {
let _ = timeline_entry.local_or_schedule_download(self.tenantid);
Ok(timeline_entry.clone())
}
None => {
let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)
.context("failed to load metadata")?;
let disk_consistent_lsn = metadata.disk_consistent_lsn();
// Recurse to look up the ancestor timeline.
//
// TODO: If you have a very deep timeline history, this could become
// expensive. Perhaps delay this until we need to look up a page in
// ancestor.
let ancestor = if let Some(ancestor_timelineid) = metadata.ancestor_timeline() {
Some(self.get_timeline_locked(ancestor_timelineid, timelines)?)
} else {
None
};
let _enter =
info_span!("loading timeline", timeline = %timelineid, tenant = %self.tenantid)
.entered();
let mut timeline = LayeredTimeline::new(
self.conf,
metadata.clone(),
ancestor,
let timeline = self.init_local_timeline(timelineid, timelines)?;
timelines.insert(
timelineid,
self.tenantid,
Arc::clone(&self.walredo_mgr),
0, // init with 0 and update after layers are loaded,
self.upload_relishes,
)?;
// List the layers on disk, and load them into the layer map
let loaded_layers = timeline
.load_layer_map(disk_consistent_lsn)
.context("failed to load layermap")?;
if self.upload_relishes {
schedule_timeline_checkpoint_upload(
self.tenantid,
timelineid,
loaded_layers,
metadata,
);
}
// needs to be after load_layer_map
timeline.init_current_logical_size()?;
let timeline = Arc::new(timeline);
timelines.insert(timelineid, timeline.clone());
Ok(timeline)
LayeredTimelineEntry::Local(Arc::clone(&timeline)),
);
Ok(LayeredTimelineEntry::Local(timeline))
}
}
}
fn init_local_timeline(
&self,
timelineid: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> anyhow::Result<Arc<LayeredTimeline>> {
let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)
.context("failed to load metadata")?;
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let ancestor = metadata
.ancestor_timeline()
.map(|ancestor_timelineid| self.get_or_init_timeline(ancestor_timelineid, timelines))
.transpose()?;
let _enter =
info_span!("loading timeline", timeline = %timelineid, tenant = %self.tenantid)
.entered();
let mut timeline = LayeredTimeline::new(
self.conf,
metadata,
ancestor,
timelineid,
self.tenantid,
Arc::clone(&self.walredo_mgr),
0, // init with 0 and update after layers are loaded,
self.upload_relishes,
);
timeline
.load_layer_map(disk_consistent_lsn)
.context("failed to load layermap")?;
timeline.init_current_logical_size()?;
Ok(Arc::new(timeline))
}
pub fn new(
conf: &'static PageServerConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
@@ -483,10 +573,31 @@ impl LayeredRepository {
//Now collect info about branchpoints
let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new();
for timelineid in &timelineids {
let timeline = self.get_timeline_locked(*timelineid, &mut *timelines)?;
for &timelineid in &timelineids {
let timeline = match self.get_or_init_timeline(timelineid, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote(_) => {
warn!(
"Timeline {} is not local, cannot proceed with gc",
timelineid
);
return Ok(totals);
}
};
if let Some(ancestor_timeline) = &timeline.ancestor_timeline {
let ancestor_timeline =
match ancestor_timeline.local_or_schedule_download(self.tenantid) {
Some(timeline) => timeline,
None => {
warn!(
"Timeline {} has ancestor {} is not local, cannot proceed with gc",
timelineid,
ancestor_timeline.timeline_id()
);
return Ok(totals);
}
};
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timelineid) = target_timelineid {
if ancestor_timeline.timelineid == timelineid {
@@ -510,7 +621,13 @@ impl LayeredRepository {
// We have already loaded all timelines above
// so this operation is just a quick map lookup.
let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?;
let timeline = match self.get_or_init_timeline(timelineid, &mut *timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote(_) => {
debug!("Skipping GC for non-local timeline {}", timelineid);
continue;
}
};
// If target_timeline is specified, only GC it
if let Some(target_timelineid) = target_timelineid {
@@ -584,7 +701,7 @@ pub struct LayeredTimeline {
// Parent timeline that this timeline was branched from, and the LSN
// of the branch point.
ancestor_timeline: Option<Arc<LayeredTimeline>>,
ancestor_timeline: Option<LayeredTimelineEntry>,
ancestor_lsn: Lsn,
// this variable indicates how much space is used from user's point of view,
@@ -606,8 +723,8 @@ pub struct LayeredTimeline {
// ordering for its operations, but involves private modules, and macro trickery
current_logical_size_gauge: IntGauge,
/// If `true`, will backup its timeline files to remote storage after freezing.
upload_relishes: bool,
/// If `true`, will backup its files that appear after each checkpointing to the remote storage.
upload_relishes: AtomicBool,
/// Ensures layers aren't frozen by checkpointer between
/// [`LayeredTimeline::get_layer_for_write`] and layer reads.
@@ -635,7 +752,9 @@ impl Timeline for LayeredTimeline {
}
fn get_ancestor_timeline_id(&self) -> Option<ZTimelineId> {
self.ancestor_timeline.as_ref().map(|x| x.timelineid)
self.ancestor_timeline
.as_ref()
.map(LayeredTimelineEntry::timeline_id)
}
/// Wait until WAL has been received up to the given LSN.
@@ -800,11 +919,17 @@ impl Timeline for LayeredTimeline {
}
}
if let Some(ancestor) = timeline.ancestor_timeline.as_ref() {
timeline = ancestor;
continue;
} else {
break;
match &timeline.ancestor_timeline {
None => break,
Some(ancestor_entry) => {
match ancestor_entry.local_or_schedule_download(self.tenantid) {
Some(ancestor) => {
timeline = ancestor;
continue;
}
None => bail!("Cannot list relishes for timeline {} tenant {} due to its ancestor being remote only", self.timelineid, self.tenantid),
}
}
}
}
@@ -870,11 +995,11 @@ impl Timeline for LayeredTimeline {
}
fn get_start_lsn(&self) -> Lsn {
if let Some(ancestor) = self.ancestor_timeline.as_ref() {
ancestor.get_start_lsn()
} else {
self.ancestor_lsn
}
self.ancestor_timeline
.as_ref()
.and_then(|ancestor_entry| ancestor_entry.local_or_schedule_download(self.tenantid))
.map(Timeline::get_start_lsn)
.unwrap_or(self.ancestor_lsn)
}
fn get_current_logical_size(&self) -> usize {
@@ -932,17 +1057,17 @@ impl LayeredTimeline {
fn new(
conf: &'static PageServerConf,
metadata: TimelineMetadata,
ancestor: Option<Arc<LayeredTimeline>>,
ancestor: Option<LayeredTimelineEntry>,
timelineid: ZTimelineId,
tenantid: ZTenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
current_logical_size: usize,
upload_relishes: bool,
) -> Result<LayeredTimeline> {
) -> LayeredTimeline {
let current_logical_size_gauge = LOGICAL_TIMELINE_SIZE
.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
.unwrap();
let timeline = LayeredTimeline {
LayeredTimeline {
conf,
timelineid,
tenantid,
@@ -961,28 +1086,26 @@ impl LayeredTimeline {
ancestor_lsn: metadata.ancestor_lsn(),
current_logical_size: AtomicUsize::new(current_logical_size),
current_logical_size_gauge,
upload_relishes,
upload_relishes: AtomicBool::new(upload_relishes),
write_lock: Mutex::new(()),
latest_gc_cutoff_lsn: AtomicLsn::from(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
};
Ok(timeline)
}
}
///
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<Vec<PathBuf>> {
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.lock().unwrap();
let mut num_layers = 0;
let (imgfilenames, deltafilenames) =
filename::list_files(self.conf, self.timelineid, self.tenantid)?;
let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid);
let mut local_layers = Vec::with_capacity(imgfilenames.len() + deltafilenames.len());
// First create ImageLayer structs for each image file.
for filename in &imgfilenames {
if filename.lsn > disk_consistent_lsn {
@@ -998,7 +1121,6 @@ impl LayeredTimeline {
let layer = ImageLayer::new(self.conf, self.timelineid, self.tenantid, filename);
trace!("found layer {}", layer.filename().display());
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
num_layers += 1;
}
@@ -1023,13 +1145,12 @@ impl LayeredTimeline {
let layer = DeltaLayer::new(self.conf, self.timelineid, self.tenantid, filename);
trace!("found layer {}", layer.filename().display());
local_layers.push(layer.path());
layers.insert_historic(Arc::new(layer));
num_layers += 1;
}
info!("loaded layer map with {} layers", num_layers);
Ok(local_layers)
Ok(())
}
///
@@ -1088,7 +1209,19 @@ impl LayeredTimeline {
while lsn < timeline.ancestor_lsn {
trace!("going into ancestor {} ", timeline.ancestor_lsn);
timeline = timeline.ancestor_timeline.as_ref().unwrap();
timeline = match timeline
.ancestor_timeline
.as_ref()
.and_then(|ancestor_entry| ancestor_entry.local_or_schedule_download(self.tenantid))
{
Some(timeline) => timeline,
None => {
bail!(
"Cannot get the whole layer for read locked: timeline {} is not present locally",
self.timelineid
)
}
};
}
// Now we have the right starting timeline for our search.
@@ -1127,13 +1260,23 @@ impl LayeredTimeline {
}
// If not, check if there's a layer on the ancestor timeline
if let Some(ancestor) = &timeline.ancestor_timeline {
lsn = timeline.ancestor_lsn;
timeline = ancestor.as_ref();
trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn);
continue;
match &timeline.ancestor_timeline {
Some(ancestor_entry) => {
match ancestor_entry.local_or_schedule_download(self.tenantid) {
Some(ancestor) => {
lsn = timeline.ancestor_lsn;
timeline = ancestor;
trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn);
continue;
}
None => bail!(
"Cannot get a layer for read from remote ancestor timeline {}",
self.timelineid
),
}
}
None => return Ok(None),
}
return Ok(None);
}
}
@@ -1345,7 +1488,10 @@ impl LayeredTimeline {
None
};
let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
let ancestor_timelineid = self
.ancestor_timeline
.as_ref()
.map(LayeredTimelineEntry::timeline_id);
let metadata = TimelineMetadata::new(
disk_consistent_lsn,
@@ -1363,7 +1509,7 @@ impl LayeredTimeline {
&metadata,
false,
)?;
if self.upload_relishes {
if self.upload_relishes.load(atomic::Ordering::Relaxed) {
schedule_timeline_checkpoint_upload(
self.tenantid,
self.timelineid,
@@ -1568,8 +1714,12 @@ impl LayeredTimeline {
prior_lsn, self.timelineid
);
}
// Now check ancestor timelines, if any
else if let Some(ancestor) = &self.ancestor_timeline {
// Now check ancestor timelines, if any are present locally
else if let Some(ancestor) =
self.ancestor_timeline.as_ref().and_then(|timeline_entry| {
timeline_entry.local_or_schedule_download(self.tenantid)
})
{
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
info!(

View File

@@ -1,4 +1,4 @@
use layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use layered_repository::TIMELINES_SEGMENT_NAME;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
@@ -105,7 +105,7 @@ impl PageServerConf {
//
fn tenants_path(&self) -> PathBuf {
self.workdir.join(TENANTS_SEGMENT_NAME)
self.workdir.join("tenants")
}
fn tenant_path(&self, tenantid: &ZTenantId) -> PathBuf {

View File

@@ -279,7 +279,8 @@ impl PageServerHandler {
let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered();
// Check that the timeline exists
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Cannot handle pagerequests for a remote timeline")?;
/* switch client to COPYBOTH */
pgb.write_message(&BeMessage::CopyBothResponse)?;
@@ -301,17 +302,17 @@ impl PageServerHandler {
PagestreamFeMessage::Exists(req) => SMGR_QUERY_TIME
.with_label_values(&["get_rel_exists"])
.observe_closure_duration(|| {
self.handle_get_rel_exists_request(&*timeline, &req)
self.handle_get_rel_exists_request(timeline.as_ref(), &req)
}),
PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME
.with_label_values(&["get_rel_size"])
.observe_closure_duration(|| {
self.handle_get_nblocks_request(&*timeline, &req)
self.handle_get_nblocks_request(timeline.as_ref(), &req)
}),
PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME
.with_label_values(&["get_page_at_lsn"])
.observe_closure_duration(|| {
self.handle_get_page_at_lsn_request(&*timeline, &req)
self.handle_get_page_at_lsn_request(timeline.as_ref(), &req)
}),
};
@@ -455,7 +456,8 @@ impl PageServerHandler {
let _enter = span.enter();
// check that the timeline exists
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Cannot handle basebackup request for a remote timeline")?;
if let Some(lsn) = lsn {
timeline
.check_lsn_is_in_scope(lsn)
@@ -595,7 +597,8 @@ impl postgres_backend::Handler for PageServerHandler {
info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered();
// Check that the timeline exists
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Failed to fetch local timeline for callmemaybe requests")?;
walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned());

View File

@@ -8,8 +8,15 @@
//! * [`rust_s3`] uses AWS S3 bucket entirely as an external storage
//!
//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync.
//! Synchronization internals are split into submodules
//! * [`storage_sync::compression`] for a custom remote storage format used to store timeline files in archives
//! * [`storage_sync::index`] to keep track of remote tenant files, the metadata and their mappings to local files
//! * [`storage_sync::upload`] and [`storage_sync::download`] to manage archive creation and upload; download and extraction, respectively
//!
//! * public API via to interact with the external world: [`run_storage_sync_thread`] and [`schedule_timeline_checkpoint_upload`]
//! * public API to interact with the external world:
//! * [`start_local_timeline_sync`] to launch a background async loop to handle the synchronization
//! * [`schedule_timeline_checkpoint_upload`] and [`schedule_timeline_download`] to enqueue new upload and download tasks,
//! to be processed by the async loop
//!
//! Here's a schematic overview of all interactions backup and the rest of the pageserver perform:
//!
@@ -17,10 +24,10 @@
//! | | - - - (init async loop) - - - -> | |
//! | | | |
//! | | -------------------------------> | async |
//! | pageserver | (schedule checkpoint upload) | upload/download |
//! | pageserver | (enqueue timeline sync task) | upload/download |
//! | | | loop |
//! | | <------------------------------- | |
//! | | (register downloaded timelines) | |
//! | | (apply new timeline sync states) | |
//! +------------------------+ +---------<-------+
//! |
//! |
@@ -37,92 +44,259 @@
//! +------------------------+
//!
//! First, during startup, the pageserver inits the storage sync thread with the async loop, or leaves the loop uninitialised, if configured so.
//! The loop inits the storage connection and checks the remote files stored.
//! This is done once at startup only, relying on the fact that pageserver uses the storage alone (ergo, nobody else uploads the files to the storage but this server).
//! Based on the remote storage data, the sync logic immediately schedules sync tasks for local timelines and reports about remote only timelines to pageserver, so it can
//! query their downloads later if they are accessed.
//!
//! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata.
//! If the storage sync loop was successfully started before, pageserver schedules the new checkpoint file uploads after every checkpoint.
//! The checkpoint uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either).
//! See [`crate::layered_repository`] for the upload calls and the adjacent logic.
//!
//! Synchronization logic is able to communicate back with updated timeline sync states, [`TimelineSyncState`],
//! submitted via [`crate::tenant_mgr::set_timeline_states`] function. Tenant manager applies corresponding timeline updates in pageserver's in-memory state.
//! Such submissions happen in two cases:
//! * once after the sync loop startup, to signal pageserver which timelines will be synchronized in the near future
//! * after every loop step, in case a timeline needs to be reloaded or evicted from pageserver's memory
//!
//! When the pageserver terminates, the upload loop finishes a current sync task (if any) and exits.
//!
//! The storage logic considers `image` as a set of local files, fully representing a certain timeline at given moment (identified with `disk_consistent_lsn`).
//! Timeline can change its state, by adding more files on disk and advancing its `disk_consistent_lsn`: this happens after pageserver checkpointing and is followed
//! by the storage upload, if enabled.
//! When a certain checkpoint gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same state.
//! No files are deleted from either local or remote storage, only the missing ones locally/remotely get downloaded/uploaded, local metadata file will be overwritten
//! when the newer image is downloaded.
//!
//! Meanwhile, the loop inits the storage connection and checks the remote files stored.
//! This is done once at startup only, relying on the fact that pageserver uses the storage alone (ergo, nobody else uploads the files to the storage but this server).
//! Based on the remote storage data, the sync logic queues timeline downloads, while accepting any potential upload tasks from pageserver and managing the tasks by their priority.
//! On the timeline download, a [`crate::tenant_mgr::register_timeline_download`] function is called to register the new timeline in pageserver, initializing all related threads and internal state.
//! Yet a timeline cannot alter already existing files, and normally cannot remove them either: only a GC process is capable of removing unused files.
//! This way, remote storage synchronization relies on the fact that every checkpoint is incremental and local files are "immutable":
//! * when a certain checkpoint gets uploaded, the sync loop remembers the fact, preventing further reuploads of the same state
//! * no files are deleted from either local or remote storage, only the missing ones locally/remotely get downloaded/uploaded, local metadata file will be overwritten
//! when the newer image is downloaded
//!
//! To optimize S3 storage (and access), the sync loop compresses the checkpoint files before placing them to S3, and uncompresses them back, keeping track of timeline files and metadata.
//! Also, the file remote file list is queried once only, at startup, to avoid possible extra costs and latency issues.
//!
//! When the pageserver terminates, the upload loop finishes a current sync task (if any) and exits.
//! Also, the remote file list is queried once only, at startup, to avoid possible extra costs and latency issues.
//!
//! NOTES:
//! * pageserver assumes it has exclusive write access to the remote storage. Multiple pageservers could be separated within the same storage
//! (e.g. by using different directories in the local filesystem external storage), but that is entirely up to the storage implementation and not covered by the trait API.
//!
//! * the uploads do not happen right after pageserver startup, they are registered when
//! 1. pageserver does the checkpoint, which happens some time after the server start
//! 2. pageserver loads the timeline from disk for the first time
//!
//! * the uploads do not happen right after the upload registration: the sync loop might be occupied with other tasks, or tasks with bigger priority could be waiting already
//! * the sync tasks may not be processed immediately after submission: if they error and get re-enqueued, their execution might be backed off to ensure the error cap is not exceeded too fast.
//! The sync queue processing also happens in batches, so the sync tasks can wait in the queue for some time.
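The sync states referenced throughout this commit (`Ready`, `Evicted`, `AwaitsDownload`, `CloudOnly`) are defined in `crate::repository`, whose diff is not included in full here. A minimal reconstruction, based only on how the variants are matched in this commit, might look like the following; the real enum may carry additional data or variants.

// Hypothetical reconstruction of the timeline sync states used in this commit.
// The authoritative definition is in pageserver/src/repository.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum TimelineSyncState {
    /// Timeline files are fully present locally; the timeline is loaded and usable.
    Ready,
    /// Timeline was unloaded from pageserver memory, e.g. after exceeding the sync error cap.
    Evicted,
    /// Timeline exists remotely and a download task has been scheduled for it.
    AwaitsDownload,
    /// Timeline is known to exist remotely only; no local files are present yet.
    CloudOnly,
}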
mod local_fs;
mod rust_s3;
mod storage_sync;
use std::{
collections::HashMap,
ffi, fs,
path::{Path, PathBuf},
thread,
};
use anyhow::Context;
use anyhow::{bail, Context};
use tokio::io;
use tracing::{error, info};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
pub use self::storage_sync::schedule_timeline_checkpoint_upload;
pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
use self::{local_fs::LocalFs, rust_s3::S3};
use crate::{PageServerConf, RemoteStorageKind};
use crate::{
layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME},
repository::TimelineSyncState,
PageServerConf, RemoteStorageKind,
};
/// Any timeline has its own id and its own tenant it belongs to,
/// the sync processes group timelines by both for simplicity.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TimelineSyncId(ZTenantId, ZTimelineId);
impl std::fmt::Display for TimelineSyncId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "(tenant id: {}, timeline id: {})", self.0, self.1)
}
}
/// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization.
/// Successful initialization includes the case when the sync loop is not started, in which case the startup data is still returned,
/// to simplify the receiving code.
pub struct SyncStartupData {
/// A sync state, derived from initial comparison of local timeline files and the remote archives,
/// before any sync tasks are executed.
/// To reuse the local file scan logic, the timeline states are returned even if no sync loop gets started during init:
/// in this case, no remote files exist and all local timelines with correct metadata files are considered ready.
pub initial_timeline_states: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>,
/// A handle to the sync loop, if it was started from the configuration provided.
pub sync_loop_handle: Option<thread::JoinHandle<anyhow::Result<()>>>,
}
/// Based on the config, initiates the remote storage connection and starts a separate thread
/// that ensures that pageserver and the remote storage are in sync with each other.
/// If no external configuraion connection given, no thread or storage initialization is done.
pub fn run_storage_sync_thread(
/// If no external storage configuration is given, no thread or storage initialization is done.
/// Along with that, scans local and remote tenant files (if the sync is enabled) to determine the initial timeline states.
pub fn start_local_timeline_sync(
config: &'static PageServerConf,
) -> anyhow::Result<Option<thread::JoinHandle<anyhow::Result<()>>>> {
) -> anyhow::Result<SyncStartupData> {
let local_timeline_files = local_tenant_timeline_files(config)
.context("Failed to collect local tenant timeline files")?;
match &config.remote_storage_config {
Some(storage_config) => {
let max_concurrent_sync = storage_config.max_concurrent_sync;
let max_sync_errors = storage_config.max_sync_errors;
let handle = match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
config,
LocalFs::new(root.clone(), &config.workdir)?,
max_concurrent_sync,
max_sync_errors,
),
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
config,
S3::new(s3_config, &config.workdir)?,
max_concurrent_sync,
max_sync_errors,
),
};
handle.map(Some)
Some(storage_config) => match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
config,
local_timeline_files,
LocalFs::new(root.clone(), &config.workdir)?,
storage_config.max_concurrent_sync,
storage_config.max_sync_errors,
),
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
config,
local_timeline_files,
S3::new(s3_config, &config.workdir)?,
storage_config.max_concurrent_sync,
storage_config.max_sync_errors,
),
}
.context("Failed to spawn the storage sync thread"),
None => {
info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled");
let mut initial_timeline_states: HashMap<
ZTenantId,
HashMap<ZTimelineId, TimelineSyncState>,
> = HashMap::new();
for TimelineSyncId(tenant_id, timeline_id) in local_timeline_files.into_keys() {
initial_timeline_states
.entry(tenant_id)
.or_default()
.insert(timeline_id, TimelineSyncState::Ready);
}
Ok(SyncStartupData {
initial_timeline_states,
sync_loop_handle: None,
})
}
None => Ok(None),
}
}
fn local_tenant_timeline_files(
config: &'static PageServerConf,
) -> anyhow::Result<HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>> {
let mut local_tenant_timeline_files = HashMap::new();
let tenants_dir = config.tenants_path();
for tenants_dir_entry in fs::read_dir(&tenants_dir)
.with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
{
match &tenants_dir_entry {
Ok(tenants_dir_entry) => {
match collect_timelines_for_tenant(config, &tenants_dir_entry.path()) {
Ok(collected_files) => {
local_tenant_timeline_files.extend(collected_files.into_iter())
}
Err(e) => error!(
"Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}",
tenants_dir.display(),
tenants_dir_entry,
e
),
}
}
Err(e) => error!(
"Failed to list tenants dir entry {:?} in directory {}, reason: {:#}",
tenants_dir_entry,
tenants_dir.display(),
e
),
}
}
Ok(local_tenant_timeline_files)
}
fn collect_timelines_for_tenant(
config: &'static PageServerConf,
tenant_path: &Path,
) -> anyhow::Result<HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>> {
let mut timelines: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)> = HashMap::new();
let tenant_id = tenant_path
.file_name()
.and_then(ffi::OsStr::to_str)
.unwrap_or_default()
.parse::<ZTenantId>()
.context("Could not parse tenant id out of the tenant dir name")?;
let timelines_dir = config.timelines_path(&tenant_id);
for timelines_dir_entry in fs::read_dir(&timelines_dir).with_context(|| {
format!(
"Failed to list timelines dir entry for tenant {}",
tenant_id
)
})? {
match timelines_dir_entry {
Ok(timelines_dir_entry) => {
let timeline_path = timelines_dir_entry.path();
match collect_timeline_files(&timeline_path) {
Ok((timeline_id, metadata, timeline_files)) => {
timelines.insert(
TimelineSyncId(tenant_id, timeline_id),
(metadata, timeline_files),
);
}
Err(e) => error!(
"Failed to process timeline dir contents at '{}', reason: {:#}",
timeline_path.display(),
e
),
}
}
Err(e) => error!(
"Failed to list timelines for entry tenant {}, reason: {:#}",
tenant_id, e
),
}
}
Ok(timelines)
}
fn collect_timeline_files(
timeline_dir: &Path,
) -> anyhow::Result<(ZTimelineId, TimelineMetadata, Vec<PathBuf>)> {
let mut timeline_files = Vec::new();
let mut timeline_metadata_path = None;
let timeline_id = timeline_dir
.file_name()
.and_then(ffi::OsStr::to_str)
.unwrap_or_default()
.parse::<ZTimelineId>()
.context("Could not parse timeline id out of the timeline dir name")?;
let timeline_dir_entries =
fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?;
for entry in timeline_dir_entries {
let entry_path = entry.context("Failed to list timeline dir entry")?.path();
if entry_path.is_file() {
if entry_path.file_name().and_then(ffi::OsStr::to_str) == Some(METADATA_FILE_NAME) {
timeline_metadata_path = Some(entry_path);
} else {
timeline_files.push(entry_path);
}
}
}
let timeline_metadata_path = match timeline_metadata_path {
Some(path) => path,
None => bail!("No metadata file found in the timeline directory"),
};
let metadata = TimelineMetadata::from_bytes(
&fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?,
)
.context("Failed to parse timeline metadata file bytes")?;
Ok((timeline_id, metadata, timeline_files))
}
/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations with storage files.
/// providing basic CRUD operations for storage files.
#[async_trait::async_trait]
trait RemoteStorage: Send + Sync {
/// A way to uniquely reference a file in the remote storage.

View File

@@ -16,12 +16,17 @@ This way, the backups are managed in background, not affecting directly other pa
Current implementation
* provides remote storage wrappers for AWS S3 and local FS
* uploads layers, frozen by pageserver checkpoint thread
* downloads and registers layers, found on the remote storage, but missing locally
* synchronizes the differences with local timelines and remote states as fast as possible
* uploads new relishes, frozen by pageserver checkpoint thread
* downloads and registers timelines that are found on the remote storage but missing locally, if they are requested via pageserver (e.g. http api, gc)
* uses compression when dealing with files, for better S3 usage
* maintains an index of what's stored remotely
* evicts failing tasks and stops the corresponding timelines
The tasks are delayed with every retry and the retries are capped, to avoid poisonous tasks.
After any task eviction, or any error during startup checks (e.g. obviously different and wrong local and remote states for the same timeline),
the timeline has to be stopped from submitting further checkpoint upload tasks, which is done via the corresponding timeline status change.
No serious optimisation or performance testing has been done yet; the feature is disabled by default and gets polished over time.
It's planned to address all currently open questions and prepare the feature to be enabled by default in cloud environments.
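As a purely illustrative sketch of the retry policy described above, a capped exponential backoff could look like the code below. The constants and function names are hypothetical; only `max_sync_errors` corresponds to a configuration knob visible elsewhere in this commit.

use std::time::Duration;

// Hypothetical backoff: each retry waits exponentially longer, capped at one minute,
// and a task that has failed `max_sync_errors` times is evicted from the queue.
fn retry_delay(retries_so_far: u32) -> Duration {
    const BASE_DELAY_MS: u64 = 500;
    const MAX_DELAY_MS: u64 = 60_000;
    let delay_ms = BASE_DELAY_MS
        .saturating_mul(1u64 << retries_so_far.min(16))
        .min(MAX_DELAY_MS);
    Duration::from_millis(delay_ms)
}

fn should_evict(retries_so_far: u32, max_sync_errors: u32) -> bool {
    retries_so_far >= max_sync_errors
}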
@@ -53,19 +58,18 @@ But it's already used in the project, so for now it's reused to avoid bloating t
Based on previous evaluation, even `rusoto-s3` could be a better choice over this library, but needs further benchmarking.
* gc and branches are ignored
* gc is ignored
So far, we don't consider non-main images and don't adjust the remote storage based on GC thread loop results.
Only checkpointer loop affects the remote storage.
So far, we don't adjust the remote storage based on GC thread loop results; only the checkpointer loop affects the remote storage.
Index module could be used as a base to implement a deferred GC mechanism, a "defragmentation" that repacks archives into new ones after GC is done removing the files from the archives.
* more timelines should be downloaded on demand
* branches implementation could be improved
Since we download and load remote layers into pageserver, there's a possibility a need for those layers' ancestors arise.
Most probably, every downloaded image's ancestor is not present in locally too, but currently there's no logic for downloading such ancestors and their metadata,
so the pageserver is unable to respond property on requests to such ancestors.
Currently, there's code to sync the branches along with the timeline files: on upload, every local branch file that is missing remotely is uploaded;
on the timeline download, missing remote branch files are downloaded.
To implement the downloading, more `tenant_mgr` refactoring is needed to properly handle web requests for layers and handle the state changes.
[Here](https://github.com/zenithdb/zenith/pull/689#issuecomment-931216193) are the details about initial state management updates needed.
A branch is a per-tenant entity, yet the current implementation requires synchronizing a timeline first to get the branch files locally.
Currently, there's no other way to learn about the remote branch files, nor are the file contents verified and updated.
* no IT tests

View File

@@ -558,7 +558,7 @@ mod fs_tests {
assert_eq!(
first_part_local,
first_part_remote.as_slice(),
"First part bytes should be returned when requrested"
"First part bytes should be returned when requested"
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
@@ -575,7 +575,7 @@ mod fs_tests {
assert_eq!(
second_part_local,
second_part_remote.as_slice(),
"Second part bytes should be returned when requrested"
"Second part bytes should be returned when requested"
);
Ok(())

File diff suppressed because it is too large

View File

@@ -1,29 +1,30 @@
//! A set of methods to asynchronously compress and uncompress a stream of data, without holding the entire data in memory.
//!
//! For that, both comporess and uncompress functions operate buffered streams (currently hardcoded sice of [`ARCHIVE_STREAM_BUFFER_SIZE_BYTES`]),
//! A set of structs to represent a compressed part of the timeline, and methods to asynchronously compress and uncompress a stream of data,
//! without holding the entire data in memory.
//! For the latter, both compress and uncompress functions operate buffered streams (currently hardcoded size of [`ARCHIVE_STREAM_BUFFER_SIZE_BYTES`]),
//! not attempting to hold the entire archive in memory.
//!
//! With those ideas, the compression is done with <a href="https://datatracker.ietf.org/doc/html/rfc8878">zstd</a> streaming compression algorithm via the `async-compression` crate.
//! The create does not contain any knobs to tweak the compression, but otherwise is one of the only ones that's both async and has an API to manage the part of an acrhive.
//! The compression is done with <a href="https://datatracker.ietf.org/doc/html/rfc8878">zstd</a> streaming algorithm via the `async-compression` crate.
//! The crate does not contain any knobs to tweak the compression, but otherwise is one of the only ones that's both async and has an API to manage the part of an archive.
//! Zstd was picked as the best algorithm among the ones available in the crate, after testing the initial timeline file compression.
//!
//! Archiving is almost agnostic to timeline file types, with an exception of the metadata file, that's currently distinguished in the [un]compression code.
//! The metadata file is treated separately when [de]compression is involved, to reduce the risk of corrupting the metadata file.
//! When compressed, the metadata file is always required and stored as the last file in the archive stream.
//! When uncompressed, the metadata file gets naturally uncompressed last, to ensure that all other relishes are decompressed successfully first.
//!
//! Archive structure:
//! +----------------------------------------+
//! | header | file_1, ..., file_k, metadata |
//! +----------------------------------------+
//!
//! The archive consists of two separate files:
//! * header, that contains all files names and their sizes and relative paths in the timeline directory
//! The archive consists of two separate zstd archives:
//! * header archive, that contains all file names, their sizes and relative paths in the timeline directory
//! Header is a Rust structure, serialized into bytes and compressed with zstd.
//! * files part, that has metadata file as the last one, all compressed with zstd into a single binary blob
//! * files archive, that has metadata file as the last one, all compressed with zstd into a single binary blob
//!
//! Header offset is stored in the file name, along with the `disk_consistent_lsn` from the metadata file.
//! See [`parse_archive_name`] and [`ARCHIVE_EXTENSION`] for the name details, example: `00000000016B9150-.zst_9732`.
//! This way, the header could be retrieved without reading an entire archive file.
//!
//! The files are stored with the metadata as the last file, to reduce the risk of corrupting the metadata file.
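To make the naming scheme concrete: assuming, as the text above suggests, that the name is the `disk_consistent_lsn` in hex, followed by `ARCHIVE_EXTENSION`, followed by the header offset, splitting it back apart could look roughly like the sketch below. This is an illustration of the format only, not the module's actual `parse_archive_name`.

// Illustrative only: splits "00000000016B9150-.zst_9732" into
// (disk_consistent_lsn = 0x16B9150, trailing number = 9732), assumed to be the header offset.
fn split_archive_name(name: &str) -> anyhow::Result<(u64, u64)> {
    let (lsn_hex, trailing) = name
        .split_once("-.zst_")
        .ok_or_else(|| anyhow::anyhow!("Archive name '{}' has no extension separator", name))?;
    let disk_consistent_lsn = u64::from_str_radix(lsn_hex, 16)?;
    let header_offset: u64 = trailing.parse()?;
    Ok((disk_consistent_lsn, header_offset))
}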
use std::{
collections::BTreeSet,
@@ -45,8 +46,11 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
use crate::layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME};
use super::index::RelativePath;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArchiveHeader {
/// All regular timeline files, excluding the metadata file.
pub files: Vec<FileEntry>,
// Metadata file name is known to the system, as its location relative to the timeline dir,
// so no need to store anything but its size in bytes.
@@ -58,7 +62,7 @@ pub struct FileEntry {
/// Uncompressed file size, bytes.
pub size: u64,
/// A path, relative to the directory root, used when compressing the directory contents.
pub subpath: String,
pub subpath: RelativePath,
}
const ARCHIVE_EXTENSION: &str = "-.zst_";
@@ -76,9 +80,9 @@ const ARCHIVE_STREAM_BUFFER_SIZE_BYTES: usize = 4 * 1024 * 1024;
/// An `impl AsyncRead` and `impl AsyncWrite` pair of connected streams is created to implement the partial contents streaming.
/// The writer end gets into the archive producer future, to put the header and a stream of compressed files.
/// * prepares archive consumer future, by executing the provided closure
/// The closure gets the reader end stream and the name of the file to cretate a future that would stream the file contents elsewhere.
/// The closure gets the reader end stream and the name of the file to create a future that would stream the file contents elsewhere.
/// * runs and waits for both futures to complete
/// * on a successful completion of both futures, hedader, its size and the user-defined consumer future return data is returned
/// * on a successful completion of both futures, header, its size and the user-defined consumer future return data is returned
/// Due to the design above, the archive name and related data is visible inside the consumer future only, so it's possible to return the data,
/// needed for future processing.
pub async fn archive_files_as_stream<Cons, ConsRet, Fut>(
@@ -132,7 +136,7 @@ where
/// Similar to [`archive_files_as_stream`], creates a pair of streams to uncompress the 2nd part of the archive,
/// that contains files and is located after the header.
/// S3 allows downloading partial file contents for a given file key (i.e. name), to accomodate this retrieval,
/// S3 allows downloading partial file contents for a given file key (i.e. name), to accommodate this retrieval,
/// a closure is used.
/// The same concepts of two concurrent futures, a user-defined closure, and a return value apply here, but the
/// consumer and the receiver ends are swapped, since decompression happens here instead of compression.
@@ -196,10 +200,6 @@ pub async fn read_archive_header<A: io::AsyncRead + Send + Sync + Unpin>(
.read_to_end(&mut header_bytes)
.await
.context("Failed to decompress a header from the archive")?;
header_bytes
.flush()
.await
.context("Failed to decompress a header from the archive")?;
Ok(ArchiveHeader::des(&header_bytes)
.context("Failed to deserialize a header from the archive")?)
@@ -274,20 +274,18 @@ async fn uncompress_with_header(
for entry in header.files {
uncompress_entry(
&mut archive,
&entry.subpath,
&entry.subpath.as_path(destination_dir),
entry.size,
files_to_skip,
destination_dir,
)
.await
.with_context(|| format!("Failed to uncompress archive entry {:?}", entry))?;
}
uncompress_entry(
&mut archive,
METADATA_FILE_NAME,
&destination_dir.join(METADATA_FILE_NAME),
header.metadata_file_size,
files_to_skip,
destination_dir,
)
.await
.context("Failed to uncompress the metadata entry")?;
@@ -296,12 +294,10 @@ async fn uncompress_with_header(
async fn uncompress_entry(
archive: &mut ZstdDecoder<io::BufReader<impl io::AsyncRead + Send + Sync + Unpin>>,
entry_subpath: &str,
destination_path: &Path,
entry_size: u64,
files_to_skip: &BTreeSet<PathBuf>,
destination_dir: &Path,
) -> anyhow::Result<()> {
let destination_path = destination_dir.join(entry_subpath);
if let Some(parent) = destination_path.parent() {
fs::create_dir_all(parent).await.with_context(|| {
format!(
@@ -311,9 +307,9 @@ async fn uncompress_entry(
})?;
};
if files_to_skip.contains(destination_path.as_path()) {
if files_to_skip.contains(destination_path) {
debug!("Skipping {}", destination_path.display());
read_n_bytes(entry_size, archive, &mut io::sink())
copy_n_bytes(entry_size, archive, &mut io::sink())
.await
.context("Failed to skip bytes in the archive")?;
return Ok(());
@@ -326,7 +322,7 @@ async fn uncompress_entry(
destination_path.display()
)
})?);
read_n_bytes(entry_size, archive, &mut destination)
copy_n_bytes(entry_size, archive, &mut destination)
.await
.with_context(|| {
format!(
@@ -349,7 +345,7 @@ async fn write_archive_contents(
) -> anyhow::Result<()> {
debug!("Starting writing files into archive");
for file_entry in header.files {
let path = source_dir.join(&file_entry.subpath);
let path = file_entry.subpath.as_path(&source_dir);
let mut source_file =
io::BufReader::new(fs::File::open(&path).await.with_context(|| {
format!(
@@ -413,16 +409,12 @@ async fn prepare_header(
if file_path.file_name().and_then(|name| name.to_str()) != Some(METADATA_FILE_NAME) {
let entry = FileEntry {
subpath: file_path
.strip_prefix(&source_dir)
.with_context(|| {
format!(
"File '{}' does not belong to pageserver workspace",
file_path.display()
)
})?
.to_string_lossy()
.to_string(),
subpath: RelativePath::new(source_dir, file_path).with_context(|| {
format!(
"File '{}' does not belong to pageserver workspace",
file_path.display()
)
})?,
size: file_metadata.len(),
};
archive_files.push(entry);
@@ -449,26 +441,17 @@ async fn prepare_header(
Ok((header, compressed_header_bytes))
}
async fn read_n_bytes(
async fn copy_n_bytes(
n: u64,
from: &mut (impl io::AsyncRead + Send + Sync + Unpin),
into: &mut (impl io::AsyncWrite + Send + Sync + Unpin),
) -> anyhow::Result<()> {
let mut bytes_unread = n;
while bytes_unread > 0 {
let mut buf = vec![0; bytes_unread as usize];
let bytes_read = from.read(&mut buf).await?;
if bytes_read == 0 {
break;
}
into.write_all(&buf[0..bytes_read]).await?;
bytes_unread -= bytes_read as u64;
}
let bytes_written = io::copy(&mut from.take(n), into).await?;
ensure!(
bytes_unread == 0,
"Failed to read exactly {} bytes from the input, bytes unread: {}",
bytes_written == n,
"Failed to read exactly {} bytes from the input, bytes written: {}",
n,
bytes_unread,
bytes_written,
);
Ok(())
}

View File

@@ -0,0 +1,370 @@
//! Timeline synchronization logic to put files from archives on remote storage into pageserver's local directory.
//! Currently, tenant branch files are also downloaded, but this does not appear final.
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
use anyhow::{anyhow, ensure, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{fs, sync::RwLock};
use tracing::{debug, error, warn};
use zenith_utils::zid::ZTenantId;
use crate::{
layered_repository::metadata::{metadata_path, TimelineMetadata},
remote_storage::{
storage_sync::{
compression, index::TimelineIndexEntry, sync_queue, tenant_branch_files,
update_index_description, SyncKind, SyncTask,
},
RemoteStorage, TimelineSyncId,
},
PageServerConf,
};
use super::{
index::{ArchiveId, RemoteTimeline, RemoteTimelineIndex},
TimelineDownload,
};
/// Attempts to download and uncompress files from all remote archives for the timeline given.
/// Timeline files that already exist locally are skipped during the download, but the local metadata file is
/// updated at the end of every checkpoint archive extraction.
///
/// Before any archives are considered, the branch files are checked locally and remotely, and all remote-only files are downloaded.
///
/// On an error, bumps the retries count and reschedules the download, with updated archive skip list
/// (for any new successful archive downloads and extractions).
pub(super) async fn download_timeline<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
conf: &'static PageServerConf,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
sync_id: TimelineSyncId,
mut download: TimelineDownload,
retries: u32,
) -> Option<bool> {
debug!("Downloading layers for sync id {}", sync_id);
if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.0).await {
error!(
"Failed to download missing branches for sync id {}: {:#}",
sync_id, e
);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Download(download),
));
return Some(false);
}
let TimelineSyncId(tenant_id, timeline_id) = sync_id;
let index_read = remote_assets.1.read().await;
let remote_timeline = match index_read.timeline_entry(&sync_id) {
None => {
error!("Cannot download: no timeline is present in the index for given ids");
return None;
}
Some(TimelineIndexEntry::Full(remote_timeline)) => Cow::Borrowed(remote_timeline),
Some(TimelineIndexEntry::Description(_)) => {
drop(index_read);
debug!("Found timeline description for the given ids, downloading the full index");
match update_index_description(
remote_assets.as_ref(),
&conf.timeline_path(&timeline_id, &tenant_id),
sync_id,
)
.await
{
Ok(remote_timeline) => Cow::Owned(remote_timeline),
Err(e) => {
error!("Failed to download full timeline index: {:#}", e);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Download(download),
));
return Some(false);
}
}
}
};
let mut archives_to_download = remote_timeline
.checkpoints()
.map(ArchiveId)
.filter(|remote_archive| !download.archives_to_skip.contains(remote_archive))
.collect::<Vec<_>>();
let archives_total = archives_to_download.len();
debug!("Downloading {} archives of a timeline", archives_total);
while let Some(archive_id) = archives_to_download.pop() {
match try_download_archive(
conf,
sync_id,
Arc::clone(&remote_assets),
remote_timeline.as_ref(),
archive_id,
Arc::clone(&download.files_to_skip),
)
.await
{
Err(e) => {
let archives_left = archives_to_download.len();
error!(
"Failed to download archive {:?} for tenant {} timeline {} : {:#}, requeueing the download ({} archives left out of {})",
archive_id, tenant_id, timeline_id, e, archives_left, archives_total
);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Download(download),
));
return Some(false);
}
Ok(()) => {
debug!("Successfully downloaded archive {:?}", archive_id);
download.archives_to_skip.insert(archive_id);
}
}
}
debug!("Finished downloading all timeline's archives");
Some(true)
}
async fn try_download_archive<
P: Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
conf: &'static PageServerConf,
TimelineSyncId(tenant_id, timeline_id): TimelineSyncId,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
remote_timeline: &RemoteTimeline,
archive_id: ArchiveId,
files_to_skip: Arc<BTreeSet<PathBuf>>,
) -> anyhow::Result<()> {
debug!("Downloading archive {:?}", archive_id);
let archive_to_download = remote_timeline
.archive_data(archive_id)
.ok_or_else(|| anyhow!("Archive {:?} not found in remote storage", archive_id))?;
let (archive_header, header_size) = remote_timeline
.restore_header(archive_id)
.context("Failed to restore header when downloading an archive")?;
match read_local_metadata(conf, timeline_id, tenant_id).await {
Ok(local_metadata) => ensure!(
// need to allow `<=` instead of `<` due to cases when a failed archive can be redownloaded
local_metadata.disk_consistent_lsn() <= archive_to_download.disk_consistent_lsn(),
"Cannot download archive with LSN {} since it's earlier than local LSN {}",
archive_to_download.disk_consistent_lsn(),
local_metadata.disk_consistent_lsn()
),
Err(e) => warn!("Failed to read local metadata file, assuing it's safe to override its with the download. Read: {:#}", e),
}
compression::uncompress_file_stream_with_index(
conf.timeline_path(&timeline_id, &tenant_id),
files_to_skip,
archive_to_download.disk_consistent_lsn(),
archive_header,
header_size,
move |mut archive_target, archive_name| async move {
let archive_local_path = conf
.timeline_path(&timeline_id, &tenant_id)
.join(&archive_name);
let remote_storage = &remote_assets.0;
remote_storage
.download_range(
&remote_storage.storage_path(&archive_local_path)?,
header_size,
None,
&mut archive_target,
)
.await
},
)
.await?;
Ok(())
}
async fn read_local_metadata(
conf: &'static PageServerConf,
timeline_id: zenith_utils::zid::ZTimelineId,
tenant_id: ZTenantId,
) -> anyhow::Result<TimelineMetadata> {
let local_metadata_path = metadata_path(conf, timeline_id, tenant_id);
let local_metadata_bytes = fs::read(&local_metadata_path)
.await
.context("Failed to read local metadata file bytes")?;
Ok(TimelineMetadata::from_bytes(&local_metadata_bytes)
.context("Failed to read local metadata files bytes")?)
}
async fn download_missing_branches<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
conf: &'static PageServerConf,
(storage, index): &(S, RwLock<RemoteTimelineIndex>),
tenant_id: ZTenantId,
) -> anyhow::Result<()> {
let local_branches = tenant_branch_files(conf, tenant_id)
.await
.context("Failed to list local branch files for the tenant")?;
let local_branches_dir = conf.branches_path(&tenant_id);
if !local_branches_dir.exists() {
fs::create_dir_all(&local_branches_dir)
.await
.with_context(|| {
format!(
"Failed to create local branches directory at path '{}'",
local_branches_dir.display()
)
})?;
}
if let Some(remote_branches) = index.read().await.branch_files(tenant_id) {
let mut remote_only_branches_downloads = remote_branches
.difference(&local_branches)
.map(|remote_only_branch| async move {
let branches_dir = conf.branches_path(&tenant_id);
let remote_branch_path = remote_only_branch.as_path(&branches_dir);
let storage_path =
storage.storage_path(&remote_branch_path).with_context(|| {
format!(
"Failed to derive a storage path for branch with local path '{}'",
remote_branch_path.display()
)
})?;
let mut target_file = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&remote_branch_path)
.await
.with_context(|| {
format!(
"Failed to create local branch file at '{}'",
remote_branch_path.display()
)
})?;
storage
.download(&storage_path, &mut target_file)
.await
.with_context(|| {
format!(
"Failed to download branch file from the remote path {:?}",
storage_path
)
})?;
Ok::<_, anyhow::Error>(())
})
.collect::<FuturesUnordered<_>>();
let mut branch_downloads_failed = false;
while let Some(download_result) = remote_only_branches_downloads.next().await {
if let Err(e) = download_result {
branch_downloads_failed = true;
error!("Failed to download a branch file: {:#}", e);
}
}
ensure!(
!branch_downloads_failed,
"Failed to download all branch files"
);
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use tempfile::tempdir;
use tokio::fs;
use zenith_utils::lsn::Lsn;
use crate::{
remote_storage::{
local_fs::LocalFs,
storage_sync::test_utils::{
assert_index_descriptions, assert_timeline_files_match, create_local_timeline,
dummy_metadata, ensure_correct_timeline_upload, expect_timeline,
},
},
repository::repo_harness::{RepoHarness, TIMELINE_ID},
};
use super::*;
#[tokio::test]
async fn test_download_timeline() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let tempdir_path = tempdir.path();
let _ = zenith_utils::logging::init(tempdir_path.join("log.log"), false);
let repo_harness = RepoHarness::create("test_download_timeline")?;
let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir_path.to_owned(), &repo_harness.conf.workdir)?;
let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
));
let remote_assets = Arc::new((storage, index));
let storage = &remote_assets.0;
let index = &remote_assets.1;
let regular_timeline_path = repo_harness.timeline_path(&TIMELINE_ID);
let regular_timeline = create_local_timeline(
&repo_harness,
TIMELINE_ID,
&["a", "b"],
dummy_metadata(Lsn(0x30)),
)?;
ensure_correct_timeline_upload(
&repo_harness,
Arc::clone(&remote_assets),
TIMELINE_ID,
regular_timeline,
)
.await;
fs::remove_dir_all(&regular_timeline_path).await?;
let remote_regular_timeline = expect_timeline(index, sync_id).await;
download_timeline(
repo_harness.conf,
Arc::clone(&remote_assets),
sync_id,
TimelineDownload {
files_to_skip: Arc::new(BTreeSet::new()),
archives_to_skip: BTreeSet::new(),
},
0,
)
.await;
assert_index_descriptions(
index,
RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
remote_assets
.0
.list()
.await
.unwrap()
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
),
)
.await;
assert_timeline_files_match(&repo_harness, TIMELINE_ID, remote_regular_timeline);
Ok(())
}
}

View File

@@ -1,32 +1,137 @@
//! In-memory index to track the timeline files in the remote strorage's archives.
//! In-memory index to track the tenant files on the remote storage, mitigating the storage format differences between the local and remote files.
//! Able to restore itself from the storage archive data and reconstruct archive indices on demand.
//!
//! The index is intended to be portable, so deliberately does not store any local paths inside.
//! This way in the future, the index could be restored fast from its serialized stored form.
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
path::{Path, PathBuf},
};
use anyhow::{anyhow, bail, ensure, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use tracing::error;
use serde::{Deserialize, Serialize};
use tracing::debug;
use zenith_utils::{
lsn::Lsn,
zid::{ZTenantId, ZTimelineId},
};
use crate::{
layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME},
layered_repository::TIMELINES_SEGMENT_NAME,
remote_storage::{
storage_sync::compression::{parse_archive_name, FileEntry},
RemoteStorage, TimelineSyncId,
TimelineSyncId,
},
PageServerConf,
};
use super::compression::{read_archive_header, ArchiveHeader};
use super::compression::ArchiveHeader;
/// A part of a filesystem path that needs a root to become a full path again.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct RelativePath(String);
impl RelativePath {
/// Attempts to strip off the base from path, producing a relative path or an error.
pub fn new<P: AsRef<Path>>(base: &Path, path: P) -> anyhow::Result<Self> {
let relative = path
.as_ref()
.strip_prefix(base)
.context("path is not relative to base")?;
Ok(RelativePath(relative.to_string_lossy().to_string()))
}
/// Joins the relative path with the base path.
pub fn as_path(&self, base: &Path) -> PathBuf {
base.join(&self.0)
}
}
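// A minimal round-trip sketch for `RelativePath` (the paths below are hypothetical):
//
//     let base = Path::new("/pageserver_workdir/tenants/some_tenant/branches");
//     let full = base.join("main");
//     let relative = RelativePath::new(base, &full).unwrap(); // stores only "main"
//     assert_eq!(relative.as_path(base), full);               // re-rooted back into the full path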
/// An index to track tenant files that exist on the remote storage.
/// Currently, timeline archives and branch files are tracked.
#[derive(Debug, Clone)]
pub struct RemoteTimelineIndex {
branch_files: HashMap<ZTenantId, HashSet<RelativePath>>,
timeline_files: HashMap<TimelineSyncId, TimelineIndexEntry>,
}
impl RemoteTimelineIndex {
/// Attempts to parse file paths (not checking the file contents) and find files
/// that can be tracked with the index.
/// On parse failures, logs the error and continues, so an empty index can be created from unsuitable paths.
pub fn try_parse_descriptions_from_paths<P: AsRef<Path>>(
conf: &'static PageServerConf,
paths: impl Iterator<Item = P>,
) -> Self {
let mut index = Self {
branch_files: HashMap::new(),
timeline_files: HashMap::new(),
};
for path in paths {
if let Err(e) = try_parse_index_entry(&mut index, conf, path.as_ref()) {
debug!(
"Failed to parse path '{}' as index entry: {:#}",
path.as_ref().display(),
e
);
}
}
index
}
pub fn timeline_entry(&self, id: &TimelineSyncId) -> Option<&TimelineIndexEntry> {
self.timeline_files.get(id)
}
pub fn timeline_entry_mut(&mut self, id: &TimelineSyncId) -> Option<&mut TimelineIndexEntry> {
self.timeline_files.get_mut(id)
}
pub fn add_timeline_entry(&mut self, id: TimelineSyncId, entry: TimelineIndexEntry) {
self.timeline_files.insert(id, entry);
}
pub fn all_sync_ids(&self) -> impl Iterator<Item = TimelineSyncId> + '_ {
self.timeline_files.keys().copied()
}
pub fn add_branch_file(&mut self, tenant_id: ZTenantId, path: RelativePath) {
self.branch_files
.entry(tenant_id)
.or_insert_with(HashSet::new)
.insert(path);
}
pub fn branch_files(&self, tenant_id: ZTenantId) -> Option<&HashSet<RelativePath>> {
self.branch_files.get(&tenant_id)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimelineIndexEntry {
/// An archive found on the remote storage but not yet downloaded: only the metadata from its storage path is available, without the archive contents.
Description(BTreeMap<ArchiveId, ArchiveDescription>),
/// Full archive metadata, including the file list, parsed from the archive header.
Full(RemoteTimeline),
}
impl TimelineIndexEntry {
pub fn uploaded_checkpoints(&self) -> BTreeSet<Lsn> {
match self {
TimelineIndexEntry::Description(description) => {
description.keys().map(|archive_id| archive_id.0).collect()
}
TimelineIndexEntry::Full(remote_timeline) => remote_timeline
.checkpoint_archives
.keys()
.map(|archive_id| archive_id.0)
.collect(),
}
}
}
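// A minimal usage sketch (the `conf` and `remote_paths` values are assumed to exist; the rest is the API above):
//
//     let index = RemoteTimelineIndex::try_parse_descriptions_from_paths(conf, remote_paths);
//     for sync_id in index.all_sync_ids() {
//         if let Some(entry) = index.timeline_entry(&sync_id) {
//             // Both `Description` and `Full` entries can report the checkpoint Lsns they know about.
//             println!("{} -> {:?}", sync_id, entry.uploaded_checkpoints());
//         }
//     }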
/// Checkpoint archive's id, corresponding to the `disk_consistent_lsn` from the timeline's metadata file during checkpointing.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct ArchiveId(pub(super) Lsn);
@@ -66,27 +171,21 @@ impl RemoteTimeline {
}
}
pub fn checkpoints(&self) -> impl Iterator<Item = Lsn> + '_ {
self.checkpoint_archives
.values()
.map(CheckpointArchive::disk_consistent_lsn)
}
/// Lists all relish files in the given remote timeline. Omits the metadata file.
pub fn stored_files(&self, timeline_dir: &Path) -> BTreeSet<PathBuf> {
self.timeline_files
.values()
.map(|file_entry| timeline_dir.join(&file_entry.subpath))
.map(|file_entry| file_entry.subpath.as_path(timeline_dir))
.collect()
}
pub fn stored_archives(&self) -> Vec<ArchiveId> {
self.checkpoint_archives.keys().copied().collect()
}
#[cfg(test)]
pub fn latest_disk_consistent_lsn(&self) -> Option<Lsn> {
self.checkpoint_archives
.keys()
.last()
.map(|archive_id| archive_id.0)
}
pub fn contains_archive(&self, disk_consistent_lsn: Lsn) -> bool {
pub fn contains_checkpoint_at(&self, disk_consistent_lsn: Lsn) -> bool {
self.checkpoint_archives
.contains_key(&ArchiveId(disk_consistent_lsn))
}
@@ -132,8 +231,8 @@ impl RemoteTimeline {
))
}
/// Updates (creates, if necessary) the data about a certain archive contents.
pub fn set_archive_contents(
/// Updates (creates, if necessary) the data about certain archive contents.
pub fn update_archive_contents(
&mut self,
disk_consistent_lsn: Lsn,
header: ArchiveHeader,
@@ -161,178 +260,112 @@ impl RemoteTimeline {
}
}
/// Reads remote storage file list, parses the data from the file paths and uses it to read every archive's header for every timeline,
/// thus restoring the file list for every timeline.
/// Due to the way headers are stored, S3 api for accessing file byte range is used, so we don't have to download an entire archive for its listing.
pub(super) async fn reconstruct_from_storage<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
storage: &S,
) -> anyhow::Result<HashMap<TimelineSyncId, RemoteTimeline>> {
let mut index = HashMap::<TimelineSyncId, RemoteTimeline>::new();
for (sync_id, remote_archives) in collect_archives(storage).await? {
let mut archive_header_downloads = remote_archives
.into_iter()
.map(|(archive_id, (archive, remote_path))| async move {
let mut header_buf = std::io::Cursor::new(Vec::new());
storage
.download_range(&remote_path, 0, Some(archive.header_size), &mut header_buf)
.await
.map_err(|e| (e, archive_id))?;
let header_buf = header_buf.into_inner();
let header = read_archive_header(&archive.archive_name, &mut header_buf.as_slice())
.await
.map_err(|e| (e, archive_id))?;
Ok::<_, (anyhow::Error, ArchiveId)>((archive_id, archive.header_size, header))
})
.collect::<FuturesUnordered<_>>();
while let Some(header_data) = archive_header_downloads.next().await {
match header_data {
Ok((archive_id, header_size, header)) => {
index
.entry(sync_id)
.or_insert_with(RemoteTimeline::empty)
.set_archive_contents(archive_id.0, header, header_size);
}
Err((e, archive_id)) => {
bail!(
"Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}",
sync_id.0, sync_id.1, archive_id.0,
e
);
}
}
}
}
Ok(index)
/// Metadata about a timeline checkpoint archive, parsed from its remote storage path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArchiveDescription {
pub header_size: u64,
pub disk_consistent_lsn: Lsn,
pub archive_name: String,
}
async fn collect_archives<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
storage: &S,
) -> anyhow::Result<HashMap<TimelineSyncId, BTreeMap<ArchiveId, (ArchiveDescription, P)>>> {
let mut remote_archives =
HashMap::<TimelineSyncId, BTreeMap<ArchiveId, (ArchiveDescription, P)>>::new();
for (local_path, remote_path) in storage
.list()
.await
.context("Failed to list remote storage files")?
.into_iter()
.map(|remote_path| (storage.local_path(&remote_path), remote_path))
{
match local_path.and_then(|local_path| parse_archive_description(&local_path)) {
Ok((sync_id, archive_description)) => {
remote_archives.entry(sync_id).or_default().insert(
ArchiveId(archive_description.disk_consistent_lsn),
(archive_description, remote_path),
);
}
Err(e) => error!(
"Failed to parse archive description from path '{:?}', reason: {:#}",
remote_path, e
),
}
}
Ok(remote_archives)
}
struct ArchiveDescription {
header_size: u64,
disk_consistent_lsn: Lsn,
archive_name: String,
}
fn parse_archive_description(
archive_path: &Path,
) -> anyhow::Result<(TimelineSyncId, ArchiveDescription)> {
let (disk_consistent_lsn, header_size) =
parse_archive_name(archive_path).with_context(|| {
fn try_parse_index_entry(
index: &mut RemoteTimelineIndex,
conf: &'static PageServerConf,
path: &Path,
) -> anyhow::Result<()> {
let tenants_dir = conf.tenants_path();
let tenant_id = path
.strip_prefix(&tenants_dir)
.with_context(|| {
format!(
"Failed to parse timeline id from archive name '{}'",
archive_path.display()
)
})?;
let mut segments = archive_path
.iter()
.skip_while(|&segment| segment != TENANTS_SEGMENT_NAME);
let tenants_segment = segments.next().ok_or_else(|| {
anyhow!(
"Found no '{}' segment in the archive path '{}'",
TENANTS_SEGMENT_NAME,
archive_path.display()
)
})?;
ensure!(
tenants_segment == TENANTS_SEGMENT_NAME,
"Failed to extract '{}' segment from archive path '{}'",
TENANTS_SEGMENT_NAME,
archive_path.display()
);
let tenant_id = segments
.next()
.ok_or_else(|| {
anyhow!(
"Found no tenant id in the archive path '{}'",
archive_path.display()
"Path '{}' does not belong to tenants directory '{}'",
path.display(),
tenants_dir.display(),
)
})?
.iter()
.next()
.ok_or_else(|| anyhow!("Found no tenant id in path '{}'", path.display()))?
.to_string_lossy()
.parse::<ZTenantId>()
.with_context(|| {
format!(
"Failed to parse tenant id from archive path '{}'",
archive_path.display()
)
})?;
.with_context(|| format!("Failed to parse tenant id from path '{}'", path.display()))?;
let timelines_segment = segments.next().ok_or_else(|| {
anyhow!(
"Found no '{}' segment in the archive path '{}'",
TIMELINES_SEGMENT_NAME,
archive_path.display()
)
})?;
ensure!(
timelines_segment == TIMELINES_SEGMENT_NAME,
"Failed to extract '{}' segment from archive path '{}'",
TIMELINES_SEGMENT_NAME,
archive_path.display()
);
let timeline_id = segments
.next()
.ok_or_else(|| {
anyhow!(
"Found no timeline id in the archive path '{}'",
archive_path.display()
)
})?
.to_string_lossy()
.parse::<ZTimelineId>()
.with_context(|| {
format!(
"Failed to parse timeline id from archive path '{}'",
archive_path.display()
)
})?;
let branches_path = conf.branches_path(&tenant_id);
let timelines_path = conf.timelines_path(&tenant_id);
match (
RelativePath::new(&branches_path, &path),
path.strip_prefix(&timelines_path),
) {
(Ok(_), Ok(_)) => bail!(
"Path '{}' cannot start with both branches '{}' and the timelines '{}' prefixes",
path.display(),
branches_path.display(),
timelines_path.display()
),
(Ok(branches_entry), Err(_)) => index.add_branch_file(tenant_id, branches_entry),
(Err(_), Ok(timelines_subpath)) => {
let mut segments = timelines_subpath.iter();
let timeline_id = segments
.next()
.ok_or_else(|| {
anyhow!(
"{} directory of tenant {} (path '{}') is not an index entry",
TIMELINES_SEGMENT_NAME,
tenant_id,
path.display()
)
})?
.to_string_lossy()
.parse::<ZTimelineId>()
.with_context(|| {
format!("Failed to parse timeline id from path '{}'", path.display())
})?;
let archive_name = archive_path
.file_name()
.ok_or_else(|| anyhow!("Archive '{}' has no file name", archive_path.display()))?
.to_string_lossy()
.to_string();
Ok((
TimelineSyncId(tenant_id, timeline_id),
ArchiveDescription {
header_size,
disk_consistent_lsn,
archive_name,
},
))
let (disk_consistent_lsn, header_size) =
parse_archive_name(path).with_context(|| {
format!(
"Failed to parse archive name out in path '{}'",
path.display()
)
})?;
let archive_name = path
.file_name()
.ok_or_else(|| anyhow!("Archive '{}' has no file name", path.display()))?
.to_string_lossy()
.to_string();
let sync_id = TimelineSyncId(tenant_id, timeline_id);
let timeline_index_entry = index
.timeline_files
.entry(sync_id)
.or_insert_with(|| TimelineIndexEntry::Description(BTreeMap::new()));
match timeline_index_entry {
TimelineIndexEntry::Description(descriptions) => {
descriptions.insert(
ArchiveId(disk_consistent_lsn),
ArchiveDescription {
header_size,
disk_consistent_lsn,
archive_name,
},
);
}
TimelineIndexEntry::Full(_) => {
bail!("Cannot add parsed archive description to its full context in index with sync id {}", sync_id)
}
}
}
(Err(branches_error), Err(timelines_strip_error)) => {
bail!(
"Path '{}' is not an index entry: it's neither parsable as a branch entry '{:#}' nor as an archive entry '{}'",
path.display(),
branches_error,
timelines_strip_error,
)
}
}
Ok(())
}
#[cfg(test)]
@@ -345,15 +378,15 @@ mod tests {
files: vec![
FileEntry {
size: 5,
subpath: "one".to_string(),
subpath: RelativePath("one".to_string()),
},
FileEntry {
size: 1,
subpath: "two".to_string(),
subpath: RelativePath("two".to_string()),
},
FileEntry {
size: 222,
subpath: "zero".to_string(),
subpath: RelativePath("zero".to_string()),
},
],
metadata_file_size: 5,
@@ -361,7 +394,7 @@ mod tests {
let lsn = Lsn(1);
let mut remote_timeline = RemoteTimeline::empty();
remote_timeline.set_archive_contents(lsn, header.clone(), 15);
remote_timeline.update_archive_contents(lsn, header.clone(), 15);
let (restored_header, _) = remote_timeline
.restore_header(ArchiveId(lsn))

View File

@@ -0,0 +1,566 @@
//! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints.
//! Currently, tenant branch files are also uploaded, but this does not appear final.
use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
use anyhow::{ensure, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{fs, sync::RwLock};
use tracing::{debug, error, warn};
use zenith_utils::zid::ZTenantId;
use crate::{
remote_storage::{
storage_sync::{
compression,
index::{RemoteTimeline, TimelineIndexEntry},
sync_queue, tenant_branch_files, update_index_description, SyncKind, SyncTask,
},
RemoteStorage, TimelineSyncId,
},
PageServerConf,
};
use super::{compression::ArchiveHeader, index::RemoteTimelineIndex, NewCheckpoint};
/// Attempts to compress and upload given checkpoint files.
//! No extra checks for overlapping files are made: the download takes care of that, ensuring no non-metadata local timeline files are overwritten.
///
/// Before the checkpoint files are uploaded, branch files are uploaded, if any local ones are missing remotely.
///
/// On an error, bumps the retries count and reschedules the entire task.
//! On success, populates the index with the new upload data.
pub(super) async fn upload_timeline_checkpoint<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
config: &'static PageServerConf,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
sync_id: TimelineSyncId,
new_checkpoint: NewCheckpoint,
retries: u32,
) -> Option<bool> {
debug!("Uploading checkpoint for sync id {}", sync_id);
if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.0).await {
error!(
"Failed to upload missing branches for sync id {}: {:#}",
sync_id, e
);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Upload(new_checkpoint),
));
return Some(false);
}
let new_upload_lsn = new_checkpoint.metadata.disk_consistent_lsn();
let index = &remote_assets.1;
let TimelineSyncId(tenant_id, timeline_id) = sync_id;
let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
let index_read = index.read().await;
let remote_timeline = match index_read.timeline_entry(&sync_id) {
None => None,
Some(TimelineIndexEntry::Full(remote_timeline)) => Some(Cow::Borrowed(remote_timeline)),
Some(TimelineIndexEntry::Description(_)) => {
debug!("Found timeline description for the given ids, downloading the full index");
match update_index_description(remote_assets.as_ref(), &timeline_dir, sync_id).await {
Ok(remote_timeline) => Some(Cow::Owned(remote_timeline)),
Err(e) => {
error!("Failed to download full timeline index: {:#}", e);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Upload(new_checkpoint),
));
return Some(false);
}
}
}
};
let already_contains_upload_lsn = remote_timeline
.as_ref()
.map(|remote_timeline| remote_timeline.contains_checkpoint_at(new_upload_lsn))
.unwrap_or(false);
if already_contains_upload_lsn {
warn!(
"Received a checkpoint with Lsn {} that's already been uploaded to remote storage, skipping the upload.",
new_upload_lsn
);
return None;
}
let already_uploaded_files = remote_timeline
.map(|timeline| timeline.stored_files(&timeline_dir))
.unwrap_or_default();
drop(index_read);
match try_upload_checkpoint(
config,
Arc::clone(&remote_assets),
sync_id,
&new_checkpoint,
already_uploaded_files,
)
.await
{
Ok((archive_header, header_size)) => {
let mut index_write = index.write().await;
match index_write.timeline_entry_mut(&sync_id) {
Some(TimelineIndexEntry::Full(remote_timeline)) => {
remote_timeline.update_archive_contents(
new_checkpoint.metadata.disk_consistent_lsn(),
archive_header,
header_size,
);
}
None | Some(TimelineIndexEntry::Description(_)) => {
let mut new_timeline = RemoteTimeline::empty();
new_timeline.update_archive_contents(
new_checkpoint.metadata.disk_consistent_lsn(),
archive_header,
header_size,
);
index_write.add_timeline_entry(sync_id, TimelineIndexEntry::Full(new_timeline));
}
}
debug!("Checkpoint uploaded successfully");
Some(true)
}
Err(e) => {
error!(
"Failed to upload checkpoint: {:#}, requeueing the upload",
e
);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Upload(new_checkpoint),
));
Some(false)
}
}
}
async fn try_upload_checkpoint<
P: Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
config: &'static PageServerConf,
remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
sync_id: TimelineSyncId,
new_checkpoint: &NewCheckpoint,
files_to_skip: BTreeSet<PathBuf>,
) -> anyhow::Result<(ArchiveHeader, u64)> {
let TimelineSyncId(tenant_id, timeline_id) = sync_id;
let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
let files_to_upload = new_checkpoint
.layers
.iter()
.filter(|&path_to_upload| {
if files_to_skip.contains(path_to_upload) {
error!(
"Skipping file upload '{}', since it was already uploaded",
path_to_upload.display()
);
false
} else {
true
}
})
.collect::<Vec<_>>();
ensure!(!files_to_upload.is_empty(), "No files to upload");
compression::archive_files_as_stream(
&timeline_dir,
files_to_upload.into_iter(),
&new_checkpoint.metadata,
move |archive_streamer, archive_name| async move {
let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
let remote_storage = &remote_assets.0;
remote_storage
.upload(
archive_streamer,
&remote_storage.storage_path(&timeline_dir.join(&archive_name))?,
)
.await
},
)
.await
.map(|(header, header_size, _)| (header, header_size))
}
async fn upload_missing_branches<
P: std::fmt::Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
>(
config: &'static PageServerConf,
(storage, index): &(S, RwLock<RemoteTimelineIndex>),
tenant_id: ZTenantId,
) -> anyhow::Result<()> {
let local_branches = tenant_branch_files(config, tenant_id)
.await
.context("Failed to list local branch files for the tenant")?;
let index_read = index.read().await;
let remote_branches = index_read
.branch_files(tenant_id)
.cloned()
.unwrap_or_default();
drop(index_read);
let mut branch_uploads = local_branches
.difference(&remote_branches)
.map(|local_only_branch| async move {
let local_branch_path = local_only_branch.as_path(&config.branches_path(&tenant_id));
let storage_path = storage.storage_path(&local_branch_path).with_context(|| {
format!(
"Failed to derive a storage path for branch with local path '{}'",
local_branch_path.display()
)
})?;
let local_branch_file = fs::OpenOptions::new()
.read(true)
.open(&local_branch_path)
.await
.with_context(|| {
format!(
"Failed to open local branch file {} for reading",
local_branch_path.display()
)
})?;
storage
.upload(local_branch_file, &storage_path)
.await
.with_context(|| {
format!(
"Failed to upload branch file to the remote path {:?}",
storage_path
)
})?;
Ok::<_, anyhow::Error>(local_only_branch)
})
.collect::<FuturesUnordered<_>>();
let mut branch_uploads_failed = false;
while let Some(upload_result) = branch_uploads.next().await {
match upload_result {
Ok(local_only_branch) => index
.write()
.await
.add_branch_file(tenant_id, local_only_branch.clone()),
Err(e) => {
error!("Failed to upload branch file: {:#}", e);
branch_uploads_failed = true;
}
}
}
ensure!(!branch_uploads_failed, "Failed to upload all branch files");
Ok(())
}
#[cfg(test)]
mod tests {
use tempfile::tempdir;
use zenith_utils::lsn::Lsn;
use crate::{
remote_storage::{
local_fs::LocalFs,
storage_sync::{
index::ArchiveId,
test_utils::{
assert_index_descriptions, create_local_timeline, dummy_metadata,
ensure_correct_timeline_upload, expect_timeline,
},
},
},
repository::repo_harness::{RepoHarness, TIMELINE_ID},
};
use super::*;
#[tokio::test]
async fn reupload_timeline() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("reupload_timeline")?;
let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
));
let remote_assets = Arc::new((storage, index));
let index = &remote_assets.1;
let first_upload_metadata = dummy_metadata(Lsn(0x10));
let first_checkpoint = create_local_timeline(
&repo_harness,
TIMELINE_ID,
&["a", "b"],
first_upload_metadata.clone(),
)?;
let local_timeline_path = repo_harness.timeline_path(&TIMELINE_ID);
ensure_correct_timeline_upload(
&repo_harness,
Arc::clone(&remote_assets),
TIMELINE_ID,
first_checkpoint,
)
.await;
let uploaded_timeline = expect_timeline(index, sync_id).await;
let uploaded_archives = uploaded_timeline
.checkpoints()
.map(ArchiveId)
.collect::<Vec<_>>();
assert_eq!(
uploaded_archives.len(),
1,
"Only one archive is expected after a first upload"
);
let first_uploaded_archive = uploaded_archives.first().copied().unwrap();
assert_eq!(
uploaded_timeline.checkpoints().last(),
Some(first_upload_metadata.disk_consistent_lsn()),
"Metadata that was uploaded, should have its Lsn stored"
);
assert_eq!(
uploaded_timeline
.archive_data(uploaded_archives.first().copied().unwrap())
.unwrap()
.disk_consistent_lsn(),
first_upload_metadata.disk_consistent_lsn(),
"Uploaded archive should have corresponding Lsn"
);
assert_eq!(
uploaded_timeline.stored_files(&local_timeline_path),
vec![local_timeline_path.join("a"), local_timeline_path.join("b")]
.into_iter()
.collect(),
"Should have all files from the first checkpoint"
);
let second_upload_metadata = dummy_metadata(Lsn(0x40));
let second_checkpoint = create_local_timeline(
&repo_harness,
TIMELINE_ID,
&["b", "c"],
second_upload_metadata.clone(),
)?;
assert!(
first_upload_metadata.disk_consistent_lsn()
< second_upload_metadata.disk_consistent_lsn()
);
ensure_correct_timeline_upload(
&repo_harness,
Arc::clone(&remote_assets),
TIMELINE_ID,
second_checkpoint,
)
.await;
let updated_timeline = expect_timeline(index, sync_id).await;
let mut updated_archives = updated_timeline
.checkpoints()
.map(ArchiveId)
.collect::<Vec<_>>();
assert_eq!(
updated_archives.len(),
2,
"Two archives are expected after a successful update of the upload"
);
updated_archives.retain(|archive_id| archive_id != &first_uploaded_archive);
assert_eq!(
updated_archives.len(),
1,
"Only one new archive is expected among the uploaded"
);
let second_uploaded_archive = updated_archives.last().copied().unwrap();
assert_eq!(
updated_timeline.checkpoints().max(),
Some(second_upload_metadata.disk_consistent_lsn()),
"Metadata that was uploaded, should have its Lsn stored"
);
assert_eq!(
updated_timeline
.archive_data(second_uploaded_archive)
.unwrap()
.disk_consistent_lsn(),
second_upload_metadata.disk_consistent_lsn(),
"Uploaded archive should have corresponding Lsn"
);
assert_eq!(
updated_timeline.stored_files(&local_timeline_path),
vec![
local_timeline_path.join("a"),
local_timeline_path.join("b"),
local_timeline_path.join("c"),
]
.into_iter()
.collect(),
"Should have all files from both checkpoints without duplicates"
);
let third_upload_metadata = dummy_metadata(Lsn(0x20));
let third_checkpoint = create_local_timeline(
&repo_harness,
TIMELINE_ID,
&["d"],
third_upload_metadata.clone(),
)?;
assert_ne!(
third_upload_metadata.disk_consistent_lsn(),
first_upload_metadata.disk_consistent_lsn()
);
assert!(
third_upload_metadata.disk_consistent_lsn()
< second_upload_metadata.disk_consistent_lsn()
);
ensure_correct_timeline_upload(
&repo_harness,
Arc::clone(&remote_assets),
TIMELINE_ID,
third_checkpoint,
)
.await;
let updated_timeline = expect_timeline(index, sync_id).await;
let mut updated_archives = updated_timeline
.checkpoints()
.map(ArchiveId)
.collect::<Vec<_>>();
assert_eq!(
updated_archives.len(),
3,
"Three archives are expected after two successful updates of the upload"
);
updated_archives.retain(|archive_id| {
archive_id != &first_uploaded_archive && archive_id != &second_uploaded_archive
});
assert_eq!(
updated_archives.len(),
1,
"Only one new archive is expected among the uploaded"
);
let third_uploaded_archive = updated_archives.last().copied().unwrap();
assert!(
updated_timeline.checkpoints().max().unwrap()
> third_upload_metadata.disk_consistent_lsn(),
"Should not influence the last lsn by uploading an older checkpoint"
);
assert_eq!(
updated_timeline
.archive_data(third_uploaded_archive)
.unwrap()
.disk_consistent_lsn(),
third_upload_metadata.disk_consistent_lsn(),
"Uploaded archive should have corresponding Lsn"
);
assert_eq!(
updated_timeline.stored_files(&local_timeline_path),
vec![
local_timeline_path.join("a"),
local_timeline_path.join("b"),
local_timeline_path.join("c"),
local_timeline_path.join("d"),
]
.into_iter()
.collect(),
"Should have all files from three checkpoints without duplicates"
);
Ok(())
}
#[tokio::test]
async fn reupload_timeline_rejected() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("reupload_timeline_rejected")?;
let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
storage
.list()
.await?
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
));
let remote_assets = Arc::new((storage, index));
let storage = &remote_assets.0;
let index = &remote_assets.1;
let first_upload_metadata = dummy_metadata(Lsn(0x10));
let first_checkpoint = create_local_timeline(
&repo_harness,
TIMELINE_ID,
&["a", "b"],
first_upload_metadata.clone(),
)?;
ensure_correct_timeline_upload(
&repo_harness,
Arc::clone(&remote_assets),
TIMELINE_ID,
first_checkpoint,
)
.await;
let after_first_uploads = RemoteTimelineIndex::try_parse_descriptions_from_paths(
repo_harness.conf,
remote_assets
.0
.list()
.await
.unwrap()
.into_iter()
.map(|storage_path| storage.local_path(&storage_path).unwrap()),
);
let normal_upload_metadata = dummy_metadata(Lsn(0x20));
assert_ne!(
normal_upload_metadata.disk_consistent_lsn(),
first_upload_metadata.disk_consistent_lsn()
);
let checkpoint_with_no_files = create_local_timeline(
&repo_harness,
TIMELINE_ID,
&[],
normal_upload_metadata.clone(),
)?;
upload_timeline_checkpoint(
repo_harness.conf,
Arc::clone(&remote_assets),
sync_id,
checkpoint_with_no_files,
0,
)
.await;
assert_index_descriptions(index, after_first_uploads.clone()).await;
let checkpoint_with_uploaded_lsn = create_local_timeline(
&repo_harness,
TIMELINE_ID,
&["something", "new"],
first_upload_metadata.clone(),
)?;
upload_timeline_checkpoint(
repo_harness.conf,
Arc::clone(&remote_assets),
sync_id,
checkpoint_with_uploaded_lsn,
0,
)
.await;
assert_index_descriptions(index, after_first_uploads.clone()).await;
Ok(())
}
}

View File

@@ -16,11 +16,20 @@ use zenith_utils::zid::ZTimelineId;
pub trait Repository: Send + Sync {
fn shutdown(&self) -> Result<()>;
/// Stops all timeline-related processes in the repository and removes the timeline data from memory.
fn unload_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Updates timeline based on the new sync state, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn set_timeline_state(
&self,
timeline_id: ZTimelineId,
new_state: TimelineSyncState,
) -> Result<()>;
/// Gets current synchronization state of the timeline.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option<TimelineSyncState>;
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline>;
/// Create a new, empty timeline. The caller is responsible for loading data into it.
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
@@ -34,7 +43,7 @@ pub trait Repository: Send + Sync {
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
/// perform one garbage collection iteration, removing old data files from disk.
/// this funtion is periodically called by gc thread.
/// this function is periodically called by gc thread.
/// also it can be explicitly requested through page server api 'do_gc' command.
///
/// 'timelineid' specifies the timeline to GC, or None for all.
@@ -54,6 +63,43 @@ pub trait Repository: Send + Sync {
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()>;
}
/// A timeline, that belongs to the current repository.
pub enum RepositoryTimeline {
/// Timeline, with its files present locally in pageserver's working directory.
/// Loaded into pageserver's memory and ready to be used.
Local(Arc<dyn Timeline>),
/// Timeline, found on the pageserver's remote storage, but not yet downloaded locally.
Remote(ZTimelineId),
}
impl RepositoryTimeline {
pub fn local_timeline(&self) -> Option<Arc<dyn Timeline>> {
if let Self::Local(local_timeline) = self {
Some(Arc::clone(local_timeline))
} else {
None
}
}
}
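// A minimal sketch of handling both `RepositoryTimeline` variants (the `repo` and `timeline_id` values are assumed to exist):
//
//     match repo.get_timeline(timeline_id)? {
//         RepositoryTimeline::Local(timeline) => {
//             // `timeline` is an Arc<dyn Timeline>, ready for regular pageserver operations.
//         }
//         RepositoryTimeline::Remote(remote_id) => {
//             // The timeline with id `remote_id` exists only on the remote storage and has to be downloaded first.
//         }
//     }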
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum TimelineSyncState {
/// No further downloads from the remote storage are needed.
/// The timeline state is up-to-date or ahead of the remote storage one,
/// ready to be used in any pageserver operation.
Ready,
/// Timeline is scheduled for downloading, but its current local state is not up to date with the remote storage.
/// The timeline is not ready to be used in any pageserver operations, otherwise its local state might diverge from the remote version,
/// making it impossible to sync it further.
AwaitsDownload,
/// Timeline was not in the pageserver's local working directory, but was found on the remote storage, ready to be downloaded.
/// Cannot be used in any pageserver operations, since it is completely absent locally.
CloudOnly,
/// Timeline was evicted from the pageserver's local working directory due to conflicting remote and local states or too many errors during the synchronization.
/// Such timelines cannot have their state synchronized further.
Evicted,
}
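// A minimal sketch (a hypothetical helper, not part of the repository API) of gating operations on `TimelineSyncState`:
//
//     fn usable_locally(state: TimelineSyncState) -> bool {
//         // Per the variant docs above, only `Ready` timelines are safe to use in regular pageserver operations.
//         matches!(state, TimelineSyncState::Ready)
//     }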
///
/// Result of performing GC
///
@@ -266,6 +312,7 @@ pub mod repo_harness {
let tenant_id = ZTenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenant_id))?;
fs::create_dir_all(conf.branches_path(&tenant_id))?;
Ok(Self { conf, tenant_id })
}
@@ -699,7 +746,10 @@ mod tests {
// Create a branch, check that the relation is visible there
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
let newtline = repo.get_timeline(NEW_TIMELINE_ID)?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
let new_writer = newtline.writer();
assert!(newtline
@@ -757,7 +807,10 @@ mod tests {
// Branch the history, modify relation differently on the new timeline
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
let newtline = repo.get_timeline(NEW_TIMELINE_ID)?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
let new_writer = newtline.writer();
new_writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("bar blk 0 at 4"))?;
@@ -905,7 +958,10 @@ mod tests {
make_some_layers(&tline, Lsn(0x20))?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?;
let newtline = repo.get_timeline(NEW_TIMELINE_ID)?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
@@ -923,7 +979,10 @@ mod tests {
make_some_layers(&tline, Lsn(0x20))?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x40))?;
let newtline = repo.get_timeline(NEW_TIMELINE_ID)?;
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
Some(timeline) => timeline,
None => panic!("Should have a local timeline"),
};
make_some_layers(&newtline, Lsn(0x60))?;

View File

@@ -3,7 +3,7 @@
use crate::branches;
use crate::layered_repository::LayeredRepository;
use crate::repository::{Repository, Timeline};
use crate::repository::{Repository, Timeline, TimelineSyncState};
use crate::tenant_threads;
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
@@ -11,10 +11,8 @@ use anyhow::{anyhow, bail, Context, Result};
use lazy_static::lazy_static;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{hash_map, HashMap};
use std::fmt;
use std::fs;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
@@ -30,11 +28,6 @@ struct Tenant {
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub enum TenantState {
// This tenant only exists in cloud storage. It cannot be accessed.
CloudOnly,
// This tenant exists in cloud storage, and we are currently downloading it to local disk.
// It cannot be accessed yet, not until it's been fully downloaded to local disk.
Downloading,
// All data for this tenant is complete on local disk, but we haven't loaded the Repository,
// Timeline and Layer structs into memory yet, so it cannot be accessed yet.
//Ready,
@@ -49,22 +42,9 @@ pub enum TenantState {
Stopping,
}
/// A remote storage timeline synchronization event, that needs another registration step
/// inside the manager to be fully completed.
#[derive(Debug)]
pub enum TimelineRegistration {
/// The timeline cannot be synchronized anymore due to some sync issues.
/// Needs to be removed from pageserver, to avoid further data diverging.
Evict,
/// A new timeline got downloaded and needs to be loaded into pageserver.
Download,
}
impl fmt::Display for TenantState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TenantState::CloudOnly => f.write_str("CloudOnly"),
TenantState::Downloading => f.write_str("Downloading"),
TenantState::Active => f.write_str("Active"),
TenantState::Idle => f.write_str("Idle"),
TenantState::Stopping => f.write_str("Stopping"),
@@ -78,109 +58,69 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
pub fn init(conf: &'static PageServerConf) {
for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() {
let tenantid =
ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap();
{
let mut m = access_tenants();
let tenant = Tenant {
state: TenantState::CloudOnly,
repo: None,
};
m.insert(tenantid, tenant);
}
init_repo(conf, tenantid);
info!("initialized storage for tenant: {}", &tenantid);
}
}
fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
true,
));
let mut m = access_tenants();
let tenant = m.get_mut(&tenant_id).unwrap();
tenant.repo = Some(repo);
tenant.state = TenantState::Idle;
}
pub fn perform_post_timeline_sync_steps(
/// Updates tenants' repositories, changing their timeline states in memory.
pub fn set_timeline_states(
conf: &'static PageServerConf,
post_sync_steps: HashMap<(ZTenantId, ZTimelineId), TimelineRegistration>,
timeline_states: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>>,
) {
if post_sync_steps.is_empty() {
debug!("no post-sync steps to perform");
if timeline_states.is_empty() {
debug!("no timeline state updates to perform");
return;
}
info!("Performing {} post-sync steps", post_sync_steps.len());
trace!("Steps: {:?}", post_sync_steps);
info!("Updating states for {} timelines", timeline_states.len());
trace!("States: {:?}", timeline_states);
{
let mut m = access_tenants();
for &(tenant_id, timeline_id) in post_sync_steps.keys() {
let tenant = m.entry(tenant_id).or_insert_with(|| Tenant {
state: TenantState::Downloading,
repo: None,
});
tenant.state = TenantState::Downloading;
match &tenant.repo {
Some(repo) => {
init_timeline(repo.as_ref(), timeline_id);
tenant.state = TenantState::Idle;
return;
}
None => log::warn!("Initialize new repo"),
}
tenant.state = TenantState::Idle;
}
}
for ((tenant_id, timeline_id), post_sync_step) in post_sync_steps {
match post_sync_step {
TimelineRegistration::Evict => {
if let Err(e) = get_repository_for_tenant(tenant_id)
.and_then(|repo| repo.unload_timeline(timeline_id))
{
error!(
"Failed to remove repository for tenant {}, timeline {}: {:#}",
tenant_id, timeline_id, e
)
}
}
TimelineRegistration::Download => {
// TODO remove later, when branching is added to remote storage sync
for missing_path in [conf.branches_path(&tenant_id), conf.tags_path(&tenant_id)] {
if !missing_path.exists() {
fs::create_dir_all(&missing_path).unwrap();
}
}
// init repo updates Tenant state
init_repo(conf, tenant_id);
let new_repo = get_repository_for_tenant(tenant_id).unwrap();
init_timeline(new_repo.as_ref(), timeline_id);
}
let mut m = access_tenants();
for (tenant_id, timeline_states) in timeline_states {
let tenant = m.entry(tenant_id).or_insert_with(|| Tenant {
state: TenantState::Idle,
repo: None,
});
if let Err(e) = put_timelines_into_tenant(conf, tenant, tenant_id, timeline_states) {
error!(
"Failed to update timeline states for tenant {}: {:#}",
tenant_id, e
);
}
}
}
fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) {
match repo.get_timeline(timeline_id) {
Ok(_timeline) => log::info!("Successfully initialized timeline {}", timeline_id),
Err(e) => log::error!("Failed to init timeline {}, reason: {:#}", timeline_id, e),
fn put_timelines_into_tenant(
conf: &'static PageServerConf,
tenant: &mut Tenant,
tenant_id: ZTenantId,
timeline_states: HashMap<ZTimelineId, TimelineSyncState>,
) -> anyhow::Result<()> {
let repo = match tenant.repo.as_ref() {
Some(repo) => Arc::clone(repo),
None => {
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<dyn Repository> = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
conf.remote_storage_config.is_some(),
));
tenant.repo = Some(Arc::clone(&repo));
repo
}
};
for (timeline_id, timeline_state) in timeline_states {
repo.set_timeline_state(timeline_id, timeline_state)
.with_context(|| {
format!(
"Failed to update timeline {} state to {:?}",
timeline_id, timeline_state
)
})?;
}
Ok(())
}
// Check this flag in the thread loops to know when to exit
@@ -212,37 +152,24 @@ pub fn create_repository_for_tenant(
conf: &'static PageServerConf,
tenantid: ZTenantId,
) -> Result<()> {
{
let mut m = access_tenants();
// First check that the tenant doesn't exist already
if m.get(&tenantid).is_some() {
bail!("tenant {} already exists", tenantid);
}
let tenant = Tenant {
state: TenantState::CloudOnly,
repo: None,
};
m.insert(tenantid, tenant);
}
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?;
let repo = Some(branches::create_repo(conf, tenantid, wal_redo_manager)?);
let mut m = access_tenants();
let tenant = m.get_mut(&tenantid).unwrap();
tenant.repo = Some(repo);
tenant.state = TenantState::Idle;
match access_tenants().entry(tenantid) {
hash_map::Entry::Occupied(_) => bail!("tenant {} already exists", tenantid),
hash_map::Entry::Vacant(v) => {
v.insert(Tenant {
state: TenantState::Idle,
repo,
});
}
}
Ok(())
}
// If tenant is not found in the repository, return CloudOnly state
pub fn get_tenant_state(tenantid: ZTenantId) -> TenantState {
let m = access_tenants();
match m.get(&tenantid) {
Some(tenant) => tenant.state,
None => TenantState::CloudOnly,
}
pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
Some(access_tenants().get(&tenantid)?.state)
}
pub fn set_tenant_state(tenantid: ZTenantId, newstate: TenantState) -> Result<TenantState> {
@@ -259,7 +186,7 @@ pub fn set_tenant_state(tenantid: ZTenantId, newstate: TenantState) -> Result<Te
tenant.state = newstate;
Ok(tenant.state)
}
None => bail!("Tenant not found for tenant {}", tenantid),
None => bail!("Tenant not found for id {}", tenantid),
}
}
@@ -280,13 +207,14 @@ pub fn get_timeline_for_tenant(
timelineid: ZTimelineId,
) -> Result<Arc<dyn Timeline>> {
get_repository_for_tenant(tenantid)?
.get_timeline(timelineid)
.with_context(|| format!("cannot fetch timeline {}", timelineid))
.get_timeline(timelineid)?
.local_timeline()
.ok_or_else(|| anyhow!("cannot fetch timeline {}", timelineid))
}
fn list_tenantids() -> Result<Vec<ZTenantId>> {
let m = access_tenants();
m.iter()
access_tenants()
.iter()
.map(|v| {
let (tenantid, _) = v;
Ok(*tenantid)
@@ -302,8 +230,8 @@ pub struct TenantInfo {
}
pub fn list_tenants() -> Result<Vec<TenantInfo>> {
let m = access_tenants();
m.iter()
access_tenants()
.iter()
.map(|v| {
let (id, tenant) = v;
Ok(TenantInfo {

View File

@@ -88,7 +88,7 @@ fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result
}
loop {
if tenant_mgr::get_tenant_state(tenantid) != TenantState::Active {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
@@ -102,7 +102,7 @@ fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result
}
trace!(
"checkpointer thread stopped for tenant {} state is {}",
"checkpointer thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
@@ -120,7 +120,7 @@ fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
}
loop {
if tenant_mgr::get_tenant_state(tenantid) != TenantState::Active {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
@@ -135,13 +135,14 @@ fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
        // TODO Rewrite this in a more adequate way using
        // condvar.wait_timeout() or similar (see the sketch at the end of this hunk)
let mut sleep_time = conf.gc_period.as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == TenantState::Active {
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {}",
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
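// Illustrative sketch for the TODO above, not part of this commit; `ShutdownSignal` and
// its methods are hypothetical names. Instead of polling once per second, the loop could
// block on a Condvar with a timeout and be woken promptly when shutdown is requested.
use std::sync::{Condvar, Mutex};
use std::time::Duration;

struct ShutdownSignal {
    requested: Mutex<bool>,
    condvar: Condvar,
}

impl ShutdownSignal {
    /// Block for at most `period`; returns true if shutdown was requested in the meantime.
    fn wait_for(&self, period: Duration) -> bool {
        let guard = self.requested.lock().unwrap();
        let (guard, _timed_out) = self
            .condvar
            .wait_timeout_while(guard, period, |requested| !*requested)
            .unwrap();
        *guard
    }

    /// Called from the shutdown handler to wake any sleeping checkpointer/GC loops.
    fn request_shutdown(&self) {
        *self.requested.lock().unwrap() = true;
        self.condvar.notify_all();
    }
}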

View File

@@ -12,7 +12,7 @@ use crate::tenant_mgr::TenantState;
use crate::tenant_threads;
use crate::walrecord::*;
use crate::PageServerConf;
use anyhow::{bail, Error, Result};
use anyhow::{bail, Context, Error, Result};
use lazy_static::lazy_static;
use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
@@ -205,7 +205,13 @@ fn walreceiver_main(
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
let timeline =
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid).with_context(|| {
format!(
"Can not start the walrecever for a remote tenant {}, timeline {}",
tenantid, timelineid,
)
})?;
//
// Start streaming the WAL, from where we left off previously.

View File

@@ -70,7 +70,8 @@ def test_tenant_list_psql(zenith_env_builder: ZenithEnvBuilder):
cur = conn.cursor()
# check same tenant cannot be created twice
with pytest.raises(psycopg2.DatabaseError, match=f'tenant {env.initial_tenant} already exists'):
with pytest.raises(psycopg2.DatabaseError,
match=f'repo for {env.initial_tenant} already exists'):
cur.execute(f'tenant_create {env.initial_tenant}')
# create one more tenant

View File

@@ -154,3 +154,84 @@ zid_newtype!(ZTimelineId);
pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId);
/// Serde routines for `Option<T>` (de)serialization, using the `T: Display` and `T: FromStr` representations of the inner value.
/// Useful for `Option<ZTenantId>` and `Option<ZTimelineId>`: their hex representations go into the serialized string and are parsed back on deserialization.
pub mod opt_display_serde {
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::{fmt::Display, str::FromStr};
pub fn serialize<S, Id>(id: &Option<Id>, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Id: Display,
{
id.as_ref().map(ToString::to_string).serialize(ser)
}
pub fn deserialize<'de, D, Id>(des: D) -> Result<Option<Id>, D::Error>
where
D: Deserializer<'de>,
Id: FromStr,
<Id as FromStr>::Err: Display,
{
Ok(if let Some(s) = Option::<String>::deserialize(des)? {
Some(Id::from_str(&s).map_err(de::Error::custom)?)
} else {
None
})
}
}
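// Illustrative usage sketch, not part of this commit (the struct name is hypothetical):
// annotate an optional id field in a serde-driven config with this module, adding
// `#[serde(default)]` so that a missing field deserializes to `None` while present
// values round-trip through their hex `Display`/`FromStr` forms.
//
// #[derive(Serialize, Deserialize)]
// struct ExampleConfig {
//     #[serde(with = "opt_display_serde")]
//     #[serde(default)]
//     tenant_id: Option<ZTenantId>,
// }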
#[cfg(test)]
mod tests {
use std::fmt::Display;
use super::*;
use hex::FromHexError;
use hex_literal::hex;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct TestStruct<E: Display, T: FromStr<Err = E> + Display> {
#[serde(with = "opt_display_serde")]
field: Option<T>,
}
#[test]
fn test_hex_serializations_tenant_id() {
let original_struct = TestStruct {
field: Some(ZTenantId::from_array(hex!(
"11223344556677881122334455667788"
))),
};
let serialized_string = serde_json::to_string(&original_struct).unwrap();
assert_eq!(
serialized_string,
r#"{"field":"11223344556677881122334455667788"}"#
);
let deserialized_struct: TestStruct<FromHexError, ZTenantId> =
serde_json::from_str(&serialized_string).unwrap();
assert_eq!(original_struct, deserialized_struct);
}
#[test]
fn test_hex_serializations_timeline_id() {
let original_struct = TestStruct {
field: Some(ZTimelineId::from_array(hex!(
"AA223344556677881122334455667788"
))),
};
let serialized_string = serde_json::to_string(&original_struct).unwrap();
assert_eq!(
serialized_string,
r#"{"field":"aa223344556677881122334455667788"}"#
);
let deserialized_struct: TestStruct<FromHexError, ZTimelineId> =
serde_json::from_str(&serialized_string).unwrap();
assert_eq!(original_struct, deserialized_struct);
}
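    // Illustrative addition, not part of this commit: under the same assumptions as the
    // tests above, the `None` case serializes to JSON `null` and round-trips back to `None`.
    #[test]
    fn test_hex_serialization_none() {
        let original_struct: TestStruct<FromHexError, ZTenantId> = TestStruct { field: None };

        let serialized_string = serde_json::to_string(&original_struct).unwrap();
        assert_eq!(serialized_string, r#"{"field":null}"#);

        let deserialized_struct: TestStruct<FromHexError, ZTenantId> =
            serde_json::from_str(&serialized_string).unwrap();
        assert_eq!(original_struct, deserialized_struct);
    }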
}