pageserver: generation number support in keys and indices (#5140)

## Problem

To implement split-brain protection, tenants and timelines need to be aware
of their current generation and use it when composing S3 keys.


## Summary of changes

- A `Generation` type is introduced in the `utils` crate -- it lives in this
broadly visible location because it will later be used from
`control_plane/` as well as `pageserver/`. A generation can be a number,
None, or Broken: None supports legacy content written before generations
existed, and Broken is for tenants in the Broken state.
- Tenant, Timeline, and RemoteTimelineClient all get a generation
attribute
- IndexPart's IndexLayerMetadata has a new `generation` attribute.
Legacy layers' metadata will deserialize to Generation::none().
- Remote paths are composed with a trailing generation suffix. If a
generation is equal to Generation::none() (as it currently always is),
then this suffix is an empty string.
- Functions for composing remote storage paths are added in
remote_timeline_client. They avoid the existing pattern of always composing
a local path and then stripping the workdir prefix, and they do not require
a PageServerConf reference just to build a remote path (the conf is only
needed for local paths). They are less DRY than the old helpers, but remote
storage paths change very rarely, so it is clearer to write the paths out in
full than to compose timeline paths from tenant paths, etc.
- Code paths that construct a Tenant take a `generation` argument, in
anticipation that we will soon load generations on startup before
constructing the Tenant.

Until the whole feature is done we don't want any generation-suffixed keys,
so initially the special Generation::none() value is carried everywhere.
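
As a quick illustration of the resulting key format, here is a minimal sketch
against the new `Generation` helper (the path in the comment is abbreviated and
only illustrative):

```rust
use utils::generation::Generation;

fn main() {
    // Legacy content keeps its existing keys: Generation::none() adds no suffix.
    assert_eq!(Generation::none().get_suffix(), "");
    // A real generation appends "-" plus eight hex digits, giving keys like
    // ".../timelines/<timeline_id>/index_part.json-000000ab".
    assert_eq!(Generation::new(0xab).get_suffix(), "-000000ab");
}
```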

Closes: https://github.com/neondatabase/neon/issues/5135

Co-authored-by: Christian Schwarz <christian@neon.tech>
Author: John Spray
Date: 2023-08-31 09:19:34 +01:00
Committed by: GitHub
Parent: f2c21447ce
Commit: 83ae2bd82c
13 changed files with 434 additions and 160 deletions


@@ -0,0 +1,113 @@
use std::fmt::Debug;
use serde::{Deserialize, Serialize};
/// Tenant generations are used to provide split-brain safety and allow
/// multiple pageservers to attach the same tenant concurrently.
///
/// See docs/rfcs/025-generation-numbers.md for detail on how generation
/// numbers are used.
#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
pub enum Generation {
// Generations with this magic value will not add a suffix to S3 keys, and will not
// be included in persisted index_part.json. This value is only to be used
// during migration from pre-generation metadata to generation-aware metadata,
// and should eventually go away.
//
// A special Generation is used rather than always wrapping Generation in an Option,
// so that code handling generations doesn't have to be aware of the legacy
// case everywhere it touches a generation.
None,
// Generations with this magic value may never be used to construct S3 keys:
// we will panic if someone tries to. This is for Tenants in the "Broken" state,
// so that we can satisfy their constructor with a Generation without risking
// a code bug using it in an S3 write (broken tenants should never write)
Broken,
Valid(u32),
}
/// The Generation type represents a number associated with a Tenant, which
/// increments every time the tenant is attached to a new pageserver, or
/// an attached pageserver restarts.
///
/// It is included as a suffix in S3 keys, as a protection against split-brain
/// scenarios where pageservers might otherwise issue conflicting writes to
/// remote storage
impl Generation {
/// Create a new Generation that represents a legacy key format with
/// no generation suffix
pub fn none() -> Self {
Self::None
}
// Create a new generation that will panic if you try to use get_suffix
pub fn broken() -> Self {
Self::Broken
}
pub fn new(v: u32) -> Self {
Self::Valid(v)
}
pub fn is_none(&self) -> bool {
matches!(self, Self::None)
}
pub fn get_suffix(&self) -> String {
match self {
Self::Valid(v) => {
format!("-{:08x}", v)
}
Self::None => "".into(),
Self::Broken => {
panic!("Tried to use a broken generation");
}
}
}
}
impl Serialize for Generation {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if let Self::Valid(v) = self {
v.serialize(serializer)
} else {
// We should never be asked to serialize a None or Broken. Structures
// that include an optional generation should convert None to an
// Option<Generation>::None
Err(serde::ser::Error::custom(format!(
"Tried to serialize invalid generation ({:?})",
self
)))
}
}
}
impl<'de> Deserialize<'de> for Generation {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(Self::Valid(u32::deserialize(deserializer)?))
}
}
// We intentionally do not implement Display for Generation, to reduce the
// risk of a bug where the generation is used in a format!() string directly
// instead of using get_suffix().
impl Debug for Generation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Valid(v) => {
write!(f, "{:08x}", v)
}
Self::None => {
write!(f, "<none>")
}
Self::Broken => {
write!(f, "<broken>")
}
}
}
}
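
A short usage sketch of the serde behaviour above (hypothetical test code,
assuming serde_json is available as a dev-dependency; not part of this change):

#[cfg(test)]
mod serde_sketch {
    use super::Generation;

    #[test]
    fn round_trip() {
        // Valid generations serialize as a bare integer...
        assert_eq!(serde_json::to_string(&Generation::new(0xab)).unwrap(), "171");
        // ...and deserialize back to a Valid generation.
        let g: Generation = serde_json::from_str("171").unwrap();
        assert_eq!(g, Generation::new(0xab));
        // None and Broken refuse to serialize; callers that need "no generation"
        // as a legitimate value should use Option<Generation> instead.
        assert!(serde_json::to_string(&Generation::none()).is_err());
    }
}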


@@ -27,6 +27,9 @@ pub mod id;
// http endpoint utils // http endpoint utils
pub mod http; pub mod http;
// definition of the Generation type for pageserver attachment APIs
pub mod generation;
// common log initialisation routine // common log initialisation routine
pub mod logging; pub mod logging;


@@ -643,23 +643,6 @@ impl PageServerConf {
.join(METADATA_FILE_NAME) .join(METADATA_FILE_NAME)
} }
/// Files on the remote storage are stored with paths, relative to the workdir.
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
///
/// Errors if the path provided does not start from pageserver's workdir.
pub fn remote_path(&self, local_path: &Path) -> anyhow::Result<RemotePath> {
local_path
.strip_prefix(&self.workdir)
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.with_context(|| {
format!(
"Failed to resolve remote part of path {:?} for base {:?}",
local_path, self.workdir
)
})
}
/// Turns storage remote path of a file into its local path. /// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf { pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf {
remote_path.with_base(&self.workdir) remote_path.with_base(&self.workdir)


@@ -85,6 +85,7 @@ pub use pageserver_api::models::TenantState;
use toml_edit; use toml_edit;
use utils::{ use utils::{
crashsafe, crashsafe,
generation::Generation,
id::{TenantId, TimelineId}, id::{TenantId, TimelineId},
lsn::{Lsn, RecordLsn}, lsn::{Lsn, RecordLsn},
}; };
@@ -178,6 +179,11 @@ pub struct Tenant {
tenant_conf: Arc<RwLock<TenantConfOpt>>, tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_id: TenantId, tenant_id: TenantId,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
generation: Generation,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>, timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
// This mutex prevents creation of new timelines during GC. // This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding // Adding yet another mutex (in addition to `timelines`) is needed because holding
@@ -522,6 +528,7 @@ impl Tenant {
pub(crate) fn spawn_attach( pub(crate) fn spawn_attach(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_id: TenantId, tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel, broker_client: storage_broker::BrokerClientChannel,
tenants: &'static tokio::sync::RwLock<TenantsMap>, tenants: &'static tokio::sync::RwLock<TenantsMap>,
remote_storage: GenericRemoteStorage, remote_storage: GenericRemoteStorage,
@@ -538,6 +545,7 @@ impl Tenant {
tenant_conf, tenant_conf,
wal_redo_manager, wal_redo_manager,
tenant_id, tenant_id,
generation,
Some(remote_storage.clone()), Some(remote_storage.clone()),
)); ));
@@ -648,12 +656,8 @@ impl Tenant {
.as_ref() .as_ref()
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?; .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
let remote_timeline_ids = remote_timeline_client::list_remote_timelines( let remote_timeline_ids =
remote_storage, remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
self.conf,
self.tenant_id,
)
.await?;
info!("found {} timelines", remote_timeline_ids.len()); info!("found {} timelines", remote_timeline_ids.len());
@@ -665,6 +669,7 @@ impl Tenant {
self.conf, self.conf,
self.tenant_id, self.tenant_id,
timeline_id, timeline_id,
self.generation,
); );
part_downloads.spawn( part_downloads.spawn(
async move { async move {
@@ -851,6 +856,7 @@ impl Tenant {
TenantConfOpt::default(), TenantConfOpt::default(),
wal_redo_manager, wal_redo_manager,
tenant_id, tenant_id,
Generation::broken(),
None, None,
)) ))
} }
@@ -868,6 +874,7 @@ impl Tenant {
pub(crate) fn spawn_load( pub(crate) fn spawn_load(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_id: TenantId, tenant_id: TenantId,
generation: Generation,
resources: TenantSharedResources, resources: TenantSharedResources,
init_order: Option<InitializationOrder>, init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>, tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -893,6 +900,7 @@ impl Tenant {
tenant_conf, tenant_conf,
wal_redo_manager, wal_redo_manager,
tenant_id, tenant_id,
generation,
remote_storage.clone(), remote_storage.clone(),
); );
let tenant = Arc::new(tenant); let tenant = Arc::new(tenant);
@@ -2274,6 +2282,7 @@ impl Tenant {
ancestor, ancestor,
new_timeline_id, new_timeline_id,
self.tenant_id, self.tenant_id,
self.generation,
Arc::clone(&self.walredo_mgr), Arc::clone(&self.walredo_mgr),
resources, resources,
pg_version, pg_version,
@@ -2291,6 +2300,7 @@ impl Tenant {
tenant_conf: TenantConfOpt, tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>, walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId, tenant_id: TenantId,
generation: Generation,
remote_storage: Option<GenericRemoteStorage>, remote_storage: Option<GenericRemoteStorage>,
) -> Tenant { ) -> Tenant {
let (state, mut rx) = watch::channel(state); let (state, mut rx) = watch::channel(state);
@@ -2349,6 +2359,7 @@ impl Tenant {
Tenant { Tenant {
tenant_id, tenant_id,
generation,
conf, conf,
// using now here is good enough approximation to catch tenants with really long // using now here is good enough approximation to catch tenants with really long
// activation times. // activation times.
@@ -2931,6 +2942,7 @@ impl Tenant {
self.conf, self.conf,
self.tenant_id, self.tenant_id,
timeline_id, timeline_id,
self.generation,
); );
Some(remote_client) Some(remote_client)
} else { } else {
@@ -3454,6 +3466,7 @@ pub mod harness {
pub conf: &'static PageServerConf, pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf, pub tenant_conf: TenantConf,
pub tenant_id: TenantId, pub tenant_id: TenantId,
pub generation: Generation,
} }
static LOG_HANDLE: OnceCell<()> = OnceCell::new(); static LOG_HANDLE: OnceCell<()> = OnceCell::new();
@@ -3495,6 +3508,7 @@ pub mod harness {
conf, conf,
tenant_conf, tenant_conf,
tenant_id, tenant_id,
generation: Generation::new(0xdeadbeef),
}) })
} }
@@ -3521,6 +3535,7 @@ pub mod harness {
TenantConfOpt::from(self.tenant_conf), TenantConfOpt::from(self.tenant_conf),
walredo_mgr, walredo_mgr,
self.tenant_id, self.tenant_id,
self.generation,
remote_storage, remote_storage,
)); ));
tenant tenant


@@ -25,6 +25,7 @@ use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantSt
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME}; use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
use utils::fs_ext::PathExt; use utils::fs_ext::PathExt;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId}; use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError; use super::delete::DeleteTenantError;
@@ -202,6 +203,7 @@ pub(crate) fn schedule_local_tenant_processing(
match Tenant::spawn_attach( match Tenant::spawn_attach(
conf, conf,
tenant_id, tenant_id,
Generation::none(),
resources.broker_client, resources.broker_client,
tenants, tenants,
remote_storage, remote_storage,
@@ -224,7 +226,15 @@ pub(crate) fn schedule_local_tenant_processing(
} else { } else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation"); info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state. // Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, resources, init_order, tenants, ctx) Tenant::spawn_load(
conf,
tenant_id,
Generation::none(),
resources,
init_order,
tenants,
ctx,
)
}; };
Ok(tenant) Ok(tenant)
} }


@@ -216,7 +216,7 @@ use utils::backoff::{
}; };
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -235,6 +235,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::upload_queue::Delete; use crate::tenant::upload_queue::Delete;
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{ use crate::{
config::PageServerConf, config::PageServerConf,
task_mgr, task_mgr,
@@ -252,6 +253,7 @@ use self::index::IndexPart;
use super::storage_layer::LayerFileName; use super::storage_layer::LayerFileName;
use super::upload_queue::SetDeletedFlagProgress; use super::upload_queue::SetDeletedFlagProgress;
use super::Generation;
// Occasional network issues and such can cause remote operations to fail, and // Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a download fails, we log it at info-level, and retry. // that's expected. If a download fails, we log it at info-level, and retry.
@@ -315,6 +317,7 @@ pub struct RemoteTimelineClient {
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
generation: Generation,
upload_queue: Mutex<UploadQueue>, upload_queue: Mutex<UploadQueue>,
@@ -335,12 +338,14 @@ impl RemoteTimelineClient {
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
generation: Generation,
) -> RemoteTimelineClient { ) -> RemoteTimelineClient {
RemoteTimelineClient { RemoteTimelineClient {
conf, conf,
runtime: BACKGROUND_RUNTIME.handle().to_owned(), runtime: BACKGROUND_RUNTIME.handle().to_owned(),
tenant_id, tenant_id,
timeline_id, timeline_id,
generation,
storage_impl: remote_storage, storage_impl: remote_storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized), upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)), metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
@@ -449,10 +454,10 @@ impl RemoteTimelineClient {
); );
let index_part = download::download_index_part( let index_part = download::download_index_part(
self.conf,
&self.storage_impl, &self.storage_impl,
&self.tenant_id, &self.tenant_id,
&self.timeline_id, &self.timeline_id,
self.generation,
) )
.measure_remote_op( .measure_remote_op(
self.tenant_id, self.tenant_id,
@@ -650,22 +655,41 @@ impl RemoteTimelineClient {
// from latest_files, but not yet scheduled for deletion. Use a closure // from latest_files, but not yet scheduled for deletion. Use a closure
// to syntactically forbid ? or bail! calls here. // to syntactically forbid ? or bail! calls here.
let no_bail_here = || { let no_bail_here = || {
for name in names { // Decorate our list of names with each name's generation, dropping
if upload_queue.latest_files.remove(name).is_some() { // names that are unexpectedly missing from our metadata.
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1; let with_generations: Vec<_> = names
} .iter()
} .filter_map(|name| {
// Remove from latest_files, learning the file's remote generation in the process
let meta = upload_queue.latest_files.remove(name);
if let Some(meta) = meta {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
Some((name, meta.generation))
} else {
// This can only happen if we forgot to schedule the file upload
// before scheduling the delete. Log it because it is a rare/strange
// situation, and in case something is misbehaving, we'd like to know which
// layers experienced this.
info!(
"Deleting layer {name} not found in latest_files list, never uploaded?"
);
None
}
})
.collect();
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 { if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
self.schedule_index_upload(upload_queue, metadata); self.schedule_index_upload(upload_queue, metadata);
} }
// schedule the actual deletions // schedule the actual deletions
for name in names { for (name, generation) in with_generations {
let op = UploadOp::Delete(Delete { let op = UploadOp::Delete(Delete {
file_kind: RemoteOpFileKind::Layer, file_kind: RemoteOpFileKind::Layer,
layer_file_name: name.clone(), layer_file_name: name.clone(),
scheduled_from_timeline_delete: false, scheduled_from_timeline_delete: false,
generation,
}); });
self.calls_unfinished_metric_begin(&op); self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op); upload_queue.queued_operations.push_back(op);
@@ -761,10 +785,10 @@ impl RemoteTimelineClient {
backoff::retry( backoff::retry(
|| { || {
upload::upload_index_part( upload::upload_index_part(
self.conf,
&self.storage_impl, &self.storage_impl,
&self.tenant_id, &self.tenant_id,
&self.timeline_id, &self.timeline_id,
self.generation,
&index_part_with_deleted_at, &index_part_with_deleted_at,
) )
}, },
@@ -822,12 +846,14 @@ impl RemoteTimelineClient {
.reserve(stopped.upload_queue_for_deletion.latest_files.len()); .reserve(stopped.upload_queue_for_deletion.latest_files.len());
// schedule the actual deletions // schedule the actual deletions
for name in stopped.upload_queue_for_deletion.latest_files.keys() { for (name, meta) in &stopped.upload_queue_for_deletion.latest_files {
let op = UploadOp::Delete(Delete { let op = UploadOp::Delete(Delete {
file_kind: RemoteOpFileKind::Layer, file_kind: RemoteOpFileKind::Layer,
layer_file_name: name.clone(), layer_file_name: name.clone(),
scheduled_from_timeline_delete: true, scheduled_from_timeline_delete: true,
generation: meta.generation,
}); });
self.calls_unfinished_metric_begin(&op); self.calls_unfinished_metric_begin(&op);
stopped stopped
.upload_queue_for_deletion .upload_queue_for_deletion
@@ -850,8 +876,7 @@ impl RemoteTimelineClient {
// Do not delete index part yet, it is needed for possible retry. If we remove it first // Do not delete index part yet, it is needed for possible retry. If we remove it first
// and retry will arrive to different pageserver there wont be any traces of it on remote storage // and retry will arrive to different pageserver there wont be any traces of it on remote storage
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id); let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
let timeline_storage_path = self.conf.remote_path(&timeline_path)?;
let remaining = backoff::retry( let remaining = backoff::retry(
|| async { || async {
@@ -1055,15 +1080,17 @@ impl RemoteTimelineClient {
let upload_result: anyhow::Result<()> = match &task.op { let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => { UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
let path = &self let path = self
.conf .conf
.timeline_path(&self.tenant_id, &self.timeline_id) .timeline_path(&self.tenant_id, &self.timeline_id)
.join(layer_file_name.file_name()); .join(layer_file_name.file_name());
upload::upload_timeline_layer( upload::upload_timeline_layer(
self.conf, self.conf,
&self.storage_impl, &self.storage_impl,
path, &path,
layer_metadata, layer_metadata,
self.generation,
) )
.measure_remote_op( .measure_remote_op(
self.tenant_id, self.tenant_id,
@@ -1085,10 +1112,10 @@ impl RemoteTimelineClient {
}; };
let res = upload::upload_index_part( let res = upload::upload_index_part(
self.conf,
&self.storage_impl, &self.storage_impl,
&self.tenant_id, &self.tenant_id,
&self.timeline_id, &self.timeline_id,
self.generation,
index_part, index_part,
) )
.measure_remote_op( .measure_remote_op(
@@ -1113,7 +1140,7 @@ impl RemoteTimelineClient {
.conf .conf
.timeline_path(&self.tenant_id, &self.timeline_id) .timeline_path(&self.tenant_id, &self.timeline_id)
.join(delete.layer_file_name.file_name()); .join(delete.layer_file_name.file_name());
delete::delete_layer(self.conf, &self.storage_impl, path) delete::delete_layer(self.conf, &self.storage_impl, path, delete.generation)
.measure_remote_op( .measure_remote_op(
self.tenant_id, self.tenant_id,
self.timeline_id, self.timeline_id,
@@ -1360,6 +1387,71 @@ impl RemoteTimelineClient {
} }
} }
pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string()))
}
pub fn remote_layer_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
layer_file_name: &LayerFileName,
layer_meta: &LayerFileMetadata,
) -> RemotePath {
// Generation-aware key format
let path = format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
layer_file_name.file_name(),
layer_meta.generation.get_suffix()
);
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_index_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
generation: Generation,
) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
IndexPart::FILE_NAME,
generation.get_suffix()
))
.expect("Failed to construct path")
}
/// Files on the remote storage are stored with paths, relative to the workdir.
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
///
/// Errors if the path provided does not start from pageserver's workdir.
pub fn remote_path(
conf: &PageServerConf,
local_path: &Path,
generation: Generation,
) -> anyhow::Result<RemotePath> {
let stripped = local_path
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")?;
let suffixed = format!(
"{0}{1}",
stripped.to_string_lossy(),
generation.get_suffix()
);
RemotePath::new(&PathBuf::from(suffixed)).with_context(|| {
format!(
"to resolve remote part of path {:?} for base {:?}",
local_path, conf.workdir
)
})
}
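
As a hedged illustration (abbreviated, not part of the diff), the helpers above
produce keys of this shape, where the suffix is empty for Generation::none()
and "-" plus eight hex digits otherwise:

// remote_timelines_path(&tenant)                    => "tenants/<tenant_id>/<TIMELINES_SEGMENT_NAME>"
// remote_timeline_path(&tenant, &timeline)          => "tenants/<tenant_id>/<TIMELINES_SEGMENT_NAME>/<timeline_id>"
// remote_index_path(&tenant, &timeline, generation) => ".../<timeline_id>/index_part.json<suffix>"
// remote_layer_path(&tenant, &timeline, name, meta) => ".../<timeline_id>/<layer_file_name><suffix>"
// remote_path(conf, local_path, generation)         => the local path relative to the workdir, plus <suffix>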
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -1367,7 +1459,7 @@ mod tests {
context::RequestContext, context::RequestContext,
tenant::{ tenant::{
harness::{TenantHarness, TIMELINE_ID}, harness::{TenantHarness, TIMELINE_ID},
Tenant, Timeline, Generation, Tenant, Timeline,
}, },
DEFAULT_PG_VERSION, DEFAULT_PG_VERSION,
}; };
@@ -1409,8 +1501,11 @@ mod tests {
assert_eq!(avec, bvec); assert_eq!(avec, bvec);
} }
fn assert_remote_files(expected: &[&str], remote_path: &Path) { fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) {
let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect(); let mut expected: Vec<String> = expected
.iter()
.map(|x| format!("{}{}", x, generation.get_suffix()))
.collect();
expected.sort(); expected.sort();
let mut found: Vec<String> = Vec::new(); let mut found: Vec<String> = Vec::new();
@@ -1461,6 +1556,8 @@ mod tests {
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
}; };
let generation = Generation::new(0xdeadbeef);
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let client = Arc::new(RemoteTimelineClient { let client = Arc::new(RemoteTimelineClient {
@@ -1468,6 +1565,7 @@ mod tests {
runtime: tokio::runtime::Handle::current(), runtime: tokio::runtime::Handle::current(),
tenant_id: harness.tenant_id, tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID, timeline_id: TIMELINE_ID,
generation,
storage_impl: storage, storage_impl: storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized), upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new( metrics: Arc::new(RemoteTimelineClientMetrics::new(
@@ -1526,6 +1624,8 @@ mod tests {
.init_upload_queue_for_empty_remote(&metadata) .init_upload_queue_for_empty_remote(&metadata)
.unwrap(); .unwrap();
let generation = Generation::new(0xdeadbeef);
// Create a couple of dummy files, schedule upload for them // Create a couple of dummy files, schedule upload for them
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(); let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
@@ -1545,13 +1645,13 @@ mod tests {
client client
.schedule_layer_file_upload( .schedule_layer_file_upload(
&layer_file_name_1, &layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64), &LayerFileMetadata::new(content_1.len() as u64, generation),
) )
.unwrap(); .unwrap();
client client
.schedule_layer_file_upload( .schedule_layer_file_upload(
&layer_file_name_2, &layer_file_name_2,
&LayerFileMetadata::new(content_2.len() as u64), &LayerFileMetadata::new(content_2.len() as u64, generation),
) )
.unwrap(); .unwrap();
@@ -1615,7 +1715,7 @@ mod tests {
client client
.schedule_layer_file_upload( .schedule_layer_file_upload(
&layer_file_name_3, &layer_file_name_3,
&LayerFileMetadata::new(content_3.len() as u64), &LayerFileMetadata::new(content_3.len() as u64, generation),
) )
.unwrap(); .unwrap();
client client
@@ -1639,6 +1739,7 @@ mod tests {
"index_part.json", "index_part.json",
], ],
&remote_timeline_dir, &remote_timeline_dir,
generation,
); );
// Finish them // Finish them
@@ -1651,6 +1752,7 @@ mod tests {
"index_part.json", "index_part.json",
], ],
&remote_timeline_dir, &remote_timeline_dir,
generation,
); );
} }
@@ -1703,12 +1805,14 @@ mod tests {
// Test // Test
let generation = Generation::new(0xdeadbeef);
let init = get_bytes_started_stopped(); let init = get_bytes_started_stopped();
client client
.schedule_layer_file_upload( .schedule_layer_file_upload(
&layer_file_name_1, &layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64), &LayerFileMetadata::new(content_1.len() as u64, generation),
) )
.unwrap(); .unwrap();


@@ -5,25 +5,30 @@ use tracing::debug;
use remote_storage::GenericRemoteStorage; use remote_storage::GenericRemoteStorage;
use crate::config::PageServerConf; use crate::{
config::PageServerConf,
tenant::{remote_timeline_client::remote_path, Generation},
};
pub(super) async fn delete_layer<'a>( pub(super) async fn delete_layer<'a>(
conf: &'static PageServerConf, conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage, storage: &'a GenericRemoteStorage,
local_layer_path: &'a Path, local_layer_path: &'a Path,
generation: Generation,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
fail::fail_point!("before-delete-layer", |_| { fail::fail_point!("before-delete-layer", |_| {
anyhow::bail!("failpoint before-delete-layer") anyhow::bail!("failpoint before-delete-layer")
}); });
debug!("Deleting layer from remote storage: {local_layer_path:?}",); debug!("Deleting layer from remote storage: {local_layer_path:?}",);
let path_to_delete = conf.remote_path(local_layer_path)?; let path_to_delete = remote_path(conf, local_layer_path, generation)?;
// We don't want to print an error if the delete failed if the file has // We don't want to print an error if the delete failed if the file has
// already been deleted. Thankfully, in this situation S3 already // already been deleted. Thankfully, in this situation S3 already
// does not yield an error. While OS-provided local file system APIs do yield // does not yield an error. While OS-provided local file system APIs do yield
// errors, we avoid them in the `LocalFs` wrapper. // errors, we avoid them in the `LocalFs` wrapper.
storage.delete(&path_to_delete).await.with_context(|| { storage
format!("Failed to delete remote layer from storage at {path_to_delete:?}") .delete(&path_to_delete)
}) .await
.with_context(|| format!("delete remote layer from storage at {path_to_delete:?}"))
} }


@@ -15,14 +15,16 @@ use tokio_util::sync::CancellationToken;
use utils::{backoff, crashsafe}; use utils::{backoff, crashsafe};
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName; use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use remote_storage::{DownloadError, GenericRemoteStorage}; use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension; use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId}; use utils::id::{TenantId, TimelineId};
use super::index::{IndexPart, LayerFileMetadata}; use super::index::{IndexPart, LayerFileMetadata};
use super::{FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES}; use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};
static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120); static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
@@ -41,13 +43,11 @@ pub async fn download_layer_file<'a>(
) -> Result<u64, DownloadError> { ) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
let timeline_path = conf.timeline_path(&tenant_id, &timeline_id); let local_path = conf
.timeline_path(&tenant_id, &timeline_id)
.join(layer_file_name.file_name());
let local_path = timeline_path.join(layer_file_name.file_name()); let remote_path = remote_layer_path(&tenant_id, &timeline_id, layer_file_name, layer_metadata);
let remote_path = conf
.remote_path(&local_path)
.map_err(DownloadError::Other)?;
// Perform a rename inspired by durable_rename from file_utils.c. // Perform a rename inspired by durable_rename from file_utils.c.
// The sequence: // The sequence:
@@ -64,33 +64,43 @@ pub async fn download_layer_file<'a>(
let (mut destination_file, bytes_amount) = download_retry( let (mut destination_file, bytes_amount) = download_retry(
|| async { || async {
// TODO: this doesn't use the cached fd for some reason? // TODO: this doesn't use the cached fd for some reason?
let mut destination_file = fs::File::create(&temp_file_path).await.with_context(|| { let mut destination_file = fs::File::create(&temp_file_path)
format!( .await
"create a destination file for layer '{}'", .with_context(|| {
temp_file_path.display() format!(
) "create a destination file for layer '{}'",
}) temp_file_path.display()
.map_err(DownloadError::Other)?; )
let mut download = storage.download(&remote_path).await.with_context(|| { })
format!( .map_err(DownloadError::Other)?;
let mut download = storage
.download(&remote_path)
.await
.with_context(|| {
format!(
"open a download stream for layer with remote storage path '{remote_path:?}'" "open a download stream for layer with remote storage path '{remote_path:?}'"
) )
})
.map_err(DownloadError::Other)?;
let bytes_amount = tokio::time::timeout(MAX_DOWNLOAD_DURATION, tokio::io::copy(&mut download.download_stream, &mut destination_file))
.await
.map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))?
.with_context(|| {
format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}")
}) })
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
Ok((destination_file, bytes_amount)) let bytes_amount = tokio::time::timeout(
MAX_DOWNLOAD_DURATION,
tokio::io::copy(&mut download.download_stream, &mut destination_file),
)
.await
.map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))?
.with_context(|| {
format!(
"download layer at remote path '{remote_path:?}' into file {temp_file_path:?}"
)
})
.map_err(DownloadError::Other)?;
Ok((destination_file, bytes_amount))
}, },
&format!("download {remote_path:?}"), &format!("download {remote_path:?}"),
).await?; )
.await?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations // A file will not be closed immediately when it goes out of scope if there are any IO operations
@@ -103,12 +113,7 @@ pub async fn download_layer_file<'a>(
destination_file destination_file
.flush() .flush()
.await .await
.with_context(|| { .with_context(|| format!("flush source file at {}", temp_file_path.display()))
format!(
"failed to flush source file at {}",
temp_file_path.display()
)
})
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
let expected = layer_metadata.file_size(); let expected = layer_metadata.file_size();
@@ -139,17 +144,12 @@ pub async fn download_layer_file<'a>(
fs::rename(&temp_file_path, &local_path) fs::rename(&temp_file_path, &local_path)
.await .await
.with_context(|| { .with_context(|| format!("rename download layer file to {}", local_path.display(),))
format!(
"Could not rename download layer file to {}",
local_path.display(),
)
})
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
crashsafe::fsync_async(&local_path) crashsafe::fsync_async(&local_path)
.await .await
.with_context(|| format!("Could not fsync layer file {}", local_path.display(),)) .with_context(|| format!("fsync layer file {}", local_path.display(),))
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
tracing::debug!("download complete: {}", local_path.display()); tracing::debug!("download complete: {}", local_path.display());
@@ -173,21 +173,19 @@ pub fn is_temp_download_file(path: &Path) -> bool {
} }
/// List timelines of given tenant in remote storage /// List timelines of given tenant in remote storage
pub async fn list_remote_timelines<'a>( pub async fn list_remote_timelines(
storage: &'a GenericRemoteStorage, storage: &GenericRemoteStorage,
conf: &'static PageServerConf,
tenant_id: TenantId, tenant_id: TenantId,
) -> anyhow::Result<HashSet<TimelineId>> { ) -> anyhow::Result<HashSet<TimelineId>> {
let tenant_path = conf.timelines_path(&tenant_id); let remote_path = remote_timelines_path(&tenant_id);
let tenant_storage_path = conf.remote_path(&tenant_path)?;
fail::fail_point!("storage-sync-list-remote-timelines", |_| { fail::fail_point!("storage-sync-list-remote-timelines", |_| {
anyhow::bail!("storage-sync-list-remote-timelines"); anyhow::bail!("storage-sync-list-remote-timelines");
}); });
let timelines = download_retry( let timelines = download_retry(
|| storage.list_prefixes(Some(&tenant_storage_path)), || storage.list_prefixes(Some(&remote_path)),
&format!("list prefixes for {tenant_path:?}"), &format!("list prefixes for {tenant_id}"),
) )
.await?; .await?;
@@ -202,9 +200,9 @@ pub async fn list_remote_timelines<'a>(
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}") anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
})?; })?;
let timeline_id: TimelineId = object_name.parse().with_context(|| { let timeline_id: TimelineId = object_name
format!("failed to parse object name into timeline id '{object_name}'") .parse()
})?; .with_context(|| format!("parse object name into timeline id '{object_name}'"))?;
// list_prefixes is assumed to return unique names. Ensure this here. // list_prefixes is assumed to return unique names. Ensure this here.
// NB: it's safer to bail out than warn-log this because the pageserver // NB: it's safer to bail out than warn-log this because the pageserver
@@ -222,21 +220,16 @@ pub async fn list_remote_timelines<'a>(
} }
pub(super) async fn download_index_part( pub(super) async fn download_index_part(
conf: &'static PageServerConf,
storage: &GenericRemoteStorage, storage: &GenericRemoteStorage,
tenant_id: &TenantId, tenant_id: &TenantId,
timeline_id: &TimelineId, timeline_id: &TimelineId,
generation: Generation,
) -> Result<IndexPart, DownloadError> { ) -> Result<IndexPart, DownloadError> {
let index_part_path = conf let remote_path = remote_index_path(tenant_id, timeline_id, generation);
.metadata_path(tenant_id, timeline_id)
.with_file_name(IndexPart::FILE_NAME);
let part_storage_path = conf
.remote_path(&index_part_path)
.map_err(DownloadError::BadInput)?;
let index_part_bytes = download_retry( let index_part_bytes = download_retry(
|| async { || async {
let mut index_part_download = storage.download(&part_storage_path).await?; let mut index_part_download = storage.download(&remote_path).await?;
let mut index_part_bytes = Vec::new(); let mut index_part_bytes = Vec::new();
tokio::io::copy( tokio::io::copy(
@@ -244,20 +237,16 @@ pub(super) async fn download_index_part(
&mut index_part_bytes, &mut index_part_bytes,
) )
.await .await
.with_context(|| { .with_context(|| format!("download index part at {remote_path:?}"))
format!("Failed to download an index part into file {index_part_path:?}")
})
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
Ok(index_part_bytes) Ok(index_part_bytes)
}, },
&format!("download {part_storage_path:?}"), &format!("download {remote_path:?}"),
) )
.await?; .await?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| { .with_context(|| format!("download index part file at {remote_path:?}"))
format!("Failed to deserialize index part file into file {index_part_path:?}")
})
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
Ok(index_part) Ok(index_part)


@@ -12,6 +12,7 @@ use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata; use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::LayerFileName; use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::upload_queue::UploadQueueInitialized; use crate::tenant::upload_queue::UploadQueueInitialized;
use crate::tenant::Generation;
use utils::lsn::Lsn; use utils::lsn::Lsn;
@@ -20,22 +21,28 @@ use utils::lsn::Lsn;
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which /// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
/// might have less or more metadata depending if upgrading or rolling back an upgrade. /// might have less or more metadata depending if upgrading or rolling back an upgrade.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(test, derive(Default))] //#[cfg_attr(test, derive(Default))]
pub struct LayerFileMetadata { pub struct LayerFileMetadata {
file_size: u64, file_size: u64,
pub(crate) generation: Generation,
} }
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata { impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
fn from(other: &IndexLayerMetadata) -> Self { fn from(other: &IndexLayerMetadata) -> Self {
LayerFileMetadata { LayerFileMetadata {
file_size: other.file_size, file_size: other.file_size,
generation: other.generation,
} }
} }
} }
impl LayerFileMetadata { impl LayerFileMetadata {
pub fn new(file_size: u64) -> Self { pub fn new(file_size: u64, generation: Generation) -> Self {
LayerFileMetadata { file_size } LayerFileMetadata {
file_size,
generation,
}
} }
pub fn file_size(&self) -> u64 { pub fn file_size(&self) -> u64 {
@@ -128,15 +135,20 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart {
} }
/// Serialized form of [`LayerFileMetadata`]. /// Serialized form of [`LayerFileMetadata`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexLayerMetadata { pub struct IndexLayerMetadata {
pub(super) file_size: u64, pub(super) file_size: u64,
#[serde(default = "Generation::none")]
#[serde(skip_serializing_if = "Generation::is_none")]
pub(super) generation: Generation,
} }
impl From<LayerFileMetadata> for IndexLayerMetadata { impl From<LayerFileMetadata> for IndexLayerMetadata {
fn from(other: LayerFileMetadata) -> Self { fn from(other: LayerFileMetadata) -> Self {
IndexLayerMetadata { IndexLayerMetadata {
file_size: other.file_size, file_size: other.file_size,
generation: other.generation,
} }
} }
} }
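
A hedged sketch of what the two serde attributes above mean for index_part.json
compatibility, using a stand-in struct with the same attributes (hypothetical
test code, assuming serde_json):

#[cfg(test)]
mod generation_compat_sketch {
    use crate::tenant::Generation;
    use serde::{Deserialize, Serialize};

    // Stand-in mirroring IndexLayerMetadata's `generation` field attributes.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct MetadataLike {
        file_size: u64,
        #[serde(default = "Generation::none")]
        #[serde(skip_serializing_if = "Generation::is_none")]
        generation: Generation,
    }

    #[test]
    fn legacy_metadata_round_trips_unchanged() {
        // A legacy entry with no "generation" field deserializes to Generation::none()...
        let legacy: MetadataLike = serde_json::from_str(r#"{"file_size":25600000}"#).unwrap();
        assert!(legacy.generation.is_none());
        // ...and a none generation is skipped on serialize, so legacy entries are
        // written back unchanged.
        assert_eq!(
            serde_json::to_string(&legacy).unwrap(),
            r#"{"file_size":25600000}"#
        );
    }
}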
@@ -164,11 +176,13 @@ mod tests {
layer_metadata: HashMap::from([ layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000, file_size: 25600000,
generation: Generation::none()
}), }),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for // serde_json should always parse this but this might be a double with jq for
// example. // example.
file_size: 9007199254741001, file_size: 9007199254741001,
generation: Generation::none()
}) })
]), ]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(), disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -200,11 +214,13 @@ mod tests {
layer_metadata: HashMap::from([ layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000, file_size: 25600000,
generation: Generation::none()
}), }),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for // serde_json should always parse this but this might be a double with jq for
// example. // example.
file_size: 9007199254741001, file_size: 9007199254741001,
generation: Generation::none()
}) })
]), ]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(), disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -237,11 +253,13 @@ mod tests {
layer_metadata: HashMap::from([ layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000, file_size: 25600000,
generation: Generation::none()
}), }),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for // serde_json should always parse this but this might be a double with jq for
// example. // example.
file_size: 9007199254741001, file_size: 9007199254741001,
generation: Generation::none()
}) })
]), ]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(), disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),


@@ -5,7 +5,11 @@ use fail::fail_point;
use std::{io::ErrorKind, path::Path}; use std::{io::ErrorKind, path::Path};
use tokio::fs; use tokio::fs;
use crate::{config::PageServerConf, tenant::remote_timeline_client::index::IndexPart}; use super::Generation;
use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
};
use remote_storage::GenericRemoteStorage; use remote_storage::GenericRemoteStorage;
use utils::id::{TenantId, TimelineId}; use utils::id::{TenantId, TimelineId};
@@ -15,10 +19,10 @@ use tracing::info;
/// Serializes and uploads the given index part data to the remote storage. /// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<'a>( pub(super) async fn upload_index_part<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage, storage: &'a GenericRemoteStorage,
tenant_id: &TenantId, tenant_id: &TenantId,
timeline_id: &TimelineId, timeline_id: &TimelineId,
generation: Generation,
index_part: &'a IndexPart, index_part: &'a IndexPart,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
tracing::trace!("uploading new index part"); tracing::trace!("uploading new index part");
@@ -27,20 +31,16 @@ pub(super) async fn upload_index_part<'a>(
bail!("failpoint before-upload-index") bail!("failpoint before-upload-index")
}); });
let index_part_bytes = serde_json::to_vec(&index_part) let index_part_bytes =
.context("Failed to serialize index part file into bytes")?; serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len(); let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes)); let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
let index_part_path = conf let remote_path = remote_index_path(tenant_id, timeline_id, generation);
.metadata_path(tenant_id, timeline_id)
.with_file_name(IndexPart::FILE_NAME);
let storage_path = conf.remote_path(&index_part_path)?;
storage storage
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path) .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
.await .await
.with_context(|| format!("Failed to upload index part for '{tenant_id} / {timeline_id}'")) .with_context(|| format!("upload index part for '{tenant_id} / {timeline_id}'"))
} }
/// Attempts to upload given layer files. /// Attempts to upload given layer files.
@@ -52,12 +52,13 @@ pub(super) async fn upload_timeline_layer<'a>(
storage: &'a GenericRemoteStorage, storage: &'a GenericRemoteStorage,
source_path: &'a Path, source_path: &'a Path,
known_metadata: &'a LayerFileMetadata, known_metadata: &'a LayerFileMetadata,
generation: Generation,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| { fail_point!("before-upload-layer", |_| {
bail!("failpoint before-upload-layer") bail!("failpoint before-upload-layer")
}); });
let storage_path = conf.remote_path(source_path)?;
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await; let source_file_res = fs::File::open(&source_path).await;
let source_file = match source_file_res { let source_file = match source_file_res {
Ok(source_file) => source_file, Ok(source_file) => source_file,
@@ -70,16 +71,15 @@ pub(super) async fn upload_timeline_layer<'a>(
info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."); info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(()); return Ok(());
} }
Err(e) => Err(e) Err(e) => {
.with_context(|| format!("Failed to open a source file for layer {source_path:?}"))?, Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
}
}; };
let fs_size = source_file let fs_size = source_file
.metadata() .metadata()
.await .await
.with_context(|| { .with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
format!("Failed to get the source file metadata for layer {source_path:?}")
})?
.len(); .len();
let metadata_size = known_metadata.file_size(); let metadata_size = known_metadata.file_size();
@@ -87,19 +87,13 @@ pub(super) async fn upload_timeline_layer<'a>(
bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}"); bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
} }
let fs_size = usize::try_from(fs_size).with_context(|| { let fs_size = usize::try_from(fs_size)
format!("File {source_path:?} size {fs_size} could not be converted to usize") .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
})?;
storage storage
.upload(source_file, fs_size, &storage_path, None) .upload(source_file, fs_size, &storage_path, None)
.await .await
.with_context(|| { .with_context(|| format!("upload layer from local path '{}'", source_path.display()))?;
format!(
"Failed to upload a layer from local path '{}'",
source_path.display()
)
})?;
Ok(()) Ok(())
} }


@@ -67,6 +67,7 @@ use postgres_connection::PgConnectionConfig;
use postgres_ffi::to_pg_timestamp; use postgres_ffi::to_pg_timestamp;
use utils::{ use utils::{
completion, completion,
generation::Generation,
id::{TenantId, TimelineId}, id::{TenantId, TimelineId},
lsn::{AtomicLsn, Lsn, RecordLsn}, lsn::{AtomicLsn, Lsn, RecordLsn},
seqwait::SeqWait, seqwait::SeqWait,
@@ -152,6 +153,10 @@ pub struct Timeline {
pub tenant_id: TenantId, pub tenant_id: TenantId,
pub timeline_id: TimelineId, pub timeline_id: TimelineId,
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
/// Never changes for the lifetime of this [`Timeline`] object.
generation: Generation,
pub pg_version: u32, pub pg_version: u32,
/// The tuple has two elements. /// The tuple has two elements.
@@ -1199,7 +1204,7 @@ impl Timeline {
Ok(delta) => Some(delta), Ok(delta) => Some(delta),
}; };
let layer_metadata = LayerFileMetadata::new(layer_file_size); let layer_metadata = LayerFileMetadata::new(layer_file_size, self.generation);
let new_remote_layer = Arc::new(match local_layer.filename() { let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img( LayerFileName::Image(image_name) => RemoteLayer::new_img(
@@ -1377,6 +1382,7 @@ impl Timeline {
ancestor: Option<Arc<Timeline>>, ancestor: Option<Arc<Timeline>>,
timeline_id: TimelineId, timeline_id: TimelineId,
tenant_id: TenantId, tenant_id: TenantId,
generation: Generation,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>, walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
resources: TimelineResources, resources: TimelineResources,
pg_version: u32, pg_version: u32,
@@ -1406,6 +1412,7 @@ impl Timeline {
myself: myself.clone(), myself: myself.clone(),
timeline_id, timeline_id,
tenant_id, tenant_id,
generation,
pg_version, pg_version,
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())), layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
wanted_image_layers: Mutex::new(None), wanted_image_layers: Mutex::new(None),
@@ -1615,6 +1622,9 @@ impl Timeline {
let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id); let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
let span = tracing::Span::current(); let span = tracing::Span::current();
// Copy to move into the task we're about to spawn
let generation = self.generation;
let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({ let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({
move || { move || {
let _g = span.entered(); let _g = span.entered();
@@ -1656,8 +1666,12 @@ impl Timeline {
); );
} }
let decided = let decided = init::reconcile(
init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn); discovered_layers,
index_part.as_ref(),
disk_consistent_lsn,
generation,
);
let mut loaded_layers = Vec::new(); let mut loaded_layers = Vec::new();
let mut needs_upload = Vec::new(); let mut needs_upload = Vec::new();
@@ -2669,7 +2683,7 @@ impl Timeline {
( (
HashMap::from([( HashMap::from([(
layer.filename(), layer.filename(),
LayerFileMetadata::new(layer.layer_desc().file_size), LayerFileMetadata::new(layer.layer_desc().file_size, self.generation),
)]), )]),
Some(layer), Some(layer),
) )
@@ -3065,7 +3079,10 @@ impl Timeline {
.metadata() .metadata()
.with_context(|| format!("reading metadata of layer file {}", path.file_name()))?; .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len())); layer_paths_to_upload.insert(
path,
LayerFileMetadata::new(metadata.len(), self.generation),
);
self.metrics self.metrics
.resident_physical_size_gauge .resident_physical_size_gauge
@@ -3740,7 +3757,7 @@ impl Timeline {
if let Some(remote_client) = &self.remote_client { if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload( remote_client.schedule_layer_file_upload(
&l.filename(), &l.filename(),
&LayerFileMetadata::new(metadata.len()), &LayerFileMetadata::new(metadata.len(), self.generation),
)?; )?;
} }
@@ -3749,7 +3766,10 @@ impl Timeline {
.resident_physical_size_gauge .resident_physical_size_gauge
.add(metadata.len()); .add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); new_layer_paths.insert(
new_delta_path,
LayerFileMetadata::new(metadata.len(), self.generation),
);
l.access_stats().record_residence_event( l.access_stats().record_residence_event(
LayerResidenceStatus::Resident, LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate, LayerResidenceEventReason::LayerCreate,


@@ -7,6 +7,7 @@ use crate::{
index::{IndexPart, LayerFileMetadata}, index::{IndexPart, LayerFileMetadata},
}, },
storage_layer::LayerFileName, storage_layer::LayerFileName,
Generation,
}, },
METADATA_FILE_NAME, METADATA_FILE_NAME,
}; };
@@ -104,6 +105,7 @@ pub(super) fn reconcile(
discovered: Vec<(LayerFileName, u64)>, discovered: Vec<(LayerFileName, u64)>,
index_part: Option<&IndexPart>, index_part: Option<&IndexPart>,
disk_consistent_lsn: Lsn, disk_consistent_lsn: Lsn,
generation: Generation,
) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> { ) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> {
use Decision::*; use Decision::*;
@@ -112,7 +114,15 @@ pub(super) fn reconcile(
let mut discovered = discovered let mut discovered = discovered
.into_iter() .into_iter()
.map(|(name, file_size)| (name, (Some(LayerFileMetadata::new(file_size)), None))) .map(|(name, file_size)| {
(
name,
// The generation here will be corrected to match IndexPart in the merge below, unless
// it is not in IndexPart, in which case using our current generation makes sense
// because it will be uploaded in this generation.
(Some(LayerFileMetadata::new(file_size, generation)), None),
)
})
.collect::<Collected>(); .collect::<Collected>();
// merge any index_part information, when available // merge any index_part information, when available
@@ -137,7 +147,11 @@ pub(super) fn reconcile(
Err(FutureLayer { local }) Err(FutureLayer { local })
} else { } else {
Ok(match (local, remote) { Ok(match (local, remote) {
(Some(local), Some(remote)) if local != remote => UseRemote { local, remote }, (Some(local), Some(remote)) if local != remote => {
assert_eq!(local.generation, remote.generation);
UseRemote { local, remote }
}
(Some(x), Some(_)) => UseLocal(x), (Some(x), Some(_)) => UseLocal(x),
(None, Some(x)) => Evicted(x), (None, Some(x)) => Evicted(x),
(Some(x), None) => NeedsUpload(x), (Some(x), None) => NeedsUpload(x),


@@ -1,6 +1,7 @@
use crate::metrics::RemoteOpFileKind; use crate::metrics::RemoteOpFileKind;
use super::storage_layer::LayerFileName; use super::storage_layer::LayerFileName;
use super::Generation;
use crate::tenant::metadata::TimelineMetadata; use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart; use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
@@ -205,6 +206,7 @@ pub(crate) struct Delete {
pub(crate) file_kind: RemoteOpFileKind, pub(crate) file_kind: RemoteOpFileKind,
pub(crate) layer_file_name: LayerFileName, pub(crate) layer_file_name: LayerFileName,
pub(crate) scheduled_from_timeline_delete: bool, pub(crate) scheduled_from_timeline_delete: bool,
pub(crate) generation: Generation,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -228,17 +230,21 @@ impl std::fmt::Display for UploadOp {
UploadOp::UploadLayer(path, metadata) => { UploadOp::UploadLayer(path, metadata) => {
write!( write!(
f, f,
"UploadLayer({}, size={:?})", "UploadLayer({}, size={:?}, gen={:?})",
path.file_name(), path.file_name(),
metadata.file_size() metadata.file_size(),
metadata.generation,
) )
} }
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn), UploadOp::UploadMetadata(_, lsn) => {
write!(f, "UploadMetadata(lsn: {})", lsn)
}
UploadOp::Delete(delete) => write!( UploadOp::Delete(delete) => write!(
f, f,
"Delete(path: {}, scheduled_from_timeline_delete: {})", "Delete(path: {}, scheduled_from_timeline_delete: {}, gen: {:?})",
delete.layer_file_name.file_name(), delete.layer_file_name.file_name(),
delete.scheduled_from_timeline_delete delete.scheduled_from_timeline_delete,
delete.generation
), ),
UploadOp::Barrier(_) => write!(f, "Barrier"), UploadOp::Barrier(_) => write!(f, "Barrier"),
} }