controller: scan remote storage for import

This commit is contained in:
John Spray
2024-04-17 21:20:22 +01:00
parent 695e5b9fce
commit fadce9723d
3 changed files with 119 additions and 22 deletions

View File

@@ -441,11 +441,20 @@ async fn handle_tenant(
.list_timelines(shard_zero.shard_id)
.await?;
for (i, timeline) in timelines.iter().enumerate() {
let branch_name = if i == 0 {
// Pick a 'main' timeline that has no ancestors, the rest will get arbitrary names
let main_timeline = timelines
.iter()
.find(|t| t.ancestor_timeline_id.is_none())
.expect("No timelines found")
.timeline_id;
let mut branch_i = 0;
for timeline in timelines.iter() {
let branch_name = if timeline.timeline_id == main_timeline {
"main".to_string()
} else {
format!("branch_{i}")
branch_i += 1;
format!("branch_{branch_i}")
};
println!(

View File

@@ -1,13 +1,14 @@
use pageserver_api::{
models::{
LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
TenantScanRemoteStorageResponse, TenantShardSplitRequest, TenantShardSplitResponse,
TimelineCreateRequest, TimelineInfo,
},
shard::TenantShardId,
};
use pageserver_client::mgmt_api::{Client, Result};
use reqwest::StatusCode;
use utils::id::{NodeId, TimelineId};
use utils::id::{NodeId, TenantId, TimelineId};
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
@@ -88,6 +89,18 @@ impl PageserverClient {
)
}
pub(crate) async fn tenant_scan_remote_storage(
&self,
tenant_id: TenantId,
) -> Result<TenantScanRemoteStorageResponse> {
measured_request!(
"tenant_scan_remote_storage",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.tenant_scan_remote_storage(tenant_id).await
)
}
pub(crate) async fn tenant_secondary_download(
&self,
tenant_id: TenantShardId,

View File

@@ -99,6 +99,42 @@ struct ServiceState {
scheduler: Scheduler,
}
/// Transform an error from a pageserver into an error to return to callers of a storage
/// controller API.
fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
match e {
mgmt_api::Error::ReceiveErrorBody(str) => {
// Presume errors receiving body are connectivity/availability issues
ApiError::ResourceUnavailable(
format!("{node} error receiving error body: {str}").into(),
)
}
mgmt_api::Error::ReceiveBody(str) => {
// Presume errors receiving body are connectivity/availability issues
ApiError::ResourceUnavailable(format!("{node} error receiving body: {str}").into())
}
mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, msg) => {
ApiError::NotFound(anyhow::anyhow!(format!("{node}: {msg}")).into())
}
mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg) => {
ApiError::ResourceUnavailable(format!("{node}: {msg}").into())
}
mgmt_api::Error::ApiError(status @ StatusCode::UNAUTHORIZED, msg)
| mgmt_api::Error::ApiError(status @ StatusCode::FORBIDDEN, msg) => {
// Auth errors talking to a pageserver are not auth errors for the caller: they are
// internal server errors, showing that something is wrong with the pageserver or
// storage controller's auth configuration.
ApiError::InternalServerError(anyhow::anyhow!("{node} {status}: {msg}"))
}
mgmt_api::Error::ApiError(status, msg) => {
// Presume general case of pageserver API errors is that we tried to do something
// that can't be done right now.
ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
}
mgmt_api::Error::Cancelled => ApiError::ShuttingDown,
}
}
impl ServiceState {
fn new(
nodes: HashMap<NodeId, Node>,
@@ -2467,17 +2503,7 @@ impl Service {
client
.timeline_create(tenant_shard_id, &create_req)
.await
.map_err(|e| match e {
mgmt_api::Error::ApiError(status, msg)
if status == StatusCode::INTERNAL_SERVER_ERROR
|| status == StatusCode::NOT_ACCEPTABLE =>
{
// TODO: handle more error codes, e.g. 503 should be passed through. Make a general wrapper
// for pass-through API calls.
ApiError::InternalServerError(anyhow::anyhow!(msg))
}
_ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
})
.map_err(|e| passthrough_api_error(&node, e))
}
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
@@ -3601,16 +3627,65 @@ impl Service {
&self,
tenant_id: TenantId,
) -> Result<TenantCreateResponse, ApiError> {
// Pick an arbitrary available pageserver to use for scanning the tenant in remote storage
let maybe_node = {
self.inner
.read()
.unwrap()
.nodes
.values()
.find(|n| n.is_available())
.cloned()
};
let Some(node) = maybe_node else {
return Err(ApiError::BadRequest(anyhow::anyhow!("No nodes available")));
};
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
);
let scan_result = client
.tenant_scan_remote_storage(tenant_id)
.await
.map_err(|e| passthrough_api_error(&node, e))?;
// A post-split tenant may contain a mixture of shard counts in remote storage: pick the highest count.
let Some(shard_count) = scan_result
.shards
.iter()
.map(|s| s.tenant_shard_id.shard_count)
.max()
else {
return Err(ApiError::NotFound(
anyhow::anyhow!("No shards found").into(),
));
};
// Ideally we would set each newly imported shard's generation independently, but for correctness it is sufficient
// to
let generation = scan_result
.shards
.iter()
.map(|s| s.generation)
.max()
.expect("We already validated >0 shards");
// FIXME: we have no way to recover the shard stripe size from contents of remote storage: this will
// only work if they were using the default stripe size.
let stripe_size = ShardParameters::DEFAULT_STRIPE_SIZE;
let (response, waiters) = self
.do_tenant_create(TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(tenant_id),
// A sufficiently high generation is de-facto guaranteed to be high enough to see any
// indices in S3 (unless this tenant was at some point in the past recovered via this path).
// TODO: we should really probe remote storage to learn the generation, so that we don't
// eat a large swath of the generation number space in an irreversible way.
generation: Some(0x3fffffff),
generation,
shard_parameters: ShardParameters::default(),
shard_parameters: ShardParameters {
count: shard_count,
stripe_size,
},
placement_policy: Some(PlacementPolicy::Attached(0)), // No secondaries, for convenient debug/hacking
// There is no way to know what the tenant's config was: revert to defaults