diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index d6c8fa084b..67ab37dfc1 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -114,7 +114,10 @@ async fn handle_tenant_create(
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
- json_response(StatusCode::OK, service.tenant_create(create_req).await?)
+ json_response(
+ StatusCode::CREATED,
+ service.tenant_create(create_req).await?,
+ )
}
// For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
@@ -196,7 +199,7 @@ async fn handle_tenant_timeline_create(
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
json_response(
- StatusCode::OK,
+ StatusCode::CREATED,
service
.tenant_timeline_create(tenant_id, create_req)
.await?,
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 0fe758e731..4082af3fe6 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -14,7 +14,7 @@ use control_plane::attachment_service::{
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use diesel::result::DatabaseErrorKind;
-use futures::StreamExt;
+use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use pageserver_api::{
control_api::{
@@ -1287,8 +1287,6 @@ impl Service {
tenant_id: TenantId,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
- let mut timeline_info = None;
-
tracing::info!(
"Creating timeline {}/{}",
tenant_id,
@@ -1299,7 +1297,7 @@ impl Service {
// TODO: refuse to do this if shard splitting is in progress
// (https://github.com/neondatabase/neon/issues/6676)
- let targets = {
+ let mut targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -1323,21 +1321,24 @@ impl Service {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
));
- }
-
- for (tenant_shard_id, node) in targets {
- // TODO: issue shard timeline creates in parallel, once the 0th is done.
-
- let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
+ };
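+ // Split off shard zero: it is created first, below, so that the LSN it picks can be fed to the other shards.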
+ let shard_zero = targets.remove(0);
+ async fn create_one(
+ tenant_shard_id: TenantShardId,
+ node: Node,
+ jwt: Option<String>,
+ create_req: TimelineCreateRequest,
+ ) -> Result<TimelineInfo, ApiError> {
tracing::info!(
"Creating timeline on shard {}/{}, attached to node {}",
tenant_shard_id,
create_req.new_timeline_id,
node.id
);
+ let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
- let shard_timeline_info = client
+ client
.timeline_create(tenant_shard_id, &create_req)
.await
.map_err(|e| match e {
@@ -1350,23 +1351,66 @@ impl Service {
ApiError::InternalServerError(anyhow::anyhow!(msg))
}
_ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
- })?;
-
- if timeline_info.is_none() {
- // If the caller specified an ancestor but no ancestor LSN, we are responsible for
- // propagating the LSN chosen by the first shard to the other shards: it is important
- // that all shards end up with the same ancestor_start_lsn.
- if create_req.ancestor_timeline_id.is_some()
- && create_req.ancestor_start_lsn.is_none()
- {
- create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
- }
-
- // We will return the TimelineInfo from the first shard
- timeline_info = Some(shard_timeline_info);
- }
+ })
}
- Ok(timeline_info.expect("targets cannot be empty"))
+
+ // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
+ // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
+ // that will get the first creation request, and propagate the LSN to all the >0 shards.
+ let timeline_info = create_one(
+ shard_zero.0,
+ shard_zero.1,
+ self.config.jwt_token.clone(),
+ create_req.clone(),
+ )
+ .await?;
+
+ // Propagate the LSN that shard zero picked, if caller didn't provide one
+ if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
+ create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
+ }
+
+ // Create timeline on remaining shards with number >0
+ if !targets.is_empty() {
+ // If we had multiple shards, issue requests for the remainder now.
+ let jwt = self.config.jwt_token.clone();
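+ // The TimelineInfo responses from the >0 shards are not needed; the caller receives shard zero's.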
+ self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+ let create_req = create_req.clone();
+ Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
+ })
+ .await?;
+ }
+
+ Ok(timeline_info)
+ }
+
+ /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
+ ///
+ /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
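+ /// Results are gathered in completion order, which may differ from the order of `locations`.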
+ async fn tenant_for_shards<F, R>(
+ &self,
+ locations: Vec<(TenantShardId, Node)>,
+ mut req_fn: F,
+ ) -> Result<Vec<R>, ApiError>
+ where
+ F: FnMut(
+ TenantShardId,
+ Node,
+ )
-> std::pin::Pin<Box<dyn futures::Future<Output = Result<R, ApiError>> + Send>>,
+ {
+ let mut futs = FuturesUnordered::new();
+ let mut results = Vec::with_capacity(locations.len());
+
+ for (tenant_shard_id, node) in locations {
+ futs.push(req_fn(tenant_shard_id, node));
+ }
+
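+ // Drain results as they complete; `?` returns the first error, and dropping `futs` cancels any requests still in flight.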
+ while let Some(r) = futs.next().await {
+ results.push(r?);
+ }
+
+ Ok(results)
}
pub(crate) async fn tenant_timeline_delete(
@@ -1380,7 +1424,7 @@ impl Service {
// TODO: refuse to do this if shard splitting is in progress
// (https://github.com/neondatabase/neon/issues/6676)
- let targets = {
+ let mut targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -1405,12 +1449,14 @@ impl Service {
anyhow::anyhow!("Tenant not found").into(),
));
}
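+ // Split off shard zero: it is deleted last, after the other shards (see below).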
+ let shard_zero = targets.remove(0);
- // TODO: call into shards concurrently
- let mut any_pending = false;
- for (tenant_shard_id, node) in targets {
- let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
-
+ async fn delete_one(
+ tenant_shard_id: TenantShardId,
+ timeline_id: TimelineId,
+ node: Node,
+ jwt: Option<String>,
+ ) -> Result<StatusCode, ApiError> {
tracing::info!(
"Deleting timeline on shard {}/{}, attached to node {}",
tenant_shard_id,
@@ -1418,7 +1464,8 @@ impl Service {
node.id
);
- let status = client
+ let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
+ client
.timeline_delete(tenant_shard_id, timeline_id)
.await
.map_err(|e| {
@@ -1426,18 +1473,36 @@ impl Service {
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
node.id
))
- })?;
-
- if status == StatusCode::ACCEPTED {
- any_pending = true;
- }
+ })
}
- if any_pending {
- Ok(StatusCode::ACCEPTED)
- } else {
- Ok(StatusCode::NOT_FOUND)
+ let statuses = self
+ .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+ Box::pin(delete_one(
+ tenant_shard_id,
+ timeline_id,
+ node,
+ self.config.jwt_token.clone(),
+ ))
+ })
+ .await?;
+
+ // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
+ if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
+ return Ok(StatusCode::ACCEPTED);
}
+
+ // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline is routed
+ // to shard zero, it gives the more intuitive behavior that a GET returns 404 only once the whole deletion is done.
+ let shard_zero_status = delete_one(
+ shard_zero.0,
+ timeline_id,
+ shard_zero.1,
+ self.config.jwt_token.clone(),
+ )
+ .await?;
+
+ Ok(shard_zero_status)
}
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index d546cb5c54..557a4d7de9 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -180,7 +180,7 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
-#[derive(Serialize, Deserialize)]
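+// Clone: the attachment service clones this request in order to send it to each shard of a tenant.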
+#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
#[serde(default)]