mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 19:10:38 +00:00
Compare commits
6 Commits
thesuhas/m
...
diko/safek
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c53b4545c8 | ||
|
|
e48ac9ed76 | ||
|
|
94cb9a79d9 | ||
|
|
961835add6 | ||
|
|
fc242afcc2 | ||
|
|
e275221aef |
@@ -1,16 +1,10 @@
|
||||
use std::fs::File;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use crate::{
|
||||
compute::{ComputeNode, ParsedSpec},
|
||||
spec::get_config_from_control_plane,
|
||||
};
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
@@ -18,27 +12,12 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
loop {
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
|
||||
if compute.params.lakebase_mode {
|
||||
/* BEGIN_HADRON */
|
||||
// RefreshConfiguration should only be used inside the loop
|
||||
assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
|
||||
/* END_HADRON */
|
||||
|
||||
while state.status != ComputeStatus::ConfigurationPending
|
||||
&& state.status != ComputeStatus::RefreshConfigurationPending
|
||||
&& state.status != ComputeStatus::Failed
|
||||
{
|
||||
info!("configurator: compute status: {:?}, sleeping", state.status);
|
||||
state = compute.state_changed.wait(state).unwrap();
|
||||
}
|
||||
} else {
|
||||
// We have to re-check the status after re-acquiring the lock because it could be that
|
||||
// the status has changed while we were waiting for the lock, and we might not need to
|
||||
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
|
||||
// we are waiting for a condition variable that will never be signaled.
|
||||
if state.status != ComputeStatus::ConfigurationPending {
|
||||
state = compute.state_changed.wait(state).unwrap();
|
||||
}
|
||||
// We have to re-check the status after re-acquiring the lock because it could be that
|
||||
// the status has changed while we were waiting for the lock, and we might not need to
|
||||
// wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
|
||||
// we are waiting for a condition variable that will never be signaled.
|
||||
if state.status != ComputeStatus::ConfigurationPending {
|
||||
state = compute.state_changed.wait(state).unwrap();
|
||||
}
|
||||
|
||||
// Re-check the status after waking up
|
||||
@@ -47,146 +26,17 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
state.set_status(ComputeStatus::Configuration, &compute.state_changed);
|
||||
drop(state);
|
||||
|
||||
let mut _new_status = ComputeStatus::Failed;
|
||||
let mut new_status = ComputeStatus::Failed;
|
||||
if let Err(e) = compute.reconfigure() {
|
||||
error!("could not configure compute node: {}", e);
|
||||
// TODO(BRC-1726): Remove this panic once we fix the state machine to allow further
|
||||
// configuration attempts after a failed configuration attempt.
|
||||
error!("Compute node exiting due to configuration failure.");
|
||||
std::process::exit(1);
|
||||
} else {
|
||||
_new_status = ComputeStatus::Running;
|
||||
new_status = ComputeStatus::Running;
|
||||
info!("compute node configured");
|
||||
}
|
||||
|
||||
// XXX: used to test that API is blocking
|
||||
// std::thread::sleep(std::time::Duration::from_millis(10000));
|
||||
|
||||
compute.set_status(_new_status);
|
||||
} else if state.status == ComputeStatus::RefreshConfigurationPending {
|
||||
info!(
|
||||
"compute node suspects its configuration is out of date, now refreshing configuration"
|
||||
);
|
||||
state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
|
||||
// Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
|
||||
// This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
|
||||
// is safe to drop the lock like this.
|
||||
drop(state);
|
||||
|
||||
let get_spec_result: anyhow::Result<Option<ComputeSpec>> =
|
||||
if let Some(sp) = &compute.params.spec_path_test_only {
|
||||
// This path is only to make testing easier. In production we always get the spec from the HCM.
|
||||
info!("reloading spec.json from path: {:?}", sp);
|
||||
let path = Path::new(sp);
|
||||
if let Ok(file) = File::open(path) {
|
||||
match serde_json::from_reader(file) {
|
||||
Ok(spec) => Ok(Some(spec)),
|
||||
Err(e) => {
|
||||
error!("could not parse spec file: {}", e);
|
||||
Err(anyhow::anyhow!("could not parse spec file: {}", e))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("could not open spec file at path: {:?}", sp);
|
||||
Err(anyhow::anyhow!(
|
||||
"could not open spec file at path: {:?}",
|
||||
sp
|
||||
))
|
||||
}
|
||||
} else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
|
||||
get_config_from_control_plane(control_plane_uri, &compute.params.compute_id).map(
|
||||
|(spec_opt, _)| {
|
||||
info!("got spec from control plane: {:?}", spec_opt);
|
||||
spec_opt
|
||||
},
|
||||
)
|
||||
} else {
|
||||
Err(anyhow::anyhow!("spec_path_test_only is not set"))
|
||||
};
|
||||
|
||||
// Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
|
||||
let parsed_spec_result: Result<Option<ParsedSpec>> = get_spec_result.and_then(|spec| {
|
||||
if let Some(spec) = spec {
|
||||
if let Ok(pspec) = ParsedSpec::try_from(spec) {
|
||||
Ok(Some(pspec))
|
||||
} else {
|
||||
Err(anyhow::anyhow!("could not parse spec"))
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
});
|
||||
|
||||
let new_status: ComputeStatus;
|
||||
match parsed_spec_result {
|
||||
// Control plane (HCM) returned a spec and we were able to parse it.
|
||||
Ok(Some(pspec)) => {
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
// Defensive programming to make sure this thread is indeed the only one that can move the compute
|
||||
// node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
|
||||
// into the type system.
|
||||
assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
|
||||
|
||||
if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
|
||||
== Some(pspec.pageserver_connstr.clone())
|
||||
{
|
||||
info!(
|
||||
"Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
|
||||
);
|
||||
state.status = ComputeStatus::Running;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
continue;
|
||||
}
|
||||
// state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
|
||||
// the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
|
||||
// "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
|
||||
// but it's not worth forking the codebase too much for this minor point alone right now.
|
||||
ComputeNode::set_spec(&compute.params, &mut state, pspec);
|
||||
}
|
||||
match compute.reconfigure() {
|
||||
Ok(_) => {
|
||||
info!("Refresh configuration: compute node configured");
|
||||
new_status = ComputeStatus::Running;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Refresh configuration: could not configure compute node: {}",
|
||||
e
|
||||
);
|
||||
// Set the compute node back to the `RefreshConfigurationPending` state if the configuration
|
||||
// was not successful. It should be okay to treat this situation the same as if the loop
|
||||
// hasn't executed yet as long as the detection side keeps notifying.
|
||||
new_status = ComputeStatus::RefreshConfigurationPending;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
|
||||
Ok(None) => {
|
||||
info!(
|
||||
"Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
|
||||
);
|
||||
// We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
|
||||
// clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
|
||||
// configuration state).
|
||||
std::process::exit(1);
|
||||
}
|
||||
// Various error cases:
|
||||
// - The request to the control plane (HCM) either failed or returned a malformed spec.
|
||||
// - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Refresh configuration: error getting a parsed spec: {:?}",
|
||||
e
|
||||
);
|
||||
new_status = ComputeStatus::RefreshConfigurationPending;
|
||||
// We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
|
||||
// retrying to avoid hammering the HCM.
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
}
|
||||
}
|
||||
compute.set_status(new_status);
|
||||
} else if state.status == ComputeStatus::Failed {
|
||||
info!("compute node is now in Failed state, exiting");
|
||||
|
||||
60
compute_tools/src/hadron_metrics.rs
Normal file
60
compute_tools/src/hadron_metrics.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use metrics::{
|
||||
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
|
||||
register_int_gauge_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
// Counter keeping track of the number of PageStream request errors reported by Postgres.
|
||||
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
|
||||
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
|
||||
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
|
||||
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
|
||||
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pg_cctl_pagestream_request_errors_total",
|
||||
"Number of PageStream request errors reported by the postgres process"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Counter keeping track of the number of compute configuration errors due to Postgres statement
|
||||
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
|
||||
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
|
||||
// stuck in a problematic retry loop when the PS is rejecting its connection requests (usually due
|
||||
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
|
||||
// increases by checking PG and PS logs.
|
||||
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pg_cctl_configure_statement_timeout_errors_total",
|
||||
"Number of compute configuration errors due to Postgres statement timeouts."
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pg_cctl_attached",
|
||||
"Compute node attached status (1 if attached)",
|
||||
&[
|
||||
"pg_compute_id",
|
||||
"pg_instance_id",
|
||||
"tenant_id",
|
||||
"timeline_id"
|
||||
]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
let mut metrics = Vec::new();
|
||||
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
|
||||
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
|
||||
metrics.extend(COMPUTE_ATTACHED.collect());
|
||||
metrics
|
||||
}
|
||||
|
||||
pub fn initialize_metrics() {
|
||||
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
|
||||
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
|
||||
Lazy::force(&COMPUTE_ATTACHED);
|
||||
}
|
||||
@@ -16,6 +16,7 @@ pub mod compute_prewarm;
|
||||
pub mod compute_promote;
|
||||
pub mod disk_quota;
|
||||
pub mod extension_server;
|
||||
pub mod hadron_metrics;
|
||||
pub mod installed_extensions;
|
||||
pub mod local_proxy;
|
||||
pub mod lsn_lease;
|
||||
|
||||
@@ -97,8 +97,6 @@ pub struct EndpointConf {
|
||||
reconfigure_concurrency: usize,
|
||||
drop_subscriptions_before_start: bool,
|
||||
features: Vec<ComputeFeature>,
|
||||
compute_id: String,
|
||||
instance_id: Option<String>,
|
||||
cluster: Option<Cluster>,
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
privileged_role_name: Option<String>,
|
||||
@@ -201,8 +199,6 @@ impl ComputeControlPlane {
|
||||
mode: ComputeMode,
|
||||
grpc: bool,
|
||||
skip_pg_catalog_updates: bool,
|
||||
compute_id: &str,
|
||||
instance_id: Option<String>,
|
||||
drop_subscriptions_before_start: bool,
|
||||
privileged_role_name: Option<String>,
|
||||
) -> Result<Arc<Endpoint>> {
|
||||
@@ -240,8 +236,6 @@ impl ComputeControlPlane {
|
||||
grpc,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
compute_id: compute_id.to_owned(),
|
||||
instance_id: instance_id.clone(),
|
||||
cluster: None,
|
||||
compute_ctl_config: compute_ctl_config.clone(),
|
||||
privileged_role_name: privileged_role_name.clone(),
|
||||
@@ -264,8 +258,6 @@ impl ComputeControlPlane {
|
||||
drop_subscriptions_before_start,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
compute_id: compute_id.to_string(),
|
||||
instance_id: instance_id.clone(),
|
||||
cluster: None,
|
||||
compute_ctl_config,
|
||||
privileged_role_name,
|
||||
@@ -339,13 +331,6 @@ pub struct Endpoint {
|
||||
reconfigure_concurrency: usize,
|
||||
// Feature flags
|
||||
features: Vec<ComputeFeature>,
|
||||
|
||||
// The compute_id is used to identify the compute node in the cloud.
|
||||
compute_id: String,
|
||||
|
||||
// Hadron database instance id used for PG authentication and logs
|
||||
instance_id: Option<String>,
|
||||
|
||||
// Cluster settings
|
||||
cluster: Option<Cluster>,
|
||||
|
||||
@@ -410,7 +395,6 @@ pub struct EndpointStartArgs {
|
||||
pub autoprewarm: bool,
|
||||
pub offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
|
||||
pub dev: bool,
|
||||
pub pg_init_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Endpoint {
|
||||
@@ -453,8 +437,6 @@ impl Endpoint {
|
||||
reconfigure_concurrency: conf.reconfigure_concurrency,
|
||||
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
|
||||
features: conf.features,
|
||||
compute_id: conf.compute_id,
|
||||
instance_id: conf.instance_id,
|
||||
cluster: conf.cluster,
|
||||
compute_ctl_config: conf.compute_ctl_config,
|
||||
privileged_role_name: conf.privileged_role_name,
|
||||
@@ -499,7 +481,7 @@ impl Endpoint {
|
||||
conf.append("restart_after_crash", "off");
|
||||
|
||||
// Load the 'neon' extension
|
||||
conf.append("shared_preload_libraries", "neon, databricks_auth");
|
||||
conf.append("shared_preload_libraries", "neon");
|
||||
|
||||
conf.append_line("");
|
||||
// Replication-related configurations, such as WAL sending
|
||||
@@ -803,7 +785,6 @@ impl Endpoint {
|
||||
shard_stripe_size: Some(args.shard_stripe_size),
|
||||
local_proxy_config: None,
|
||||
reconfigure_concurrency: self.reconfigure_concurrency,
|
||||
databricks_settings: None,
|
||||
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
|
||||
audit_log_level: ComputeAudit::Disabled,
|
||||
logs_export_host: None::<String>,
|
||||
|
||||
@@ -173,11 +173,6 @@ pub enum ComputeStatus {
|
||||
TerminationPendingImmediate,
|
||||
// Terminated Postgres
|
||||
Terminated,
|
||||
// A spec refresh is being requested
|
||||
RefreshConfigurationPending,
|
||||
// A spec refresh is being applied. We cannot refresh configuration again until the current
|
||||
// refresh is done, i.e., signal_refresh_configuration() will return 500 error.
|
||||
RefreshConfiguration,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
@@ -190,10 +185,6 @@ impl Display for ComputeStatus {
|
||||
match self {
|
||||
ComputeStatus::Empty => f.write_str("empty"),
|
||||
ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
|
||||
ComputeStatus::RefreshConfigurationPending => {
|
||||
f.write_str("refresh-configuration-pending")
|
||||
}
|
||||
ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
|
||||
ComputeStatus::Init => f.write_str("init"),
|
||||
ComputeStatus::Running => f.write_str("running"),
|
||||
ComputeStatus::Configuration => f.write_str("configuration"),
|
||||
@@ -294,15 +285,10 @@ pub struct TlsConfig {
|
||||
}
|
||||
|
||||
/// Response of the `/computes/{compute_id}/spec` control-plane API.
|
||||
/// This is not actually a compute API response, so consider moving
|
||||
/// to a different place.
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneConfigResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
pub status: ControlPlaneComputeStatus,
|
||||
// Hadron: Deserialize this field into a harmless default if
|
||||
// compute_ctl_config is not present for compatibility.
|
||||
#[serde(default)]
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
|
||||
@@ -301,7 +301,12 @@ pub struct PullTimelineRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub http_hosts: Vec<String>,
|
||||
pub ignore_tombstone: Option<bool>,
|
||||
/// Membership configuration to switch to after pull.
|
||||
/// It guarantees that if pull_timeline returns successfully, the timeline will
|
||||
/// not be deleted by request with an older generation.
|
||||
/// Storage controller always sets this field.
|
||||
/// None is only allowed for manual pull_timeline requests.
|
||||
pub mconf: Option<Configuration>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
||||
@@ -178,6 +178,8 @@ static PageServer page_servers[MAX_SHARDS];
|
||||
static bool pageserver_flush(shardno_t shard_no);
|
||||
static void pageserver_disconnect(shardno_t shard_no);
|
||||
static void pageserver_disconnect_shard(shardno_t shard_no);
|
||||
// HADRON
|
||||
shardno_t get_num_shards(void);
|
||||
|
||||
static bool
|
||||
PagestoreShmemIsValid(void)
|
||||
@@ -286,6 +288,22 @@ AssignPageserverConnstring(const char *newval, void *extra)
|
||||
}
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
/**
|
||||
* Return the total number of shards seen in the shard map.
|
||||
*/
|
||||
shardno_t get_num_shards(void)
|
||||
{
|
||||
const ShardMap *shard_map;
|
||||
|
||||
Assert(pagestore_shared);
|
||||
shard_map = &pagestore_shared->shard_map;
|
||||
|
||||
Assert(shard_map != NULL);
|
||||
return shard_map->num_shards;
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/*
|
||||
* Get the current number of shards, and/or the connection string for a
|
||||
* particular shard from the shard map in shared memory.
|
||||
|
||||
@@ -110,6 +110,9 @@ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
|
||||
|
||||
static void CheckGracefulShutdown(WalProposer *wp);
|
||||
|
||||
// HADRON
|
||||
shardno_t get_num_shards(void);
|
||||
|
||||
static void
|
||||
init_walprop_config(bool syncSafekeepers)
|
||||
{
|
||||
@@ -646,18 +649,19 @@ walprop_pg_get_shmem_state(WalProposer *wp)
|
||||
* Record new ps_feedback in the array with shards and update min_feedback.
|
||||
*/
|
||||
static PageserverFeedback
|
||||
record_pageserver_feedback(PageserverFeedback *ps_feedback)
|
||||
record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards)
|
||||
{
|
||||
PageserverFeedback min_feedback;
|
||||
|
||||
Assert(ps_feedback->present);
|
||||
Assert(ps_feedback->shard_number < MAX_SHARDS);
|
||||
Assert(ps_feedback->shard_number < num_shards);
|
||||
|
||||
SpinLockAcquire(&walprop_shared->mutex);
|
||||
|
||||
/* Update the number of shards */
|
||||
if (ps_feedback->shard_number + 1 > walprop_shared->num_shards)
|
||||
walprop_shared->num_shards = ps_feedback->shard_number + 1;
|
||||
// Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive
|
||||
// a new pageserver feedback.
|
||||
walprop_shared->num_shards = Max(walprop_shared->num_shards, num_shards);
|
||||
|
||||
/* Update the feedback */
|
||||
memcpy(&walprop_shared->shard_ps_feedback[ps_feedback->shard_number], ps_feedback, sizeof(PageserverFeedback));
|
||||
@@ -2023,19 +2027,43 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
if (wp->config->syncSafekeepers)
|
||||
return;
|
||||
|
||||
|
||||
/* handle fresh ps_feedback */
|
||||
if (sk->appendResponse.ps_feedback.present)
|
||||
{
|
||||
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback);
|
||||
shardno_t num_shards = get_num_shards();
|
||||
|
||||
/* Only one main shard sends non-zero currentClusterSize */
|
||||
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
|
||||
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
|
||||
|
||||
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
|
||||
// During shard split, we receive ps_feedback from child shards before
|
||||
// the split commits and our shard map GUC has been updated. We must
|
||||
// filter out such feedback here because record_pageserver_feedback()
|
||||
// doesn't do it.
|
||||
//
|
||||
// NB: what we would actually want to happen is that we only receive
|
||||
// ps_feedback from the parent shards when the split is committed, then
|
||||
// apply the split to our set of tracked feedback and from here on only
|
||||
// receive ps_feedback from child shards. This filter condition doesn't
|
||||
// do that: if we split from N parent to 2N child shards, the first N
|
||||
// child shards' feedback messages will pass this condition, even before
|
||||
// the split is committed. That's a bit sloppy, but OK for now.
|
||||
if (sk->appendResponse.ps_feedback.shard_number < num_shards)
|
||||
{
|
||||
standby_apply_lsn = min_feedback.disk_consistent_lsn;
|
||||
needToAdvanceSlot = true;
|
||||
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback, num_shards);
|
||||
|
||||
/* Only one main shard sends non-zero currentClusterSize */
|
||||
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
|
||||
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
|
||||
|
||||
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
|
||||
{
|
||||
standby_apply_lsn = min_feedback.disk_consistent_lsn;
|
||||
needToAdvanceSlot = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// HADRON
|
||||
elog(DEBUG2, "Ignoring pageserver feedback for unknown shard %d (current shard number %d)",
|
||||
sk->appendResponse.ps_feedback.shard_number, num_shards);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -161,9 +161,9 @@ pub async fn handle_request(
|
||||
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
|
||||
|
||||
// now we have a ready timeline in a temp directory
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path, None).await?;
|
||||
global_timelines
|
||||
.load_temp_timeline(request.destination_ttid, &tli_dir_path, true)
|
||||
.load_temp_timeline(request.destination_ttid, &tli_dir_path, None)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -193,7 +193,7 @@ pub async fn hcc_pull_timeline(
|
||||
tenant_id: timeline.tenant_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
http_hosts: Vec::new(),
|
||||
ignore_tombstone: None,
|
||||
mconf: None,
|
||||
};
|
||||
for host in timeline.peers {
|
||||
if host.0 == conf.my_id.0 {
|
||||
|
||||
@@ -352,7 +352,7 @@ async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response
|
||||
// instead.
|
||||
if data.mconf.contains(my_id) {
|
||||
return Err(ApiError::Forbidden(format!(
|
||||
"refused to switch into {}, node {} is member of it",
|
||||
"refused to exclude timeline with {}, node {} is member of it",
|
||||
data.mconf, my_id
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -13,8 +13,8 @@ use http_utils::error::ApiError;
|
||||
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
|
||||
use safekeeper_api::{Term, membership};
|
||||
use safekeeper_client::mgmt_api;
|
||||
use safekeeper_client::mgmt_api::Client;
|
||||
use serde::Deserialize;
|
||||
@@ -453,12 +453,40 @@ pub async fn handle_request(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
wait_for_peer_timeline_status: bool,
|
||||
) -> Result<PullTimelineResponse, ApiError> {
|
||||
if let Some(mconf) = &request.mconf {
|
||||
let sk_id = global_timelines.get_sk_id();
|
||||
if !mconf.contains(sk_id) {
|
||||
return Err(ApiError::BadRequest(anyhow!(
|
||||
"refused to pull timeline with {mconf}, node {sk_id} is not member of it",
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let existing_tli = global_timelines.get(TenantTimelineId::new(
|
||||
request.tenant_id,
|
||||
request.timeline_id,
|
||||
));
|
||||
if existing_tli.is_ok() {
|
||||
info!("Timeline {} already exists", request.timeline_id);
|
||||
if let Ok(timeline) = existing_tli {
|
||||
let cur_generation = timeline
|
||||
.read_shared_state()
|
||||
.await
|
||||
.sk
|
||||
.state()
|
||||
.mconf
|
||||
.generation;
|
||||
|
||||
info!(
|
||||
"Timeline {} already exists with generation {cur_generation}",
|
||||
request.timeline_id,
|
||||
);
|
||||
|
||||
if let Some(mconf) = request.mconf {
|
||||
timeline
|
||||
.membership_switch(mconf)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
|
||||
}
|
||||
|
||||
return Ok(PullTimelineResponse {
|
||||
safekeeper_host: None,
|
||||
});
|
||||
@@ -495,6 +523,19 @@ pub async fn handle_request(
|
||||
for (i, response) in responses.into_iter().enumerate() {
|
||||
match response {
|
||||
Ok(status) => {
|
||||
if let Some(mconf) = &request.mconf {
|
||||
if status.mconf.generation > mconf.generation {
|
||||
// We probably raced with another timeline membership change with higher generation.
|
||||
// Ignore this request.
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"cannot pull timeline with generation {}: timeline {} already exists with generation {} on {}",
|
||||
mconf.generation,
|
||||
request.timeline_id,
|
||||
status.mconf.generation,
|
||||
http_hosts[i],
|
||||
)));
|
||||
}
|
||||
}
|
||||
statuses.push((status, i));
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -593,15 +634,13 @@ pub async fn handle_request(
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
let check_tombstone = !request.ignore_tombstone.unwrap_or_default();
|
||||
|
||||
match pull_timeline(
|
||||
status,
|
||||
safekeeper_host,
|
||||
sk_auth_token,
|
||||
http_client,
|
||||
global_timelines,
|
||||
check_tombstone,
|
||||
request.mconf,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -611,6 +650,10 @@ pub async fn handle_request(
|
||||
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
|
||||
safekeeper_host: None,
|
||||
}),
|
||||
Some(TimelineError::Deleted(_)) => Err(ApiError::Conflict(format!(
|
||||
"Timeline {}/{} deleted",
|
||||
request.tenant_id, request.timeline_id
|
||||
))),
|
||||
Some(TimelineError::CreationInProgress(_)) => {
|
||||
// We don't return success here because creation might still fail.
|
||||
Err(ApiError::Conflict("Creation in progress".to_owned()))
|
||||
@@ -627,7 +670,7 @@ async fn pull_timeline(
|
||||
sk_auth_token: Option<SecretString>,
|
||||
http_client: reqwest::Client,
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
check_tombstone: bool,
|
||||
mconf: Option<membership::Configuration>,
|
||||
) -> Result<PullTimelineResponse> {
|
||||
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
|
||||
info!(
|
||||
@@ -689,8 +732,11 @@ async fn pull_timeline(
|
||||
// fsync temp timeline directory to remember its contents.
|
||||
fsync_async_opt(&tli_dir_path, !conf.no_sync).await?;
|
||||
|
||||
let generation = mconf.as_ref().map(|c| c.generation);
|
||||
|
||||
// Let's create timeline from temp directory and verify that it's correct
|
||||
let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?;
|
||||
let (commit_lsn, flush_lsn) =
|
||||
validate_temp_timeline(conf, ttid, &tli_dir_path, generation).await?;
|
||||
info!(
|
||||
"finished downloading timeline {}, commit_lsn={}, flush_lsn={}",
|
||||
ttid, commit_lsn, flush_lsn
|
||||
@@ -698,10 +744,20 @@ async fn pull_timeline(
|
||||
assert!(status.commit_lsn <= status.flush_lsn);
|
||||
|
||||
// Finally, load the timeline.
|
||||
let _tli = global_timelines
|
||||
.load_temp_timeline(ttid, &tli_dir_path, check_tombstone)
|
||||
let timeline = global_timelines
|
||||
.load_temp_timeline(ttid, &tli_dir_path, generation)
|
||||
.await?;
|
||||
|
||||
if let Some(mconf) = mconf {
|
||||
// Switch to provided mconf to guarantee that the timeline will not
|
||||
// be deleted by request with older generation.
|
||||
// The generation might already be higher than the one in mconf, e.g.
|
||||
// if another membership_switch request was executed between `load_temp_timeline`
|
||||
// and `membership_switch`, but that's totally fine. `membership_switch` will
|
||||
// ignore switch to older generation.
|
||||
timeline.membership_switch(mconf).await?;
|
||||
}
|
||||
|
||||
Ok(PullTimelineResponse {
|
||||
safekeeper_host: Some(host),
|
||||
})
|
||||
|
||||
@@ -1026,6 +1026,13 @@ where
|
||||
self.state.finish_change(&state).await?;
|
||||
}
|
||||
|
||||
if msg.mconf.generation > self.state.mconf.generation && !msg.mconf.contains(self.node_id) {
|
||||
bail!(
|
||||
"refused to switch into {}, node {} is not a member of it",
|
||||
msg.mconf,
|
||||
self.node_id,
|
||||
);
|
||||
}
|
||||
// Switch into conf given by proposer conf if it is higher.
|
||||
self.state.membership_switch(msg.mconf.clone()).await?;
|
||||
|
||||
|
||||
@@ -594,7 +594,7 @@ impl Timeline {
|
||||
|
||||
/// Cancel the timeline, requesting background activity to stop. Closing
|
||||
/// the `self.gate` waits for that.
|
||||
pub async fn cancel(&self) {
|
||||
pub fn cancel(&self) {
|
||||
info!("timeline {} shutting down", self.ttid);
|
||||
self.cancel.cancel();
|
||||
}
|
||||
@@ -914,6 +914,13 @@ impl Timeline {
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
let mut state = self.write_shared_state().await;
|
||||
// Ensure we don't race with exclude/delete requests by checking the cancellation
|
||||
// token under the write_shared_state lock.
|
||||
// Exclude/delete cancel the timeline under the shared state lock,
|
||||
// so the timeline cannot be deleted in the middle of the membership switch.
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
state.sk.membership_switch(to).await
|
||||
}
|
||||
|
||||
|
||||
@@ -10,13 +10,13 @@ use std::time::{Duration, Instant};
|
||||
use anyhow::{Context, Result, bail};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::membership::{Configuration, SafekeeperGeneration};
|
||||
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
|
||||
use safekeeper_api::{ServerInfo, membership};
|
||||
use tokio::fs;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::{durable_rename, fsync_async_opt};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
|
||||
@@ -40,10 +40,17 @@ enum GlobalMapTimeline {
|
||||
struct GlobalTimelinesState {
|
||||
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
|
||||
|
||||
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
|
||||
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
// this map is dropped on restart.
|
||||
tombstones: HashMap<TenantTimelineId, Instant>,
|
||||
/// A tombstone indicates that this timeline used to exist and has been deleted. These are used to prevent
|
||||
/// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
/// this map is dropped on restart.
|
||||
/// The timeline might also be locally deleted (excluded) via safekeeper migration algorithm. In that case,
|
||||
/// the tombstone contains the corresponding safekeeper generation. The pull_timeline requests with
|
||||
/// higher generation ignore such tombstones and can recreate the timeline.
|
||||
timeline_tombstones: HashMap<TenantTimelineId, TimelineTombstone>,
|
||||
/// A tombstone indicates that the tenant used to exist and has been deleted.
|
||||
/// These are created only by tenant_delete requests. They are always valid regardless of the
|
||||
/// request generation.
|
||||
/// This is only soft-enforced, as this map is dropped on restart.
|
||||
tenant_tombstones: HashMap<TenantId, Instant>,
|
||||
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
@@ -79,7 +86,7 @@ impl GlobalTimelinesState {
|
||||
Err(TimelineError::CreationInProgress(*ttid))
|
||||
}
|
||||
None => {
|
||||
if self.has_tombstone(ttid) {
|
||||
if self.has_tombstone(ttid, None) {
|
||||
Err(TimelineError::Deleted(*ttid))
|
||||
} else {
|
||||
Err(TimelineError::NotFound(*ttid))
|
||||
@@ -88,20 +95,46 @@ impl GlobalTimelinesState {
|
||||
}
|
||||
}
|
||||
|
||||
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
|
||||
fn has_timeline_tombstone(
|
||||
&self,
|
||||
ttid: &TenantTimelineId,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> bool {
|
||||
if let Some(generation) = generation {
|
||||
self.timeline_tombstones
|
||||
.get(ttid)
|
||||
.is_some_and(|t| t.is_valid(generation))
|
||||
} else {
|
||||
self.timeline_tombstones.contains_key(ttid)
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes all blocking tombstones for the given timeline ID.
|
||||
/// Reports whether a tenant tombstone is recorded for `tenant_id`.
fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool {
    let tenant_tombstones = &self.tenant_tombstones;
    tenant_tombstones.contains_key(tenant_id)
}
|
||||
|
||||
/// Check if the state has a tenant or a timeline tombstone.
|
||||
/// If `generation` is provided, check only for timeline tombsotnes with same or higher generation.
|
||||
/// If `generation` is `None`, check for any timeline tombstone.
|
||||
/// Tenant tombstones are checked regardless of the generation.
|
||||
fn has_tombstone(
|
||||
&self,
|
||||
ttid: &TenantTimelineId,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> bool {
|
||||
self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id)
|
||||
}
|
||||
|
||||
/// Removes timeline tombstone for the given timeline ID.
|
||||
/// Returns `true` if there have been actual changes.
|
||||
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.remove(ttid).is_some()
|
||||
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
|
||||
fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
|
||||
self.timeline_tombstones.remove(ttid).is_some()
|
||||
}
|
||||
|
||||
fn delete(&mut self, ttid: TenantTimelineId) {
|
||||
fn delete(&mut self, ttid: TenantTimelineId, generation: Option<SafekeeperGeneration>) {
|
||||
self.timelines.remove(&ttid);
|
||||
self.tombstones.insert(ttid, Instant::now());
|
||||
self.timeline_tombstones
|
||||
.insert(ttid, TimelineTombstone::new(generation));
|
||||
}
|
||||
|
||||
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
|
||||
@@ -120,7 +153,7 @@ impl GlobalTimelines {
|
||||
Self {
|
||||
state: Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
tombstones: HashMap::new(),
|
||||
timeline_tombstones: HashMap::new(),
|
||||
tenant_tombstones: HashMap::new(),
|
||||
conf,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
@@ -261,6 +294,8 @@ impl GlobalTimelines {
|
||||
start_lsn: Lsn,
|
||||
commit_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let generation = Some(mconf.generation);
|
||||
|
||||
let (conf, _, _, _) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
@@ -268,8 +303,8 @@ impl GlobalTimelines {
|
||||
return Ok(timeline);
|
||||
}
|
||||
|
||||
if state.has_tombstone(&ttid) {
|
||||
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
|
||||
if state.has_tombstone(&ttid, generation) {
|
||||
anyhow::bail!(TimelineError::Deleted(ttid));
|
||||
}
|
||||
|
||||
state.get_dependencies()
|
||||
@@ -284,7 +319,9 @@ impl GlobalTimelines {
|
||||
// immediately initialize first WAL segment as well.
|
||||
let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
|
||||
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
||||
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||
let timeline = self
|
||||
.load_temp_timeline(ttid, &tmp_dir_path, generation)
|
||||
.await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -303,7 +340,7 @@ impl GlobalTimelines {
|
||||
&self,
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
check_tombstone: bool,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
// Check for existence and mark that we're creating it.
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
|
||||
@@ -317,18 +354,18 @@ impl GlobalTimelines {
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
if check_tombstone {
|
||||
if state.has_tombstone(&ttid) {
|
||||
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
} else {
|
||||
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
||||
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
||||
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
|
||||
if state.remove_tombstone(&ttid) {
|
||||
warn!("un-deleted timeline {ttid}");
|
||||
}
|
||||
|
||||
if state.has_tombstone(&ttid, generation) {
|
||||
// If the timeline is deleted, we refuse to recreate it.
|
||||
// This is a safeguard against accidentally overwriting a timeline that was deleted
|
||||
// by a concurrent request.
|
||||
anyhow::bail!(TimelineError::Deleted(ttid));
|
||||
}
|
||||
|
||||
// We might have an outdated tombstone with the older generation.
|
||||
// Remove it unconditionally.
|
||||
state.remove_timeline_tombstone(&ttid);
|
||||
|
||||
state
|
||||
.timelines
|
||||
.insert(ttid, GlobalMapTimeline::CreationInProgress);
|
||||
@@ -503,11 +540,16 @@ impl GlobalTimelines {
|
||||
ttid: &TenantTimelineId,
|
||||
action: DeleteOrExclude,
|
||||
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
|
||||
let generation = match &action {
|
||||
DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None,
|
||||
DeleteOrExclude::Exclude(mconf) => Some(mconf.generation),
|
||||
};
|
||||
|
||||
let tli_res = {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
// Do NOT check tenant tombstones here: those were set earlier
|
||||
if state.tombstones.contains_key(ttid) {
|
||||
if state.has_timeline_tombstone(ttid, generation) {
|
||||
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
||||
info!("Timeline {ttid} was already deleted");
|
||||
return Ok(TimelineDeleteResult { dir_existed: false });
|
||||
@@ -528,6 +570,11 @@ impl GlobalTimelines {
|
||||
// We would like to avoid holding the lock while waiting for the
|
||||
// gate to finish as this is deadlock prone, so for actual
|
||||
// deletion we will take it a second time.
|
||||
//
|
||||
// Canceling the timeline will block membership switch requests,
|
||||
// ensuring that the timeline generation will not increase
|
||||
// after this point, and we will not remove a timeline with a generation
|
||||
// higher than the requested one.
|
||||
if let DeleteOrExclude::Exclude(ref mconf) = action {
|
||||
let shared_state = timeline.read_shared_state().await;
|
||||
if shared_state.sk.state().mconf.generation > mconf.generation {
|
||||
@@ -536,9 +583,9 @@ impl GlobalTimelines {
|
||||
current: shared_state.sk.state().mconf.clone(),
|
||||
});
|
||||
}
|
||||
timeline.cancel().await;
|
||||
timeline.cancel();
|
||||
} else {
|
||||
timeline.cancel().await;
|
||||
timeline.cancel();
|
||||
}
|
||||
|
||||
timeline.close().await;
|
||||
@@ -565,7 +612,7 @@ impl GlobalTimelines {
|
||||
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
|
||||
// are used to prevent still-running computes from re-creating the same timeline when they send data,
|
||||
// and to speed up repeated deletion calls by avoiding re-listing objects.
|
||||
self.state.lock().unwrap().delete(*ttid);
|
||||
self.state.lock().unwrap().delete(*ttid, generation);
|
||||
|
||||
result
|
||||
}
|
||||
@@ -627,12 +674,16 @@ impl GlobalTimelines {
|
||||
// may recreate a deleted timeline.
|
||||
let now = Instant::now();
|
||||
state
|
||||
.tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
.timeline_tombstones
|
||||
.retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl);
|
||||
state
|
||||
.tenant_tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
}
|
||||
|
||||
/// Returns `conf.my_id`, read from the global state while its lock is held.
///
/// Panics if the state mutex is poisoned.
pub fn get_sk_id(&self) -> NodeId {
    let state = self.state.lock().unwrap();
    state.conf.my_id
}
|
||||
}
|
||||
|
||||
/// Action for delete_or_exclude.
|
||||
@@ -673,6 +724,7 @@ pub async fn validate_temp_timeline(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
path: &Utf8PathBuf,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
) -> Result<(Lsn, Lsn)> {
|
||||
let control_path = path.join("safekeeper.control");
|
||||
|
||||
@@ -681,6 +733,15 @@ pub async fn validate_temp_timeline(
|
||||
bail!("wal_seg_size is not set");
|
||||
}
|
||||
|
||||
if let Some(generation) = generation {
|
||||
if control_store.mconf.generation > generation {
|
||||
bail!(
|
||||
"tmp timeline generation {} is higher than expected {generation}",
|
||||
control_store.mconf.generation
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
@@ -688,3 +749,28 @@ pub async fn validate_temp_timeline(
|
||||
|
||||
Ok((commit_lsn, flush_lsn))
|
||||
}
|
||||
|
||||
/// A tombstone for a deleted timeline.
|
||||
/// The generation is passed with "exclude" request and stored in the tombstone.
|
||||
/// We ignore the tombstone if the request generation is higher than
|
||||
/// the tombstone generation.
|
||||
/// If the tombstone doesn't have a generation, it's considered permanent,
|
||||
/// e.g. after "delete" request.
|
||||
struct TimelineTombstone {
|
||||
timestamp: Instant,
|
||||
generation: Option<SafekeeperGeneration>,
|
||||
}
|
||||
|
||||
impl TimelineTombstone {
|
||||
fn new(generation: Option<SafekeeperGeneration>) -> Self {
|
||||
TimelineTombstone {
|
||||
timestamp: Instant::now(),
|
||||
generation,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the timeline is still valid for the given generation.
|
||||
fn is_valid(&self, generation: SafekeeperGeneration) -> bool {
|
||||
self.generation.is_none_or(|g| g >= generation)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,7 +351,7 @@ impl Node {
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
cancel_new_retries: &CancellationToken,
|
||||
) -> Option<mgmt_api::Result<T>>
|
||||
where
|
||||
O: FnMut(PageserverClient) -> F,
|
||||
@@ -402,7 +402,7 @@ impl Node {
|
||||
self.id,
|
||||
self.base_url(),
|
||||
),
|
||||
cancel,
|
||||
cancel_new_retries,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ impl Safekeeper {
|
||||
warn_threshold: u32,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
cancel: &CancellationToken,
|
||||
cancel_new_retries: &CancellationToken,
|
||||
) -> mgmt_api::Result<T>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F,
|
||||
@@ -161,7 +161,7 @@ impl Safekeeper {
|
||||
self.id,
|
||||
self.base_url(),
|
||||
),
|
||||
cancel,
|
||||
cancel_new_retries,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(mgmt_api::Error::Cancelled))
|
||||
|
||||
@@ -364,7 +364,12 @@ impl SafekeeperReconcilerInner {
|
||||
http_hosts,
|
||||
tenant_id: req.tenant_id,
|
||||
timeline_id,
|
||||
ignore_tombstone: Some(false),
|
||||
// TODO(diko): get mconf from "timelines" table and pass it here.
|
||||
// Now we use pull_timeline reconciliation only for the timeline creation,
|
||||
// so it's not critical right now.
|
||||
// It could be fixed together with other reconciliation issues:
|
||||
// https://github.com/neondatabase/neon/issues/12189
|
||||
mconf: None,
|
||||
};
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
|
||||
@@ -123,12 +123,17 @@ impl Service {
|
||||
|
||||
/// Perform an operation on a list of safekeepers in parallel with retries.
|
||||
///
|
||||
/// If desired_success_count is set, the remaining operations will be cancelled
|
||||
/// when the desired number of successful responses is reached.
|
||||
///
|
||||
/// Return the results of the operation on each safekeeper in the input order.
|
||||
async fn tenant_timeline_safekeeper_op<T, O, F>(
|
||||
&self,
|
||||
safekeepers: &[Safekeeper],
|
||||
op: O,
|
||||
max_retries: u32,
|
||||
timeout: Duration,
|
||||
desired_success_count: Option<usize>,
|
||||
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F + Send + 'static,
|
||||
@@ -136,6 +141,7 @@ impl Service {
|
||||
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
|
||||
T: Sync + Send + 'static,
|
||||
{
|
||||
let warn_threshold = std::cmp::min(3, max_retries);
|
||||
let jwt = self
|
||||
.config
|
||||
.safekeeper_jwt_token
|
||||
@@ -143,23 +149,26 @@ impl Service {
|
||||
.map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
|
||||
let cancel_new_retries = CancellationToken::new();
|
||||
|
||||
for (idx, sk) in safekeepers.iter().enumerate() {
|
||||
let sk = sk.clone();
|
||||
let http_client = self.http_client.clone();
|
||||
let jwt = jwt.clone();
|
||||
let op = op.clone();
|
||||
let cancel_new_retries = cancel_new_retries.clone();
|
||||
joinset.spawn(async move {
|
||||
let res = sk
|
||||
.with_client_retries(
|
||||
op,
|
||||
&http_client,
|
||||
&jwt,
|
||||
3,
|
||||
3,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
// TODO(diko): This is a wrong timeout.
|
||||
// It should be scaled to the retry count.
|
||||
timeout,
|
||||
&CancellationToken::new(),
|
||||
&cancel_new_retries,
|
||||
)
|
||||
.await;
|
||||
(idx, res)
|
||||
@@ -184,6 +193,7 @@ impl Service {
|
||||
// Wait until all tasks finish or timeout is hit, whichever occurs
|
||||
// first.
|
||||
let mut result_count = 0;
|
||||
let mut success_count = 0;
|
||||
loop {
|
||||
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
|
||||
{
|
||||
@@ -198,6 +208,15 @@ impl Service {
|
||||
// Only print errors, as there is no Debug trait for T.
|
||||
res.as_ref().map(|_| ()),
|
||||
);
|
||||
if res.is_ok() {
|
||||
success_count += 1;
|
||||
if desired_success_count == Some(success_count) {
|
||||
// We reached the desired number of successful responses, cancel new retries for
|
||||
// the remaining safekeepers.
|
||||
// It does not cancel already started requests, we will still wait for them.
|
||||
cancel_new_retries.cancel();
|
||||
}
|
||||
}
|
||||
results[idx] = res;
|
||||
result_count += 1;
|
||||
}
|
||||
@@ -247,14 +266,14 @@ impl Service {
|
||||
);
|
||||
}
|
||||
|
||||
let quorum_size = target_sk_count / 2 + 1;
|
||||
let max_retries = 3;
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op(safekeepers, op, timeout)
|
||||
.tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
|
||||
.await?;
|
||||
|
||||
// Now check if quorum was reached in results.
|
||||
|
||||
let quorum_size = target_sk_count / 2 + 1;
|
||||
|
||||
let success_count = results.iter().filter(|res| res.is_ok()).count();
|
||||
if success_count < quorum_size {
|
||||
// Failure
|
||||
@@ -991,6 +1010,7 @@ impl Service {
|
||||
timeline_id: TimelineId,
|
||||
to_safekeepers: &[Safekeeper],
|
||||
from_safekeepers: &[Safekeeper],
|
||||
mconf: membership::Configuration,
|
||||
) -> Result<(), ApiError> {
|
||||
let http_hosts = from_safekeepers
|
||||
.iter()
|
||||
@@ -1009,17 +1029,15 @@ impl Service {
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
// TODO(diko): need to pass mconf/generation with the request
|
||||
// to properly handle tombstones. Ignore tombstones for now.
|
||||
// Worst case: we leave a timeline on a safekeeper which is not in the current set.
|
||||
let req = PullTimelineRequest {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
http_hosts,
|
||||
ignore_tombstone: Some(true),
|
||||
mconf: Some(mconf),
|
||||
};
|
||||
|
||||
const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
let max_retries = 3;
|
||||
|
||||
let responses = self
|
||||
.tenant_timeline_safekeeper_op(
|
||||
@@ -1028,7 +1046,9 @@ impl Service {
|
||||
let req = req.clone();
|
||||
async move { client.pull_timeline(&req).await }
|
||||
},
|
||||
max_retries,
|
||||
SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1066,6 +1086,9 @@ impl Service {
|
||||
};
|
||||
|
||||
const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
// Do not retry failed requests to speed up the finishing phase.
|
||||
// They will be retried in the reconciler.
|
||||
let max_retries = 0;
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op(
|
||||
@@ -1074,7 +1097,9 @@ impl Service {
|
||||
let req = req.clone();
|
||||
async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
|
||||
},
|
||||
max_retries,
|
||||
SK_EXCLUDE_TIMELINE_TIMEOUT,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1336,6 +1361,7 @@ impl Service {
|
||||
timeline_id,
|
||||
&pull_to_safekeepers,
|
||||
&cur_safekeepers,
|
||||
joint_config.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -1540,6 +1540,17 @@ class NeonEnv:
|
||||
|
||||
raise RuntimeError(f"Pageserver with ID {id} not found")
|
||||
|
||||
def get_safekeeper(self, id: int) -> Safekeeper:
    """
    Return the first safekeeper in `self.safekeepers` whose `id` equals the given ID.

    Raises RuntimeError if no safekeeper has that ID.
    """
    found = next((sk for sk in self.safekeepers if sk.id == id), None)
    if found is None:
        raise RuntimeError(f"Safekeeper with ID {id} not found")
    return found
|
||||
|
||||
def get_tenant_pageserver(self, tenant_id: TenantId | TenantShardId):
|
||||
"""
|
||||
Get the NeonPageserver where this tenant shard is currently attached, according
|
||||
@@ -5391,15 +5402,24 @@ class Safekeeper(LogUtils):
|
||||
return timeline_status.commit_lsn
|
||||
|
||||
def pull_timeline(
|
||||
self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
|
||||
self,
|
||||
srcs: list[Safekeeper],
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
mconf: MembershipConfiguration | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
pull_timeline from srcs to self.
|
||||
"""
|
||||
src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
|
||||
res = self.http_client().pull_timeline(
|
||||
{"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
|
||||
)
|
||||
body: dict[str, Any] = {
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"http_hosts": src_https,
|
||||
}
|
||||
if mconf is not None:
|
||||
body["mconf"] = mconf.__dict__
|
||||
res = self.http_client().pull_timeline(body)
|
||||
src_ids = [sk.id for sk in srcs]
|
||||
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
|
||||
return res
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
@@ -12,7 +13,7 @@ if TYPE_CHECKING:
|
||||
|
||||
# TODO(diko): pageserver spams with various errors during safekeeper migration.
|
||||
# Fix the code so it handles the migration better.
|
||||
ALLOWED_PAGESERVER_ERRORS = [
|
||||
PAGESERVER_ALLOWED_ERRORS = [
|
||||
".*Timeline .* was cancelled and cannot be used anymore.*",
|
||||
".*Timeline .* has been deleted.*",
|
||||
".*Timeline .* was not found in global map.*",
|
||||
@@ -35,7 +36,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
|
||||
"timeline_safekeeper_count": 1,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
|
||||
|
||||
@@ -136,7 +137,7 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui
|
||||
"timeline_safekeeper_count": 3,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert len(mconf["sk_set"]) == 3
|
||||
@@ -196,3 +197,122 @@ def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBui
|
||||
assert (
|
||||
f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text
|
||||
)
|
||||
|
||||
|
||||
def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
    """
    Verify that safekeepers take membership generations into account:
    1. A timeline can be migrated back and forth between two safekeepers.
    2. A safekeeper refuses to execute requests that carry a stale generation.
    """
    neon_env_builder.num_safekeepers = 3
    neon_env_builder.storage_controller_config = {
        "timelines_onto_safekeepers": True,
        "timeline_safekeeper_count": 1,
    }
    env = neon_env_builder.init_start()
    env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)

    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
    assert mconf["new_sk_set"] is None
    assert len(mconf["sk_set"]) == 1
    cur_sk = mconf["sk_set"][0]

    # The two safekeepers that do not host the timeline initially.
    second_sk, third_sk = (sk.id for sk in env.safekeepers if sk.id != cur_sk)
    cur_gen = 1

    # Manually pull the timeline to third_sk without an mconf, so that a copy with a
    # stale generation exists there. The stale pull_timeline check at the end needs it.
    env.get_safekeeper(third_sk).pull_timeline(
        [env.get_safekeeper(cur_sk)], env.initial_tenant, env.initial_timeline
    )

    def fetch_status(sk_id: int):
        # Query the timeline status over the HTTP API of the given safekeeper.
        client = env.get_safekeeper(sk_id).http_client()
        return client.timeline_status(env.initial_tenant, env.initial_timeline)

    def assert_timeline_deleted(sk_id: int):
        # The status request must fail with 404 and report the timeline as deleted.
        with pytest.raises(requests.exceptions.HTTPError, match="Not Found") as exc:
            fetch_status(sk_id)
        assert exc.value.response.status_code == 404
        assert re.match(r".*timeline .* deleted.*", exc.value.response.text)

    def current_mconf(sk_id: int):
        status = fetch_status(sk_id)
        assert status.mconf is not None
        return status.mconf

    def migrate_to_second():
        # Move the timeline to second_sk, then swap the roles of the two safekeepers.
        nonlocal cur_sk, second_sk, cur_gen
        env.storage_controller.migrate_safekeepers(
            env.initial_tenant, env.initial_timeline, [second_sk]
        )
        cur_sk, second_sk = second_sk, cur_sk
        # The test expects every migration to advance the generation by 2.
        cur_gen += 2

    # Bounce the timeline between cur_sk and second_sk.
    for _ in range(3):
        migrate_to_second()
        # The timeline must exist on cur_sk with the expected generation...
        assert current_mconf(cur_sk).generation == cur_gen
        # ...and must be gone from second_sk.
        assert_timeline_deleted(second_sk)

    # Keep the configuration that is about to become stale.
    mconf = current_mconf(cur_sk)

    # One more migration, which increases the generation by 2 again.
    migrate_to_second()

    # An exclude request carrying the old mconf must be rejected.
    with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
        env.get_safekeeper(cur_sk).http_client().timeline_exclude(
            env.initial_tenant, env.initial_timeline, mconf
        )
    assert re.match(r".*refused to switch into excluding mconf.*", exc.value.response.text)
    # The timeline must not have been deleted.
    assert current_mconf(cur_sk).generation == cur_gen

    # A pull_timeline request carrying the old mconf must be rejected as well.
    # Note: the pull source is third_sk, which holds the timeline with a stale generation.
    # This bypasses some preliminary generation checks and actually exercises tombstones.
    with pytest.raises(requests.exceptions.HTTPError, match="Conflict") as exc:
        env.get_safekeeper(second_sk).pull_timeline(
            [env.get_safekeeper(third_sk)], env.initial_tenant, env.initial_timeline, mconf
        )
    assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
    # The timeline must stay deleted on second_sk.
    assert_timeline_deleted(second_sk)
|
||||
|
||||
|
||||
def test_migrate_from_unavailable_sk(neon_env_builder: NeonEnvBuilder):
    """
    Check that a migration away from an unavailable safekeeper succeeds
    while the quorum is still alive.
    """
    neon_env_builder.num_safekeepers = 4
    neon_env_builder.storage_controller_config = {
        "timelines_onto_safekeepers": True,
        "timeline_safekeeper_count": 3,
    }
    env = neon_env_builder.init_start()
    env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)

    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
    old_sk_set = mconf["sk_set"]
    assert len(old_sk_set) == 3

    # The first safekeeper that is not a member of the current set.
    spare_sks = [sk.id for sk in env.safekeepers if sk.id not in old_sk_set]
    another_sk = spare_sks[0]

    # Stop the first member of the set; the other two members stay up.
    unavailable_sk = old_sk_set[0]
    env.get_safekeeper(unavailable_sk).stop()

    # Replace the stopped member with the spare safekeeper.
    new_sk_set = [*old_sk_set[1:], another_sk]

    env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set)

    mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
    assert mconf["sk_set"] == new_sk_set
    assert mconf["generation"] == 3
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 4cacada8bd...c9f9fdd011
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: e5ee23d998...aaaeff2550
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: ad2b69b582...9b9cb4b3e3
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: ba750903a9...fa1788475e
Reference in New Issue
Block a user