Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-07 05:22:56 +00:00
storage controller: issue timeline create/delete calls concurrently (#6827)
## Problem

Timeline creation is meant to be very fast: it should only take approximately one S3 PUT latency. When we have many shards in a tenant, we should preserve that responsiveness.

## Summary of changes

- Issue create/delete pageserver API calls concurrently across all >0 shards
- During tenant deletion, delete shard zero last, separately, to avoid confusing anything using GETs on the timeline.
- Return 201 instead of 200 on creations to make the cloud control plane happy

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
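For readers who want the shape of the change without the full diff below, here is a minimal standalone sketch of the fan-out pattern described in the summary, assuming the `futures` and `tokio` crates. `ShardId` and `create_on_shard` are illustrative placeholders, not Neon's actual types or API; shard zero is called first because its response supplies values the other shards must reuse.

```rust
// Sketch only: call shard zero first, then fan the remaining shard calls out
// concurrently. `ShardId` and `create_on_shard` are invented for illustration.
use futures::stream::{FuturesUnordered, StreamExt};

#[derive(Debug, Clone, Copy)]
struct ShardId(u32);

// Stand-in for one pageserver API call (e.g. timeline creation) against one shard.
async fn create_on_shard(shard: ShardId) -> Result<ShardId, String> {
    // A real implementation would issue an HTTP request here.
    Ok(shard)
}

async fn create_on_all_shards(mut shards: Vec<ShardId>) -> Result<(), String> {
    // Assumes at least one shard. Shard zero goes first: its response supplies
    // values (such as an ancestor LSN) that the remaining shards must reuse.
    let shard_zero = shards.remove(0);
    let first = create_on_shard(shard_zero).await?;
    println!("shard zero done: {first:?}");

    // Fan the >0 shards out concurrently; results arrive in completion order.
    let mut pending: FuturesUnordered<_> = shards.into_iter().map(create_on_shard).collect();
    while let Some(result) = pending.next().await {
        println!("shard done: {:?}", result?);
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    create_on_all_shards((0..4).map(ShardId).collect()).await
}
```

With this shape, total latency stays close to two sequential steps (shard zero, then the slowest of the remaining shards) rather than growing linearly with the shard count.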
```diff
@@ -114,7 +114,10 @@ async fn handle_tenant_create(
     mut req: Request<Body>,
 ) -> Result<Response<Body>, ApiError> {
     let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
-    json_response(StatusCode::OK, service.tenant_create(create_req).await?)
+    json_response(
+        StatusCode::CREATED,
+        service.tenant_create(create_req).await?,
+    )
 }
 
 // For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
```
```diff
@@ -196,7 +199,7 @@ async fn handle_tenant_timeline_create(
     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
     json_response(
-        StatusCode::OK,
+        StatusCode::CREATED,
         service
             .tenant_timeline_create(tenant_id, create_req)
             .await?,
```
```diff
@@ -14,7 +14,7 @@ use control_plane::attachment_service::{
     TenantShardMigrateRequest, TenantShardMigrateResponse,
 };
 use diesel::result::DatabaseErrorKind;
-use futures::StreamExt;
+use futures::{stream::FuturesUnordered, StreamExt};
 use hyper::StatusCode;
 use pageserver_api::{
     control_api::{
```
```diff
@@ -1287,8 +1287,6 @@ impl Service {
         tenant_id: TenantId,
         mut create_req: TimelineCreateRequest,
     ) -> Result<TimelineInfo, ApiError> {
-        let mut timeline_info = None;
-
         tracing::info!(
             "Creating timeline {}/{}",
             tenant_id,
```
```diff
@@ -1299,7 +1297,7 @@ impl Service {
 
         // TODO: refuse to do this if shard splitting is in progress
         // (https://github.com/neondatabase/neon/issues/6676)
-        let targets = {
+        let mut targets = {
             let locked = self.inner.read().unwrap();
             let mut targets = Vec::new();
 
```
```diff
@@ -1323,21 +1321,24 @@ impl Service {
             return Err(ApiError::NotFound(
                 anyhow::anyhow!("Tenant not found").into(),
             ));
-        }
-
-        for (tenant_shard_id, node) in targets {
-            // TODO: issue shard timeline creates in parallel, once the 0th is done.
-
-            let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
+        };
+        let shard_zero = targets.remove(0);
 
+        async fn create_one(
+            tenant_shard_id: TenantShardId,
+            node: Node,
+            jwt: Option<String>,
+            create_req: TimelineCreateRequest,
+        ) -> Result<TimelineInfo, ApiError> {
             tracing::info!(
                 "Creating timeline on shard {}/{}, attached to node {}",
                 tenant_shard_id,
                 create_req.new_timeline_id,
                 node.id
             );
+            let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
 
-            let shard_timeline_info = client
+            client
                 .timeline_create(tenant_shard_id, &create_req)
                 .await
                 .map_err(|e| match e {
```
```diff
@@ -1350,23 +1351,66 @@ impl Service {
                     ApiError::InternalServerError(anyhow::anyhow!(msg))
                 }
                 _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
-            })?;
-
-            if timeline_info.is_none() {
-                // If the caller specified an ancestor but no ancestor LSN, we are responsible for
-                // propagating the LSN chosen by the first shard to the other shards: it is important
-                // that all shards end up with the same ancestor_start_lsn.
-                if create_req.ancestor_timeline_id.is_some()
-                    && create_req.ancestor_start_lsn.is_none()
-                {
-                    create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
-                }
-
-                // We will return the TimelineInfo from the first shard
-                timeline_info = Some(shard_timeline_info);
-            }
+            })
         }
-        Ok(timeline_info.expect("targets cannot be empty"))
+
+        // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
+        // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
+        // that will get the first creation request, and propagate the LSN to all the >0 shards.
+        let timeline_info = create_one(
+            shard_zero.0,
+            shard_zero.1,
+            self.config.jwt_token.clone(),
+            create_req.clone(),
+        )
+        .await?;
+
+        // Propagate the LSN that shard zero picked, if caller didn't provide one
+        if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
+            create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
+        }
+
+        // Create timeline on remaining shards with number >0
+        if !targets.is_empty() {
+            // If we had multiple shards, issue requests for the remainder now.
+            let jwt = self.config.jwt_token.clone();
+            self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+                let create_req = create_req.clone();
+                Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
+            })
+            .await?;
+        }
+
+        Ok(timeline_info)
+    }
+
+    /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
+    ///
+    /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
+    async fn tenant_for_shards<F, R>(
+        &self,
+        locations: Vec<(TenantShardId, Node)>,
+        mut req_fn: F,
+    ) -> Result<Vec<R>, ApiError>
+    where
+        F: FnMut(
+            TenantShardId,
+            Node,
+        )
+            -> std::pin::Pin<Box<dyn futures::Future<Output = Result<R, ApiError>> + Send>>,
+    {
+        let mut futs = FuturesUnordered::new();
+        let mut results = Vec::with_capacity(locations.len());
+
+        for (tenant_shard_id, node) in locations {
+            futs.push(req_fn(tenant_shard_id, node));
+        }
+
+        while let Some(r) = futs.next().await {
+            results.push(r?);
+        }
+
+        Ok(results)
     }
 
     pub(crate) async fn tenant_timeline_delete(
```
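A note on the `tenant_for_shards` signature above: the closure must return one concrete future type, which is why call sites wrap their async call in `Box::pin` to get a `Pin<Box<dyn Future>>` trait object. Below is a reduced, self-contained illustration of that shape; the names (`for_each_item`, `do_work`) are invented for the example, and it assumes the `futures` and `tokio` crates. It is a sketch of the pattern, not the storage controller's actual code.

```rust
use std::future::Future;
use std::pin::Pin;

use futures::stream::{FuturesUnordered, StreamExt};

// One nameable return type for the callback: a pinned, boxed, Send future.
type BoxedResult<R> = Pin<Box<dyn Future<Output = Result<R, String>> + Send>>;

async fn for_each_item<F, R>(items: Vec<u32>, mut req_fn: F) -> Result<Vec<R>, String>
where
    F: FnMut(u32) -> BoxedResult<R>,
{
    // Push one future per item, then drain them in completion order.
    let mut futs = FuturesUnordered::new();
    for item in items {
        futs.push(req_fn(item));
    }

    let mut results = Vec::new();
    while let Some(r) = futs.next().await {
        results.push(r?);
    }
    Ok(results)
}

// Stand-in for a per-item async call.
async fn do_work(item: u32) -> Result<u32, String> {
    Ok(item * 2)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // `Box::pin` erases each call's distinct anonymous future type into the one boxed type.
    let doubled = for_each_item(vec![1, 2, 3], |item| Box::pin(do_work(item))).await?;
    println!("{doubled:?}");
    Ok(())
}
```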
```diff
@@ -1380,7 +1424,7 @@ impl Service {
 
         // TODO: refuse to do this if shard splitting is in progress
         // (https://github.com/neondatabase/neon/issues/6676)
-        let targets = {
+        let mut targets = {
             let locked = self.inner.read().unwrap();
             let mut targets = Vec::new();
 
```
```diff
@@ -1405,12 +1449,14 @@ impl Service {
                 anyhow::anyhow!("Tenant not found").into(),
             ));
         }
+        let shard_zero = targets.remove(0);
 
-        // TODO: call into shards concurrently
-        let mut any_pending = false;
-        for (tenant_shard_id, node) in targets {
-            let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
-
+        async fn delete_one(
+            tenant_shard_id: TenantShardId,
+            timeline_id: TimelineId,
+            node: Node,
+            jwt: Option<String>,
+        ) -> Result<StatusCode, ApiError> {
             tracing::info!(
                 "Deleting timeline on shard {}/{}, attached to node {}",
                 tenant_shard_id,
```
```diff
@@ -1418,7 +1464,8 @@ impl Service {
                 node.id
             );
 
-            let status = client
+            let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
+            client
                 .timeline_delete(tenant_shard_id, timeline_id)
                 .await
                 .map_err(|e| {
```
```diff
@@ -1426,18 +1473,36 @@ impl Service {
                         "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
                         node.id
                     ))
-                })?;
-
-            if status == StatusCode::ACCEPTED {
-                any_pending = true;
-            }
+                })
         }
 
-        if any_pending {
-            Ok(StatusCode::ACCEPTED)
-        } else {
-            Ok(StatusCode::NOT_FOUND)
+        let statuses = self
+            .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+                Box::pin(delete_one(
+                    tenant_shard_id,
+                    timeline_id,
+                    node,
+                    self.config.jwt_token.clone(),
+                ))
+            })
+            .await?;
+
+        // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
+        if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
+            return Ok(StatusCode::ACCEPTED);
         }
+
+        // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
+        // to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
+        let shard_zero_status = delete_one(
+            shard_zero.0,
+            timeline_id,
+            shard_zero.1,
+            self.config.jwt_token.clone(),
+        )
+        .await?;
+
+        Ok(shard_zero_status)
     }
 
     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
```
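The delete path above follows a simple aggregation rule: shards >0 are deleted first and concurrently, the operation keeps returning 202 Accepted while any of them has not yet reported 404 Not Found, and shard zero is deleted only after they all have, so a caller's GET (routed to shard zero) starts returning 404 only once the deletion is fully done. A small sketch of just that rule, using hyper's `StatusCode`; `aggregate_delete_status` and its callers are invented for illustration, not Neon code.

```rust
use hyper::StatusCode;

// Fold the statuses of the >0 shards, then (and only then) delete shard zero.
fn aggregate_delete_status(
    higher_shard_statuses: &[StatusCode],
    delete_shard_zero: impl FnOnce() -> StatusCode,
) -> StatusCode {
    // Any shard that has not finished deleting keeps the operation "in progress".
    if higher_shard_statuses
        .iter()
        .any(|s| s != &StatusCode::NOT_FOUND)
    {
        return StatusCode::ACCEPTED;
    }
    // All >0 shards are gone; shard zero goes last so GETs flip to 404 only at the end.
    delete_shard_zero()
}

fn main() {
    let pending = [StatusCode::NOT_FOUND, StatusCode::ACCEPTED];
    assert_eq!(
        aggregate_delete_status(&pending, || StatusCode::NOT_FOUND),
        StatusCode::ACCEPTED
    );

    let done = [StatusCode::NOT_FOUND, StatusCode::NOT_FOUND];
    assert_eq!(
        aggregate_delete_status(&done, || StatusCode::NOT_FOUND),
        StatusCode::NOT_FOUND
    );
}
```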
```diff
@@ -180,7 +180,7 @@ pub enum TimelineState {
     Broken { reason: String, backtrace: String },
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
 pub struct TimelineCreateRequest {
     pub new_timeline_id: TimelineId,
     #[serde(default)]
```