Mirror of https://github.com/neondatabase/neon.git
storcon: do timeline creation on all attached location (#9237)
## Problem

Creation of a timeline during a reconciliation can lead to unavailability if the user attempts to start a compute before the storage controller has notified cplane of the cut-over.

## Summary of changes

Create timelines on all currently attached locations. For the latest location, we still look at the database (as previously). With this change we also look into the observed state to find *other* attached locations.

Related: https://github.com/neondatabase/neon/issues/9144
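For orientation, below is a condensed, illustrative sketch of the fan-out this change introduces. It is not code from the commit: the `MutationLocation`/`ShardMutationLocations` shapes mirror the types added in the diff, while `create_timeline_on` and the error enum are hypothetical stand-ins for the `PageserverClient::timeline_create` call. The idea is to create the timeline on the latest attached location first, then propagate it best-effort to the other attached locations found in the observed state, ignoring "not found" responses from locations that were detached in the meantime.

```rust
// Illustrative only: names mirror the types added in this commit;
// `create_timeline_on` is a placeholder for the pageserver API call.
#[derive(Debug)]
enum CreateError {
    NotFound,
    Other(String),
}

struct MutationLocation {
    node_id: u64,
    generation: u32,
}

struct ShardMutationLocations {
    latest: MutationLocation,
    // Stale attachments discovered from the observed state.
    other: Vec<MutationLocation>,
}

// Stand-in for the remote timeline creation request.
fn create_timeline_on(_loc: &MutationLocation) -> Result<(), CreateError> {
    Ok(())
}

fn create_on_all_attached(locations: &ShardMutationLocations) -> Result<(), CreateError> {
    // Create on the latest attached location first; a failure here is fatal.
    create_timeline_on(&locations.latest)?;

    // Best-effort creation on the other attached locations, so that a compute
    // can start even if cplane has not yet been told about a cut-over.
    for loc in &locations.other {
        match create_timeline_on(loc) {
            Ok(()) => {}
            // The tenant may already have been detached from the stale
            // location, so a "not found" response is ignored.
            Err(CreateError::NotFound) => {}
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```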
@@ -526,6 +526,21 @@ pub(crate) enum ReconcileResultRequest {
     Stop,
 }
 
+#[derive(Clone)]
+struct MutationLocation {
+    node: Node,
+    generation: Generation,
+}
+
+#[derive(Clone)]
+struct ShardMutationLocations {
+    latest: MutationLocation,
+    other: Vec<MutationLocation>,
+}
+
+#[derive(Default, Clone)]
+struct TenantMutationLocations(BTreeMap<TenantShardId, ShardMutationLocations>);
+
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config
@@ -2987,38 +3002,83 @@ impl Service {
         failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
 
         self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
-            if targets.is_empty() {
+            if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
                 ));
             };
-            let shard_zero = targets.remove(0);
+
+            let (shard_zero_tid, shard_zero_locations) =
+                targets.0.pop_first().expect("Must have at least one shard");
+            assert!(shard_zero_tid.is_shard_zero());
 
             async fn create_one(
                 tenant_shard_id: TenantShardId,
-                node: Node,
+                locations: ShardMutationLocations,
                 jwt: Option<String>,
                 create_req: TimelineCreateRequest,
             ) -> Result<TimelineInfo, ApiError> {
+                let latest = locations.latest.node;
+
                 tracing::info!(
-                    "Creating timeline on shard {}/{}, attached to node {node}",
+                    "Creating timeline on shard {}/{}, attached to node {latest} in generation {:?}",
                     tenant_shard_id,
                     create_req.new_timeline_id,
+                    locations.latest.generation
                 );
-                let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
 
-                client
+                let client =
+                    PageserverClient::new(latest.get_id(), latest.base_url(), jwt.as_deref());
+
+                let timeline_info = client
                     .timeline_create(tenant_shard_id, &create_req)
                     .await
-                    .map_err(|e| passthrough_api_error(&node, e))
+                    .map_err(|e| passthrough_api_error(&latest, e))?;
+
+                // We propagate timeline creations to all attached locations such that a compute
+                // for the new timeline is able to start regardless of the current state of the
+                // tenant shard reconciliation.
+                for location in locations.other {
+                    tracing::info!(
+                        "Creating timeline on shard {}/{}, stale attached to node {} in generation {:?}",
+                        tenant_shard_id,
+                        create_req.new_timeline_id,
+                        location.node,
+                        location.generation
+                    );
+
+                    let client = PageserverClient::new(
+                        location.node.get_id(),
+                        location.node.base_url(),
+                        jwt.as_deref(),
+                    );
+
+                    let res = client
+                        .timeline_create(tenant_shard_id, &create_req)
+                        .await;
+
+                    if let Err(e) = res {
+                        match e {
+                            mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _) => {
+                                // Tenant might have been detached from the stale location,
+                                // so ignore 404s.
+                            },
+                            _ => {
+                                return Err(passthrough_api_error(&location.node, e));
+                            }
+                        }
+                    }
+                }
+
+                Ok(timeline_info)
             }
 
             // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
             // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
             // that will get the first creation request, and propagate the LSN to all the >0 shards.
             let timeline_info = create_one(
-                shard_zero.0,
-                shard_zero.1,
+                shard_zero_tid,
+                shard_zero_locations,
                 self.config.jwt_token.clone(),
                 create_req.clone(),
             )
@@ -3031,14 +3091,24 @@ impl Service {
             }
 
             // Create timeline on remaining shards with number >0
-            if !targets.is_empty() {
+            if !targets.0.is_empty() {
                 // If we had multiple shards, issue requests for the remainder now.
                 let jwt = &self.config.jwt_token;
                 self.tenant_for_shards(
-                    targets.iter().map(|t| (t.0, t.1.clone())).collect(),
-                    |tenant_shard_id: TenantShardId, node: Node| {
+                    targets
+                        .0
+                        .iter()
+                        .map(|t| (*t.0, t.1.latest.node.clone()))
+                        .collect(),
+                    |tenant_shard_id: TenantShardId, _node: Node| {
                         let create_req = create_req.clone();
-                        Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
+                        let mutation_locations = targets.0.remove(&tenant_shard_id).unwrap();
+                        Box::pin(create_one(
+                            tenant_shard_id,
+                            mutation_locations,
+                            jwt.clone(),
+                            create_req,
+                        ))
                     },
                 )
                 .await?;
@@ -3068,7 +3138,7 @@ impl Service {
         .await;
 
         self.tenant_remote_mutation(tenant_id, move |targets| async move {
-            if targets.is_empty() {
+            if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
                 ));
@@ -3099,8 +3169,9 @@ impl Service {
 
             // no shard needs to go first/last; the operation should be idempotent
             // TODO: it would be great to ensure that all shards return the same error
+            let locations = targets.0.iter().map(|t| (*t.0, t.1.latest.node.clone())).collect();
             let results = self
-                .tenant_for_shards(targets, |tenant_shard_id, node| {
+                .tenant_for_shards(locations, |tenant_shard_id, node| {
                     futures::FutureExt::boxed(config_one(
                         tenant_shard_id,
                         timeline_id,
@@ -3131,7 +3202,7 @@ impl Service {
         .await;
 
         self.tenant_remote_mutation(tenant_id, move |targets| async move {
-            if targets.is_empty() {
+            if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
                 ));
@@ -3179,8 +3250,9 @@ impl Service {
             }
 
             // no shard needs to go first/last; the operation should be idempotent
+            let locations = targets.0.iter().map(|t| (*t.0, t.1.latest.node.clone())).collect();
             let mut results = self
-                .tenant_for_shards(targets, |tenant_shard_id, node| {
+                .tenant_for_shards(locations, |tenant_shard_id, node| {
                     futures::FutureExt::boxed(detach_one(
                         tenant_shard_id,
                         timeline_id,
@@ -3227,7 +3299,7 @@ impl Service {
         .await;
 
         self.tenant_remote_mutation(tenant_id, move |targets| async move {
-            if targets.is_empty() {
+            if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
                 ));
@@ -3249,7 +3321,12 @@ impl Service {
             }
 
             // no shard needs to go first/last; the operation should be idempotent
-            self.tenant_for_shards(targets, |tenant_shard_id, node| {
+            let locations = targets
+                .0
+                .iter()
+                .map(|t| (*t.0, t.1.latest.node.clone()))
+                .collect();
+            self.tenant_for_shards(locations, |tenant_shard_id, node| {
                 futures::FutureExt::boxed(do_one(
                     tenant_shard_id,
                     timeline_id,
@@ -3344,11 +3421,11 @@ impl Service {
         op: O,
     ) -> Result<R, ApiError>
     where
-        O: FnOnce(Vec<(TenantShardId, Node)>) -> F,
+        O: FnOnce(TenantMutationLocations) -> F,
         F: std::future::Future<Output = R>,
     {
-        let target_gens = {
-            let mut targets = Vec::new();
+        let mutation_locations = {
+            let mut locations = TenantMutationLocations::default();
 
             // Load the currently attached pageservers for the latest generation of each shard. This can
             // run concurrently with reconciliations, and it is not guaranteed that the node we find here
@@ -3399,14 +3476,50 @@ impl Service {
                     .ok_or(ApiError::Conflict(format!(
                         "Raced with removal of node {node_id}"
                     )))?;
-                targets.push((tenant_shard_id, node.clone(), generation));
+                let generation = generation.expect("Checked above");
+
+                let tenant = locked.tenants.get(&tenant_shard_id);
+
+                // TODO(vlad): Abstract the logic that finds stale attached locations
+                // from observed state into a [`Service`] method.
+                let other_locations = match tenant {
+                    Some(tenant) => {
+                        let mut other = tenant.attached_locations();
+                        let latest_location_index =
+                            other.iter().position(|&l| l == (node.get_id(), generation));
+                        if let Some(idx) = latest_location_index {
+                            other.remove(idx);
+                        }
+
+                        other
+                    }
+                    None => Vec::default(),
+                };
+
+                let location = ShardMutationLocations {
+                    latest: MutationLocation {
+                        node: node.clone(),
+                        generation,
+                    },
+                    other: other_locations
+                        .into_iter()
+                        .filter_map(|(node_id, generation)| {
+                            let node = locked.nodes.get(&node_id)?;
+
+                            Some(MutationLocation {
+                                node: node.clone(),
+                                generation,
+                            })
+                        })
+                        .collect(),
+                };
+                locations.0.insert(tenant_shard_id, location);
             }
 
-            targets
+            locations
         };
 
-        let targets = target_gens.iter().map(|t| (t.0, t.1.clone())).collect();
-        let result = op(targets).await;
+        let result = op(mutation_locations.clone()).await;
 
         // Post-check: are all the generations of all the shards the same as they were initially? This proves that
         // our remote operation executed on the latest generation and is therefore persistent.
@@ -3422,9 +3535,10 @@ impl Service {
                 }| (tenant_shard_id, generation),
             )
             .collect::<Vec<_>>()
-            != target_gens
+            != mutation_locations
+                .0
                 .into_iter()
-                .map(|i| (i.0, i.2))
+                .map(|i| (i.0, Some(i.1.latest.generation)))
                 .collect::<Vec<_>>()
         {
             // We raced with something that incremented the generation, and therefore cannot be
@@ -3454,12 +3568,14 @@ impl Service {
         .await;
 
         self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
-            if targets.is_empty() {
+            if targets.0.is_empty() {
                 return Err(ApiError::NotFound(
                     anyhow::anyhow!("Tenant not found").into(),
                 ));
             }
-            let shard_zero = targets.remove(0);
+
+            let (shard_zero_tid, shard_zero_locations) = targets.0.pop_first().expect("Must have at least one shard");
+            assert!(shard_zero_tid.is_shard_zero());
 
             async fn delete_one(
                 tenant_shard_id: TenantShardId,
@@ -3482,8 +3598,9 @@ impl Service {
                 })
             }
 
+            let locations = targets.0.iter().map(|t| (*t.0, t.1.latest.node.clone())).collect();
             let statuses = self
-                .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
+                .tenant_for_shards(locations, |tenant_shard_id: TenantShardId, node: Node| {
                     Box::pin(delete_one(
                         tenant_shard_id,
                         timeline_id,
@@ -3501,9 +3618,9 @@ impl Service {
             // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
             // to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
             let shard_zero_status = delete_one(
-                shard_zero.0,
+                shard_zero_tid,
                 timeline_id,
-                shard_zero.1,
+                shard_zero_locations.latest.node,
                 self.config.jwt_token.clone(),
             )
             .await?;
@@ -17,6 +17,7 @@ use crate::{
     service::ReconcileResultRequest,
 };
 use futures::future::{self, Either};
+use itertools::Itertools;
 use pageserver_api::controller_api::{
     AvailabilityZone, NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
 };
@@ -1410,6 +1411,32 @@ impl TenantShard {
     pub(crate) fn set_preferred_az(&mut self, preferred_az_id: AvailabilityZone) {
         self.preferred_az_id = Some(preferred_az_id);
     }
+
+    /// Returns all the nodes to which this tenant shard is attached according to the
+    /// observed state and the generations. Return vector is sorted from latest generation
+    /// to earliest.
+    pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
+        self.observed
+            .locations
+            .iter()
+            .filter_map(|(node_id, observed)| {
+                use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};
+
+                let conf = observed.conf.as_ref()?;
+
+                match (conf.generation, conf.mode) {
+                    (Some(gen), AttachedMulti | AttachedSingle | AttachedStale) => {
+                        Some((*node_id, gen))
+                    }
+                    _ => None,
+                }
+            })
+            .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
+                lhs_gen.cmp(rhs_gen).reverse()
+            })
+            .map(|(node_id, gen)| (node_id, Generation::new(gen)))
+            .collect()
+    }
 }
 
 #[cfg(test)]