mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
pageserver: hacky version of putting shard into remote storage paths
This commit is contained in:
@@ -15,6 +15,7 @@ use crate::virtual_file::VirtualFile;
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use hex::FromHex;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -300,6 +301,7 @@ impl DeletionList {
|
||||
fn push(
|
||||
&mut self,
|
||||
tenant: &TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline: &TimelineId,
|
||||
generation: Generation,
|
||||
objects: &mut Vec<RemotePath>,
|
||||
@@ -326,7 +328,7 @@ impl DeletionList {
|
||||
|
||||
let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
|
||||
|
||||
let timeline_remote_path = remote_timeline_path(tenant, timeline);
|
||||
let timeline_remote_path = remote_timeline_path(tenant, shard, timeline);
|
||||
|
||||
self.size += objects.len();
|
||||
timeline_entry.extend(objects.drain(..).map(|p| {
|
||||
@@ -341,7 +343,9 @@ impl DeletionList {
|
||||
let mut result = Vec::new();
|
||||
for (tenant, tenant_deletions) in self.tenants.into_iter() {
|
||||
for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
|
||||
let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
|
||||
// FIXME: need to update DeletionList definition to store the ShardIdentity for each Tenant
|
||||
let timeline_remote_path =
|
||||
remote_timeline_path(&tenant, &ShardIdentity::none(), &timeline);
|
||||
result.extend(
|
||||
timeline_layers
|
||||
.into_iter()
|
||||
@@ -507,6 +511,7 @@ impl DeletionQueueClient {
|
||||
pub(crate) async fn push_layers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
@@ -517,6 +522,7 @@ impl DeletionQueueClient {
|
||||
for (layer, generation) in layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&tenant_id,
|
||||
shard,
|
||||
&timeline_id,
|
||||
&layer,
|
||||
generation,
|
||||
@@ -829,7 +835,8 @@ mod test {
|
||||
gen: Generation,
|
||||
) -> anyhow::Result<String> {
|
||||
let tenant_id = self.harness.tenant_id;
|
||||
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
|
||||
let relative_remote_path =
|
||||
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
|
||||
let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
|
||||
std::fs::create_dir_all(&remote_timeline_path)?;
|
||||
let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
|
||||
@@ -981,7 +988,8 @@ mod test {
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
let content: Vec<u8> = "victim1 contents".into();
|
||||
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
|
||||
let relative_remote_path =
|
||||
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
|
||||
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
|
||||
let deletion_prefix = ctx.harness.conf.deletion_prefix();
|
||||
|
||||
@@ -1010,6 +1018,7 @@ mod test {
|
||||
client
|
||||
.push_layers(
|
||||
tenant_id,
|
||||
&ShardIdentity::none(),
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
|
||||
@@ -1055,7 +1064,8 @@ mod test {
|
||||
ctx.set_latest_generation(latest_generation);
|
||||
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
|
||||
let relative_remote_path =
|
||||
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
|
||||
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
|
||||
|
||||
// Initial state: a remote layer exists
|
||||
@@ -1066,6 +1076,7 @@ mod test {
|
||||
client
|
||||
.push_layers(
|
||||
tenant_id,
|
||||
&ShardIdentity::none(),
|
||||
TIMELINE_ID,
|
||||
stale_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
@@ -1081,6 +1092,7 @@ mod test {
|
||||
client
|
||||
.push_layers(
|
||||
tenant_id,
|
||||
&ShardIdentity::none(),
|
||||
TIMELINE_ID,
|
||||
latest_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
@@ -1104,7 +1116,8 @@ mod test {
|
||||
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
|
||||
let relative_remote_path =
|
||||
remote_timeline_path(&tenant_id, &ShardIdentity::none(), &TIMELINE_ID);
|
||||
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
|
||||
let deletion_prefix = ctx.harness.conf.deletion_prefix();
|
||||
|
||||
@@ -1119,6 +1132,7 @@ mod test {
|
||||
client
|
||||
.push_layers(
|
||||
tenant_id,
|
||||
&ShardIdentity::none(),
|
||||
TIMELINE_ID,
|
||||
now_generation.previous(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_generation)].to_vec(),
|
||||
@@ -1133,6 +1147,7 @@ mod test {
|
||||
client
|
||||
.push_layers(
|
||||
tenant_id,
|
||||
&ShardIdentity::none(),
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_generation)].to_vec(),
|
||||
@@ -1228,6 +1243,7 @@ pub(crate) mod mock {
|
||||
for (layer, generation) in op.layers {
|
||||
objects.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
&ShardIdentity::none(),
|
||||
&op.timeline_id,
|
||||
&layer,
|
||||
generation,
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::collections::HashMap;
|
||||
use std::fs::create_dir_all;
|
||||
use std::time::Duration;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use regex::Regex;
|
||||
use remote_storage::RemotePath;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -390,6 +391,8 @@ impl ListWriter {
|
||||
for (layer, generation) in op.layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
// TODO: store shard in deletion list
|
||||
&ShardIdentity::none(),
|
||||
&op.timeline_id,
|
||||
&layer,
|
||||
generation,
|
||||
@@ -399,6 +402,8 @@ impl ListWriter {
|
||||
|
||||
if !self.pending.push(
|
||||
&op.tenant_id,
|
||||
// TODO: store shard in deletion list
|
||||
&ShardIdentity::none(),
|
||||
&op.timeline_id,
|
||||
op.generation,
|
||||
&mut layer_paths,
|
||||
@@ -406,6 +411,8 @@ impl ListWriter {
|
||||
self.flush().await;
|
||||
let retry_succeeded = self.pending.push(
|
||||
&op.tenant_id,
|
||||
// TODO: store shard in deletion list
|
||||
&ShardIdentity::none(),
|
||||
&op.timeline_id,
|
||||
op.generation,
|
||||
&mut layer_paths,
|
||||
|
||||
@@ -688,9 +688,11 @@ impl Tenant {
|
||||
// Get list of remote timelines
|
||||
// download index files for every tenant timeline
|
||||
info!("listing remote timelines");
|
||||
let shard = self.tenant_conf.read().unwrap().shard.clone();
|
||||
let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
|
||||
remote_storage,
|
||||
self.tenant_id,
|
||||
&shard,
|
||||
cancel.clone(),
|
||||
)
|
||||
.await?;
|
||||
@@ -1151,6 +1153,7 @@ impl Tenant {
|
||||
self.deletion_queue_client.clone(),
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
self.tenant_conf.read().unwrap().shard.clone(),
|
||||
timeline_id,
|
||||
self.generation,
|
||||
);
|
||||
@@ -2991,6 +2994,7 @@ impl Tenant {
|
||||
self.deletion_queue_client.clone(),
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
self.tenant_conf.read().unwrap().shard.clone(),
|
||||
timeline_id,
|
||||
self.generation,
|
||||
);
|
||||
|
||||
@@ -771,7 +771,10 @@ impl TenantManager {
|
||||
new_location_config: LocationConf,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
|
||||
info!(
|
||||
"configuring tenant location {tenant_id} {} to state {new_location_config:?}",
|
||||
new_location_config.shard.slug()
|
||||
);
|
||||
|
||||
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
|
||||
// then we do not need to set the slot to InProgress, we can just call into the
|
||||
|
||||
@@ -188,6 +188,7 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff::{
|
||||
@@ -298,6 +299,7 @@ pub struct RemoteTimelineClient {
|
||||
runtime: tokio::runtime::Handle,
|
||||
|
||||
tenant_id: TenantId,
|
||||
shard: ShardIdentity,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
|
||||
@@ -322,9 +324,12 @@ impl RemoteTimelineClient {
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
shard: ShardIdentity,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
) -> RemoteTimelineClient {
|
||||
tracing::info!("RemoteTimelineClient::new shard={}", shard.slug());
|
||||
|
||||
RemoteTimelineClient {
|
||||
conf,
|
||||
runtime: if cfg!(test) {
|
||||
@@ -334,6 +339,7 @@ impl RemoteTimelineClient {
|
||||
BACKGROUND_RUNTIME.handle().clone()
|
||||
},
|
||||
tenant_id,
|
||||
shard,
|
||||
timeline_id,
|
||||
generation,
|
||||
storage_impl: remote_storage,
|
||||
@@ -461,6 +467,7 @@ impl RemoteTimelineClient {
|
||||
let index_part = download::download_index_part(
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.shard,
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
cancel,
|
||||
@@ -503,6 +510,7 @@ impl RemoteTimelineClient {
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
self.tenant_id,
|
||||
&self.shard,
|
||||
self.timeline_id,
|
||||
layer_file_name,
|
||||
layer_metadata,
|
||||
@@ -893,6 +901,7 @@ impl RemoteTimelineClient {
|
||||
upload::upload_index_part(
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.shard,
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
&index_part_with_deleted_at,
|
||||
@@ -951,6 +960,7 @@ impl RemoteTimelineClient {
|
||||
.map(|(file_name, meta)| {
|
||||
remote_layer_path(
|
||||
&self.tenant_id,
|
||||
&self.shard,
|
||||
&self.timeline_id,
|
||||
&file_name,
|
||||
meta.generation,
|
||||
@@ -964,7 +974,8 @@ impl RemoteTimelineClient {
|
||||
|
||||
// Do not delete index part yet, it is needed for possible retry. If we remove it first
|
||||
// and retry will arrive to different pageserver there wont be any traces of it on remote storage
|
||||
let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
|
||||
let timeline_storage_path =
|
||||
remote_timeline_path(&self.tenant_id, &self.shard, &self.timeline_id);
|
||||
|
||||
// Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
|
||||
// taking the burden of listing all the layers that we already know we should delete.
|
||||
@@ -1000,7 +1011,12 @@ impl RemoteTimelineClient {
|
||||
.unwrap_or(
|
||||
// No generation-suffixed indices, assume we are dealing with
|
||||
// a legacy index.
|
||||
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
|
||||
remote_index_path(
|
||||
&self.tenant_id,
|
||||
&self.shard,
|
||||
&self.timeline_id,
|
||||
Generation::none(),
|
||||
),
|
||||
);
|
||||
|
||||
let remaining_layers: Vec<RemotePath> = remaining
|
||||
@@ -1178,13 +1194,20 @@ impl RemoteTimelineClient {
|
||||
|
||||
let upload_result: anyhow::Result<()> = match &task.op {
|
||||
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
|
||||
let path = layer.local_path();
|
||||
upload::upload_timeline_layer(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
path,
|
||||
layer_metadata,
|
||||
let remote_path = remote_layer_path(
|
||||
&self.tenant_id,
|
||||
&self.shard,
|
||||
&self.timeline_id,
|
||||
&layer.layer_desc().filename(),
|
||||
self.generation,
|
||||
);
|
||||
|
||||
let local_path = layer.local_path();
|
||||
upload::upload_timeline_layer(
|
||||
&self.storage_impl,
|
||||
local_path,
|
||||
remote_path,
|
||||
layer_metadata,
|
||||
)
|
||||
.measure_remote_op(
|
||||
self.tenant_id,
|
||||
@@ -1208,6 +1231,7 @@ impl RemoteTimelineClient {
|
||||
let res = upload::upload_index_part(
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.shard,
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
index_part,
|
||||
@@ -1233,6 +1257,7 @@ impl RemoteTimelineClient {
|
||||
.deletion_queue_client
|
||||
.push_layers(
|
||||
self.tenant_id,
|
||||
&self.shard,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
@@ -1503,24 +1528,33 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
|
||||
let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
|
||||
pub fn remote_timelines_path(tenant_id: &TenantId, shard: &ShardIdentity) -> RemotePath {
|
||||
let path = format!(
|
||||
"tenants/{tenant_id}{}/{TIMELINES_SEGMENT_NAME}",
|
||||
shard.slug()
|
||||
);
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
|
||||
remote_timelines_path(tenant_id).join(Utf8Path::new(&timeline_id.to_string()))
|
||||
pub fn remote_timeline_path(
|
||||
tenant_id: &TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: &TimelineId,
|
||||
) -> RemotePath {
|
||||
remote_timelines_path(tenant_id, shard).join(Utf8Path::new(&timeline_id.to_string()))
|
||||
}
|
||||
|
||||
pub fn remote_layer_path(
|
||||
tenant_id: &TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: &TimelineId,
|
||||
layer_file_name: &LayerFileName,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
// Generation-aware key format
|
||||
let path = format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
|
||||
shard.slug(),
|
||||
layer_file_name.file_name(),
|
||||
generation.get_suffix()
|
||||
);
|
||||
@@ -1530,11 +1564,13 @@ pub fn remote_layer_path(
|
||||
|
||||
pub fn remote_index_path(
|
||||
tenant_id: &TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
"tenants/{tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{1}{2}",
|
||||
shard.slug(),
|
||||
IndexPart::FILE_NAME,
|
||||
generation.get_suffix()
|
||||
))
|
||||
@@ -1558,29 +1594,6 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Files on the remote storage are stored with paths, relative to the workdir.
|
||||
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
|
||||
///
|
||||
/// Errors if the path provided does not start from pageserver's workdir.
|
||||
pub fn remote_path(
|
||||
conf: &PageServerConf,
|
||||
local_path: &Utf8Path,
|
||||
generation: Generation,
|
||||
) -> anyhow::Result<RemotePath> {
|
||||
let stripped = local_path
|
||||
.strip_prefix(&conf.workdir)
|
||||
.context("Failed to strip workdir prefix")?;
|
||||
|
||||
let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
|
||||
|
||||
RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
|
||||
format!(
|
||||
"to resolve remote part of path {:?} for base {:?}",
|
||||
local_path, conf.workdir
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -1677,6 +1690,7 @@ mod tests {
|
||||
conf: self.harness.conf,
|
||||
runtime: tokio::runtime::Handle::current(),
|
||||
tenant_id: self.harness.tenant_id,
|
||||
shard: ShardIdentity::none(),
|
||||
timeline_id: TIMELINE_ID,
|
||||
generation,
|
||||
storage_impl: self.harness.remote_storage.clone(),
|
||||
@@ -2010,7 +2024,13 @@ mod tests {
|
||||
std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work");
|
||||
|
||||
let index_path = test_state.harness.remote_fs_dir.join(
|
||||
remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
|
||||
remote_index_path(
|
||||
&test_state.harness.tenant_id,
|
||||
&ShardIdentity::none(),
|
||||
&TIMELINE_ID,
|
||||
generation,
|
||||
)
|
||||
.get_path(),
|
||||
);
|
||||
eprintln!("Writing {index_path}");
|
||||
std::fs::write(&index_path, index_part_bytes).unwrap();
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use camino::Utf8Path;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -40,6 +41,7 @@ pub async fn download_layer_file<'a>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &'a GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: TimelineId,
|
||||
layer_file_name: &'a LayerFileName,
|
||||
layer_metadata: &'a LayerFileMetadata,
|
||||
@@ -52,6 +54,7 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
let remote_path = remote_layer_path(
|
||||
&tenant_id,
|
||||
shard,
|
||||
&timeline_id,
|
||||
layer_file_name,
|
||||
layer_metadata.generation,
|
||||
@@ -170,9 +173,10 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
|
||||
pub async fn list_remote_timelines(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
shard: &ShardIdentity,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
|
||||
let remote_path = remote_timelines_path(&tenant_id);
|
||||
let remote_path = remote_timelines_path(&tenant_id, shard);
|
||||
|
||||
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
@@ -212,11 +216,12 @@ pub async fn list_remote_timelines(
|
||||
async fn do_download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: &TimelineId,
|
||||
index_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
|
||||
let remote_path = remote_index_path(tenant_id, shard, timeline_id, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry_forever(
|
||||
|| async {
|
||||
@@ -253,6 +258,7 @@ async fn do_download_index_part(
|
||||
pub(super) async fn download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: CancellationToken,
|
||||
@@ -261,8 +267,15 @@ pub(super) async fn download_index_part(
|
||||
|
||||
if my_generation.is_none() {
|
||||
// Operating without generations: just fetch the generation-less path
|
||||
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
|
||||
.await;
|
||||
return do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
shard,
|
||||
timeline_id,
|
||||
my_generation,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
|
||||
@@ -272,6 +285,7 @@ pub(super) async fn download_index_part(
|
||||
let res = do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
shard,
|
||||
timeline_id,
|
||||
my_generation,
|
||||
cancel.clone(),
|
||||
@@ -299,6 +313,7 @@ pub(super) async fn download_index_part(
|
||||
let res = do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
shard,
|
||||
timeline_id,
|
||||
my_generation.previous(),
|
||||
cancel.clone(),
|
||||
@@ -321,7 +336,7 @@ pub(super) async fn download_index_part(
|
||||
|
||||
// General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json
|
||||
// objects, and select the highest one with a generation <= my_generation.
|
||||
let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
|
||||
let index_prefix = remote_index_path(tenant_id, shard, timeline_id, Generation::none());
|
||||
let indices = backoff::retry(
|
||||
|| async { storage.list_files(Some(&index_prefix)).await },
|
||||
|_| false,
|
||||
@@ -347,14 +362,21 @@ pub(super) async fn download_index_part(
|
||||
match max_previous_generation {
|
||||
Some(g) => {
|
||||
tracing::debug!("Found index_part in generation {g:?}");
|
||||
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
|
||||
do_download_index_part(storage, tenant_id, shard, timeline_id, g, cancel).await
|
||||
}
|
||||
None => {
|
||||
// Migration from legacy pre-generation state: we have a generation but no prior
|
||||
// attached pageservers did. Try to load from a no-generation path.
|
||||
tracing::info!("No index_part.json* found");
|
||||
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
|
||||
.await
|
||||
do_download_index_part(
|
||||
storage,
|
||||
tenant_id,
|
||||
shard,
|
||||
timeline_id,
|
||||
Generation::none(),
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,15 +3,13 @@
|
||||
use anyhow::{bail, Context};
|
||||
use camino::Utf8Path;
|
||||
use fail::fail_point;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use std::io::ErrorKind;
|
||||
use tokio::fs;
|
||||
|
||||
use super::Generation;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use crate::tenant::remote_timeline_client::{index::IndexPart, remote_index_path};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::index::LayerFileMetadata;
|
||||
@@ -22,6 +20,7 @@ use tracing::info;
|
||||
pub(super) async fn upload_index_part<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
shard: &ShardIdentity,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
index_part: &'a IndexPart,
|
||||
@@ -38,7 +37,7 @@ pub(super) async fn upload_index_part<'a>(
|
||||
let index_part_size = index_part_bytes.len();
|
||||
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
|
||||
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
|
||||
let remote_path = remote_index_path(tenant_id, shard, timeline_id, generation);
|
||||
storage
|
||||
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
|
||||
.await
|
||||
@@ -50,11 +49,10 @@ pub(super) async fn upload_index_part<'a>(
|
||||
///
|
||||
/// On an error, bumps the retries count and reschedules the entire task.
|
||||
pub(super) async fn upload_timeline_layer<'a>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &'a GenericRemoteStorage,
|
||||
source_path: &'a Utf8Path,
|
||||
source_path: &Utf8Path,
|
||||
remote_path: RemotePath,
|
||||
known_metadata: &'a LayerFileMetadata,
|
||||
generation: Generation,
|
||||
) -> anyhow::Result<()> {
|
||||
fail_point!("before-upload-layer", |_| {
|
||||
bail!("failpoint before-upload-layer")
|
||||
@@ -62,7 +60,6 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
|
||||
pausable_failpoint!("before-upload-layer-pausable");
|
||||
|
||||
let storage_path = remote_path(conf, source_path, generation)?;
|
||||
let source_file_res = fs::File::open(&source_path).await;
|
||||
let source_file = match source_file_res {
|
||||
Ok(source_file) => source_file,
|
||||
@@ -97,7 +94,7 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
.with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
|
||||
|
||||
storage
|
||||
.upload(source_file, fs_size, &storage_path, None)
|
||||
.upload(source_file, fs_size, &remote_path, None)
|
||||
.await
|
||||
.with_context(|| format!("upload layer from local path '{source_path}'"))?;
|
||||
|
||||
|
||||
@@ -1547,7 +1547,6 @@ impl Timeline {
|
||||
.tenant_conf
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
|
||||
let shard = tenant_conf_guard.shard.clone();
|
||||
drop(tenant_conf_guard);
|
||||
|
||||
let mut guard = self.walreceiver.lock().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user