Compare commits

..

11 Commits

Author SHA1 Message Date
John Spray
6bd443e3f7 Revise is_fatal_io_error to use allow list 2023-10-05 10:09:49 +01:00
John Spray
dd54c7e687 Clean up unreachable blocks after fatal_io_error 2023-10-05 09:58:09 +01:00
John Spray
89bc3aef1a Merge remote-tracking branch 'upstream/main' into jcsp/terminate-on-io-errors 2023-10-05 09:57:01 +01:00
John Spray
0a502f4117 Use nix errno constants 2023-10-05 09:50:48 +01:00
John Spray
2e19940674 Update pageserver/src/virtual_file.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-10-05 09:47:58 +01:00
John Spray
682f1df2ee Adapt block_io/blob_io to virtual_file::Error 2023-10-02 15:38:08 +01:00
John Spray
c60ffde0c8 Use virtual_file::Error in interface 2023-10-02 15:19:37 +01:00
John Spray
964463bb0b Define a virtual_file::Error type that auto-terminates 2023-10-02 15:18:23 +01:00
John Spray
8df507ccea pageserver: make I/O errors in deletion queue fatal 2023-10-02 14:28:04 +01:00
John Spray
cc2c1a8bf4 pageserver: add hook for terminating on I/O errors 2023-10-02 14:28:04 +01:00
John Spray
218b514498 pageserver: deletion queue nits 2023-10-02 11:41:10 +01:00
33 changed files with 2725 additions and 3748 deletions

View File

@@ -1092,10 +1092,8 @@ jobs:
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"

View File

@@ -44,7 +44,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use tracing::{error, info, info_span};
use tracing::{error, info};
use url::Url;
use compute_api::responses::ComputeStatus;
@@ -57,7 +57,6 @@ use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use utils::id::TenantTimelineId;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
@@ -250,20 +249,11 @@ fn main() -> Result<()> {
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
let pspec = state.pspec.as_ref().expect("spec must be set");
let ttid = TenantTimelineId {
tenant_id: pspec.tenant_id,
timeline_id: pspec.timeline_id,
};
drop(state);
// Log ttid everywhere for easier log identification (e.g. loki agent can
// create label on that).
let _guard = info_span!("", ttid = %ttid).entered();
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute, ttid);
let _configurator_handle = launch_configurator(&compute, ttid);
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
// Start Postgres
let mut delay_exit = false;

View File

@@ -4,13 +4,11 @@ use std::thread;
use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus;
use utils::id::TenantTimelineId;
use crate::compute::ComputeNode;
// Log ttid everywhere
#[instrument(name = "", fields(ttid = %ttid), skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>, ttid: TenantTimelineId) {
#[instrument(skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let state = compute.state.lock().unwrap();
@@ -43,16 +41,13 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>, ttid: TenantTimelineId) {
}
}
pub fn launch_configurator(
compute: &Arc<ComputeNode>,
ttid: TenantTimelineId,
) -> thread::JoinHandle<()> {
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute);
thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute, ttid);
configurator_main_loop(&compute);
info!("configurator thread is exited");
})
.expect("cannot launch configurator thread")

View File

@@ -3,8 +3,7 @@ use std::{thread, time::Duration};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tracing::{debug, info, instrument};
use utils::id::TenantTimelineId;
use tracing::{debug, info};
use crate::compute::ComputeNode;
@@ -13,8 +12,7 @@ const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
// Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out.
// XXX: the only expected panic is at `RwLock` unwrap().
#[instrument(name = "", fields(ttid = %ttid), skip_all)]
fn watch_compute_activity(compute: &ComputeNode, ttid: TenantTimelineId) {
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let connstr = compute.connstr.as_str();
// Define `client` outside of the loop to reuse existing connection if it's active.
@@ -105,11 +103,11 @@ fn watch_compute_activity(compute: &ComputeNode, ttid: TenantTimelineId) {
}
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>, ttid: TenantTimelineId) -> thread::JoinHandle<()> {
pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let state = Arc::clone(state);
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state, ttid))
.spawn(move || watch_compute_activity(&state))
.expect("cannot launch compute monitor thread")
}

View File

@@ -10,7 +10,6 @@ use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
completion,
generation::Generation,
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
@@ -219,8 +218,6 @@ impl std::ops::Deref for TenantCreateRequest {
}
}
/// An alternative representation of `pageserver::tenant::TenantConf` with
/// simpler types.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
@@ -246,39 +243,6 @@ pub struct TenantConfig {
pub gc_feedback: Option<bool>,
}
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
/// lists out all possible states (and the virtual "Detached" state)
/// in a flat form rather than using rust-style enums.
#[derive(Serialize, Deserialize, Debug)]
pub enum LocationConfigMode {
AttachedSingle,
AttachedMulti,
AttachedStale,
Secondary,
Detached,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfigSecondary {
pub warm: bool,
}
/// An alternative representation of `pageserver::tenant::LocationConf`,
/// for use in external-facing APIs.
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfig {
pub mode: LocationConfigMode,
/// If attaching, in what generation?
#[serde(default)]
pub generation: Option<Generation>,
#[serde(default)]
pub secondary_conf: Option<LocationConfigSecondary>,
// If requesting mode `Secondary`, configuration for that.
// Custom storage configuration for the tenant, if any
pub tenant_conf: TenantConfig,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
@@ -289,16 +253,6 @@ pub struct StatusResponse {
pub id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]

View File

@@ -37,8 +37,8 @@ use crate::tenant::{
TIMELINES_SEGMENT_NAME,
};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
TIMELINE_UNINIT_MARK_SUFFIX,
};
pub mod defaults {
@@ -631,18 +631,10 @@ impl PageServerConf {
/// Points to a place in pageserver's local directory,
/// where certain tenant's tenantconf file should be located.
///
/// Legacy: superseded by tenant_location_config_path. Eventually
/// remove this function.
pub fn tenant_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TENANT_CONFIG_NAME)
}
pub fn tenant_location_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_LOCATION_CONFIG_NAME)
}
pub fn timelines_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TIMELINES_SEGMENT_NAME)
}

View File

@@ -133,8 +133,6 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
node_id: self.node_id,
};
fail::fail_point!("control-plane-client-re-attach");
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants",
@@ -170,8 +168,6 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
.collect(),
};
fail::fail_point!("control-plane-client-validate");
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
Ok(response

View File

@@ -34,6 +34,8 @@ use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file;
use crate::virtual_file::on_fatal_io_error;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
@@ -195,7 +197,7 @@ impl ListWriter {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
Err(anyhow::anyhow!(e))
on_fatal_io_error(&e);
}
}
}
@@ -221,9 +223,9 @@ impl ListWriter {
Err(e) => {
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
// This is fatal: any failure to read this local directory indicates a
// storage problem or configuration problem of the node.
virtual_file::on_fatal_io_error(&e);
}
};
@@ -249,6 +251,8 @@ impl ListWriter {
// Non-fatal error: we will just leave the file behind but not
// try and load it.
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
virtual_file::on_fatal_io_error(&e);
}
continue;
@@ -261,7 +265,7 @@ impl ListWriter {
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected key in deletion queue: {basename}");
warn!("Unexpected filename in deletion queue: {basename}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
continue;
};
@@ -289,7 +293,12 @@ impl ListWriter {
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path).await?;
let list_bytes = match tokio::fs::read(&list_path).await {
Ok(b) => b,
Err(e) => {
virtual_file::on_fatal_io_error(&e);
}
};
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,

View File

@@ -28,6 +28,7 @@ use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::on_fatal_io_error;
use super::deleter::DeleterMessage;
use super::DeletionHeader;
@@ -116,6 +117,11 @@ where
/// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
/// go into the queue of ready-to-execute lists.
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
// Figure out for each tenant which generation number to validate.
//
// It is sufficient to validate the max generation number of each tenant because only the
// highest generation number can possibly be valid. Hence this map will collect the
// highest generation pending validation for each tenant.
let mut tenant_generations = HashMap::new();
for list in &self.pending_lists {
for (tenant_id, tenant_list) in &list.tenants {
@@ -220,8 +226,6 @@ where
warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
mutated = true;
} else {
metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
}
this_list_valid
});
@@ -248,6 +252,11 @@ where
}
}
// Assert monotonicity of the list sequence numbers we are processing
if let Some(validated) = validated_sequence {
assert!(list.sequence >= validated)
}
validated_sequence = Some(list.sequence);
}
@@ -295,7 +304,8 @@ where
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {list_path}: {e:#}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
break;
on_fatal_io_error(&e);
}
}
}

View File

@@ -10,8 +10,7 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest, TenantLoadRequest,
};
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
@@ -30,7 +29,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
@@ -151,10 +150,7 @@ impl From<TenantMapInsertError> for ApiError {
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
}
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
}
}
}
@@ -1015,48 +1011,6 @@ async fn update_tenant_config_handler(
json_response(StatusCode::OK, ())
}
async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
let conf = state.conf;
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());
}
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
async fn handle_tenant_break(
r: Request<Body>,
@@ -1510,9 +1464,6 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})

View File

@@ -112,10 +112,6 @@ pub const METADATA_FILE_NAME: &str = "metadata";
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";
/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub const TEMP_FILE_SUFFIX: &str = "___temp";

View File

