Merge branch 'main' into communicator-rewrite

Erik Grinaker
2025-07-17 10:59:37 +02:00
129 changed files with 5887 additions and 1063 deletions


@@ -735,15 +735,13 @@ async fn handle_tenant_timeline_passthrough(
);
// Find the node that holds shard zero
let (node, tenant_shard_id) = if tenant_or_shard_id.is_unsharded() {
let (node, tenant_shard_id, consistent) = if tenant_or_shard_id.is_unsharded() {
service
.tenant_shard0_node(tenant_or_shard_id.tenant_id)
.await?
} else {
(
service.tenant_shard_node(tenant_or_shard_id).await?,
tenant_or_shard_id,
)
let (node, consistent) = service.tenant_shard_node(tenant_or_shard_id).await?;
(node, tenant_or_shard_id, consistent)
};
// Callers will always pass an unsharded tenant ID. Before proxying, we must
@@ -788,16 +786,12 @@ async fn handle_tenant_timeline_passthrough(
}
// Transform 404 into 503 if we raced with a migration
if resp.status() == reqwest::StatusCode::NOT_FOUND {
// Look up node again: if we migrated it will be different
let new_node = service.tenant_shard_node(tenant_shard_id).await?;
if new_node.get_id() != node.get_id() {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
));
}
if resp.status() == reqwest::StatusCode::NOT_FOUND && !consistent {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404 due to ongoing migration, retry later").into(),
));
}
// We have a reqwest::Response, would like an http::Response
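
The handler no longer re-resolves the shard's node after a 404: it reuses the `consistent` flag captured when the node was first looked up, and pushes the retry to the client via 503. A minimal sketch of the client-side loop this contract assumes (hypothetical helper, not part of this commit; assumes reqwest and tokio, with an illustrative backoff and attempt budget):

```rust
use std::time::Duration;

/// Hypothetical client-side retry helper illustrating the 503 contract described above.
async fn get_with_503_retry(
    client: &reqwest::Client,
    url: &str,
) -> reqwest::Result<reqwest::Response> {
    let mut backoff = Duration::from_millis(100);
    for _attempt in 0..10 {
        let resp = client.get(url).send().await?;
        if resp.status() != reqwest::StatusCode::SERVICE_UNAVAILABLE {
            return Ok(resp);
        }
        // 503 means the shard is mid-migration or not yet attached: back off and retry.
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(5));
    }
    // Attempt budget exhausted: surface whatever the final request returns.
    client.get(url).send().await
}
```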
@@ -2597,6 +2591,17 @@ pub fn make_router(
)
},
)
// Tenant timeline mark_invisible passthrough to shard zero
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(
@@ -2615,17 +2620,6 @@ pub fn make_router(
RequestName("v1_tenant_passthrough"),
)
})
// Tenant timeline mark_invisible passthrough to shard zero
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
)
},
)
}
#[cfg(test)]


@@ -207,6 +207,27 @@ enum ShardGenerationValidity {
},
}
/// We collect the state of attachments for some operations to determine if the operation
/// needs to be retried when it fails.
struct TenantShardAttachState {
/// The targets of the operation.
///
/// Tenant shard ID, node ID, node, and whether the intent node is observed as the attached (primary) location.
targets: Vec<(TenantShardId, NodeId, Node, bool)>,
/// The targets grouped by node ID.
by_node_id: HashMap<NodeId, (TenantShardId, Node, bool)>,
}
impl TenantShardAttachState {
fn for_api_call(&self) -> Vec<(TenantShardId, Node)> {
self.targets
.iter()
.map(|(tenant_shard_id, _, node, _)| (*tenant_shard_id, node.clone()))
.collect()
}
}
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
@@ -4752,6 +4773,86 @@ impl Service {
Ok(())
}
fn is_observed_consistent_with_intent(
&self,
shard: &TenantShard,
intent_node_id: NodeId,
) -> bool {
if let Some(location) = shard.observed.locations.get(&intent_node_id)
&& let Some(ref conf) = location.conf
&& (conf.mode == LocationConfigMode::AttachedSingle
|| conf.mode == LocationConfigMode::AttachedMulti)
{
true
} else {
false
}
}
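
In other words, the intent node only counts as consistent when it appears in the observed state with a fully reported, attached LocationConf; a missing entry, an unreported conf, or a secondary/detached mode all count as inconsistent. A self-contained restatement of the predicate using stripped-down stand-in types (hypothetical, for illustration only):

```rust
/// Hypothetical stand-in for LocationConfigMode, reduced to the cases that matter here.
enum Mode {
    AttachedSingle,
    AttachedMulti,
    Secondary,
    Detached,
}

/// Outer Option: does the observed state have an entry for the intent node at all?
/// Inner Option: has that entry's LocationConf been reported yet?
fn is_consistent(observed: Option<Option<Mode>>) -> bool {
    matches!(observed, Some(Some(Mode::AttachedSingle | Mode::AttachedMulti)))
}
```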
fn collect_tenant_shards(
&self,
tenant_id: TenantId,
) -> Result<TenantShardAttachState, ApiError> {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
let mut by_node_id = HashMap::new();
// If the request got an unsharded tenant id, then apply
// the operation to all shards. Otherwise, apply it to a specific shard.
let shards_range = TenantShardId::tenant_range(tenant_id);
for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
let consistent = self.is_observed_consistent_with_intent(shard, *node_id);
targets.push((*tenant_shard_id, *node_id, node.clone(), consistent));
by_node_id.insert(*node_id, (*tenant_shard_id, node.clone(), consistent));
}
}
Ok(TenantShardAttachState {
targets,
by_node_id,
})
}
fn process_result_and_passthrough_errors<T>(
&self,
results: Vec<(Node, Result<T, mgmt_api::Error>)>,
attach_state: TenantShardAttachState,
) -> Result<Vec<(Node, T)>, ApiError> {
let mut processed_results: Vec<(Node, T)> = Vec::with_capacity(results.len());
debug_assert_eq!(results.len(), attach_state.targets.len());
for (node, res) in results {
let is_consistent = attach_state
.by_node_id
.get(&node.get_id())
.map(|(_, _, consistent)| *consistent);
match res {
Ok(res) => processed_results.push((node, res)),
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
if is_consistent == Some(false) =>
{
// This is expected if the attach is not finished yet. Return 503 so that the client can retry.
return Err(ApiError::ResourceUnavailable(
format!(
"Timeline is not attached to the pageserver {} yet, please retry",
node.get_id()
)
.into(),
));
}
Err(e) => return Err(passthrough_api_error(&node, e)),
}
}
Ok(processed_results)
}
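
Together with `collect_tenant_shards`, these helpers are meant to bracket a per-shard fan-out such as `tenant_for_shards_api`: snapshot the attach state once, strip the consistency flags for the actual calls, then feed the raw results back through `process_result_and_passthrough_errors` so that 404s from not-yet-consistent attachments surface as retryable 503s. A shape-only sketch (the fan-out is stubbed as a generic `call_each_shard` closure, since its full signature is not reproduced in this hunk):

```rust
/// Shape-only sketch of the intended composition; `call_each_shard` stands in for the
/// real per-shard fan-out (e.g. tenant_for_shards_api) and is not part of this commit.
async fn fan_out_with_consistency<T, F, Fut>(
    service: &Service,
    tenant_id: TenantId,
    call_each_shard: F,
) -> Result<Vec<(Node, T)>, ApiError>
where
    F: FnOnce(Vec<(TenantShardId, Node)>) -> Fut,
    Fut: std::future::Future<Output = Vec<(Node, Result<T, mgmt_api::Error>)>>,
{
    // Snapshot intent/observed attachment state for every shard of the tenant.
    let attach_state = service.collect_tenant_shards(tenant_id)?;
    // Issue the per-shard calls against the attached pageservers.
    let results = call_each_shard(attach_state.for_api_call()).await;
    // Turn 404s from not-yet-consistent attachments into 503; pass other errors through.
    service.process_result_and_passthrough_errors(results, attach_state)
}
```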
pub(crate) async fn tenant_timeline_lsn_lease(
&self,
tenant_id: TenantId,
@@ -4765,49 +4866,11 @@ impl Service {
)
.await;
let mut retry_if_not_attached = false;
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
let attach_state = self.collect_tenant_shards(tenant_id)?;
// If the request got an unsharded tenant id, then apply
// the operation to all shards. Otherwise, apply it to a specific shard.
let shards_range = TenantShardId::tenant_range(tenant_id);
for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
if let Some(location) = shard.observed.locations.get(node_id) {
if let Some(ref conf) = location.conf {
if conf.mode != LocationConfigMode::AttachedSingle
&& conf.mode != LocationConfigMode::AttachedMulti
{
// If the shard is attached as secondary, we need to retry if 404.
retry_if_not_attached = true;
}
// If the shard is attached as primary, we should succeed.
} else {
// Location conf is not available yet, retry if 404.
retry_if_not_attached = true;
}
} else {
// The shard is not attached to the intended pageserver yet, retry if 404.
retry_if_not_attached = true;
}
}
}
targets
};
let res = self
let results = self
.tenant_for_shards_api(
targets,
attach_state.for_api_call(),
|tenant_shard_id, client| async move {
client
.timeline_lease_lsn(tenant_shard_id, timeline_id, lsn)
@@ -4820,31 +4883,13 @@ impl Service {
)
.await;
let leases = self.process_result_and_passthrough_errors(results, attach_state)?;
let mut valid_until = None;
for (node, r) in res {
match r {
Ok(lease) => {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
}
}
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
if retry_if_not_attached =>
{
// This is expected if the attach is not finished yet. Return 503 so that the client can retry.
return Err(ApiError::ResourceUnavailable(
format!(
"Timeline is not attached to the pageserver {} yet, please retry",
node.get_id()
)
.into(),
));
}
Err(e) => {
return Err(passthrough_api_error(&node, e));
}
for (_, lease) in leases {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
}
}
Ok(LsnLease {
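
Each shard grants its own lease, so the aggregate lease handed back to the caller is only trustworthy until the earliest per-shard expiry, which is what the fold above computes. Equivalently (a sketch assuming `valid_until` is a `SystemTime`, which is `Ord`):

```rust
use std::time::SystemTime;

/// Equivalent to the fold above: the combined lease expires at the earliest
/// per-shard expiry. Returns None when there are no shards.
fn min_valid_until(valid_untils: impl IntoIterator<Item = SystemTime>) -> Option<SystemTime> {
    valid_untils.into_iter().min()
}
```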
@@ -5267,10 +5312,12 @@ impl Service {
status_code
}
/// When you know the TenantId but not a specific shard, and would like to get the node holding shard 0.
///
/// Returns the node, tenant shard id, and whether it is consistent with the observed state.
pub(crate) async fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(Node, TenantShardId), ApiError> {
) -> Result<(Node, TenantShardId, bool), ApiError> {
let tenant_shard_id = {
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, _shard)) = locked
@@ -5288,15 +5335,17 @@ impl Service {
self.tenant_shard_node(tenant_shard_id)
.await
.map(|node| (node, tenant_shard_id))
.map(|(node, consistent)| (node, tenant_shard_id, consistent))
}
/// When you need to send an HTTP request to the pageserver that holds a shard of a tenant, this
/// function looks up and returns the node. If the shard isn't found, returns Err(ApiError::NotFound).
///
/// Returns the intent node and whether it is consistent with the observed state.
pub(crate) async fn tenant_shard_node(
&self,
tenant_shard_id: TenantShardId,
) -> Result<Node, ApiError> {
) -> Result<(Node, bool), ApiError> {
// Look up in-memory state and maybe use the node from there.
{
let locked = self.inner.read().unwrap();
@@ -5326,7 +5375,8 @@ impl Service {
"Shard refers to nonexistent node"
)));
};
return Ok(node.clone());
let consistent = self.is_observed_consistent_with_intent(shard, *intent_node_id);
return Ok((node.clone(), consistent));
}
};
@@ -5360,8 +5410,8 @@ impl Service {
"Shard refers to nonexistent node"
)));
};
Ok(node.clone())
// A reconciliation is in flight, so we do not yet have the observed state; conservatively report the location as inconsistent.
Ok((node.clone(), false))
}
pub(crate) fn tenant_locate(


@@ -1272,7 +1272,9 @@ impl TenantShard {
}
/// Return true if the optimization was really applied: it will not be applied if the optimization's
/// sequence is behind this tenant shard's
/// sequence is behind this tenant shard's or if the intent state proposed by the optimization
/// is not compatible with the current intent state. The latter may happen when the background
/// reconcile loop runs concurrently with HTTP-driven optimizations.
pub(crate) fn apply_optimization(
&mut self,
scheduler: &mut Scheduler,
@@ -1282,6 +1284,15 @@ impl TenantShard {
return false;
}
if !self.validate_optimization(&optimization) {
tracing::info!(
"Skipping optimization for {} because it does not match current intent: {:?}",
self.tenant_shard_id,
optimization,
);
return false;
}
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_schedule_optimization
@@ -1322,6 +1333,34 @@ impl TenantShard {
true
}
/// Check that the desired modifications to the intent state are compatible with
/// the current intent state
fn validate_optimization(&self, optimization: &ScheduleOptimization) -> bool {
match optimization.action {
ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id,
new_attached_node_id,
}) => {
self.intent.attached == Some(old_attached_node_id)
&& self.intent.secondary.contains(&new_attached_node_id)
}
ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id: _,
new_node_id,
}) => {
// The old secondary may already be gone; only require that the replacement node is not already a secondary in the intent state
!self.intent.secondary.contains(&new_node_id)
}
ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
!self.intent.secondary.contains(&new_node_id)
}
ScheduleOptimizationAction::RemoveSecondary(_) => {
// It's legal to remove a secondary that is not present in the intent state
true
}
}
}
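
For instance, a MigrateAttachment optimization computed via the HTTP path goes stale as soon as the background reconcile loop moves the attachment elsewhere; `validate_optimization` is what catches that before `apply_optimization` commits it. A self-contained sketch of the MigrateAttachment rule using simplified stand-in types (hypothetical, not the real intent structs):

```rust
use std::collections::HashSet;

/// Hypothetical, stripped-down stand-in for the shard's intent state.
struct IntentSketch {
    attached: Option<u64>, // NodeId stand-in
    secondary: HashSet<u64>,
}

/// Mirrors the MigrateAttachment arm above: the optimization is only still valid if the
/// shard is attached where the optimization assumed, and the target is still a secondary.
fn migrate_attachment_still_valid(intent: &IntentSketch, old: u64, new: u64) -> bool {
    intent.attached == Some(old) && intent.secondary.contains(&new)
}

fn main() {
    let mut intent = IntentSketch { attached: Some(1), secondary: HashSet::from([2]) };
    assert!(migrate_attachment_still_valid(&intent, 1, 2));

    // The background reconciler has since moved the attachment to node 3:
    intent.attached = Some(3);
    assert!(!migrate_attachment_still_valid(&intent, 1, 2)); // optimization is skipped
}
```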
/// When a shard has several secondary locations, we need to pick one in situations where
/// we promote one of them to an attached location:
/// - When draining a node for restart