From 02a8b7fbe0bfee9d78b1d234f8c0c1946211326f Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 20 Feb 2024 10:13:21 +0000
Subject: [PATCH] storage controller: issue timeline create/delete calls
 concurrently (#6827)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Problem

Timeline creation is meant to be very fast: it should only take
approximately one S3 PUT latency. When we have many shards in a tenant,
we should preserve that responsiveness.

## Summary of changes

- Issue create/delete pageserver API calls concurrently across all >0 shards
- During tenant deletion, delete shard zero last, separately, to avoid
  confusing anything using GETs on the timeline.
- Return 201 instead of 200 on creations to make cloud control plane happy

---------

Co-authored-by: Arpad Müller
---
 control_plane/attachment_service/src/http.rs  |   7 +-
 .../attachment_service/src/service.rs         | 151 +++++++++++++-----
 libs/pageserver_api/src/models.rs             |   2 +-
 3 files changed, 114 insertions(+), 46 deletions(-)

diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index d6c8fa084b..67ab37dfc1 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -114,7 +114,10 @@ async fn handle_tenant_create(
     mut req: Request<Body>,
 ) -> Result<Response<Body>, ApiError> {
     let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
-    json_response(StatusCode::OK, service.tenant_create(create_req).await?)
+    json_response(
+        StatusCode::CREATED,
+        service.tenant_create(create_req).await?,
+    )
 }
 
 // For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
@@ -196,7 +199,7 @@ async fn handle_tenant_timeline_create(
     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
     json_response(
-        StatusCode::OK,
+        StatusCode::CREATED,
         service
             .tenant_timeline_create(tenant_id, create_req)
             .await?,
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 0fe758e731..4082af3fe6 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -14,7 +14,7 @@ use control_plane::attachment_service::{
     TenantShardMigrateRequest, TenantShardMigrateResponse,
 };
 use diesel::result::DatabaseErrorKind;
-use futures::StreamExt;
+use futures::{stream::FuturesUnordered, StreamExt};
 use hyper::StatusCode;
 use pageserver_api::{
     control_api::{
@@ -1287,8 +1287,6 @@ impl Service {
         tenant_id: TenantId,
         mut create_req: TimelineCreateRequest,
     ) -> Result<TimelineInfo, ApiError> {
-        let mut timeline_info = None;
-
         tracing::info!(
             "Creating timeline {}/{}",
             tenant_id,
@@ -1299,7 +1297,7 @@ impl Service {
 
         // TODO: refuse to do this if shard splitting is in progress
         // (https://github.com/neondatabase/neon/issues/6676)
-        let targets = {
+        let mut targets = {
             let locked = self.inner.read().unwrap();
             let mut targets = Vec::new();
 
@@ -1323,21 +1321,24 @@ impl Service {
             return Err(ApiError::NotFound(
                 anyhow::anyhow!("Tenant not found").into(),
             ));
-        }
-
-        for (tenant_shard_id, node) in targets {
-            // TODO: issue shard timeline creates in parallel, once the 0th is done.
-
-            let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
-
+        };
+        let shard_zero = targets.remove(0);
+
+        async fn create_one(
+            tenant_shard_id: TenantShardId,
+            node: Node,
+            jwt: Option<String>,
+            create_req: TimelineCreateRequest,
+        ) -> Result<TimelineInfo, ApiError> {
             tracing::info!(
                 "Creating timeline on shard {}/{}, attached to node {}",
                 tenant_shard_id,
                 create_req.new_timeline_id,
                 node.id
             );
+            let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
 
-            let shard_timeline_info = client
+            client
                 .timeline_create(tenant_shard_id, &create_req)
                 .await
                 .map_err(|e| match e {
@@ -1350,23 +1351,66 @@ impl Service {
                         ApiError::InternalServerError(anyhow::anyhow!(msg))
                     }
                     _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
-                })?;
-
-            if timeline_info.is_none() {
-                // If the caller specified an ancestor but no ancestor LSN, we are responsible for
-                // propagating the LSN chosen by the first shard to the other shards: it is important
-                // that all shards end up with the same ancestor_start_lsn.
-                if create_req.ancestor_timeline_id.is_some()
-                    && create_req.ancestor_start_lsn.is_none()
-                {
-                    create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
-                }
-
-                // We will return the TimelineInfo from the first shard
-                timeline_info = Some(shard_timeline_info);
-            }
+                })
         }
-        Ok(timeline_info.expect("targets cannot be empty"))
+
+        // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
+        // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
+        // that will get the first creation request, and propagate the LSN to all the >0 shards.
+        let timeline_info = create_one(
+            shard_zero.0,
+            shard_zero.1,
+            self.config.jwt_token.clone(),
+            create_req.clone(),
+        )
+        .await?;
+
+        // Propagate the LSN that shard zero picked, if caller didn't provide one
+        if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
+            create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
+        }
+
+        // Create timeline on remaining shards with number >0
+        if !targets.is_empty() {
+            // If we had multiple shards, issue requests for the remainder now.
+            let jwt = self.config.jwt_token.clone();
+            self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+                let create_req = create_req.clone();
+                Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
+            })
+            .await?;
+        }
+
+        Ok(timeline_info)
+    }
+
+    /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
+    ///
+    /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
+    async fn tenant_for_shards<F, R>(
+        &self,
+        locations: Vec<(TenantShardId, Node)>,
+        mut req_fn: F,
+    ) -> Result<Vec<R>, ApiError>
+    where
+        F: FnMut(
+            TenantShardId,
+            Node,
+        )
+            -> std::pin::Pin<Box<dyn futures::Future<Output = Result<R, ApiError>> + Send>>,
+    {
+        let mut futs = FuturesUnordered::new();
+        let mut results = Vec::with_capacity(locations.len());
+
+        for (tenant_shard_id, node) in locations {
+            futs.push(req_fn(tenant_shard_id, node));
+        }
+
+        while let Some(r) = futs.next().await {
+            results.push(r?);
+        }
+
+        Ok(results)
     }
 
     pub(crate) async fn tenant_timeline_delete(
@@ -1380,7 +1424,7 @@ impl Service {
 
         // TODO: refuse to do this if shard splitting is in progress
         // (https://github.com/neondatabase/neon/issues/6676)
-        let targets = {
+        let mut targets = {
             let locked = self.inner.read().unwrap();
             let mut targets = Vec::new();
 
@@ -1405,12 +1449,14 @@ impl Service {
                 anyhow::anyhow!("Tenant not found").into(),
             ));
         }
+        let shard_zero = targets.remove(0);
 
-        // TODO: call into shards concurrently
-        let mut any_pending = false;
-        for (tenant_shard_id, node) in targets {
-            let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
-
+        async fn delete_one(
+            tenant_shard_id: TenantShardId,
+            timeline_id: TimelineId,
+            node: Node,
+            jwt: Option<String>,
+        ) -> Result<StatusCode, ApiError> {
             tracing::info!(
                 "Deleting timeline on shard {}/{}, attached to node {}",
                 tenant_shard_id,
@@ -1418,7 +1464,8 @@ impl Service {
                 node.id
             );
 
-            let status = client
+            let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
+            client
                 .timeline_delete(tenant_shard_id, timeline_id)
                 .await
                 .map_err(|e| {
@@ -1426,18 +1473,36 @@ impl Service {
                         "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
                         node.id
                     ))
-                })?;
-
-            if status == StatusCode::ACCEPTED {
-                any_pending = true;
-            }
+                })
         }
 
-        if any_pending {
-            Ok(StatusCode::ACCEPTED)
-        } else {
-            Ok(StatusCode::NOT_FOUND)
+        let statuses = self
+            .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+                Box::pin(delete_one(
+                    tenant_shard_id,
+                    timeline_id,
+                    node,
+                    self.config.jwt_token.clone(),
+                ))
+            })
+            .await?;
+
+        // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
+        if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
+            return Ok(StatusCode::ACCEPTED);
         }
+
+        // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
+        // to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
+        let shard_zero_status = delete_one(
+            shard_zero.0,
+            timeline_id,
+            shard_zero.1,
+            self.config.jwt_token.clone(),
+        )
+        .await?;
+
+        Ok(shard_zero_status)
     }
 
     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index d546cb5c54..557a4d7de9 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -180,7 +180,7 @@ pub enum TimelineState {
     Broken { reason: String, backtrace: String },
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
 pub struct TimelineCreateRequest {
     pub new_timeline_id: TimelineId,
     #[serde(default)]
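---

Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the fan-out shape that `tenant_for_shards` introduces, i.e. boxing one future per shard into a `FuturesUnordered` and short-circuiting on the first error. The `ShardId`/`Node` aliases, the `ApiError` struct, the `for_all_shards` name, and the `futures`/`tokio` dependencies are illustrative stand-ins, not types or APIs from this repository.

```rust
use std::future::Future;
use std::pin::Pin;

use futures::{stream::FuturesUnordered, StreamExt};

// Illustrative stand-ins for the real TenantShardId / Node / ApiError types.
type ShardId = u32;
type Node = String;

#[derive(Debug)]
struct ApiError(String);

/// Issue one boxed future per shard, drive them all concurrently, and collect
/// the results; the first error short-circuits and drops the remaining futures.
async fn for_all_shards<F, R>(
    locations: Vec<(ShardId, Node)>,
    mut req_fn: F,
) -> Result<Vec<R>, ApiError>
where
    F: FnMut(ShardId, Node) -> Pin<Box<dyn Future<Output = Result<R, ApiError>> + Send>>,
{
    let mut futs = FuturesUnordered::new();
    for (shard, node) in locations {
        futs.push(req_fn(shard, node));
    }

    let mut results = Vec::new();
    while let Some(r) = futs.next().await {
        results.push(r?);
    }
    Ok(results)
}

#[tokio::main]
async fn main() -> Result<(), ApiError> {
    let shards = vec![(1, "pageserver-1".to_string()), (2, "pageserver-2".to_string())];

    // Each closure call returns a boxed future, mirroring how the service wraps a
    // per-shard `create_one` / `delete_one` call before handing it to the helper.
    let outcomes = for_all_shards(shards, |shard, node| {
        Box::pin(async move {
            // A real implementation would issue the pageserver HTTP call here.
            Ok::<String, ApiError>(format!("timeline created on shard {shard} via {node}"))
        })
    })
    .await?;

    println!("{outcomes:?}");
    Ok(())
}
```

One design note: `FuturesUnordered` yields results in completion order rather than input order, which is acceptable here because the callers only inspect the collection as a whole (creation discards the per-shard responses; deletion checks whether any status differs from 404).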