mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
neon_local: add "tenant import" (#7399)
## Problem Sometimes we have test data in the form of S3 contents that we would like to run live in a neon_local environment. ## Summary of changes - Add a storage controller API that imports an existing tenant. Currently this is equivalent to doing a create with a high generation number, but in future this would be something smarter to probe S3 to find the shards in a tenant and find generation numbers. - Add a `neon_local` command that invokes the import API, and then inspects timelines in the newly attached tenant to create matching branches.
This commit is contained in:
@@ -417,6 +417,54 @@ async fn handle_tenant(
|
||||
println!("{} {:?}", t.id, t.state);
|
||||
}
|
||||
}
|
||||
Some(("import", import_match)) => {
|
||||
let tenant_id = parse_tenant_id(import_match)?.unwrap_or_else(TenantId::generate);
|
||||
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let create_response = storage_controller.tenant_import(tenant_id).await?;
|
||||
|
||||
let shard_zero = create_response
|
||||
.shards
|
||||
.first()
|
||||
.expect("Import response omitted shards");
|
||||
|
||||
let attached_pageserver_id = shard_zero.node_id;
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(attached_pageserver_id)?);
|
||||
|
||||
println!(
|
||||
"Imported tenant {tenant_id}, attached to pageserver {attached_pageserver_id}"
|
||||
);
|
||||
|
||||
let timelines = pageserver
|
||||
.http_client
|
||||
.list_timelines(shard_zero.shard_id)
|
||||
.await?;
|
||||
|
||||
// Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
|
||||
let main_timeline = timelines
|
||||
.iter()
|
||||
.find(|t| t.ancestor_timeline_id.is_none())
|
||||
.expect("No timelines found")
|
||||
.timeline_id;
|
||||
|
||||
let mut branch_i = 0;
|
||||
for timeline in timelines.iter() {
|
||||
let branch_name = if timeline.timeline_id == main_timeline {
|
||||
"main".to_string()
|
||||
} else {
|
||||
branch_i += 1;
|
||||
format!("branch_{branch_i}")
|
||||
};
|
||||
|
||||
println!(
|
||||
"Importing timeline {tenant_id}/{} as branch {branch_name}",
|
||||
timeline.timeline_id
|
||||
);
|
||||
|
||||
env.register_branch_mapping(branch_name, tenant_id, timeline.timeline_id)?;
|
||||
}
|
||||
}
|
||||
Some(("create", create_match)) => {
|
||||
let tenant_conf: HashMap<_, _> = create_match
|
||||
.get_many::<String>("config")
|
||||
@@ -1480,6 +1528,8 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("config")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
|
||||
.subcommand(Command::new("import").arg(tenant_id_arg.clone().required(true))
|
||||
.about("Import a tenant that is present in remote storage, and create branches for its timelines"))
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("pageserver")
|
||||
|
||||
@@ -472,6 +472,16 @@ impl StorageController {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_import(&self, tenant_id: TenantId) -> anyhow::Result<TenantCreateResponse> {
|
||||
self.dispatch::<(), TenantCreateResponse>(
|
||||
Method::POST,
|
||||
format!("debug/v1/tenant/{tenant_id}/import"),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
|
||||
self.dispatch::<(), _>(
|
||||
|
||||
@@ -782,6 +782,17 @@ pub struct SecondaryProgress {
|
||||
pub bytes_total: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantScanRemoteStorageShard {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub generation: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub struct TenantScanRemoteStorageResponse {
|
||||
pub shards: Vec<TenantScanRemoteStorageShard>,
|
||||
}
|
||||
|
||||
pub mod virtual_file {
|
||||
#[derive(
|
||||
Copy,
|
||||
|
||||
@@ -34,6 +34,8 @@ pub enum Generation {
|
||||
/// scenarios where pageservers might otherwise issue conflicting writes to
|
||||
/// remote storage
|
||||
impl Generation {
|
||||
pub const MAX: Self = Self::Valid(u32::MAX);
|
||||
|
||||
/// Create a new Generation that represents a legacy key format with
|
||||
/// no generation suffix
|
||||
pub fn none() -> Self {
|
||||
|
||||
@@ -243,6 +243,19 @@ impl Client {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_scan_remote_storage(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantScanRemoteStorageResponse> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_id}/scan_remote_storage",
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
let response = self.request(Method::GET, &uri, ()).await?;
|
||||
let body = response.json().await.map_err(Error::ReceiveBody)?;
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
|
||||
let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, &uri, req).await?;
|
||||
|
||||
@@ -19,6 +19,8 @@ use pageserver_api::models::LocationConfigListResponse;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::TenantLocationConfigResponse;
|
||||
use pageserver_api::models::TenantScanRemoteStorageResponse;
|
||||
use pageserver_api::models::TenantScanRemoteStorageShard;
|
||||
use pageserver_api::models::TenantShardLocation;
|
||||
use pageserver_api::models::TenantShardSplitRequest;
|
||||
use pageserver_api::models::TenantShardSplitResponse;
|
||||
@@ -29,6 +31,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::ShardCount;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::DownloadError;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::TimeTravelError;
|
||||
use tenant_size_model::{SizeResult, StorageModel};
|
||||
@@ -54,6 +57,9 @@ use crate::tenant::mgr::{
|
||||
};
|
||||
use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
|
||||
use crate::tenant::remote_timeline_client;
|
||||
use crate::tenant::remote_timeline_client::download_index_part;
|
||||
use crate::tenant::remote_timeline_client::list_remote_tenant_shards;
|
||||
use crate::tenant::remote_timeline_client::list_remote_timelines;
|
||||
use crate::tenant::secondary::SecondaryController;
|
||||
use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
@@ -2035,6 +2041,79 @@ async fn secondary_upload_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_scan_remote_handler(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let state = get_state(&request);
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
|
||||
let Some(remote_storage) = state.remote_storage.as_ref() else {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Remote storage not configured"
|
||||
)));
|
||||
};
|
||||
|
||||
let mut response = TenantScanRemoteStorageResponse::default();
|
||||
|
||||
let (shards, _other_keys) =
|
||||
list_remote_tenant_shards(remote_storage, tenant_id, cancel.clone())
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
|
||||
|
||||
for tenant_shard_id in shards {
|
||||
let (timeline_ids, _other_keys) =
|
||||
list_remote_timelines(remote_storage, tenant_shard_id, cancel.clone())
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
|
||||
|
||||
let mut generation = Generation::none();
|
||||
for timeline_id in timeline_ids {
|
||||
match download_index_part(
|
||||
remote_storage,
|
||||
&tenant_shard_id,
|
||||
&timeline_id,
|
||||
Generation::MAX,
|
||||
&cancel,
|
||||
)
|
||||
.instrument(info_span!("download_index_part",
|
||||
tenant_id=%tenant_shard_id.tenant_id,
|
||||
shard_id=%tenant_shard_id.shard_slug(),
|
||||
%timeline_id))
|
||||
.await
|
||||
{
|
||||
Ok((index_part, index_generation)) => {
|
||||
tracing::info!("Found timeline {tenant_shard_id}/{timeline_id} metadata (gen {index_generation:?}, {} layers, {} consistent LSN)",
|
||||
index_part.layer_metadata.len(), index_part.get_disk_consistent_lsn());
|
||||
generation = std::cmp::max(generation, index_generation);
|
||||
}
|
||||
Err(DownloadError::NotFound) => {
|
||||
// This is normal for tenants that were created with multiple shards: they have an unsharded path
|
||||
// containing the timeline's initdb tarball but no index. Otherwise it is a bit strange.
|
||||
tracing::info!("Timeline path {tenant_shard_id}/{timeline_id} exists in remote storage but has no index, skipping");
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
response.shards.push(TenantScanRemoteStorageShard {
|
||||
tenant_shard_id,
|
||||
generation: generation.into(),
|
||||
});
|
||||
}
|
||||
|
||||
if response.shards.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("No shards found for tenant ID {tenant_id}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
async fn secondary_download_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -2431,6 +2510,9 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
|
||||
api_handler(r, secondary_upload_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/scan_remote_storage", |r| {
|
||||
api_handler(r, tenant_scan_remote_handler)
|
||||
})
|
||||
.put("/v1/disk_usage_eviction/run", |r| {
|
||||
api_handler(r, disk_usage_eviction_run)
|
||||
})
|
||||
|
||||
@@ -888,7 +888,7 @@ impl Tenant {
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn preload(
|
||||
self: &Arc<Tenant>,
|
||||
self: &Arc<Self>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<TenantPreload> {
|
||||
@@ -918,9 +918,13 @@ impl Tenant {
|
||||
|
||||
Ok(TenantPreload {
|
||||
deleting,
|
||||
timelines: self
|
||||
.load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
|
||||
.await?,
|
||||
timelines: Self::load_timeline_metadata(
|
||||
self,
|
||||
remote_timeline_ids,
|
||||
remote_storage,
|
||||
cancel,
|
||||
)
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -243,7 +243,9 @@ use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
|
||||
use super::upload_queue::SetDeletedFlagProgress;
|
||||
use super::Generation;
|
||||
|
||||
pub(crate) use download::{is_temp_download_file, list_remote_timelines};
|
||||
pub(crate) use download::{
|
||||
download_index_part, is_temp_download_file, list_remote_tenant_shards, list_remote_timelines,
|
||||
};
|
||||
pub(crate) use index::LayerFileMetadata;
|
||||
|
||||
// Occasional network issues and such can cause remote operations to fail, and
|
||||
@@ -472,7 +474,7 @@ impl RemoteTimelineClient {
|
||||
},
|
||||
);
|
||||
|
||||
let index_part = download::download_index_part(
|
||||
let (index_part, _index_generation) = download::download_index_part(
|
||||
&self.storage_impl,
|
||||
&self.tenant_shard_id,
|
||||
&self.timeline_id,
|
||||
@@ -1716,6 +1718,11 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_tenant_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
let path = format!("tenants/{tenant_shard_id}");
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
let path = format!("tenants/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}");
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -25,13 +26,13 @@ use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::TimelineId;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::index::{IndexPart, LayerFileMetadata};
|
||||
use super::{
|
||||
parse_remote_index_path, remote_index_path, remote_initdb_archive_path,
|
||||
remote_initdb_preserved_archive_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
|
||||
INITDB_PATH,
|
||||
remote_initdb_preserved_archive_path, remote_tenant_path, FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES, INITDB_PATH,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -253,42 +254,31 @@ pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
/// List timelines of given tenant in remote storage
|
||||
pub async fn list_remote_timelines(
|
||||
async fn list_identifiers<T>(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: TenantShardId,
|
||||
prefix: RemotePath,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
|
||||
let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
|
||||
|
||||
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
});
|
||||
|
||||
) -> anyhow::Result<(HashSet<T>, HashSet<String>)>
|
||||
where
|
||||
T: FromStr + Eq + std::hash::Hash,
|
||||
{
|
||||
let listing = download_retry_forever(
|
||||
|| {
|
||||
storage.list(
|
||||
Some(&remote_path),
|
||||
ListingMode::WithDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
},
|
||||
&format!("list timelines for {tenant_shard_id}"),
|
||||
|| storage.list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel),
|
||||
&format!("list identifiers in prefix {prefix}"),
|
||||
&cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut timeline_ids = HashSet::new();
|
||||
let mut parsed_ids = HashSet::new();
|
||||
let mut other_prefixes = HashSet::new();
|
||||
|
||||
for timeline_remote_storage_key in listing.prefixes {
|
||||
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_shard_id}")
|
||||
for id_remote_storage_key in listing.prefixes {
|
||||
let object_name = id_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get object name for key {id_remote_storage_key}")
|
||||
})?;
|
||||
|
||||
match object_name.parse::<TimelineId>() {
|
||||
Ok(t) => timeline_ids.insert(t),
|
||||
match object_name.parse::<T>() {
|
||||
Ok(t) => parsed_ids.insert(t),
|
||||
Err(_) => other_prefixes.insert(object_name.to_string()),
|
||||
};
|
||||
}
|
||||
@@ -300,7 +290,31 @@ pub async fn list_remote_timelines(
|
||||
other_prefixes.insert(object_name.to_string());
|
||||
}
|
||||
|
||||
Ok((timeline_ids, other_prefixes))
|
||||
Ok((parsed_ids, other_prefixes))
|
||||
}
|
||||
|
||||
/// List shards of given tenant in remote storage
|
||||
pub(crate) async fn list_remote_tenant_shards(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(HashSet<TenantShardId>, HashSet<String>)> {
|
||||
let remote_path = remote_tenant_path(&TenantShardId::unsharded(tenant_id));
|
||||
list_identifiers::<TenantShardId>(storage, remote_path, cancel).await
|
||||
}
|
||||
|
||||
/// List timelines of given tenant shard in remote storage
|
||||
pub async fn list_remote_timelines(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: TenantShardId,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
|
||||
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
});
|
||||
|
||||
let remote_path = remote_timelines_path(&tenant_shard_id).add_trailing_slash();
|
||||
list_identifiers::<TimelineId>(storage, remote_path, cancel).await
|
||||
}
|
||||
|
||||
async fn do_download_index_part(
|
||||
@@ -309,7 +323,7 @@ async fn do_download_index_part(
|
||||
timeline_id: &TimelineId,
|
||||
index_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
) -> Result<(IndexPart, Generation), DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry_forever(
|
||||
@@ -334,7 +348,7 @@ async fn do_download_index_part(
|
||||
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok(index_part)
|
||||
Ok((index_part, index_generation))
|
||||
}
|
||||
|
||||
/// index_part.json objects are suffixed with a generation number, so we cannot
|
||||
@@ -343,13 +357,13 @@ async fn do_download_index_part(
|
||||
/// In this function we probe for the most recent index in a generation <= our current generation.
|
||||
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
|
||||
#[tracing::instrument(skip_all, fields(generation=?my_generation))]
|
||||
pub(super) async fn download_index_part(
|
||||
pub(crate) async fn download_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
) -> Result<(IndexPart, Generation), DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if my_generation.is_none() {
|
||||
|
||||
@@ -522,6 +522,18 @@ async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiErr
|
||||
json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
|
||||
}
|
||||
|
||||
async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
state.service.tenant_import(tenant_id).await?,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -759,6 +771,13 @@ pub fn make_router(
|
||||
.post("/debug/v1/node/:node_id/drop", |r| {
|
||||
named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
|
||||
})
|
||||
.post("/debug/v1/tenant/:tenant_id/import", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_tenant_import,
|
||||
RequestName("debug_v1_tenant_import"),
|
||||
)
|
||||
})
|
||||
.get("/debug/v1/tenant", |r| {
|
||||
named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
|
||||
})
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
|
||||
TenantScanRemoteStorageResponse, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use pageserver_client::mgmt_api::{Client, Result};
|
||||
use reqwest::StatusCode;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
@@ -88,6 +89,18 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_scan_remote_storage(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantScanRemoteStorageResponse> {
|
||||
measured_request!(
|
||||
"tenant_scan_remote_storage",
|
||||
crate::metrics::Method::Get,
|
||||
&self.node_id_label,
|
||||
self.inner.tenant_scan_remote_storage(tenant_id).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_secondary_download(
|
||||
&self,
|
||||
tenant_id: TenantShardId,
|
||||
|
||||
@@ -110,6 +110,42 @@ struct ServiceState {
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
}
|
||||
|
||||
/// Transform an error from a pageserver into an error to return to callers of a storage
|
||||
/// controller API.
|
||||
fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
|
||||
match e {
|
||||
mgmt_api::Error::ReceiveErrorBody(str) => {
|
||||
// Presume errors receiving body are connectivity/availability issues
|
||||
ApiError::ResourceUnavailable(
|
||||
format!("{node} error receiving error body: {str}").into(),
|
||||
)
|
||||
}
|
||||
mgmt_api::Error::ReceiveBody(str) => {
|
||||
// Presume errors receiving body are connectivity/availability issues
|
||||
ApiError::ResourceUnavailable(format!("{node} error receiving body: {str}").into())
|
||||
}
|
||||
mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, msg) => {
|
||||
ApiError::NotFound(anyhow::anyhow!(format!("{node}: {msg}")).into())
|
||||
}
|
||||
mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg) => {
|
||||
ApiError::ResourceUnavailable(format!("{node}: {msg}").into())
|
||||
}
|
||||
mgmt_api::Error::ApiError(status @ StatusCode::UNAUTHORIZED, msg)
|
||||
| mgmt_api::Error::ApiError(status @ StatusCode::FORBIDDEN, msg) => {
|
||||
// Auth errors talking to a pageserver are not auth errors for the caller: they are
|
||||
// internal server errors, showing that something is wrong with the pageserver or
|
||||
// storage controller's auth configuration.
|
||||
ApiError::InternalServerError(anyhow::anyhow!("{node} {status}: {msg}"))
|
||||
}
|
||||
mgmt_api::Error::ApiError(status, msg) => {
|
||||
// Presume general case of pageserver API errors is that we tried to do something
|
||||
// that can't be done right now.
|
||||
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
|
||||
}
|
||||
mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
|
||||
}
|
||||
}
|
||||
|
||||
impl ServiceState {
|
||||
fn new(
|
||||
nodes: HashMap<NodeId, Node>,
|
||||
@@ -2519,17 +2555,7 @@ impl Service {
|
||||
client
|
||||
.timeline_create(tenant_shard_id, &create_req)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
mgmt_api::Error::ApiError(status, msg)
|
||||
if status == StatusCode::INTERNAL_SERVER_ERROR
|
||||
|| status == StatusCode::NOT_ACCEPTABLE =>
|
||||
{
|
||||
// TODO: handle more error codes, e.g. 503 should be passed through. Make a general wrapper
|
||||
// for pass-through API calls.
|
||||
ApiError::InternalServerError(anyhow::anyhow!(msg))
|
||||
}
|
||||
_ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
|
||||
})
|
||||
.map_err(|e| passthrough_api_error(&node, e))
|
||||
}
|
||||
|
||||
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
|
||||
@@ -3654,6 +3680,88 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This is for debug/support only: assuming tenant data is already present in S3, we "create" a
|
||||
/// tenant with a very high generation number so that it will see the existing data.
|
||||
pub(crate) async fn tenant_import(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantCreateResponse, ApiError> {
|
||||
// Pick an arbitrary available pageserver to use for scanning the tenant in remote storage
|
||||
let maybe_node = {
|
||||
self.inner
|
||||
.read()
|
||||
.unwrap()
|
||||
.nodes
|
||||
.values()
|
||||
.find(|n| n.is_available())
|
||||
.cloned()
|
||||
};
|
||||
let Some(node) = maybe_node else {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("No nodes available")));
|
||||
};
|
||||
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
node.base_url(),
|
||||
self.config.jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
let scan_result = client
|
||||
.tenant_scan_remote_storage(tenant_id)
|
||||
.await
|
||||
.map_err(|e| passthrough_api_error(&node, e))?;
|
||||
|
||||
// A post-split tenant may contain a mixture of shard counts in remote storage: pick the highest count.
|
||||
let Some(shard_count) = scan_result
|
||||
.shards
|
||||
.iter()
|
||||
.map(|s| s.tenant_shard_id.shard_count)
|
||||
.max()
|
||||
else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("No shards found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
// Ideally we would set each newly imported shard's generation independently, but for correctness it is sufficient
|
||||
// to
|
||||
let generation = scan_result
|
||||
.shards
|
||||
.iter()
|
||||
.map(|s| s.generation)
|
||||
.max()
|
||||
.expect("We already validated >0 shards");
|
||||
|
||||
// FIXME: we have no way to recover the shard stripe size from contents of remote storage: this will
|
||||
// only work if they were using the default stripe size.
|
||||
let stripe_size = ShardParameters::DEFAULT_STRIPE_SIZE;
|
||||
|
||||
let (response, waiters) = self
|
||||
.do_tenant_create(TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation,
|
||||
|
||||
shard_parameters: ShardParameters {
|
||||
count: shard_count,
|
||||
stripe_size,
|
||||
},
|
||||
placement_policy: Some(PlacementPolicy::Attached(0)), // No secondaries, for convenient debug/hacking
|
||||
|
||||
// There is no way to know what the tenant's config was: revert to defaults
|
||||
config: TenantConfig::default(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
|
||||
// Since this is a debug/support operation, all kinds of weird issues are possible (e.g. this
|
||||
// tenant doesn't exist in the control plane), so don't fail the request if it can't fully
|
||||
// reconcile, as reconciliation includes notifying compute.
|
||||
tracing::warn!(%tenant_id, "Reconcile not done yet while importing tenant ({e})");
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// For debug/support: a full JSON dump of TenantShards. Returns a response so that
|
||||
/// we don't have to make TenantShard clonable in the return path.
|
||||
pub(crate) fn tenants_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
|
||||
|
||||
@@ -1575,6 +1575,11 @@ class NeonCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return tenant_id, timeline_id
|
||||
|
||||
def import_tenant(self, tenant_id: TenantId):
|
||||
args = ["tenant", "import", "--tenant-id", str(tenant_id)]
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
|
||||
def set_default(self, tenant_id: TenantId):
|
||||
"""
|
||||
Update default tenant for future operations that require tenant_id.
|
||||
@@ -2207,6 +2212,13 @@ class NeonStorageController(MetricsGetter):
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def tenant_import(self, tenant_id: TenantId):
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/import",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def reconcile_all(self):
|
||||
r = self.request(
|
||||
"POST",
|
||||
|
||||
@@ -26,6 +26,7 @@ from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_s3.type_defs import (
|
||||
ObjectTypeDef,
|
||||
)
|
||||
@@ -1256,3 +1257,85 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
# Quiesce any background reconciliation before doing consistency check
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=10)
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage", [RemoteStorageKind.LOCAL_FS, s3_storage()])
|
||||
@pytest.mark.parametrize("shard_count", [None, 4])
|
||||
def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_storage):
|
||||
"""
|
||||
Tenant import is a support/debug tool for recovering a tenant from remote storage
|
||||
if we don't have any metadata for it in the storage controller.
|
||||
"""
|
||||
|
||||
# This test is parametrized on remote storage because it exercises the relatively rare
|
||||
# code path of listing with a prefix that is not a directory name: this helps us notice
|
||||
# quickly if local_fs or s3_bucket implementations diverge.
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage)
|
||||
|
||||
# Use multiple pageservers because some test helpers assume single sharded tenants
|
||||
# if there is only one pageserver.
|
||||
neon_env_builder.num_pageservers = 2
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
# Create a second timeline to ensure that import finds both
|
||||
timeline_a = env.initial_timeline
|
||||
timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id)
|
||||
|
||||
workload_a = Workload(env, tenant_id, timeline_a, branch_name="main")
|
||||
workload_a.init()
|
||||
|
||||
workload_b = Workload(env, tenant_id, timeline_b, branch_name="branch_b")
|
||||
workload_b.init()
|
||||
|
||||
# Write some data
|
||||
workload_a.write_rows(72)
|
||||
expect_rows_a = workload_a.expect_rows
|
||||
workload_a.stop()
|
||||
del workload_a
|
||||
|
||||
# Bump generation to make sure generation recovery works properly
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
pageserver.start()
|
||||
|
||||
# Write some data in the higher generation into the other branch
|
||||
workload_b.write_rows(107)
|
||||
expect_rows_b = workload_b.expect_rows
|
||||
workload_b.stop()
|
||||
del workload_b
|
||||
|
||||
# Detach from pageservers
|
||||
env.storage_controller.tenant_policy_update(
|
||||
tenant_id,
|
||||
{
|
||||
"placement": "Detached",
|
||||
},
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=10)
|
||||
|
||||
# Force-drop it from the storage controller
|
||||
env.storage_controller.request(
|
||||
"POST",
|
||||
f"{env.storage_controller_api}/debug/v1/tenant/{tenant_id}/drop",
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
# Now import it again
|
||||
env.neon_cli.import_tenant(tenant_id)
|
||||
|
||||
# Check we found the shards
|
||||
describe = env.storage_controller.tenant_describe(tenant_id)
|
||||
literal_shard_count = 1 if shard_count is None else shard_count
|
||||
assert len(describe["shards"]) == literal_shard_count
|
||||
|
||||
# Check the data is still there: this implicitly proves that we recovered generation numbers
|
||||
# properly, for the timeline which was written to after a generation bump.
|
||||
for timeline, branch, expect_rows in [
|
||||
(timeline_a, "main", expect_rows_a),
|
||||
(timeline_b, "branch_1", expect_rows_b),
|
||||
]:
|
||||
workload = Workload(env, tenant_id, timeline, branch_name=branch)
|
||||
workload.expect_rows = expect_rows
|
||||
workload.validate()
|
||||
|
||||
@@ -132,7 +132,7 @@ def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str):
|
||||
assert query_scalar(cur, "SELECT count(*) FROM t") == 100000
|
||||
|
||||
# Check that we had to retry the downloads
|
||||
assert env.pageserver.log_contains(".*list timelines.*failed, will retry.*")
|
||||
assert env.pageserver.log_contains(".*list identifiers.*failed, will retry.*")
|
||||
assert env.pageserver.log_contains(".*download.*failed, will retry.*")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user