diff --git a/storage_controller/migrations/2024-08-23-170149_tenant_id_index/down.sql b/storage_controller/migrations/2024-08-23-170149_tenant_id_index/down.sql
new file mode 100644
index 0000000000..518c747100
--- /dev/null
+++ b/storage_controller/migrations/2024-08-23-170149_tenant_id_index/down.sql
@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+DROP INDEX tenant_shards_tenant_id;
\ No newline at end of file
diff --git a/storage_controller/migrations/2024-08-23-170149_tenant_id_index/up.sql b/storage_controller/migrations/2024-08-23-170149_tenant_id_index/up.sql
new file mode 100644
index 0000000000..dd6b37781a
--- /dev/null
+++ b/storage_controller/migrations/2024-08-23-170149_tenant_id_index/up.sql
@@ -0,0 +1,2 @@
+-- Your SQL goes here
+CREATE INDEX tenant_shards_tenant_id ON tenant_shards (tenant_id);
\ No newline at end of file
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 16df19026c..1a905753a1 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -91,6 +91,7 @@ pub(crate) enum DatabaseOperation {
     Detach,
     ReAttach,
     IncrementGeneration,
+    PeekGenerations,
     ListTenantShards,
     InsertTenantShards,
     UpdateTenantShard,
@@ -502,6 +503,43 @@ impl Persistence {
         Ok(Generation::new(g as u32))
     }
 
+    /// When we want to call out to the running shards for a tenant, e.g. during timeline CRUD operations,
+    /// we need to know where the shard is attached, _and_ the generation, so that we can re-check the generation
+    /// afterwards to confirm that our timeline CRUD operation is truly persistent (it must have happened in the
+    /// latest generation)
+    ///
+    /// If the tenant doesn't exist, an empty vector is returned.
+    ///
+    /// Output is sorted by shard number
+    pub(crate) async fn peek_generations(
+        &self,
+        filter_tenant_id: TenantId,
+    ) -> Result<Vec<(TenantShardId, Option<Generation>, Option<NodeId>)>, DatabaseError> {
+        use crate::schema::tenant_shards::dsl::*;
+        let rows = self
+            .with_measured_conn(DatabaseOperation::PeekGenerations, move |conn| {
+                let result = tenant_shards
+                    .filter(tenant_id.eq(filter_tenant_id.to_string()))
+                    .select(TenantShardPersistence::as_select())
+                    .order(shard_number)
+                    .load(conn)?;
+                Ok(result)
+            })
+            .await?;
+
+        Ok(rows
+            .into_iter()
+            .map(|p| {
+                (
+                    p.get_tenant_shard_id()
+                        .expect("Corrupt tenant shard id in database"),
+                    p.generation.map(|g| Generation::new(g as u32)),
+                    p.generation_pageserver.map(|n| NodeId(n as u64)),
+                )
+            })
+            .collect())
+    }
+
     #[allow(non_local_definitions)]
     /// For use when updating a persistent property of a tenant, such as its config or placement_policy.
     ///
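Two notes on the persistence change above. First, `peek_generations` filters `tenant_shards` on the non-key `tenant_id` column for every timeline CRUD request, which is what the new `tenant_shards_tenant_id` index exists to serve. Second, the doc comment describes a peek/act/re-peek pattern: record each shard's generation, do the remote work, then re-read the generations and treat any change as "not proven durable". A minimal standalone sketch of that invariant (illustrative only; the types are stubbed out, and the real helper built on top of this is `tenant_remote_mutation` in `service.rs` below):

```rust
// Sketch of the peek/act/re-peek invariant; not controller code.
use std::collections::BTreeMap;

type ShardNumber = u8;
type Generation = u32;

/// Stand-in for `Persistence::peek_generations`: shard -> latest generation.
fn peek(db: &BTreeMap<ShardNumber, Generation>) -> Vec<(ShardNumber, Generation)> {
    // BTreeMap iteration is sorted by key, mirroring the real query's ordering by shard number.
    db.iter().map(|(s, g)| (*s, *g)).collect()
}

/// Run `op` against the shards, then verify no generation moved on underneath us.
fn remote_mutation<R>(
    db: &BTreeMap<ShardNumber, Generation>,
    op: impl FnOnce() -> R,
) -> Result<R, &'static str> {
    let before = peek(db);
    let result = op(); // the remote calls to the attached pageservers would happen here
    if peek(db) != before {
        // A generation changed: the work may have landed on a stale location.
        return Err("tenant attachment changed, please retry");
    }
    Ok(result)
}

fn main() {
    let db: BTreeMap<ShardNumber, Generation> = BTreeMap::from([(0, 1), (1, 1)]);
    assert!(remote_mutation(&db, || "created timeline").is_ok());
}
```

The real post-check compares `(shard, generation)` pairs loaded from the database rather than an in-memory map, but the shape of the check is the same.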
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 4b0c556824..7daa1e4f5f 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -2854,82 +2854,67 @@ impl Service {
         .await;
 
         failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
 
-        self.ensure_attached_wait(tenant_id).await?;
+        self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
+            if targets.is_empty() {
+                return Err(ApiError::NotFound(
+                    anyhow::anyhow!("Tenant not found").into(),
+                ));
+            };
+            let shard_zero = targets.remove(0);
 
-        let mut targets = {
-            let locked = self.inner.read().unwrap();
-            let mut targets = Vec::new();
+            async fn create_one(
+                tenant_shard_id: TenantShardId,
+                node: Node,
+                jwt: Option<String>,
+                create_req: TimelineCreateRequest,
+            ) -> Result<TimelineInfo, ApiError> {
+                tracing::info!(
+                    "Creating timeline on shard {}/{}, attached to node {node}",
+                    tenant_shard_id,
+                    create_req.new_timeline_id,
+                );
+                let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
 
-            for (tenant_shard_id, shard) in
-                locked.tenants.range(TenantShardId::tenant_range(tenant_id))
-            {
-                let node_id = shard.intent.get_attached().ok_or_else(|| {
-                    ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
-                })?;
-                let node = locked
-                    .nodes
-                    .get(&node_id)
-                    .expect("Pageservers may not be deleted while referenced");
-
-                targets.push((*tenant_shard_id, node.clone()));
+                client
+                    .timeline_create(tenant_shard_id, &create_req)
+                    .await
+                    .map_err(|e| passthrough_api_error(&node, e))
             }
-            targets
-        };
 
-        if targets.is_empty() {
-            return Err(ApiError::NotFound(
-                anyhow::anyhow!("Tenant not found").into(),
-            ));
-        };
-        let shard_zero = targets.remove(0);
-
-        async fn create_one(
-            tenant_shard_id: TenantShardId,
-            node: Node,
-            jwt: Option<String>,
-            create_req: TimelineCreateRequest,
-        ) -> Result<TimelineInfo, ApiError> {
-            tracing::info!(
-                "Creating timeline on shard {}/{}, attached to node {node}",
-                tenant_shard_id,
-                create_req.new_timeline_id,
-            );
-            let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
-
-            client
-                .timeline_create(tenant_shard_id, &create_req)
-                .await
-                .map_err(|e| passthrough_api_error(&node, e))
-        }
-
-        // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
-        // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
-        // that will get the first creation request, and propagate the LSN to all the >0 shards.
-        let timeline_info = create_one(
-            shard_zero.0,
-            shard_zero.1,
-            self.config.jwt_token.clone(),
-            create_req.clone(),
-        )
-        .await?;
-
-        // Propagate the LSN that shard zero picked, if caller didn't provide one
-        if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
-            create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
-        }
-
-        // Create timeline on remaining shards with number >0
-        if !targets.is_empty() {
-            // If we had multiple shards, issue requests for the remainder now.
-            let jwt = &self.config.jwt_token;
-            self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
-                let create_req = create_req.clone();
-                Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
-            })
+            // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
+            // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
+            // that will get the first creation request, and propagate the LSN to all the >0 shards.
+            let timeline_info = create_one(
+                shard_zero.0,
+                shard_zero.1,
+                self.config.jwt_token.clone(),
+                create_req.clone(),
+            )
             .await?;
-        }
 
-        Ok(timeline_info)
+            // Propagate the LSN that shard zero picked, if caller didn't provide one
+            if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none()
+            {
+                create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
+            }
+
+            // Create timeline on remaining shards with number >0
+            if !targets.is_empty() {
+                // If we had multiple shards, issue requests for the remainder now.
+                let jwt = &self.config.jwt_token;
+                self.tenant_for_shards(
+                    targets.iter().map(|t| (t.0, t.1.clone())).collect(),
+                    |tenant_shard_id: TenantShardId, node: Node| {
+                        let create_req = create_req.clone();
+                        Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
+                    },
+                )
+                .await?;
+            }
+
+            Ok(timeline_info)
+        })
+        .await?
     }
 
     pub(crate) async fn tenant_timeline_detach_ancestor(
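One ordering subtlety in the new `tenant_timeline_create` body above: when the caller does not pass an `ancestor_start_lsn`, shard zero's creation decides the branch point and every other shard must reuse it, which is why shard zero is created first and the fan-out to the remaining shards happens afterwards. A toy sketch of that ordering (illustrative only; the per-shard LSN choice here is fake and merely stands in for real pageserver behaviour):

```rust
// Toy illustration of "shard zero first, then propagate its LSN"; not controller code.
struct CreateReq {
    ancestor_start_lsn: Option<u64>,
}

/// Pretend per-shard creation: if no LSN was given, each shard would pick its own.
fn create_on_shard(shard: u32, req: &CreateReq) -> u64 {
    req.ancestor_start_lsn.unwrap_or(1000 + u64::from(shard))
}

fn main() {
    let mut req = CreateReq {
        ancestor_start_lsn: None,
    };

    // Shard zero goes first and decides the branch point...
    let lsn = create_on_shard(0, &req);
    req.ancestor_start_lsn = Some(lsn);

    // ...and the >0 shards all reuse exactly that LSN.
    for shard in 1..4 {
        assert_eq!(create_on_shard(shard, &req), lsn);
    }
}
```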
@@ -2946,107 +2931,87 @@ impl Service {
         )
         .await;
 
-        self.ensure_attached_wait(tenant_id).await?;
-
-        let targets = {
-            let locked = self.inner.read().unwrap();
-            let mut targets = Vec::new();
-
-            for (tenant_shard_id, shard) in
-                locked.tenants.range(TenantShardId::tenant_range(tenant_id))
-            {
-                let node_id = shard.intent.get_attached().ok_or_else(|| {
-                    ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
-                })?;
-                let node = locked
-                    .nodes
-                    .get(&node_id)
-                    .expect("Pageservers may not be deleted while referenced");
-
-                targets.push((*tenant_shard_id, node.clone()));
+        self.tenant_remote_mutation(tenant_id, move |targets| async move {
+            if targets.is_empty() {
+                return Err(ApiError::NotFound(
+                    anyhow::anyhow!("Tenant not found").into(),
+                ));
             }
-            targets
-        };
 
-        if targets.is_empty() {
-            return Err(ApiError::NotFound(
-                anyhow::anyhow!("Tenant not found").into(),
-            ));
-        }
+            async fn detach_one(
+                tenant_shard_id: TenantShardId,
+                timeline_id: TimelineId,
+                node: Node,
+                jwt: Option<String>,
+            ) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
+                tracing::info!(
+                    "Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
+                );
 
-        async fn detach_one(
-            tenant_shard_id: TenantShardId,
-            timeline_id: TimelineId,
-            node: Node,
-            jwt: Option<String>,
-        ) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
-            tracing::info!(
-                "Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
-            );
+                let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
 
-            let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
+                client
+                    .timeline_detach_ancestor(tenant_shard_id, timeline_id)
+                    .await
+                    .map_err(|e| {
+                        use mgmt_api::Error;
 
-            client
-                .timeline_detach_ancestor(tenant_shard_id, timeline_id)
-                .await
-                .map_err(|e| {
-                    use mgmt_api::Error;
-
-                    match e {
-                        // no ancestor (ever)
-                        Error::ApiError(StatusCode::CONFLICT, msg) => ApiError::Conflict(format!(
-                            "{node}: {}",
-                            msg.strip_prefix("Conflict: ").unwrap_or(&msg)
-                        )),
-                        // too many ancestors
-                        Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
-                            ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
+                        match e {
+                            // no ancestor (ever)
+                            Error::ApiError(StatusCode::CONFLICT, msg) => ApiError::Conflict(format!(
+                                "{node}: {}",
+                                msg.strip_prefix("Conflict: ").unwrap_or(&msg)
+                            )),
+                            // too many ancestors
+                            Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
+                                ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
+                            }
+                            Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, msg) => {
+                                // avoid turning these into conflicts to remain compatible with
+                                // pageservers, 500 errors are sadly retryable with timeline ancestor
+                                // detach
+                                ApiError::InternalServerError(anyhow::anyhow!("{node}: {msg}"))
+                            }
+                            // rest can be mapped as usual
+                            other => passthrough_api_error(&node, other),
                         }
-                        Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, msg) => {
-                            // avoid turning these into conflicts to remain compatible with
-                            // pageservers, 500 errors are sadly retryable with timeline ancestor
-                            // detach
-                            ApiError::InternalServerError(anyhow::anyhow!("{node}: {msg}"))
-                        }
-                        // rest can be mapped as usual
-                        other => passthrough_api_error(&node, other),
-                    }
+                    })
+                    .map(|res| (tenant_shard_id.shard_number, res))
+            }
+
+            // no shard needs to go first/last; the operation should be idempotent
+            let mut results = self
+                .tenant_for_shards(targets, |tenant_shard_id, node| {
+                    futures::FutureExt::boxed(detach_one(
+                        tenant_shard_id,
+                        timeline_id,
+                        node,
+                        self.config.jwt_token.clone(),
+                    ))
                 })
-                .map(|res| (tenant_shard_id.shard_number, res))
-        }
 
-        // no shard needs to go first/last; the operation should be idempotent
-        let mut results = self
-            .tenant_for_shards(targets, |tenant_shard_id, node| {
-                futures::FutureExt::boxed(detach_one(
-                    tenant_shard_id,
-                    timeline_id,
-                    node,
-                    self.config.jwt_token.clone(),
-                ))
-            })
-            .await?;
+                .await?;
 
-        let any = results.pop().expect("we must have at least one response");
+            let any = results.pop().expect("we must have at least one response");
 
+            let mismatching = results
+                .iter()
+                .filter(|(_, res)| res != &any.1)
+                .collect::<Vec<_>>();
+            if !mismatching.is_empty() {
+                // this can be hit by races which should not happen because operation lock on cplane
+                let matching = results.len() - mismatching.len();
+                tracing::error!(
+                    matching,
+                    compared_against=?any,
+                    ?mismatching,
+                    "shards returned different results"
+                );
 
-        let mismatching = results
-            .iter()
-            .filter(|(_, res)| res != &any.1)
-            .collect::<Vec<_>>();
-        if !mismatching.is_empty() {
-            // this can be hit by races which should not happen because operation lock on cplane
-            let matching = results.len() - mismatching.len();
-            tracing::error!(
-                matching,
-                compared_against=?any,
-                ?mismatching,
-                "shards returned different results"
-            );
+                return Err(ApiError::InternalServerError(anyhow::anyhow!("pageservers returned mixed results for ancestor detach; manual intervention is required.")));
+            }
 
-            return Err(ApiError::InternalServerError(anyhow::anyhow!("pageservers returned mixed results for ancestor detach; manual intervention is required.")));
-        }
-
-        Ok(any.1)
+            Ok(any.1)
+        }).await?
     }
 
     /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
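The detach-ancestor path keeps its existing shape inside the new closure: fan the same idempotent request out to every shard via `tenant_for_shards`, then require that all shards agree on the result before reporting success. A standalone sketch of that fan-out-and-compare pattern (an assumed simplification using the `futures` crate and string results; `call_shard` is a hypothetical stand-in for the per-shard pageserver call):

```rust
// Sketch: fan a request out to all shards concurrently, then require agreement.
// Not controller code.
use futures::future::try_join_all;

async fn call_shard(shard: u32) -> Result<String, String> {
    // A real implementation would issue an HTTP request to the node holding `shard`.
    let _ = shard;
    Ok("ancestor detached".to_string())
}

async fn detach_on_all_shards(shards: Vec<u32>) -> Result<String, String> {
    // Fan out concurrently; any per-shard error fails the whole operation.
    let mut results = try_join_all(shards.into_iter().map(call_shard)).await?;

    let any = results.pop().expect("at least one shard");
    if results.iter().any(|r| r != &any) {
        // Mixed answers mean the shards acted on different state: surface it loudly.
        return Err("shards returned different results".to_string());
    }
    Ok(any)
}

fn main() {
    let out = futures::executor::block_on(detach_on_all_shards(vec![0, 1, 2]));
    assert_eq!(out.as_deref(), Ok("ancestor detached"));
}
```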
@@ -3117,6 +3082,84 @@ impl Service {
         results
     }
 
+    /// Helper for safely working with the shards in a tenant remotely on pageservers, for example
+    /// when creating and deleting timelines:
+    /// - Makes sure shards are attached somewhere if they weren't already
+    /// - Looks up the shards and the nodes where they were most recently attached
+    /// - Guarantees that after the inner function returns, the shards' generations haven't moved on: this
+    ///   ensures that the remote operation acted on the most recent generation, and is therefore durable.
+    async fn tenant_remote_mutation<R, O, F>(
+        &self,
+        tenant_id: TenantId,
+        op: O,
+    ) -> Result<R, ApiError>
+    where
+        O: FnOnce(Vec<(TenantShardId, Node)>) -> F,
+        F: std::future::Future<Output = R>,
+    {
+        let target_gens = {
+            let mut targets = Vec::new();
+
+            // Load the currently attached pageservers for the latest generation of each shard. This can
+            // run concurrently with reconciliations, and it is not guaranteed that the node we find here
+            // will still be the latest when we're done: we will check generations again at the end of
+            // this function to handle that.
+            let generations = self.persistence.peek_generations(tenant_id).await?;
+            let generations = if generations.iter().any(|i| i.1.is_none()) {
+                // One or more shards is not attached to anything: maybe this is a new tenant? Wait for
+                // it to reconcile.
+                self.ensure_attached_wait(tenant_id).await?;
+                self.persistence.peek_generations(tenant_id).await?
+            } else {
+                generations
+            };
+
+            let locked = self.inner.read().unwrap();
+            for (tenant_shard_id, generation, generation_pageserver) in generations {
+                let node_id = generation_pageserver.ok_or(ApiError::Conflict(
+                    "Tenant not currently attached".to_string(),
+                ))?;
+                let node = locked
+                    .nodes
+                    .get(&node_id)
+                    .ok_or(ApiError::Conflict(format!(
+                        "Raced with removal of node {node_id}"
+                    )))?;
+                targets.push((tenant_shard_id, node.clone(), generation));
+            }
+
+            targets
+        };
+
+        let targets = target_gens.iter().map(|t| (t.0, t.1.clone())).collect();
+        let result = op(targets).await;
+
+        // Post-check: are all the generations of all the shards the same as they were initially? This proves that
+        // our remote operation executed on the latest generation and is therefore persistent.
+        {
+            let latest_generations = self.persistence.peek_generations(tenant_id).await?;
+            if latest_generations
+                .into_iter()
+                .map(|g| (g.0, g.1))
+                .collect::<Vec<_>>()
+                != target_gens
+                    .into_iter()
+                    .map(|i| (i.0, i.2))
+                    .collect::<Vec<_>>()
+            {
+                // We raced with something that incremented the generation, and therefore cannot be
+                // confident that our actions are persistent (they might have hit an old generation).
+                //
+                // This is safe but requires a retry: ask the client to do that by giving them a 503 response.
+                return Err(ApiError::ResourceUnavailable(
+                    "Tenant attachment changed, please retry".into(),
+                ));
+            }
+        }
+
+        Ok(result)
+    }
+
     pub(crate) async fn tenant_timeline_delete(
         &self,
         tenant_id: TenantId,
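The contract `tenant_remote_mutation` gives API clients is deliberately simple: a 503 carrying "Tenant attachment changed, please retry" means the whole timeline operation is safe to retry from scratch, because the controller only returns it when durability was not proven. A hedged sketch of a client-side loop against that contract (`send_request` is a hypothetical stand-in for the HTTP call; not part of this codebase):

```rust
// Sketch of a client retrying a timeline operation when the controller answers 503.
use std::{thread::sleep, time::Duration};

enum Response {
    Ok,
    Retry, // 503: "Tenant attachment changed, please retry"
}

fn send_request(attempt: u32) -> Response {
    // Pretend the first attempt races with a migration and gets the 503.
    if attempt == 0 {
        Response::Retry
    } else {
        Response::Ok
    }
}

fn create_timeline_with_retries(max_attempts: u32) -> Result<(), &'static str> {
    for attempt in 0..max_attempts {
        match send_request(attempt) {
            Response::Ok => return Ok(()),
            Response::Retry => {
                // Back off briefly; the operation is safe to repeat because the
                // controller only returns 503 when durability was not proven.
                sleep(Duration::from_millis(100 * u64::from(attempt + 1)));
            }
        }
    }
    Err("gave up after repeated attachment changes")
}

fn main() {
    assert!(create_timeline_with_retries(5).is_ok());
}
```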
@@ -3130,83 +3173,62 @@
         )
         .await;
 
-        self.ensure_attached_wait(tenant_id).await?;
-
-        let mut targets = {
-            let locked = self.inner.read().unwrap();
-            let mut targets = Vec::new();
-
-            for (tenant_shard_id, shard) in
-                locked.tenants.range(TenantShardId::tenant_range(tenant_id))
-            {
-                let node_id = shard.intent.get_attached().ok_or_else(|| {
-                    ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
-                })?;
-                let node = locked
-                    .nodes
-                    .get(&node_id)
-                    .expect("Pageservers may not be deleted while referenced");
-
-                targets.push((*tenant_shard_id, node.clone()));
+        self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
+            if targets.is_empty() {
+                return Err(ApiError::NotFound(
+                    anyhow::anyhow!("Tenant not found").into(),
+                ));
             }
-            targets
-        };
+            let shard_zero = targets.remove(0);
 
-        if targets.is_empty() {
-            return Err(ApiError::NotFound(
-                anyhow::anyhow!("Tenant not found").into(),
-            ));
-        }
-        let shard_zero = targets.remove(0);
+            async fn delete_one(
+                tenant_shard_id: TenantShardId,
+                timeline_id: TimelineId,
+                node: Node,
+                jwt: Option<String>,
+            ) -> Result<StatusCode, ApiError> {
+                tracing::info!(
+                    "Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
                 );
 
-        async fn delete_one(
-            tenant_shard_id: TenantShardId,
-            timeline_id: TimelineId,
-            node: Node,
-            jwt: Option<String>,
-        ) -> Result<StatusCode, ApiError> {
-            tracing::info!(
-                "Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
-            );
+                let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
+                client
+                    .timeline_delete(tenant_shard_id, timeline_id)
+                    .await
+                    .map_err(|e| {
+                        ApiError::InternalServerError(anyhow::anyhow!(
+                            "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}",
+                        ))
+                    })
+            }
 
-            let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
-            client
-                .timeline_delete(tenant_shard_id, timeline_id)
-                .await
-                .map_err(|e| {
-                    ApiError::InternalServerError(anyhow::anyhow!(
-                        "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}",
+            let statuses = self
+                .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+                    Box::pin(delete_one(
+                        tenant_shard_id,
+                        timeline_id,
+                        node,
+                        self.config.jwt_token.clone(),
                     ))
                 })
-        }
+                .await?;
 
-        let statuses = self
-            .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
-                Box::pin(delete_one(
-                    tenant_shard_id,
-                    timeline_id,
-                    node,
-                    self.config.jwt_token.clone(),
-                ))
-            })
+            // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
+            if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
+                return Ok(StatusCode::ACCEPTED);
+            }
+
+            // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
+            // to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
+            let shard_zero_status = delete_one(
+                shard_zero.0,
+                timeline_id,
+                shard_zero.1,
+                self.config.jwt_token.clone(),
+            )
             .await?;
-
-        // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
-        if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
-            return Ok(StatusCode::ACCEPTED);
-        }
-
-        // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
-        // to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
-        let shard_zero_status = delete_one(
-            shard_zero.0,
-            timeline_id,
-            shard_zero.1,
-            self.config.jwt_token.clone(),
-        )
-        .await?;
-
-        Ok(shard_zero_status)
+            Ok(shard_zero_status)
+        }).await?
     }
 
     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 2bb698f175..92febfec9b 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2284,7 +2284,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
             self.allowed_errors,
         )
 
-    def pageserver_api(self) -> PageserverHttpClient:
+    def pageserver_api(self, *args, **kwargs) -> PageserverHttpClient:
         """
        The storage controller implements a subset of the pageserver REST API, for mapping per-tenant
        actions into per-shard actions (e.g. timeline creation). Tests should invoke those
@@ -2293,7 +2293,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
         auth_token = None
         if self.auth_enabled:
             auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API)
-        return PageserverHttpClient(self.port, lambda: True, auth_token)
+        return PageserverHttpClient(self.port, lambda: True, auth_token, *args, **kwargs)
 
     def request(self, method, *args, **kwargs) -> requests.Response:
         resp = requests.request(method, *args, **kwargs)
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index b3464b0c91..03eb7628be 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -21,7 +21,7 @@ from fixtures.neon_fixtures import (
     TokenScope,
     last_flush_lsn_upload,
 )
-from fixtures.pageserver.http import PageserverHttpClient
+from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
 from fixtures.pageserver.utils import (
     assert_prefix_empty,
     assert_prefix_not_empty,
@@ -41,6 +41,7 @@ from mypy_boto3_s3.type_defs import (
     ObjectTypeDef,
 )
 from pytest_httpserver import HTTPServer
+from urllib3 import Retry
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
 
@@ -2266,3 +2267,66 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvB
 
     # allow for small delay between actually having cancelled and being able reconfigure again
     wait_until(4, 0.5, reconfigure_node_again)
+
+
+def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder):
+    """
+    The storage controller is meant to handle the case where a timeline CRUD operation races
+    with a generation-incrementing change to the tenant: this should trigger a retry so that
+    the operation lands on the highest-generation'd tenant location.
+    """
+    neon_env_builder.num_pageservers = 2
+    env = neon_env_builder.init_configs()
+    env.start()
+    tenant_id = TenantId.generate()
+    env.storage_controller.tenant_create(tenant_id)
+
+    # Set up a failpoint so that a timeline creation will be very slow
+    failpoint = "timeline-creation-after-uninit"
+    for ps in env.pageservers:
+        ps.http_client().configure_failpoints((failpoint, "sleep(10000)"))
+
+    # Start a timeline creation in the background
+    create_timeline_id = TimelineId.generate()
+    futs = []
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=2 + len(env.pageservers) + len(env.safekeepers)
+    ) as executor:
+        futs.append(
+            executor.submit(
+                env.storage_controller.pageserver_api(
+                    retries=Retry(
+                        status=0,
+                        connect=0,  # Disable retries: we want to see the 503
+                    )
+                ).timeline_create,
+                PgVersion.NOT_SET,
+                tenant_id,
+                create_timeline_id,
+            )
+        )
+
+        def has_hit_failpoint():
+            assert any(
+                ps.log_contains(f"at failpoint {failpoint}") is not None for ps in env.pageservers
+            )
+
+        wait_until(10, 1, has_hit_failpoint)
+
+        # Migrate the tenant while the timeline creation is in progress: this migration will complete once it
+        # can detach from the old pageserver, which will happen once the failpoint completes.
+        env.storage_controller.tenant_shard_migrate(
+            TenantShardId(tenant_id, 0, 0), env.pageservers[1].id
+        )
+
+        with pytest.raises(PageserverApiException, match="Tenant attachment changed, please retry"):
+            futs[0].result(timeout=20)
+
+        # Timeline creation should work when there isn't a concurrent migration, even though it's
+        # slow (our failpoint is still enabled)
+        env.storage_controller.pageserver_api(
+            retries=Retry(
+                status=0,
+                connect=0,  # Disable retries: we want to see the 503
+            )
+        ).timeline_create(PgVersion.NOT_SET, tenant_id, create_timeline_id)