pageserver: add shard indices to layer metadata (#5928)

## Problem

For sharded tenants, layer keys must include the shard number and shard
count: the shard number disambiguates keys written by different shards of
the same tenant, and the shard count disambiguates layers written before
and after shard splits.
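For concreteness, a sketch of the resulting key shapes (not part of the diff; it assumes the usual `timelines/` path segment, and `<gen>` stands for the existing generation suffix):

```
tenants/<tenant_id>-0004/timelines/<timeline_id>/<layer><gen>   shard 0 of a 4-shard tenant
tenants/<tenant_id>-0008/timelines/<timeline_id>/<layer><gen>   shard 0 of the same tenant after a split to 8 shards
tenants/<tenant_id>/timelines/<timeline_id>/<layer><gen>        legacy unsharded key (empty suffix)
```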

Closes: https://github.com/neondatabase/neon/issues/5924

## Summary of changes

There are no functional changes in this PR: everything behaves the same
for the default ShardIndex::unsharded() value. Actual construction of
sharded tenants will come in a subsequent PR.

- Add a ShardIndex type: this is just a wrapper for a ShardCount and
ShardNumber. This is a subset of ShardIdentity: whereas ShardIdentity
contains enough information to filter page keys, ShardIndex contains
just enough information to construct a remote key. ShardIndex has a
compact encoding, the same as the shard part of TenantShardId.
- Store the ShardIndex as part of IndexLayerMetadata, but only when it
differs from ShardIndex::unsharded().
- Update RemoteTimelineClient and DeletionQueue to construct paths using
the layer metadata. Deletion code paths that previously passed just a
`Generation` now pass a full `LayerFileMetadata` to capture the shard as
well (see the sketch below).
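
The path construction boils down to `ShardIndex::get_suffix`. A minimal sketch of its behaviour, mirroring the implementation in this PR (it assumes `ShardNumber` and `ShardCount` live alongside `ShardIndex` in `pageserver_api::shard`):

```rust
use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber};

fn main() {
    // Unsharded tenants keep the legacy key format: the suffix is empty.
    assert_eq!(ShardIndex::unsharded().get_suffix(), "");

    // A sharded tenant appends "-{number:02x}{count:02x}" to the tenant id in
    // remote_layer_path/remote_index_path, e.g. shard 1 of a 4-shard tenant:
    let idx = ShardIndex::new(ShardNumber(1), ShardCount(4));
    assert_eq!(idx.get_suffix(), "-0104");
}
```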

Notes to reviewers:
- In deletion code paths, I could have used a (Generation, ShardIndex)
instead of the full LayerFileMetadata. I opted for the full object
partly for brevity, and partly because in future when we add checksums
the deletion code really will care about the full metadata in order to
validate that it is deleting what was intended.
- While ShardIdentity and TenantShardId could both use a ShardIndex, I
find that they read more cleanly as "flat" structs that spell out the
shard count and shard number fields separately. Serialization code would
need to be written out by hand anyway, because TenantShardId's serialized
form is not a serde struct-style serialization.
- ShardIndex doesn't _have_ to exist (we could use ShardIdentity
everywhere), but it is a worthwhile optimization, as we will have many
copies of this as part of layer metadata. In future the size difference
between ShardIndex and ShardIdentity may become larger if we implement
more sophisticated key distribution mechanisms (i.e. new values of
ShardIdentity::layout).
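
For reference, a sketch (not part of the diff) of the size and the two serde encodings this buys; the values mirror the unit tests added below, and `bincode` is only used here to exercise the non-human-readable path:

```rust
use std::str::FromStr;

use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber};

fn main() {
    // Two single-byte newtypes, so storing one copy per layer stays cheap.
    assert_eq!(std::mem::size_of::<ShardIndex>(), 2);

    let idx = ShardIndex::new(ShardNumber(13), ShardCount(17));
    // Human-readable serializers (e.g. index_part.json) see the 4-hex-char form...
    assert_eq!(idx.to_string(), "0d11");
    assert_eq!(ShardIndex::from_str("0d11").unwrap(), idx);
    // ...while binary serializers pack it into two bytes.
    assert_eq!(bincode::serialize(&idx).unwrap(), vec![0x0d, 0x11]);
}
```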

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
Author: John Spray
Date: 2023-11-28 11:47:25 +00:00 (committed by GitHub)
Parent: 286f34dfce
Commit: ca469be1cf
12 changed files with 398 additions and 72 deletions


@@ -139,6 +139,89 @@ impl From<[u8; 18]> for TenantShardId {
}
}
/// For use within the context of a particular tenant, when we need to know which
/// shard we're dealing with, but do not need to know the full ShardIdentity (because
/// we won't be doing any page->shard mapping), and do not need to know the fully qualified
/// TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
pub struct ShardIndex {
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl ShardIndex {
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
Self {
shard_number: number,
shard_count: count,
}
}
pub fn unsharded() -> Self {
Self {
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
/// For use in constructing remote storage paths: concatenate this with a TenantId
/// to get a fully qualified TenantShardId.
///
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
/// that the legacy pre-sharding remote key format is preserved.
pub fn get_suffix(&self) -> String {
if self.is_unsharded() {
"".to_string()
} else {
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
}
impl std::fmt::Display for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Debug for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for ShardIndex {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 1 byte shard number, 1 byte shard count
if s.len() == 4 {
let bytes = s.as_bytes();
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(bytes, &mut shard_parts)?;
Ok(Self {
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 2]> for ShardIndex {
fn from(b: [u8; 2]) -> Self {
Self {
shard_number: ShardNumber(b[0]),
shard_count: ShardCount(b[1]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -209,6 +292,77 @@ impl<'de> Deserialize<'de> for TenantShardId {
}
}
impl Serialize for ShardIndex {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Binary encoding is not used in index_part.json, but is included in anticipation of
// switching various structures (e.g. inter-process communication, remote metadata) to more
// compact binary encodings in future.
let mut packed: [u8; 2] = [0; 2];
packed[0] = self.shard_number.0;
packed[1] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for ShardIndex {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = ShardIndex;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 2])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 2] = Deserialize::deserialize(s)?;
Ok(ShardIndex::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
ShardIndex::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
2,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
@@ -318,4 +472,35 @@ mod tests {
Ok(())
}
#[test]
fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
let example = ShardIndex {
shard_number: ShardNumber(13),
shard_count: ShardCount(17),
};
let expected: String = "0d11".to_string();
let encoded = format!("{example}");
assert_eq!(&encoded, &expected);
let decoded = ShardIndex::from_str(&encoded)?;
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
let example = ShardIndex {
shard_number: ShardNumber(13),
shard_count: ShardCount(17),
};
let expected: [u8; 2] = [0x0d, 0x11];
let encoded = bincode::serialize(&example).unwrap();
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
}


@@ -10,6 +10,7 @@ use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::VirtualFile;
use anyhow::Context;
@@ -509,18 +510,19 @@ impl DeletionQueueClient {
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
layers: Vec<(LayerFileName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
if current_generation.is_none() {
debug!("Enqueuing deletions in legacy mode, skipping queue");
let mut layer_paths = Vec::new();
for (layer, generation) in layers {
for (layer, meta) in layers {
layer_paths.push(remote_layer_path(
&tenant_id,
&timeline_id,
meta.shard,
&layer,
generation,
meta.generation,
));
}
self.push_immediate(layer_paths).await?;
@@ -540,7 +542,7 @@ impl DeletionQueueClient {
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
layers: Vec<(LayerFileName, LayerFileMetadata)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
@@ -751,6 +753,7 @@ impl DeletionQueue {
mod test {
use camino::Utf8Path;
use hex_literal::hex;
use pageserver_api::shard::ShardIndex;
use std::{io::ErrorKind, time::Duration};
use tracing::info;
@@ -990,6 +993,8 @@ mod test {
// we delete, and the generation of the running Tenant.
let layer_generation = Generation::new(0xdeadbeef);
let now_generation = Generation::new(0xfeedbeef);
let layer_metadata =
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
let remote_layer_file_name_1 =
format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
@@ -1013,7 +1018,7 @@ mod test {
tenant_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
)
.await?;
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
@@ -1052,6 +1057,8 @@ mod test {
let stale_generation = latest_generation.previous();
// Generation that our example layer file was written with
let layer_generation = stale_generation.previous();
let layer_metadata =
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
ctx.set_latest_generation(latest_generation);
@@ -1069,7 +1076,7 @@ mod test {
tenant_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
@@ -1084,7 +1091,7 @@ mod test {
tenant_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
@@ -1111,6 +1118,8 @@ mod test {
let layer_generation = Generation::new(0xdeadbeef);
let now_generation = Generation::new(0xfeedbeef);
let layer_metadata =
LayerFileMetadata::new(0xf00, layer_generation, ShardIndex::unsharded());
// Inject a deletion in the generation before generation_now: after restart,
// this deletion should _not_ get executed (only the immediately previous
@@ -1122,7 +1131,7 @@ mod test {
tenant_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
@@ -1136,7 +1145,7 @@ mod test {
tenant_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
)
.await?;
@@ -1226,12 +1235,13 @@ pub(crate) mod mock {
match msg {
ListWriterQueueMessage::Delete(op) => {
let mut objects = op.objects;
for (layer, generation) in op.layers {
for (layer, meta) in op.layers {
objects.push(remote_layer_path(
&op.tenant_id,
&op.timeline_id,
meta.shard,
&layer,
generation,
meta.generation,
));
}


@@ -33,6 +33,7 @@ use crate::config::PageServerConf;
use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::LayerFileMetadata;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file::on_fatal_io_error;
use crate::virtual_file::MaybeFatalIo;
@@ -58,7 +59,7 @@ pub(super) struct DeletionOp {
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
// have a config object handy to project it to a remote key, and need the consuming worker
// to do it for you.
pub(super) layers: Vec<(LayerFileName, Generation)>,
pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
pub(super) objects: Vec<RemotePath>,
/// The _current_ generation of the Tenant attachment in which we are enqueuing
@@ -387,12 +388,13 @@ impl ListWriter {
);
let mut layer_paths = Vec::new();
for (layer, generation) in op.layers {
for (layer, meta) in op.layers {
layer_paths.push(remote_layer_path(
&op.tenant_id,
&op.timeline_id,
meta.shard,
&layer,
generation,
meta.generation,
));
}
layer_paths.extend(op.objects);


@@ -3468,6 +3468,7 @@ pub async fn dump_layerfile_from_path(
pub(crate) mod harness {
use bytes::{Bytes, BytesMut};
use once_cell::sync::OnceCell;
use pageserver_api::shard::ShardIndex;
use std::fs;
use std::sync::Arc;
use utils::logging;
@@ -3534,6 +3535,7 @@ pub(crate) mod harness {
pub tenant_conf: TenantConf,
pub tenant_id: TenantId,
pub generation: Generation,
pub shard: ShardIndex,
pub remote_storage: GenericRemoteStorage,
pub remote_fs_dir: Utf8PathBuf,
pub deletion_queue: MockDeletionQueue,
@@ -3593,6 +3595,7 @@ pub(crate) mod harness {
tenant_conf,
tenant_id,
generation: Generation::new(0xdeadbeef),
shard: ShardIndex::unsharded(),
remote_storage,
remote_fs_dir,
deletion_queue,


@@ -188,6 +188,7 @@ use anyhow::Context;
use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
use pageserver_api::shard::ShardIndex;
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
pub(crate) use upload::upload_initdb_dir;
@@ -402,6 +403,11 @@ impl RemoteTimelineClient {
Ok(())
}
pub(crate) fn get_shard_index(&self) -> ShardIndex {
// TODO: carry this on the struct
ShardIndex::unsharded()
}
pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
match &mut *self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
@@ -465,6 +471,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
&self.tenant_id,
&self.timeline_id,
self.get_shard_index(),
self.generation,
cancel,
)
@@ -657,10 +664,10 @@ impl RemoteTimelineClient {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let with_generations =
let with_metadata =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names.iter().cloned());
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
self.schedule_deletion_of_unlinked0(upload_queue, with_metadata);
// Launch the tasks immediately, if possible
self.launch_queued_tasks(upload_queue);
@@ -695,7 +702,7 @@ impl RemoteTimelineClient {
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
names: I,
) -> Vec<(LayerFileName, Generation)>
) -> Vec<(LayerFileName, LayerFileMetadata)>
where
I: IntoIterator<Item = LayerFileName>,
{
@@ -703,16 +710,17 @@ impl RemoteTimelineClient {
// so we don't need update it. Just serialize it.
let metadata = upload_queue.latest_metadata.clone();
// Decorate our list of names with each name's generation, dropping
// names that are unexpectedly missing from our metadata.
let with_generations: Vec<_> = names
// Decorate our list of names with each name's metadata, dropping
// names that are unexpectedly missing from our metadata. This metadata
// is later used when physically deleting layers, to construct key paths.
let with_metadata: Vec<_> = names
.into_iter()
.filter_map(|name| {
let meta = upload_queue.latest_files.remove(&name);
if let Some(meta) = meta {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
Some((name, meta.generation))
Some((name, meta))
} else {
// This can only happen if we forgot to schedule the file upload
// before scheduling the delete. Log it because it is a rare/strange
@@ -725,9 +733,10 @@ impl RemoteTimelineClient {
.collect();
#[cfg(feature = "testing")]
for (name, gen) in &with_generations {
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), *gen) {
if &unexpected == gen {
for (name, metadata) in &with_metadata {
let gen = metadata.generation;
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), gen) {
if unexpected == gen {
tracing::error!("{name} was unlinked twice with same generation");
} else {
tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
@@ -742,14 +751,14 @@ impl RemoteTimelineClient {
self.schedule_index_upload(upload_queue, metadata);
}
with_generations
with_metadata
}
/// Schedules deletion for layer files which have previously been unlinked from the
/// `index_part.json` with [`Self::schedule_gc_update`] or [`Self::schedule_compaction_update`].
pub(crate) fn schedule_deletion_of_unlinked(
self: &Arc<Self>,
layers: Vec<(LayerFileName, Generation)>,
layers: Vec<(LayerFileName, LayerFileMetadata)>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
@@ -762,16 +771,22 @@ impl RemoteTimelineClient {
fn schedule_deletion_of_unlinked0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
with_generations: Vec<(LayerFileName, Generation)>,
with_metadata: Vec<(LayerFileName, LayerFileMetadata)>,
) {
for (name, gen) in &with_generations {
info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
for (name, meta) in &with_metadata {
info!(
"scheduling deletion of layer {}{} (shard {})",
name,
meta.generation.get_suffix(),
meta.shard
);
}
#[cfg(feature = "testing")]
for (name, gen) in &with_generations {
for (name, meta) in &with_metadata {
let gen = meta.generation;
match upload_queue.dangling_files.remove(name) {
Some(same) if &same == gen => { /* expected */ }
Some(same) if same == gen => { /* expected */ }
Some(other) => {
tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
}
@@ -783,7 +798,7 @@ impl RemoteTimelineClient {
// schedule the actual deletions
let op = UploadOp::Delete(Delete {
layers: with_generations,
layers: with_metadata,
});
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
@@ -904,6 +919,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
&self.tenant_id,
&self.timeline_id,
self.get_shard_index(),
self.generation,
&index_part_with_deleted_at,
)
@@ -962,6 +978,7 @@ impl RemoteTimelineClient {
remote_layer_path(
&self.tenant_id,
&self.timeline_id,
meta.shard,
&file_name,
meta.generation,
)
@@ -1010,7 +1027,12 @@ impl RemoteTimelineClient {
.unwrap_or(
// No generation-suffixed indices, assume we are dealing with
// a legacy index.
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
remote_index_path(
&self.tenant_id,
&self.timeline_id,
self.get_shard_index(),
Generation::none(),
),
);
let remaining_layers: Vec<RemotePath> = remaining
@@ -1219,6 +1241,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
&self.tenant_id,
&self.timeline_id,
self.get_shard_index(),
self.generation,
index_part,
)
@@ -1527,12 +1550,14 @@ pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> R
pub fn remote_layer_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
shard: ShardIndex,
layer_file_name: &LayerFileName,
generation: Generation,
) -> RemotePath {
// Generation-aware key format
let path = format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
shard.get_suffix(),
layer_file_name.file_name(),
generation.get_suffix()
);
@@ -1550,10 +1575,12 @@ pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId
pub fn remote_index_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
shard: ShardIndex,
generation: Generation,
) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
shard.get_suffix(),
IndexPart::FILE_NAME,
generation.get_suffix()
))
@@ -1778,6 +1805,7 @@ mod tests {
println!("remote_timeline_dir: {remote_timeline_dir}");
let generation = harness.generation;
let shard = harness.shard;
// Create a couple of dummy files, schedule upload for them
@@ -1794,7 +1822,7 @@ mod tests {
harness.conf,
&timeline,
name,
LayerFileMetadata::new(contents.len() as u64, generation),
LayerFileMetadata::new(contents.len() as u64, generation, shard),
)
}).collect::<Vec<_>>();
@@ -1943,7 +1971,7 @@ mod tests {
harness.conf,
&timeline,
layer_file_name_1.clone(),
LayerFileMetadata::new(content_1.len() as u64, harness.generation),
LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
);
#[derive(Debug, PartialEq, Clone, Copy)]
@@ -2008,7 +2036,11 @@ mod tests {
assert_eq!(actual_c, expected_c);
}
async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
async fn inject_index_part(
test_state: &TestSetup,
generation: Generation,
shard: ShardIndex,
) -> IndexPart {
// An empty IndexPart, just sufficient to ensure deserialization will succeed
let example_metadata = TimelineMetadata::example();
let example_index_part = IndexPart::new(
@@ -2029,7 +2061,13 @@ mod tests {
std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
let index_path = test_state.harness.remote_fs_dir.join(
remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
remote_index_path(
&test_state.harness.tenant_id,
&TIMELINE_ID,
shard,
generation,
)
.get_path(),
);
eprintln!("Writing {index_path}");
std::fs::write(&index_path, index_part_bytes).unwrap();
@@ -2066,7 +2104,12 @@ mod tests {
// Simple case: we are in generation N, load the index from generation N - 1
let generation_n = 5;
let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
let injected = inject_index_part(
&test_state,
Generation::new(generation_n - 1),
ShardIndex::unsharded(),
)
.await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await;
@@ -2084,22 +2127,34 @@ mod tests {
// A generation-less IndexPart exists in the bucket, we should find it
let generation_n = 5;
let injected_none = inject_index_part(&test_state, Generation::none()).await;
let injected_none =
inject_index_part(&test_state, Generation::none(), ShardIndex::unsharded()).await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await;
// If a more recent-than-none generation exists, we should prefer to load that
let injected_1 = inject_index_part(&test_state, Generation::new(1)).await;
let injected_1 =
inject_index_part(&test_state, Generation::new(1), ShardIndex::unsharded()).await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
// If a more-recent-than-me generation exists, we should ignore it.
let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await;
let _injected_10 =
inject_index_part(&test_state, Generation::new(10), ShardIndex::unsharded()).await;
assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await;
// If a directly previous generation exists, _and_ an index exists in my own
// generation, I should prefer my own generation.
let _injected_prev =
inject_index_part(&test_state, Generation::new(generation_n - 1)).await;
let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await;
let _injected_prev = inject_index_part(
&test_state,
Generation::new(generation_n - 1),
ShardIndex::unsharded(),
)
.await;
let injected_current = inject_index_part(
&test_state,
Generation::new(generation_n),
ShardIndex::unsharded(),
)
.await;
assert_got_index_part(
&test_state,
Generation::new(generation_n),

View File

@@ -9,6 +9,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use camino::Utf8Path;
use pageserver_api::shard::ShardIndex;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
@@ -53,6 +54,7 @@ pub async fn download_layer_file<'a>(
let remote_path = remote_layer_path(
&tenant_id,
&timeline_id,
layer_metadata.shard,
layer_file_name,
layer_metadata.generation,
);
@@ -213,10 +215,11 @@ async fn do_download_index_part(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
shard: ShardIndex,
index_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
let remote_path = remote_index_path(tenant_id, timeline_id, shard, index_generation);
let index_part_bytes = download_retry_forever(
|| async {
@@ -254,6 +257,7 @@ pub(super) async fn download_index_part(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
shard: ShardIndex,
my_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
@@ -261,8 +265,15 @@ pub(super) async fn download_index_part(
if my_generation.is_none() {
// Operating without generations: just fetch the generation-less path
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
.await;
return do_download_index_part(
storage,
tenant_id,
timeline_id,
shard,
my_generation,
cancel,
)
.await;
}
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
@@ -273,6 +284,7 @@ pub(super) async fn download_index_part(
storage,
tenant_id,
timeline_id,
shard,
my_generation,
cancel.clone(),
)
@@ -300,6 +312,7 @@ pub(super) async fn download_index_part(
storage,
tenant_id,
timeline_id,
shard,
my_generation.previous(),
cancel.clone(),
)
@@ -320,8 +333,9 @@ pub(super) async fn download_index_part(
}
// General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
// objects, and select the highest one with a generation <= my_generation.
let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
// objects, and select the highest one with a generation <= my_generation. Constructing the prefix is equivalent
// to constructing a full index path with no generation, because the generation is a suffix.
let index_prefix = remote_index_path(tenant_id, timeline_id, shard, Generation::none());
let indices = backoff::retry(
|| async { storage.list_files(Some(&index_prefix)).await },
|_| false,
@@ -347,14 +361,21 @@ pub(super) async fn download_index_part(
match max_previous_generation {
Some(g) => {
tracing::debug!("Found index_part in generation {g:?}");
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
do_download_index_part(storage, tenant_id, timeline_id, shard, g, cancel).await
}
None => {
// Migration from legacy pre-generation state: we have a generation but no prior
// attached pageservers did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found");
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
.await
do_download_index_part(
storage,
tenant_id,
timeline_id,
shard,
Generation::none(),
cancel,
)
.await
}
}
}


@@ -12,6 +12,7 @@ use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::upload_queue::UploadQueueInitialized;
use crate::tenant::Generation;
use pageserver_api::shard::ShardIndex;
use utils::lsn::Lsn;
@@ -25,6 +26,8 @@ pub struct LayerFileMetadata {
file_size: u64,
pub(crate) generation: Generation,
pub(crate) shard: ShardIndex,
}
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
@@ -32,15 +35,17 @@ impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
LayerFileMetadata {
file_size: other.file_size,
generation: other.generation,
shard: other.shard,
}
}
}
impl LayerFileMetadata {
pub fn new(file_size: u64, generation: Generation) -> Self {
pub fn new(file_size: u64, generation: Generation, shard: ShardIndex) -> Self {
LayerFileMetadata {
file_size,
generation,
shard,
}
}
@@ -161,6 +166,10 @@ pub struct IndexLayerMetadata {
#[serde(default = "Generation::none")]
#[serde(skip_serializing_if = "Generation::is_none")]
pub generation: Generation,
#[serde(default = "ShardIndex::unsharded")]
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
pub shard: ShardIndex,
}
impl From<LayerFileMetadata> for IndexLayerMetadata {
@@ -168,6 +177,7 @@ impl From<LayerFileMetadata> for IndexLayerMetadata {
IndexLayerMetadata {
file_size: other.file_size,
generation: other.generation,
shard: other.shard,
}
}
}
@@ -195,13 +205,15 @@ mod tests {
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -233,13 +245,15 @@ mod tests {
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -272,13 +286,15 @@ mod tests {
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
@@ -354,19 +370,21 @@ mod tests {
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: 9007199254741001,
generation: Generation::none()
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();


@@ -4,6 +4,7 @@ use anyhow::{bail, Context};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use pageserver_api::shard::ShardIndex;
use std::io::ErrorKind;
use tokio::fs;
@@ -26,6 +27,7 @@ pub(super) async fn upload_index_part<'a>(
storage: &'a GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
shard: ShardIndex,
generation: Generation,
index_part: &'a IndexPart,
) -> anyhow::Result<()> {
@@ -42,7 +44,7 @@ pub(super) async fn upload_index_part<'a>(
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
let remote_path = remote_index_path(tenant_id, timeline_id, shard, generation);
storage
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
.await


@@ -3,6 +3,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
use pageserver_api::shard::ShardIndex;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
@@ -96,6 +97,7 @@ impl Layer {
desc,
None,
metadata.generation,
metadata.shard,
)));
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
@@ -136,6 +138,7 @@ impl Layer {
desc,
Some(inner),
metadata.generation,
metadata.shard,
)
}));
@@ -179,6 +182,7 @@ impl Layer {
desc,
Some(inner),
timeline.generation,
timeline.get_shard_index(),
)
}));
@@ -426,6 +430,15 @@ struct LayerInner {
/// For loaded layers (resident or evicted) this comes from [`LayerFileMetadata::generation`],
/// for created layers from [`Timeline::generation`].
generation: Generation,
/// The shard of this Layer.
///
/// For layers created in this process, this will always be the [`ShardIndex`] of the
/// current `ShardIdentity` (TODO: add link once it's introduced).
///
/// For loaded layers, this may be some other value if the tenant has undergone
/// a shard split since the layer was originally written.
shard: ShardIndex,
}
impl std::fmt::Display for LayerInner {
@@ -459,9 +472,9 @@ impl Drop for LayerInner {
let path = std::mem::take(&mut self.path);
let file_name = self.layer_desc().filename();
let gen = self.generation;
let file_size = self.layer_desc().file_size;
let timeline = self.timeline.clone();
let meta = self.metadata();
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
let _g = span.entered();
@@ -489,7 +502,7 @@ impl Drop for LayerInner {
timeline.metrics.resident_physical_size_sub(file_size);
}
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, gen)]);
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
if let Err(e) = res {
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
@@ -523,6 +536,7 @@ impl LayerInner {
desc: PersistentLayerDesc,
downloaded: Option<Arc<DownloadedLayer>>,
generation: Generation,
shard: ShardIndex,
) -> Self {
let path = conf
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
@@ -550,6 +564,7 @@ impl LayerInner {
status: tokio::sync::broadcast::channel(1).0,
consecutive_failures: AtomicUsize::new(0),
generation,
shard,
}
}
@@ -1077,7 +1092,7 @@ impl LayerInner {
}
fn metadata(&self) -> LayerFileMetadata {
LayerFileMetadata::new(self.desc.file_size, self.generation)
LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
}
}


@@ -62,6 +62,7 @@ use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIndex;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::to_pg_timestamp;
@@ -1597,6 +1598,7 @@ impl Timeline {
// Copy to move into the task we're about to spawn
let generation = self.generation;
let shard = self.get_shard_index();
let this = self.myself.upgrade().expect("&self method holds the arc");
let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
@@ -1645,6 +1647,7 @@ impl Timeline {
index_part.as_ref(),
disk_consistent_lsn,
generation,
shard,
);
let mut loaded_layers = Vec::new();
@@ -4364,6 +4367,11 @@ impl Timeline {
resident_layers,
}
}
pub(crate) fn get_shard_index(&self) -> ShardIndex {
// TODO: carry this on the struct
ShardIndex::unsharded()
}
}
type TraversalPathItem = (


@@ -13,6 +13,7 @@ use crate::{
};
use anyhow::Context;
use camino::Utf8Path;
use pageserver_api::shard::ShardIndex;
use std::{collections::HashMap, str::FromStr};
use utils::lsn::Lsn;
@@ -107,6 +108,7 @@ pub(super) fn reconcile(
index_part: Option<&IndexPart>,
disk_consistent_lsn: Lsn,
generation: Generation,
shard: ShardIndex,
) -> Vec<(LayerFileName, Result<Decision, DismissedLayer>)> {
use Decision::*;
@@ -118,10 +120,13 @@ pub(super) fn reconcile(
.map(|(name, file_size)| {
(
name,
// The generation here will be corrected to match IndexPart in the merge below, unless
// The generation and shard here will be corrected to match IndexPart in the merge below, unless
// it is not in IndexPart, in which case using our current generation makes sense
// because it will be uploaded in this generation.
(Some(LayerFileMetadata::new(file_size, generation)), None),
(
Some(LayerFileMetadata::new(file_size, generation, shard)),
None,
),
)
})
.collect::<Collected>();


@@ -1,6 +1,5 @@
use super::storage_layer::LayerFileName;
use super::storage_layer::ResidentLayer;
use super::Generation;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
@@ -15,6 +14,9 @@ use utils::lsn::AtomicLsn;
use std::sync::atomic::AtomicU32;
use utils::lsn::Lsn;
#[cfg(feature = "testing")]
use utils::generation::Generation;
// clippy warns that Uninitialized is much smaller than Initialized, which wastes
// memory for Uninitialized variants. Doesn't matter in practice, there are not
// that many upload queues in a running pageserver, and most of them are initialized
@@ -232,7 +234,7 @@ pub(crate) struct UploadTask {
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
#[derive(Debug)]
pub(crate) struct Delete {
pub(crate) layers: Vec<(LayerFileName, Generation)>,
pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
}
#[derive(Debug)]