@@ -967,7 +967,6 @@ pub(crate) struct DeletionQueueMetrics {
pub(crate) keys_submitted: IntCounter,
pub(crate) keys_dropped: IntCounter,
pub(crate) keys_executed: IntCounter,
pub(crate) keys_validated: IntCounter,
pub(crate) dropped_lsn_updates: IntCounter,
pub(crate) unexpected_errors: IntCounter,
pub(crate) remote_errors: IntCounterVec,
@@ -989,13 +988,7 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
keys_executed: register_int_counter!(
"pageserver_deletion_queue_executed_total",
"Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed to completion"
)
.expect("failed to define a metric"),
keys_validated: register_int_counter!(
"pageserver_deletion_queue_validated_total",
"Number of keys validated for deletion. Sum with pageserver_deletion_queue_dropped_total for the total number of keys that have passed through the validation stage."
"Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed."
)
.expect("failed to define a metric"),

View File

@@ -44,8 +44,6 @@ use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
@@ -66,7 +64,6 @@ use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETI
use crate::repository::GcResult;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
pub use crate::tenant::remote_timeline_client::index::IndexPart;
@@ -163,28 +160,6 @@ pub struct TenantSharedResources {
pub deletion_queue_client: DeletionQueueClient,
}
/// A [`Tenant`] is really an _attached_ tenant. The configuration
/// for an attached tenant is a subset of the [`LocationConf`], represented
/// in this struct.
pub(super) struct AttachedTenantConf {
tenant_conf: TenantConfOpt,
location: AttachedLocationConfig,
}
impl AttachedTenantConf {
fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => Ok(Self {
tenant_conf: location_conf.tenant_conf,
location: attach_conf.clone(),
}),
LocationMode::Secondary(_) => {
anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
}
}
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -202,15 +177,12 @@ pub struct Tenant {
// We keep TenantConfOpt sturct here to preserve the information
// about parameters that are not set.
// This is necessary to allow global config updates.
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_id: TenantId,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
@@ -554,13 +526,14 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant_conf =
Self::load_tenant_config(conf, &tenant_id).context("load tenant config")?;
let TenantSharedResources {
broker_client,
@@ -568,12 +541,14 @@ impl Tenant {
deletion_queue_client,
} = resources;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
conf,
attached_conf,
tenant_conf,
wal_redo_manager,
tenant_id,
generation,
remote_storage.clone(),
deletion_queue_client,
));
@@ -884,9 +859,10 @@ impl Tenant {
backtrace: String::new(),
},
conf,
AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
TenantConfOpt::default(),
wal_redo_manager,
tenant_id,
Generation::broken(),
None,
DeletionQueueClient::broken(),
))
@@ -905,7 +881,7 @@ impl Tenant {
pub(crate) fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
attached_conf: AttachedTenantConf,
generation: Generation,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -913,6 +889,14 @@ impl Tenant {
) -> Arc<Tenant> {
span::debug_assert_current_span_has_tenant_id();
let tenant_conf = match Self::load_tenant_config(conf, &tenant_id) {
Ok(conf) => conf,
Err(e) => {
error!("load tenant config failed: {:?}", e);
return Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"));
}
};
let broker_client = resources.broker_client;
let remote_storage = resources.remote_storage;
@@ -920,9 +904,10 @@ impl Tenant {
let tenant = Tenant::new(
TenantState::Loading,
conf,
attached_conf,
tenant_conf,
wal_redo_manager,
tenant_id,
generation,
remote_storage.clone(),
resources.deletion_queue_client.clone(),
);
@@ -1661,15 +1646,6 @@ impl Tenant {
"Cannot run GC iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() {
info!("Skipping GC in location state {:?}", conf.location);
return Ok(GcResult::default());
}
}
self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
.await
}
@@ -1688,14 +1664,6 @@ impl Tenant {
"Cannot run compaction iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(());
}
}
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -2121,7 +2089,7 @@ where
impl Tenant {
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
*self.tenant_conf.read().unwrap()
}
pub fn effective_config(&self) -> TenantConf {
@@ -2130,95 +2098,84 @@ impl Tenant {
}
pub fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
pub fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
pub fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
pub fn get_compaction_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_period
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
}
pub fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_gc_horizon(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.gc_horizon
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
}
pub fn get_gc_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.gc_period
.unwrap_or(self.conf.default_tenant_conf.gc_period)
}
pub fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
pub fn get_pitr_interval(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.pitr_interval
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn get_min_resident_size_override(&self) -> Option<u64> {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.min_resident_size_override
.or(self.conf.default_tenant_conf.min_resident_size_override)
}
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
*self.tenant_conf.write().unwrap() = new_conf;
*self.tenant_conf.write().unwrap() = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
@@ -2288,9 +2245,10 @@ impl Tenant {
fn new(
state: TenantState,
conf: &'static PageServerConf,
attached_conf: AttachedTenantConf,
tenant_conf: TenantConfOpt,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
generation: Generation,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
) -> Tenant {
@@ -2350,12 +2308,12 @@ impl Tenant {
Tenant {
tenant_id,
generation: attached_conf.location.generation,
generation,
conf,
// using now here is good enough approximation to catch tenants with really long
// activation times.
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(attached_conf)),
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
@@ -2373,123 +2331,52 @@ impl Tenant {
pub(super) fn load_tenant_config(
conf: &'static PageServerConf,
tenant_id: &TenantId,
) -> anyhow::Result<LocationConf> {
let legacy_config_path = conf.tenant_config_path(tenant_id);
let config_path = conf.tenant_location_config_path(tenant_id);
) -> anyhow::Result<TenantConfOpt> {
let target_config_path = conf.tenant_config_path(tenant_id);
if config_path.exists() {
// New-style config takes precedence
let deserialized = Self::read_config(&config_path)?;
Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
} else if legacy_config_path.exists() {
// Upgrade path: found an old-style configuration only
let deserialized = Self::read_config(&legacy_config_path)?;
info!("loading tenantconf from {target_config_path}");
let mut tenant_conf = TenantConfOpt::default();
for (key, item) in deserialized.iter() {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{legacy_config_path}' as pageserver config")
})?;
}
_ => bail!(
"config file {legacy_config_path} has unrecognized pageserver option '{key}'"
),
}
}
// Legacy configs are implicitly in attached state
Ok(LocationConf::attached_single(
tenant_conf,
Generation::none(),
))
} else {
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
info!(
"tenant config not found in {} or {}",
config_path, legacy_config_path
);
Ok(LocationConf::default())
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
if !target_config_path.exists() {
info!("tenant config not found in {target_config_path}");
return Ok(TenantConfOpt::default());
}
}
fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
info!("loading tenant configuration from {path}");
// load and parse file
let config = fs::read_to_string(path)
.with_context(|| format!("Failed to load config from path '{path}'"))?;
let config = fs::read_to_string(&target_config_path)
.with_context(|| format!("Failed to load config from path '{target_config_path}'"))?;
config
.parse::<toml_edit::Document>()
.with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
let toml = config.parse::<toml_edit::Document>().with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as toml file")
})?;
let mut tenant_conf = TenantConfOpt::default();
for (key, item) in toml.iter() {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as pageserver config")
})?;
}
_ => bail!(
"config file {target_config_path} has unrecognized pageserver option '{key}'"
),
}
}
Ok(tenant_conf)
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config(
conf: &'static PageServerConf,
tenant_id: &TenantId,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
let legacy_config_path = conf.tenant_config_path(tenant_id);
let config_path = conf.tenant_location_config_path(tenant_id);
Self::persist_tenant_config_at(tenant_id, &config_path, &legacy_config_path, location_conf)
.await
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config_at(
tenant_id: &TenantId,
config_path: &Utf8Path,
legacy_config_path: &Utf8Path,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
// Forward compat: write out an old-style configuration that old versions can read, in case we roll back
Self::persist_tenant_config_legacy(
tenant_id,
legacy_config_path,
&location_conf.tenant_conf,
)
.await?;
if let LocationMode::Attached(attach_conf) = &location_conf.mode {
// Once we use LocationMode, generations are mandatory. If we aren't using generations,
// then drop out after writing legacy-style config.
if attach_conf.generation.is_none() {
tracing::debug!("Running without generations, not writing new-style LocationConf");
return Ok(());
}
}
info!("persisting tenantconf to {config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.
"#
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
let conf_content = conf_content.as_bytes();
let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
VirtualFile::crashsafe_overwrite(config_path, &temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_id} config to {config_path}"))?;
Ok(())
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
async fn persist_tenant_config_legacy(
tenant_id: &TenantId,
target_config_path: &Utf8Path,
tenant_conf: &TenantConfOpt,
tenant_conf: TenantConfOpt,
) -> anyhow::Result<()> {
// imitate a try-block with a closure
info!("persisting tenantconf to {target_config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
@@ -3189,7 +3076,7 @@ pub(crate) enum CreateTenantFilesMode {
pub(crate) async fn create_tenant_files(
conf: &'static PageServerConf,
location_conf: &LocationConf,
tenant_conf: TenantConfOpt,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
) -> anyhow::Result<Utf8PathBuf> {
@@ -3212,7 +3099,7 @@ pub(crate) async fn create_tenant_files(
let creation_result = try_create_target_tenant_dir(
conf,
location_conf,
tenant_conf,
tenant_id,
mode,
&temporary_tenant_dir,
@@ -3238,7 +3125,7 @@ pub(crate) async fn create_tenant_files(
async fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
location_conf: &LocationConf,
tenant_conf: TenantConfOpt,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
temporary_tenant_dir: &Utf8Path,
@@ -3268,26 +3155,14 @@ async fn try_create_target_tenant_dir(
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
let temporary_legacy_tenant_config_path = rebase_directory(
let temporary_tenant_config_path = rebase_directory(
&conf.tenant_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
let temporary_tenant_config_path = rebase_directory(
&conf.tenant_location_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
Tenant::persist_tenant_config_at(
tenant_id,
&temporary_tenant_config_path,
&temporary_legacy_tenant_config_path,
location_conf,
)
.await?;
Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf).await?;
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
@@ -3568,13 +3443,10 @@ pub mod harness {
let tenant = Arc::new(Tenant::new(
TenantState::Loading,
self.conf,
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt::from(self.tenant_conf),
self.generation,
))
.unwrap(),
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,
self.tenant_id,
self.generation,
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),
));

View File

@@ -234,7 +234,10 @@ impl BlobWriter<false> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use crate::{
context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef,
virtual_file::Error,
};
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {

View File

@@ -6,7 +6,7 @@ use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
@@ -96,7 +96,7 @@ impl<'a> BlockReaderRef<'a> {
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
VirtualFile(r) => r.read_blk(blknum).await,
VirtualFile(r) => r.read_blk(blknum).await.map_err(virtual_file::Error::into),
}
}
}
@@ -174,6 +174,7 @@ impl FileBlockReader {
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.await
.map_err(virtual_file::Error::into)
}
/// Read a block.
///

View File

@@ -13,7 +13,6 @@ use pageserver_api::models;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::time::Duration;
use utils::generation::Generation;
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
@@ -45,211 +44,7 @@ pub mod defaults {
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum AttachmentMode {
/// Our generation is current as far as we know, and as far as we know we are the only attached
/// pageserver. This is the "normal" attachment mode.
Single,
/// Our generation number is current as far as we know, but we are advised that another
/// pageserver is still attached, and therefore to avoid executing deletions. This is
/// the attachment mode of a pagesever that is the destination of a migration.
Multi,
/// Our generation number is superseded, or about to be superseded. We are advised
/// to avoid remote storage writes if possible, and to avoid sending billing data. This
/// is the attachment mode of a pageserver that is the origin of a migration.
Stale,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct AttachedLocationConfig {
pub(crate) generation: Generation,
pub(crate) attach_mode: AttachmentMode,
// TODO: add a flag to override AttachmentMode's policies under
// disk pressure (i.e. unblock uploads under disk pressure in Stale
// state, unblock deletions after timeout in Multi state)
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct SecondaryLocationConfig {
/// If true, keep the local cache warm by polling remote storage
pub(crate) warm: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum LocationMode {
Attached(AttachedLocationConfig),
Secondary(SecondaryLocationConfig),
}
/// Per-tenant, per-pageserver configuration. All pageservers use the same TenantConf,
/// but have distinct LocationConf.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct LocationConf {
/// The location-specific part of the configuration, describes the operating
/// mode of this pageserver for this tenant.
pub(crate) mode: LocationMode,
/// The pan-cluster tenant configuration, the same on all locations
pub(crate) tenant_conf: TenantConfOpt,
}
impl std::fmt::Debug for LocationConf {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.mode {
LocationMode::Attached(conf) => {
write!(
f,
"Attached {:?}, gen={:?}",
conf.attach_mode, conf.generation
)
}
LocationMode::Secondary(conf) => {
write!(f, "Secondary, warm={}", conf.warm)
}
}
}
}
impl AttachedLocationConfig {
/// Consult attachment mode to determine whether we are currently permitted
/// to delete layers. This is only advisory, not required for data safety.
/// See [`AttachmentMode`] for more context.
pub(crate) fn may_delete_layers_hint(&self) -> bool {
// TODO: add an override for disk pressure in AttachedLocationConfig,
// and respect it here.
match &self.attach_mode {
AttachmentMode::Single => true,
AttachmentMode::Multi | AttachmentMode::Stale => {
// In Multi mode we avoid doing deletions because some other
// attached pageserver might get 404 while trying to read
// a layer we delete which is still referenced in their metadata.
//
// In Stale mode, we avoid doing deletions because we expect
// that they would ultimately fail validation in the deletion
// queue due to our stale generation.
false
}
}
}
/// Whether we are currently hinted that it is worthwhile to upload layers.
/// This is only advisory, not required for data safety.
/// See [`AttachmentMode`] for more context.
pub(crate) fn may_upload_layers_hint(&self) -> bool {
// TODO: add an override for disk pressure in AttachedLocationConfig,
// and respect it here.
match &self.attach_mode {
AttachmentMode::Single | AttachmentMode::Multi => true,
AttachmentMode::Stale => {
// In Stale mode, we avoid doing uploads because we expect that
// our replacement pageserver will already have started its own
// IndexPart that will never reference layers we upload: it is
// wasteful.
false
}
}
}
}
impl LocationConf {
/// For use when loading from a legacy configuration: presence of a tenant
/// implies it is in AttachmentMode::Single, which used to be the only
/// possible state. This function should eventually be removed.
pub(crate) fn attached_single(tenant_conf: TenantConfOpt, generation: Generation) -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
}),
tenant_conf,
}
}
/// For use when attaching/re-attaching: update the generation stored in this
/// structure. If we were in a secondary state, promote to attached (posession
/// of a fresh generation implies this).
pub(crate) fn attach_in_generation(&mut self, generation: Generation) {
match &mut self.mode {
LocationMode::Attached(attach_conf) => {
attach_conf.generation = generation;
}
LocationMode::Secondary(_) => {
// We are promoted to attached by the control plane's re-attach response
self.mode = LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
})
}
}
}
pub(crate) fn try_from(conf: &'_ models::LocationConfig) -> anyhow::Result<Self> {
let tenant_conf = TenantConfOpt::try_from(&conf.tenant_conf)?;
fn get_generation(conf: &'_ models::LocationConfig) -> Result<Generation, anyhow::Error> {
conf.generation
.ok_or_else(|| anyhow::anyhow!("Generation must be set when attaching"))
}
let mode = match &conf.mode {
models::LocationConfigMode::AttachedMulti => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Multi,
})
}
models::LocationConfigMode::AttachedSingle => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Single,
})
}
models::LocationConfigMode::AttachedStale => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Stale,
})
}
models::LocationConfigMode::Secondary => {
anyhow::ensure!(conf.generation.is_none());
let warm = conf
.secondary_conf
.as_ref()
.map(|c| c.warm)
.unwrap_or(false);
LocationMode::Secondary(SecondaryLocationConfig { warm })
}
models::LocationConfigMode::Detached => {
// Should not have been called: API code should translate this mode
// into a detach rather than trying to decode it as a LocationConf
return Err(anyhow::anyhow!("Cannot decode a Detached configuration"));
}
};
Ok(Self { mode, tenant_conf })
}
}
impl Default for LocationConf {
// TODO: this should be removed once tenant loading can guarantee that we are never
// loading from a directory without a configuration.
// => tech debt since https://github.com/neondatabase/neon/issues/1555
fn default() -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation: Generation::none(),
attach_mode: AttachmentMode::Single,
}),
tenant_conf: TenantConfOpt::default(),
}
}
}
/// A tenant's calcuated configuration, which is the result of merging a
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
///
/// For storing and transmitting individual tenant's configuration, see
/// TenantConfOpt.
/// Per-tenant configuration options
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
// Flush out an inmemory layer, if it's holding WAL older than this

