mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Merge remote-tracking branch 'origin/main' into vlad/hadron-jwt
This commit is contained in:
@@ -6,13 +6,16 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use compute_api::spec::PageserverProtocol;
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use compute_api::spec::PageserverShardInfo;
|
||||
use control_plane::endpoint::{
|
||||
ComputeControlPlane, EndpointStatus, PageserverConnectionInfo, PageserverShardConnectionInfo,
|
||||
};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
|
||||
use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
use safekeeper_api::membership::SafekeeperGeneration;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -506,27 +509,64 @@ impl ApiMethod for ComputeHookTenant {
|
||||
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
|
||||
tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
|
||||
|
||||
let pageservers = shards
|
||||
.iter()
|
||||
.map(|shard| {
|
||||
let ps_conf = env
|
||||
.get_pageserver_conf(shard.node_id)
|
||||
.expect("Unknown pageserver");
|
||||
if endpoint.grpc {
|
||||
let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
|
||||
let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
|
||||
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
|
||||
(PageserverProtocol::Grpc, host, port)
|
||||
} else {
|
||||
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
|
||||
.expect("Unable to parse listen_pg_addr");
|
||||
(PageserverProtocol::Libpq, host, port.unwrap_or(5432))
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let shard_count = match shards.len() {
|
||||
1 => ShardCount::unsharded(),
|
||||
n => ShardCount(n.try_into().expect("too many shards")),
|
||||
};
|
||||
|
||||
let mut shard_infos: HashMap<ShardIndex, PageserverShardInfo> = HashMap::new();
|
||||
|
||||
let prefer_protocol = if endpoint.grpc {
|
||||
PageserverProtocol::Grpc
|
||||
} else {
|
||||
PageserverProtocol::Libpq
|
||||
};
|
||||
|
||||
for shard in shards.iter() {
|
||||
let ps_conf = env
|
||||
.get_pageserver_conf(shard.node_id)
|
||||
.expect("Unknown pageserver");
|
||||
|
||||
let libpq_url = Some({
|
||||
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
|
||||
.expect("Unable to parse listen_pg_addr");
|
||||
let port = port.unwrap_or(5432);
|
||||
format!("postgres://no_user@{host}:{port}")
|
||||
});
|
||||
let grpc_url = if let Some(grpc_addr) = &ps_conf.listen_grpc_addr {
|
||||
let (host, port) =
|
||||
parse_host_port(grpc_addr).expect("invalid gRPC address");
|
||||
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
|
||||
Some(format!("grpc://no_user@{host}:{port}"))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let pageserver = PageserverShardConnectionInfo {
|
||||
id: Some(shard.node_id),
|
||||
libpq_url,
|
||||
grpc_url,
|
||||
};
|
||||
let shard_info = PageserverShardInfo {
|
||||
pageservers: vec![pageserver],
|
||||
};
|
||||
shard_infos.insert(
|
||||
ShardIndex {
|
||||
shard_number: shard.shard_number,
|
||||
shard_count,
|
||||
},
|
||||
shard_info,
|
||||
);
|
||||
}
|
||||
|
||||
let pageserver_conninfo = PageserverConnectionInfo {
|
||||
shard_count,
|
||||
stripe_size: stripe_size.map(|val| ShardStripeSize(val.0)),
|
||||
shards: shard_infos,
|
||||
prefer_protocol,
|
||||
};
|
||||
|
||||
endpoint
|
||||
.reconfigure_pageservers(pageservers, *stripe_size)
|
||||
.reconfigure_pageservers(&pageserver_conninfo)
|
||||
.await
|
||||
.map_err(NotifyError::NeonLocal)?;
|
||||
}
|
||||
|
||||
@@ -46,11 +46,31 @@ impl TenantShardDrain {
|
||||
&self,
|
||||
tenants: &BTreeMap<TenantShardId, TenantShard>,
|
||||
scheduler: &Scheduler,
|
||||
) -> Option<NodeId> {
|
||||
let tenant_shard = tenants.get(&self.tenant_shard_id)?;
|
||||
) -> TenantShardDrainAction {
|
||||
let Some(tenant_shard) = tenants.get(&self.tenant_shard_id) else {
|
||||
return TenantShardDrainAction::Skip;
|
||||
};
|
||||
|
||||
if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
|
||||
return None;
|
||||
// If the intent attached node is not the drained node, check the observed state
|
||||
// of the shard on the drained node. If it is Attached*, it means the shard is
|
||||
// being migrated from the drained node. The drain loop needs to wait for the
|
||||
// reconciliation to complete for a smooth draining.
|
||||
|
||||
use pageserver_api::models::LocationConfigMode::*;
|
||||
|
||||
let attach_mode = tenant_shard
|
||||
.observed
|
||||
.locations
|
||||
.get(&self.drained_node)
|
||||
.and_then(|observed| observed.conf.as_ref().map(|conf| conf.mode));
|
||||
|
||||
return match (attach_mode, tenant_shard.intent.get_attached()) {
|
||||
(Some(AttachedSingle | AttachedMulti | AttachedStale), Some(intent_node_id)) => {
|
||||
TenantShardDrainAction::Reconcile(*intent_node_id)
|
||||
}
|
||||
_ => TenantShardDrainAction::Skip,
|
||||
};
|
||||
}
|
||||
|
||||
// Only tenants with a normal (Active) scheduling policy are proactively moved
|
||||
@@ -63,19 +83,19 @@ impl TenantShardDrain {
|
||||
}
|
||||
ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
|
||||
// If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
|
||||
return None;
|
||||
return TenantShardDrainAction::Skip;
|
||||
}
|
||||
}
|
||||
|
||||
match tenant_shard.preferred_secondary(scheduler) {
|
||||
Some(node) => Some(node),
|
||||
Some(node) => TenantShardDrainAction::RescheduleToSecondary(node),
|
||||
None => {
|
||||
tracing::warn!(
|
||||
tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
|
||||
"No eligible secondary while draining {}", self.drained_node
|
||||
);
|
||||
|
||||
None
|
||||
TenantShardDrainAction::Skip
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -138,3 +158,17 @@ impl TenantShardDrain {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Action to take when draining a tenant shard.
|
||||
pub(crate) enum TenantShardDrainAction {
|
||||
/// The tenant shard is on the draining node.
|
||||
/// Reschedule the tenant shard to a secondary location.
|
||||
/// Holds a destination node id to reschedule to.
|
||||
RescheduleToSecondary(NodeId),
|
||||
/// The tenant shard is being migrated from the draining node.
|
||||
/// Wait for the reconciliation to complete.
|
||||
/// Holds the intent attached node id.
|
||||
Reconcile(NodeId),
|
||||
/// The tenant shard is not eligible for draining, skip it.
|
||||
Skip,
|
||||
}
|
||||
|
||||
@@ -471,11 +471,17 @@ impl Persistence {
|
||||
&self,
|
||||
input_node_id: NodeId,
|
||||
input_https_port: Option<u16>,
|
||||
input_grpc_addr: Option<String>,
|
||||
input_grpc_port: Option<u16>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::nodes::dsl::*;
|
||||
self.update_node(
|
||||
input_node_id,
|
||||
listen_https_port.eq(input_https_port.map(|x| x as i32)),
|
||||
(
|
||||
listen_https_port.eq(input_https_port.map(|x| x as i32)),
|
||||
listen_grpc_addr.eq(input_grpc_addr),
|
||||
listen_grpc_port.eq(input_grpc_port.map(|x| x as i32)),
|
||||
),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ use crate::id_lock_map::{
|
||||
use crate::leadership::Leadership;
|
||||
use crate::metrics;
|
||||
use crate::node::{AvailabilityTransition, Node};
|
||||
use crate::operation_utils::{self, TenantShardDrain};
|
||||
use crate::operation_utils::{self, TenantShardDrain, TenantShardDrainAction};
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::peer_client::GlobalObservedState;
|
||||
use crate::persistence::split_state::SplitState;
|
||||
@@ -1280,7 +1280,7 @@ impl Service {
|
||||
// Always attempt autosplits. Sharding is crucial for bulk ingest performance, so we
|
||||
// must be responsive when new projects begin ingesting and reach the threshold.
|
||||
self.autosplit_tenants().await;
|
||||
}
|
||||
},
|
||||
_ = self.reconcilers_cancel.cancelled() => return
|
||||
}
|
||||
}
|
||||
@@ -7824,7 +7824,7 @@ impl Service {
|
||||
register_req.listen_https_port,
|
||||
register_req.listen_pg_addr,
|
||||
register_req.listen_pg_port,
|
||||
register_req.listen_grpc_addr,
|
||||
register_req.listen_grpc_addr.clone(),
|
||||
register_req.listen_grpc_port,
|
||||
register_req.availability_zone_id.clone(),
|
||||
self.config.use_https_pageserver_api,
|
||||
@@ -7859,6 +7859,8 @@ impl Service {
|
||||
.update_node_on_registration(
|
||||
register_req.node_id,
|
||||
register_req.listen_https_port,
|
||||
register_req.listen_grpc_addr,
|
||||
register_req.listen_grpc_port,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
@@ -8887,6 +8889,9 @@ impl Service {
|
||||
for (_tenant_id, schedule_context, shards) in
|
||||
TenantShardExclusiveIterator::new(tenants, ScheduleMode::Speculative)
|
||||
{
|
||||
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
|
||||
break;
|
||||
}
|
||||
for shard in shards {
|
||||
if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
|
||||
break;
|
||||
@@ -9651,16 +9656,16 @@ impl Service {
|
||||
tenant_shard_id: tid,
|
||||
};
|
||||
|
||||
let dest_node_id = {
|
||||
let drain_action = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
tid_drain.tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler)
|
||||
};
|
||||
|
||||
match tid_drain
|
||||
.tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler)
|
||||
{
|
||||
Some(node_id) => node_id,
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
let dest_node_id = match drain_action {
|
||||
TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => dest_node_id,
|
||||
TenantShardDrainAction::Reconcile(intent_node_id) => intent_node_id,
|
||||
TenantShardDrainAction::Skip => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -9695,14 +9700,16 @@ impl Service {
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
let rescheduled = tid_drain.reschedule_to_secondary(
|
||||
dest_node_id,
|
||||
tenants,
|
||||
scheduler,
|
||||
nodes,
|
||||
)?;
|
||||
|
||||
if let Some(tenant_shard) = rescheduled {
|
||||
let tenant_shard = match drain_action {
|
||||
TenantShardDrainAction::RescheduleToSecondary(dest_node_id) => tid_drain
|
||||
.reschedule_to_secondary(dest_node_id, tenants, scheduler, nodes)?,
|
||||
TenantShardDrainAction::Reconcile(_) => tenants.get_mut(&tid),
|
||||
// Note: Unreachable, handled above.
|
||||
TenantShardDrainAction::Skip => None,
|
||||
};
|
||||
|
||||
if let Some(tenant_shard) = tenant_shard {
|
||||
let waiter = self.maybe_configured_reconcile_shard(
|
||||
tenant_shard,
|
||||
nodes,
|
||||
|
||||
@@ -24,12 +24,12 @@ use pageserver_api::controller_api::{
|
||||
};
|
||||
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
use safekeeper_api::PgVersionId;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
|
||||
use safekeeper_api::models::{
|
||||
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
|
||||
TimelineMembershipSwitchResponse,
|
||||
};
|
||||
use safekeeper_api::{INITIAL_TERM, Term};
|
||||
use safekeeper_client::mgmt_api;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -1298,13 +1298,7 @@ impl Service {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
|
||||
for res in results.into_iter().flatten() {
|
||||
let sk_position = (res.last_log_term, res.flush_lsn);
|
||||
if sync_position < sk_position {
|
||||
sync_position = sk_position;
|
||||
}
|
||||
}
|
||||
let sync_position = Self::get_sync_position(&results)?;
|
||||
|
||||
tracing::info!(
|
||||
%generation,
|
||||
@@ -1598,4 +1592,36 @@ impl Service {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Computes the sync position from the membership switch responses of all safekeepers.
|
||||
///
|
||||
/// Sync position is a position equal to or greater than the commit position.
|
||||
/// It is guaranteed that all WAL entries with (last_log_term, flush_lsn)
|
||||
/// greater than the sync position are not committed (= not on a quorum).
|
||||
///
|
||||
/// Returns error if there is no quorum of successful responses.
|
||||
fn get_sync_position(
|
||||
responses: &[mgmt_api::Result<TimelineMembershipSwitchResponse>],
|
||||
) -> Result<(Term, Lsn), ApiError> {
|
||||
// A quorum is a strict majority of all responses, counting the failed ones too.
let quorum_size = responses.len() / 2 + 1;
|
||||
|
||||
// Keep only the successful responses; `flatten` drops the error entries.
let mut wal_positions = responses
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|res| (res.last_log_term, res.flush_lsn))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Should be already checked if the responses are from tenant_timeline_set_membership_quorum.
|
||||
if wal_positions.len() < quorum_size {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"not enough successful responses to get sync position: {}/{}",
|
||||
wal_positions.len(),
|
||||
quorum_size,
|
||||
)));
|
||||
}
|
||||
|
||||
// Ascending sort; the tuples compare by term first, then by flush LSN.
wal_positions.sort();
|
||||
|
||||
// Pick the `quorum_size`-th smallest position. Fewer than `quorum_size` safekeepers
// (including the ones that failed to respond) can hold anything beyond it.
// Indexing cannot panic: the length check above guarantees `len >= quorum_size >= 1`.
Ok(wal_positions[quorum_size - 1])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -812,8 +812,6 @@ impl TenantShard {
|
||||
/// if the swap is not possible and leaves the intent state in its original state.
|
||||
///
|
||||
/// Arguments:
|
||||
/// `attached_to`: the currently attached location matching the intent state (may be None if the
|
||||
/// shard is not attached)
|
||||
/// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
|
||||
/// the scheduler to recommend a node
|
||||
pub(crate) fn reschedule_to_secondary(
|
||||
|
||||
Reference in New Issue
Block a user