diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 32a834a26a..16f2d13770 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -139,6 +139,89 @@ impl From<[u8; 18]> for TenantShardId {
     }
 }
 
+/// For use within the context of a particular tenant, when we need to know which
+/// shard we're dealing with, but do not need to know the full ShardIdentity (because
+/// we won't be doing any page->shard mapping), and do not need to know the fully qualified
+/// TenantShardId.
+#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
+pub struct ShardIndex {
+    pub shard_number: ShardNumber,
+    pub shard_count: ShardCount,
+}
+
+impl ShardIndex {
+    pub fn new(number: ShardNumber, count: ShardCount) -> Self {
+        Self {
+            shard_number: number,
+            shard_count: count,
+        }
+    }
+    pub fn unsharded() -> Self {
+        Self {
+            shard_number: ShardNumber(0),
+            shard_count: ShardCount(0),
+        }
+    }
+
+    pub fn is_unsharded(&self) -> bool {
+        self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
+    }
+
+    /// For use in constructing remote storage paths: concatenate this with a TenantId
+    /// to get a fully qualified TenantShardId.
+    ///
+    /// Backward compat: this function returns an empty string if Self::is_unsharded, such
+    /// that the legacy pre-sharding remote key format is preserved.
+    pub fn get_suffix(&self) -> String {
+        if self.is_unsharded() {
+            "".to_string()
+        } else {
+            format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
+        }
+    }
+}
+
+impl std::fmt::Display for ShardIndex {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
+    }
+}
+
+impl std::fmt::Debug for ShardIndex {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Debug is the same as Display: the compact hex representation
+        write!(f, "{}", self)
+    }
+}
+
+impl std::str::FromStr for ShardIndex {
+    type Err = hex::FromHexError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        // Expect format: 1 byte shard number, 1 byte shard count
+        if s.len() == 4 {
+            let bytes = s.as_bytes();
+            let mut shard_parts: [u8; 2] = [0u8; 2];
+            hex::decode_to_slice(bytes, &mut shard_parts)?;
+            Ok(Self {
+                shard_number: ShardNumber(shard_parts[0]),
+                shard_count: ShardCount(shard_parts[1]),
+            })
+        } else {
+            Err(hex::FromHexError::InvalidStringLength)
+        }
+    }
+}
+
+impl From<[u8; 2]> for ShardIndex {
+    fn from(b: [u8; 2]) -> Self {
+        Self {
+            shard_number: ShardNumber(b[0]),
+            shard_count: ShardCount(b[1]),
+        }
+    }
+}
+
 impl Serialize for TenantShardId {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: serde::Serializer,
     {
@@ -209,6 +292,77 @@ impl<'de> Deserialize<'de> for TenantShardId {
     }
 }
 
+impl Serialize for ShardIndex {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        if serializer.is_human_readable() {
+            serializer.collect_str(self)
+        } else {
+            // Binary encoding is not used in index_part.json, but is included in anticipation of
+            // switching various structures (e.g. inter-process communication, remote metadata) to more
+            // compact binary encodings in future.
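+            // Byte 0 is the shard number and byte 1 the shard count, mirroring the hex
+            // pairs emitted by Display: ShardNumber(13)/ShardCount(17) is "0d11" as a
+            // string and [0x0d, 0x11] here (see the encoding tests below).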
+            let mut packed: [u8; 2] = [0; 2];
+            packed[0] = self.shard_number.0;
+            packed[1] = self.shard_count.0;
+            packed.serialize(serializer)
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for ShardIndex {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct IdVisitor {
+            is_human_readable_deserializer: bool,
+        }
+
+        impl<'de> serde::de::Visitor<'de> for IdVisitor {
+            type Value = ShardIndex;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+                if self.is_human_readable_deserializer {
+                    formatter.write_str("value in form of hex string")
+                } else {
+                    formatter.write_str("value in form of integer array([u8; 2])")
+                }
+            }
+
+            fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
+            where
+                A: serde::de::SeqAccess<'de>,
+            {
+                let s = serde::de::value::SeqAccessDeserializer::new(seq);
+                let id: [u8; 2] = Deserialize::deserialize(s)?;
+                Ok(ShardIndex::from(id))
+            }
+
+            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                ShardIndex::from_str(v).map_err(E::custom)
+            }
+        }
+
+        if deserializer.is_human_readable() {
+            deserializer.deserialize_str(IdVisitor {
+                is_human_readable_deserializer: true,
+            })
+        } else {
+            deserializer.deserialize_tuple(
+                2,
+                IdVisitor {
+                    is_human_readable_deserializer: false,
+                },
+            )
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
@@ -318,4 +472,35 @@ mod tests {
         Ok(())
     }
+
+    #[test]
+    fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
+        let example = ShardIndex {
+            shard_number: ShardNumber(13),
+            shard_count: ShardCount(17),
+        };
+        let expected: String = "0d11".to_string();
+        let encoded = format!("{example}");
+        assert_eq!(&encoded, &expected);
+
+        let decoded = ShardIndex::from_str(&encoded)?;
+        assert_eq!(example, decoded);
+        Ok(())
+    }
+
+    #[test]
+    fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
+        let example = ShardIndex {
+            shard_number: ShardNumber(13),
+            shard_count: ShardCount(17),
+        };
+        let expected: [u8; 2] = [0x0d, 0x11];
+
+        let encoded = bincode::serialize(&example).unwrap();
+        assert_eq!(Hex(&encoded), Hex(&expected));
+        let decoded = bincode::deserialize(&encoded).unwrap();
+        assert_eq!(example, decoded);
+
+        Ok(())
+    }
 }
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 86be1b7094..ad95254a65 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -10,6 +10,7 @@ use crate::control_plane_client::ControlPlaneGenerationsApi;
 use crate::metrics;
 use crate::tenant::remote_timeline_client::remote_layer_path;
 use crate::tenant::remote_timeline_client::remote_timeline_path;
+use crate::tenant::remote_timeline_client::LayerFileMetadata;
 use crate::virtual_file::MaybeFatalIo;
 use crate::virtual_file::VirtualFile;
 use anyhow::Context;
@@ -509,18 +510,19 @@ impl DeletionQueueClient {
         tenant_id: TenantId,
         timeline_id: TimelineId,
         current_generation: Generation,
-        layers: Vec<(LayerFileName, Generation)>,
+        layers: Vec<(LayerFileName, LayerFileMetadata)>,
     ) -> Result<(), DeletionQueueError> {
         if current_generation.is_none() {
             debug!("Enqueuing deletions in legacy mode, skipping queue");
             let mut layer_paths = Vec::new();
-            for (layer, generation) in layers {
+            for (layer, meta) in layers {
                 layer_paths.push(remote_layer_path(
                     &tenant_id,
                     &timeline_id,
+                    meta.shard,
                     &layer,
-                    generation,
+                    meta.generation,
                 ));
             }
             self.push_immediate(layer_paths).await?;
@@ -540,7 +542,7 @@
         tenant_id: TenantId,
         timeline_id: TimelineId,
         current_generation: Generation,
-        layers: Vec<(LayerFileName, Generation)>,
+        layers: Vec<(LayerFileName, LayerFileMetadata)>,
     ) -> Result<(), DeletionQueueError> {
         metrics::DELETION_QUEUE
             .keys_submitted
@@ -751,6 +753,7 @@ impl DeletionQueue {
 mod test {
     use camino::Utf8Path;
     use hex_literal::hex;
+    use pageserver_api::shard::ShardIndex;
     use std::{io::ErrorKind, time::Duration};
     use tracing::info;
@@ -990,6 +993,8 @@ mod test {
         // we delete, and the generation of the running Tenant.
         let layer_generation = Generation::new(0xdeadbeef);
         let now_generation = Generation::new(0xfeedbeef);
+        let layer_metadata =
+            LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
 
         let remote_layer_file_name_1 =
             format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
@@ -1013,7 +1018,7 @@
                 tenant_id,
                 TIMELINE_ID,
                 now_generation,
-                [(layer_file_name_1.clone(), layer_generation)].to_vec(),
+                [(layer_file_name_1.clone(), layer_metadata)].to_vec(),
             )
             .await?;
         assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
@@ -1052,6 +1057,8 @@
         let stale_generation = latest_generation.previous();
         // Generation that our example layer file was written with
         let layer_generation = stale_generation.previous();
+        let layer_metadata =
+            LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
 
         ctx.set_latest_generation(latest_generation);
@@ -1069,7 +1076,7 @@
                 tenant_id,
                 TIMELINE_ID,
                 stale_generation,
-                [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
+                [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
             )
             .await?;
 
@@ -1084,7 +1091,7 @@
                 tenant_id,
                 TIMELINE_ID,
                 latest_generation,
-                [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
+                [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
             )
             .await?;
 
@@ -1111,6 +1118,8 @@
         let layer_generation = Generation::new(0xdeadbeef);
         let now_generation = Generation::new(0xfeedbeef);
+        let layer_metadata =
+            LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
 
         // Inject a deletion in the generation before generation_now: after restart,
         // this deletion should _not_ get executed (only the immediately previous
@@ -1122,7 +1131,7 @@
                 tenant_id,
                 TIMELINE_ID,
                 now_generation.previous(),
-                [(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
+                [(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
             )
             .await?;
 
@@ -1136,7 +1145,7 @@
                 tenant_id,
                 TIMELINE_ID,
                 now_generation,
-                [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
+                [(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
             )
             .await?;
 
@@ -1226,12 +1235,13 @@ pub(crate) mod mock {
             match msg {
                 ListWriterQueueMessage::Delete(op) => {
                     let mut objects = op.objects;
-                    for (layer, generation) in op.layers {
+                    for (layer, meta) in op.layers {
                         objects.push(remote_layer_path(
                             &op.tenant_id,
                             &op.timeline_id,
+                            meta.shard,
                             &layer,
-                            generation,
+                            meta.generation,
                         ));
                     }
 
diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs
index 28daae2da5..5d52b680e4 100644
--- a/pageserver/src/deletion_queue/list_writer.rs
+++ b/pageserver/src/deletion_queue/list_writer.rs
@@ -33,6 +33,7 @@ use crate::config::PageServerConf;
 use crate::deletion_queue::TEMP_SUFFIX;
 use crate::metrics;
 use crate::tenant::remote_timeline_client::remote_layer_path;
+use crate::tenant::remote_timeline_client::LayerFileMetadata;
 use crate::tenant::storage_layer::LayerFileName;
 use crate::virtual_file::on_fatal_io_error;
 use crate::virtual_file::MaybeFatalIo;
@@ -58,7 +59,7 @@ pub(super) struct DeletionOp {
     // `layers` and `objects` are both just lists of objects. `layers` is used if you do not
     // have a config object handy to project it to a remote key, and need the consuming worker
     // to do it for you.
-    pub(super) layers: Vec<(LayerFileName, Generation)>,
+    pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
     pub(super) objects: Vec<RemotePath>,
 
     /// The _current_ generation of the Tenant attachment in which we are enqueuing
@@ -387,12 +388,13 @@ impl ListWriter {
                     );
                     let mut layer_paths = Vec::new();
-                    for (layer, generation) in op.layers {
+                    for (layer, meta) in op.layers {
                         layer_paths.push(remote_layer_path(
                             &op.tenant_id,
                             &op.timeline_id,
+                            meta.shard,
                             &layer,
-                            generation,
+                            meta.generation,
                         ));
                     }
                     layer_paths.extend(op.objects);
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 7fbf538e2f..a4d61f0951 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3468,6 +3468,7 @@ pub async fn dump_layerfile_from_path(
 pub(crate) mod harness {
     use bytes::{Bytes, BytesMut};
     use once_cell::sync::OnceCell;
+    use pageserver_api::shard::ShardIndex;
     use std::fs;
     use std::sync::Arc;
     use utils::logging;
@@ -3534,6 +3535,7 @@ pub(crate) mod harness {
     pub tenant_conf: TenantConf,
     pub tenant_id: TenantId,
     pub generation: Generation,
+    pub shard: ShardIndex,
     pub remote_storage: GenericRemoteStorage,
     pub remote_fs_dir: Utf8PathBuf,
     pub deletion_queue: MockDeletionQueue,
@@ -3593,6 +3595,7 @@ pub(crate) mod harness {
             tenant_conf,
             tenant_id,
             generation: Generation::new(0xdeadbeef),
+            shard: ShardIndex::unsharded(),
             remote_storage,
             remote_fs_dir,
             deletion_queue,
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 99d9783f73..962a0fa795 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -188,6 +188,7 @@
 use anyhow::Context;
 use camino::Utf8Path;
 use chrono::{NaiveDateTime, Utc};
+use pageserver_api::shard::ShardIndex;
 use scopeguard::ScopeGuard;
 use tokio_util::sync::CancellationToken;
 pub(crate) use upload::upload_initdb_dir;
@@ -402,6 +403,11 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
+    pub(crate) fn get_shard_index(&self) -> ShardIndex {
+        // TODO: carry this on the struct
+        ShardIndex::unsharded()
+    }
+
     pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
         match &mut *self.upload_queue.lock().unwrap() {
             UploadQueue::Uninitialized => None,
@@ -465,6 +471,7 @@ impl RemoteTimelineClient {
             &self.storage_impl,
             &self.tenant_id,
             &self.timeline_id,
+            self.get_shard_index(),
             self.generation,
             cancel,
         )
@@ -657,10 +664,10 @@ impl RemoteTimelineClient {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
 
-        let with_generations =
+        let with_metadata =
             self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
 
-        self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
+        self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
 
         // Launch the tasks immediately, if possible
         self.launch_queued_tasks(upload_queue);
@@ -695,7 +702,7 @@ impl RemoteTimelineClient {
         self: &Arc<Self>,
         upload_queue: &mut UploadQueueInitialized,
         names: I,
-    ) -> Vec<(LayerFileName, Generation)>
+    ) -> Vec<(LayerFileName, LayerFileMetadata)>
     where
         I: IntoIterator<Item = LayerFileName>,
     {
@@ -703,16 +710,17 @@
         // so we don't need to update it. Just serialize it.
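+        // The clone below snapshots that metadata; schedule_index_upload() at the end of
+        // this function serializes the snapshot once the unlinked layers have been removed.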
         let metadata = upload_queue.latest_metadata.clone();
 
-        // Decorate our list of names with each name's generation, dropping
-        // names that are unexpectedly missing from our metadata.
-        let with_generations: Vec<_> = names
+        // Decorate our list of names with each name's metadata, dropping
+        // names that are unexpectedly missing from our metadata. This metadata
+        // is later used when physically deleting layers, to construct key paths.
+        let with_metadata: Vec<_> = names
             .into_iter()
             .filter_map(|name| {
                 let meta = upload_queue.latest_files.remove(&name);
                 if let Some(meta) = meta {
                     upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
-                    Some((name, meta.generation))
+                    Some((name, meta))
                 } else {
                     // This can only happen if we forgot to schedule the file upload
                     // before scheduling the delete. Log it because it is a rare/strange
@@ -725,9 +733,10 @@
             .collect();
 
         #[cfg(feature = "testing")]
-        for (name, gen) in &with_generations {
-            if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), *gen) {
-                if &unexpected == gen {
+        for (name, metadata) in &with_metadata {
+            let gen = metadata.generation;
+            if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
+                if unexpected == gen {
                     tracing::error!("{name} was unlinked twice with same generation");
                 } else {
                     tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
@@ -742,14 +751,14 @@
             self.schedule_index_upload(upload_queue, metadata);
         }
 
-        with_generations
+        with_metadata
     }
 
     /// Schedules deletion for layer files which have previously been unlinked from the
     /// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
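+    ///
+    /// The [`LayerFileMetadata`] paired with each name carries the generation and shard
+    /// that the deletion queue needs in order to reconstruct the layer's remote key when
+    /// the deletion is eventually executed.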
     pub(crate) fn schedule_deletion_of_unlinked(
         self: &Arc<Self>,
-        layers: Vec<(LayerFileName, Generation)>,
+        layers: Vec<(LayerFileName, LayerFileMetadata)>,
     ) -> anyhow::Result<()> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
@@ -762,16 +771,22 @@
     fn schedule_deletion_of_unlinked0(
         self: &Arc<Self>,
         upload_queue: &mut UploadQueueInitialized,
-        with_generations: Vec<(LayerFileName, Generation)>,
+        with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
     ) {
-        for (name, gen) in &with_generations {
-            info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
+        for (name, meta) in &with_metadata {
+            info!(
+                "scheduling deletion of layer {}{} (shard {})",
+                name,
+                meta.generation.get_suffix(),
+                meta.shard
+            );
         }
 
         #[cfg(feature = "testing")]
-        for (name, gen) in &with_generations {
+        for (name, meta) in &with_metadata {
+            let gen = meta.generation;
             match upload_queue.dangling_files.remove(name) {
-                Some(same) if &same == gen => { /* expected */ }
+                Some(same) if same == gen => { /* expected */ }
                 Some(other) => {
                     tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
                 }
@@ -783,7 +798,7 @@
         // schedule the actual deletions
         let op = UploadOp::Delete(Delete {
-            layers: with_generations,
+            layers: with_metadata,
         });
         self.calls_unfinished_metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
@@ -904,6 +919,7 @@ impl RemoteTimelineClient {
                 &self.storage_impl,
                 &self.tenant_id,
                 &self.timeline_id,
+                self.get_shard_index(),
                 self.generation,
                 &index_part_with_deleted_at,
             )
@@ -962,6 +978,7 @@ impl RemoteTimelineClient {
                 remote_layer_path(
                     &self.tenant_id,
                     &self.timeline_id,
+                    meta.shard,
                     &file_name,
                     meta.generation,
                 )
@@ -1010,7 +1027,12 @@ impl RemoteTimelineClient {
             .unwrap_or(
                 // No generation-suffixed indices, assume we are dealing with
                 // a legacy index.
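+                // Generation::none() contributes an empty suffix, so this resolves to the
+                // bare index_part.json key that pre-generation pageservers wrote.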
-                remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
+                remote_index_path(
+                    &self.tenant_id,
+                    &self.timeline_id,
+                    self.get_shard_index(),
+                    Generation::none(),
+                ),
             );
 
         let remaining_layers: Vec<LayerFileName> = remaining
@@ -1219,6 +1241,7 @@ impl RemoteTimelineClient {
             &self.storage_impl,
             &self.tenant_id,
             &self.timeline_id,
+            self.get_shard_index(),
             self.generation,
             index_part,
         )
@@ -1527,12 +1550,14 @@ pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> R
 pub fn remote_layer_path(
     tenant_id: &TenantId,
     timeline_id: &TimelineId,
+    shard: ShardIndex,
     layer_file_name: &LayerFileName,
     generation: Generation,
 ) -> RemotePath {
     // Generation-aware key format
     let path = format!(
-        "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
+        "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
+        shard.get_suffix(),
         layer_file_name.file_name(),
         generation.get_suffix()
     );
@@ -1550,10 +1575,12 @@ pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId
 pub fn remote_index_path(
     tenant_id: &TenantId,
     timeline_id: &TimelineId,
+    shard: ShardIndex,
     generation: Generation,
 ) -> RemotePath {
     RemotePath::from_string(&format!(
-        "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
+        "tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
+        shard.get_suffix(),
         IndexPart::FILE_NAME,
         generation.get_suffix()
     ))
@@ -1778,6 +1805,7 @@ mod tests {
     println!("remote_timeline_dir: {remote_timeline_dir}");
 
     let generation = harness.generation;
+    let shard = harness.shard;
 
     // Create a couple of dummy files, schedule upload for them
@@ -1794,7 +1822,7 @@ mod tests {
                 harness.conf,
                 &timeline,
                 name,
-                LayerFileMetadata::new(contents.len() as u64, generation),
+                LayerFileMetadata::new(contents.len() as u64, generation, shard),
             )
         }).collect::<Vec<_>>();
@@ -1943,7 +1971,7 @@ mod tests {
         harness.conf,
         &timeline,
         layer_file_name_1.clone(),
-        LayerFileMetadata::new(content_1.len() as u64, harness.generation),
+        LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
     );
 
     #[derive(Debug, PartialEq, Clone, Copy)]
@@ -2008,7 +2036,7 @@ mod tests {
         assert_eq!(actual_c, expected_c);
     }
 
-    async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
+    async fn inject_index_part(
+        test_state: &TestSetup,
+        generation: Generation,
+        shard: ShardIndex,
+    ) -> IndexPart {
         // An empty IndexPart, just sufficient to ensure deserialization will succeed
         let example_metadata = TimelineMetadata::example();
         let example_index_part = IndexPart::new(
@@ -2029,7 +2061,13 @@ mod tests {
         std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
 
         let index_path = test_state.harness.remote_fs_dir.join(
-            remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
+            remote_index_path(
+                &test_state.harness.tenant_id,
+                &TIMELINE_ID,
+                shard,
+                generation,
+            )
+            .get_path(),
         );
         eprintln!("Writing {index_path}");
         std::fs::write(&index_path, index_part_bytes).unwrap();
@@ -2066,7 +2104,12 @@ mod tests {
         // Simple case: we are in generation N, load the index from generation N - 1
         let generation_n = 5;
 
-        let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
+        let injected = inject_index_part(
+            &test_state,
+            Generation::new(generation_n - 1),
+            ShardIndex::unsharded(),
+        )
+        .await;
 
         assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
 
@@ -2084,22 +2127,34 @@ mod tests {
         // A generation-less IndexPart exists in the bucket, we should find it
         let generation_n = 5;
 
-        let injected_none = inject_index_part(&test_state, Generation::none()).await;
+        let injected_none =
+            inject_index_part(&test_state, Generation::none(), ShardIndex::unsharded()).await;
 
         assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
 
         // If a more recent-than-none generation exists, we should prefer to load that
-        let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
+        let injected_1 =
+            inject_index_part(&test_state, Generation::new(1), ShardIndex::unsharded()).await;
 
         assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
 
         // If a more-recent-than-me generation exists, we should ignore it.
-        let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
+        let _injected_10 =
+            inject_index_part(&test_state, Generation::new(10), ShardIndex::unsharded()).await;
 
         assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
 
         // If a directly previous generation exists, _and_ an index exists in my own
         // generation, I should prefer my own generation.
-        let _injected_prev =
-            inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
-        let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
+        let _injected_prev = inject_index_part(
+            &test_state,
+            Generation::new(generation_n - 1),
+            ShardIndex::unsharded(),
+        )
+        .await;
+        let injected_current = inject_index_part(
+            &test_state,
+            Generation::new(generation_n),
+            ShardIndex::unsharded(),
+        )
+        .await;
 
         assert_got_index_part(
             &test_state,
             Generation::new(generation_n),
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index 6039b01ab8..3b2cb5b599 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -9,6 +9,7 @@ use std::time::Duration;
 
 use anyhow::{anyhow, Context};
 use camino::Utf8Path;
+use pageserver_api::shard::ShardIndex;
 use tokio::fs;
 use tokio::io::AsyncWriteExt;
 use tokio_util::sync::CancellationToken;
@@ -53,6 +54,7 @@ pub async fn download_layer_file<'a>(
     let remote_path = remote_layer_path(
         &tenant_id,
         &timeline_id,
+        layer_metadata.shard,
         layer_file_name,
         layer_metadata.generation,
     );
@@ -213,10 +215,11 @@ async fn do_download_index_part(
     storage: &GenericRemoteStorage,
     tenant_id: &TenantId,
     timeline_id: &TimelineId,
+    shard: ShardIndex,
     index_generation: Generation,
     cancel: CancellationToken,
 ) -> Result<IndexPart, DownloadError> {
-    let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
+    let remote_path = remote_index_path(tenant_id, timeline_id, shard, index_generation);
 
     let index_part_bytes = download_retry_forever(
         || async {
@@ -254,6 +257,7 @@ pub(super) async fn download_index_part(
     storage: &GenericRemoteStorage,
     tenant_id: &TenantId,
     timeline_id: &TimelineId,
+    shard: ShardIndex,
     my_generation: Generation,
     cancel: CancellationToken,
 ) -> Result<IndexPart, DownloadError> {
@@ -261,8 +265,15 @@ pub(super) async fn download_index_part(
     if my_generation.is_none() {
         // Operating without generations: just fetch the generation-less path
-        return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
-            .await;
+        return do_download_index_part(
+            storage,
+            tenant_id,
+            timeline_id,
+            shard,
+            my_generation,
+            cancel,
+        )
+        .await;
     }
 
     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
@@ -273,6 +284,7 @@
         storage,
         tenant_id,
         timeline_id,
+        shard,
         my_generation,
         cancel.clone(),
     )
@@ -300,6 +312,7 @@
         storage,
         tenant_id,
         timeline_id,
+        shard,
         my_generation.previous(),
         cancel.clone(),
     )
@@ -320,8 +333,9 @@
     }
 
     // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
-    // objects, and select the highest one with a generation <= my_generation.
-    let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
+    // objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
+    // to constructing a full index path with no generation, because the generation is a suffix.
+    let index_prefix = remote_index_path(tenant_id, timeline_id, shard, Generation::none());
     let indices = backoff::retry(
         || async { storage.list_files(Some(&index_prefix)).await },
         |_| false,
@@ -347,14 +361,21 @@
     match max_previous_generation {
         Some(g) => {
             tracing::debug!("Found index_part in generation {g:?}");
-            do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
+            do_download_index_part(storage, tenant_id, timeline_id, shard, g, cancel).await
         }
         None => {
             // Migration from legacy pre-generation state: we have a generation but no prior
             // attached pageservers did. Try to load from a no-generation path.
             tracing::info!("No index_part.json* found");
-            do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
-                .await
+            do_download_index_part(
+                storage,
+                tenant_id,
+                timeline_id,
+                shard,
+                Generation::none(),
+                cancel,
+            )
+            .await
         }
     }
 }
diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs
index 0d0b34365c..0abfdeef02 100644
--- a/pageserver/src/tenant/remote_timeline_client/index.rs
+++ b/pageserver/src/tenant/remote_timeline_client/index.rs
@@ -12,6 +12,7 @@
 use crate::tenant::metadata::TimelineMetadata;
 use crate::tenant::storage_layer::LayerFileName;
 use crate::tenant::upload_queue::UploadQueueInitialized;
 use crate::tenant::Generation;
+use pageserver_api::shard::ShardIndex;
 
 use utils::lsn::Lsn;
 
@@ -25,6 +26,8 @@ pub struct LayerFileMetadata {
     file_size: u64,
 
     pub(crate) generation: Generation,
+
+    pub(crate) shard: ShardIndex,
 }
 
 impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
@@ -32,15 +35,17 @@
         LayerFileMetadata {
             file_size: other.file_size,
             generation: other.generation,
+            shard: other.shard,
         }
     }
 }
 
 impl LayerFileMetadata {
-    pub fn new(file_size: u64, generation: Generation) -> Self {
+    pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
         LayerFileMetadata {
             file_size,
             generation,
+            shard,
         }
     }
@@ -161,6 +166,10 @@ pub struct IndexLayerMetadata {
     #[serde(default = "Generation::none")]
     #[serde(skip_serializing_if = "Generation::is_none")]
     pub generation: Generation,
+
+    #[serde(default = "ShardIndex::unsharded")]
+    #[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
+    pub shard: ShardIndex,
 }
 
 impl From<LayerFileMetadata> for IndexLayerMetadata {
@@ -168,6 +177,7 @@
         IndexLayerMetadata {
             file_size: other.file_size,
             generation: other.generation,
+            shard: other.shard,
         }
     }
 }
@@ -195,13 +205,15 @@ mod tests {
             layer_metadata: HashMap::from([
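+                // The JSON fixtures in these tests carry no generation or shard fields; the
+                // serde defaults on IndexLayerMetadata (Generation::none, ShardIndex::unsharded)
+                // fill them in, which is what keeps legacy index_part files readable.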
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: 25600000, - generation: Generation::none() + generation: Generation::none(), + shard: ShardIndex::unsharded() }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. file_size: 9007199254741001, - generation: Generation::none() + generation: Generation::none(), + shard: ShardIndex::unsharded() }) ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), @@ -233,13 +245,15 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: 25600000, - generation: Generation::none() + generation: Generation::none(), + shard: ShardIndex::unsharded() }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. file_size: 9007199254741001, - generation: Generation::none() + generation: Generation::none(), + shard: ShardIndex::unsharded() }) ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), @@ -272,13 +286,15 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: 25600000, - generation: Generation::none() + generation: Generation::none(), + shard: ShardIndex::unsharded() }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. file_size: 9007199254741001, - generation: Generation::none() + generation: Generation::none(), + shard: ShardIndex::unsharded() }) ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), @@ -354,19 +370,21 @@ mod tests { layer_metadata: HashMap::from([ ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata { file_size: 25600000, - generation: Generation::none() + generation: Generation::none(), + shard: ShardIndex::unsharded() }), ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. 
                    file_size: 9007199254741001,
-                   generation: Generation::none()
+                   generation: Generation::none(),
+                   shard: ShardIndex::unsharded()
                })
            ]),
            disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
            metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
            deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
-                "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
+                "2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
        };
 
        let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index 4d3e1731dc..789a10cf54 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -4,6 +4,7 @@
 use anyhow::{bail, Context};
 use bytes::Bytes;
 use camino::Utf8Path;
 use fail::fail_point;
+use pageserver_api::shard::ShardIndex;
 use std::io::ErrorKind;
 use tokio::fs;
 
@@ -26,6 +27,7 @@ pub(super) async fn upload_index_part<'a>(
     storage: &'a GenericRemoteStorage,
     tenant_id: &TenantId,
     timeline_id: &TimelineId,
+    shard: ShardIndex,
     generation: Generation,
     index_part: &'a IndexPart,
 ) -> anyhow::Result<()> {
@@ -42,7 +44,7 @@
     let index_part_size = index_part_bytes.len();
     let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
 
-    let remote_path = remote_index_path(tenant_id, timeline_id, generation);
+    let remote_path = remote_index_path(tenant_id, timeline_id, shard, generation);
     storage
         .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
         .await
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index f28f1c9444..703fb7db24 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -3,6 +3,7 @@
 use camino::{Utf8Path, Utf8PathBuf};
 use pageserver_api::models::{
     HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
 };
+use pageserver_api::shard::ShardIndex;
 use std::ops::Range;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, Weak};
@@ -96,6 +97,7 @@ impl Layer {
             desc,
             None,
             metadata.generation,
+            metadata.shard,
         )));
 
         debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
@@ -136,6 +138,7 @@ impl Layer {
                 desc,
                 Some(inner),
                 metadata.generation,
+                metadata.shard,
             )
         }));
 
@@ -179,6 +182,7 @@ impl Layer {
                 desc,
                 Some(inner),
                 timeline.generation,
+                timeline.get_shard_index(),
             )
         }));
 
@@ -426,6 +430,15 @@ struct LayerInner {
     /// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
     /// for created layers from [`Timeline::generation`].
     generation: Generation,
+
+    /// The shard of this Layer.
+    ///
+    /// For layers created in this process, this will always be the [`ShardIndex`] of the
+    /// current `ShardIdentity` (TODO: add link once it's introduced).
+    ///
+    /// For loaded layers, this may be some other value if the tenant has undergone
+    /// a shard split since the layer was originally written.
+    shard: ShardIndex,
 }
 
 impl std::fmt::Display for LayerInner {
@@ -459,9 +472,9 @@ impl Drop for LayerInner {
         let path = std::mem::take(&mut self.path);
         let file_name = self.layer_desc().filename();
-        let gen = self.generation;
         let file_size = self.layer_desc().file_size;
         let timeline = self.timeline.clone();
+        let meta = self.metadata();
 
         crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
             let _g = span.entered();
@@ -489,7 +502,7 @@ impl Drop for LayerInner {
                     timeline.metrics.resident_physical_size_sub(file_size);
                 }
                 if let Some(remote_client) = timeline.remote_client.as_ref() {
-                    let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, gen)]);
+                    let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
 
                     if let Err(e) = res {
                         // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
@@ -523,6 +536,7 @@ impl LayerInner {
         desc: PersistentLayerDesc,
         downloaded: Option<Arc<DownloadedLayer>>,
         generation: Generation,
+        shard: ShardIndex,
     ) -> Self {
         let path = conf
             .timeline_path(&timeline.tenant_id, &timeline.timeline_id)
@@ -550,6 +564,7 @@ impl LayerInner {
             status: tokio::sync::broadcast::channel(1).0,
             consecutive_failures: AtomicUsize::new(0),
             generation,
+            shard,
         }
     }
 
@@ -1077,7 +1092,7 @@ impl LayerInner {
     }
 
     fn metadata(&self) -> LayerFileMetadata {
-        LayerFileMetadata::new(self.desc.file_size, self.generation)
+        LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
     }
 }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 9493ed1c9a..9e204e1258 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -62,6 +62,7 @@
 use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
 use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
 use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
 use pageserver_api::reltag::RelTag;
+use pageserver_api::shard::ShardIndex;
 use postgres_connection::PgConnectionConfig;
 use postgres_ffi::to_pg_timestamp;
 
@@ -1597,6 +1598,7 @@ impl Timeline {
         // Copy to move into the task we're about to spawn
         let generation = self.generation;
+        let shard = self.get_shard_index();
         let this = self.myself.upgrade().expect("&self method holds the arc");
 
         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
@@ -1645,6 +1647,7 @@ impl Timeline {
                 index_part.as_ref(),
                 disk_consistent_lsn,
                 generation,
+                shard,
             );
 
             let mut loaded_layers = Vec::new();
@@ -4364,6 +4367,11 @@ impl Timeline {
             resident_layers,
         }
     }
+
+    pub(crate) fn get_shard_index(&self) -> ShardIndex {
+        // TODO: carry this on the struct
+        ShardIndex::unsharded()
+    }
 }
 
 type TraversalPathItem = (
diff --git a/pageserver/src/tenant/timeline/init.rs b/pageserver/src/tenant/timeline/init.rs
index 96bf847fbb..916ebfc6d9 100644
--- a/pageserver/src/tenant/timeline/init.rs
+++ b/pageserver/src/tenant/timeline/init.rs
@@ -13,6 +13,7 @@ use crate::{
 };
 use anyhow::Context;
 use camino::Utf8Path;
+use pageserver_api::shard::ShardIndex;
 use std::{collections::HashMap, str::FromStr};
 use utils::lsn::Lsn;
 
@@ -107,6 +108,7 @@ pub(super) fn reconcile(
     index_part: Option<&IndexPart>,
     disk_consistent_lsn: Lsn,
     generation: Generation,
+    shard: ShardIndex,
 ) -> Vec<(LayerFileName, Result<Decision, DismissedLayer>)> {
     use Decision::*;
 
@@ -118,10 +120,13 @@
         .map(|(name, file_size)| {
             (
                 name,
-                // The generation here will be corrected to match IndexPart in the merge below, unless
+                // The generation and shard here will be corrected to match IndexPart in the merge below, unless
                 // it is not in IndexPart, in which case using our current generation makes sense
                 // because it will be uploaded in this generation.
-                (Some(LayerFileMetadata::new(file_size, generation)), None),
+                (
+                    Some(LayerFileMetadata::new(file_size, generation, shard)),
+                    None,
+                ),
             )
         })
        .collect::<HashMap<_, _>>();
diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs
index b47878aacc..5eaed1d5ca 100644
--- a/pageserver/src/tenant/upload_queue.rs
+++ b/pageserver/src/tenant/upload_queue.rs
@@ -1,6 +1,5 @@
 use super::storage_layer::LayerFileName;
 use super::storage_layer::ResidentLayer;
-use super::Generation;
 use crate::tenant::metadata::TimelineMetadata;
 use crate::tenant::remote_timeline_client::index::IndexPart;
 use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
@@ -15,6 +14,9 @@
 use utils::lsn::AtomicLsn;
 use std::sync::atomic::AtomicU32;
 use utils::lsn::Lsn;
 
+#[cfg(feature = "testing")]
+use utils::generation::Generation;
+
 // clippy warns that Uninitialized is much smaller than Initialized, which wastes
 // memory for Uninitialized variants. Doesn't matter in practice, there are not
 // that many upload queues in a running pageserver, and most of them are initialized
@@ -232,7 +234,7 @@ pub(crate) struct UploadTask {
 /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
 #[derive(Debug)]
 pub(crate) struct Delete {
-    pub(crate) layers: Vec<(LayerFileName, Generation)>,
+    pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
 }
 
 #[derive(Debug)]