View File

@@ -197,7 +197,6 @@ async fn cleanup_remaining_fs_traces(
};
rm(conf.tenant_config_path(tenant_id), false).await?;
rm(conf.tenant_location_config_path(tenant_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
Err(anyhow::anyhow!(

View File

@@ -24,11 +24,9 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
};
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
@@ -40,39 +38,6 @@ use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
use super::TenantSharedResources;
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
/// reads and ingest WAL.
/// - `Secondary`: is only keeping a local cache warm.
///
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
/// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub enum TenantSlot {
Attached(Arc<Tenant>),
Secondary,
}
impl TenantSlot {
/// Return the `Tenant` in this slot if attached, else None
fn get_attached(&self) -> Option<&Arc<Tenant>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary => None,
}
}
/// Consume self and return the `Tenant` that was in this slot if attached, else None
fn into_attached(self) -> Option<Arc<Tenant>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary => None,
}
}
}
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
pub(crate) enum TenantsMap {
@@ -80,27 +45,14 @@ pub(crate) enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_insert`].
Open(HashMap<TenantId, TenantSlot>),
Open(HashMap<TenantId, Arc<Tenant>>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, TenantSlot>),
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
}
impl TenantsMap {
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
/// None is returned.
pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.get(tenant_id).and_then(TenantSlot::get_attached)
}
}
}
/// Get the contents of the map at this tenant ID, even if it is in secondary state.
pub(crate) fn get_slot(&self, tenant_id: &TenantId) -> Option<&TenantSlot> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
@@ -109,9 +61,7 @@ impl TenantsMap {
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.remove(tenant_id).and_then(TenantSlot::into_attached)
}
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
}
}
}
@@ -255,59 +205,19 @@ pub async fn init_tenant_mgr(
}
};
// Try loading the location configuration
let mut location_conf = match Tenant::load_tenant_config(conf, &tenant_id)
.context("load tenant config")
{
Ok(c) => c,
Err(e) => {
warn!("Marking tenant broken, failed to {e:#}");
tenants.insert(
tenant_id,
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
"error loading tenant location configuration".to_string(),
)),
);
continue;
}
};
let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
match &location_conf.mode {
LocationMode::Secondary(_) => {
// We do not require the control plane's permission for secondary mode
// tenants, because they do no remote writes and hence require no
// generation number
info!("Loaded tenant {tenant_id} in secondary mode");
tenants.insert(tenant_id, TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
// instruct us about secondary attachments. That way, instead of throwing
// away local state, we can gracefully fall back to secondary here, if the control
// plane tells us so.
// (https://github.com/neondatabase/neon/issues/5377)
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) =
safe_remove_tenant_dir_all(&tenant_dir_path).await
{
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path, e
);
}
}
};
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path, e
);
}
continue;
}
} else {
@@ -320,23 +230,18 @@ pub async fn init_tenant_mgr(
Generation::none()
};
// Presence of a generation number implies attachment: attach the tenant
// if it wasn't already, and apply the generation number.
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
AttachedTenantConf::try_from(location_conf)?,
generation,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
tenants.insert(tenant.tenant_id(), tenant);
}
Err(e) => {
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
@@ -368,7 +273,7 @@ pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_path: &Utf8Path,
location_conf: AttachedTenantConf,
generation: Generation,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -405,7 +310,7 @@ pub(crate) fn schedule_local_tenant_processing(
"attaching mark file present but no remote storage configured".to_string(),
)
} else {
match Tenant::spawn_attach(conf, tenant_id, resources, location_conf, tenants, ctx) {
match Tenant::spawn_attach(conf, tenant_id, generation, resources, tenants, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -417,13 +322,7 @@ pub(crate) fn schedule_local_tenant_processing(
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
location_conf,
resources,
init_order,
tenants,
ctx,
conf, tenant_id, generation, resources, init_order, tenants, ctx,
)
};
Ok(tenant)
@@ -479,16 +378,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
let res = {
let (_guard, shutdown_progress) = completion::channel();
match tenant {
TenantSlot::Attached(t) => {
t.shutdown(shutdown_progress, freeze_and_flush).await
}
TenantSlot::Secondary => {
// TODO: once secondary mode downloads are implemented,
// ensure they have all stopped before we reach this point.
Ok(())
}
}
tenant.shutdown(shutdown_progress, freeze_and_flush).await
};
if let Err(other_progress) = res {
@@ -561,19 +451,16 @@ pub async fn create_tenant(
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
// We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
generation, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -602,126 +489,14 @@ pub async fn set_new_tenant_config(
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_id, true).await?;
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf)
let tenant_config_path = conf.tenant_config_path(&tenant_id);
Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
}
#[instrument(skip_all, fields(tenant_id, new_location_config))]
pub(crate) async fn upsert_location(
conf: &'static PageServerConf,
tenant_id: TenantId,
new_location_config: LocationConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
let mut existing_tenant = match get_tenant(tenant_id, false).await {
Ok(t) => Some(t),
Err(GetTenantError::NotFound(_)) => None,
Err(e) => anyhow::bail!(e),
};
// If we need to shut down a Tenant, do that first
let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) {
(LocationMode::Secondary(_), Some(t)) => Some(t),
(LocationMode::Attached(attach_conf), Some(t)) => {
if attach_conf.generation != t.generation {
Some(t)
} else {
None
}
}
_ => None,
};
// TODO: currently we risk concurrent operations interfering with the tenant
// while we await shutdown, but we also should not hold the TenantsMap lock
// across the whole operation. Before we start using this function in production,
// a follow-on change will revise how concurrency is handled in TenantsMap.
// (https://github.com/neondatabase/neon/issues/5378)
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
Err(barrier) => {
info!("Shutdown already in progress, waiting for it to complete");
barrier.wait().await;
}
}
existing_tenant = None;
}
if let Some(tenant) = existing_tenant {
// Update the existing tenant
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_location_config(AttachedTenantConf::try_from(new_location_config)?);
} else {
// Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock,
// and re-check that the state of anything we are replacing is as expected.
tenant_map_upsert_slot(tenant_id, |old_value| async move {
if let Some(TenantSlot::Attached(t)) = old_value {
if !matches!(t.current_state(), TenantState::Stopping { .. }) {
anyhow::bail!("Tenant state changed during location configuration update");
}
}
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => TenantSlot::Secondary,
LocationMode::Attached(_attach_config) => {
// Do a schedule_local_tenant_processing
// FIXME: should avoid doing this disk I/O inside the TenantsMap lock,
// we have the same problem in load_tenant/attach_tenant. Probably
// need a lock in TenantSlot to fix this.
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant_path = conf.tenant_path(&tenant_id);
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
};
let new_tenant = schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_path,
AttachedTenantConf::try_from(new_location_config)?,
resources,
None,
&TENANTS,
ctx,
)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
TenantSlot::Attached(new_tenant)
}
};
Ok(new_slot)
})
.await?;
}
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
#[error("Tenant {0} not found")]
@@ -882,12 +657,7 @@ pub async fn load_tenant(
remote_storage,
deletion_queue_client
};
let mut location_conf = Tenant::load_tenant_config(conf, &tenant_id).map_err( TenantMapInsertError::Other)?;
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, generation, resources, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -940,10 +710,7 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Secondary => None,
})
.map(|(id, tenant)| (*id, tenant.current_state()))
.collect())
}
@@ -960,8 +727,7 @@ pub async fn attach_tenant(
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
@@ -972,7 +738,8 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -995,10 +762,8 @@ pub enum TenantMapInsertError {
ShuttingDown,
#[error("tenant {0} already exists, state: {1:?}")]
TenantAlreadyExists(TenantId, TenantState),
#[error("tenant {0} already exists in secondary state")]
TenantExistsSecondary(TenantId),
#[error(transparent)]
Other(#[from] anyhow::Error),
Closure(#[from] anyhow::Error),
}
/// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that
@@ -1022,47 +787,20 @@ where
TenantsMap::Open(m) => m,
};
match m.entry(tenant_id) {
hash_map::Entry::Occupied(e) => match e.get() {
TenantSlot::Attached(t) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
t.current_state(),
)),
TenantSlot::Secondary => Err(TenantMapInsertError::TenantExistsSecondary(tenant_id)),
},
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
e.get().current_state(),
)),
hash_map::Entry::Vacant(v) => match insert_fn().await {
Ok(tenant) => {
v.insert(TenantSlot::Attached(tenant.clone()));
v.insert(tenant.clone());
Ok(tenant)
}
Err(e) => Err(TenantMapInsertError::Other(e)),
Err(e) => Err(TenantMapInsertError::Closure(e)),
},
}
}
async fn tenant_map_upsert_slot<'a, F, R>(
tenant_id: TenantId,
upsert_fn: F,
) -> Result<(), TenantMapInsertError>
where
F: FnOnce(Option<TenantSlot>) -> R,
R: std::future::Future<Output = anyhow::Result<TenantSlot>>,
{
let mut guard = TENANTS.write().await;
let m = match &mut *guard {
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
TenantsMap::Open(m) => m,
};
match upsert_fn(m.remove(&tenant_id)).await {
Ok(upsert_val) => {
m.insert(tenant_id, upsert_val);
Ok(())
}
Err(e) => Err(TenantMapInsertError::Other(e)),
}
}
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
@@ -1082,40 +820,28 @@ where
// tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
// avoid holding the lock for the entire process.
let tenant = {
match tenants
tenants
.write()
.await
.get_slot(&tenant_id)
.get(&tenant_id)
.cloned()
.ok_or(TenantStateError::NotFound(tenant_id))?
{
TenantSlot::Attached(t) => Some(t.clone()),
TenantSlot::Secondary => None,
}
};
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// If the tenant was attached, shut it down gracefully. For secondary
// locations this part is not necessary
match tenant {
Some(attached_tenant) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match attached_tenant.shutdown(progress, freeze_and_flush).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
None => {
// Nothing to wait on when not attached, proceed.
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, freeze_and_flush).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
return Err(TenantStateError::IsStopping(tenant_id));
}
}
@@ -1206,8 +932,6 @@ mod tests {
use std::sync::Arc;
use tracing::{info_span, Instrument};
use crate::tenant::mgr::TenantSlot;
use super::{super::harness::TenantHarness, TenantsMap};
#[tokio::test(start_paused = true)]
@@ -1229,7 +953,7 @@ mod tests {
// tenant harness configures the logging and we cannot escape it
let _e = info_span!("testing", tenant_id = %id).entered();
let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = HashMap::from([(id, t.clone())]);
let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();

View File

@@ -91,12 +91,12 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::debug_assert_current_span_has_tenant_and_timeline_id;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -149,7 +149,7 @@ pub struct TimelineResources {
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
myself: Weak<Self>,
@@ -158,9 +158,6 @@ pub struct Timeline {
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
/// Never changes for the lifetime of this [`Timeline`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
pub pg_version: u32,
@@ -1381,42 +1378,42 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
fn get_eviction_policy(&self) -> EvictionPolicy {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.eviction_policy
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
@@ -1432,7 +1429,7 @@ impl Timeline {
}
fn get_gc_feedback(&self) -> bool {
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.gc_feedback
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
@@ -1445,7 +1442,7 @@ impl Timeline {
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap().tenant_conf,
&self.tenant_conf.read().unwrap(),
&self.conf.default_tenant_conf,
);
let tenant_id_str = self.tenant_id.to_string();
@@ -1464,7 +1461,7 @@ impl Timeline {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
timeline_id: TimelineId,
@@ -1487,7 +1484,7 @@ impl Timeline {
let evictions_low_residence_duration_metric_threshold =
Self::get_evictions_low_residence_duration_metric_threshold(
&tenant_conf_guard.tenant_conf,
&tenant_conf_guard,
&conf.default_tenant_conf,
);
drop(tenant_conf_guard);
@@ -1652,15 +1649,12 @@ impl Timeline {
let tenant_conf_guard = self.tenant_conf.read().unwrap();
let wal_connect_timeout = tenant_conf_guard
.tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
let lagging_wal_timeout = tenant_conf_guard
.tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);

View File

@@ -15,7 +15,7 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
@@ -173,55 +173,148 @@ impl OpenFiles {
}
}
#[derive(Debug, thiserror::Error)]
pub enum CrashsafeOverwriteError {
#[error("final path has no parent dir")]
FinalPathHasNoParentDir,
#[error("remove tempfile")]
RemovePreviousTempfile(#[source] std::io::Error),
#[error("create tempfile")]
CreateTempfile(#[source] std::io::Error),
#[error("write tempfile")]
WriteContents(#[source] std::io::Error),
#[error("sync tempfile")]
SyncTempfile(#[source] std::io::Error),
#[error("rename tempfile to final path")]
RenameTempfileToFinalPath(#[source] std::io::Error),
#[error("open final path parent dir")]
OpenFinalPathParentDir(#[source] std::io::Error),
#[error("sync final path parent dir")]
SyncFinalPathParentDir(#[source] std::io::Error),
/// Call this when the local filesystem gives us an error with an external
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error) -> ! {
tracing::error!("Fatal I/O error: {}", &e);
std::process::abort();
}
impl CrashsafeOverwriteError {
/// Returns true iff the new contents are durably stored.
pub fn are_new_contents_durable(&self) -> bool {
match self {
Self::FinalPathHasNoParentDir => false,
Self::RemovePreviousTempfile(_) => false,
Self::CreateTempfile(_) => false,
Self::WriteContents(_) => false,
Self::SyncTempfile(_) => false,
Self::RenameTempfileToFinalPath(_) => false,
Self::OpenFinalPathParentDir(_) => false,
Self::SyncFinalPathParentDir(_) => true,
/// Identify error types that should alwways terminate the process. Other
/// error types may be elegible for retry.
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
use nix::errno::Errno::*;
match e.raw_os_error().map(nix::errno::from_i32) {
Some(EIO) => {
// Terminate on EIO because we no longer trust the device to store
// data safely, or to uphold persistence guarantees on fsync.
true
}
Some(EROFS) => {
// Terminate on EROFS because a filesystem is usually remounted
// readonly when it has experienced some critical issue, so the same
// logic as EIO applies.
true
}
Some(EACCES) => {
// Terminate on EACCESS because we should always have permissions
// for our own data dir: if we don't, then we can't do our job and
// need administrative intervention to fix permissions. Terminating
// is the best way to make sure we stop cleanly rather than going
// into infinite retry loops, and will make it clear to the outside
// world that we need help.
true
}
_ => {
// Treat all other local file I/O errors are retryable. This includes:
// - ENOSPC: we stay up and wait for eviction to free some space
// - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
// - WriteZero, Interrupted: these are used internally VirtualFile
false
}
}
}
/// Wrap std::io::Error with a behavior where we will terminate the process
/// on most I/O errors from local storage. The rational for terminating is:
/// - EIO means we can't trust the drive any more
/// - EROFS means the local filesystem or drive is damaged, we shouldn't use it any more
/// - EACCESS means something is fatally misconfigured about the pageserver, such
/// as running the process as the wrong user, or the filesystem having the wrong
/// ownership or permission bits. We terminate so that it's obvious to
/// the operator why the pageserver isn't working, and they can restart it when
/// they've fixed the problem.
#[derive(thiserror::Error, Debug)]
pub struct Error {
inner: std::io::Error,
context: Option<String>,
}
impl Error {
/// Wrap a io::Error with some context & terminate
/// the process if the io::Error matches our policy for termination
fn new_with_context(e: std::io::Error, context: &str) -> Self {
Self::build(e, Some(context.to_string()))
}
fn context(e: Self, context: &str) -> Self {
Self {
inner: e.inner,
context: Some(context.to_string()),
}
}
fn new(e: std::io::Error) -> Self {
Self::build(e, None)
}
fn invalid(reason: &str) -> Self {
Self::new(std::io::Error::new(ErrorKind::InvalidInput, reason))
}
fn build(e: std::io::Error, context: Option<String>) -> Self {
// Construct instance early so that we have it for
// using Display in termination message.
let instance = Self { inner: e, context };
// Maybe terminate: this violates the usual expectation that callers
// should make their own decisions about how to handle an Error, but
// it's worthwhile to avoid every single user of the local filesystem
// having to apply the same "terminate on errors" behavior.
if is_fatal_io_error(&instance.inner) {
on_fatal_io_error(&instance.inner);
}
instance
}
fn kind(&self) -> ErrorKind {
self.inner.kind()
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Self::build(e, None)
}
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> std::io::Error {
e.inner
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.context {
Some(context) => {
write!(f, "{}: {}", context, self.inner)
}
None => self.inner.fmt(f),
}
}
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
.await
.map_err(Error::from)
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
.map_err(Error::from)
}
/// Open a file with given options.
@@ -232,7 +325,7 @@ impl VirtualFile {
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFile, Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
@@ -284,14 +377,16 @@ impl VirtualFile {
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> Result<(), CrashsafeOverwriteError> {
let Some(final_path_parent) = final_path.parent() else {
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
};
) -> Result<(), Error> {
let final_path_parent = final_path.parent().ok_or(std::io::Error::new(
ErrorKind::InvalidInput,
"Path must be absolute",
))?;
match std::fs::remove_file(tmp_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
Err(e) => return Err(Error::new_with_context(e, "removing tempfile")),
}
let mut file = Self::open_with_options(
tmp_path,
@@ -302,17 +397,17 @@ impl VirtualFile {
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
.map_err(|e| Error::context(e, "create tempfile"))?;
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
.map_err(|e| Error::context(e, "write contents"))?;
file.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
.map_err(|e| Error::context(e, "sync tempfile"))?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
.map_err(|e| Error::new_with_context(e, "rename tempfile to final path"))?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
@@ -321,11 +416,11 @@ impl VirtualFile {
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
.map_err(|e| Error::context(e, "open final path parent"))?;
final_parent_dirfd
.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
.map_err(|e| Error::context(e, "sync final path parent"))?;
Ok(())
}
@@ -333,11 +428,13 @@ impl VirtualFile {
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
.map_err(Error::new)
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
.map_err(Error::new)
}
/// Helper function that looks up the underlying File for this VirtualFile,
@@ -432,13 +529,10 @@ impl VirtualFile {
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
return Err(Error::invalid("offset would be negative"));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
return Err(Error::invalid("offset overflow"));
}
self.pos = pos as u64;
}
@@ -451,10 +545,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
)
.into())
}
Ok(n) => {
buf = &mut buf[n..];
@@ -472,10 +567,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
)
.into());
}
Ok(n) => {
buf = &buf[n..];
@@ -492,10 +588,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write(buf).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
)
.into());
}
Ok(n) => {
buf = &buf[n..];
@@ -507,7 +604,7 @@ impl VirtualFile {
Ok(())
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
let pos = self.pos;
let n = self.write_at(buf, pos).await?;
self.pos += n as u64;
@@ -523,7 +620,7 @@ impl VirtualFile {
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
result.map_err(Error::new)
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
@@ -535,7 +632,7 @@ impl VirtualFile {
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
result.map_err(Error::new)
}
}
@@ -544,7 +641,7 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
) -> Result<crate::tenant::block_io::BlockLease<'_>, Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
@@ -660,25 +757,25 @@ mod tests {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset).map_err(Error::new),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset).map_err(Error::new),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos),
MaybeVirtualFile::File(file) => file.seek(pos).map_err(Error::new),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf),
MaybeVirtualFile::File(file) => file.write_all(buf).map_err(Error::new),
}
}
@@ -887,7 +984,7 @@ mod tests {
hdls.push(hdl);
}
for hdl in hdls {
hdl.await?;
hdl.await.expect("joining")
}
std::mem::forget(rt);

View File

@@ -7,12 +7,12 @@ OBJS = \
extension_server.o \
file_cache.o \
libpagestore.o \
libpqwalproposer.o \
neon.o \
neon_utils.o \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_pg.o \
walproposer_utils.o \
control_plane_connector.o
PG_CPPFLAGS = -I$(libpq_srcdir)

View File

@@ -30,7 +30,7 @@
#include "neon.h"
#include "walproposer.h"
#include "neon_utils.h"
#include "walproposer_utils.h"
#define PageStoreTrace DEBUG5

View File

@@ -0,0 +1,424 @@
#include "postgres.h"
#include "libpq-fe.h"
#include "neon.h"
#include "walproposer.h"
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* walprop_async_read */
};
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
return true;
/* Otherwise, set it appropriately */
if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
return false;
conn->is_nonblocking = is_nonblocking;
return true;
}
/* Exported function definitions */
char *
walprop_error_message(WalProposerConn *conn)
{
return PQerrorMessage(conn->pg_conn);
}
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
switch (PQstatus(conn->pg_conn))
{
case CONNECTION_OK:
return WP_CONNECTION_OK;
case CONNECTION_BAD:
return WP_CONNECTION_BAD;
default:
return WP_CONNECTION_IN_PROGRESS;
}
}
WalProposerConn *
walprop_connect_start(char *conninfo, char *password)
{
WalProposerConn *conn;
PGconn *pg_conn;
const char *keywords[3];
const char *values[3];
int n;
/*
* Connect using the given connection string. If the
* NEON_AUTH_TOKEN environment variable was set, use that as
* the password.
*
* The connection options are parsed in the order they're given, so
* when we set the password before the connection string, the
* connection string can override the password from the env variable.
* Seems useful, although we don't currently use that capability
* anywhere.
*/
n = 0;
if (password)
{
keywords[n] = "password";
values[n] = password;
n++;
}
keywords[n] = "dbname";
values[n] = conninfo;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pg_conn = PQconnectStartParams(keywords, values, 1);
/*
* Allocation of a PQconn can fail, and will return NULL. We want to fully
* replicate the behavior of PQconnectStart here.
*/
if (!pg_conn)
return NULL;
/*
* And in theory this allocation can fail as well, but it's incredibly
* unlikely if we just successfully allocated a PGconn.
*
* palloc will exit on failure though, so there's not much we could do if
* it *did* fail.
*/
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking
* mode */
conn->recvbuf = NULL;
return conn;
}
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
WalProposerConnectPollStatusType return_val;
switch (PQconnectPoll(conn->pg_conn))
{
case PGRES_POLLING_FAILED:
return_val = WP_CONN_POLLING_FAILED;
break;
case PGRES_POLLING_READING:
return_val = WP_CONN_POLLING_READING;
break;
case PGRES_POLLING_WRITING:
return_val = WP_CONN_POLLING_WRITING;
break;
case PGRES_POLLING_OK:
return_val = WP_CONN_POLLING_OK;
break;
/*
* There's a comment at its source about this constant being
* unused. We'll expect it's never returned.
*/
case PGRES_POLLING_ACTIVE:
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
/*
* This return is never actually reached, but it's here to make
* the compiler happy
*/
return WP_CONN_POLLING_FAILED;
default:
Assert(false);
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
}
return return_val;
}
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
/*
* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush
*/
if (!ensure_nonblocking_status(conn, false))
return false;
/* PQsendQuery returns 1 on success, 0 on failure */
if (!PQsendQuery(conn->pg_conn, query))
return false;
return true;
}
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
PGresult *result;
WalProposerExecStatusType return_val;
/* Marker variable if we need to log an unexpected success result */
char *unexpected_success = NULL;
/* Consume any input that we might be missing */
if (!PQconsumeInput(conn->pg_conn))
return WP_EXEC_FAILED;
if (PQisBusy(conn->pg_conn))
return WP_EXEC_NEEDS_INPUT;
result = PQgetResult(conn->pg_conn);
/*
* PQgetResult returns NULL only if getting the result was successful &
* there's no more of the result to get.
*/
if (!result)
{
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
return WP_EXEC_UNEXPECTED_SUCCESS;
}
/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
unexpected_success = msg; \
break;
switch (PQresultStatus(result))
{
/* "true" success case */
case PGRES_COPY_BOTH:
return_val = WP_EXEC_SUCCESS_COPYBOTH;
break;
/* Unexpected success case */
case PGRES_EMPTY_QUERY:
UNEXPECTED_SUCCESS("empty query return");
case PGRES_COMMAND_OK:
UNEXPECTED_SUCCESS("data-less command end");
case PGRES_TUPLES_OK:
UNEXPECTED_SUCCESS("tuples return");
case PGRES_COPY_OUT:
UNEXPECTED_SUCCESS("'Copy Out' response");
case PGRES_COPY_IN:
UNEXPECTED_SUCCESS("'Copy In' response");
case PGRES_SINGLE_TUPLE:
UNEXPECTED_SUCCESS("single tuple return");
case PGRES_PIPELINE_SYNC:
UNEXPECTED_SUCCESS("pipeline sync point");
/* Failure cases */
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_PIPELINE_ABORTED:
return_val = WP_EXEC_FAILED;
break;
default:
Assert(false);
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
}
if (unexpected_success)
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
return return_val;
}
pgsocket
walprop_socket(WalProposerConn *conn)
{
return PQsocket(conn->pg_conn);
}
int
walprop_flush(WalProposerConn *conn)
{
return (PQflush(conn->pg_conn));
}
void
walprop_finish(WalProposerConn *conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
PQfinish(conn->pg_conn);
pfree(conn);
}
/*
* Receive a message from the safekeeper.
*
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
int result;
if (conn->recvbuf != NULL)
{
PQfreemem(conn->recvbuf);
conn->recvbuf = NULL;
}
/* Call PQconsumeInput so that we have the data we need */
if (!PQconsumeInput(conn->pg_conn))
{
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
/*
* The docs for PQgetCopyData list the return values as: 0 if the copy is
* still in progress, but no "complete row" is available -1 if the copy is
* done -2 if an error occurred (> 0) if it was successful; that value is
* the amount transferred.
*
* The protocol we use between walproposer and safekeeper means that we
* *usually* wouldn't expect to see that the copy is done, but this can
* sometimes be triggered by the server returning an ErrorResponse (which
* also happens to have the effect that the copy is done).
*/
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
{
case 0:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_TRY_AGAIN;
case -1:
{
/*
* If we get -1, it's probably because of a server error; the
* safekeeper won't normally send a CopyDone message.
*
* We can check PQgetResult to make sure that the server
* failed; it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
/*
* If there was actually an error, it'll be properly reported
* by calls to PQerrorMessage -- we don't have to do anything
* else
*/
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
case -2:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
default:
/* Positive values indicate the size of the returned result */
*amount = result;
*buf = conn->recvbuf;
return PG_ASYNC_READ_SUCCESS;
}
}
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
/* If we aren't in non-blocking mode, switch to it. */
if (!ensure_nonblocking_status(conn, true))
return PG_ASYNC_WRITE_FAIL;
/*
* The docs for PQputcopyData list the return values as: 1 if the data was
* queued, 0 if it was not queued because of full buffers, or -1 if an
* error occurred
*/
result = PQputCopyData(conn->pg_conn, buf, size);
/*
* We won't get a result of zero because walproposer always empties the
* connection's buffers before sending more
*/
Assert(result != 0);
switch (result)
{
case 1:
/* good -- continue */
break;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQputCopyData", result);
}
/*
* After queueing the data, we still need to flush to get it to send. This
* might take multiple tries, but we don't want to wait around until it's
* done.
*
* PQflush has the following returns (directly quoting the docs): 0 if
* sucessful, 1 if it was unable to send all the data in the send queue
* yet -1 if it failed for some reason
*/
switch (result = PQflush(conn->pg_conn))
{
case 0:
return PG_ASYNC_WRITE_SUCCESS;
case 1:
return PG_ASYNC_WRITE_TRY_FLUSH;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQflush", result);
}
}
/*
* This function is very similar to walprop_async_write. For more
* information, refer to the comments there.
*/
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
/* If we are in non-blocking mode, switch out of it. */
if (!ensure_nonblocking_status(conn, false))
return false;
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;
Assert(result == 1);
/* Because the connection is non-blocking, flushing returns 0 or -1 */
if ((result = PQflush(conn->pg_conn)) == -1)
return false;
Assert(result == 0);
return true;
}

View File

@@ -18,10 +18,6 @@ extern char *neon_auth_token;
extern char *neon_timeline;
extern char *neon_tenant;
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
@@ -34,10 +30,4 @@ extern void pg_init_extension_server(void);
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
extern uint64 BackpressureThrottlingTime(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
#endif /* NEON_H */

View File

@@ -1,116 +0,0 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}

View File

@@ -1,12 +0,0 @@
#ifndef __NEON_UTILS_H__
#define __NEON_UTILS_H__
#include "postgres.h"
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
#endif /* __NEON_UTILS_H__ */

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,8 @@
#ifndef __NEON_WALPROPOSER_H__
#define __NEON_WALPROPOSER_H__
#include "postgres.h"
#include "access/xlogdefs.h"
#include "postgres.h"
#include "port.h"
#include "access/xlog_internal.h"
#include "access/transam.h"
@@ -16,15 +16,29 @@
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
* message */
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
* message header */
#define XLOG_HDR_END_POS (1 + 8) /* offset of end position in wal sender*
* message header */
/*
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occurred,
* because all WL_* events are given flags equal to some (1 << i), starting from i = 0
*/
#define WL_NO_EVENTS 0
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern bool am_wal_proposer;
struct WalProposerConn; /* Defined in libpqwalproposer */
typedef struct WalProposerConn WalProposerConn;
struct WalMessage;
typedef struct WalMessage WalMessage;
/* Possible return values from ReadPGAsync */
typedef enum
{
@@ -38,7 +52,7 @@ typedef enum
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
@@ -57,7 +71,7 @@ typedef enum
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
@@ -133,7 +147,7 @@ typedef enum
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
} SafekeeperState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
@@ -157,12 +171,12 @@ typedef struct ProposerGreeting
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
} ProposerGreeting;
typedef struct AcceptorProposerMessage
{
uint64 tag;
} AcceptorProposerMessage;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
@@ -172,7 +186,7 @@ typedef struct AcceptorGreeting
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
} AcceptorGreeting;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
@@ -182,20 +196,20 @@ typedef struct VoteRequest
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
} VoteRequest;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
{
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
} TermSwitchEntry;
typedef struct TermHistory
{
uint32 n_entries;
TermSwitchEntry *entries;
} TermHistory;
} TermHistory;
/* Vote itself, sent from safekeeper to proposer */
typedef struct VoteResponse
@@ -213,7 +227,7 @@ typedef struct VoteResponse
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
} VoteResponse;
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
@@ -229,7 +243,7 @@ typedef struct ProposerElected
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
@@ -254,7 +268,7 @@ typedef struct AppendRequestHeader
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
} AppendRequestHeader;
/*
* Hot standby feedback received from replica
@@ -264,7 +278,7 @@ typedef struct HotStandbyFeedback
TimestampTz ts;
FullTransactionId xmin;
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
} HotStandbyFeedback;
typedef struct PageserverFeedback
{
@@ -275,7 +289,7 @@ typedef struct PageserverFeedback
XLogRecPtr disk_consistent_lsn;
XLogRecPtr remote_consistent_lsn;
TimestampTz replytime;
} PageserverFeedback;
} PageserverFeedback;
typedef struct WalproposerShmemState
{
@@ -283,7 +297,7 @@ typedef struct WalproposerShmemState
PageserverFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
} WalproposerShmemState;
/*
* Report safekeeper state to proposer
@@ -307,22 +321,17 @@ typedef struct AppendResponse
/* and custom neon feedback. */
/* This part of the message is extensible. */
PageserverFeedback rf;
} AppendResponse;
} AppendResponse;
/* PageserverFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
struct WalProposer;
typedef struct WalProposer WalProposer;
/*
* Descriptor of safekeeper
*/
typedef struct Safekeeper
{
WalProposer *wp;
char const *host;
char const *port;
@@ -331,7 +340,7 @@ typedef struct Safekeeper
*
* May contain private information like password and should not be logged.
*/
char conninfo[MAXCONNINFO];
char conninfo[MAXCONNINFO];
/*
* postgres protocol connection to the WAL acceptor
@@ -364,12 +373,27 @@ typedef struct Safekeeper
int eventPos; /* position in wait event set. Equal to -1 if*
* no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
} Safekeeper;
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);
extern Size WalproposerShmemSize(void);
extern bool WalproposerShmemInit(void);
extern void replication_feedback_set(PageserverFeedback *rf);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
/* libpqwalproposer hooks & helper type */
/* Re-exported PostgresPollingStatusType */
typedef enum
{
@@ -382,7 +406,7 @@ typedef enum
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
* We've removed it here to avoid clutter.
*/
} WalProposerConnectPollStatusType;
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
@@ -407,7 +431,7 @@ typedef enum
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
@@ -421,252 +445,67 @@ typedef enum
* that extra functionality, so we collect them into a single tag here.
*/
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
extern char *walprop_error_message(WalProposerConn *conn);
/* Re-exported PQstatus */
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
/* Re-exported PQconnectStart */
extern WalProposerConn * walprop_connect_start(char *conninfo, char *password);
/* Re-exported PQconectPoll */
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
/* Blocking wrapper around PQsendQuery */
extern bool walprop_send_query(WalProposerConn *conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
/* Re-exported PQsocket */
extern pgsocket walprop_socket(WalProposerConn *conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
extern int walprop_flush(WalProposerConn *conn);
/* Re-exported PQfinish */
extern void walprop_finish(WalProposerConn *conn);
/*
* Collection of hooks for walproposer, to call postgres functions,
* read WAL and send it over the network.
* Ergonomic wrapper around PGgetCopyData
*
* Reads a CopyData block from a safekeeper, setting *amount to the number
* of bytes returned.
*
* This function is allowed to assume certain properties specific to the
* protocol with the safekeepers, so it should not be used as-is for any
* other purpose.
*
* Note: If possible, using <AsyncRead> is generally preferred, because it
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
*/
typedef struct walproposer_api
{
/*
* Get WalproposerShmemState. This is used to store information about last
* elected term.
*/
WalproposerShmemState *(*get_shmem_state) (void);
/*
* Start receiving notifications about new WAL. This is an infinite loop
* which calls WalProposerBroadcast() and WalProposerPoll() to send the
* WAL.
*/
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos);
/* Get pointer to the latest available WAL. */
XLogRecPtr (*get_flush_rec_ptr) (void);
/* Get current time. */
TimestampTz (*get_current_timestamp) (void);
/* Get postgres timeline. */
TimeLineID (*get_timeline_id) (void);
/* Current error message, aka PQerrorMessage. */
char *(*conn_error_message) (WalProposerConn *conn);
/* Connection status, aka PQstatus. */
WalProposerConnStatusType (*conn_status) (WalProposerConn *conn);
/* Start the connection, aka PQconnectStart. */
WalProposerConn *(*conn_connect_start) (char *conninfo);
/* Poll an asynchronous connection, aka PQconnectPoll. */
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn);
/* Send a blocking SQL query, aka PQsendQuery. */
bool (*conn_send_query) (WalProposerConn *conn, char *query);
/* Read the query result, aka PQgetResult. */
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn);
/* Flush buffer to the network, aka PQflush. */
int (*conn_flush) (WalProposerConn *conn);
/* Close the connection, aka PQfinish. */
void (*conn_finish) (WalProposerConn *conn);
/* Try to read CopyData message, aka PQgetCopyData. */
PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount);
/* Try to write CopyData message, aka PQputCopyData. */
PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Blocking CopyData write, aka PQputCopyData + PQflush. */
bool (*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Download WAL from startpos to endpos and make it available locally. */
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/* Read WAL from disk to buf. */
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
/* Allocate WAL reader. */
XLogReaderState *(*wal_reader_allocate) (void);
/* Deallocate event set. */
void (*free_event_set) (void);
/* Initialize event set. */
void (*init_event_set) (int n_safekeepers);
/* Update events for an existing safekeeper connection. */
void (*update_event_set) (Safekeeper *sk, uint32 events);
/* Add a new safekeeper connection to the event set. */
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
/*
* Wait until some event happens: - timeout is reached - socket event for
* safekeeper connection - new WAL is available
*
* Returns 0 if timeout is reached, 1 if some event happened. Updates
* events mask to indicate events and sets sk to the safekeeper which has
* an event.
*/
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
/* Read random bytes. */
bool (*strong_random) (void *buf, size_t len);
/*
* Get a basebackup LSN. Used to cross-validate with the latest available
* LSN on the safekeepers.
*/
XLogRecPtr (*get_redo_start_lsn) (void);
/*
* Finish sync safekeepers with the given LSN. This function should not
* return and should exit the program.
*/
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
/*
* Called after every new message from the safekeeper. Used to propagate
* backpressure feedback and to confirm WAL persistence (has been commited
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
/*
* Called on peer_horizon_lsn updates. Used to advance replication slot
* and to free up disk space by deleting unnecessary WAL.
*/
void (*confirm_wal_streamed) (XLogRecPtr lsn);
} walproposer_api;
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
/*
* Configuration of the WAL proposer.
* Ergonomic wrapper around PQputCopyData + PQflush
*
* Starts to write a CopyData block to a safekeeper.
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
*/
typedef struct WalProposerConfig
{
/* hex-encoded TenantId cstr */
char *neon_tenant;
/* hex-encoded TimelineId cstr */
char *neon_timeline;
/*
* Comma-separated list of safekeepers, in the following format:
* host1:port1,host2:port2,host3:port3
*
* This cstr should be editable.
*/
char *safekeepers_list;
/*
* WalProposer reconnects to offline safekeepers once in this interval.
* Time is in milliseconds.
*/
int safekeeper_reconnect_timeout;
/*
* WalProposer terminates the connection if it doesn't receive any message
* from the safekeeper in this interval. Time is in milliseconds.
*/
int safekeeper_connection_timeout;
/*
* WAL segment size. Will be passed to safekeepers in greet request. Also
* used to detect page headers.
*/
int wal_segment_size;
/*
* If safekeeper was started in sync mode, walproposer will not subscribe
* for new WAL and will exit when quorum of safekeepers will be synced to
* the latest available LSN.
*/
bool syncSafekeepers;
/* Will be passed to safekeepers in greet request. */
uint64 systemId;
} WalProposerConfig;
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
/*
* WAL proposer state.
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
*/
typedef struct WalProposer
{
WalProposerConfig *config;
int n_safekeepers;
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
/* (n_safekeepers / 2) + 1 */
int quorum;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
ProposerGreeting greetRequest;
/* Vote request for safekeeper */
VoteRequest voteRequest;
/*
* Minimal LSN which may be needed for recovery of some safekeeper,
* record-aligned (first record which might not yet received by someone).
*/
XLogRecPtr truncateLsn;
/*
* Term of the proposer. We want our term to be highest and unique, so we
* collect terms from safekeepers quorum, choose max and +1. After that
* our term is fixed and must not change. If we observe that some
* safekeeper has higher term, it means that we have another running
* compute, so we must stop immediately.
*/
term_t propTerm;
/* term history of the proposer */
TermHistory propTermHistory;
/* epoch start lsn of the proposer */
XLogRecPtr propEpochStartLsn;
/* Most advanced acceptor epoch */
term_t donorEpoch;
/* Most advanced acceptor */
int donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;
/*
* Timestamp of the last reconnection attempt. Related to
* config->safekeeper_reconnect_timeout
*/
TimestampTz last_reconnect_attempt;
walproposer_api api;
} WalProposer;
extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api api);
extern void WalProposerStart(WalProposer *wp);
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(WalProposer *wp);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
extern uint64 BackpressureThrottlingTime(void);
#endif /* __NEON_WALPROPOSER_H__ */

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,659 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
* corresponding the filename of walpropFile.
*/
static int walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;
/* START cloned file-local variables and functions from walsender.c */
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
/* END cloned file-level variables and functions from walsender.c */
int
CompareLsn(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return -1;
else if (lsn1 == lsn2)
return 0;
else
return 1;
}
/* Returns a human-readable string corresonding to the SafekeeperState
*
* The string should not be freed.
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
char *
FormatSafekeeperState(SafekeeperState state)
{
char *return_val = NULL;
switch (state)
{
case SS_OFFLINE:
return_val = "offline";
break;
case SS_CONNECTING_READ:
case SS_CONNECTING_WRITE:
return_val = "connecting";
break;
case SS_WAIT_EXEC_RESULT:
return_val = "receiving query result";
break;
case SS_HANDSHAKE_RECV:
return_val = "handshake (receiving)";
break;
case SS_VOTING:
return_val = "voting";
break;
case SS_WAIT_VERDICT:
return_val = "wait-for-verdict";
break;
case SS_SEND_ELECTED_FLUSH:
return_val = "send-announcement-flush";
break;
case SS_IDLE:
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
break;
}
Assert(return_val != NULL);
return return_val;
}
/* Asserts that the provided events are expected for given safekeeper's state */
void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
/*
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
*
* This will return WL_NO_EVENTS (= 0) for some events. */
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
uint32 result = WL_NO_EVENTS;
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
/*
* Idle states use read-readiness as a sign that the connection
* has been disconnected.
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
*
* If the events do not correspond to something set as the `events` field of a `WaitEvent`, the
* returned string may be meaingless.
*
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
char *
FormatEvents(uint32 events)
{
static char return_str[8];
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/*
* The formatting here isn't supposed to be *particularly* useful -- it's
* just to give an sense of what events have been triggered without
* needing to remember your powers of two.
*/
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}
else
return_str[6] = '\0';
return (char *) &return_str;
}
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
static int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}
/*
* Write XLOG data to disk.
*/
void
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
int byteswritten;
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
XLogWalPropClose(recptr);
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
}
/* Calculate the start offset of the received logs */
startoff = XLogSegmentOffset(recptr, wal_segment_size);
if (startoff + nbytes > wal_segment_size)
segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
/* OK to write the logs */
errno = 0;
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
save_errno = errno;
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
xlogfname, startoff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
}
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop.
*/
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
{
XLogWalPropClose(recptr);
}
}
/*
* Close the current segment.
*/
void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
}
walpropFile = -1;
}
/* START of cloned functions from walsender.c */
/*
* Subscribe for new WAL and stream it in the loop to safekeepers.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
void
StartProposerReplication(StartReplicationCmd *cmd)
{
XLogRecPtr FlushPtr;
TimeLineID currTLI;
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
* difficult for there to be WAL data that we can still see that was
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a logical replication slot for physical replication")));
/*
* We don't need to verify the slot's restart_lsn here; instead we
* rely on the caller requesting the starting point to use. If the
* WAL segment doesn't exist, we'll fail later.
*/
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
* kept in ThisTimeLineID.
*
* Neon doesn't currently use PG Timelines, but it may in the future, so
* we keep this code around to lighten the load for when we need it.
*/
#if PG_VERSION_NUM >= 150000
FlushPtr = GetFlushRecPtr(&currTLI);
#else
FlushPtr = GetFlushRecPtr();
currTLI = ThisTimeLineID;
#endif
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Infinite send loop, never returns */
WalSndLoop();
WalSndSetState(WALSNDSTATE_STARTUP);
if (cmd->slotname)
ReplicationSlotRelease();
}
/*
* Main loop that waits for LSN updates and calls the walproposer.
* Synchronous replication sets latch in WalSndWakeup at walsender.c
*/
static void
WalSndLoop(void)
{
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
for (;;)
{
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer();
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
}
}
/*
* Notify walproposer about the new WAL position.
*/
static void
XLogBroadcastWalProposer(void)
{
XLogRecPtr startptr;
XLogRecPtr endptr;
/* Start from the last sent position */
startptr = sentPtr;
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
endptr = GetFlushRecPtr(NULL);
#else
endptr = GetFlushRecPtr();
#endif
/*
* Record the current system time as an approximation of the time at which
* this WAL location was written for the purposes of lag tracking.
*
* In theory we could make XLogFlush() record a time in shmem whenever WAL
* is flushed and we could get that time as well as the LSN when we call
* GetFlushRecPtr() above (and likewise for the cascading standby
* equivalent), but rather than putting any new code into the hot WAL path
* it seems good enough to capture the time here. We should reach this
* after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
* may take some time, we read the WAL flush pointer and take the time
* very close to together here so that we'll get a later position if it is
* still moving.
*
* Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
* this gives us a cheap approximation for the WAL flush time for this
* LSN.
*
* Note that the LSN is not necessarily the LSN for the data contained in
* the present message; it's the end of the WAL, which might be further
* ahead. All the lag tracking machinery cares about is finding out when
* that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time.
*/
LagTrackerWrite(endptr, GetCurrentTimestamp());
/* Do we have any work to do? */
Assert(startptr <= endptr);
if (endptr <= startptr)
return;
WalProposerBroadcast(startptr, endptr);
sentPtr = endptr;
/* Update shared memory status */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
/* Report progress of XLOG streaming in PS display */
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
}

View File

@@ -0,0 +1,19 @@
#ifndef __NEON_WALPROPOSER_UTILS_H__
#define __NEON_WALPROPOSER_UTILS_H__
#include "walproposer.h"
int CompareLsn(const void *a, const void *b);
char *FormatSafekeeperState(SafekeeperState state);
void AssertEventsOkForState(uint32 events, Safekeeper *sk);
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
char *FormatEvents(uint32 events);
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void XLogWalPropClose(XLogRecPtr recptr);
#endif /* __NEON_WALPROPOSER_UTILS_H__ */

View File

@@ -1085,32 +1085,15 @@ class AbstractNeonCli(abc.ABC):
stderr=subprocess.PIPE,
timeout=timeout,
)
indent = " "
if not res.returncode:
stripped = res.stdout.strip()
lines = stripped.splitlines()
if len(lines) < 2:
log.debug(f"Run {res.args} success: {stripped}")
else:
log.debug("Run %s success:\n%s" % (res.args, textwrap.indent(stripped, indent)))
log.info(f"Run {res.args} success: {res.stdout}")
elif check_return_code:
# this way command output will be in recorded and shown in CI in failure message
indent = indent * 2
msg = textwrap.dedent(
"""\
Run %s failed:
stdout:
%s
stderr:
%s
msg = f"""\
Run {res.args} failed:
stdout: {res.stdout}
stderr: {res.stderr}
"""
)
msg = msg % (
res.args,
textwrap.indent(res.stdout.strip(), indent),
textwrap.indent(res.stderr.strip(), indent),
)
log.info(msg)
raise RuntimeError(msg) from subprocess.CalledProcessError(
res.returncode, res.args, res.stdout, res.stderr

View File

@@ -116,10 +116,6 @@ def get_deletion_queue_submitted(ps_http) -> int:
return get_metric_or_0(ps_http, "pageserver_deletion_queue_submitted_total")
def get_deletion_queue_validated(ps_http) -> int:
return get_metric_or_0(ps_http, "pageserver_deletion_queue_validated_total")
def get_deletion_queue_dropped(ps_http) -> int:
return get_metric_or_0(ps_http, "pageserver_deletion_queue_dropped_total")
@@ -277,15 +273,12 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("keep_attachment", [True, False])
@pytest.mark.parametrize("validate_before", [True, False])
def test_deletion_queue_recovery(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool, validate_before: bool
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool
):
"""
:param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
node took the attachment while we were restarting.
:param validate_before: If true, we wait for deletions to be validated before restart. This
makes them elegible to be executed after restart, if the same node keeps the attachment.
"""
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
@@ -295,20 +288,12 @@ def test_deletion_queue_recovery(
ps_http = env.pageserver.http_client()
failpoints = [
# Prevent deletion lists from being executed, to build up some backlog of deletions
("deletion-queue-before-execute", "return"),
]
if not validate_before:
failpoints.append(
# Prevent deletion lists from being validated, we will test that they are
# dropped properly during recovery. 'pause' is okay here because we kill
# the pageserver with immediate=true
("control-plane-client-validate", "pause")
)
ps_http.configure_failpoints(failpoints)
# Prevent deletion lists from being executed, to build up some backlog of deletions
ps_http.configure_failpoints(
[
("deletion-queue-before-execute", "return"),
]
)
generate_uploads_and_deletions(env)
@@ -320,16 +305,6 @@ def test_deletion_queue_recovery(
assert get_deletion_queue_unexpected_errors(ps_http) == 0
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
if validate_before:
def assert_validation_complete():
assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)
wait_until(20, 1, assert_validation_complete)
# A short wait to let the DeletionHeader get written out, as this happens after
# the validated count gets incremented.
time.sleep(1)
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
env.pageserver.stop(immediate=True)
@@ -352,17 +327,14 @@ def test_deletion_queue_recovery(
ps_http.deletion_queue_flush(execute=True)
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
if keep_attachment or validate_before:
# - If we kept the attachment, then our pre-restart deletions should execute
# because on re-attach they were from the immediately preceding generation
# - If we validated before restart, then the deletions should execute because the
# deletion queue header records a validated deletion list sequence number.
if keep_attachment:
# If we kept the attachment, then our pre-restart deletions should have executed
# successfully
assert get_deletion_queue_executed(ps_http) == before_restart_depth
else:
env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
# If we lost the attachment, we should have dropped our pre-restart deletions.
assert get_deletion_queue_dropped(ps_http) == before_restart_depth
env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
assert get_deletion_queue_unexpected_errors(ps_http) == 0
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0