Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-17 02:12:56 +00:00.

Commit: Merge branch 'main' into communicator-rewrite
@@ -15,11 +15,12 @@ use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span};
use utils::backoff::{self};
use utils::id::{NodeId, TenantId};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};

use crate::service::Config;

@@ -37,7 +38,7 @@ struct UnshardedComputeHookTenant {
    preferred_az: Option<AvailabilityZone>,

    // Must hold this lock to send a notification.
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
}
struct ShardedComputeHookTenant {
    stripe_size: ShardStripeSize,
@@ -50,7 +51,7 @@ struct ShardedComputeHookTenant {
    // Must hold this lock to send a notification. The contents represent
    // the last successfully sent notification, and are used to coalesce multiple
    // updates by only sending when there is a chance since our last successful send.
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
}

/// Represents our knowledge of the compute's state: we can update this when we get a
@@ -58,9 +59,9 @@ struct ShardedComputeHookTenant {
///
/// Should be wrapped in an Option<>, as we cannot always know the remote state.
#[derive(PartialEq, Eq, Debug)]
struct ComputeRemoteState {
struct ComputeRemoteState<R> {
    // The request body which was acked by the compute
    request: ComputeHookNotifyRequest,
    request: R,

    // Whether the cplane indicated that the state was applied to running computes, or just
    // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
@@ -68,6 +69,36 @@ struct ComputeRemoteState {
    applied: bool,
}

type ComputeRemoteTenantState = ComputeRemoteState<NotifyAttachRequest>;
type ComputeRemoteTimelineState = ComputeRemoteState<NotifySafekeepersRequest>;

/// The trait which define the handler-specific types and methods.
/// We have two implementations of this trait so far:
/// - [`ComputeHookTenant`] for tenant attach notifications ("/notify-attach")
/// - [`ComputeHookTimeline`] for safekeeper change notifications ("/notify-safekeepers")
trait ApiMethod {
    /// Type of the key which identifies the resource.
    /// It's either TenantId for tenant attach notifications,
    /// or TenantTimelineId for safekeeper change notifications.
    type Key: std::cmp::Eq + std::hash::Hash + Clone;

    type Request: serde::Serialize + std::fmt::Debug;

    const API_PATH: &'static str;

    fn maybe_send(
        &self,
        key: Self::Key,
        lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<Self::Request>>>>,
    ) -> MaybeSendResult<Self::Request, Self::Key>;

    async fn notify_local(
        env: &LocalEnv,
        cplane: &ComputeControlPlane,
        req: &Self::Request,
    ) -> Result<(), NotifyError>;
}

enum ComputeHookTenant {
    Unsharded(UnshardedComputeHookTenant),
    Sharded(ShardedComputeHookTenant),
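For orientation: each ApiMethod implementation supplies the key type, the request body type and the URL suffix, and the shared ComputeHook machinery below builds the endpoint URL from API_PATH and pushes the serialized request to the control plane (or to neon_local in tests). A minimal sketch of that dispatch, assuming the do_notify helper that appears later in this diff; it is an illustration, not part of the commit:

// Illustrative sketch only: how a caller generic over ApiMethod selects the endpoint.
async fn notify_via_control_plane<M: ApiMethod>(
    hook: &ComputeHook,
    control_plane_url: &str,
    req: &M::Request,
    cancel: &CancellationToken,
) -> Result<(), NotifyError> {
    // API_PATH is "notify-attach" for ComputeHookTenant and
    // "notify-safekeepers" for ComputeHookTimeline.
    let url = format!("{}/{}", control_plane_url.trim_end_matches('/'), M::API_PATH);
    // do_notify (below) serializes the request, attaches the auth header, retries,
    // and respects the API_CONCURRENCY semaphore.
    hook.do_notify(&url, req, cancel).await
}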
@@ -98,7 +129,7 @@ impl ComputeHookTenant {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
|
||||
fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>> {
|
||||
match self {
|
||||
Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
|
||||
Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
|
||||
@@ -192,19 +223,136 @@ impl ComputeHookTenant {
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of a timeline we need to notify the compute about.
|
||||
struct ComputeHookTimeline {
|
||||
generation: SafekeeperGeneration,
|
||||
safekeepers: Vec<SafekeeperInfo>,
|
||||
|
||||
send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTimelineState>>>,
|
||||
}
|
||||
|
||||
impl ComputeHookTimeline {
|
||||
/// Construct a new ComputeHookTimeline with the given safekeepers and generation.
|
||||
fn new(generation: SafekeeperGeneration, safekeepers: Vec<SafekeeperInfo>) -> Self {
|
||||
Self {
|
||||
generation,
|
||||
safekeepers,
|
||||
send_lock: Arc::default(),
|
||||
}
|
||||
}
|
||||
|
||||
    /// Update the state with a new SafekeepersUpdate.
    /// Noop if the update generation is not greater than the current generation.
    fn update(&mut self, sk_update: SafekeepersUpdate) {
        if sk_update.generation > self.generation {
            self.generation = sk_update.generation;
            self.safekeepers = sk_update.safekeepers;
        }
    }
}

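For example, if the recorded generation is 5, an update carrying generation 5 or lower leaves both the generation and the safekeeper set untouched, while an update with generation 6 replaces both; stale notifications from lagging callers are therefore dropped rather than sent on to the compute.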
impl ApiMethod for ComputeHookTimeline {
|
||||
type Key = TenantTimelineId;
|
||||
type Request = NotifySafekeepersRequest;
|
||||
|
||||
const API_PATH: &'static str = "notify-safekeepers";
|
||||
|
||||
fn maybe_send(
|
||||
&self,
|
||||
ttid: TenantTimelineId,
|
||||
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTimelineState>>>,
|
||||
) -> MaybeSendNotifySafekeepersResult {
|
||||
let locked = match lock {
|
||||
Some(already_locked) => already_locked,
|
||||
None => {
|
||||
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::timelines`] lock.
|
||||
let Ok(locked) = self.send_lock.clone().try_lock_owned() else {
|
||||
return MaybeSendResult::AwaitLock((ttid, self.send_lock.clone()));
|
||||
};
|
||||
locked
|
||||
}
|
||||
};
|
||||
|
||||
if locked
|
||||
.as_ref()
|
||||
.is_some_and(|s| s.request.generation >= self.generation)
|
||||
{
|
||||
return MaybeSendResult::Noop;
|
||||
}
|
||||
|
||||
MaybeSendResult::Transmit((
|
||||
NotifySafekeepersRequest {
|
||||
tenant_id: ttid.tenant_id,
|
||||
timeline_id: ttid.timeline_id,
|
||||
generation: self.generation,
|
||||
safekeepers: self.safekeepers.clone(),
|
||||
},
|
||||
locked,
|
||||
))
|
||||
}
|
||||
|
||||
async fn notify_local(
|
||||
_env: &LocalEnv,
|
||||
cplane: &ComputeControlPlane,
|
||||
req: &NotifySafekeepersRequest,
|
||||
) -> Result<(), NotifyError> {
|
||||
let NotifySafekeepersRequest {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
generation,
|
||||
safekeepers,
|
||||
} = req;
|
||||
|
||||
for (endpoint_name, endpoint) in &cplane.endpoints {
|
||||
if endpoint.tenant_id == *tenant_id
|
||||
&& endpoint.timeline_id == *timeline_id
|
||||
&& endpoint.status() == EndpointStatus::Running
|
||||
{
|
||||
tracing::info!("Reconfiguring safekeepers for endpoint {endpoint_name}");
|
||||
|
||||
let safekeepers = safekeepers.iter().map(|sk| sk.id).collect::<Vec<_>>();
|
||||
|
||||
endpoint
|
||||
.reconfigure_safekeepers(safekeepers, *generation)
|
||||
.await
|
||||
.map_err(NotifyError::NeonLocal)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequestShard {
struct NotifyAttachRequestShard {
    node_id: NodeId,
    shard_number: ShardNumber,
}

/// Request body that we send to the control plane to notify it of where a tenant is attached
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequest {
struct NotifyAttachRequest {
    tenant_id: TenantId,
    preferred_az: Option<String>,
    stripe_size: Option<ShardStripeSize>,
    shards: Vec<ComputeHookNotifyRequestShard>,
    shards: Vec<NotifyAttachRequestShard>,
}

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
pub(crate) struct SafekeeperInfo {
    pub id: NodeId,
    /// Hostname of the safekeeper.
    /// It exists for better debuggability. Might be missing.
    /// Should not be used for anything else.
    pub hostname: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifySafekeepersRequest {
    tenant_id: TenantId,
    timeline_id: TimelineId,
    generation: SafekeeperGeneration,
    safekeepers: Vec<SafekeeperInfo>,
}

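To make the two wire formats concrete, here is a hypothetical construction of both request bodies. The example values, the AZ name and the assumption that ShardStripeSize, ShardNumber and NodeId are newtype tuple structs are illustrative; only the field names and types come from this commit.

// Hypothetical example; tenant_id, timeline_id and generation are assumed to be in scope.
let attach = NotifyAttachRequest {
    tenant_id,
    preferred_az: Some("eu-west-1a".to_string()),
    stripe_size: Some(ShardStripeSize(2048)),
    shards: vec![
        NotifyAttachRequestShard { shard_number: ShardNumber(0), node_id: NodeId(1) },
        NotifyAttachRequestShard { shard_number: ShardNumber(1), node_id: NodeId(2) },
    ],
};
let sk_change = NotifySafekeepersRequest {
    tenant_id,
    timeline_id,
    generation, // monotonically increasing SafekeeperGeneration
    safekeepers: vec![SafekeeperInfo { id: NodeId(11), hostname: Some("sk-11".to_string()) }],
};
// attach is PUT to {control_plane_url}/notify-attach,
// sk_change is PUT to {control_plane_url}/notify-safekeepers.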
/// Error type for attempts to call into the control plane compute notification hook
|
||||
@@ -236,42 +384,50 @@ pub(crate) enum NotifyError {
|
||||
NeonLocal(anyhow::Error),
|
||||
}
|
||||
|
||||
enum MaybeSendResult {
|
||||
enum MaybeSendResult<R, K> {
|
||||
// Please send this request while holding the lock, and if you succeed then write
|
||||
// the request into the lock.
|
||||
Transmit(
|
||||
(
|
||||
ComputeHookNotifyRequest,
|
||||
tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
|
||||
R,
|
||||
tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<R>>>,
|
||||
),
|
||||
),
|
||||
// Something requires sending, but you must wait for a current sender then call again
|
||||
AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
|
||||
AwaitLock((K, Arc<tokio::sync::Mutex<Option<ComputeRemoteState<R>>>>)),
|
||||
// Nothing requires sending
|
||||
Noop,
|
||||
}
|
||||
|
||||
impl ComputeHookTenant {
|
||||
type MaybeSendNotifyAttachResult = MaybeSendResult<NotifyAttachRequest, TenantId>;
|
||||
type MaybeSendNotifySafekeepersResult = MaybeSendResult<NotifySafekeepersRequest, TenantTimelineId>;
|
||||
|
||||
impl ApiMethod for ComputeHookTenant {
|
||||
type Key = TenantId;
|
||||
type Request = NotifyAttachRequest;
|
||||
|
||||
const API_PATH: &'static str = "notify-attach";
|
||||
|
||||
fn maybe_send(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
|
||||
) -> MaybeSendResult {
|
||||
lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTenantState>>>,
|
||||
) -> MaybeSendNotifyAttachResult {
|
||||
let locked = match lock {
|
||||
Some(already_locked) => already_locked,
|
||||
None => {
|
||||
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
|
||||
// Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::tenants`] lock.
|
||||
let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
|
||||
return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
|
||||
return MaybeSendResult::AwaitLock((tenant_id, self.get_send_lock().clone()));
|
||||
};
|
||||
locked
|
||||
}
|
||||
};
|
||||
|
||||
let request = match self {
|
||||
Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
|
||||
Self::Unsharded(unsharded_tenant) => Some(NotifyAttachRequest {
|
||||
tenant_id,
|
||||
shards: vec![ComputeHookNotifyRequestShard {
|
||||
shards: vec![NotifyAttachRequestShard {
|
||||
shard_number: ShardNumber(0),
|
||||
node_id: unsharded_tenant.node_id,
|
||||
}],
|
||||
@@ -284,12 +440,12 @@ impl ComputeHookTenant {
|
||||
Self::Sharded(sharded_tenant)
|
||||
if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
|
||||
{
|
||||
Some(ComputeHookNotifyRequest {
|
||||
Some(NotifyAttachRequest {
|
||||
tenant_id,
|
||||
shards: sharded_tenant
|
||||
.shards
|
||||
.iter()
|
||||
.map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
|
||||
.map(|(shard_number, node_id)| NotifyAttachRequestShard {
|
||||
shard_number: *shard_number,
|
||||
node_id: *node_id,
|
||||
})
|
||||
@@ -334,98 +490,22 @@ impl ComputeHookTenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The compute hook is a destination for notifications about changes to tenant:pageserver
|
||||
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
|
||||
/// the compute connection string.
|
||||
pub(super) struct ComputeHook {
|
||||
config: Config,
|
||||
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
|
||||
authorization_header: Option<String>,
|
||||
|
||||
// Concurrency limiter, so that we do not overload the cloud control plane when updating
|
||||
// large numbers of tenants (e.g. when failing over after a node failure)
|
||||
api_concurrency: tokio::sync::Semaphore,
|
||||
|
||||
// This lock is only used in testing enviroments, to serialize calls into neon_lock
|
||||
neon_local_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
// We share a client across all notifications to enable connection re-use etc when
|
||||
// sending large numbers of notifications
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
/// Callers may give us a list of these when asking us to send a bulk batch
|
||||
/// of notifications in the background. This is a 'notification' in the sense of
|
||||
/// other code notifying us of a shard's status, rather than being the final notification
|
||||
/// that we send upwards to the control plane for the whole tenant.
|
||||
pub(crate) struct ShardUpdate<'a> {
|
||||
pub(crate) tenant_shard_id: TenantShardId,
|
||||
pub(crate) node_id: NodeId,
|
||||
pub(crate) stripe_size: ShardStripeSize,
|
||||
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
|
||||
}
|
||||
|
||||
impl ComputeHook {
|
||||
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
|
||||
let authorization_header = config
|
||||
.control_plane_jwt_token
|
||||
.clone()
|
||||
.map(|jwt| format!("Bearer {}", jwt));
|
||||
|
||||
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
|
||||
for cert in &config.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.context("Failed to build http client for compute hook")?;
|
||||
|
||||
Ok(Self {
|
||||
state: Default::default(),
|
||||
config,
|
||||
authorization_header,
|
||||
neon_local_lock: Default::default(),
|
||||
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
|
||||
client,
|
||||
})
|
||||
}
|
||||
|
||||
/// For test environments: use neon_local's LocalEnv to update compute
|
||||
async fn do_notify_local(
|
||||
&self,
|
||||
reconfigure_request: &ComputeHookNotifyRequest,
|
||||
async fn notify_local(
|
||||
env: &LocalEnv,
|
||||
cplane: &ComputeControlPlane,
|
||||
req: &NotifyAttachRequest,
|
||||
) -> Result<(), NotifyError> {
|
||||
// neon_local updates are not safe to call concurrently, use a lock to serialize
|
||||
// all calls to this function
|
||||
let _locked = self.neon_local_lock.lock().await;
|
||||
|
||||
let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
|
||||
tracing::warn!(
|
||||
"neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
let env = match LocalEnv::load_config(repo_dir) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let cplane =
|
||||
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
|
||||
let ComputeHookNotifyRequest {
|
||||
let NotifyAttachRequest {
|
||||
tenant_id,
|
||||
shards,
|
||||
stripe_size,
|
||||
preferred_az: _preferred_az,
|
||||
} = reconfigure_request;
|
||||
} = req;
|
||||
|
||||
for (endpoint_name, endpoint) in &cplane.endpoints {
|
||||
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
|
||||
tracing::info!("Reconfiguring endpoint {endpoint_name}");
|
||||
tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
|
||||
|
||||
let mut shard_conninfos = HashMap::new();
|
||||
for shard in shards.iter() {
|
||||
@@ -460,7 +540,7 @@ impl ComputeHook {
|
||||
};
|
||||
|
||||
endpoint
|
||||
.reconfigure(pageserver_conninfo, *stripe_size, None)
|
||||
.reconfigure_pageservers(pageserver_conninfo, *stripe_size)
|
||||
.await
|
||||
.map_err(NotifyError::NeonLocal)?;
|
||||
}
|
||||
@@ -468,11 +548,102 @@ impl ComputeHook {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_notify_iteration(
|
||||
/// The compute hook is a destination for notifications about changes to tenant:pageserver
|
||||
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
|
||||
/// the compute connection string.
|
||||
pub(super) struct ComputeHook {
|
||||
config: Config,
|
||||
tenants: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
|
||||
timelines: std::sync::Mutex<HashMap<TenantTimelineId, ComputeHookTimeline>>,
|
||||
authorization_header: Option<String>,
|
||||
|
||||
// Concurrency limiter, so that we do not overload the cloud control plane when updating
|
||||
// large numbers of tenants (e.g. when failing over after a node failure)
|
||||
api_concurrency: tokio::sync::Semaphore,
|
||||
|
||||
// This lock is only used in testing enviroments, to serialize calls into neon_local
|
||||
neon_local_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
// We share a client across all notifications to enable connection re-use etc when
|
||||
// sending large numbers of notifications
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
/// Callers may give us a list of these when asking us to send a bulk batch
|
||||
/// of notifications in the background. This is a 'notification' in the sense of
|
||||
/// other code notifying us of a shard's status, rather than being the final notification
|
||||
/// that we send upwards to the control plane for the whole tenant.
|
||||
pub(crate) struct ShardUpdate<'a> {
|
||||
pub(crate) tenant_shard_id: TenantShardId,
|
||||
pub(crate) node_id: NodeId,
|
||||
pub(crate) stripe_size: ShardStripeSize,
|
||||
pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
|
||||
}
|
||||
|
||||
pub(crate) struct SafekeepersUpdate {
    pub(crate) tenant_id: TenantId,
    pub(crate) timeline_id: TimelineId,
    pub(crate) generation: SafekeeperGeneration,
    pub(crate) safekeepers: Vec<SafekeeperInfo>,
}

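A caller that has just committed a new membership configuration is expected to drive this path roughly as follows. This is an illustrative sketch under the assumption that the new generation and safekeeper set are already known; the actual call site is not part of this hunk.

// Illustrative only: feeding a safekeeper membership change into the compute hook.
let update = SafekeepersUpdate {
    tenant_id,
    timeline_id,
    generation: new_generation,   // generation of the new membership configuration
    safekeepers: new_safekeepers, // Vec<SafekeeperInfo> describing the new member set
};
// notify_safekeepers() coalesces per (tenant, timeline) and only transmits when the
// generation advances past the last acknowledged one.
compute_hook.notify_safekeepers(update, &cancel).await?;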
impl ComputeHook {
|
||||
pub(super) fn new(config: Config) -> anyhow::Result<Self> {
|
||||
let authorization_header = config
|
||||
.control_plane_jwt_token
|
||||
.clone()
|
||||
.map(|jwt| format!("Bearer {jwt}"));
|
||||
|
||||
let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
|
||||
for cert in &config.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
.context("Failed to build http client for compute hook")?;
|
||||
|
||||
Ok(Self {
|
||||
tenants: Default::default(),
|
||||
timelines: Default::default(),
|
||||
config,
|
||||
authorization_header,
|
||||
neon_local_lock: Default::default(),
|
||||
api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
|
||||
client,
|
||||
})
|
||||
}
|
||||
|
||||
/// For test environments: use neon_local's LocalEnv to update compute
|
||||
async fn do_notify_local<M: ApiMethod>(&self, req: &M::Request) -> Result<(), NotifyError> {
|
||||
// neon_local updates are not safe to call concurrently, use a lock to serialize
|
||||
// all calls to this function
|
||||
let _locked = self.neon_local_lock.lock().await;
|
||||
|
||||
let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
|
||||
tracing::warn!(
|
||||
"neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
let env = match LocalEnv::load_config(repo_dir) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let cplane =
|
||||
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
|
||||
|
||||
M::notify_local(&env, &cplane, req).await
|
||||
}
|
||||
|
||||
async fn do_notify_iteration<Req: serde::Serialize + std::fmt::Debug>(
|
||||
&self,
|
||||
url: &String,
|
||||
reconfigure_request: &ComputeHookNotifyRequest,
|
||||
reconfigure_request: &Req,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let req = self.client.request(reqwest::Method::PUT, url);
|
||||
@@ -494,9 +665,7 @@ impl ComputeHook {
|
||||
};
|
||||
|
||||
// Treat all 2xx responses as success
|
||||
if response.status() >= reqwest::StatusCode::OK
|
||||
&& response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
|
||||
{
|
||||
if response.status().is_success() {
|
||||
if response.status() != reqwest::StatusCode::OK {
|
||||
// Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
|
||||
// log a warning.
|
||||
@@ -547,10 +716,10 @@ impl ComputeHook {
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_notify(
|
||||
async fn do_notify<R: serde::Serialize + std::fmt::Debug>(
|
||||
&self,
|
||||
url: &String,
|
||||
reconfigure_request: &ComputeHookNotifyRequest,
|
||||
reconfigure_request: &R,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
// We hold these semaphore units across all retries, rather than only across each
|
||||
@@ -582,13 +751,13 @@ impl ComputeHook {
|
||||
}
|
||||
|
||||
/// Synchronous phase: update the per-tenant state for the next intended notification
|
||||
fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
|
||||
let mut state_locked = self.state.lock().unwrap();
|
||||
fn notify_attach_prepare(&self, shard_update: ShardUpdate) -> MaybeSendNotifyAttachResult {
|
||||
let mut tenants_locked = self.tenants.lock().unwrap();
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
let tenant_shard_id = shard_update.tenant_shard_id;
|
||||
|
||||
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
|
||||
let tenant = match tenants_locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(e) => {
|
||||
let ShardUpdate {
|
||||
tenant_shard_id,
|
||||
@@ -612,10 +781,37 @@ impl ComputeHook {
|
||||
tenant.maybe_send(tenant_shard_id.tenant_id, None)
|
||||
}
|
||||
|
||||
async fn notify_execute(
|
||||
fn notify_safekeepers_prepare(
|
||||
&self,
|
||||
maybe_send_result: MaybeSendResult,
|
||||
tenant_shard_id: TenantShardId,
|
||||
safekeepers_update: SafekeepersUpdate,
|
||||
) -> MaybeSendNotifySafekeepersResult {
|
||||
let mut timelines_locked = self.timelines.lock().unwrap();
|
||||
|
||||
let ttid = TenantTimelineId {
|
||||
tenant_id: safekeepers_update.tenant_id,
|
||||
timeline_id: safekeepers_update.timeline_id,
|
||||
};
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
let timeline = match timelines_locked.entry(ttid) {
|
||||
Entry::Vacant(e) => e.insert(ComputeHookTimeline::new(
|
||||
safekeepers_update.generation,
|
||||
safekeepers_update.safekeepers,
|
||||
)),
|
||||
Entry::Occupied(e) => {
|
||||
let timeline = e.into_mut();
|
||||
timeline.update(safekeepers_update);
|
||||
timeline
|
||||
}
|
||||
};
|
||||
|
||||
timeline.maybe_send(ttid, None)
|
||||
}
|
||||
|
||||
async fn notify_execute<M: ApiMethod>(
|
||||
&self,
|
||||
state: &std::sync::Mutex<HashMap<M::Key, M>>,
|
||||
maybe_send_result: MaybeSendResult<M::Request, M::Key>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
// Process result: we may get an update to send, or we may have to wait for a lock
|
||||
@@ -624,7 +820,7 @@ impl ComputeHook {
|
||||
MaybeSendResult::Noop => {
|
||||
return Ok(());
|
||||
}
|
||||
MaybeSendResult::AwaitLock(send_lock) => {
|
||||
MaybeSendResult::AwaitLock((key, send_lock)) => {
|
||||
let send_locked = tokio::select! {
|
||||
guard = send_lock.lock_owned() => {guard},
|
||||
_ = cancel.cancelled() => {
|
||||
@@ -635,11 +831,11 @@ impl ComputeHook {
|
||||
// Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
|
||||
// we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
|
||||
// try_lock.
|
||||
let state_locked = self.state.lock().unwrap();
|
||||
let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
|
||||
let state_locked = state.lock().unwrap();
|
||||
let Some(resource_state) = state_locked.get(&key) else {
|
||||
return Ok(());
|
||||
};
|
||||
match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
|
||||
match resource_state.maybe_send(key, Some(send_locked)) {
|
||||
MaybeSendResult::AwaitLock(_) => {
|
||||
unreachable!("We supplied lock guard")
|
||||
}
|
||||
@@ -658,14 +854,18 @@ impl ComputeHook {
                .control_plane_url
                .as_ref()
                .map(|control_plane_url| {
                    format!("{}/notify-attach", control_plane_url.trim_end_matches('/'))
                    format!(
                        "{}/{}",
                        control_plane_url.trim_end_matches('/'),
                        M::API_PATH
                    )
                });

            // We validate this at startup
            let notify_url = compute_hook_url.as_ref().unwrap();
            self.do_notify(notify_url, &request, cancel).await
        } else {
            self.do_notify_local(&request).await.map_err(|e| {
            self.do_notify_local::<M>(&request).await.map_err(|e| {
                // This path is for testing only, so munge the error into our prod-style error type.
                tracing::error!("neon_local notification hook failed: {e}");
                NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
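Concretely, with control_plane_url set to, say, https://cplane.example.com/api (a hypothetical value), the tenant path resolves to https://cplane.example.com/api/notify-attach and the timeline path to https://cplane.example.com/api/notify-safekeepers; the trailing-slash trim keeps a configured URL ending in "/" from producing a double slash.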
@@ -701,7 +901,7 @@ impl ComputeHook {
|
||||
/// Infallible synchronous fire-and-forget version of notify(), that sends its results to
|
||||
/// a channel. Something should consume the channel and arrange to try notifying again
|
||||
/// if something failed.
|
||||
pub(super) fn notify_background(
|
||||
pub(super) fn notify_attach_background(
|
||||
self: &Arc<Self>,
|
||||
notifications: Vec<ShardUpdate>,
|
||||
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
|
||||
@@ -710,7 +910,7 @@ impl ComputeHook {
|
||||
let mut maybe_sends = Vec::new();
|
||||
for shard_update in notifications {
|
||||
let tenant_shard_id = shard_update.tenant_shard_id;
|
||||
let maybe_send_result = self.notify_prepare(shard_update);
|
||||
let maybe_send_result = self.notify_attach_prepare(shard_update);
|
||||
maybe_sends.push((tenant_shard_id, maybe_send_result))
|
||||
}
|
||||
|
||||
@@ -729,10 +929,10 @@ impl ComputeHook {
|
||||
|
||||
async move {
|
||||
this
|
||||
.notify_execute(maybe_send_result, tenant_shard_id, &cancel)
|
||||
.notify_execute(&this.tenants, maybe_send_result, &cancel)
|
||||
.await.map_err(|e| (tenant_shard_id, e))
|
||||
}.instrument(info_span!(
|
||||
"notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
|
||||
"notify_attach_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
|
||||
))
|
||||
})
|
||||
.buffered(API_CONCURRENCY);
|
||||
@@ -775,14 +975,23 @@ impl ComputeHook {
|
||||
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
|
||||
/// the proper pageserver nodes for a tenant.
|
||||
#[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
|
||||
pub(super) async fn notify<'a>(
|
||||
pub(super) async fn notify_attach<'a>(
|
||||
&self,
|
||||
shard_update: ShardUpdate<'a>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let tenant_shard_id = shard_update.tenant_shard_id;
|
||||
let maybe_send_result = self.notify_prepare(shard_update);
|
||||
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
|
||||
let maybe_send_result = self.notify_attach_prepare(shard_update);
|
||||
self.notify_execute(&self.tenants, maybe_send_result, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn notify_safekeepers(
|
||||
&self,
|
||||
safekeepers_update: SafekeepersUpdate,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let maybe_send_result = self.notify_safekeepers_prepare(safekeepers_update);
|
||||
self.notify_execute(&self.timelines, maybe_send_result, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -798,8 +1007,8 @@ impl ComputeHook {
|
||||
) {
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
let mut state_locked = self.state.lock().unwrap();
|
||||
match state_locked.entry(tenant_shard_id.tenant_id) {
|
||||
let mut tenants_locked = self.tenants.lock().unwrap();
|
||||
match tenants_locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(_) => {
|
||||
// This is a valid but niche case, where the tenant was previously attached
|
||||
// as a Secondary location and then detached, so has no previously notified
|
||||
|
||||
@@ -62,7 +62,7 @@ pub(crate) fn validate_node_state(
|
||||
nodes: Arc<HashMap<NodeId, Node>>,
|
||||
) -> Result<(), OperationError> {
|
||||
let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
|
||||
format!("node {} was removed", node_id).into(),
|
||||
format!("node {node_id} was removed").into(),
|
||||
))?;
|
||||
|
||||
let current_policy = node.get_scheduling();
|
||||
@@ -70,7 +70,7 @@ pub(crate) fn validate_node_state(
|
||||
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
|
||||
// about it
|
||||
return Err(OperationError::NodeStateChanged(
|
||||
format!("node {} changed state to {:?}", node_id, current_policy).into(),
|
||||
format!("node {node_id} changed state to {current_policy:?}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ impl TenantShardDrain {
|
||||
|
||||
if !nodes.contains_key(&destination) {
|
||||
return Err(OperationError::NodeStateChanged(
|
||||
format!("node {} was removed", destination).into(),
|
||||
format!("node {destination} was removed").into(),
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
|
||||
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
|
||||
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
|
||||
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
|
||||
TimelineImportRequest,
|
||||
TimelineImportRequest, TimelineSafekeeperMigrateRequest,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
|
||||
@@ -34,6 +34,7 @@ use pageserver_api::upcall_api::{
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest,
|
||||
};
|
||||
use pageserver_client::{BlockUnblock, mgmt_api};
|
||||
|
||||
use routerify::Middleware;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
@@ -635,6 +636,32 @@ async fn handle_tenant_timeline_download_heatmap_layers(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_safekeeper_migrate(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
maybe_rate_limit(&req, tenant_id).await;
|
||||
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
|
||||
let mut req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let migrate_req = json_request::<TimelineSafekeeperMigrateRequest>(&mut req).await?;
|
||||
|
||||
service
|
||||
.tenant_timeline_safekeeper_migrate(tenant_id, timeline_id, migrate_req)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_lsn_lease(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -721,9 +748,9 @@ async fn handle_tenant_timeline_passthrough(
|
||||
|
||||
// Callers will always pass an unsharded tenant ID. Before proxying, we must
|
||||
// rewrite this to a shard-aware shard zero ID.
|
||||
let path = format!("{}", path);
|
||||
let path = format!("{path}");
|
||||
let tenant_str = tenant_or_shard_id.tenant_id.to_string();
|
||||
let tenant_shard_str = format!("{}", tenant_shard_id);
|
||||
let tenant_shard_str = format!("{tenant_shard_id}");
|
||||
let path = path.replace(&tenant_str, &tenant_shard_str);
|
||||
|
||||
let latency = &METRICS_REGISTRY
|
||||
@@ -1539,7 +1566,7 @@ async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
|
||||
impl From<ReconcileError> for ApiError {
|
||||
fn from(value: ReconcileError) -> Self {
|
||||
ApiError::Conflict(format!("Reconciliation error: {}", value))
|
||||
ApiError::Conflict(format!("Reconciliation error: {value}"))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2458,6 +2485,16 @@ pub fn make_router(
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_timeline_safekeeper_migrate,
|
||||
RequestName("v1_tenant_timeline_safekeeper_migrate"),
|
||||
)
|
||||
},
|
||||
)
|
||||
// LSN lease passthrough to all shards
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
|
||||
|
||||
@@ -5,17 +5,20 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
use camino::Utf8PathBuf;
|
||||
use clap::Parser;
|
||||
|
||||
use clap::{ArgAction, Parser};
|
||||
use futures::future::OptionFuture;
|
||||
use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
use hyper0::Uri;
|
||||
use metrics::BuildInfo;
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_controller::http::make_router;
|
||||
use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::feature_flag::FeatureFlagService;
|
||||
use storage_controller::service::{
|
||||
Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
|
||||
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
|
||||
@@ -211,8 +214,14 @@ struct Cli {
    /// Number of safekeepers to choose for a timeline when creating it.
    /// Safekeepers will be choosen from different availability zones.
    /// This option exists primarily for testing purposes.
    #[arg(long, default_value = "3", value_parser = clap::value_parser!(i64).range(1..))]
    timeline_safekeeper_count: i64,
    #[arg(long, default_value = "3", value_parser = clap::builder::RangedU64ValueParser::<usize>::new().range(1..))]
    timeline_safekeeper_count: usize,

    /// When set, actively checks and initiates heatmap downloads/uploads during reconciliation.
    /// This speed up migrations by avoiding the default wait for the heatmap download interval.
    /// Primarily useful for testing to reduce test execution time.
    #[arg(long, default_value = "false", action=ArgAction::Set)]
    kick_secondary_downloads: bool,
}
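Assuming clap's usual kebab-case flag naming (not shown in this diff) and that ArgAction::Set requires an explicit value, a test deployment would pass these as, for example, --timeline-safekeeper-count 3 --kick-secondary-downloads true on the storage controller command line; the value parser rejects a safekeeper count of zero.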
|
||||
enum StrictMode {
|
||||
@@ -242,6 +251,8 @@ struct Secrets {
|
||||
peer_jwt_token: Option<String>,
|
||||
}
|
||||
|
||||
const POSTHOG_CONFIG_ENV: &str = "POSTHOG_CONFIG";
|
||||
|
||||
impl Secrets {
|
||||
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
|
||||
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
|
||||
@@ -399,6 +410,18 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
let posthog_config = if let Ok(json) = std::env::var(POSTHOG_CONFIG_ENV) {
|
||||
let res: Result<PostHogConfig, _> = serde_json::from_str(&json);
|
||||
if let Ok(config) = res {
|
||||
Some(config)
|
||||
} else {
|
||||
tracing::warn!("Invalid posthog config: {json}");
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let config = Config {
|
||||
pageserver_jwt_token: secrets.pageserver_jwt_token,
|
||||
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
|
||||
@@ -445,6 +468,8 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
|
||||
use_local_compute_notifications: args.use_local_compute_notifications,
|
||||
timeline_safekeeper_count: args.timeline_safekeeper_count,
|
||||
posthog_config: posthog_config.clone(),
|
||||
kick_secondary_downloads: args.kick_secondary_downloads,
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
@@ -525,6 +550,29 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
)
|
||||
});
|
||||
|
||||
let feature_flag_task = if let Some(posthog_config) = posthog_config {
|
||||
let service = service.clone();
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_bg = cancel.clone();
|
||||
let task = tokio::task::spawn(
|
||||
async move {
|
||||
match FeatureFlagService::new(service, posthog_config) {
|
||||
Ok(feature_flag_service) => {
|
||||
let feature_flag_service = Arc::new(feature_flag_service);
|
||||
feature_flag_service.run(cancel_bg).await
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to create feature flag service: {}", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
.instrument(tracing::info_span!("feature_flag_service")),
|
||||
);
|
||||
Some((task, cancel))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Wait until we receive a signal
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
|
||||
@@ -572,6 +620,12 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
chaos_jh.await.ok();
|
||||
}
|
||||
|
||||
// If we were running the feature flag service, stop that so that we're not calling into Service while it shuts down
|
||||
if let Some((feature_flag_task, feature_flag_cancel)) = feature_flag_task {
|
||||
feature_flag_cancel.cancel();
|
||||
feature_flag_task.await.ok();
|
||||
}
|
||||
|
||||
service.shutdown().await;
|
||||
tracing::info!("Service shutdown complete");
|
||||
|
||||
|
||||
@@ -333,6 +333,7 @@ pub(crate) enum DatabaseErrorLabel {
|
||||
ConnectionPool,
|
||||
Logical,
|
||||
Migration,
|
||||
Cas,
|
||||
}
|
||||
|
||||
impl DatabaseError {
|
||||
@@ -343,6 +344,7 @@ impl DatabaseError {
|
||||
Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool,
|
||||
Self::Logical(_) => DatabaseErrorLabel::Logical,
|
||||
Self::Migration(_) => DatabaseErrorLabel::Migration,
|
||||
Self::Cas(_) => DatabaseErrorLabel::Cas,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,4 +376,13 @@ impl PageserverClient {
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn update_feature_flag_spec(&self, spec: String) -> Result<()> {
|
||||
measured_request!(
|
||||
"update_feature_flag_spec",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner.update_feature_flag_spec(spec).await
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ use pageserver_api::shard::{
|
||||
use rustls::client::WebPkiServerVerifier;
|
||||
use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
|
||||
use rustls::crypto::ring;
|
||||
use safekeeper_api::membership::SafekeeperGeneration;
|
||||
use scoped_futures::ScopedBoxFuture;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::generation::Generation;
|
||||
@@ -94,6 +95,8 @@ pub(crate) enum DatabaseError {
|
||||
Logical(String),
|
||||
#[error("Migration error: {0}")]
|
||||
Migration(String),
|
||||
#[error("CAS error: {0}")]
|
||||
Cas(String),
|
||||
}
|
||||
|
||||
#[derive(measured::FixedCardinalityLabel, Copy, Clone)]
|
||||
@@ -126,6 +129,7 @@ pub(crate) enum DatabaseOperation {
|
||||
UpdateLeader,
|
||||
SetPreferredAzs,
|
||||
InsertTimeline,
|
||||
UpdateTimelineMembership,
|
||||
GetTimeline,
|
||||
InsertTimelineReconcile,
|
||||
RemoveTimelineReconcile,
|
||||
@@ -500,15 +504,13 @@ impl Persistence {
|
||||
if let Some(np) = node_to_delete {
|
||||
let lc = NodeLifecycle::from_str(&np.lifecycle).map_err(|e| {
|
||||
DatabaseError::Logical(format!(
|
||||
"Node {} has invalid lifecycle: {}",
|
||||
del_node_id, e
|
||||
"Node {del_node_id} has invalid lifecycle: {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
if lc != NodeLifecycle::Deleted {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Node {} was not soft deleted before, cannot hard delete it",
|
||||
del_node_id
|
||||
"Node {del_node_id} was not soft deleted before, cannot hard delete it"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -642,8 +644,7 @@ impl Persistence {
|
||||
.await?;
|
||||
if deleted_node > 0 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Node {} is marked as deleted, re-attach is not allowed",
|
||||
input_node_id
|
||||
"Node {input_node_id} is marked as deleted, re-attach is not allowed"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -1003,7 +1004,7 @@ impl Persistence {
|
||||
.execute(conn).await?;
|
||||
if u8::try_from(updated)
|
||||
.map_err(|_| DatabaseError::Logical(
|
||||
format!("Overflow existing shard count {} while splitting", updated))
|
||||
format!("Overflow existing shard count {updated} while splitting"))
|
||||
)? != old_shard_count.count() {
|
||||
// Perhaps a deletion or another split raced with this attempt to split, mutating
|
||||
// the parent shards that we intend to split. In this case the split request should fail.
|
||||
@@ -1343,8 +1344,7 @@ impl Persistence {
|
||||
|
||||
if inserted_updated != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({})",
|
||||
inserted_updated
|
||||
"unexpected number of rows ({inserted_updated})"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -1406,8 +1406,57 @@ impl Persistence {
|
||||
0 => Ok(false),
|
||||
1 => Ok(true),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({})",
|
||||
inserted_updated
|
||||
"unexpected number of rows ({inserted_updated})"
|
||||
))),
|
||||
}
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Update timeline membership configuration in the database.
|
||||
/// Perform a compare-and-swap (CAS) operation on the timeline's generation.
|
||||
/// The `new_generation` must be the next (+1) generation after the one in the database.
|
||||
pub(crate) async fn update_timeline_membership(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
new_generation: SafekeeperGeneration,
|
||||
sk_set: &[NodeId],
|
||||
new_sk_set: Option<&[NodeId]>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::timelines::dsl;
|
||||
|
||||
let prev_generation = new_generation.previous().unwrap();
|
||||
|
||||
let tenant_id = &tenant_id;
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| {
|
||||
Box::pin(async move {
|
||||
let updated = diesel::update(dsl::timelines)
|
||||
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
|
||||
.filter(dsl::generation.eq(prev_generation.into_inner() as i32))
|
||||
.set((
|
||||
dsl::generation.eq(new_generation.into_inner() as i32),
|
||||
dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
|
||||
dsl::new_sk_set.eq(new_sk_set
|
||||
.map(|set| set.iter().map(|id| id.0 as i64).collect::<Vec<_>>())),
|
||||
))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
match updated {
|
||||
0 => {
|
||||
// TODO(diko): It makes sense to select the current generation
|
||||
// and include it in the error message for better debuggability.
|
||||
Err(DatabaseError::Cas(
|
||||
"Failed to update membership configuration".to_string(),
|
||||
))
|
||||
}
|
||||
1 => Ok(()),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({updated})"
|
||||
))),
|
||||
}
|
||||
})
|
||||
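The filter on the previous generation is what makes this a compare-and-swap. For example, moving a timeline from generation 4 to 5 issues an update guarded by generation = 4, so exactly one row changes when nobody raced us, zero rows change if another controller already advanced the generation (surfaced as DatabaseError::Cas), and any other row count indicates a logical error.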
@@ -1476,8 +1525,7 @@ impl Persistence {
|
||||
0 => Ok(()),
|
||||
1 => Ok(()),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({})",
|
||||
updated
|
||||
"unexpected number of rows ({updated})"
|
||||
))),
|
||||
}
|
||||
})
|
||||
@@ -1570,8 +1618,7 @@ impl Persistence {
|
||||
0 => Ok(false),
|
||||
1 => Ok(true),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({})",
|
||||
inserted_updated
|
||||
"unexpected number of rows ({inserted_updated})"
|
||||
))),
|
||||
}
|
||||
})
|
||||
|
||||
@@ -65,7 +65,7 @@ pub(super) struct Reconciler {
|
||||
pub(crate) compute_hook: Arc<ComputeHook>,
|
||||
|
||||
/// To avoid stalling if the cloud control plane is unavailable, we may proceed
|
||||
/// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
|
||||
/// past failures in [`ComputeHook::notify_attach`], but we _must_ remember that we failed
|
||||
/// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
|
||||
pub(crate) compute_notify_failure: bool,
|
||||
|
||||
@@ -856,6 +856,7 @@ impl Reconciler {
|
||||
&self.shard,
|
||||
&self.config,
|
||||
&self.placement_policy,
|
||||
self.intent.secondary.len(),
|
||||
);
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
@@ -1022,7 +1023,7 @@ impl Reconciler {
|
||||
if let Some(node) = &self.intent.attached {
|
||||
let result = self
|
||||
.compute_hook
|
||||
.notify(
|
||||
.notify_attach(
|
||||
compute_hook::ShardUpdate {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
node_id: node.get_id(),
|
||||
@@ -1235,11 +1236,11 @@ pub(crate) fn attached_location_conf(
|
||||
shard: &ShardIdentity,
|
||||
config: &TenantConfig,
|
||||
policy: &PlacementPolicy,
|
||||
secondary_count: usize,
|
||||
) -> LocationConfig {
|
||||
let has_secondaries = match policy {
|
||||
PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => {
|
||||
false
|
||||
}
|
||||
PlacementPolicy::Detached | PlacementPolicy::Secondary => false,
|
||||
PlacementPolicy::Attached(0) => secondary_count > 0,
|
||||
PlacementPolicy::Attached(_) => true,
|
||||
};
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::time::Duration;
|
||||
|
||||
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
|
||||
use reqwest::StatusCode;
|
||||
use safekeeper_api::membership::SafekeeperId;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
@@ -92,6 +93,13 @@ impl Safekeeper {
|
||||
pub(crate) fn has_https_port(&self) -> bool {
|
||||
self.listen_https_port.is_some()
|
||||
}
|
||||
pub(crate) fn get_safekeeper_id(&self) -> SafekeeperId {
|
||||
SafekeeperId {
|
||||
id: self.id,
|
||||
host: self.skp.host.clone(),
|
||||
pg_port: self.skp.port as u16,
|
||||
}
|
||||
}
|
||||
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn with_client_retries<T, O, F>(
|
||||
|
||||
@@ -56,6 +56,10 @@ impl SafekeeperClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn node_id_label(&self) -> &str {
|
||||
&self.node_id_label
|
||||
}
|
||||
|
||||
pub(crate) async fn create_timeline(
|
||||
&self,
|
||||
req: &TimelineCreateRequest,
|
||||
|
||||
@@ -23,7 +23,7 @@ pub enum ScheduleError {
|
||||
|
||||
impl From<ScheduleError> for ApiError {
|
||||
fn from(value: ScheduleError) -> Self {
|
||||
ApiError::Conflict(format!("Scheduling error: {}", value))
|
||||
ApiError::Conflict(format!("Scheduling error: {value}"))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -825,6 +825,7 @@ impl Scheduler {
|
||||
struct AzScore {
|
||||
home_shard_count: usize,
|
||||
scheduleable: bool,
|
||||
node_count: usize,
|
||||
}
|
||||
|
||||
let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new();
|
||||
@@ -832,6 +833,7 @@ impl Scheduler {
|
||||
let az = azs.entry(&node.az).or_default();
|
||||
az.home_shard_count += node.home_shard_count;
|
||||
az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_));
|
||||
az.node_count += 1;
|
||||
}
|
||||
|
||||
// If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where
|
||||
@@ -840,10 +842,20 @@
            azs.retain(|_, i| i.scheduleable);
        }

        // We will multiply up shard counts by the max node count for scoring, before dividing
        // by per-node max node count, to get a normalized score that doesn't collapse to zero
        // when the absolute shard count is less than the node count.
        let max_node_count = azs.values().map(|i| i.node_count).max().unwrap_or(0);

        // Find the AZ with the lowest number of shards currently allocated
        Some(
            azs.into_iter()
                .min_by_key(|i| (i.1.home_shard_count, i.0))
                .min_by_key(|i| {
                    (
                        (i.1.home_shard_count * max_node_count) / i.1.node_count,
                        i.0,
                    )
                })
                .unwrap()
                .0
                .clone(),
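As a worked example of the new normalization: with AZ A holding 4 home shards across 2 nodes and AZ B holding 5 home shards across 3 nodes, max_node_count is 3, so A scores (4 * 3) / 2 = 6 and B scores (5 * 3) / 3 = 5. B is preferred despite holding more shards in absolute terms because its per-node load is lower, whereas the old key (raw home_shard_count) would have picked A.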
@@ -891,7 +903,7 @@ impl Scheduler {
|
||||
/// rigorously updating them on every change.
|
||||
pub(crate) fn update_metrics(&self) {
|
||||
for (node_id, node) in &self.nodes {
|
||||
let node_id_str = format!("{}", node_id);
|
||||
let node_id_str = format!("{node_id}");
|
||||
let label_group = NodeLabelGroup {
|
||||
az: &node.az.0,
|
||||
node_id: &node_id_str,
|
||||
@@ -1314,7 +1326,7 @@ mod tests {
|
||||
.map(|(node_id, node)| (node_id, node.home_shard_count))
|
||||
.collect::<Vec<_>>();
|
||||
node_home_counts.sort_by_key(|i| i.0);
|
||||
eprintln!("Selected {}, vs nodes {:?}", preferred_az, node_home_counts);
|
||||
eprintln!("Selected {preferred_az}, vs nodes {node_home_counts:?}");
|
||||
|
||||
let tenant_shard_id = TenantShardId {
|
||||
tenant_id: TenantId::generate(),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod chaos_injector;
|
||||
mod context_iterator;
|
||||
pub mod feature_flag;
|
||||
pub(crate) mod safekeeper_reconciler;
|
||||
mod safekeeper_service;
|
||||
|
||||
@@ -25,6 +26,7 @@ use futures::stream::FuturesUnordered;
|
||||
use http_utils::error::ApiError;
|
||||
use hyper::Uri;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use pageserver_api::controller_api::{
|
||||
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
|
||||
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
|
||||
@@ -159,6 +161,7 @@ enum TenantOperations {
|
||||
DropDetached,
|
||||
DownloadHeatmapLayers,
|
||||
TimelineLsnLease,
|
||||
TimelineSafekeeperMigrate,
|
||||
}
|
||||
|
||||
#[derive(Clone, strum_macros::Display)]
|
||||
@@ -260,7 +263,7 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
|
||||
// Presume errors receiving body are connectivity/availability issues except for decoding errors
|
||||
let src_str = err.source().map(|e| e.to_string()).unwrap_or_default();
|
||||
ApiError::ResourceUnavailable(
|
||||
format!("{node} error receiving error body: {err} {}", src_str).into(),
|
||||
format!("{node} error receiving error body: {err} {src_str}").into(),
|
||||
)
|
||||
}
|
||||
mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, msg) => {
|
||||
@@ -469,7 +472,13 @@ pub struct Config {
|
||||
|
||||
/// Number of safekeepers to choose for a timeline when creating it.
|
||||
/// Safekeepers will be choosen from different availability zones.
|
||||
pub timeline_safekeeper_count: i64,
|
||||
pub timeline_safekeeper_count: usize,
|
||||
|
||||
/// PostHog integration config
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
|
||||
/// When set, actively checks and initiates heatmap downloads/uploads.
|
||||
pub kick_secondary_downloads: bool,
|
||||
}
|
||||
|
||||
impl From<DatabaseError> for ApiError {
|
||||
@@ -483,6 +492,7 @@ impl From<DatabaseError> for ApiError {
|
||||
DatabaseError::Logical(reason) | DatabaseError::Migration(reason) => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(reason))
|
||||
}
|
||||
DatabaseError::Cas(reason) => ApiError::Conflict(reason),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -668,7 +678,7 @@ impl std::fmt::Display for StopReconciliationsReason {
|
||||
Self::ShuttingDown => "Shutting down",
|
||||
Self::SteppingDown => "Stepping down",
|
||||
};
|
||||
write!(writer, "{}", s)
|
||||
write!(writer, "{s}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -868,18 +878,18 @@ impl Service {
|
||||
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
|
||||
// will emit compute hook notifications when they reconcile.
|
||||
//
|
||||
// Ordering: our calls to notify_background synchronously establish a relative order for these notifications vs. any later
|
||||
// Ordering: our calls to notify_attach_background synchronously establish a relative order for these notifications vs. any later
|
||||
// calls into the ComputeHook for the same tenant: we can leave these to run to completion in the background and any later
|
||||
// calls will be correctly ordered wrt these.
|
||||
//
|
||||
// Concurrency: we call notify_background for all tenants, which will create O(N) tokio tasks, but almost all of them
|
||||
// Concurrency: we call notify_attach_background for all tenants, which will create O(N) tokio tasks, but almost all of them
|
||||
// will just wait on the ComputeHook::API_CONCURRENCY semaphore immediately, so very cheap until they get that semaphore
|
||||
// unit and start doing I/O.
|
||||
tracing::info!(
|
||||
"Sending {} compute notifications",
|
||||
compute_notifications.len()
|
||||
);
|
||||
self.compute_hook.notify_background(
|
||||
self.compute_hook.notify_attach_background(
|
||||
compute_notifications,
|
||||
bg_compute_notify_result_tx.clone(),
|
||||
&self.cancel,
|
@@ -2064,6 +2074,7 @@ impl Service {
&tenant_shard.shard,
&tenant_shard.config,
&PlacementPolicy::Attached(0),
tenant_shard.intent.get_secondary().len(),
)),
},
)]);
@@ -2573,7 +2584,7 @@ impl Service {
.do_initial_shard_scheduling(
tenant_shard_id,
initial_generation,
&create_req.shard_parameters,
create_req.shard_parameters,
create_req.config.clone(),
placement_policy.clone(),
preferred_az_id.as_ref(),
@@ -2630,7 +2641,7 @@ impl Service {
&self,
tenant_shard_id: TenantShardId,
initial_generation: Option<Generation>,
shard_params: &ShardParameters,
shard_params: ShardParameters,
config: TenantConfig,
placement_policy: PlacementPolicy,
preferred_az_id: Option<&AvailabilityZone>,
@@ -5274,7 +5285,7 @@ impl Service {
shard_params,
result
.iter()
.map(|s| format!("{:?}", s))
.map(|s| format!("{s:?}"))
.collect::<Vec<_>>()
.join(",")
);
@@ -5605,7 +5616,15 @@ impl Service {
for parent_id in parent_ids {
let child_ids = parent_id.split(new_shard_count);

let (pageserver, generation, policy, parent_ident, config, preferred_az) = {
let (
pageserver,
generation,
policy,
parent_ident,
config,
preferred_az,
secondary_count,
) = {
let mut old_state = tenants
.remove(&parent_id)
.expect("It was present, we just split it");
@@ -5625,6 +5644,7 @@ impl Service {
old_state.shard,
old_state.config.clone(),
old_state.preferred_az().cloned(),
old_state.intent.get_secondary().len(),
)
};

@@ -5646,6 +5666,7 @@ impl Service {
&child_shard,
&config,
&policy,
secondary_count,
)),
},
);
@@ -6187,7 +6208,7 @@ impl Service {
},
)
.await
.map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
.map_err(|e| ApiError::Conflict(format!("Failed to split {parent_id}: {e}")))?;

fail::fail_point!("shard-split-post-remote", |_| Err(ApiError::Conflict(
"failpoint".to_string()
@@ -6204,7 +6225,7 @@ impl Service {
response
.new_shards
.iter()
.map(|s| format!("{:?}", s))
.map(|s| format!("{s:?}"))
.collect::<Vec<_>>()
.join(",")
);
@@ -6260,7 +6281,7 @@ impl Service {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = self
.compute_hook
.notify(
.notify_attach(
compute_hook::ShardUpdate {
tenant_shard_id: child_id,
node_id: child_ps,
@@ -7103,8 +7124,7 @@ impl Service {
Ok(())
} else {
Err(ApiError::Conflict(format!(
"Node {} is in use, consider using tombstone API first",
node_id
"Node {node_id} is in use, consider using tombstone API first"
)))
}
}
@@ -7654,7 +7674,7 @@ impl Service {

if let Some(ongoing) = ongoing_op {
return Err(ApiError::PreconditionFailed(
format!("Background operation already ongoing for node: {}", ongoing).into(),
format!("Background operation already ongoing for node: {ongoing}").into(),
));
}

@@ -7785,7 +7805,7 @@ impl Service {

if let Some(ongoing) = ongoing_op {
return Err(ApiError::PreconditionFailed(
format!("Background operation already ongoing for node: {}", ongoing).into(),
format!("Background operation already ongoing for node: {ongoing}").into(),
));
}

@@ -8346,7 +8366,6 @@ impl Service {
"Skipping migration of {tenant_shard_id} to {node} because secondary isn't ready: {progress:?}"
);

#[cfg(feature = "testing")]
if progress.heatmap_mtime.is_none() {
// No heatmap might mean the attached location has never uploaded one, or that
// the secondary download hasn't happened yet. This is relatively unusual in the field,
@@ -8371,8 +8390,12 @@ impl Service {
/// happens on multi-minute timescales in the field, which is fine because optimisation is meant
/// to be a lazy background thing. However, when testing, it is not practical to wait around, so
/// we have this helper to move things along faster.
#[cfg(feature = "testing")]
async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) {
if !self.config.kick_secondary_downloads {
// No-op if kick_secondary_downloads functionality is not configured
return;
}

let (attached_node, secondaries) = {
let locked = self.inner.read().unwrap();
let Some(shard) = locked.tenants.get(&tenant_shard_id) else {
@@ -8851,7 +8874,7 @@ impl Service {
let nodes = self.inner.read().unwrap().nodes.clone();
let node = nodes.get(secondary).ok_or(mgmt_api::Error::ApiError(
StatusCode::NOT_FOUND,
format!("Node with id {} not found", secondary),
format!("Node with id {secondary} not found"),
))?;

match node
@@ -8930,8 +8953,7 @@ impl Service {
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
node_id, err
"Failed to finalise drain cancel of {node_id} by setting scheduling policy to Active: {err}"
)
.into(),
));
@@ -9035,8 +9057,7 @@ impl Service {
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
node_id, err
"Failed to finalise drain cancel of {node_id} by setting scheduling policy to Active: {err}"
)
.into(),
));
@@ -9246,8 +9267,7 @@ impl Service {
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
node_id, err
"Failed to finalise drain cancel of {node_id} by setting scheduling policy to Active: {err}"
)
.into(),
));
@@ -9329,8 +9349,7 @@ impl Service {
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
node_id, err
"Failed to finalise drain cancel of {node_id} by setting scheduling policy to Active: {err}"
)
.into(),
));

storage_controller/src/service/feature_flag.rs (new file)
@@ -0,0 +1,111 @@
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use pageserver_api::config::PostHogConfig;
use pageserver_client::mgmt_api;
use posthog_client_lite::PostHogClient;
use reqwest::StatusCode;
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;

use crate::{pageserver_client::PageserverClient, service::Service};

pub struct FeatureFlagService {
service: Arc<Service>,
config: PostHogConfig,
client: PostHogClient,
http_client: reqwest::Client,
}

const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(30);

impl FeatureFlagService {
pub fn new(service: Arc<Service>, config: PostHogConfig) -> Result<Self, &'static str> {
let client = PostHogClient::new(config.clone().try_into_posthog_config()?);
Ok(Self {
service,
config,
client,
http_client: reqwest::Client::new(),
})
}

async fn refresh(self: Arc<Self>, cancel: CancellationToken) -> Result<(), anyhow::Error> {
let nodes = {
let inner = self.service.inner.read().unwrap();
inner.nodes.clone()
};

let feature_flag_spec = self.client.get_feature_flags_local_evaluation_raw().await?;
let stream = futures::stream::iter(nodes.values().cloned()).map(|node| {
let this = self.clone();
let feature_flag_spec = feature_flag_spec.clone();
async move {
let res = async {
let client = PageserverClient::new(
node.get_id(),
this.http_client.clone(),
node.base_url(),
// TODO: what if we rotate the token during storcon lifetime?
this.service.config.pageserver_jwt_token.as_deref(),
);

client.update_feature_flag_spec(feature_flag_spec).await?;
tracing::info!(
"Updated {}({}) with feature flag spec",
node.get_id(),
node.base_url()
);
Ok::<_, mgmt_api::Error>(())
};

if let Err(e) = res.await {
if let mgmt_api::Error::ApiError(status, _) = e {
if status == StatusCode::NOT_FOUND {
// This is expected during deployments where the API is not available, so we can ignore it
return;
}
}
tracing::warn!(
"Failed to update feature flag spec for {}: {e}",
node.get_id()
);
}
}
});
let mut stream = stream.buffer_unordered(8);

while stream.next().await.is_some() {
if cancel.is_cancelled() {
return Ok(());
}
}

Ok(())
}

pub async fn run(self: Arc<Self>, cancel: CancellationToken) {
let refresh_interval = self
.config
.refresh_interval
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL);
let mut interval = tokio::time::interval(refresh_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
tracing::info!(
"Starting feature flag service with refresh interval: {:?}",
refresh_interval
);
loop {
tokio::select! {
_ = interval.tick() => {}
_ = cancel.cancelled() => {
break;
}
}
let res = self.clone().refresh(cancel.clone()).await;
if let Err(e) = res {
tracing::error!("Failed to refresh feature flags: {e:#?}");
}
}
}
}
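FeatureFlagService::run above is a periodic refresh loop that exits on cancellation. A minimal standalone sketch of how such a loop is typically spawned and shut down, assuming the tokio and tokio_util crates; Refresher and its refresh() body are stand-ins for the real service:

use std::sync::Arc;
use std::time::Duration;
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;

struct Refresher;

impl Refresher {
    async fn refresh(&self) {
        // Placeholder for fetching and pushing the feature flag spec.
        println!("refreshed feature flags");
    }

    async fn run(self: Arc<Self>, cancel: CancellationToken) {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        // Skip missed ticks instead of bursting to catch up after a stall.
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = interval.tick() => {}
                _ = cancel.cancelled() => break,
            }
            self.refresh().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let handle = tokio::spawn(Arc::new(Refresher).run(cancel.clone()));
    tokio::time::sleep(Duration::from_millis(50)).await;
    cancel.cancel(); // graceful shutdown
    handle.await.unwrap();
}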
@@ -145,7 +145,7 @@ pub(crate) async fn load_schedule_requests(
}
let Some(sk) = safekeepers.get(&other_node_id) else {
tracing::warn!(
"couldnt find safekeeper with pending op id {other_node_id}, not pulling from it"
"couldn't find safekeeper with pending op id {other_node_id}, not pulling from it"
);
return None;
};

@@ -1,24 +1,34 @@
use std::cmp::max;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use super::safekeeper_reconciler::ScheduleRequest;
use crate::compute_hook;
use crate::heartbeater::SafekeeperState;
use crate::id_lock_map::trace_shared_lock;
use crate::metrics;
use crate::persistence::{
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
};
use crate::safekeeper::Safekeeper;
use crate::safekeeper_client::SafekeeperClient;
use crate::service::TenantOperations;
use crate::timeline_import::TimelineImportFinalizeError;
use anyhow::Context;
use http_utils::error::ApiError;
use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
TimelineSafekeeperMigrateRequest,
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId};
use safekeeper_api::PgVersionId;
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
use safekeeper_api::models::{
PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse,
};
use safekeeper_api::{INITIAL_TERM, Term};
use safekeeper_client::mgmt_api;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -35,6 +45,33 @@ pub struct TimelineLocateResponse {
}

impl Service {
fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, ApiError> {
let members = safekeepers
.iter()
.map(|sk| sk.get_safekeeper_id())
.collect::<Vec<_>>();

MemberSet::new(members).map_err(ApiError::InternalServerError)
}

fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};

ids.iter()
.map(|&id| {
let node_id = NodeId(id as u64);
safekeepers.get(&node_id).cloned().ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!(
"safekeeper {node_id} is not registered"
))
})
})
.collect::<Result<Vec<_>, _>>()
}

/// Timeline creation on safekeepers
///
/// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
@@ -44,38 +81,12 @@ impl Service {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: u32,
pg_version: PgVersionId,
timeline_persistence: &TimelinePersistence,
) -> Result<Vec<NodeId>, ApiError> {
// If quorum is reached, return if we are outside of a specified timeout
let jwt = self
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let mut joinset = JoinSet::new();
let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;

// Prepare membership::Configuration from choosen safekeepers.
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};

let mut members = Vec::new();
for sk_id in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk_id as u64);
let Some(safekeeper) = safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find entry for safekeeper with id {sk_id}"
)))?;
};
members.push(SafekeeperId {
id: sk_id,
host: safekeeper.skp.host.clone(),
pg_port: safekeeper.skp.port as u16,
});
}
let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
let mset = Self::make_member_set(&safekeepers)?;
let mconf = safekeeper_api::membership::Configuration::new(mset);

let req = safekeeper_api::models::TimelineCreateRequest {
@@ -88,79 +99,150 @@ impl Service {
timeline_id,
wal_seg_size: None,
};

const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
for sk in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk as u64);
let safekeepers = safekeepers.clone();

let results = self
.tenant_timeline_safekeeper_op_quorum(
&safekeepers,
move |client| {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
)
.await?;

Ok(results
.into_iter()
.enumerate()
.filter_map(|(idx, res)| {
if res.is_ok() {
None // Success, don't return this safekeeper
} else {
Some(safekeepers[idx].get_id()) // Failure, return this safekeeper
}
})
.collect::<Vec<_>>())
}

/// Perform an operation on a list of safekeepers in parallel with retries.
///
/// Return the results of the operation on each safekeeper in the input order.
async fn tenant_timeline_safekeeper_op<T, O, F>(
&self,
safekeepers: &[Safekeeper],
op: O,
timeout: Duration,
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
where
O: FnMut(SafekeeperClient) -> F + Send + 'static,
O: Clone,
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
T: Sync + Send + 'static,
{
let jwt = self
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let mut joinset = JoinSet::new();

for (idx, sk) in safekeepers.iter().enumerate() {
let sk = sk.clone();
let http_client = self.http_client.clone();
let jwt = jwt.clone();
let req = req.clone();
let op = op.clone();
joinset.spawn(async move {
// Unwrap is fine as we already would have returned error above
let sk_p = safekeepers.get(&sk_id).unwrap();
let res = sk_p
let res = sk
.with_client_retries(
|client| {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
op,
&http_client,
&jwt,
3,
3,
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
// TODO(diko): This is a wrong timeout.
// It should be scaled to the retry count.
timeout,
&CancellationToken::new(),
)
.await;
(sk_id, sk_p.skp.host.clone(), res)
(idx, res)
});
}

// Initialize results with timeout errors in case we never get a response.
let mut results: Vec<mgmt_api::Result<T>> = safekeepers
.iter()
.map(|_| {
Err(mgmt_api::Error::Timeout(
"safekeeper operation timed out".to_string(),
))
})
.collect();

// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
let reconcile_deadline = tokio::time::Instant::now() + timeout;

// Wait until all tasks finish or timeout is hit, whichever occurs
// first.
let mut reconcile_results = Vec::new();
let mut result_count = 0;
loop {
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
{
let Some(res) = res else { break };
match res {
Ok(res) => {
Ok((idx, res)) => {
let sk = &safekeepers[idx];
tracing::info!(
"response from safekeeper id:{} at {}: {:?}",
res.0,
res.1,
res.2
sk.get_id(),
sk.skp.host,
// Only print errors, as there is no Debug trait for T.
res.as_ref().map(|_| ()),
);
reconcile_results.push(res);
results[idx] = res;
result_count += 1;
}
Err(join_err) => {
tracing::info!("join_err for task in joinset: {join_err}");
}
}
} else {
tracing::info!(
"timeout for creation call after {} responses",
reconcile_results.len()
);
tracing::info!("timeout for operation call after {result_count} responses",);
break;
}
}

// Now check now if quorum was reached in reconcile_results.
let total_result_count = reconcile_results.len();
let remaining = reconcile_results
.into_iter()
.filter_map(|res| res.2.is_err().then_some(res.0))
.collect::<Vec<_>>();
tracing::info!(
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
remaining.len()
);
let target_sk_count = timeline_persistence.sk_set.len();
Ok(results)
}

/// Perform an operation on a list of safekeepers in parallel with retries,
/// and validates that we reach a quorum of successful responses.
///
/// Return the results of the operation on each safekeeper in the input order.
/// It's guaranteed that at least a quorum of the responses are successful.
async fn tenant_timeline_safekeeper_op_quorum<T, O, F>(
&self,
safekeepers: &[Safekeeper],
op: O,
timeout: Duration,
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
where
O: FnMut(SafekeeperClient) -> F,
O: Clone + Send + 'static,
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
T: Sync + Send + 'static,
{
let results = self
.tenant_timeline_safekeeper_op(safekeepers, op, timeout)
.await?;

// Now check if quorum was reached in results.

let target_sk_count = safekeepers.len();
let quorum_size = match target_sk_count {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -179,7 +261,7 @@ impl Service {
// in order to schedule work to them
tracing::warn!(
"couldn't find at least 3 safekeepers for timeline, found: {:?}",
timeline_persistence.sk_set
target_sk_count
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find at least 3 safekeepers to put timeline to"
@@ -188,7 +270,7 @@ impl Service {
}
_ => target_sk_count / 2 + 1,
};
let success_count = target_sk_count - remaining.len();
let success_count = results.iter().filter(|res| res.is_ok()).count();
if success_count < quorum_size {
// Failure
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -196,7 +278,7 @@ impl Service {
)));
}

Ok(remaining)
Ok(results)
}

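tenant_timeline_safekeeper_op_quorum above accepts the operation only when the number of successful responses reaches target_sk_count / 2 + 1. A small standalone illustration of that majority rule (an example, not storcon code):

// Majority quorum for n safekeepers: n / 2 + 1 successes are required.
fn quorum_size(n: usize) -> usize {
    assert!(n > 0, "an empty safekeeper set has no quorum");
    n / 2 + 1
}

fn main() {
    assert_eq!(quorum_size(3), 2); // tolerates one failed safekeeper
    assert_eq!(quorum_size(4), 3);
    assert_eq!(quorum_size(5), 3); // tolerates two
    println!("quorum sizes check out");
}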
/// Create timeline in controller database and on safekeepers.
@@ -219,7 +301,7 @@ impl Service {
read_only: bool,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
let pg_version = PgVersionId::from(timeline_info.pg_version);
// Initially start_lsn is determined by last_record_lsn in pageserver
// response as it does initdb. However, later we persist it and in sk
// creation calls replace with the value from the timeline row if it
@@ -653,13 +735,7 @@ impl Service {
)
});
// Number of safekeepers in different AZs we are looking for
let mut wanted_count = self.config.timeline_safekeeper_count as usize;
// TODO(diko): remove this when `timeline_safekeeper_count` option is in the release
// branch and is specified in tests/neon_local config.
if cfg!(feature = "testing") && all_safekeepers.len() < wanted_count {
// In testing mode, we can have less safekeepers than the config says
wanted_count = max(all_safekeepers.len(), 1);
}
let wanted_count = self.config.timeline_safekeeper_count;

let mut sks = Vec::new();
let mut azs = HashSet::new();
@@ -803,4 +879,486 @@ impl Service {
}
Ok(())
}

/// Call `switch_timeline_membership` on all safekeepers with retries
/// till the quorum of successful responses is reached.
///
/// If min_position is not None, validates that majority of safekeepers
/// reached at least min_position.
///
/// Return responses from safekeepers in the input order.
async fn tenant_timeline_set_membership_quorum(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: &[Safekeeper],
config: &membership::Configuration,
min_position: Option<(Term, Lsn)>,
) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
let req = TimelineMembershipSwitchRequest {
mconf: config.clone(),
};

const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);

let results = self
.tenant_timeline_safekeeper_op_quorum(
safekeepers,
move |client| {
let req = req.clone();
async move {
let mut res = client
.switch_timeline_membership(tenant_id, timeline_id, &req)
.await;

// If min_position is not reached, map the response to an error,
// so it isn't counted toward the quorum.
if let Some(min_position) = min_position {
if let Ok(ok_res) = &res {
if (ok_res.term, ok_res.flush_lsn) < min_position {
// Use Error::Timeout to make this error retriable.
res = Err(mgmt_api::Error::Timeout(
format!(
"safekeeper {} returned position {:?} which is less than minimum required position {:?}",
client.node_id_label(),
(ok_res.term, ok_res.flush_lsn),
min_position
)
));
}
}
}

res
}
},
SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT,
)
.await?;

for res in results.iter().flatten() {
if res.current_conf.generation > config.generation {
// Another switch_membership raced us.
return Err(ApiError::Conflict(format!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation, config.generation
)));
} else if res.current_conf.generation < config.generation {
// Note: should never happen.
// If we get a response, it should be at least the sent generation.
tracing::error!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation,
config.generation
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"received configuration with generation {} from safekeeper, but expected {}",
res.current_conf.generation,
config.generation
)));
}
}

Ok(results)
}

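The min_position check in tenant_timeline_set_membership_quorum above compares (term, flush_lsn) pairs directly: Rust tuples order lexicographically, so the term is compared first and the LSN only breaks ties. A standalone illustration of that ordering, with plain u64s standing in for the real Term and Lsn types:

fn main() {
    let min_position: (u64, u64) = (3, 0x2000);

    assert!((2, 0x9000) < min_position);  // older term loses regardless of LSN
    assert!((3, 0x1000) < min_position);  // same term, LSN not yet reached
    assert!((3, 0x2000) >= min_position); // exactly at the required position
    assert!((4, 0x0000) >= min_position); // newer term always satisfies it
    println!("tuple ordering matches the quorum check's expectations");
}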
/// Pull timeline to to_safekeepers from from_safekeepers with retries.
///
/// Returns Ok(()) only if all the pull_timeline requests were successful.
async fn tenant_timeline_pull_from_peers(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
to_safekeepers: &[Safekeeper],
from_safekeepers: &[Safekeeper],
) -> Result<(), ApiError> {
let http_hosts = from_safekeepers
.iter()
.map(|sk| sk.base_url())
.collect::<Vec<_>>();

tracing::info!(
"pulling timeline to {:?} from {:?}",
to_safekeepers
.iter()
.map(|sk| sk.get_id())
.collect::<Vec<_>>(),
from_safekeepers
.iter()
.map(|sk| sk.get_id())
.collect::<Vec<_>>()
);

// TODO(diko): need to pass mconf/generation with the request
// to properly handle tombstones. Ignore tombstones for now.
// Worst case: we leave a timeline on a safekeeper which is not in the current set.
let req = PullTimelineRequest {
tenant_id,
timeline_id,
http_hosts,
ignore_tombstone: Some(true),
};

const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);

let responses = self
.tenant_timeline_safekeeper_op(
to_safekeepers,
move |client| {
let req = req.clone();
async move { client.pull_timeline(&req).await }
},
SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
)
.await?;

if let Some((idx, err)) = responses
.iter()
.enumerate()
.find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
{
let sk_id = to_safekeepers[idx].get_id();
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"pull_timeline to {sk_id} failed: {err}",
)));
}

Ok(())
}

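tenant_timeline_pull_from_peers above surfaces the first failed response together with the index of the safekeeper it came from. A standalone sketch of that enumerate + find_map idiom, with String errors standing in for mgmt_api::Error:

fn main() {
    let responses: Vec<Result<(), String>> = vec![
        Ok(()),
        Err("connection refused".to_string()),
        Ok(()),
    ];

    if let Some((idx, err)) = responses
        .iter()
        .enumerate()
        // `res.as_ref().err()?` yields None for Ok entries, so find_map skips them.
        .find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
    {
        println!("request to node #{idx} failed: {err}");
    } else {
        println!("all requests succeeded");
    }
}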
/// Exclude a timeline from safekeepers in parallel with retries.
/// If an exclude request is unsuccessful, it will be added to
/// the reconciler, and after that the function will succeed.
async fn tenant_timeline_safekeeper_exclude(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
safekeepers: &[Safekeeper],
config: &membership::Configuration,
) -> Result<(), ApiError> {
let req = TimelineMembershipSwitchRequest {
mconf: config.clone(),
};

const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);

let results = self
.tenant_timeline_safekeeper_op(
safekeepers,
move |client| {
let req = req.clone();
async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
},
SK_EXCLUDE_TIMELINE_TIMEOUT,
)
.await?;

let mut reconcile_requests = Vec::new();

for (idx, res) in results.iter().enumerate() {
if res.is_err() {
let sk_id = safekeepers[idx].skp.id;
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: config.generation.into_inner() as i32,
op_kind: SafekeeperTimelineOpKind::Exclude,
sk_id,
};
tracing::info!("writing pending exclude op for sk id {sk_id}");
self.persistence.insert_pending_op(pending_op).await?;

let req = ScheduleRequest {
safekeeper: Box::new(safekeepers[idx].clone()),
host_list: Vec::new(),
tenant_id,
timeline_id: Some(timeline_id),
generation: config.generation.into_inner(),
kind: SafekeeperTimelineOpKind::Exclude,
};
reconcile_requests.push(req);
}
}

if !reconcile_requests.is_empty() {
let locked = self.inner.read().unwrap();
for req in reconcile_requests {
locked.safekeeper_reconcilers.schedule_request(req);
}
}

Ok(())
}

/// Migrate timeline safekeeper set to a new set.
///
/// This function implements an algorithm from RFC-035.
/// <https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md>
pub(crate) async fn tenant_timeline_safekeeper_migrate(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
req: TimelineSafekeeperMigrateRequest,
) -> Result<(), ApiError> {
let all_safekeepers = self.inner.read().unwrap().safekeepers.clone();

let new_sk_set = req.new_sk_set;

for sk_id in new_sk_set.iter() {
if !all_safekeepers.contains_key(sk_id) {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"safekeeper {sk_id} does not exist"
)));
}
}

// TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineSafekeeperMigrate,
)
.await;

// 1. Fetch current timeline configuration from the configuration storage.

let timeline = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;

let Some(timeline) = timeline else {
return Err(ApiError::NotFound(
anyhow::anyhow!(
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
)
.into(),
));
};

let cur_sk_set = timeline
.sk_set
.iter()
.map(|&id| NodeId(id as u64))
.collect::<Vec<_>>();

tracing::info!(
?cur_sk_set,
?new_sk_set,
"Migrating timeline to new safekeeper set",
);

let mut generation = SafekeeperGeneration::new(timeline.generation as u32);

if let Some(ref presistent_new_sk_set) = timeline.new_sk_set {
// 2. If it is already joint one and new_set is different from desired_set refuse to change.
if presistent_new_sk_set
.iter()
.map(|&id| NodeId(id as u64))
.ne(new_sk_set.iter().cloned())
{
tracing::info!(
?presistent_new_sk_set,
?new_sk_set,
"different new safekeeper set is already set in the database",
);
return Err(ApiError::Conflict(format!(
"the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}"
)));
}
// If it is the same new_sk_set, we can continue the migration (retry).
} else {
// 3. No active migration yet.
// Increment current generation and put desired_set to new_sk_set.
generation = generation.next();

self.persistence
.update_timeline_membership(
tenant_id,
timeline_id,
generation,
&cur_sk_set,
Some(&new_sk_set),
)
.await?;
}

let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?;

let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
let new_sk_member_set = Self::make_member_set(&new_safekeepers)?;

let joint_config = membership::Configuration {
generation,
members: cur_sk_member_set,
new_members: Some(new_sk_member_set.clone()),
};

// 4. Call PUT configuration on safekeepers from the current set,
// delivering them joint_conf.

// Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
// This way the compute will know about new safekeepers from joint_config before we require to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
.await?;

let results = self
.tenant_timeline_set_membership_quorum(
tenant_id,
timeline_id,
&cur_safekeepers,
&joint_config,
None, // no min position
)
.await?;

let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
for res in results.into_iter().flatten() {
let sk_position = (res.term, res.flush_lsn);
if sync_position < sk_position {
sync_position = sk_position;
}
}

tracing::info!(
%generation,
?sync_position,
"safekeepers set membership updated",
);

// 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
// by doing pull_timeline from the majority of the current set.

// Filter out safekeepers which are already in the current set.
let from_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
let pull_to_safekeepers = new_safekeepers
.iter()
.filter(|sk| !from_ids.contains(&sk.get_id()))
.cloned()
.collect::<Vec<_>>();

self.tenant_timeline_pull_from_peers(
tenant_id,
timeline_id,
&pull_to_safekeepers,
&cur_safekeepers,
)
.await?;

// 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.

// TODO(diko): do we need to bump timeline term?

// 7. Repeatedly call PUT configuration on safekeepers from the new set,
// delivering them joint_conf and collecting their positions.

tracing::info!(?sync_position, "waiting for safekeepers to sync position");

self.tenant_timeline_set_membership_quorum(
tenant_id,
timeline_id,
&new_safekeepers,
&joint_config,
Some(sync_position),
)
.await?;

// 8. Create new_conf: Configuration incrementing joint_conf generation and
// having new safekeeper set as sk_set and None new_sk_set.

let generation = generation.next();

let new_conf = membership::Configuration {
generation,
members: new_sk_member_set,
new_members: None,
};

self.persistence
.update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
.await?;

// TODO(diko): at this point we have already updated the timeline in the database,
// but we still need to notify safekeepers and cplane about the new configuration,
// and put deletion of the timeline from the old safekeepers into the reconciler.
// Ideally it should be done atomically, but now it's not.
// Worst case: the timeline is not deleted from old safekeepers,
// the compute may require both quorums till the migration is retried and completed.

self.tenant_timeline_set_membership_quorum(
tenant_id,
timeline_id,
&new_safekeepers,
&new_conf,
None, // no min position
)
.await?;

let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
let exclude_safekeepers = cur_safekeepers
.into_iter()
.filter(|sk| !new_ids.contains(&sk.get_id()))
.collect::<Vec<_>>();
self.tenant_timeline_safekeeper_exclude(
tenant_id,
timeline_id,
&exclude_safekeepers,
&new_conf,
)
.await?;

// Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
// This way the compute will stop talking to excluded safekeepers only after we stop requiring to
// collect a quorum from them.
self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf)
.await?;

Ok(())
}

/// Notify cplane about safekeeper membership change.
/// The cplane will receive a joint set of safekeepers as a safekeeper list.
async fn cplane_notify_safekeepers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
mconf: &membership::Configuration,
) -> Result<(), ApiError> {
let mut safekeepers = Vec::new();
let mut ids: HashSet<_> = HashSet::new();

for member in mconf
.members
.m
.iter()
.chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
{
if ids.insert(member.id) {
safekeepers.push(compute_hook::SafekeeperInfo {
id: member.id,
hostname: Some(member.host.clone()),
});
}
}

self.compute_hook
.notify_safekeepers(
compute_hook::SafekeepersUpdate {
tenant_id,
timeline_id,
generation: mconf.generation,
safekeepers,
},
&self.cancel,
)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"failed to notify cplane about safekeeper membership change: {err}"
))
})
}
}

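cplane_notify_safekeepers above builds the joint safekeeper list by chaining the old and new member sets and dropping duplicates via HashSet::insert, which returns false for ids it has already seen. A standalone sketch of that deduplication step, with (u64, &str) pairs standing in for the real member type:

use std::collections::HashSet;

fn main() {
    let members = [(1u64, "sk-1"), (2, "sk-2"), (3, "sk-3")];
    let new_members = [(3u64, "sk-3"), (4, "sk-4")];

    let mut ids = HashSet::new();
    let mut joint = Vec::new();
    for &(id, host) in members.iter().chain(new_members.iter()) {
        // insert() returns false if the id was already present, so the first
        // occurrence wins and overlapping members are reported only once.
        if ids.insert(id) {
            joint.push((id, host));
        }
    }

    assert_eq!(joint, vec![(1, "sk-1"), (2, "sk-2"), (3, "sk-3"), (4, "sk-4")]);
    println!("joint safekeeper list has {} unique members", joint.len());
}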
@@ -1381,8 +1381,13 @@ impl TenantShard {
.generation
.expect("Attempted to enter attached state without a generation");

let wanted_conf =
attached_location_conf(generation, &self.shard, &self.config, &self.policy);
let wanted_conf = attached_location_conf(
generation,
&self.shard,
&self.config,
&self.policy,
self.intent.get_secondary().len(),
);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
@@ -3003,21 +3008,18 @@ pub(crate) mod tests {

if attachments_in_wrong_az > 0 {
violations.push(format!(
"{} attachments scheduled to the incorrect AZ",
attachments_in_wrong_az
"{attachments_in_wrong_az} attachments scheduled to the incorrect AZ"
));
}

if secondaries_in_wrong_az > 0 {
violations.push(format!(
"{} secondaries scheduled to the incorrect AZ",
secondaries_in_wrong_az
"{secondaries_in_wrong_az} secondaries scheduled to the incorrect AZ"
));
}

eprintln!(
"attachments_in_wrong_az={} secondaries_in_wrong_az={}",
attachments_in_wrong_az, secondaries_in_wrong_az
"attachments_in_wrong_az={attachments_in_wrong_az} secondaries_in_wrong_az={secondaries_in_wrong_az}"
);

for (node_id, stats) in &node_stats {

@@ -195,7 +195,7 @@ impl UpcallClient {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {}", jwt));
.map(|jwt| format!("Bearer {jwt}"));

let client = reqwest::ClientBuilder::new()
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
|
||||
Reference in New Issue
Block a user