mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
storage controller: enable timeline CRUD operations to run concurrently with reconciliation & make them safer (#8783)
## Problem - If a reconciler was waiting to be able to notify computes about a change, but the control plane was waiting for the controller to finish a timeline creation/deletion, the overall system can deadlock. - If a tenant shard was migrated concurrently with a timeline creation/deletion, there was a risk that the timeline operation could be applied to a non-latest-generation location, and thereby not really be persistent. This has never happened in practice, but would eventually happen at scale. Closes: #8743 ## Summary of changes - Introduce `Service::tenant_remote_mutation` helper, which looks up shards & generations and passes them into an inner function that may do remote I/O to pageservers. Before returning success, this helper checks that generations haven't incremented, to guarantee that changes are persistent. - Convert tenant_timeline_create, tenant_timeline_delete, and tenant_timeline_detach_ancestor to use this helper. - These functions no longer block on ensure_attached unless the tenant was never attached at all, so they should make progress even if we can't complete compute notifications. This increases the database load from timeline/create operations, but only with cheap read transactions.
This commit is contained in:
@@ -0,0 +1,2 @@
|
||||
-- This file should undo anything in `up.sql`
|
||||
DROP INDEX tenant_shards_tenant_id;
|
||||
@@ -0,0 +1,2 @@
|
||||
-- Your SQL goes here
|
||||
CREATE INDEX tenant_shards_tenant_id ON tenant_shards (tenant_id);
|
||||
@@ -91,6 +91,7 @@ pub(crate) enum DatabaseOperation {
|
||||
Detach,
|
||||
ReAttach,
|
||||
IncrementGeneration,
|
||||
PeekGenerations,
|
||||
ListTenantShards,
|
||||
InsertTenantShards,
|
||||
UpdateTenantShard,
|
||||
@@ -502,6 +503,43 @@ impl Persistence {
|
||||
Ok(Generation::new(g as u32))
|
||||
}
|
||||
|
||||
/// When we want to call out to the running shards for a tenant, e.g. during timeline CRUD operations,
|
||||
/// we need to know where the shard is attached, _and_ the generation, so that we can re-check the generation
|
||||
/// afterwards to confirm that our timeline CRUD operation is truly persistent (it must have happened in the
|
||||
/// latest generation)
|
||||
///
|
||||
/// If the tenant doesn't exist, an empty vector is returned.
|
||||
///
|
||||
/// Output is sorted by shard number
|
||||
pub(crate) async fn peek_generations(
|
||||
&self,
|
||||
filter_tenant_id: TenantId,
|
||||
) -> Result<Vec<(TenantShardId, Option<Generation>, Option<NodeId>)>, DatabaseError> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
let rows = self
|
||||
.with_measured_conn(DatabaseOperation::PeekGenerations, move |conn| {
|
||||
let result = tenant_shards
|
||||
.filter(tenant_id.eq(filter_tenant_id.to_string()))
|
||||
.select(TenantShardPersistence::as_select())
|
||||
.order(shard_number)
|
||||
.load(conn)?;
|
||||
Ok(result)
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|p| {
|
||||
(
|
||||
p.get_tenant_shard_id()
|
||||
.expect("Corrupt tenant shard id in database"),
|
||||
p.generation.map(|g| Generation::new(g as u32)),
|
||||
p.generation_pageserver.map(|n| NodeId(n as u64)),
|
||||
)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[allow(non_local_definitions)]
|
||||
/// For use when updating a persistent property of a tenant, such as its config or placement_policy.
|
||||
///
|
||||
|
||||
@@ -2854,82 +2854,67 @@ impl Service {
|
||||
.await;
|
||||
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
|
||||
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
};
|
||||
let shard_zero = targets.remove(0);
|
||||
|
||||
let mut targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
async fn create_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
create_req: TimelineCreateRequest,
|
||||
) -> Result<TimelineInfo, ApiError> {
|
||||
tracing::info!(
|
||||
"Creating timeline on shard {}/{}, attached to node {node}",
|
||||
tenant_shard_id,
|
||||
create_req.new_timeline_id,
|
||||
);
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.get_attached().ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
client
|
||||
.timeline_create(tenant_shard_id, &create_req)
|
||||
.await
|
||||
.map_err(|e| passthrough_api_error(&node, e))
|
||||
}
|
||||
targets
|
||||
};
|
||||
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
};
|
||||
let shard_zero = targets.remove(0);
|
||||
|
||||
async fn create_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
create_req: TimelineCreateRequest,
|
||||
) -> Result<TimelineInfo, ApiError> {
|
||||
tracing::info!(
|
||||
"Creating timeline on shard {}/{}, attached to node {node}",
|
||||
tenant_shard_id,
|
||||
create_req.new_timeline_id,
|
||||
);
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
|
||||
client
|
||||
.timeline_create(tenant_shard_id, &create_req)
|
||||
.await
|
||||
.map_err(|e| passthrough_api_error(&node, e))
|
||||
}
|
||||
|
||||
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
|
||||
// use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
|
||||
// that will get the first creation request, and propagate the LSN to all the >0 shards.
|
||||
let timeline_info = create_one(
|
||||
shard_zero.0,
|
||||
shard_zero.1,
|
||||
self.config.jwt_token.clone(),
|
||||
create_req.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Propagate the LSN that shard zero picked, if caller didn't provide one
|
||||
if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
|
||||
create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
|
||||
}
|
||||
|
||||
// Create timeline on remaining shards with number >0
|
||||
if !targets.is_empty() {
|
||||
// If we had multiple shards, issue requests for the remainder now.
|
||||
let jwt = &self.config.jwt_token;
|
||||
self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
|
||||
let create_req = create_req.clone();
|
||||
Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
|
||||
})
|
||||
// Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
|
||||
// use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
|
||||
// that will get the first creation request, and propagate the LSN to all the >0 shards.
|
||||
let timeline_info = create_one(
|
||||
shard_zero.0,
|
||||
shard_zero.1,
|
||||
self.config.jwt_token.clone(),
|
||||
create_req.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(timeline_info)
|
||||
// Propagate the LSN that shard zero picked, if caller didn't provide one
|
||||
if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none()
|
||||
{
|
||||
create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
|
||||
}
|
||||
|
||||
// Create timeline on remaining shards with number >0
|
||||
if !targets.is_empty() {
|
||||
// If we had multiple shards, issue requests for the remainder now.
|
||||
let jwt = &self.config.jwt_token;
|
||||
self.tenant_for_shards(
|
||||
targets.iter().map(|t| (t.0, t.1.clone())).collect(),
|
||||
|tenant_shard_id: TenantShardId, node: Node| {
|
||||
let create_req = create_req.clone();
|
||||
Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(timeline_info)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_detach_ancestor(
|
||||
@@ -2946,107 +2931,87 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.get_attached().ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
self.tenant_remote_mutation(tenant_id, move |targets| async move {
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
}
|
||||
targets
|
||||
};
|
||||
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
}
|
||||
async fn detach_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
|
||||
tracing::info!(
|
||||
"Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
|
||||
);
|
||||
|
||||
async fn detach_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
) -> Result<(ShardNumber, models::detach_ancestor::AncestorDetached), ApiError> {
|
||||
tracing::info!(
|
||||
"Detaching timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
|
||||
);
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
client
|
||||
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
use mgmt_api::Error;
|
||||
|
||||
client
|
||||
.timeline_detach_ancestor(tenant_shard_id, timeline_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
use mgmt_api::Error;
|
||||
|
||||
match e {
|
||||
// no ancestor (ever)
|
||||
Error::ApiError(StatusCode::CONFLICT, msg) => ApiError::Conflict(format!(
|
||||
"{node}: {}",
|
||||
msg.strip_prefix("Conflict: ").unwrap_or(&msg)
|
||||
)),
|
||||
// too many ancestors
|
||||
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
|
||||
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
|
||||
match e {
|
||||
// no ancestor (ever)
|
||||
Error::ApiError(StatusCode::CONFLICT, msg) => ApiError::Conflict(format!(
|
||||
"{node}: {}",
|
||||
msg.strip_prefix("Conflict: ").unwrap_or(&msg)
|
||||
)),
|
||||
// too many ancestors
|
||||
Error::ApiError(StatusCode::BAD_REQUEST, msg) => {
|
||||
ApiError::BadRequest(anyhow::anyhow!("{node}: {msg}"))
|
||||
}
|
||||
Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, msg) => {
|
||||
// avoid turning these into conflicts to remain compatible with
|
||||
// pageservers, 500 errors are sadly retryable with timeline ancestor
|
||||
// detach
|
||||
ApiError::InternalServerError(anyhow::anyhow!("{node}: {msg}"))
|
||||
}
|
||||
// rest can be mapped as usual
|
||||
other => passthrough_api_error(&node, other),
|
||||
}
|
||||
Error::ApiError(StatusCode::INTERNAL_SERVER_ERROR, msg) => {
|
||||
// avoid turning these into conflicts to remain compatible with
|
||||
// pageservers, 500 errors are sadly retryable with timeline ancestor
|
||||
// detach
|
||||
ApiError::InternalServerError(anyhow::anyhow!("{node}: {msg}"))
|
||||
}
|
||||
// rest can be mapped as usual
|
||||
other => passthrough_api_error(&node, other),
|
||||
}
|
||||
})
|
||||
.map(|res| (tenant_shard_id.shard_number, res))
|
||||
}
|
||||
|
||||
// no shard needs to go first/last; the operation should be idempotent
|
||||
let mut results = self
|
||||
.tenant_for_shards(targets, |tenant_shard_id, node| {
|
||||
futures::FutureExt::boxed(detach_one(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.jwt_token.clone(),
|
||||
))
|
||||
})
|
||||
.map(|res| (tenant_shard_id.shard_number, res))
|
||||
}
|
||||
.await?;
|
||||
|
||||
// no shard needs to go first/last; the operation should be idempotent
|
||||
let mut results = self
|
||||
.tenant_for_shards(targets, |tenant_shard_id, node| {
|
||||
futures::FutureExt::boxed(detach_one(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.jwt_token.clone(),
|
||||
))
|
||||
})
|
||||
.await?;
|
||||
let any = results.pop().expect("we must have at least one response");
|
||||
|
||||
let any = results.pop().expect("we must have at least one response");
|
||||
let mismatching = results
|
||||
.iter()
|
||||
.filter(|(_, res)| res != &any.1)
|
||||
.collect::<Vec<_>>();
|
||||
if !mismatching.is_empty() {
|
||||
// this can be hit by races which should not happen because operation lock on cplane
|
||||
let matching = results.len() - mismatching.len();
|
||||
tracing::error!(
|
||||
matching,
|
||||
compared_against=?any,
|
||||
?mismatching,
|
||||
"shards returned different results"
|
||||
);
|
||||
|
||||
let mismatching = results
|
||||
.iter()
|
||||
.filter(|(_, res)| res != &any.1)
|
||||
.collect::<Vec<_>>();
|
||||
if !mismatching.is_empty() {
|
||||
// this can be hit by races which should not happen because operation lock on cplane
|
||||
let matching = results.len() - mismatching.len();
|
||||
tracing::error!(
|
||||
matching,
|
||||
compared_against=?any,
|
||||
?mismatching,
|
||||
"shards returned different results"
|
||||
);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!("pageservers returned mixed results for ancestor detach; manual intervention is required.")));
|
||||
}
|
||||
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!("pageservers returned mixed results for ancestor detach; manual intervention is required.")));
|
||||
}
|
||||
|
||||
Ok(any.1)
|
||||
Ok(any.1)
|
||||
}).await?
|
||||
}
|
||||
|
||||
/// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
|
||||
@@ -3117,6 +3082,84 @@ impl Service {
|
||||
results
|
||||
}
|
||||
|
||||
/// Helper for safely working with the shards in a tenant remotely on pageservers, for example
|
||||
/// when creating and deleting timelines:
|
||||
/// - Makes sure shards are attached somewhere if they weren't already
|
||||
/// - Looks up the shards and the nodes where they were most recently attached
|
||||
/// - Guarantees that after the inner function returns, the shards' generations haven't moved on: this
|
||||
/// ensures that the remote operation acted on the most recent generation, and is therefore durable.
|
||||
async fn tenant_remote_mutation<R, O, F>(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
op: O,
|
||||
) -> Result<R, ApiError>
|
||||
where
|
||||
O: FnOnce(Vec<(TenantShardId, Node)>) -> F,
|
||||
F: std::future::Future<Output = R>,
|
||||
{
|
||||
let target_gens = {
|
||||
let mut targets = Vec::new();
|
||||
|
||||
// Load the currently attached pageservers for the latest generation of each shard. This can
|
||||
// run concurrently with reconciliations, and it is not guaranteed that the node we find here
|
||||
// will still be the latest when we're done: we will check generations again at the end of
|
||||
// this function to handle that.
|
||||
let generations = self.persistence.peek_generations(tenant_id).await?;
|
||||
let generations = if generations.iter().any(|i| i.1.is_none()) {
|
||||
// One or more shards is not attached to anything: maybe this is a new tenant? Wait for
|
||||
// it to reconcile.
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
self.persistence.peek_generations(tenant_id).await?
|
||||
} else {
|
||||
generations
|
||||
};
|
||||
|
||||
let locked = self.inner.read().unwrap();
|
||||
for (tenant_shard_id, generation, generation_pageserver) in generations {
|
||||
let node_id = generation_pageserver.ok_or(ApiError::Conflict(
|
||||
"Tenant not currently attached".to_string(),
|
||||
))?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.ok_or(ApiError::Conflict(format!(
|
||||
"Raced with removal of node {node_id}"
|
||||
)))?;
|
||||
targets.push((tenant_shard_id, node.clone(), generation));
|
||||
}
|
||||
|
||||
targets
|
||||
};
|
||||
|
||||
let targets = target_gens.iter().map(|t| (t.0, t.1.clone())).collect();
|
||||
let result = op(targets).await;
|
||||
|
||||
// Post-check: are all the generations of all the shards the same as they were initially? This proves that
|
||||
// our remote operation executed on the latest generation and is therefore persistent.
|
||||
{
|
||||
let latest_generations = self.persistence.peek_generations(tenant_id).await?;
|
||||
if latest_generations
|
||||
.into_iter()
|
||||
.map(|g| (g.0, g.1))
|
||||
.collect::<Vec<_>>()
|
||||
!= target_gens
|
||||
.into_iter()
|
||||
.map(|i| (i.0, i.2))
|
||||
.collect::<Vec<_>>()
|
||||
{
|
||||
// We raced with something that incremented the generation, and therefore cannot be
|
||||
// confident that our actions are persistent (they might have hit an old generation).
|
||||
//
|
||||
// This is safe but requires a retry: ask the client to do that by giving them a 503 response.
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
"Tenant attachment changed, please retry".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_delete(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -3130,83 +3173,62 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
let mut targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.get_attached().ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
}
|
||||
targets
|
||||
};
|
||||
let shard_zero = targets.remove(0);
|
||||
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
}
|
||||
let shard_zero = targets.remove(0);
|
||||
async fn delete_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
tracing::info!(
|
||||
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
|
||||
);
|
||||
|
||||
async fn delete_one(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
node: Node,
|
||||
jwt: Option<String>,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
tracing::info!(
|
||||
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
|
||||
);
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
client
|
||||
.timeline_delete(tenant_shard_id, timeline_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}",
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
|
||||
client
|
||||
.timeline_delete(tenant_shard_id, timeline_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}",
|
||||
let statuses = self
|
||||
.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
|
||||
Box::pin(delete_one(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.jwt_token.clone(),
|
||||
))
|
||||
})
|
||||
}
|
||||
.await?;
|
||||
|
||||
let statuses = self
|
||||
.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
|
||||
Box::pin(delete_one(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node,
|
||||
self.config.jwt_token.clone(),
|
||||
))
|
||||
})
|
||||
// If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
|
||||
if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
|
||||
return Ok(StatusCode::ACCEPTED);
|
||||
}
|
||||
|
||||
// Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
|
||||
// to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
|
||||
let shard_zero_status = delete_one(
|
||||
shard_zero.0,
|
||||
timeline_id,
|
||||
shard_zero.1,
|
||||
self.config.jwt_token.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
|
||||
if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
|
||||
return Ok(StatusCode::ACCEPTED);
|
||||
}
|
||||
|
||||
// Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
|
||||
// to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
|
||||
let shard_zero_status = delete_one(
|
||||
shard_zero.0,
|
||||
timeline_id,
|
||||
shard_zero.1,
|
||||
self.config.jwt_token.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(shard_zero_status)
|
||||
Ok(shard_zero_status)
|
||||
}).await?
|
||||
}
|
||||
|
||||
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
|
||||
|
||||
@@ -2284,7 +2284,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.allowed_errors,
|
||||
)
|
||||
|
||||
def pageserver_api(self) -> PageserverHttpClient:
|
||||
def pageserver_api(self, *args, **kwargs) -> PageserverHttpClient:
|
||||
"""
|
||||
The storage controller implements a subset of the pageserver REST API, for mapping
|
||||
per-tenant actions into per-shard actions (e.g. timeline creation). Tests should invoke those
|
||||
@@ -2293,7 +2293,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
auth_token = None
|
||||
if self.auth_enabled:
|
||||
auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API)
|
||||
return PageserverHttpClient(self.port, lambda: True, auth_token)
|
||||
return PageserverHttpClient(self.port, lambda: True, auth_token, *args, **kwargs)
|
||||
|
||||
def request(self, method, *args, **kwargs) -> requests.Response:
|
||||
resp = requests.request(method, *args, **kwargs)
|
||||
|
||||
@@ -21,7 +21,7 @@ from fixtures.neon_fixtures import (
|
||||
TokenScope,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
assert_prefix_not_empty,
|
||||
@@ -41,6 +41,7 @@ from mypy_boto3_s3.type_defs import (
|
||||
ObjectTypeDef,
|
||||
)
|
||||
from pytest_httpserver import HTTPServer
|
||||
from urllib3 import Retry
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
@@ -2266,3 +2267,66 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvB
|
||||
|
||||
# allow for small delay between actually having cancelled and being able reconfigure again
|
||||
wait_until(4, 0.5, reconfigure_node_again)
|
||||
|
||||
|
||||
def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
The storage controller is meant to handle the case where a timeline CRUD operation races
|
||||
with a generation-incrementing change to the tenant: this should trigger a retry so that
|
||||
the operation lands on the highest-generation'd tenant location.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
tenant_id = TenantId.generate()
|
||||
env.storage_controller.tenant_create(tenant_id)
|
||||
|
||||
# Set up a failpoint so that a timeline creation will be very slow
|
||||
failpoint = "timeline-creation-after-uninit"
|
||||
for ps in env.pageservers:
|
||||
ps.http_client().configure_failpoints((failpoint, "sleep(10000)"))
|
||||
|
||||
# Start a timeline creation in the background
|
||||
create_timeline_id = TimelineId.generate()
|
||||
futs = []
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=2 + len(env.pageservers) + len(env.safekeepers)
|
||||
) as executor:
|
||||
futs.append(
|
||||
executor.submit(
|
||||
env.storage_controller.pageserver_api(
|
||||
retries=Retry(
|
||||
status=0,
|
||||
connect=0, # Disable retries: we want to see the 503
|
||||
)
|
||||
).timeline_create,
|
||||
PgVersion.NOT_SET,
|
||||
tenant_id,
|
||||
create_timeline_id,
|
||||
)
|
||||
)
|
||||
|
||||
def has_hit_failpoint():
|
||||
assert any(
|
||||
ps.log_contains(f"at failpoint {failpoint}") is not None for ps in env.pageservers
|
||||
)
|
||||
|
||||
wait_until(10, 1, has_hit_failpoint)
|
||||
|
||||
# Migrate the tenant while the timeline creation is in progress: this migration will complete once it
|
||||
# can detach from the old pageserver, which will happen once the failpoint completes.
|
||||
env.storage_controller.tenant_shard_migrate(
|
||||
TenantShardId(tenant_id, 0, 0), env.pageservers[1].id
|
||||
)
|
||||
|
||||
with pytest.raises(PageserverApiException, match="Tenant attachment changed, please retry"):
|
||||
futs[0].result(timeout=20)
|
||||
|
||||
# Timeline creation should work when there isn't a concurrent migration, even though it's
|
||||
# slow (our failpoint is still enabled)
|
||||
env.storage_controller.pageserver_api(
|
||||
retries=Retry(
|
||||
status=0,
|
||||
connect=0, # Disable retries: we want to see the 503
|
||||
)
|
||||
).timeline_create(PgVersion.NOT_SET, tenant_id, create_timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user