diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 6a732d1029..1e4f2c1528 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -15,6 +15,7 @@ use crate::virtual_file::VirtualFile;
 use anyhow::Context;
 use camino::Utf8PathBuf;
 use hex::FromHex;
+use pageserver_api::shard::ShardIdentity;
 use remote_storage::{GenericRemoteStorage, RemotePath};
 use serde::Deserialize;
 use serde::Serialize;
@@ -300,6 +301,7 @@ impl DeletionList {
     fn push(
         &mut self,
         tenant: &TenantId,
+        shard: &ShardIdentity,
         timeline: &TimelineId,
         generation: Generation,
         objects: &mut Vec<RemotePath>,
@@ -326,7 +328,7 @@ impl DeletionList {
         let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();

-        let timeline_remote_path = remote_timeline_path(tenant, timeline);
+        let timeline_remote_path = remote_timeline_path(tenant, shard, timeline);

         self.size += objects.len();
         timeline_entry.extend(objects.drain(..).map(|p| {
@@ -341,7 +343,9 @@ impl DeletionList {
         let mut result = Vec::new();
         for (tenant, tenant_deletions) in self.tenants.into_iter() {
             for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
-                let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
+                // FIXME: need to update DeletionList definition to store the ShardIdentity for each Tenant
+                let timeline_remote_path =
+                    remote_timeline_path(&tenant, &ShardIdentity::none(), &timeline);
                 result.extend(
                     timeline_layers
                         .into_iter()
@@ -507,6 +511,7 @@ impl DeletionQueueClient {
     pub(crate) async fn push_layers(
         &self,
         tenant_id: TenantId,
+        shard: &ShardIdentity,
         timeline_id: TimelineId,
         current_generation: Generation,
         layers: Vec<(LayerFileName, Generation)>,
@@ -517,6 +522,7 @@ impl DeletionQueueClient {
         for (layer, generation) in layers {
             layer_paths.push(remote_layer_path(
                 &tenant_id,
+                shard,
                 &timeline_id,
                 &layer,
                 generation,
@@ -829,7 +835,8 @@ mod test {
             gen: Generation,
         ) -> anyhow::Result<String> {
             let tenant_id = self.harness.tenant_id;
-            let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+            let relative_remote_path =
+                remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
             let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
             std::fs::create_dir_all(&remote_timeline_path)?;
             let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
@@ -981,7 +988,8 @@ mod test {
         let tenant_id = ctx.harness.tenant_id;

         let content: Vec<u8> = "victim1 contents".into();
-        let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+        let relative_remote_path =
+            remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
         let deletion_prefix = ctx.harness.conf.deletion_prefix();
@@ -1010,6 +1018,7 @@ mod test {
         client
             .push_layers(
                 tenant_id,
+                &ShardIdentity::none(),
                 TIMELINE_ID,
                 now_generation,
                 [(layer_file_name_1.clone(), layer_generation)].to_vec(),
@@ -1055,7 +1064,8 @@ mod test {
         ctx.set_latest_generation(latest_generation);

         let tenant_id = ctx.harness.tenant_id;
-        let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+        let relative_remote_path =
+            remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());

         // Initial state: a remote layer exists
@@ -1066,6 +1076,7 @@
         client
             .push_layers(
                 tenant_id,
+                &ShardIdentity::none(),
                 TIMELINE_ID,
                 stale_generation,
                 [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
@@ -1081,6 +1092,7 @@
         client
             .push_layers(
                 tenant_id,
+                &ShardIdentity::none(),
                 TIMELINE_ID,
                 latest_generation,
                 [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
@@ -1104,7 +1116,8 @@

         let tenant_id = ctx.harness.tenant_id;

-        let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+        let relative_remote_path =
+            remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
         let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
         let deletion_prefix = ctx.harness.conf.deletion_prefix();
@@ -1119,6 +1132,7 @@
         client
             .push_layers(
                 tenant_id,
+                &ShardIdentity::none(),
                 TIMELINE_ID,
                 now_generation.previous(),
                 [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
@@ -1133,6 +1147,7 @@
         client
             .push_layers(
                 tenant_id,
+                &ShardIdentity::none(),
                 TIMELINE_ID,
                 now_generation,
                 [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
@@ -1228,6 +1243,7 @@ pub(crate) mod mock {
         for (layer, generation) in op.layers {
             objects.push(remote_layer_path(
                 &op.tenant_id,
+                &ShardIdentity::none(),
                 &op.timeline_id,
                 &layer,
                 generation,
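Note on the deletion-queue hunks above: `DeletionList::push` now records shard-qualified paths at enqueue time, but `into_remote_paths` (see the FIXME) and the mock still substitute `ShardIdentity::none()`, so persisted deletion lists are only correct for unsharded tenants until the list format itself carries the shard. The tests lean on the invariant that a "none" shard leaves legacy keys untouched; a minimal sketch of that invariant, under the assumption that `ShardIdentity::none().slug()` renders as an empty string (this diff uses `slug()` but never shows its definition):

```rust
use pageserver_api::shard::ShardIdentity;
use utils::id::{TenantId, TimelineId};

use crate::tenant::remote_timeline_client::remote_timeline_path;

// Hypothetical regression test, not part of this patch: an unsharded tenant
// must keep the pre-sharding key layout, otherwise existing buckets would
// become unreadable after an upgrade.
#[test]
fn none_shard_keeps_legacy_remote_paths() {
    let tenant_id = TenantId::generate();
    let timeline_id = TimelineId::generate();

    let path = remote_timeline_path(&tenant_id, &ShardIdentity::none(), &timeline_id);

    // Assumes ShardIdentity::none().slug() is "" (an assumption, not
    // established anywhere in this diff).
    assert_eq!(
        path.get_path().as_str(),
        format!("tenants/{tenant_id}/timelines/{timeline_id}")
    );
}
```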
diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs
index 28daae2da5..c5837254c5 100644
--- a/pageserver/src/deletion_queue/list_writer.rs
+++ b/pageserver/src/deletion_queue/list_writer.rs
@@ -19,6 +19,7 @@ use std::collections::HashMap;
 use std::fs::create_dir_all;
 use std::time::Duration;

+use pageserver_api::shard::ShardIdentity;
 use regex::Regex;
 use remote_storage::RemotePath;
 use tokio_util::sync::CancellationToken;
@@ -390,6 +391,8 @@ impl ListWriter {
         for (layer, generation) in op.layers {
             layer_paths.push(remote_layer_path(
                 &op.tenant_id,
+                // TODO: store shard in deletion list
+                &ShardIdentity::none(),
                 &op.timeline_id,
                 &layer,
                 generation,
@@ -399,6 +402,8 @@ impl ListWriter {
         if !self.pending.push(
             &op.tenant_id,
+            // TODO: store shard in deletion list
+            &ShardIdentity::none(),
             &op.timeline_id,
             op.generation,
             &mut layer_paths,
@@ -406,6 +411,8 @@ impl ListWriter {
             self.flush().await;
             let retry_succeeded = self.pending.push(
                 &op.tenant_id,
+                // TODO: store shard in deletion list
+                &ShardIdentity::none(),
                 &op.timeline_id,
                 op.generation,
                 &mut layer_paths,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 585a08e155..ea36973e15 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -688,9 +688,11 @@ impl Tenant {
         // Get list of remote timelines
         // download index files for every tenant timeline
         info!("listing remote timelines");
+        let shard = self.tenant_conf.read().unwrap().shard.clone();
         let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
             remote_storage,
             self.tenant_id,
+            &shard,
             cancel.clone(),
         )
         .await?;
@@ -1151,6 +1153,7 @@ impl Tenant {
             self.deletion_queue_client.clone(),
             self.conf,
             self.tenant_id,
+            self.tenant_conf.read().unwrap().shard.clone(),
             timeline_id,
             self.generation,
         );
@@ -2991,6 +2994,7 @@ impl Tenant {
             self.deletion_queue_client.clone(),
             self.conf,
             self.tenant_id,
+            self.tenant_conf.read().unwrap().shard.clone(),
             timeline_id,
             self.generation,
         );
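The repeated TODOs above are the flip side of the `deletion_queue.rs` FIXME: once an operation has been flattened into a deletion list, the shard that produced it is forgotten, so replay after a restart has to assume an unsharded tenant. One possible shape for closing that gap, sketched under the assumptions that `TenantDeletionList` is the per-tenant entry type from `deletion_queue.rs` and that `ShardIdentity` derives serde's traits:

```rust
use std::collections::HashMap;

use pageserver_api::shard::ShardIdentity;
use serde::{Deserialize, Serialize};
use utils::{generation::Generation, id::TimelineId};

// Sketch only: persist the shard with each tenant entry so that
// DeletionList::into_remote_paths() can rebuild shard-qualified keys
// after a pageserver restart instead of assuming ShardIdentity::none().
#[derive(Debug, Serialize, Deserialize)]
struct TenantDeletionList {
    /// Layer file names to delete, grouped by timeline.
    timelines: HashMap<TimelineId, Vec<String>>,
    /// Generation the tenant was attached in when these were pushed.
    generation: Generation,
    /// New field: shard identity captured at push time.
    shard: ShardIdentity,
}
```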
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 07d1618272..942e122c2c 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -771,7 +771,10 @@ impl TenantManager {
         new_location_config: LocationConf,
         ctx: &RequestContext,
     ) -> Result<(), anyhow::Error> {
-        info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
+        info!(
+            "configuring tenant location {tenant_id} {} to state {new_location_config:?}",
+            new_location_config.shard.slug()
+        );

         // Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
         // then we do not need to set the slot to InProgress, we can just call into the
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index bbf6a0c5c5..da32f6558d 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -188,6 +188,7 @@
 use anyhow::Context;
 use camino::Utf8Path;
 use chrono::{NaiveDateTime, Utc};
+use pageserver_api::shard::ShardIdentity;
 use scopeguard::ScopeGuard;
 use tokio_util::sync::CancellationToken;
 use utils::backoff::{
@@ -298,6 +299,7 @@ pub struct RemoteTimelineClient {
     runtime: tokio::runtime::Handle,

     tenant_id: TenantId,
+    shard: ShardIdentity,
     timeline_id: TimelineId,
     generation: Generation,
@@ -322,9 +324,12 @@
         deletion_queue_client: DeletionQueueClient,
         conf: &'static PageServerConf,
         tenant_id: TenantId,
+        shard: ShardIdentity,
         timeline_id: TimelineId,
         generation: Generation,
     ) -> RemoteTimelineClient {
+        tracing::info!("RemoteTimelineClient::new shard={}", shard.slug());
+
         RemoteTimelineClient {
             conf,
             runtime: if cfg!(test) {
@@ -334,6 +339,7 @@
                 BACKGROUND_RUNTIME.handle().clone()
             },
             tenant_id,
+            shard,
             timeline_id,
             generation,
             storage_impl: remote_storage,
@@ -461,6 +467,7 @@
         let index_part = download::download_index_part(
             &self.storage_impl,
             &self.tenant_id,
+            &self.shard,
             &self.timeline_id,
             self.generation,
             cancel,
@@ -503,6 +510,7 @@
             self.conf,
             &self.storage_impl,
             self.tenant_id,
+            &self.shard,
             self.timeline_id,
             layer_file_name,
             layer_metadata,
@@ -893,6 +901,7 @@
             upload::upload_index_part(
                 &self.storage_impl,
                 &self.tenant_id,
+                &self.shard,
                 &self.timeline_id,
                 self.generation,
                 &index_part_with_deleted_at,
@@ -951,6 +960,7 @@
             .map(|(file_name, meta)| {
                 remote_layer_path(
                     &self.tenant_id,
+                    &self.shard,
                     &self.timeline_id,
                     &file_name,
                     meta.generation,
@@ -964,7 +974,8 @@

         // Do not delete index part yet, it is needed for possible retry. If we remove it first
         // and retry will arrive to different pageserver there wont be any traces of it on remote storage
-        let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
+        let timeline_storage_path =
+            remote_timeline_path(&self.tenant_id, &self.shard, &self.timeline_id);

         // Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
         // taking the burden of listing all the layers that we already know we should delete.
@@ -1000,7 +1011,12 @@
             .unwrap_or(
                 // No generation-suffixed indices, assume we are dealing with
                 // a legacy index.
-                remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
+                remote_index_path(
+                    &self.tenant_id,
+                    &self.shard,
+                    &self.timeline_id,
+                    Generation::none(),
+                ),
             );

         let remaining_layers: Vec<RemotePath> = remaining
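Structurally, the shard enters the key space in exactly one spot: immediately after the tenant ID segment, via `shard.slug()`. Everything else (timeline dirs, layers, indices, as changed below) hangs off that prefix. A self-contained sketch of the layout, with the slug rendering assumed since this diff never shows `slug()`'s output format:

```rust
use pageserver_api::shard::ShardIdentity;
use utils::id::TenantId;

// Sketch: the one place the shard affects remote keys. In the real code this
// is remote_timelines_path() below; the literal "timelines" stands in for
// TIMELINES_SEGMENT_NAME.
fn tenant_timelines_prefix(tenant_id: &TenantId, shard: &ShardIdentity) -> String {
    // Assumed renderings: "" for ShardIdentity::none(), and something like
    // "-0104" (shard 1 of 4) otherwise.
    format!("tenants/{tenant_id}{}/timelines", shard.slug())
}
```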
@@ -1178,13 +1194,20 @@

             let upload_result: anyhow::Result<()> = match &task.op {
                 UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
-                    let path = layer.local_path();
-                    upload::upload_timeline_layer(
-                        self.conf,
-                        &self.storage_impl,
-                        path,
-                        layer_metadata,
+                    let remote_path = remote_layer_path(
+                        &self.tenant_id,
+                        &self.shard,
+                        &self.timeline_id,
+                        &layer.layer_desc().filename(),
                         self.generation,
+                    );
+
+                    let local_path = layer.local_path();
+                    upload::upload_timeline_layer(
+                        &self.storage_impl,
+                        local_path,
+                        remote_path,
+                        layer_metadata,
                     )
                     .measure_remote_op(
                         self.tenant_id,
@@ -1208,6 +1231,7 @@
                     let res = upload::upload_index_part(
                         &self.storage_impl,
                         &self.tenant_id,
+                        &self.shard,
                         &self.timeline_id,
                         self.generation,
                         index_part,
@@ -1233,6 +1257,7 @@
                         .deletion_queue_client
                         .push_layers(
                             self.tenant_id,
+                            &self.shard,
                             self.timeline_id,
                             self.generation,
                             delete.layers.clone(),
@@ -1503,24 +1528,33 @@
     }
 }

-pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
-    let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
+pub fn remote_timelines_path(tenant_id: &TenantId, shard: &ShardIdentity) -> RemotePath {
+    let path = format!(
+        "tenants/{tenant_id}{}/{TIMELINES_SEGMENT_NAME}",
+        shard.slug()
+    );
     RemotePath::from_string(&path).expect("Failed to construct path")
 }

-pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
-    remote_timelines_path(tenant_id).join(Utf8Path::new(&timeline_id.to_string()))
+pub fn remote_timeline_path(
+    tenant_id: &TenantId,
+    shard: &ShardIdentity,
+    timeline_id: &TimelineId,
+) -> RemotePath {
+    remote_timelines_path(tenant_id, shard).join(Utf8Path::new(&timeline_id.to_string()))
 }

 pub fn remote_layer_path(
     tenant_id: &TenantId,
+    shard: &ShardIdentity,
     timeline_id: &TimelineId,
     layer_file_name: &LayerFileName,
     generation: Generation,
 ) -> RemotePath {
     // Generation-aware key format
     let path = format!(
-        "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
+        "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
+        shard.slug(),
         layer_file_name.file_name(),
         generation.get_suffix()
     );
@@ -1530,11 +1564,13 @@

 pub fn remote_index_path(
     tenant_id: &TenantId,
+    shard: &ShardIdentity,
     timeline_id: &TimelineId,
     generation: Generation,
 ) -> RemotePath {
     RemotePath::from_string(&format!(
-        "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
+        "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
+        shard.slug(),
         IndexPart::FILE_NAME,
         generation.get_suffix()
     ))
@@ -1558,29 +1594,6 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
     }
 }

-/// Files on the remote storage are stored with paths, relative to the workdir.
-/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
-///
-/// Errors if the path provided does not start from pageserver's workdir.
-pub fn remote_path(
-    conf: &PageServerConf,
-    local_path: &Utf8Path,
-    generation: Generation,
-) -> anyhow::Result<RemotePath> {
-    let stripped = local_path
-        .strip_prefix(&conf.workdir)
-        .context("Failed to strip workdir prefix")?;
-
-    let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
-
-    RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
-        format!(
-            "to resolve remote part of path {:?} for base {:?}",
-            local_path, conf.workdir
-        )
-    })
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
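Deleting `remote_path` is the conceptual heart of this patch: remote keys used to be derived by stripping the workdir prefix off a local path, which forces the remote layout to mirror the local one. A shard suffix in the tenant segment has no local counterpart at this point, so every key is now built explicitly from identifiers. A condensed contrast (paths illustrative, not compilable on its own):

```rust
// Before (function deleted above): remote key = local path minus workdir.
//   <workdir>/tenants/<tenant>/timelines/<tl>/<layer>
//     -> tenants/<tenant>/timelines/<tl>/<layer><gen_suffix>
//
// After: the key is constructed from ids + shard + generation, independent
// of the on-disk layout, so the tenant segment can carry a shard slug.
let key = remote_layer_path(&tenant_id, &shard, &timeline_id, &layer_file_name, generation);
```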
@@ -1677,6 +1690,7 @@ mod tests {
             conf: self.harness.conf,
             runtime: tokio::runtime::Handle::current(),
             tenant_id: self.harness.tenant_id,
+            shard: ShardIdentity::none(),
             timeline_id: TIMELINE_ID,
             generation,
             storage_impl: self.harness.remote_storage.clone(),
@@ -2010,7 +2024,13 @@ mod tests {
         std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");

         let index_path = test_state.harness.remote_fs_dir.join(
-            remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
+            remote_index_path(
+                &test_state.harness.tenant_id,
+                &ShardIdentity::none(),
+                &TIMELINE_ID,
+                generation,
+            )
+            .get_path(),
         );
         eprintln!("Writing {index_path}");
         std::fs::write(&index_path, index_part_bytes).unwrap();
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index 6039b01ab8..50b5ffd3a7 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -9,6 +9,7 @@ use std::time::Duration;

 use anyhow::{anyhow, Context};
 use camino::Utf8Path;
+use pageserver_api::shard::ShardIdentity;
 use tokio::fs;
 use tokio::io::AsyncWriteExt;
 use tokio_util::sync::CancellationToken;
@@ -40,6 +41,7 @@ pub async fn download_layer_file<'a>(
     conf: &'static PageServerConf,
     storage: &'a GenericRemoteStorage,
     tenant_id: TenantId,
+    shard: &ShardIdentity,
     timeline_id: TimelineId,
     layer_file_name: &'a LayerFileName,
     layer_metadata: &'a LayerFileMetadata,
@@ -52,6 +54,7 @@

     let remote_path = remote_layer_path(
         &tenant_id,
+        shard,
         &timeline_id,
         layer_file_name,
         layer_metadata.generation,
@@ -170,9 +173,10 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
 pub async fn list_remote_timelines(
     storage: &GenericRemoteStorage,
     tenant_id: TenantId,
+    shard: &ShardIdentity,
     cancel: CancellationToken,
 ) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
-    let remote_path = remote_timelines_path(&tenant_id);
+    let remote_path = remote_timelines_path(&tenant_id, shard);

     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
         anyhow::bail!("storage-sync-list-remote-timelines");
@@ -212,11 +216,12 @@
 async fn do_download_index_part(
     storage: &GenericRemoteStorage,
     tenant_id: &TenantId,
+    shard: &ShardIdentity,
     timeline_id: &TimelineId,
     index_generation: Generation,
     cancel: CancellationToken,
 ) -> Result<IndexPart, DownloadError> {
-    let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
+    let remote_path = remote_index_path(tenant_id, shard, timeline_id, index_generation);

     let index_part_bytes = download_retry_forever(
         || async {
@@ -253,6 +258,7 @@
 pub(super) async fn download_index_part(
     storage: &GenericRemoteStorage,
     tenant_id: &TenantId,
+    shard: &ShardIdentity,
     timeline_id: &TimelineId,
     my_generation: Generation,
     cancel: CancellationToken,
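The hunks below thread the shard through `download_index_part`'s generation probes. For orientation, the probe order is: the exact `my_generation` key, then `my_generation.previous()`, then a listing of `index_part.json*` that picks the newest generation not newer than ours, and finally the suffix-less legacy key. The selection step of the listing fallback, extracted into a self-contained sketch (assuming `parse_remote_index_path` returns the `Generation` encoded in a listed key, as its use below implies):

```rust
use remote_storage::RemotePath;
use utils::generation::Generation;

use crate::tenant::remote_timeline_client::parse_remote_index_path;

// Sketch of the listing fallback's selection logic, mirroring the
// `.filter_map(parse_remote_index_path).filter(...).max()` chain below.
fn pick_index_generation(
    listed: impl IntoIterator<Item = RemotePath>,
    my_generation: Generation,
) -> Option<Generation> {
    listed
        .into_iter()
        // Decode "index_part.json-<gen>" keys; ignore anything else.
        .filter_map(parse_remote_index_path)
        // Never read an index written by a newer attachment than ours.
        .filter(|g| g <= &my_generation)
        .max()
}
```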
@@ -261,8 +267,15 @@ pub(super) async fn download_index_part(
     if my_generation.is_none() {
         // Operating without generations: just fetch the generation-less path
-        return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
-            .await;
+        return do_download_index_part(
+            storage,
+            tenant_id,
+            shard,
+            timeline_id,
+            my_generation,
+            cancel,
+        )
+        .await;
     }

     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
@@ -272,6 +285,7 @@
     let res = do_download_index_part(
         storage,
         tenant_id,
+        shard,
         timeline_id,
         my_generation,
         cancel.clone(),
@@ -299,6 +313,7 @@
     let res = do_download_index_part(
         storage,
         tenant_id,
+        shard,
         timeline_id,
         my_generation.previous(),
         cancel.clone(),
@@ -321,7 +336,7 @@

     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
     // objects, and select the highest one with a generation <= my_generation.
-    let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
+    let index_prefix = remote_index_path(tenant_id, shard, timeline_id, Generation::none());
     let indices = backoff::retry(
         || async { storage.list_files(Some(&index_prefix)).await },
         |_| false,
@@ -347,14 +362,21 @@
     match max_previous_generation {
         Some(g) => {
             tracing::debug!("Found index_part in generation {g:?}");
-            do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
+            do_download_index_part(storage, tenant_id, shard, timeline_id, g, cancel).await
         }
         None => {
             // Migration from legacy pre-generation state: we have a generation but no prior
             // attached pageservers did. Try to load from a no-generation path.
             tracing::info!("No index_part.json* found");
-            do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
-                .await
+            do_download_index_part(
+                storage,
+                tenant_id,
+                shard,
+                timeline_id,
+                Generation::none(),
+                cancel,
+            )
+            .await
         }
     }
 }
diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index 0a37a8f283..f66c21f717 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -3,15 +3,13 @@
 use anyhow::{bail, Context};
 use camino::Utf8Path;
 use fail::fail_point;
+use pageserver_api::shard::ShardIdentity;
 use std::io::ErrorKind;
 use tokio::fs;

 use super::Generation;
-use crate::{
-    config::PageServerConf,
-    tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
-};
-use remote_storage::GenericRemoteStorage;
+use crate::tenant::remote_timeline_client::{index::IndexPart, remote_index_path};
+use remote_storage::{GenericRemoteStorage, RemotePath};
 use utils::id::{TenantId, TimelineId};

 use super::index::LayerFileMetadata;
@@ -22,6 +20,7 @@
 pub(super) async fn upload_index_part<'a>(
     storage: &'a GenericRemoteStorage,
     tenant_id: &TenantId,
+    shard: &ShardIdentity,
     timeline_id: &TimelineId,
     generation: Generation,
     index_part: &'a IndexPart,
@@ -38,7 +37,7 @@ pub(super) async fn upload_index_part<'a>(
     let index_part_size = index_part_bytes.len();
     let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));

-    let remote_path = remote_index_path(tenant_id, timeline_id, generation);
+    let remote_path = remote_index_path(tenant_id, shard, timeline_id, generation);

     storage
         .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
         .await
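With the upload side converted, an index written by a sharded attachment lands at a fully qualified key. Illustratively (both the slug and the generation-suffix renderings are assumptions; only the concatenation order is established by this diff):

```rust
// Hypothetical resulting keys for upload_index_part:
//
//   unsharded, no generation:   tenants/<tenant_id>/timelines/<timeline_id>/index_part.json
//   unsharded, generation 5:    tenants/<tenant_id>/timelines/<timeline_id>/index_part.json-00000005
//   shard 1 of 4, generation 5: tenants/<tenant_id>-0104/timelines/<timeline_id>/index_part.json-00000005
let key = remote_index_path(&tenant_id, &shard, &timeline_id, generation);
```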
@@ -50,11 +49,10 @@
 ///
 /// On an error, bumps the retries count and reschedules the entire task.
 pub(super) async fn upload_timeline_layer<'a>(
-    conf: &'static PageServerConf,
     storage: &'a GenericRemoteStorage,
-    source_path: &'a Utf8Path,
+    source_path: &Utf8Path,
+    remote_path: RemotePath,
     known_metadata: &'a LayerFileMetadata,
-    generation: Generation,
 ) -> anyhow::Result<()> {
     fail_point!("before-upload-layer", |_| {
         bail!("failpoint before-upload-layer")
@@ -62,7 +60,6 @@ pub(super) async fn upload_timeline_layer<'a>(

     pausable_failpoint!("before-upload-layer-pausable");

-    let storage_path = remote_path(conf, source_path, generation)?;
     let source_file_res = fs::File::open(&source_path).await;
     let source_file = match source_file_res {
         Ok(source_file) => source_file,
@@ -97,7 +94,7 @@ pub(super) async fn upload_timeline_layer<'a>(
         .with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;

     storage
-        .upload(source_file, fs_size, &storage_path, None)
+        .upload(source_file, fs_size, &remote_path, None)
         .await
         .with_context(|| format!("upload layer from local path '{source_path}'"))?;

diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index d90758a560..487f55275a 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1547,7 +1547,6 @@ impl Timeline {
             .tenant_conf
             .max_lsn_wal_lag
             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
-        let shard = tenant_conf_guard.shard.clone();
         drop(tenant_conf_guard);

         let mut guard = self.walreceiver.lock().unwrap();
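The `upload_timeline_layer` signature change completes the pattern: callers compute the destination key, and the uploader only streams bytes, so `PageServerConf` and `Generation` drop out of the upload path entirely. The `timeline.rs` cleanup follows from the same move; the walreceiver setup no longer needs the shard locally now that `RemoteTimelineClient` owns it. Condensed from the `UploadOp::UploadLayer` hunk earlier (not compilable on its own):

```rust
// Caller derives the key, uploader just ships the file.
let remote_path = remote_layer_path(
    &self.tenant_id,
    &self.shard,
    &self.timeline_id,
    &layer.layer_desc().filename(),
    self.generation,
);
upload::upload_timeline_layer(&self.storage_impl, layer.local_path(), remote_path, layer_metadata)
    .await?;
```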