mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
Compare commits
14 Commits
rc/2024-10
...
lfc_prewar
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8852e8a766 | ||
|
|
c04ae556c5 | ||
|
|
44b283e107 | ||
|
|
c7a3359edd | ||
|
|
012a8a360f | ||
|
|
0b42695983 | ||
|
|
284d7b4da6 | ||
|
|
361fc04cd6 | ||
|
|
12f635aa04 | ||
|
|
ac6c53b94b | ||
|
|
df289738b8 | ||
|
|
0d3503a187 | ||
|
|
f328d497e1 | ||
|
|
f971c3a786 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -6,8 +6,6 @@ __pycache__/
|
||||
test_output/
|
||||
.vscode
|
||||
.idea
|
||||
*.swp
|
||||
tags
|
||||
neon.iml
|
||||
/.neon
|
||||
/integration_tests/.neon
|
||||
|
||||
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -6272,7 +6272,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#cb2dcea2058034bc209e7917b01c5097712a3168"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#08ccfa94ff5507727bf4d8d006666b5b192e04c6"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"nix 0.26.4",
|
||||
@@ -6788,7 +6788,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "uring-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#cb2dcea2058034bc209e7917b01c5097712a3168"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#08ccfa94ff5507727bf4d8d006666b5b192e04c6"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"io-uring",
|
||||
|
||||
@@ -1073,10 +1073,10 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
|
||||
tenant_id,
|
||||
TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
|
||||
existing_initdb_timeline_id: None,
|
||||
pg_version: Some(args.pg_version),
|
||||
},
|
||||
ancestor_timeline_id: None,
|
||||
ancestor_start_lsn: None,
|
||||
existing_initdb_timeline_id: None,
|
||||
pg_version: Some(args.pg_version),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
@@ -1133,10 +1133,10 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let create_req = TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
|
||||
existing_initdb_timeline_id: None,
|
||||
pg_version: Some(args.pg_version),
|
||||
},
|
||||
ancestor_timeline_id: None,
|
||||
existing_initdb_timeline_id: None,
|
||||
ancestor_start_lsn: None,
|
||||
pg_version: Some(args.pg_version),
|
||||
};
|
||||
let timeline_info = storage_controller
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
@@ -1189,11 +1189,10 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let create_req = TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
|
||||
ancestor_timeline_id,
|
||||
ancestor_start_lsn: start_lsn,
|
||||
pg_version: None,
|
||||
},
|
||||
ancestor_timeline_id: Some(ancestor_timeline_id),
|
||||
existing_initdb_timeline_id: None,
|
||||
ancestor_start_lsn: start_lsn,
|
||||
pg_version: None,
|
||||
};
|
||||
let timeline_info = storage_controller
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
|
||||
@@ -529,6 +529,28 @@ impl PageServerNode {
|
||||
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
|
||||
}
|
||||
|
||||
pub async fn timeline_create(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_timeline_id: TimelineId,
|
||||
ancestor_start_lsn: Option<Lsn>,
|
||||
ancestor_timeline_id: Option<TimelineId>,
|
||||
pg_version: Option<u32>,
|
||||
existing_initdb_timeline_id: Option<TimelineId>,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
let req = models::TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
ancestor_timeline_id,
|
||||
pg_version,
|
||||
existing_initdb_timeline_id,
|
||||
};
|
||||
Ok(self
|
||||
.http_client
|
||||
.timeline_create(tenant_shard_id, &req)
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// Import a basebackup prepared using either:
|
||||
/// a) `pg_basebackup -F tar`, or
|
||||
/// b) The `fullbackup` pageserver endpoint
|
||||
|
||||
@@ -19,7 +19,6 @@ use once_cell::sync::Lazy;
|
||||
use prometheus::core::{
|
||||
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
|
||||
};
|
||||
pub use prometheus::local::LocalHistogram;
|
||||
pub use prometheus::opts;
|
||||
pub use prometheus::register;
|
||||
pub use prometheus::Error;
|
||||
|
||||
@@ -211,30 +211,13 @@ pub enum TimelineState {
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineCreateRequest {
|
||||
pub new_timeline_id: TimelineId,
|
||||
#[serde(flatten)]
|
||||
pub mode: TimelineCreateRequestMode,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[serde(untagged)]
|
||||
pub enum TimelineCreateRequestMode {
|
||||
Branch {
|
||||
ancestor_timeline_id: TimelineId,
|
||||
#[serde(default)]
|
||||
ancestor_start_lsn: Option<Lsn>,
|
||||
// TODO: cplane sets this, but, the branching code always
|
||||
// inherits the ancestor's pg_version. Earlier code wasn't
|
||||
// using a flattened enum, so, it was an accepted field, and
|
||||
// we continue to accept it by having it here.
|
||||
pg_version: Option<u32>,
|
||||
},
|
||||
// NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
|
||||
// (serde picks the first matching enum variant, in declaration order).
|
||||
Bootstrap {
|
||||
#[serde(default)]
|
||||
existing_initdb_timeline_id: Option<TimelineId>,
|
||||
pg_version: Option<u32>,
|
||||
},
|
||||
#[serde(default)]
|
||||
pub ancestor_timeline_id: Option<TimelineId>,
|
||||
#[serde(default)]
|
||||
pub existing_initdb_timeline_id: Option<TimelineId>,
|
||||
#[serde(default)]
|
||||
pub ancestor_start_lsn: Option<Lsn>,
|
||||
pub pg_version: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
@@ -1068,12 +1051,6 @@ pub mod virtual_file {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ScanDisposableKeysResponse {
|
||||
pub disposable_count: usize,
|
||||
pub not_disposable_count: usize,
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
|
||||
@@ -357,20 +357,22 @@ impl RemoteStorage for LocalFs {
|
||||
.list_recursive(prefix)
|
||||
.await
|
||||
.map_err(DownloadError::Other)?;
|
||||
let mut objects = Vec::with_capacity(keys.len());
|
||||
for key in keys {
|
||||
let path = key.with_base(&self.storage_root);
|
||||
let metadata = file_metadata(&path).await?;
|
||||
if metadata.is_dir() {
|
||||
continue;
|
||||
}
|
||||
objects.push(ListingObject {
|
||||
key: key.clone(),
|
||||
last_modified: metadata.modified()?,
|
||||
size: metadata.len(),
|
||||
});
|
||||
}
|
||||
let objects = objects;
|
||||
let objects = keys
|
||||
.into_iter()
|
||||
.filter_map(|k| {
|
||||
let path = k.with_base(&self.storage_root);
|
||||
if path.is_dir() {
|
||||
None
|
||||
} else {
|
||||
Some(ListingObject {
|
||||
key: k.clone(),
|
||||
// LocalFs is just for testing, so just specify a dummy time
|
||||
last_modified: SystemTime::now(),
|
||||
size: 0,
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if let ListingMode::NoDelimiter = mode {
|
||||
result.keys = objects;
|
||||
@@ -408,8 +410,9 @@ impl RemoteStorage for LocalFs {
|
||||
} else {
|
||||
result.keys.push(ListingObject {
|
||||
key: RemotePath::from_string(&relative_key).unwrap(),
|
||||
last_modified: object.last_modified,
|
||||
size: object.size,
|
||||
// LocalFs is just for testing
|
||||
last_modified: SystemTime::now(),
|
||||
size: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,6 +345,7 @@ impl AuxFileV2 {
|
||||
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
|
||||
}
|
||||
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
|
||||
(4, 1) => AuxFileV2::Recognized("lfc.state", hash),
|
||||
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
|
||||
(0xff, 0xff) => AuxFileV2::Other(hash),
|
||||
_ => return None,
|
||||
|
||||
@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
|
||||
|
||||
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
|
||||
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
|
||||
const AUX_DIR_LFC_STATE: u8 = 0x04;
|
||||
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
|
||||
|
||||
/// Encode the aux file into a fixed-size key.
|
||||
@@ -75,6 +76,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
|
||||
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
|
||||
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
|
||||
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
|
||||
} else if let Some(fname) = path.strip_prefix("lfc.state") {
|
||||
aux_hash_to_metadata_key(AUX_DIR_LFC_STATE, 0x01, fname.as_bytes())
|
||||
} else {
|
||||
if cfg!(debug_assertions) {
|
||||
warn!(
|
||||
|
||||
@@ -597,10 +597,6 @@ paths:
|
||||
Create a timeline. Returns new timeline id on success.
|
||||
Recreating the same timeline will succeed if the parameters match the existing timeline.
|
||||
If no pg_version is specified, assume DEFAULT_PG_VERSION hardcoded in the pageserver.
|
||||
|
||||
To ensure durability, the caller must retry the creation until success.
|
||||
Just because the timeline is visible via other endpoints does not mean it is durable.
|
||||
Future versions may stop showing timelines that are not yet durable.
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
|
||||
@@ -38,7 +38,6 @@ use pageserver_api::models::TenantShardSplitRequest;
|
||||
use pageserver_api::models::TenantShardSplitResponse;
|
||||
use pageserver_api::models::TenantSorting;
|
||||
use pageserver_api::models::TimelineArchivalConfigRequest;
|
||||
use pageserver_api::models::TimelineCreateRequestMode;
|
||||
use pageserver_api::models::TimelinesInfoAndOffloaded;
|
||||
use pageserver_api::models::TopTenantShardItem;
|
||||
use pageserver_api::models::TopTenantShardsRequest;
|
||||
@@ -86,7 +85,6 @@ use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::OffloadedTimeline;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
|
||||
@@ -549,26 +547,6 @@ async fn timeline_create_handler(
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let new_timeline_id = request_data.new_timeline_id;
|
||||
// fill in the default pg_version if not provided & convert request into domain model
|
||||
let params: tenant::CreateTimelineParams = match request_data.mode {
|
||||
TimelineCreateRequestMode::Bootstrap {
|
||||
existing_initdb_timeline_id,
|
||||
pg_version,
|
||||
} => tenant::CreateTimelineParams::Bootstrap(tenant::CreateTimelineParamsBootstrap {
|
||||
new_timeline_id,
|
||||
existing_initdb_timeline_id,
|
||||
pg_version: pg_version.unwrap_or(DEFAULT_PG_VERSION),
|
||||
}),
|
||||
TimelineCreateRequestMode::Branch {
|
||||
ancestor_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
pg_version: _,
|
||||
} => tenant::CreateTimelineParams::Branch(tenant::CreateTimelineParamsBranch {
|
||||
new_timeline_id,
|
||||
ancestor_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
}),
|
||||
};
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||
|
||||
@@ -581,12 +559,22 @@ async fn timeline_create_handler(
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
// earlier versions of the code had pg_version and ancestor_lsn in the span
|
||||
// => continue to provide that information, but, through a log message that doesn't require us to destructure
|
||||
tracing::info!(?params, "creating timeline");
|
||||
if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
|
||||
tracing::info!(%ancestor_id, "starting to branch");
|
||||
} else {
|
||||
tracing::info!("bootstrapping");
|
||||
}
|
||||
|
||||
match tenant
|
||||
.create_timeline(params, state.broker_client.clone(), &ctx)
|
||||
.create_timeline(
|
||||
new_timeline_id,
|
||||
request_data.ancestor_timeline_id,
|
||||
request_data.ancestor_start_lsn,
|
||||
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
|
||||
request_data.existing_initdb_timeline_id,
|
||||
state.broker_client.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_timeline) => {
|
||||
@@ -637,6 +625,8 @@ async fn timeline_create_handler(
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
timeline_id = %new_timeline_id,
|
||||
lsn=?request_data.ancestor_start_lsn,
|
||||
pg_version=?request_data.pg_version
|
||||
))
|
||||
.await
|
||||
}
|
||||
@@ -1293,99 +1283,6 @@ async fn layer_map_info_handler(
|
||||
json_response(StatusCode::OK, layer_map_info)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id, shard_id, timeline_id, layer_name))]
|
||||
async fn timeline_layer_scan_disposable_keys(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let layer_name: LayerName = parse_request_param(&request, "layer_name")?;
|
||||
|
||||
tracing::Span::current().record(
|
||||
"tenant_id",
|
||||
tracing::field::display(&tenant_shard_id.tenant_id),
|
||||
);
|
||||
tracing::Span::current().record(
|
||||
"shard_id",
|
||||
tracing::field::display(tenant_shard_id.shard_slug()),
|
||||
);
|
||||
tracing::Span::current().record("timeline_id", tracing::field::display(&timeline_id));
|
||||
tracing::Span::current().record("layer_name", tracing::field::display(&layer_name));
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
// technically the timeline need not be active for this scan to complete
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
|
||||
let guard = timeline.layers.read().await;
|
||||
let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Layer {tenant_shard_id}/{timeline_id}/{layer_name} not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
let resident_layer = layer
|
||||
.download_and_keep_resident()
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
tenant::storage_layer::layer::DownloadError::TimelineShutdown
|
||||
| tenant::storage_layer::layer::DownloadError::DownloadCancelled => {
|
||||
ApiError::ShuttingDown
|
||||
}
|
||||
tenant::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
|
||||
| tenant::storage_layer::layer::DownloadError::DownloadRequired
|
||||
| tenant::storage_layer::layer::DownloadError::NotFile(_)
|
||||
| tenant::storage_layer::layer::DownloadError::DownloadFailed
|
||||
| tenant::storage_layer::layer::DownloadError::PreStatFailed(_) => {
|
||||
ApiError::InternalServerError(err.into())
|
||||
}
|
||||
#[cfg(test)]
|
||||
tenant::storage_layer::layer::DownloadError::Failpoint(_) => {
|
||||
ApiError::InternalServerError(err.into())
|
||||
}
|
||||
})?;
|
||||
|
||||
let keys = resident_layer
|
||||
.load_keys(&ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
let shard_identity = timeline.get_shard_identity();
|
||||
|
||||
let mut disposable_count = 0;
|
||||
let mut not_disposable_count = 0;
|
||||
let cancel = cancel.clone();
|
||||
for (i, key) in keys.into_iter().enumerate() {
|
||||
if shard_identity.is_key_disposable(&key) {
|
||||
disposable_count += 1;
|
||||
tracing::debug!(key = %key, key.dbg=?key, "disposable key");
|
||||
} else {
|
||||
not_disposable_count += 1;
|
||||
}
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if i % 10000 == 0 {
|
||||
if cancel.is_cancelled() || timeline.cancel.is_cancelled() || timeline.is_stopping() {
|
||||
return Err(ApiError::ShuttingDown);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
pageserver_api::models::ScanDisposableKeysResponse {
|
||||
disposable_count,
|
||||
not_disposable_count,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn layer_download_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -3248,10 +3145,6 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|
||||
|r| api_handler(r, evict_timeline_layer_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_name/scan_disposable_keys",
|
||||
|r| testing_api_handler("timeline_layer_scan_disposable_keys", r, timeline_layer_scan_disposable_keys),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
|
||||
|r| api_handler(r, timeline_gc_blocking_handler),
|
||||
|
||||
@@ -3040,111 +3040,13 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
|
||||
}
|
||||
|
||||
pub mod tokio_epoll_uring {
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use metrics::{register_histogram, register_int_counter, Histogram, LocalHistogram, UIntGauge};
|
||||
use metrics::{register_int_counter, UIntGauge};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
/// Shared storage for tokio-epoll-uring thread local metrics.
|
||||
pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy<ThreadLocalMetricsStorage> =
|
||||
Lazy::new(|| {
|
||||
let slots_submission_queue_depth = register_histogram!(
|
||||
"pageserver_tokio_epoll_uring_slots_submission_queue_depth",
|
||||
"The slots waiters queue depth of each tokio_epoll_uring system",
|
||||
vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
ThreadLocalMetricsStorage {
|
||||
observers: Mutex::new(HashMap::new()),
|
||||
slots_submission_queue_depth,
|
||||
}
|
||||
});
|
||||
|
||||
pub struct ThreadLocalMetricsStorage {
|
||||
/// List of thread local metrics observers.
|
||||
observers: Mutex<HashMap<u64, Arc<ThreadLocalMetrics>>>,
|
||||
/// A histogram shared between all thread local systems
|
||||
/// for collecting slots submission queue depth.
|
||||
slots_submission_queue_depth: Histogram,
|
||||
}
|
||||
|
||||
/// Each thread-local [`tokio_epoll_uring::System`] gets one of these as its
|
||||
/// [`tokio_epoll_uring::metrics::PerSystemMetrics`] generic.
|
||||
///
|
||||
/// The System makes observations into [`Self`] and periodically, the collector
|
||||
/// comes along and flushes [`Self`] into the shared storage [`THREAD_LOCAL_METRICS_STORAGE`].
|
||||
///
|
||||
/// [`LocalHistogram`] is `!Send`, so, we need to put it behind a [`Mutex`].
|
||||
/// But except for the periodic flush, the lock is uncontended so there's no waiting
|
||||
/// for cache coherence protocol to get an exclusive cache line.
|
||||
pub struct ThreadLocalMetrics {
|
||||
/// Local observer of thread local tokio-epoll-uring system's slots waiters queue depth.
|
||||
slots_submission_queue_depth: Mutex<LocalHistogram>,
|
||||
}
|
||||
|
||||
impl ThreadLocalMetricsStorage {
|
||||
/// Registers a new thread local system. Returns a thread local metrics observer.
|
||||
pub fn register_system(&self, id: u64) -> Arc<ThreadLocalMetrics> {
|
||||
let per_system_metrics = Arc::new(ThreadLocalMetrics::new(
|
||||
self.slots_submission_queue_depth.local(),
|
||||
));
|
||||
let mut g = self.observers.lock().unwrap();
|
||||
g.insert(id, Arc::clone(&per_system_metrics));
|
||||
per_system_metrics
|
||||
}
|
||||
|
||||
/// Removes metrics observer for a thread local system.
|
||||
/// This should be called before dropping a thread local system.
|
||||
pub fn remove_system(&self, id: u64) {
|
||||
let mut g = self.observers.lock().unwrap();
|
||||
g.remove(&id);
|
||||
}
|
||||
|
||||
/// Flush all thread local metrics to the shared storage.
|
||||
pub fn flush_thread_local_metrics(&self) {
|
||||
let g = self.observers.lock().unwrap();
|
||||
g.values().for_each(|local| {
|
||||
local.flush();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl ThreadLocalMetrics {
|
||||
pub fn new(slots_submission_queue_depth: LocalHistogram) -> Self {
|
||||
ThreadLocalMetrics {
|
||||
slots_submission_queue_depth: Mutex::new(slots_submission_queue_depth),
|
||||
}
|
||||
}
|
||||
|
||||
/// Flushes the thread local metrics to shared aggregator.
|
||||
pub fn flush(&self) {
|
||||
let Self {
|
||||
slots_submission_queue_depth,
|
||||
} = self;
|
||||
slots_submission_queue_depth.lock().unwrap().flush();
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_epoll_uring::metrics::PerSystemMetrics for ThreadLocalMetrics {
|
||||
fn observe_slots_submission_queue_depth(&self, queue_depth: u64) {
|
||||
let Self {
|
||||
slots_submission_queue_depth,
|
||||
} = self;
|
||||
slots_submission_queue_depth
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(queue_depth as f64);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Collector {
|
||||
descs: Vec<metrics::core::Desc>,
|
||||
systems_created: UIntGauge,
|
||||
systems_destroyed: UIntGauge,
|
||||
thread_local_metrics_storage: &'static ThreadLocalMetricsStorage,
|
||||
}
|
||||
|
||||
impl metrics::core::Collector for Collector {
|
||||
@@ -3154,7 +3056,7 @@ pub mod tokio_epoll_uring {
|
||||
|
||||
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
|
||||
let mut mfs = Vec::with_capacity(Self::NMETRICS);
|
||||
let tokio_epoll_uring::metrics::GlobalMetrics {
|
||||
let tokio_epoll_uring::metrics::Metrics {
|
||||
systems_created,
|
||||
systems_destroyed,
|
||||
} = tokio_epoll_uring::metrics::global();
|
||||
@@ -3162,21 +3064,12 @@ pub mod tokio_epoll_uring {
|
||||
mfs.extend(self.systems_created.collect());
|
||||
self.systems_destroyed.set(systems_destroyed);
|
||||
mfs.extend(self.systems_destroyed.collect());
|
||||
|
||||
self.thread_local_metrics_storage
|
||||
.flush_thread_local_metrics();
|
||||
|
||||
mfs.extend(
|
||||
self.thread_local_metrics_storage
|
||||
.slots_submission_queue_depth
|
||||
.collect(),
|
||||
);
|
||||
mfs
|
||||
}
|
||||
}
|
||||
|
||||
impl Collector {
|
||||
const NMETRICS: usize = 3;
|
||||
const NMETRICS: usize = 2;
|
||||
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> Self {
|
||||
@@ -3208,7 +3101,6 @@ pub mod tokio_epoll_uring {
|
||||
descs,
|
||||
systems_created,
|
||||
systems_destroyed,
|
||||
thread_local_metrics_storage: &THREAD_LOCAL_METRICS_STORAGE,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3568,7 +3460,6 @@ pub fn preinitialize_metrics() {
|
||||
Lazy::force(&RECONSTRUCT_TIME);
|
||||
Lazy::force(&BASEBACKUP_QUERY_TIME);
|
||||
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
|
||||
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
|
||||
|
||||
tenant_throttling::preinitialize_global_metrics();
|
||||
}
|
||||
|
||||
@@ -1506,42 +1506,35 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Drop some relations
|
||||
pub(crate) async fn put_rel_drops(
|
||||
&mut self,
|
||||
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
for ((spc_node, db_node), rel_tags) in drop_relations {
|
||||
let dir_key = rel_dir_to_key(spc_node, db_node);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = RelDirectory::des(&buf)?;
|
||||
/// Drop a relation.
|
||||
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
|
||||
let mut dirty = false;
|
||||
for rel_tag in rel_tags {
|
||||
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
|
||||
dirty = true;
|
||||
// Remove it from the directory entry
|
||||
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = RelDirectory::des(&buf)?;
|
||||
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel_tag);
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as i64;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, dir.rels.len()));
|
||||
|
||||
// Remove entry from relation size cache
|
||||
self.tline.remove_cached_rel_size(&rel_tag);
|
||||
|
||||
// Delete size entry, as well as all blocks
|
||||
self.delete(rel_key_range(rel_tag));
|
||||
}
|
||||
}
|
||||
|
||||
if dirty {
|
||||
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, dir.rels.len()));
|
||||
}
|
||||
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
|
||||
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
||||
} else {
|
||||
warn!("dropped rel {} did not exist in rel directory", rel);
|
||||
}
|
||||
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as i64;
|
||||
|
||||
// Remove enty from relation size cache
|
||||
self.tline.remove_cached_rel_size(&rel);
|
||||
|
||||
// Delete size entry, as well as all blocks
|
||||
self.delete(rel_key_range(rel));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -294,11 +294,11 @@ pub struct Tenant {
|
||||
|
||||
/// During timeline creation, we first insert the TimelineId to the
|
||||
/// creating map, then `timelines`, then remove it from the creating map.
|
||||
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
|
||||
/// **Lock order**: if acquiring both, acquire`timelines` before `timelines_creating`
|
||||
timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
|
||||
|
||||
/// Possibly offloaded and archived timelines
|
||||
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
|
||||
/// **Lock order**: if acquiring both, acquire`timelines` before `timelines_offloaded`
|
||||
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
|
||||
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
@@ -584,40 +584,30 @@ impl OffloadedTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for OffloadedTimeline {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "OffloadedTimeline<{}>", self.timeline_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
|
||||
pub enum MaybeOffloaded {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub enum TimelineOrOffloaded {
|
||||
Timeline(Arc<Timeline>),
|
||||
Offloaded(Arc<OffloadedTimeline>),
|
||||
}
|
||||
|
||||
impl TimelineOrOffloaded {
|
||||
pub fn arc_ref(&self) -> TimelineOrOffloadedArcRef<'_> {
|
||||
pub fn tenant_shard_id(&self) -> TenantShardId {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => {
|
||||
TimelineOrOffloadedArcRef::Timeline(timeline)
|
||||
}
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => {
|
||||
TimelineOrOffloadedArcRef::Offloaded(offloaded)
|
||||
}
|
||||
TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
|
||||
}
|
||||
}
|
||||
pub fn tenant_shard_id(&self) -> TenantShardId {
|
||||
self.arc_ref().tenant_shard_id()
|
||||
}
|
||||
pub fn timeline_id(&self) -> TimelineId {
|
||||
self.arc_ref().timeline_id()
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
|
||||
}
|
||||
}
|
||||
pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
|
||||
match self {
|
||||
@@ -625,7 +615,7 @@ impl TimelineOrOffloaded {
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
|
||||
}
|
||||
}
|
||||
fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
|
||||
pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => timeline.remote_client.clone(),
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => match offloaded.remote_client.clone() {
|
||||
@@ -642,38 +632,6 @@ impl TimelineOrOffloaded {
|
||||
}
|
||||
}
|
||||
|
||||
pub enum TimelineOrOffloadedArcRef<'a> {
|
||||
Timeline(&'a Arc<Timeline>),
|
||||
Offloaded(&'a Arc<OffloadedTimeline>),
|
||||
}
|
||||
|
||||
impl TimelineOrOffloadedArcRef<'_> {
|
||||
pub fn tenant_shard_id(&self) -> TenantShardId {
|
||||
match self {
|
||||
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
|
||||
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
|
||||
}
|
||||
}
|
||||
pub fn timeline_id(&self) -> TimelineId {
|
||||
match self {
|
||||
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
|
||||
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
|
||||
fn from(timeline: &'a Arc<Timeline>) -> Self {
|
||||
Self::Timeline(timeline)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
|
||||
fn from(timeline: &'a Arc<OffloadedTimeline>) -> Self {
|
||||
Self::Offloaded(timeline)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
|
||||
pub enum GetTimelineError {
|
||||
#[error("Timeline is shutting down")]
|
||||
@@ -779,99 +737,6 @@ impl Debug for SetStoppingError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Arguments to [`Tenant::create_timeline`].
|
||||
///
|
||||
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
|
||||
/// is `None`, the result of the timeline create call is not deterministic.
|
||||
///
|
||||
/// See [`CreateTimelineIdempotency`] for an idempotency key.
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum CreateTimelineParams {
|
||||
Bootstrap(CreateTimelineParamsBootstrap),
|
||||
Branch(CreateTimelineParamsBranch),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsBootstrap {
|
||||
pub(crate) new_timeline_id: TimelineId,
|
||||
pub(crate) existing_initdb_timeline_id: Option<TimelineId>,
|
||||
pub(crate) pg_version: u32,
|
||||
}
|
||||
|
||||
/// NB: See comment on [`CreateTimelineIdempotency::Branch`] for why there's no `pg_version` here.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsBranch {
|
||||
pub(crate) new_timeline_id: TimelineId,
|
||||
pub(crate) ancestor_timeline_id: TimelineId,
|
||||
pub(crate) ancestor_start_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`].
|
||||
///
|
||||
/// Each [`Timeline`] object holds [`Self`] as an immutable property in [`Timeline::create_idempotency`].
|
||||
///
|
||||
/// We lower timeline creation requests to [`Self`], and then use [`PartialEq::eq`] to compare [`Timeline::create_idempotency`] with the request.
|
||||
/// If they are equal, we return a reference to the existing timeline, otherwise it's an idempotency conflict.
|
||||
///
|
||||
/// There is special treatment for [`Self::FailWithConflict`] to always return an idempotency conflict.
|
||||
/// It would be nice to have more advanced derive macros to make that special treatment declarative.
|
||||
///
|
||||
/// Notes:
|
||||
/// - Unlike [`CreateTimelineParams`], ancestor LSN is fixed, so, branching will be at a deterministic LSN.
|
||||
/// - We make some trade-offs though, e.g., [`CreateTimelineParamsBootstrap::existing_initdb_timeline_id`]
|
||||
/// is not considered for idempotency. We can improve on this over time if we deem it necessary.
|
||||
///
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum CreateTimelineIdempotency {
|
||||
/// NB: special treatment, see comment in [`Self`].
|
||||
FailWithConflict,
|
||||
Bootstrap {
|
||||
pg_version: u32,
|
||||
},
|
||||
/// NB: branches always have the same `pg_version` as their ancestor.
|
||||
/// While [`pageserver_api::models::TimelineCreateRequestMode::Branch::pg_version`]
|
||||
/// exists as a field, and is set by cplane, it has always been ignored by pageserver when
|
||||
/// determining the child branch pg_version.
|
||||
Branch {
|
||||
ancestor_timeline_id: TimelineId,
|
||||
ancestor_start_lsn: Lsn,
|
||||
},
|
||||
}
|
||||
|
||||
/// What is returned by [`Tenant::start_creating_timeline`].
|
||||
#[must_use]
|
||||
enum StartCreatingTimelineResult<'t> {
|
||||
CreateGuard(TimelineCreateGuard<'t>),
|
||||
Idempotent(Arc<Timeline>),
|
||||
}
|
||||
|
||||
/// What is returned by [`Tenant::create_timeline`].
|
||||
enum CreateTimelineResult {
|
||||
Created(Arc<Timeline>),
|
||||
Idempotent(Arc<Timeline>),
|
||||
}
|
||||
|
||||
impl CreateTimelineResult {
|
||||
fn discriminant(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Created(_) => "Created",
|
||||
Self::Idempotent(_) => "Idempotent",
|
||||
}
|
||||
}
|
||||
fn timeline(&self) -> &Arc<Timeline> {
|
||||
match self {
|
||||
Self::Created(t) | Self::Idempotent(t) => t,
|
||||
}
|
||||
}
|
||||
/// Unit test timelines aren't activated, test has to do it if it needs to.
|
||||
#[cfg(test)]
|
||||
fn into_timeline_for_test(self) -> Arc<Timeline> {
|
||||
match self {
|
||||
Self::Created(t) | Self::Idempotent(t) => t,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum CreateTimelineError {
|
||||
#[error("creation of timeline with the given ID is in progress")]
|
||||
@@ -1011,24 +876,12 @@ impl Tenant {
|
||||
) -> anyhow::Result<()> {
|
||||
let tenant_id = self.tenant_shard_id;
|
||||
|
||||
let idempotency = if metadata.ancestor_timeline().is_none() {
|
||||
CreateTimelineIdempotency::Bootstrap {
|
||||
pg_version: metadata.pg_version(),
|
||||
}
|
||||
} else {
|
||||
CreateTimelineIdempotency::Branch {
|
||||
ancestor_timeline_id: metadata.ancestor_timeline().unwrap(),
|
||||
ancestor_start_lsn: metadata.ancestor_lsn(),
|
||||
}
|
||||
};
|
||||
|
||||
let timeline = self.create_timeline_struct(
|
||||
timeline_id,
|
||||
&metadata,
|
||||
ancestor.clone(),
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
idempotency.clone(),
|
||||
)?;
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
anyhow::ensure!(
|
||||
@@ -1821,8 +1674,6 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Loads the specified (offloaded) timeline from S3 and attaches it as a loaded timeline
|
||||
///
|
||||
/// Counterpart to [`offload_timeline`].
|
||||
async fn unoffload_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
@@ -1831,24 +1682,6 @@ impl Tenant {
|
||||
) -> Result<Arc<Timeline>, TimelineArchivalError> {
|
||||
info!("unoffloading timeline");
|
||||
let cancel = self.cancel.clone();
|
||||
|
||||
// Protect against concurrent attempts to use this TimelineId
|
||||
// We don't care much about idempotency, as it's ensured a layer above.
|
||||
let allow_offloaded = true;
|
||||
let _create_guard = self
|
||||
.create_timeline_create_guard(
|
||||
timeline_id,
|
||||
CreateTimelineIdempotency::FailWithConflict,
|
||||
allow_offloaded,
|
||||
)
|
||||
.map_err(|err| match err {
|
||||
TimelineExclusionError::AlreadyCreating => TimelineArchivalError::AlreadyInProgress,
|
||||
TimelineExclusionError::AlreadyExists { .. } => {
|
||||
TimelineArchivalError::Other(anyhow::anyhow!("Timeline already exists"))
|
||||
}
|
||||
TimelineExclusionError::Other(e) => TimelineArchivalError::Other(e),
|
||||
})?;
|
||||
|
||||
let timeline_preload = self
|
||||
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
|
||||
.await;
|
||||
@@ -2115,17 +1948,16 @@ impl Tenant {
|
||||
self.timelines.lock().unwrap().keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// This is used by tests & import-from-basebackup.
|
||||
/// This is used to create the initial 'main' timeline during bootstrapping,
|
||||
/// or when importing a new base backup. The caller is expected to load an
|
||||
/// initial image of the datadir to the new timeline after this.
|
||||
///
|
||||
/// The returned [`UninitializedTimeline`] contains no data nor metadata and it is in
|
||||
/// a state that will fail [`Tenant::load_remote_timeline`] because `disk_consistent_lsn=Lsn(0)`.
|
||||
/// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
|
||||
/// and the timeline will fail to load at a restart.
|
||||
///
|
||||
/// The caller is responsible for getting the timeline into a state that will be accepted
|
||||
/// by [`Tenant::load_remote_timeline`] / [`Tenant::attach`].
|
||||
/// Then they may call [`UninitializedTimeline::finish_creation`] to add the timeline
|
||||
/// to the [`Tenant::timelines`].
|
||||
///
|
||||
/// Tests should use `Tenant::create_test_timeline` to set up the minimum required metadata keys.
|
||||
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
|
||||
/// minimum amount of keys required to get a writable timeline.
|
||||
/// (Without it, `put` might fail due to `repartition` failing.)
|
||||
pub(crate) async fn create_empty_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
@@ -2139,15 +1971,7 @@ impl Tenant {
|
||||
);
|
||||
|
||||
// Protect against concurrent attempts to use this TimelineId
|
||||
let create_guard = match self
|
||||
.start_creating_timeline(new_timeline_id, CreateTimelineIdempotency::FailWithConflict)
|
||||
.await?
|
||||
{
|
||||
StartCreatingTimelineResult::CreateGuard(guard) => guard,
|
||||
StartCreatingTimelineResult::Idempotent(_) => {
|
||||
unreachable!("FailWithConflict implies we get an error instead")
|
||||
}
|
||||
};
|
||||
let create_guard = self.create_timeline_create_guard(new_timeline_id)?;
|
||||
|
||||
let new_metadata = TimelineMetadata::new(
|
||||
// Initialize disk_consistent LSN to 0, The caller must import some data to
|
||||
@@ -2266,7 +2090,11 @@ impl Tenant {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn create_timeline(
|
||||
self: &Arc<Tenant>,
|
||||
params: CreateTimelineParams,
|
||||
new_timeline_id: TimelineId,
|
||||
ancestor_timeline_id: Option<TimelineId>,
|
||||
mut ancestor_start_lsn: Option<Lsn>,
|
||||
pg_version: u32,
|
||||
load_existing_initdb: Option<TimelineId>,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
@@ -2285,25 +2113,54 @@ impl Tenant {
|
||||
.enter()
|
||||
.map_err(|_| CreateTimelineError::ShuttingDown)?;
|
||||
|
||||
let result: CreateTimelineResult = match params {
|
||||
CreateTimelineParams::Bootstrap(CreateTimelineParamsBootstrap {
|
||||
new_timeline_id,
|
||||
existing_initdb_timeline_id,
|
||||
pg_version,
|
||||
}) => {
|
||||
self.bootstrap_timeline(
|
||||
new_timeline_id,
|
||||
pg_version,
|
||||
existing_initdb_timeline_id,
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
// Get exclusive access to the timeline ID: this ensures that it does not already exist,
|
||||
// and that no other creation attempts will be allowed in while we are working.
|
||||
let create_guard = match self.create_timeline_create_guard(new_timeline_id) {
|
||||
Ok(m) => m,
|
||||
Err(TimelineExclusionError::AlreadyCreating) => {
|
||||
// Creation is in progress, we cannot create it again, and we cannot
|
||||
// check if this request matches the existing one, so caller must try
|
||||
// again later.
|
||||
return Err(CreateTimelineError::AlreadyCreating);
|
||||
}
|
||||
CreateTimelineParams::Branch(CreateTimelineParamsBranch {
|
||||
new_timeline_id,
|
||||
ancestor_timeline_id,
|
||||
mut ancestor_start_lsn,
|
||||
}) => {
|
||||
Err(TimelineExclusionError::Other(e)) => {
|
||||
return Err(CreateTimelineError::Other(e));
|
||||
}
|
||||
Err(TimelineExclusionError::AlreadyExists(existing)) => {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
// Idempotency: creating the same timeline twice is not an error, unless
|
||||
// the second creation has different parameters.
|
||||
if existing.get_ancestor_timeline_id() != ancestor_timeline_id
|
||||
|| existing.pg_version != pg_version
|
||||
|| (ancestor_start_lsn.is_some()
|
||||
&& ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
|
||||
{
|
||||
return Err(CreateTimelineError::Conflict);
|
||||
}
|
||||
|
||||
// Wait for uploads to complete, so that when we return Ok, the timeline
|
||||
// is known to be durable on remote storage. Just like we do at the end of
|
||||
// this function, after we have created the timeline ourselves.
|
||||
//
|
||||
// We only really care that the initial version of `index_part.json` has
|
||||
// been uploaded. That's enough to remember that the timeline
|
||||
// exists. However, there is no function to wait specifically for that so
|
||||
// we just wait for all in-progress uploads to finish.
|
||||
existing
|
||||
.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for timeline uploads to complete")?;
|
||||
|
||||
return Ok(existing);
|
||||
}
|
||||
};
|
||||
|
||||
pausable_failpoint!("timeline-creation-after-uninit");
|
||||
|
||||
let loaded_timeline = match ancestor_timeline_id {
|
||||
Some(ancestor_timeline_id) => {
|
||||
let ancestor_timeline = self
|
||||
.get_timeline(ancestor_timeline_id, false)
|
||||
.context("Cannot branch off the timeline that's not present in pageserver")?;
|
||||
@@ -2350,48 +2207,43 @@ impl Tenant {
|
||||
})?;
|
||||
}
|
||||
|
||||
self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
|
||||
.await?
|
||||
self.branch_timeline(
|
||||
&ancestor_timeline,
|
||||
new_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
create_guard,
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
self.bootstrap_timeline(
|
||||
new_timeline_id,
|
||||
pg_version,
|
||||
load_existing_initdb,
|
||||
create_guard,
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we have dropped our guard on [`Self::timelines_creating`], and
|
||||
// the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
|
||||
// not send a success to the caller until it is. The same applies to idempotent retries.
|
||||
//
|
||||
// TODO: the timeline is already visible in [`Self::timelines`]; a caller could incorrectly
|
||||
// assume that, because they can see the timeline via API, that the creation is done and
|
||||
// that it is durable. Ideally, we would keep the timeline hidden (in [`Self::timelines_creating`])
|
||||
// until it is durable, e.g., by extending the time we hold the creation guard. This also
|
||||
// interacts with UninitializedTimeline and is generally a bit tricky.
|
||||
//
|
||||
// To re-emphasize: the only correct way to create a timeline is to repeat calling the
|
||||
// creation API until it returns success. Only then is durability guaranteed.
|
||||
info!(creation_result=%result.discriminant(), "waiting for timeline to be durable");
|
||||
result
|
||||
.timeline()
|
||||
// not send a success to the caller until it is. The same applies to handling retries,
|
||||
// see the handling of [`TimelineExclusionError::AlreadyExists`] above.
|
||||
let kind = ancestor_timeline_id
|
||||
.map(|_| "branched")
|
||||
.unwrap_or("bootstrapped");
|
||||
loaded_timeline
|
||||
.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for timeline initial uploads to complete")?;
|
||||
.with_context(|| format!("wait for {} timeline initial uploads to complete", kind))?;
|
||||
|
||||
// The creating task is responsible for activating the timeline.
|
||||
// We do this after `wait_completion()` so that we don't spin up tasks that start
|
||||
// doing stuff before the IndexPart is durable in S3, which is done by the previous section.
|
||||
let activated_timeline = match result {
|
||||
CreateTimelineResult::Created(timeline) => {
|
||||
timeline.activate(self.clone(), broker_client, None, ctx);
|
||||
timeline
|
||||
}
|
||||
CreateTimelineResult::Idempotent(timeline) => {
|
||||
info!(
|
||||
"request was deemed idempotent, activation will be done by the creating task"
|
||||
);
|
||||
timeline
|
||||
}
|
||||
};
|
||||
loaded_timeline.activate(self.clone(), broker_client, None, ctx);
|
||||
|
||||
Ok(activated_timeline)
|
||||
Ok(loaded_timeline)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
@@ -3048,58 +2900,33 @@ impl Tenant {
|
||||
&self,
|
||||
child_shards: &Vec<TenantShardId>,
|
||||
) -> anyhow::Result<()> {
|
||||
let (timelines, offloaded) = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let offloaded = self.timelines_offloaded.lock().unwrap();
|
||||
(timelines.clone(), offloaded.clone())
|
||||
};
|
||||
let timelines_iter = timelines
|
||||
.values()
|
||||
.map(TimelineOrOffloadedArcRef::<'_>::from)
|
||||
.chain(
|
||||
offloaded
|
||||
.values()
|
||||
.map(TimelineOrOffloadedArcRef::<'_>::from),
|
||||
);
|
||||
for timeline in timelines_iter {
|
||||
let timelines = self.timelines.lock().unwrap().clone();
|
||||
for timeline in timelines.values() {
|
||||
// We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
|
||||
// to ensure that they do not start a split if currently in the process of doing these.
|
||||
|
||||
let timeline_id = timeline.timeline_id();
|
||||
|
||||
if let TimelineOrOffloadedArcRef::Timeline(timeline) = timeline {
|
||||
// Upload an index from the parent: this is partly to provide freshness for the
|
||||
// child tenants that will copy it, and partly for general ease-of-debugging: there will
|
||||
// always be a parent shard index in the same generation as we wrote the child shard index.
|
||||
tracing::info!(%timeline_id, "Uploading index");
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_file_changes()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
}
|
||||
|
||||
let remote_client = match timeline {
|
||||
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.remote_client.clone(),
|
||||
TimelineOrOffloadedArcRef::Offloaded(offloaded) => {
|
||||
let remote_client = self
|
||||
.build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
|
||||
Arc::new(remote_client)
|
||||
}
|
||||
};
|
||||
// Upload an index from the parent: this is partly to provide freshness for the
|
||||
// child tenants that will copy it, and partly for general ease-of-debugging: there will
|
||||
// always be a parent shard index in the same generation as we wrote the child shard index.
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_file_changes()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
|
||||
// Shut down the timeline's remote client: this means that the indices we write
|
||||
// for child shards will not be invalidated by the parent shard deleting layers.
|
||||
tracing::info!(%timeline_id, "Shutting down remote storage client");
|
||||
remote_client.shutdown().await;
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
|
||||
timeline.remote_client.shutdown().await;
|
||||
|
||||
// Download methods can still be used after shutdown, as they don't flow through the remote client's
|
||||
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
|
||||
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
|
||||
// we use here really is the remotely persistent one).
|
||||
tracing::info!(%timeline_id, "Downloading index_part from parent");
|
||||
let result = remote_client
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
|
||||
let result = timeline.remote_client
|
||||
.download_index_file(&self.cancel)
|
||||
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))
|
||||
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
|
||||
.await?;
|
||||
let index_part = match result {
|
||||
MaybeDeletedIndexPart::Deleted(_) => {
|
||||
@@ -3109,11 +2936,11 @@ impl Tenant {
|
||||
};
|
||||
|
||||
for child_shard in child_shards {
|
||||
tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
|
||||
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
|
||||
upload_index_part(
|
||||
&self.remote_storage,
|
||||
child_shard,
|
||||
&timeline_id,
|
||||
&timeline.timeline_id,
|
||||
self.generation,
|
||||
&index_part,
|
||||
&self.cancel,
|
||||
@@ -3122,6 +2949,8 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: also copy index files of offloaded timelines
|
||||
|
||||
let tenant_manifest = self.tenant_manifest();
|
||||
// TODO: generation support
|
||||
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
|
||||
@@ -3404,7 +3233,6 @@ impl Tenant {
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
resources: TimelineResources,
|
||||
cause: CreateTimelineCause,
|
||||
create_idempotency: CreateTimelineIdempotency,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let state = match cause {
|
||||
CreateTimelineCause::Load => {
|
||||
@@ -3434,7 +3262,6 @@ impl Tenant {
|
||||
pg_version,
|
||||
state,
|
||||
self.attach_wal_lag_cooldown.clone(),
|
||||
create_idempotency,
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
|
||||
@@ -3920,16 +3747,16 @@ impl Tenant {
|
||||
/// timeline background tasks are launched, except the flush loop.
|
||||
#[cfg(test)]
|
||||
async fn branch_timeline_test(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
ancestor_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let create_guard = self.create_timeline_create_guard(dst_id).unwrap();
|
||||
let tl = self
|
||||
.branch_timeline_impl(src_timeline, dst_id, ancestor_lsn, ctx)
|
||||
.await?
|
||||
.into_timeline_for_test();
|
||||
.branch_timeline_impl(src_timeline, dst_id, ancestor_lsn, create_guard, ctx)
|
||||
.await?;
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
}
|
||||
@@ -3938,7 +3765,7 @@ impl Tenant {
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn branch_timeline_test_with_layers(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
ancestor_lsn: Option<Lsn>,
|
||||
@@ -3986,24 +3813,28 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Branch an existing timeline.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
async fn branch_timeline(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
timeline_create_guard: TimelineCreateGuard<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CreateTimelineResult, CreateTimelineError> {
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn branch_timeline_impl(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
timeline_create_guard: TimelineCreateGuard<'_>,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<CreateTimelineResult, CreateTimelineError> {
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
// We will validate our ancestor LSN in this function. Acquire the GC lock so that
|
||||
@@ -4018,23 +3849,6 @@ impl Tenant {
|
||||
lsn
|
||||
});
|
||||
|
||||
// we finally have determined the ancestor_start_lsn, so we can get claim exclusivity now
|
||||
let timeline_create_guard = match self
|
||||
.start_creating_timeline(
|
||||
dst_id,
|
||||
CreateTimelineIdempotency::Branch {
|
||||
ancestor_timeline_id: src_timeline.timeline_id,
|
||||
ancestor_start_lsn: start_lsn,
|
||||
},
|
||||
)
|
||||
.await?
|
||||
{
|
||||
StartCreatingTimelineResult::CreateGuard(guard) => guard,
|
||||
StartCreatingTimelineResult::Idempotent(timeline) => {
|
||||
return Ok(CreateTimelineResult::Idempotent(timeline));
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
|
||||
// horizon on the source timeline
|
||||
//
|
||||
@@ -4120,92 +3934,28 @@ impl Tenant {
|
||||
.schedule_index_upload_for_full_metadata_update(&metadata)
|
||||
.context("branch initial metadata upload")?;
|
||||
|
||||
// Callers are responsible to wait for uploads to complete and for activating the timeline.
|
||||
|
||||
Ok(CreateTimelineResult::Created(new_timeline))
|
||||
Ok(new_timeline)
|
||||
}
|
||||
|
||||
/// For unit tests, make this visible so that other modules can directly create timelines
|
||||
#[cfg(test)]
|
||||
#[tracing::instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
|
||||
pub(crate) async fn bootstrap_timeline_test(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
load_existing_initdb: Option<TimelineId>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
self.bootstrap_timeline(timeline_id, pg_version, load_existing_initdb, ctx)
|
||||
.await
|
||||
.map_err(anyhow::Error::new)
|
||||
.map(|r| r.into_timeline_for_test())
|
||||
}
|
||||
|
||||
/// Get exclusive access to the timeline ID for creation.
|
||||
///
|
||||
/// Timeline-creating code paths must use this function before making changes
|
||||
/// to in-memory or persistent state.
|
||||
///
|
||||
/// The `state` parameter is a description of the timeline creation operation
|
||||
/// we intend to perform.
|
||||
/// If the timeline was already created in the meantime, we check whether this
|
||||
/// request conflicts or is idempotent , based on `state`.
|
||||
async fn start_creating_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
idempotency: CreateTimelineIdempotency,
|
||||
) -> Result<StartCreatingTimelineResult<'_>, CreateTimelineError> {
|
||||
let allow_offloaded = false;
|
||||
match self.create_timeline_create_guard(new_timeline_id, idempotency, allow_offloaded) {
|
||||
Ok(create_guard) => {
|
||||
pausable_failpoint!("timeline-creation-after-uninit");
|
||||
Ok(StartCreatingTimelineResult::CreateGuard(create_guard))
|
||||
}
|
||||
Err(TimelineExclusionError::AlreadyCreating) => {
|
||||
// Creation is in progress, we cannot create it again, and we cannot
|
||||
// check if this request matches the existing one, so caller must try
|
||||
// again later.
|
||||
Err(CreateTimelineError::AlreadyCreating)
|
||||
}
|
||||
Err(TimelineExclusionError::Other(e)) => Err(CreateTimelineError::Other(e)),
|
||||
Err(TimelineExclusionError::AlreadyExists {
|
||||
existing: TimelineOrOffloaded::Offloaded(_existing),
|
||||
..
|
||||
}) => {
|
||||
info!("timeline already exists but is offloaded");
|
||||
Err(CreateTimelineError::Conflict)
|
||||
}
|
||||
Err(TimelineExclusionError::AlreadyExists {
|
||||
existing: TimelineOrOffloaded::Timeline(existing),
|
||||
arg,
|
||||
}) => {
|
||||
{
|
||||
let existing = &existing.create_idempotency;
|
||||
let _span = info_span!("idempotency_check", ?existing, ?arg).entered();
|
||||
debug!("timeline already exists");
|
||||
|
||||
match (existing, &arg) {
|
||||
// FailWithConflict => no idempotency check
|
||||
(CreateTimelineIdempotency::FailWithConflict, _)
|
||||
| (_, CreateTimelineIdempotency::FailWithConflict) => {
|
||||
warn!("timeline already exists, failing request");
|
||||
return Err(CreateTimelineError::Conflict);
|
||||
}
|
||||
// Idempotent <=> CreateTimelineIdempotency is identical
|
||||
(x, y) if x == y => {
|
||||
info!("timeline already exists and idempotency matches, succeeding request");
|
||||
// fallthrough
|
||||
}
|
||||
(_, _) => {
|
||||
warn!("idempotency conflict, failing request");
|
||||
return Err(CreateTimelineError::Conflict);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(StartCreatingTimelineResult::Idempotent(existing))
|
||||
}
|
||||
}
|
||||
let create_guard = self.create_timeline_create_guard(timeline_id).unwrap();
|
||||
self.bootstrap_timeline(
|
||||
timeline_id,
|
||||
pg_version,
|
||||
load_existing_initdb,
|
||||
create_guard,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn upload_initdb(
|
||||
@@ -4259,26 +4009,16 @@ impl Tenant {
|
||||
|
||||
/// - run initdb to init temporary instance and get bootstrap data
|
||||
/// - after initialization completes, tar up the temp dir and upload it to S3.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
async fn bootstrap_timeline(
|
||||
self: &Arc<Self>,
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
load_existing_initdb: Option<TimelineId>,
|
||||
timeline_create_guard: TimelineCreateGuard<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CreateTimelineResult, CreateTimelineError> {
|
||||
let timeline_create_guard = match self
|
||||
.start_creating_timeline(
|
||||
timeline_id,
|
||||
CreateTimelineIdempotency::Bootstrap { pg_version },
|
||||
)
|
||||
.await?
|
||||
{
|
||||
StartCreatingTimelineResult::CreateGuard(guard) => guard,
|
||||
StartCreatingTimelineResult::Idempotent(timeline) => {
|
||||
return Ok(CreateTimelineResult::Idempotent(timeline))
|
||||
}
|
||||
};
|
||||
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
|
||||
// temporary directory for basebackup files for the given timeline.
|
||||
|
||||
@@ -4342,9 +4082,7 @@ impl Tenant {
|
||||
.context("extract initdb tar")?;
|
||||
} else {
|
||||
// Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
|
||||
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel)
|
||||
.await
|
||||
.context("run initdb")?;
|
||||
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
|
||||
|
||||
// Upload the created data dir to S3
|
||||
if self.tenant_shard_id().is_shard_zero() {
|
||||
@@ -4398,9 +4136,7 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
Err(CreateTimelineError::Other(anyhow::anyhow!(
|
||||
"failpoint before-checkpoint-new-timeline"
|
||||
)))
|
||||
anyhow::bail!("failpoint before-checkpoint-new-timeline");
|
||||
});
|
||||
|
||||
unfinished_timeline
|
||||
@@ -4415,9 +4151,7 @@ impl Tenant {
|
||||
// All done!
|
||||
let timeline = raw_timeline.finish_creation()?;
|
||||
|
||||
// Callers are responsible to wait for uploads to complete and for activating the timeline.
|
||||
|
||||
Ok(CreateTimelineResult::Created(timeline))
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
fn build_timeline_remote_client(&self, timeline_id: TimelineId) -> RemoteTimelineClient {
|
||||
@@ -4467,7 +4201,6 @@ impl Tenant {
|
||||
ancestor,
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
create_guard.idempotency.clone(),
|
||||
)
|
||||
.context("Failed to create timeline data structure")?;
|
||||
|
||||
@@ -4505,26 +4238,15 @@ impl Tenant {
|
||||
|
||||
/// Get a guard that provides exclusive access to the timeline directory, preventing
|
||||
/// concurrent attempts to create the same timeline.
|
||||
///
|
||||
/// The `allow_offloaded` parameter controls whether to tolerate the existence of
|
||||
/// offloaded timelines or not.
|
||||
fn create_timeline_create_guard(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
idempotency: CreateTimelineIdempotency,
|
||||
allow_offloaded: bool,
|
||||
) -> Result<TimelineCreateGuard, TimelineExclusionError> {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
|
||||
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
|
||||
let create_guard = TimelineCreateGuard::new(
|
||||
self,
|
||||
timeline_id,
|
||||
timeline_path.clone(),
|
||||
idempotency,
|
||||
allow_offloaded,
|
||||
)?;
|
||||
let create_guard = TimelineCreateGuard::new(self, timeline_id, timeline_path.clone())?;
|
||||
|
||||
// At this stage, we have got exclusive access to in-memory state for this timeline ID
|
||||
// for creation.
|
||||
@@ -5160,10 +4882,7 @@ mod tests {
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
Err(e) => assert_eq!(
|
||||
e.to_string(),
|
||||
"timeline already exists with different parameters".to_string()
|
||||
),
|
||||
Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1278,14 +1278,10 @@ impl RemoteTimelineClient {
|
||||
let fut = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = match &mut *guard {
|
||||
UploadQueue::Stopped(_) => {
|
||||
scopeguard::ScopeGuard::into_inner(sg);
|
||||
return;
|
||||
}
|
||||
UploadQueue::Stopped(_) => return,
|
||||
UploadQueue::Uninitialized => {
|
||||
// transition into Stopped state
|
||||
self.stop_impl(&mut guard);
|
||||
scopeguard::ScopeGuard::into_inner(sg);
|
||||
return;
|
||||
}
|
||||
UploadQueue::Initialized(ref mut init) => init,
|
||||
|
||||
@@ -187,8 +187,6 @@ pub(super) async fn gather_inputs(
|
||||
// but it is unlikely to cause any issues. In the worst case,
|
||||
// the calculation will error out.
|
||||
timelines.retain(|t| t.is_active());
|
||||
// Also filter out archived timelines.
|
||||
timelines.retain(|t| t.is_archived() != Some(true));
|
||||
|
||||
// Build a map of branch points.
|
||||
let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
|
||||
|
||||
@@ -1084,7 +1084,7 @@ impl DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn index_entries<'a>(
|
||||
pub(super) async fn load_keys<'a>(
|
||||
&'a self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<DeltaEntry<'a>>> {
|
||||
@@ -1346,7 +1346,7 @@ impl DeltaLayerInner {
|
||||
|
||||
tree_reader.dump().await?;
|
||||
|
||||
let keys = self.index_entries(ctx).await?;
|
||||
let keys = self.load_keys(ctx).await?;
|
||||
|
||||
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
|
||||
let buf = val.load_raw(ctx).await?;
|
||||
@@ -1453,16 +1453,6 @@ impl DeltaLayerInner {
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// NB: not super efficient, but not terrible either. Should prob be an iterator.
|
||||
//
|
||||
// We're reusing the index traversal logical in plan_reads; would be nice to
|
||||
// factor that out.
|
||||
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
|
||||
self.index_entries(ctx)
|
||||
.await
|
||||
.map(|entries| entries.into_iter().map(|entry| entry.key).collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// A set of data associated with a delta layer key and its value
|
||||
|
||||
@@ -673,21 +673,6 @@ impl ImageLayerInner {
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// NB: not super efficient, but not terrible either. Should prob be an iterator.
|
||||
//
|
||||
// We're reusing the index traversal logical in plan_reads; would be nice to
|
||||
// factor that out.
|
||||
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
|
||||
let plan = self
|
||||
.plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
|
||||
.await?;
|
||||
Ok(plan
|
||||
.into_iter()
|
||||
.flat_map(|read| read.blobs_at)
|
||||
.map(|(_, blob_meta)| blob_meta.key)
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::timeline::{CompactionError, GetVectoredError};
|
||||
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
|
||||
|
||||
use super::delta_layer::{self};
|
||||
use super::delta_layer::{self, DeltaEntry};
|
||||
use super::image_layer::{self};
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
@@ -1841,22 +1841,23 @@ impl ResidentLayer {
|
||||
pub(crate) async fn load_keys<'a>(
|
||||
&'a self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<pageserver_api::key::Key>> {
|
||||
) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
|
||||
use LayerKind::*;
|
||||
|
||||
let owner = &self.owner.0;
|
||||
let inner = self.downloaded.get(owner, ctx).await?;
|
||||
match self.downloaded.get(owner, ctx).await? {
|
||||
Delta(ref d) => {
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
self.owner.record_access(ctx);
|
||||
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
self.owner.record_access(ctx);
|
||||
|
||||
let res = match inner {
|
||||
Delta(ref d) => delta_layer::DeltaLayerInner::load_keys(d, ctx).await,
|
||||
Image(ref i) => image_layer::ImageLayerInner::load_keys(i, ctx).await,
|
||||
};
|
||||
res.with_context(|| format!("Layer index is corrupted for {self}"))
|
||||
delta_layer::DeltaLayerInner::load_keys(d, ctx)
|
||||
.await
|
||||
.with_context(|| format!("Layer index is corrupted for {self}"))
|
||||
}
|
||||
Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Read all they keys in this layer which match the ShardIdentity, and write them all to
|
||||
|
||||
@@ -57,34 +57,6 @@ impl std::fmt::Display for PersistentLayerKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ImageLayerName> for PersistentLayerKey {
|
||||
fn from(image_layer_name: ImageLayerName) -> Self {
|
||||
Self {
|
||||
key_range: image_layer_name.key_range,
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer_name.lsn),
|
||||
is_delta: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DeltaLayerName> for PersistentLayerKey {
|
||||
fn from(delta_layer_name: DeltaLayerName) -> Self {
|
||||
Self {
|
||||
key_range: delta_layer_name.key_range,
|
||||
lsn_range: delta_layer_name.lsn_range,
|
||||
is_delta: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LayerName> for PersistentLayerKey {
|
||||
fn from(layer_name: LayerName) -> Self {
|
||||
match layer_name {
|
||||
LayerName::Image(i) => i.into(),
|
||||
LayerName::Delta(d) => d.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl PersistentLayerDesc {
|
||||
pub fn key(&self) -> PersistentLayerKey {
|
||||
PersistentLayerKey {
|
||||
|
||||
@@ -424,9 +424,6 @@ pub struct Timeline {
|
||||
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
|
||||
|
||||
pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
|
||||
|
||||
/// Cf. [`crate::tenant::CreateTimelineIdempotency`].
|
||||
pub(crate) create_idempotency: crate::tenant::CreateTimelineIdempotency,
|
||||
}
|
||||
|
||||
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
|
||||
@@ -2139,7 +2136,6 @@ impl Timeline {
|
||||
pg_version: u32,
|
||||
state: TimelineState,
|
||||
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
|
||||
create_idempotency: crate::tenant::CreateTimelineIdempotency,
|
||||
cancel: CancellationToken,
|
||||
) -> Arc<Self> {
|
||||
let disk_consistent_lsn = metadata.disk_consistent_lsn();
|
||||
@@ -2278,8 +2274,6 @@ impl Timeline {
|
||||
handles: Default::default(),
|
||||
|
||||
attach_wal_lag_cooldown,
|
||||
|
||||
create_idempotency,
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
|
||||
@@ -834,12 +834,7 @@ impl Timeline {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
let keys = delta
|
||||
.index_entries(ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
all_keys.extend(keys);
|
||||
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
|
||||
}
|
||||
// The current stdlib sorting implementation is designed in a way where it is
|
||||
// particularly fast where the slice is made up of sorted sub-ranges.
|
||||
@@ -2443,7 +2438,7 @@ impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
|
||||
type DeltaEntry<'a> = DeltaEntry<'a>;
|
||||
|
||||
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
|
||||
self.0.get_as_delta(ctx).await?.index_entries(ctx).await
|
||||
self.0.load_keys(ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -313,7 +313,6 @@ impl DeleteTimelineFlow {
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
CreateTimelineCause::Delete,
|
||||
crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here
|
||||
)
|
||||
.context("create_timeline_struct")?;
|
||||
|
||||
|
||||
@@ -45,16 +45,13 @@ impl LayerManager {
|
||||
pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
|
||||
// The assumption for the `expect()` is that all code maintains the following invariant:
|
||||
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
|
||||
self.try_get_from_key(key)
|
||||
self.layers()
|
||||
.get(key)
|
||||
.with_context(|| format!("get layer from key: {key}"))
|
||||
.expect("not found")
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
|
||||
self.layers().get(key)
|
||||
}
|
||||
|
||||
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
|
||||
self.get_from_key(&desc.key())
|
||||
}
|
||||
|
||||
@@ -5,11 +5,7 @@ use camino::Utf8PathBuf;
|
||||
use tracing::{error, info, info_span};
|
||||
use utils::{fs_ext, id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
import_datadir,
|
||||
tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
|
||||
};
|
||||
use crate::{context::RequestContext, import_datadir, tenant::Tenant};
|
||||
|
||||
use super::Timeline;
|
||||
|
||||
@@ -169,17 +165,13 @@ pub(crate) struct TimelineCreateGuard<'t> {
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
pub(crate) timeline_path: Utf8PathBuf,
|
||||
pub(crate) idempotency: CreateTimelineIdempotency,
|
||||
}
|
||||
|
||||
/// Errors when acquiring exclusive access to a timeline ID for creation
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum TimelineExclusionError {
|
||||
#[error("Already exists")]
|
||||
AlreadyExists {
|
||||
existing: TimelineOrOffloaded,
|
||||
arg: CreateTimelineIdempotency,
|
||||
},
|
||||
AlreadyExists(Arc<Timeline>),
|
||||
#[error("Already creating")]
|
||||
AlreadyCreating,
|
||||
|
||||
@@ -193,42 +185,27 @@ impl<'t> TimelineCreateGuard<'t> {
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
timeline_path: Utf8PathBuf,
|
||||
idempotency: CreateTimelineIdempotency,
|
||||
allow_offloaded: bool,
|
||||
) -> Result<Self, TimelineExclusionError> {
|
||||
// Lock order: this is the only place we take both locks. During drop() we only
|
||||
// lock creating_timelines
|
||||
let timelines = owning_tenant.timelines.lock().unwrap();
|
||||
let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
|
||||
let mut creating_timelines: std::sync::MutexGuard<
|
||||
'_,
|
||||
std::collections::HashSet<TimelineId>,
|
||||
> = owning_tenant.timelines_creating.lock().unwrap();
|
||||
|
||||
if let Some(existing) = timelines.get(&timeline_id) {
|
||||
return Err(TimelineExclusionError::AlreadyExists {
|
||||
existing: TimelineOrOffloaded::Timeline(existing.clone()),
|
||||
arg: idempotency,
|
||||
});
|
||||
Err(TimelineExclusionError::AlreadyExists(existing.clone()))
|
||||
} else if creating_timelines.contains(&timeline_id) {
|
||||
Err(TimelineExclusionError::AlreadyCreating)
|
||||
} else {
|
||||
creating_timelines.insert(timeline_id);
|
||||
Ok(Self {
|
||||
owning_tenant,
|
||||
timeline_id,
|
||||
timeline_path,
|
||||
})
|
||||
}
|
||||
if !allow_offloaded {
|
||||
if let Some(existing) = timelines_offloaded.get(&timeline_id) {
|
||||
return Err(TimelineExclusionError::AlreadyExists {
|
||||
existing: TimelineOrOffloaded::Offloaded(existing.clone()),
|
||||
arg: idempotency,
|
||||
});
|
||||
}
|
||||
}
|
||||
if creating_timelines.contains(&timeline_id) {
|
||||
return Err(TimelineExclusionError::AlreadyCreating);
|
||||
}
|
||||
creating_timelines.insert(timeline_id);
|
||||
Ok(Self {
|
||||
owning_tenant,
|
||||
timeline_id,
|
||||
timeline_path,
|
||||
idempotency,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,24 +16,18 @@ use tokio_epoll_uring::{System, SystemHandle};
|
||||
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
|
||||
use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};
|
||||
use crate::metrics::tokio_epoll_uring as metrics;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ThreadLocalState(Arc<ThreadLocalStateInner>);
|
||||
|
||||
struct ThreadLocalStateInner {
|
||||
cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
|
||||
cell: tokio::sync::OnceCell<SystemHandle>,
|
||||
launch_attempts: AtomicU32,
|
||||
/// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
|
||||
thread_local_state_id: u64,
|
||||
}
|
||||
|
||||
impl Drop for ThreadLocalStateInner {
|
||||
fn drop(&mut self) {
|
||||
THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl ThreadLocalState {
|
||||
pub fn new() -> Self {
|
||||
Self(Arc::new(ThreadLocalStateInner {
|
||||
@@ -77,8 +71,7 @@ pub async fn thread_local_system() -> Handle {
|
||||
&fake_cancel,
|
||||
)
|
||||
.await;
|
||||
let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
|
||||
let res = System::launch_with_metrics(per_system_metrics)
|
||||
let res = System::launch()
|
||||
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
|
||||
.await;
|
||||
match res {
|
||||
@@ -93,7 +86,6 @@ pub async fn thread_local_system() -> Handle {
|
||||
emit_launch_failure_process_stats();
|
||||
});
|
||||
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
|
||||
metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
|
||||
Err(())
|
||||
}
|
||||
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
|
||||
@@ -123,7 +115,7 @@ fn emit_launch_failure_process_stats() {
|
||||
// number of threads
|
||||
// rss / system memory usage generally
|
||||
|
||||
let tokio_epoll_uring::metrics::GlobalMetrics {
|
||||
let tokio_epoll_uring::metrics::Metrics {
|
||||
systems_created,
|
||||
systems_destroyed,
|
||||
} = tokio_epoll_uring::metrics::global();
|
||||
@@ -190,7 +182,7 @@ fn emit_launch_failure_process_stats() {
|
||||
pub struct Handle(ThreadLocalState);
|
||||
|
||||
impl std::ops::Deref for Handle {
|
||||
type Target = SystemHandle<metrics::ThreadLocalMetrics>;
|
||||
type Target = SystemHandle;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
//! redo Postgres process, but some records it can handle directly with
|
||||
//! bespoken Rust code.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
@@ -1621,12 +1620,6 @@ impl WalIngest {
|
||||
},
|
||||
)?;
|
||||
|
||||
// Group relations to drop by dbNode. This map will contain all relations that _might_
|
||||
// exist, we will reduce it to which ones really exist later. This map can be huge if
|
||||
// the transaction touches a huge number of relations (there is no bound on this in
|
||||
// postgres).
|
||||
let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
|
||||
|
||||
for xnode in &parsed.xnodes {
|
||||
for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
|
||||
let rel = RelTag {
|
||||
@@ -1635,16 +1628,15 @@ impl WalIngest {
|
||||
dbnode: xnode.dbnode,
|
||||
relnode: xnode.relnode,
|
||||
};
|
||||
drop_relations
|
||||
.entry((xnode.spcnode, xnode.dbnode))
|
||||
.or_default()
|
||||
.push(rel);
|
||||
if modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
self.put_rel_drop(modification, rel, ctx).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
|
||||
modification.put_rel_drops(drop_relations, ctx).await?;
|
||||
|
||||
if origin_id != 0 {
|
||||
modification
|
||||
.set_replorigin(origin_id, parsed.origin_lsn)
|
||||
@@ -2354,6 +2346,16 @@ impl WalIngest {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_rel_drop(
|
||||
&mut self,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
modification.put_rel_drop(rel, ctx).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_rel_extend(
|
||||
&mut self,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
@@ -2417,59 +2419,6 @@ impl WalIngest {
|
||||
WAL_INGEST
|
||||
.gap_blocks_zeroed_on_rel_extend
|
||||
.inc_by(gap_blocks_filled);
|
||||
|
||||
// Log something when relation extends cause use to fill gaps
|
||||
// with zero pages. Logging is rate limited per pg version to
|
||||
// avoid skewing.
|
||||
if gap_blocks_filled > 0 {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use utils::rate_limit::RateLimit;
|
||||
|
||||
struct RateLimitPerPgVersion {
|
||||
rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
|
||||
}
|
||||
|
||||
impl RateLimitPerPgVersion {
|
||||
const fn new() -> Self {
|
||||
Self {
|
||||
rate_limiters: [const {
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
|
||||
}; 4],
|
||||
}
|
||||
}
|
||||
|
||||
const fn rate_limiter(
|
||||
&self,
|
||||
pg_version: u32,
|
||||
) -> Option<&Lazy<Mutex<RateLimit>>> {
|
||||
const MIN_PG_VERSION: u32 = 14;
|
||||
const MAX_PG_VERSION: u32 = 17;
|
||||
|
||||
if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
|
||||
}
|
||||
}
|
||||
|
||||
static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
|
||||
if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
|
||||
if let Ok(mut locked) = rate_limiter.try_lock() {
|
||||
locked.call(|| {
|
||||
info!(
|
||||
lsn=%modification.get_lsn(),
|
||||
pg_version=%modification.tline.pg_version,
|
||||
rel=%rel,
|
||||
"Filled {} gap blocks on rel extend to {} from {}",
|
||||
gap_blocks_filled,
|
||||
new_nblocks,
|
||||
old_nblocks);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -2867,9 +2816,7 @@ mod tests {
|
||||
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
let mut rel_drops = HashMap::new();
|
||||
rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
|
||||
m.put_rel_drops(rel_drops, &ctx).await?;
|
||||
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
|
||||
m.commit(&ctx).await?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
|
||||
@@ -8,7 +8,6 @@ OBJS = \
|
||||
file_cache.o \
|
||||
hll.o \
|
||||
libpagestore.o \
|
||||
logical_replication_monitor.o \
|
||||
neon.o \
|
||||
neon_pgversioncompat.o \
|
||||
neon_perf_counters.o \
|
||||
@@ -33,6 +32,8 @@ DATA = \
|
||||
neon--1.2--1.3.sql \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.6.sql \
|
||||
neon--1.6--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
neon--1.3--1.2.sql \
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
#include "access/parallel.h"
|
||||
#include "access/xlog.h"
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pagestore_client.h"
|
||||
@@ -30,22 +31,28 @@
|
||||
#include "port/pg_iovec.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "replication/message.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
|
||||
#include "hll.h"
|
||||
#include "bitmap.h"
|
||||
#include "neon.h"
|
||||
#include "neon_perf_counters.h"
|
||||
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
|
||||
/*
|
||||
* Local file cache is used to temporary store relations pages in local file system.
|
||||
@@ -100,7 +107,9 @@ typedef struct FileCacheEntry
|
||||
BufferTag key;
|
||||
uint32 hash;
|
||||
uint32 offset;
|
||||
uint32 access_count;
|
||||
uint32 access_count : 30;
|
||||
uint32 prewarm_requested : 1; /* entry should be filled by prewarm */
|
||||
uint32 prewarm_started : 1; /* chunk is written by lfc_prewarm */
|
||||
uint32 bitmap[CHUNK_BITMAP_SIZE];
|
||||
dlist_node list_node; /* LRU/holes list node */
|
||||
} FileCacheEntry;
|
||||
@@ -118,26 +127,57 @@ typedef struct FileCacheControl
|
||||
uint64 writes; /* number of writes issued */
|
||||
uint64 time_read; /* time spent reading (us) */
|
||||
uint64 time_write; /* time spent writing (us) */
|
||||
uint32 prewarm_total_chunks;
|
||||
uint32 prewarm_curr_chunk;
|
||||
uint32 prewarmed_pages;
|
||||
uint32 skipped_pages;
|
||||
dlist_head lru; /* double linked list for LRU replacement
|
||||
* algorithm */
|
||||
dlist_head holes; /* double linked list of punched holes */
|
||||
HyperLogLogState wss_estimation; /* estimation of working set size */
|
||||
} FileCacheControl;
|
||||
|
||||
typedef struct FileCacheStateEntry
|
||||
{
|
||||
BufferTag key;
|
||||
uint32 bitmap[CHUNK_BITMAP_SIZE];
|
||||
} FileCacheStateEntry;
|
||||
|
||||
static HTAB *lfc_hash;
|
||||
static int lfc_desc = 0;
|
||||
static LWLockId lfc_lock;
|
||||
static int lfc_max_size;
|
||||
static int lfc_size_limit;
|
||||
static int lfc_prewarm_limit;
|
||||
static int lfc_prewarm_batch;
|
||||
static char *lfc_path;
|
||||
static FileCacheControl *lfc_ctl;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
static CustomCheckpointHookType PrevCheckpointHook;
|
||||
|
||||
|
||||
#define LFC_ENABLED() (lfc_ctl->limit != 0)
|
||||
|
||||
PGDLLEXPORT void LfcPrewarmMain(Datum main_arg);
|
||||
|
||||
static void
|
||||
LfcCheckpointHook(int flags)
|
||||
{
|
||||
if (flags & CHECKPOINT_IS_SHUTDOWN)
|
||||
{
|
||||
lfc_save_state();
|
||||
}
|
||||
|
||||
if (PrevCheckpointHook)
|
||||
{
|
||||
PrevCheckpointHook(flags);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Local file cache is optional and Neon can work without it.
|
||||
* In case of any any errors with this cache, we should disable it but to not throw error.
|
||||
@@ -149,7 +189,7 @@ lfc_disable(char const *op)
|
||||
{
|
||||
int fd;
|
||||
|
||||
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||
|
||||
/* Invalidate hash */
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
@@ -184,7 +224,7 @@ lfc_disable(char const *op)
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc < 0)
|
||||
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
|
||||
elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +236,7 @@ lfc_disable(char const *op)
|
||||
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
|
||||
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
|
||||
else
|
||||
close(fd);
|
||||
|
||||
@@ -236,6 +276,17 @@ lfc_ensure_opened(void)
|
||||
return enabled;
|
||||
}
|
||||
|
||||
PGDLLEXPORT void
|
||||
LfcPrewarmMain(Datum main_arg)
|
||||
{
|
||||
pqsignal(SIGTERM, die);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
lfc_load_pages();
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
lfc_shmem_startup(void)
|
||||
{
|
||||
@@ -267,14 +318,7 @@ lfc_shmem_startup(void)
|
||||
n_chunks + 1, n_chunks + 1,
|
||||
&info,
|
||||
HASH_ELEM | HASH_BLOBS);
|
||||
lfc_ctl->generation = 0;
|
||||
lfc_ctl->size = 0;
|
||||
lfc_ctl->used = 0;
|
||||
lfc_ctl->hits = 0;
|
||||
lfc_ctl->misses = 0;
|
||||
lfc_ctl->writes = 0;
|
||||
lfc_ctl->time_read = 0;
|
||||
lfc_ctl->time_write = 0;
|
||||
memset(lfc_ctl, 0, sizeof *lfc_ctl);
|
||||
dlist_init(&lfc_ctl->lru);
|
||||
dlist_init(&lfc_ctl->holes);
|
||||
|
||||
@@ -285,7 +329,7 @@ lfc_shmem_startup(void)
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
|
||||
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
|
||||
lfc_ctl->limit = 0;
|
||||
}
|
||||
else
|
||||
@@ -295,6 +339,9 @@ lfc_shmem_startup(void)
|
||||
}
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
PrevCheckpointHook = CustomCheckpointHook;
|
||||
CustomCheckpointHook = LfcCheckpointHook;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -327,7 +374,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source)
|
||||
{
|
||||
if (*newval > lfc_max_size)
|
||||
{
|
||||
elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
|
||||
elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@@ -436,6 +483,32 @@ lfc_init(void)
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
|
||||
"Maximal number of prewarmed pages",
|
||||
NULL,
|
||||
&lfc_prewarm_limit,
|
||||
0, /* disabled by default */
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
|
||||
"Number of pages retrivied by prewarm from page server",
|
||||
NULL,
|
||||
&lfc_prewarm_batch,
|
||||
64,
|
||||
1,
|
||||
INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
if (lfc_max_size == 0)
|
||||
return;
|
||||
|
||||
@@ -447,8 +520,326 @@ lfc_init(void)
|
||||
#else
|
||||
lfc_shmem_request();
|
||||
#endif
|
||||
|
||||
if (lfc_prewarm_limit != 0)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
|
||||
bgw.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LfcPrewarmMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
}
|
||||
|
||||
static FileCacheStateEntry*
|
||||
lfc_get_state(size_t* n_entries)
|
||||
{
|
||||
size_t max_entries = *n_entries;
|
||||
size_t i = 0;
|
||||
FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
dlist_iter iter;
|
||||
dlist_reverse_foreach(iter, &lfc_ctl->lru)
|
||||
{
|
||||
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
|
||||
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
|
||||
memcpy(fs[i].bitmap, entry->bitmap, sizeof entry->bitmap);
|
||||
if (++i == max_entries)
|
||||
break;
|
||||
}
|
||||
elog(LOG, "LFC: save state of %ld chunks", (long)i);
|
||||
}
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
*n_entries = i;
|
||||
return fs;
|
||||
}
|
||||
|
||||
/*
|
||||
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
|
||||
* This function saves first mostrecently used pages.
|
||||
* It is expected to be called at shutdown checkpoint by checkpointer.
|
||||
*/
|
||||
void
|
||||
lfc_save_state(void)
|
||||
{
|
||||
size_t n_entries = lfc_prewarm_limit;
|
||||
FileCacheStateEntry* fs;
|
||||
|
||||
if (n_entries == 0)
|
||||
return;
|
||||
|
||||
fs = lfc_get_state(&n_entries);
|
||||
if (n_entries != 0)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false));
|
||||
#else
|
||||
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false, true);
|
||||
#endif
|
||||
}
|
||||
pfree(fs);
|
||||
}
|
||||
|
||||
/*
|
||||
* Prewarm LFC cache to the specified state.
|
||||
*
|
||||
* Prewarming can interfere with accesses to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
|
||||
* is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting for read completion.
|
||||
*
|
||||
* But it is not true for prewarming: backend can fetch page itself, modify and then write it to LFC. At the
|
||||
* same time `lfc_prewarm` tries to write deteriorated image of this page in LFC. To increase concurrency, access to LFC files (both read and write)
|
||||
* is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
|
||||
* Certainly we can not rely on disk content in this case.
|
||||
*
|
||||
* To solve this problem we use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set before prewarm is actually started.
|
||||
* `lfc_prewarm` writes to LFC file only if this flag is set. This flag is cleared if any other backend performs write to this LFC chunk.
|
||||
* In this case data loaded by `lfc_prewarm` is considered to be deteriorated and should be just ignored.
|
||||
*
|
||||
* But as far as write to LFC is performed without holding lock, there is no guarantee that no such write is in progress.
|
||||
* This is why second flag is used: `prewarm_started`. It is set by `lfc_prewarm` when is starts writing page and cleared when write is completed.
|
||||
* Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) if this flag is set.
|
||||
* So neither `lfc_prewarm`, neither backend are saving page in LFC in this case - it is just skipped.
|
||||
*/
|
||||
|
||||
static void
|
||||
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
|
||||
{
|
||||
ssize_t rc;
|
||||
size_t snd_idx = 0, rcv_idx = 0;
|
||||
size_t n_sent = 0, n_received = 0;
|
||||
FileCacheEntry *entry;
|
||||
uint64 generation;
|
||||
uint32 entry_offset;
|
||||
uint32 hash;
|
||||
size_t i;
|
||||
bool found;
|
||||
int shard_no;
|
||||
|
||||
if (!lfc_ensure_opened())
|
||||
return;
|
||||
|
||||
if (n_entries == 0 || fs == NULL)
|
||||
{
|
||||
elog(LOG, "LFC: prewarm is disabled");
|
||||
return;
|
||||
}
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
/* Do not prewarm more entries than LFC limit */
|
||||
if (lfc_ctl->limit <= lfc_ctl->size)
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return;
|
||||
}
|
||||
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
|
||||
{
|
||||
n_entries = lfc_ctl->limit - lfc_ctl->size;
|
||||
}
|
||||
|
||||
/* Initialize fields used to track prewarming progress */
|
||||
lfc_ctl->prewarm_total_chunks = n_entries;
|
||||
lfc_ctl->prewarm_curr_chunk = 0;
|
||||
|
||||
/*
|
||||
* Load LFC state and add entries in hash table.
|
||||
* It is needed to track modification of prewarmed pages.
|
||||
* All such entries have `prewarm_requested` flag set. When entry is updated (some backed reads or writes
|
||||
* some pages from this chunk), then `prewarm_requested` flag is cleared, prohibiting prewarm of this chunk.
|
||||
* It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
|
||||
*/
|
||||
for (i = 0; i < n_entries; i++)
|
||||
{
|
||||
hash = get_hash_value(lfc_hash, &fs[i].key);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found);
|
||||
/* Do not prewarm chunks which are already present in LFC */
|
||||
if (!found)
|
||||
{
|
||||
entry->offset = lfc_ctl->size++;
|
||||
entry->hash = hash;
|
||||
entry->access_count = 0;
|
||||
entry->prewarm_requested = true;
|
||||
entry->prewarm_started = false;
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
/* Most recently visted pages are stored first */
|
||||
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
|
||||
lfc_ctl->used += 1;
|
||||
}
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
|
||||
|
||||
while (true)
|
||||
{
|
||||
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
|
||||
size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
|
||||
if (chunk_no < n_entries)
|
||||
{
|
||||
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
|
||||
{
|
||||
/*
|
||||
* In case of prewarming replica we should be careful not to load too new version
|
||||
* of the page - with LSN larger than current replay LSN.
|
||||
* At primary we are always loading latest version.
|
||||
*/
|
||||
XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX;
|
||||
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
/* lsn and not_modified_since are filled in below */
|
||||
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
|
||||
.forknum = fs[chunk_no].key.forkNum,
|
||||
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
|
||||
.req.lsn = req_lsn,
|
||||
.req.not_modified_since = 0
|
||||
};
|
||||
shard_no = get_shard_number(&fs[chunk_no].key);
|
||||
while (!page_server->send(shard_no, (NeonRequest *) &request)
|
||||
|| !page_server->flush(shard_no))
|
||||
{
|
||||
/* do nothing */
|
||||
}
|
||||
n_sent += 1;
|
||||
}
|
||||
snd_idx += 1;
|
||||
}
|
||||
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
|
||||
{
|
||||
NeonResponse * resp;
|
||||
do
|
||||
{
|
||||
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
|
||||
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
|
||||
rcv_idx += 1;
|
||||
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
|
||||
|
||||
shard_no = get_shard_number(&fs[chunk_no].key);
|
||||
resp = page_server->receive(shard_no);
|
||||
lfc_ctl->prewarm_curr_chunk = chunk_no;
|
||||
|
||||
if (resp->tag != T_NeonGetPageResponse)
|
||||
{
|
||||
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
|
||||
return;
|
||||
}
|
||||
|
||||
hash = get_hash_value(lfc_hash, &fs[chunk_no].key);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &fs[chunk_no].key, hash, HASH_FIND, NULL);
|
||||
if (entry != NULL && entry->prewarm_requested)
|
||||
{
|
||||
/* Unlink entry from LRU list to pin it for the duration of IO operation */
|
||||
if (entry->access_count++ == 0)
|
||||
dlist_delete(&entry->list_node);
|
||||
|
||||
generation = lfc_ctl->generation;
|
||||
entry_offset = entry->offset;
|
||||
Assert(!entry->prewarm_started);
|
||||
entry->prewarm_started = true;
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
rc = pwrite(lfc_desc, ((NeonGetPageResponse*)resp)->page, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + offs_in_chunk) * BLCKSZ);
|
||||
if (rc != BLCKSZ)
|
||||
{
|
||||
lfc_disable("write");
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
if (lfc_ctl->generation == generation)
|
||||
{
|
||||
CriticalAssert(LFC_ENABLED());
|
||||
if (--entry->access_count == 0)
|
||||
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
|
||||
if (entry->prewarm_requested)
|
||||
{
|
||||
lfc_ctl->used_pages += 1 - ((entry->bitmap[offs_in_chunk >> 5] >> (offs_in_chunk & 31)) & 1);
|
||||
entry->bitmap[offs_in_chunk >> 5] |= 1 << (offs_in_chunk & 31);
|
||||
lfc_ctl->prewarmed_pages += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
lfc_ctl->skipped_pages += 1;
|
||||
}
|
||||
Assert(entry->prewarm_started);
|
||||
entry->prewarm_started = false;
|
||||
}
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(!entry || !entry->prewarm_started);
|
||||
lfc_ctl->skipped_pages += 1;
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert(n_sent == n_received);
|
||||
lfc_ctl->prewarm_curr_chunk = n_entries;
|
||||
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Load pages from LFC state saved in AUX file.
|
||||
*/
|
||||
void
|
||||
lfc_load_pages(void)
|
||||
{
|
||||
int fd;
|
||||
FileCacheStateEntry *fs;
|
||||
ssize_t rc;
|
||||
size_t max_entries = lfc_prewarm_limit;
|
||||
|
||||
fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(LOG, "LFC: state file is missing");
|
||||
return;
|
||||
}
|
||||
|
||||
fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
|
||||
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
|
||||
if (rc <= 0)
|
||||
{
|
||||
elog(LOG, "LFC: Failed to read state file: %m");
|
||||
CloseTransientFile(fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
CloseTransientFile(fd);
|
||||
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
|
||||
|
||||
lfc_prewarm(fs, rc / sizeof(FileCacheStateEntry));
|
||||
}
|
||||
pfree(fs);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Check if page is present in the cache.
|
||||
* Returns true if page is found in local cache.
|
||||
@@ -616,6 +1007,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
|
||||
|
||||
/* remove the page from the cache */
|
||||
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
|
||||
entry->prewarm_requested = false; /* prohibit prewarm of this LFC entry */
|
||||
|
||||
if (entry->access_count == 0)
|
||||
{
|
||||
@@ -861,7 +1253,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
|
||||
|
||||
/*
|
||||
/*
|
||||
* For every chunk that has blocks we're interested in, we
|
||||
* 1. get the chunk header
|
||||
* 2. Check if the chunk actually has the blocks we're interested in
|
||||
@@ -899,6 +1291,17 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
if (found)
|
||||
{
|
||||
if (entry->prewarm_started)
|
||||
{
|
||||
/*
|
||||
* Some page of this chunk is currently written by `lfc_prewarm`.
|
||||
* We should give-up not to interfere with it.
|
||||
* But clearing `prewarm_requested` flag also will not allow `lfc_prewarm` to fix it result.
|
||||
*/
|
||||
entry->prewarm_requested = false;
|
||||
LWLockRelease(lfc_lock);
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* Unlink entry from LRU list to pin it for the duration of IO
|
||||
* operation
|
||||
@@ -928,7 +1331,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
/* Cache overflow: evict least recently used chunk */
|
||||
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
|
||||
|
||||
|
||||
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
|
||||
{
|
||||
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
|
||||
@@ -944,10 +1347,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
|
||||
uint32 offset = hole->offset;
|
||||
bool hole_found;
|
||||
|
||||
|
||||
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found);
|
||||
CriticalAssert(hole_found);
|
||||
|
||||
|
||||
lfc_ctl->used += 1;
|
||||
entry->offset = offset; /* reuse the hole */
|
||||
}
|
||||
@@ -959,9 +1362,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
entry->access_count = 1;
|
||||
entry->hash = hash;
|
||||
entry->prewarm_started = false;
|
||||
memset(entry->bitmap, 0, sizeof entry->bitmap);
|
||||
}
|
||||
|
||||
entry->prewarm_requested = false; /* prohibit prewarm if LFC entry is updated by some backend */
|
||||
generation = lfc_ctl->generation;
|
||||
entry_offset = entry->offset;
|
||||
LWLockRelease(lfc_lock);
|
||||
@@ -1334,3 +1739,74 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
|
||||
}
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(save_local_cache_state);
|
||||
|
||||
Datum
|
||||
save_local_cache_state(PG_FUNCTION_ARGS)
|
||||
{
|
||||
lfc_save_state();
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(get_local_cache_state);
|
||||
|
||||
Datum
|
||||
get_local_cache_state(PG_FUNCTION_ARGS)
|
||||
{
|
||||
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
|
||||
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
|
||||
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
|
||||
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
|
||||
|
||||
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
|
||||
memcpy(VARDATA(res), fs, size_in_bytes);
|
||||
pfree(fs);
|
||||
|
||||
PG_RETURN_BYTEA_P(res);
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(prewarm_local_cache);
|
||||
|
||||
Datum
|
||||
prewarm_local_cache(PG_FUNCTION_ARGS)
|
||||
{
|
||||
bytea* state = PG_GETARG_BYTEA_PP(0);
|
||||
uint32 n_entries = VARSIZE_ANY_EXHDR(state);
|
||||
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
|
||||
|
||||
lfc_prewarm(fs, n_entries);
|
||||
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(get_prewarm_info);
|
||||
|
||||
Datum
|
||||
get_prewarm_info(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Datum values[4];
|
||||
bool nulls[4];
|
||||
TupleDesc tupdesc;
|
||||
|
||||
if (lfc_size_limit == 0)
|
||||
PG_RETURN_NULL();
|
||||
|
||||
tupdesc = CreateTemplateTupleDesc(4);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0);
|
||||
tupdesc = BlessTupleDesc(tupdesc);
|
||||
|
||||
MemSet(nulls, 0, sizeof(nulls));
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks);
|
||||
values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk);
|
||||
values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages);
|
||||
values[3] = Int32GetDatum(lfc_ctl->skipped_pages);
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,253 +0,0 @@
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include <dirent.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/slot.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
#include "logical_replication_monitor.h"
|
||||
|
||||
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
|
||||
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
|
||||
PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
|
||||
|
||||
static int
|
||||
LsnDescComparator(const void *a, const void *b)
|
||||
{
|
||||
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
|
||||
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
|
||||
|
||||
if (lsn1 < lsn2)
|
||||
return 1;
|
||||
else if (lsn1 == lsn2)
|
||||
return 0;
|
||||
else
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Look at .snap files and calculate minimum allowed restart_lsn of slot so that
|
||||
* next gc would leave not more than logical_replication_max_snap_files; all
|
||||
* slots having lower restart_lsn should be dropped.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
get_num_snap_files_lsn_threshold(void)
|
||||
{
|
||||
DIR *dirdesc;
|
||||
struct dirent *de;
|
||||
char *snap_path = "pg_logical/snapshots/";
|
||||
int lsns_allocated = 1024;
|
||||
int lsns_num = 0;
|
||||
XLogRecPtr *lsns;
|
||||
XLogRecPtr cutoff;
|
||||
|
||||
if (logical_replication_max_snap_files < 0)
|
||||
return 0;
|
||||
|
||||
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
|
||||
|
||||
/* find all .snap files and get their lsns */
|
||||
dirdesc = AllocateDir(snap_path);
|
||||
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
uint32 hi;
|
||||
uint32 lo;
|
||||
|
||||
if (strcmp(de->d_name, ".") == 0 ||
|
||||
strcmp(de->d_name, "..") == 0)
|
||||
continue;
|
||||
|
||||
if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("could not parse file name as .snap file \"%s\"", de->d_name)));
|
||||
continue;
|
||||
}
|
||||
|
||||
lsn = ((uint64) hi) << 32 | lo;
|
||||
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
|
||||
if (lsns_allocated == lsns_num)
|
||||
{
|
||||
lsns_allocated *= 2;
|
||||
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
|
||||
}
|
||||
lsns[lsns_num++] = lsn;
|
||||
}
|
||||
/* sort by lsn desc */
|
||||
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
|
||||
/* and take cutoff at logical_replication_max_snap_files */
|
||||
if (logical_replication_max_snap_files > lsns_num)
|
||||
cutoff = 0;
|
||||
/* have less files than cutoff */
|
||||
else
|
||||
{
|
||||
cutoff = lsns[logical_replication_max_snap_files - 1];
|
||||
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
|
||||
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
|
||||
}
|
||||
pfree(lsns);
|
||||
FreeDir(dirdesc);
|
||||
return cutoff;
|
||||
}
|
||||
|
||||
void
|
||||
InitLogicalReplicationMonitor(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.logical_replication_max_snap_files",
|
||||
"Maximum allowed logical replication .snap files. When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
|
||||
NULL,
|
||||
&logical_replication_max_snap_files,
|
||||
300, -1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LogicalSlotsMonitorMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Logical replication monitor");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Logical replication monitor");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
/*
|
||||
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
|
||||
* WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which
|
||||
* need too many .snap files.
|
||||
*/
|
||||
void
|
||||
LogicalSlotsMonitorMain(Datum main_arg)
|
||||
{
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, die);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
XLogRecPtr cutoff_lsn;
|
||||
|
||||
/* In case of a SIGHUP, just reload the configuration. */
|
||||
if (ConfigReloadPending)
|
||||
{
|
||||
ConfigReloadPending = false;
|
||||
ProcessConfigFile(PGC_SIGHUP);
|
||||
}
|
||||
|
||||
/*
|
||||
* If there are too many .snap files, just drop all logical slots to
|
||||
* prevent aux files bloat.
|
||||
*/
|
||||
cutoff_lsn = get_num_snap_files_lsn_threshold();
|
||||
if (cutoff_lsn > 0)
|
||||
{
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
char slot_name[NAMEDATALEN];
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
XLogRecPtr restart_lsn;
|
||||
|
||||
/* find the name */
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
/* Consider only logical repliction slots */
|
||||
if (!s->in_use || !SlotIsLogical(s))
|
||||
{
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* do we need to drop it? */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
restart_lsn = s->data.restart_lsn;
|
||||
SpinLockRelease(&s->mutex);
|
||||
if (restart_lsn >= cutoff_lsn)
|
||||
{
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
strlcpy(slot_name, s->data.name.data, NAMEDATALEN);
|
||||
elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X",
|
||||
slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn));
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/* now try to drop it, killing owner before if any */
|
||||
for (;;)
|
||||
{
|
||||
pid_t active_pid;
|
||||
|
||||
SpinLockAcquire(&s->mutex);
|
||||
active_pid = s->active_pid;
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
if (active_pid == 0)
|
||||
{
|
||||
/*
|
||||
* Slot is releasted, try to drop it. Though of course
|
||||
* it could have been reacquired, so drop can ERROR
|
||||
* out. Similarly it could have been dropped in the
|
||||
* meanwhile.
|
||||
*
|
||||
* In principle we could remove pg_try/pg_catch, that
|
||||
* would restart the whole bgworker.
|
||||
*/
|
||||
ConditionVariableCancelSleep();
|
||||
PG_TRY();
|
||||
{
|
||||
ReplicationSlotDrop(slot_name, true);
|
||||
elog(LOG, "ls_monitor: slot %s dropped", slot_name);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/* log ERROR and reset elog stack */
|
||||
EmitErrorReport();
|
||||
FlushErrorState();
|
||||
elog(LOG, "ls_monitor: failed to drop slot %s", slot_name);
|
||||
}
|
||||
PG_END_TRY();
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* kill the owner and wait for release */
|
||||
elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid);
|
||||
(void) kill(active_pid, SIGTERM);
|
||||
/* We shouldn't get stuck, but to be safe add timeout. */
|
||||
ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
|
||||
LS_MONITOR_CHECK_INTERVAL,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
#ifndef __NEON_LOGICAL_REPLICATION_MONITOR_H__
|
||||
#define __NEON_LOGICAL_REPLICATION_MONITOR_H__
|
||||
|
||||
void InitLogicalReplicationMonitor(void);
|
||||
|
||||
#endif
|
||||
28
pgxn/neon/neon--1.5--1.6.sql
Normal file
28
pgxn/neon/neon--1.5--1.6.sql
Normal file
@@ -0,0 +1,28 @@
|
||||
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION save_local_cache_state()
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'save_local_cache_state'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
|
||||
RETURNS record
|
||||
AS 'MODULE_PATHNAME', 'get_prewarm_info'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL SAFE;
|
||||
|
||||
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
|
||||
RETURNS bytea
|
||||
AS 'MODULE_PATHNAME', 'get_local_cache_state'
|
||||
LANGUAGE C
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION prewarm_local_cache(state bytea)
|
||||
RETURNS void
|
||||
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
|
||||
|
||||
9
pgxn/neon/neon--1.6--1.5.sql
Normal file
9
pgxn/neon/neon--1.6--1.5.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
DROP FUNCTION IF EXISTS save_local_cache_state();
|
||||
|
||||
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
|
||||
|
||||
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
|
||||
|
||||
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);
|
||||
|
||||
|
||||
245
pgxn/neon/neon.c
245
pgxn/neon/neon.c
@@ -14,22 +14,32 @@
|
||||
#include "miscadmin.h"
|
||||
#include "access/subtrans.h"
|
||||
#include "access/twophase.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "funcapi.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/guc_tables.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
#include "extension_server.h"
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "control_plane_connector.h"
|
||||
#include "logical_replication_monitor.h"
|
||||
#include "walsender_hooks.h"
|
||||
#if PG_MAJORVERSION_NUM >= 16
|
||||
#include "storage/ipc.h"
|
||||
@@ -38,6 +48,7 @@
|
||||
PG_MODULE_MAGIC;
|
||||
void _PG_init(void);
|
||||
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
|
||||
static int running_xacts_overflow_policy;
|
||||
|
||||
@@ -71,6 +82,237 @@ static const struct config_enum_entry running_xacts_overflow_policies[] = {
|
||||
{NULL, 0, false}
|
||||
};
|
||||
|
||||
static void
|
||||
InitLogicalReplicationMonitor(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.logical_replication_max_snap_files",
|
||||
"Maximum allowed logical replication .snap files. When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
|
||||
NULL,
|
||||
&logical_replication_max_snap_files,
|
||||
300, -1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LogicalSlotsMonitorMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Logical replication monitor");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Logical replication monitor");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
static int
|
||||
LsnDescComparator(const void *a, const void *b)
|
||||
{
|
||||
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
|
||||
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
|
||||
|
||||
if (lsn1 < lsn2)
|
||||
return 1;
|
||||
else if (lsn1 == lsn2)
|
||||
return 0;
|
||||
else
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Look at .snap files and calculate minimum allowed restart_lsn of slot so that
|
||||
* next gc would leave not more than logical_replication_max_snap_files; all
|
||||
* slots having lower restart_lsn should be dropped.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
get_num_snap_files_lsn_threshold(void)
|
||||
{
|
||||
DIR *dirdesc;
|
||||
struct dirent *de;
|
||||
char *snap_path = "pg_logical/snapshots/";
|
||||
int lsns_allocated = 1024;
|
||||
int lsns_num = 0;
|
||||
XLogRecPtr *lsns;
|
||||
XLogRecPtr cutoff;
|
||||
|
||||
if (logical_replication_max_snap_files < 0)
|
||||
return 0;
|
||||
|
||||
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
|
||||
|
||||
/* find all .snap files and get their lsns */
|
||||
dirdesc = AllocateDir(snap_path);
|
||||
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
uint32 hi;
|
||||
uint32 lo;
|
||||
|
||||
if (strcmp(de->d_name, ".") == 0 ||
|
||||
strcmp(de->d_name, "..") == 0)
|
||||
continue;
|
||||
|
||||
if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("could not parse file name as .snap file \"%s\"", de->d_name)));
|
||||
continue;
|
||||
}
|
||||
|
||||
lsn = ((uint64) hi) << 32 | lo;
|
||||
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
|
||||
if (lsns_allocated == lsns_num)
|
||||
{
|
||||
lsns_allocated *= 2;
|
||||
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
|
||||
}
|
||||
lsns[lsns_num++] = lsn;
|
||||
}
|
||||
/* sort by lsn desc */
|
||||
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
|
||||
/* and take cutoff at logical_replication_max_snap_files */
|
||||
if (logical_replication_max_snap_files > lsns_num)
|
||||
cutoff = 0;
|
||||
/* have less files than cutoff */
|
||||
else
|
||||
{
|
||||
cutoff = lsns[logical_replication_max_snap_files - 1];
|
||||
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
|
||||
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
|
||||
}
|
||||
pfree(lsns);
|
||||
FreeDir(dirdesc);
|
||||
return cutoff;
|
||||
}
|
||||
|
||||
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
|
||||
|
||||
/*
|
||||
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
|
||||
* WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which
|
||||
* need too many .snap files.
|
||||
*/
|
||||
PGDLLEXPORT void
|
||||
LogicalSlotsMonitorMain(Datum main_arg)
|
||||
{
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, die);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
XLogRecPtr cutoff_lsn;
|
||||
|
||||
/* In case of a SIGHUP, just reload the configuration. */
|
||||
if (ConfigReloadPending)
|
||||
{
|
||||
ConfigReloadPending = false;
|
||||
ProcessConfigFile(PGC_SIGHUP);
|
||||
}
|
||||
|
||||
/*
|
||||
* If there are too many .snap files, just drop all logical slots to
|
||||
* prevent aux files bloat.
|
||||
*/
|
||||
cutoff_lsn = get_num_snap_files_lsn_threshold();
|
||||
if (cutoff_lsn > 0)
|
||||
{
|
||||
for (int i = 0; i < max_replication_slots; i++)
|
||||
{
|
||||
char slot_name[NAMEDATALEN];
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
XLogRecPtr restart_lsn;
|
||||
|
||||
/* find the name */
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
/* Consider only logical repliction slots */
|
||||
if (!s->in_use || !SlotIsLogical(s))
|
||||
{
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* do we need to drop it? */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
restart_lsn = s->data.restart_lsn;
|
||||
SpinLockRelease(&s->mutex);
|
||||
if (restart_lsn >= cutoff_lsn)
|
||||
{
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
strlcpy(slot_name, s->data.name.data, NAMEDATALEN);
|
||||
elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X",
|
||||
slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn));
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/* now try to drop it, killing owner before if any */
|
||||
for (;;)
|
||||
{
|
||||
pid_t active_pid;
|
||||
|
||||
SpinLockAcquire(&s->mutex);
|
||||
active_pid = s->active_pid;
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
if (active_pid == 0)
|
||||
{
|
||||
/*
|
||||
* Slot is releasted, try to drop it. Though of course
|
||||
* it could have been reacquired, so drop can ERROR
|
||||
* out. Similarly it could have been dropped in the
|
||||
* meanwhile.
|
||||
*
|
||||
* In principle we could remove pg_try/pg_catch, that
|
||||
* would restart the whole bgworker.
|
||||
*/
|
||||
ConditionVariableCancelSleep();
|
||||
PG_TRY();
|
||||
{
|
||||
ReplicationSlotDrop(slot_name, true);
|
||||
elog(LOG, "ls_monitor: slot %s dropped", slot_name);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/* log ERROR and reset elog stack */
|
||||
EmitErrorReport();
|
||||
FlushErrorState();
|
||||
elog(LOG, "ls_monitor: failed to drop slot %s", slot_name);
|
||||
}
|
||||
PG_END_TRY();
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* kill the owner and wait for release */
|
||||
elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid);
|
||||
(void) kill(active_pid, SIGTERM);
|
||||
/* We shouldn't get stuck, but to be safe add timeout. */
|
||||
ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
|
||||
LS_MONITOR_CHECK_INTERVAL,
|
||||
PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* XXX: These private to procarray.c, but we need them here.
|
||||
*/
|
||||
@@ -425,6 +667,7 @@ _PG_init(void)
|
||||
SlotFuncs_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
|
||||
|
||||
InitLogicalReplicationMonitor();
|
||||
|
||||
InitControlPlaneConnector();
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
@@ -276,6 +276,8 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno, int nblocks, bits8 *bitmap);
|
||||
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
|
||||
extern void lfc_init(void);
|
||||
extern void lfc_save_state(void);
|
||||
extern void lfc_load_pages(void);
|
||||
|
||||
static inline bool
|
||||
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
10
poetry.lock
generated
10
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -3118,13 +3118,13 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "werkzeug"
|
||||
version = "3.0.6"
|
||||
version = "3.0.3"
|
||||
description = "The comprehensive WSGI web application library."
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "werkzeug-3.0.6-py3-none-any.whl", hash = "sha256:1bc0c2310d2fbb07b1dd1105eba2f7af72f322e1e455f2f93c993bee8c8a5f17"},
|
||||
{file = "werkzeug-3.0.6.tar.gz", hash = "sha256:a8dd59d4de28ca70471a34cba79bed5f7ef2e036a76b3ab0835474246eb41f8d"},
|
||||
{file = "werkzeug-3.0.3-py3-none-any.whl", hash = "sha256:fc9645dc43e03e4d630d23143a04a7f947a9a3b5727cd535fdfe155a17cc48c8"},
|
||||
{file = "werkzeug-3.0.3.tar.gz", hash = "sha256:097e5bfda9f0aba8da6b8545146def481d06aa7d3266e7448e2cccf67dd8bd18"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -3406,4 +3406,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "0f4804119f417edf8e1fbd6d715d2e8d70ad731334fa9570304a2203f83339cf"
|
||||
content-hash = "f52632571e34b0e51b059c280c35d6ff6f69f6a8c9586caca78282baf635be91"
|
||||
|
||||
@@ -5,7 +5,6 @@ use std::time::{Duration, SystemTime};
|
||||
use arc_swap::ArcSwapOption;
|
||||
use dashmap::DashMap;
|
||||
use jose_jwk::crypto::KeyInfo;
|
||||
use reqwest::{redirect, Client};
|
||||
use serde::de::Visitor;
|
||||
use serde::{Deserialize, Deserializer};
|
||||
use signature::Verifier;
|
||||
@@ -25,7 +24,6 @@ const MIN_RENEW: Duration = Duration::from_secs(30);
|
||||
const AUTO_RENEW: Duration = Duration::from_secs(300);
|
||||
const MAX_RENEW: Duration = Duration::from_secs(3600);
|
||||
const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
|
||||
const JWKS_USER_AGENT: &str = "neon-proxy";
|
||||
|
||||
/// How to get the JWT auth rules
|
||||
pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
|
||||
@@ -52,6 +50,7 @@ pub(crate) struct AuthRule {
|
||||
pub(crate) role_names: Vec<RoleNameInt>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct JwkCache {
|
||||
client: reqwest::Client,
|
||||
|
||||
@@ -358,20 +357,6 @@ impl JwkCache {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for JwkCache {
|
||||
fn default() -> Self {
|
||||
let client = Client::builder()
|
||||
.user_agent(JWKS_USER_AGENT)
|
||||
.redirect(redirect::Policy::none())
|
||||
.build()
|
||||
.expect("using &str and standard redirect::Policy");
|
||||
JwkCache {
|
||||
client,
|
||||
map: DashMap::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn verify_ec_signature(data: &[u8], sig: &[u8], key: &jose_jwk::Ec) -> Result<(), JwtError> {
|
||||
use ecdsa::Signature;
|
||||
use signature::Verifier;
|
||||
|
||||
@@ -32,7 +32,6 @@ use hyper_util::rt::TokioExecutor;
|
||||
use hyper_util::server::conn::auto::Builder;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::SeedableRng;
|
||||
use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::time::timeout;
|
||||
@@ -310,18 +309,7 @@ async fn connection_handler(
|
||||
hyper_util::rt::TokioIo::new(conn),
|
||||
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
|
||||
// First HTTP request shares the same session ID
|
||||
let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
|
||||
|
||||
if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
|
||||
// take session_id from request, if given.
|
||||
if let Some(id) = req
|
||||
.headers()
|
||||
.get(&NEON_REQUEST_ID)
|
||||
.and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
|
||||
{
|
||||
session_id = id;
|
||||
}
|
||||
}
|
||||
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
|
||||
|
||||
// Cancel the current inflight HTTP request if the requets stream is closed.
|
||||
// This is slightly different to `_cancel_connection` in that
|
||||
@@ -347,15 +335,8 @@ async fn connection_handler(
|
||||
.map_ok_or_else(api_error_into_response, |r| r),
|
||||
);
|
||||
async move {
|
||||
let mut res = handler.await;
|
||||
let res = handler.await;
|
||||
cancel_request.disarm();
|
||||
|
||||
// add the session ID to the response
|
||||
if let Ok(resp) = &mut res {
|
||||
resp.headers_mut()
|
||||
.append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}),
|
||||
|
||||
@@ -23,7 +23,6 @@ use typed_json::json;
|
||||
use url::Url;
|
||||
use urlencoding;
|
||||
use utils::http::error::ApiError;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::backend::{LocalProxyConnError, PoolingBackend};
|
||||
use super::conn_pool::{AuthData, ConnInfoWithAuth};
|
||||
@@ -64,8 +63,6 @@ enum Payload {
|
||||
Batch(BatchQueryData),
|
||||
}
|
||||
|
||||
pub(super) static NEON_REQUEST_ID: HeaderName = HeaderName::from_static("neon-request-id");
|
||||
|
||||
static CONN_STRING: HeaderName = HeaderName::from_static("neon-connection-string");
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||
@@ -709,12 +706,6 @@ static HEADERS_TO_FORWARD: &[&HeaderName] = &[
|
||||
&TXN_DEFERRABLE,
|
||||
];
|
||||
|
||||
pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue {
|
||||
let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH];
|
||||
HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..]))
|
||||
.expect("uuid hyphenated format should be all valid header characters")
|
||||
}
|
||||
|
||||
async fn handle_auth_broker_inner(
|
||||
ctx: &RequestMonitoring,
|
||||
request: Request<Incoming>,
|
||||
@@ -741,7 +732,6 @@ async fn handle_auth_broker_inner(
|
||||
req = req.header(h, hv);
|
||||
}
|
||||
}
|
||||
req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id()));
|
||||
|
||||
let req = req
|
||||
.body(body)
|
||||
|
||||
@@ -23,7 +23,7 @@ backoff = "^2.2.1"
|
||||
pytest-lazy-fixture = "^0.6.3"
|
||||
prometheus-client = "^0.14.1"
|
||||
pytest-timeout = "^2.1.0"
|
||||
Werkzeug = "^3.0.6"
|
||||
Werkzeug = "^3.0.3"
|
||||
pytest-order = "^1.1.0"
|
||||
allure-pytest = "^2.13.2"
|
||||
pytest-asyncio = "^0.21.0"
|
||||
|
||||
@@ -193,8 +193,6 @@ struct Args {
|
||||
/// Usually, timeline eviction has to wait for `partial_backup_timeout` before being eligible for eviction,
|
||||
/// but if a timeline is un-evicted and then _not_ written to, it would immediately flap to evicting again,
|
||||
/// if it weren't for `eviction_min_resident` preventing that.
|
||||
///
|
||||
/// Also defines interval for eviction retries.
|
||||
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_EVICTION_MIN_RESIDENT)]
|
||||
eviction_min_resident: Duration,
|
||||
}
|
||||
|
||||
@@ -14,10 +14,12 @@ use std::path::Path;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::control_file_upgrade::downgrade_v9_to_v8;
|
||||
use crate::control_file_upgrade::upgrade_control_file;
|
||||
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
|
||||
use crate::state::{EvictionState, TimelinePersistentState};
|
||||
use utils::bin_ser::LeSer;
|
||||
use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
|
||||
use utils::{bin_ser::LeSer, id::TenantTimelineId};
|
||||
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 9;
|
||||
@@ -52,12 +54,13 @@ pub struct FileStorage {
|
||||
|
||||
impl FileStorage {
|
||||
/// Initialize storage by loading state from disk.
|
||||
pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
|
||||
let state = Self::load_control_file_from_dir(timeline_dir)?;
|
||||
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
let state = Self::load_control_file_from_dir(&timeline_dir)?;
|
||||
|
||||
Ok(FileStorage {
|
||||
timeline_dir: timeline_dir.to_path_buf(),
|
||||
no_sync,
|
||||
timeline_dir,
|
||||
no_sync: conf.no_sync,
|
||||
state,
|
||||
last_persist_at: Instant::now(),
|
||||
})
|
||||
@@ -68,16 +71,16 @@ impl FileStorage {
|
||||
/// Note: we normally call this in temp directory for atomic init, so
|
||||
/// interested in FileStorage as a result only in tests.
|
||||
pub async fn create_new(
|
||||
timeline_dir: &Utf8Path,
|
||||
dir: Utf8PathBuf,
|
||||
conf: &SafeKeeperConf,
|
||||
state: TimelinePersistentState,
|
||||
no_sync: bool,
|
||||
) -> Result<FileStorage> {
|
||||
// we don't support creating new timelines in offloaded state
|
||||
assert!(matches!(state.eviction_state, EvictionState::Present));
|
||||
|
||||
let mut store = FileStorage {
|
||||
timeline_dir: timeline_dir.to_path_buf(),
|
||||
no_sync,
|
||||
timeline_dir: dir,
|
||||
no_sync: conf.no_sync,
|
||||
state: state.clone(),
|
||||
last_persist_at: Instant::now(),
|
||||
};
|
||||
@@ -236,46 +239,89 @@ mod test {
|
||||
use tokio::fs;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
const NO_SYNC: bool = true;
|
||||
fn stub_conf() -> SafeKeeperConf {
|
||||
let workdir = camino_tempfile::tempdir().unwrap().into_path();
|
||||
SafeKeeperConf {
|
||||
workdir,
|
||||
..SafeKeeperConf::dummy()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
|
||||
let tempdir = camino_tempfile::tempdir()?;
|
||||
let mut state = TimelinePersistentState::empty();
|
||||
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
|
||||
async fn load_from_control_file(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: &TenantTimelineId,
|
||||
) -> Result<(FileStorage, TimelinePersistentState)> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
fs::create_dir_all(&timeline_dir)
|
||||
.await
|
||||
.expect("failed to create timeline dir");
|
||||
Ok((
|
||||
FileStorage::restore_new(ttid, conf)?,
|
||||
FileStorage::load_control_file_from_dir(&timeline_dir)?,
|
||||
))
|
||||
}
|
||||
|
||||
// Make a change.
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage.persist(&state).await?;
|
||||
|
||||
// Reload the state. It should match the previously persisted state.
|
||||
let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
|
||||
assert_eq!(loaded_state, state);
|
||||
Ok(())
|
||||
async fn create(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: &TenantTimelineId,
|
||||
) -> Result<(FileStorage, TimelinePersistentState)> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
fs::create_dir_all(&timeline_dir)
|
||||
.await
|
||||
.expect("failed to create timeline dir");
|
||||
let state = TimelinePersistentState::empty();
|
||||
let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
|
||||
Ok((storage, state))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
|
||||
let tempdir = camino_tempfile::tempdir()?;
|
||||
let mut state = TimelinePersistentState::empty();
|
||||
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
|
||||
|
||||
// Make a change.
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage.persist(&state).await?;
|
||||
|
||||
// Change the first byte to fail checksum validation.
|
||||
let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
|
||||
let mut data = fs::read(&ctrl_path).await?;
|
||||
data[0] += 1;
|
||||
fs::write(&ctrl_path, &data).await?;
|
||||
|
||||
// Loading the file should fail checksum validation.
|
||||
if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
|
||||
assert!(err.to_string().contains("control file checksum mismatch"))
|
||||
} else {
|
||||
panic!("expected checksum error")
|
||||
async fn test_read_write_safekeeper_state() {
|
||||
let conf = stub_conf();
|
||||
let ttid = TenantTimelineId::generate();
|
||||
{
|
||||
let (mut storage, mut state) =
|
||||
create(&conf, &ttid).await.expect("failed to create state");
|
||||
// change something
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage
|
||||
.persist(&state)
|
||||
.await
|
||||
.expect("failed to persist state");
|
||||
}
|
||||
|
||||
let (_, state) = load_from_control_file(&conf, &ttid)
|
||||
.await
|
||||
.expect("failed to read state");
|
||||
assert_eq!(state.commit_lsn, Lsn(42));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_safekeeper_state_checksum_mismatch() {
|
||||
let conf = stub_conf();
|
||||
let ttid = TenantTimelineId::generate();
|
||||
{
|
||||
let (mut storage, mut state) =
|
||||
create(&conf, &ttid).await.expect("failed to read state");
|
||||
|
||||
// change something
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage
|
||||
.persist(&state)
|
||||
.await
|
||||
.expect("failed to persist state");
|
||||
}
|
||||
let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
|
||||
let mut data = fs::read(&control_path).await.unwrap();
|
||||
data[0] += 1; // change the first byte of the file to fail checksum validation
|
||||
fs::write(&control_path, &data)
|
||||
.await
|
||||
.expect("failed to write control file");
|
||||
|
||||
match load_from_control_file(&conf, &ttid).await {
|
||||
Err(err) => assert!(err
|
||||
.to_string()
|
||||
.contains("safekeeper control file checksum mismatch")),
|
||||
Ok(_) => panic!("expected error"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,7 +154,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
|
||||
new_state.peer_horizon_lsn = request.until_lsn;
|
||||
new_state.backup_lsn = new_backup_lsn;
|
||||
|
||||
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
|
||||
FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?;
|
||||
|
||||
// now we have a ready timeline in a temp directory
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||
|
||||
@@ -113,7 +113,6 @@ impl SafeKeeperConf {
|
||||
|
||||
impl SafeKeeperConf {
|
||||
#[cfg(test)]
|
||||
#[allow(unused)]
|
||||
fn dummy() -> Self {
|
||||
SafeKeeperConf {
|
||||
workdir: Utf8PathBuf::from("./"),
|
||||
|
||||
@@ -143,8 +143,8 @@ impl TimelinePersistentState {
|
||||
TimelinePersistentState::new(
|
||||
&TenantTimelineId::empty(),
|
||||
ServerInfo {
|
||||
pg_version: 170000, /* Postgres server version (major * 10000) */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
pg_version: 17, /* Postgres server version */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
wal_seg_size: 16 * 1024 * 1024,
|
||||
},
|
||||
vec![],
|
||||
|
||||
@@ -328,19 +328,15 @@ impl SharedState {
|
||||
/// Restore SharedState from control file. If file doesn't exist, bails out.
|
||||
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
|
||||
let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
|
||||
if control_store.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*ttid));
|
||||
}
|
||||
|
||||
let sk = match control_store.eviction_state {
|
||||
EvictionState::Present => {
|
||||
let wal_store = wal_storage::PhysicalStorage::new(
|
||||
ttid,
|
||||
&timeline_dir,
|
||||
&control_store,
|
||||
conf.no_sync,
|
||||
)?;
|
||||
let wal_store =
|
||||
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
|
||||
StateSK::Loaded(SafeKeeper::new(
|
||||
TimelineState::new(control_store),
|
||||
wal_store,
|
||||
@@ -1050,9 +1046,9 @@ impl ManagerTimeline {
|
||||
// trying to restore WAL storage
|
||||
let wal_store = wal_storage::PhysicalStorage::new(
|
||||
&self.ttid,
|
||||
&self.timeline_dir,
|
||||
self.timeline_dir.clone(),
|
||||
&conf,
|
||||
shared.sk.state(),
|
||||
conf.no_sync,
|
||||
)?;
|
||||
|
||||
// updating control file
|
||||
|
||||
@@ -66,15 +66,15 @@ impl Manager {
|
||||
ready
|
||||
}
|
||||
|
||||
/// Evict the timeline to remote storage. Returns whether the eviction was successful.
|
||||
/// Evict the timeline to remote storage.
|
||||
#[instrument(name = "evict_timeline", skip_all)]
|
||||
pub(crate) async fn evict_timeline(&mut self) -> bool {
|
||||
pub(crate) async fn evict_timeline(&mut self) {
|
||||
assert!(!self.is_offloaded);
|
||||
let partial_backup_uploaded = match &self.partial_backup_uploaded {
|
||||
Some(p) => p.clone(),
|
||||
None => {
|
||||
warn!("no partial backup uploaded, skipping eviction");
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -91,12 +91,11 @@ impl Manager {
|
||||
|
||||
if let Err(e) = do_eviction(self, &partial_backup_uploaded).await {
|
||||
warn!("failed to evict timeline: {:?}", e);
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
|
||||
info!("successfully evicted timeline");
|
||||
NUM_EVICTED_TIMELINES.inc();
|
||||
true
|
||||
}
|
||||
|
||||
/// Attempt to restore evicted timeline from remote storage; it must be
|
||||
|
||||
@@ -297,12 +297,7 @@ pub async fn main_task(
|
||||
match mgr.global_rate_limiter.try_acquire_eviction() {
|
||||
Some(_permit) => {
|
||||
mgr.set_status(Status::EvictTimeline);
|
||||
if !mgr.evict_timeline().await {
|
||||
// eviction failed, try again later
|
||||
mgr.evict_not_before =
|
||||
Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
|
||||
update_next_event(&mut next_event, mgr.evict_not_before);
|
||||
}
|
||||
mgr.evict_timeline().await;
|
||||
}
|
||||
None => {
|
||||
// we can't evict timeline now, will try again later
|
||||
|
||||
@@ -244,7 +244,7 @@ impl GlobalTimelines {
|
||||
// immediately initialize first WAL segment as well.
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
|
||||
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
||||
control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?;
|
||||
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
@@ -596,7 +596,7 @@ pub async fn validate_temp_timeline(
|
||||
bail!("wal_seg_size is not set");
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
let flush_lsn = wal_store.flush_lsn();
|
||||
|
||||
@@ -29,6 +29,7 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::state::TimelinePersistentState;
|
||||
use crate::wal_backup::{read_object, remote_timeline_path};
|
||||
use crate::SafeKeeperConf;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::XLOG_BLCKSZ;
|
||||
@@ -86,9 +87,7 @@ pub trait Storage {
|
||||
pub struct PhysicalStorage {
|
||||
metrics: WalStorageMetrics,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
|
||||
/// Disables fsync if true.
|
||||
no_sync: bool,
|
||||
conf: SafeKeeperConf,
|
||||
|
||||
/// Size of WAL segment in bytes.
|
||||
wal_seg_size: usize,
|
||||
@@ -152,9 +151,9 @@ impl PhysicalStorage {
|
||||
/// the disk. Otherwise, all LSNs are set to zero.
|
||||
pub fn new(
|
||||
ttid: &TenantTimelineId,
|
||||
timeline_dir: &Utf8Path,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
conf: &SafeKeeperConf,
|
||||
state: &TimelinePersistentState,
|
||||
no_sync: bool,
|
||||
) -> Result<PhysicalStorage> {
|
||||
let wal_seg_size = state.server.wal_seg_size as usize;
|
||||
|
||||
@@ -199,8 +198,8 @@ impl PhysicalStorage {
|
||||
|
||||
Ok(PhysicalStorage {
|
||||
metrics: WalStorageMetrics::default(),
|
||||
timeline_dir: timeline_dir.to_path_buf(),
|
||||
no_sync,
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
wal_seg_size,
|
||||
pg_version: state.server.pg_version,
|
||||
system_id: state.server.system_id,
|
||||
@@ -225,7 +224,7 @@ impl PhysicalStorage {
|
||||
|
||||
/// Call fdatasync if config requires so.
|
||||
async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
|
||||
if !self.no_sync {
|
||||
if !self.conf.no_sync {
|
||||
self.metrics
|
||||
.observe_flush_seconds(time_io_closure(file.sync_data()).await?);
|
||||
}
|
||||
@@ -264,7 +263,9 @@ impl PhysicalStorage {
|
||||
|
||||
// Note: this doesn't get into observe_flush_seconds metric. But
|
||||
// segment init should be separate metric, if any.
|
||||
if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
|
||||
if let Err(e) =
|
||||
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
|
||||
{
|
||||
// Probably rename succeeded, but fsync of it failed. Remove
|
||||
// the file then to avoid using it.
|
||||
remove_file(wal_file_partial_path)
|
||||
|
||||
@@ -3130,11 +3130,9 @@ impl Service {
|
||||
.await?;
|
||||
|
||||
// Propagate the LSN that shard zero picked, if caller didn't provide one
|
||||
match &mut create_req.mode {
|
||||
models::TimelineCreateRequestMode::Branch { ancestor_start_lsn, .. } if ancestor_start_lsn.is_none() => {
|
||||
*ancestor_start_lsn = timeline_info.ancestor_lsn;
|
||||
},
|
||||
_ => {}
|
||||
if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none()
|
||||
{
|
||||
create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
|
||||
}
|
||||
|
||||
// Create timeline on remaining shards with number >0
|
||||
|
||||
@@ -150,7 +150,6 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
|
||||
counter("pageserver_tenant_throttling_count_accounted_finish_global"),
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
|
||||
counter("pageserver_tenant_throttling_count_global"),
|
||||
*histogram("pageserver_tokio_epoll_uring_slots_submission_queue_depth"),
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
|
||||
@@ -44,14 +44,7 @@ from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures import overlayfs
|
||||
from fixtures.auth_tokens import AuthKeys, TokenScope
|
||||
from fixtures.common_types import (
|
||||
Lsn,
|
||||
NodeId,
|
||||
TenantId,
|
||||
TenantShardId,
|
||||
TimelineArchivalState,
|
||||
TimelineId,
|
||||
)
|
||||
from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.endpoint.http import EndpointHttpClient
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
|
||||
@@ -61,11 +54,7 @@ from fixtures.pageserver.allowed_errors import (
|
||||
DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS,
|
||||
)
|
||||
from fixtures.pageserver.common_types import LayerName, parse_layer_file_name
|
||||
from fixtures.pageserver.http import (
|
||||
HistoricLayerInfo,
|
||||
PageserverHttpClient,
|
||||
ScanDisposableKeysResponse,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
wait_for_last_record_lsn,
|
||||
)
|
||||
@@ -2143,24 +2132,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def timeline_archival_config(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
state: TimelineArchivalState,
|
||||
):
|
||||
config = {"state": state.value}
|
||||
log.info(
|
||||
f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
|
||||
)
|
||||
res = self.request(
|
||||
"PUT",
|
||||
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
|
||||
json=config,
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return res.json()
|
||||
|
||||
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
@@ -2674,51 +2645,6 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
layers = self.list_layers(tenant_id, timeline_id)
|
||||
return layer_name in [parse_layer_file_name(p.name) for p in layers]
|
||||
|
||||
def timeline_scan_no_disposable_keys(
|
||||
self, tenant_shard_id: TenantShardId, timeline_id: TimelineId
|
||||
) -> TimelineAssertNoDisposableKeysResult:
|
||||
"""
|
||||
Scan all keys in all layers of the tenant/timeline for disposable keys.
|
||||
Disposable keys are keys that are present in a layer referenced by the shard
|
||||
but are not going to be accessed by the shard.
|
||||
For example, after shard split, the child shards will reference the parent's layer
|
||||
files until new data is ingested and/or compaction rewrites the layers.
|
||||
"""
|
||||
|
||||
ps_http = self.http_client()
|
||||
tally = ScanDisposableKeysResponse(0, 0)
|
||||
per_layer = []
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
|
||||
futs = []
|
||||
shard_layer_map = ps_http.layer_map_info(tenant_shard_id, timeline_id)
|
||||
for layer in shard_layer_map.historic_layers:
|
||||
|
||||
def do_layer(
|
||||
shard_ps_http: PageserverHttpClient,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
layer: HistoricLayerInfo,
|
||||
) -> tuple[HistoricLayerInfo, ScanDisposableKeysResponse]:
|
||||
return (
|
||||
layer,
|
||||
shard_ps_http.timeline_layer_scan_disposable_keys(
|
||||
tenant_shard_id, timeline_id, layer.layer_file_name
|
||||
),
|
||||
)
|
||||
|
||||
futs.append(executor.submit(do_layer, ps_http, tenant_shard_id, timeline_id, layer))
|
||||
for fut in futs:
|
||||
layer, result = fut.result()
|
||||
tally += result
|
||||
per_layer.append((layer, result))
|
||||
return TimelineAssertNoDisposableKeysResult(tally, per_layer)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelineAssertNoDisposableKeysResult:
|
||||
tally: ScanDisposableKeysResponse
|
||||
per_layer: list[tuple[HistoricLayerInfo, ScanDisposableKeysResponse]]
|
||||
|
||||
|
||||
class PgBin:
|
||||
"""A helper class for executing postgres binaries"""
|
||||
|
||||
@@ -129,26 +129,6 @@ class LayerMapInfo:
|
||||
return set(x.layer_file_name for x in self.historic_layers)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScanDisposableKeysResponse:
|
||||
disposable_count: int
|
||||
not_disposable_count: int
|
||||
|
||||
def __add__(self, b):
|
||||
a = self
|
||||
assert isinstance(a, ScanDisposableKeysResponse)
|
||||
assert isinstance(b, ScanDisposableKeysResponse)
|
||||
return ScanDisposableKeysResponse(
|
||||
a.disposable_count + b.disposable_count, a.not_disposable_count + b.not_disposable_count
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: dict[str, Any]) -> ScanDisposableKeysResponse:
|
||||
disposable_count = d["disposable_count"]
|
||||
not_disposable_count = d["not_disposable_count"]
|
||||
return ScanDisposableKeysResponse(disposable_count, not_disposable_count)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TenantConfig:
|
||||
tenant_specific_overrides: dict[str, Any]
|
||||
@@ -162,19 +142,6 @@ class TenantConfig:
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelinesInfoAndOffloaded:
|
||||
timelines: list[dict[str, Any]]
|
||||
offloaded: list[dict[str, Any]]
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: dict[str, Any]) -> TimelinesInfoAndOffloaded:
|
||||
return TimelinesInfoAndOffloaded(
|
||||
timelines=d["timelines"],
|
||||
offloaded=d["offloaded"],
|
||||
)
|
||||
|
||||
|
||||
class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
def __init__(
|
||||
self,
|
||||
@@ -497,18 +464,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, list)
|
||||
return res_json
|
||||
|
||||
def timeline_and_offloaded_list(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
) -> TimelinesInfoAndOffloaded:
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline_and_offloaded",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return TimelinesInfoAndOffloaded.from_json(res_json)
|
||||
|
||||
def timeline_create(
|
||||
self,
|
||||
pg_version: PgVersion,
|
||||
@@ -521,13 +476,12 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
) -> dict[Any, Any]:
|
||||
body: dict[str, Any] = {
|
||||
"new_timeline_id": str(new_timeline_id),
|
||||
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
|
||||
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
|
||||
"existing_initdb_timeline_id": str(existing_initdb_timeline_id)
|
||||
if existing_initdb_timeline_id
|
||||
else None,
|
||||
}
|
||||
if ancestor_timeline_id:
|
||||
body["ancestor_timeline_id"] = str(ancestor_timeline_id)
|
||||
if ancestor_start_lsn:
|
||||
body["ancestor_start_lsn"] = str(ancestor_start_lsn)
|
||||
if existing_initdb_timeline_id:
|
||||
body["existing_initdb_timeline_id"] = str(existing_initdb_timeline_id)
|
||||
if pg_version != PgVersion.NOT_SET:
|
||||
body["pg_version"] = int(pg_version)
|
||||
|
||||
@@ -925,16 +879,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
self.verbose_error(res)
|
||||
return LayerMapInfo.from_json(res.json())
|
||||
|
||||
def timeline_layer_scan_disposable_keys(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
|
||||
) -> ScanDisposableKeysResponse:
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}/scan_disposable_keys",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
assert res.status_code == 200
|
||||
return ScanDisposableKeysResponse.from_json(res.json())
|
||||
|
||||
def download_layer(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
|
||||
):
|
||||
|
||||
52
test_runner/regress/test_lfc_prewarm.py
Normal file
52
test_runner/regress/test_lfc_prewarm.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_lfc_prewarm(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
n_records = 1000000
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=1GB",
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
|
||||
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon version '1.6'")
|
||||
|
||||
for _ in range(60):
|
||||
time.sleep(1) # give prewarm BGW some time to proceed
|
||||
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
|
||||
lfc_used_pages = cur.fetchall()[0][0]
|
||||
log.info(f"Used LFC size: {lfc_used_pages}")
|
||||
cur.execute("select * from get_prewarm_info()")
|
||||
prewarm_info = cur.fetchall()[0]
|
||||
log.info(f"Prewarm info: {prewarm_info}")
|
||||
if prewarm_info[0] > 0:
|
||||
log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%")
|
||||
if prewarm_info[0] == prewarm_info[1]:
|
||||
break
|
||||
|
||||
assert lfc_used_pages > 10000
|
||||
assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1]
|
||||
|
||||
cur.execute("select sum(pk) from t")
|
||||
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
|
||||
assert prewarm_info[1] > 0
|
||||
@@ -3,13 +3,10 @@
|
||||
#
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
@@ -327,97 +324,3 @@ def test_sql_regress(
|
||||
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
|
||||
|
||||
post_checks(env, test_output_dir, DBNAME, endpoint)
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
|
||||
def test_tx_abort_with_many_relations(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
This is not a pg_regress test as such, but perhaps it should be -- this test exercises postgres
|
||||
behavior when aborting a transaction with lots of relations.
|
||||
|
||||
Reproducer for https://github.com/neondatabase/neon/issues/9505
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ep = env.endpoints.create_start(
|
||||
"main",
|
||||
tenant_id=env.initial_tenant,
|
||||
config_lines=[
|
||||
"shared_buffers=1000MB",
|
||||
"max_locks_per_transaction=16384",
|
||||
],
|
||||
)
|
||||
|
||||
# How many relations: this number is tuned to be long enough to take tens of seconds
|
||||
# if the rollback code path is buggy, tripping the test's timeout.
|
||||
n = 4000
|
||||
|
||||
def create():
|
||||
# Create many relations
|
||||
log.info(f"Creating {n} relations...")
|
||||
ep.safe_psql_many(
|
||||
[
|
||||
"BEGIN",
|
||||
f"""DO $$
|
||||
DECLARE
|
||||
i INT;
|
||||
table_name TEXT;
|
||||
BEGIN
|
||||
FOR i IN 1..{n} LOOP
|
||||
table_name := 'table_' || i;
|
||||
EXECUTE 'CREATE TABLE IF NOT EXISTS ' || table_name || ' (id SERIAL PRIMARY KEY, data TEXT)';
|
||||
END LOOP;
|
||||
END $$;
|
||||
""",
|
||||
"COMMIT",
|
||||
]
|
||||
)
|
||||
|
||||
def truncate():
|
||||
# Truncate relations, then roll back the transaction containing the truncations
|
||||
log.info(f"Truncating {n} relations...")
|
||||
ep.safe_psql_many(
|
||||
[
|
||||
"BEGIN",
|
||||
f"""DO $$
|
||||
DECLARE
|
||||
i INT;
|
||||
table_name TEXT;
|
||||
BEGIN
|
||||
FOR i IN 1..{n} LOOP
|
||||
table_name := 'table_' || i;
|
||||
EXECUTE 'TRUNCATE ' || table_name ;
|
||||
END LOOP;
|
||||
END $$;
|
||||
""",
|
||||
]
|
||||
)
|
||||
|
||||
def rollback_and_wait():
|
||||
log.info(f"Rolling back after truncating {n} relations...")
|
||||
ep.safe_psql("ROLLBACK")
|
||||
|
||||
# Restart the endpoint: this ensures that we can read back what we just wrote, i.e. pageserver
|
||||
# ingest has caught up.
|
||||
ep.stop()
|
||||
log.info(f"Starting endpoint after truncating {n} relations...")
|
||||
ep.start()
|
||||
log.info(f"Started endpoint after truncating {n} relations...")
|
||||
|
||||
# Actual create & truncate phases may be slow, these involves lots of WAL records. We do not
|
||||
# apply a special timeout, they are expected to complete within general test timeout
|
||||
create()
|
||||
truncate()
|
||||
|
||||
# Run in a thread because the failure case is to take pathologically long time, and we don't want
|
||||
# to block the test executor on that.
|
||||
with ThreadPoolExecutor(max_workers=1) as exec:
|
||||
try:
|
||||
# Rollback phase should be fast: this is one WAL record that we should process efficiently
|
||||
fut = exec.submit(rollback_and_wait)
|
||||
fut.result(timeout=5)
|
||||
except:
|
||||
exec.shutdown(wait=False, cancel_futures=True)
|
||||
raise
|
||||
|
||||
@@ -169,24 +169,23 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
return last_flush_lsn
|
||||
|
||||
def trigger_gc_and_select(env: NeonEnv, ep_static: Endpoint, ctx: str):
|
||||
def trigger_gc_and_select(env: NeonEnv, ep_static: Endpoint):
|
||||
"""
|
||||
Trigger GC manually on all pageservers. Then run an `SELECT` query.
|
||||
"""
|
||||
for shard, ps in tenant_get_shards(env, env.initial_tenant):
|
||||
client = ps.http_client()
|
||||
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
|
||||
# Note: cannot assert on `layers_removed` here because it could be layers
|
||||
# not guarded by the lease. Rely on successful execution of the query instead.
|
||||
log.info(f"{gc_result=}")
|
||||
|
||||
assert (
|
||||
gc_result["layers_removed"] == 0
|
||||
), "No layers should be removed, old layers are guarded by leases."
|
||||
|
||||
with ep_static.cursor() as cur:
|
||||
# Following query should succeed if pages are properly guarded by leases.
|
||||
cur.execute("SELECT count(*) FROM t0")
|
||||
assert cur.fetchone() == (ROW_COUNT,)
|
||||
|
||||
log.info(f"`SELECT` query succeed after GC, {ctx=}")
|
||||
|
||||
# Insert some records on main branch
|
||||
with env.endpoints.create_start("main") as ep_main:
|
||||
with ep_main.cursor() as cur:
|
||||
@@ -211,9 +210,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
# Wait for static compute to renew lease at least once.
|
||||
time.sleep(LSN_LEASE_LENGTH / 2)
|
||||
|
||||
generate_updates_on_main(env, ep_main, 3, end=100)
|
||||
generate_updates_on_main(env, ep_main, i, end=100)
|
||||
|
||||
trigger_gc_and_select(env, ep_static, ctx="Before pageservers restart")
|
||||
trigger_gc_and_select(env, ep_static)
|
||||
|
||||
# Trigger Pageserver restarts
|
||||
for ps in env.pageservers:
|
||||
@@ -222,7 +221,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
time.sleep(LSN_LEASE_LENGTH / 2)
|
||||
ps.start()
|
||||
|
||||
trigger_gc_and_select(env, ep_static, ctx="After pageservers restart")
|
||||
trigger_gc_and_select(env, ep_static)
|
||||
|
||||
# Reconfigure pageservers
|
||||
env.pageservers[0].stop()
|
||||
@@ -231,7 +230,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
trigger_gc_and_select(env, ep_static, ctx="After putting pageserver 0 offline")
|
||||
trigger_gc_and_select(env, ep_static)
|
||||
|
||||
# Do some update so we can increment latest_gc_cutoff
|
||||
generate_updates_on_main(env, ep_main, i, end=100)
|
||||
|
||||
@@ -3,11 +3,11 @@ from __future__ import annotations
|
||||
import os
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.compute_reconfigure import ComputeReconfigure
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -188,9 +188,7 @@ def test_sharding_split_unsharded(
|
||||
"compact-shard-ancestors-persistent",
|
||||
],
|
||||
)
|
||||
def test_sharding_split_compaction(
|
||||
neon_env_builder: NeonEnvBuilder, failpoint: Optional[str], build_type: str
|
||||
):
|
||||
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: Optional[str]):
|
||||
"""
|
||||
Test that after a split, we clean up parent layer data in the child shards via compaction.
|
||||
"""
|
||||
@@ -324,19 +322,9 @@ def test_sharding_split_compaction(
|
||||
# Physical size should shrink because layers are smaller
|
||||
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]
|
||||
|
||||
# Validate filtering compaction actually happened
|
||||
# Validate size statistics
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
|
||||
log.info("scan all layer files for disposable keys, there shouldn't be any")
|
||||
result = ps.timeline_scan_no_disposable_keys(shard, timeline_id)
|
||||
tally = result.tally
|
||||
raw_page_count = tally.not_disposable_count + tally.disposable_count
|
||||
assert tally.not_disposable_count > (
|
||||
raw_page_count // 2
|
||||
), "compaction doesn't rewrite layers that are >=50pct local"
|
||||
|
||||
log.info("check sizes")
|
||||
timeline_info = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
reported_size = timeline_info["current_physical_size"]
|
||||
layer_paths = ps.list_layers(shard, timeline_id)
|
||||
@@ -365,145 +353,6 @@ def test_sharding_split_compaction(
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_sharding_split_offloading(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that during a split, we don't miss archived and offloaded timelines.
|
||||
"""
|
||||
|
||||
TENANT_CONF = {
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": 128 * 1024,
|
||||
"compaction_threshold": 1,
|
||||
"compaction_target_size": 128 * 1024,
|
||||
# no PITR horizon, we specify the horizon when we request on-demand GC
|
||||
"pitr_interval": "3600s",
|
||||
# disable background compaction, GC and offloading. We invoke it manually when we want it to happen.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# Disable automatic creation of image layers, as we will create them explicitly when we want them
|
||||
"image_creation_threshold": 9999,
|
||||
"image_layer_creation_check_threshold": 0,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
|
||||
neon_env_builder.storage_controller_config = {
|
||||
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
|
||||
"max_offline": "30s",
|
||||
"max_warming_up": "300s",
|
||||
}
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id_main = env.initial_timeline
|
||||
|
||||
# Check that we created with an unsharded TenantShardId: this is the default,
|
||||
# but check it in case we change the default in future
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None
|
||||
|
||||
workload_main = Workload(env, tenant_id, timeline_id_main, branch_name="main")
|
||||
workload_main.init()
|
||||
workload_main.write_rows(256)
|
||||
workload_main.validate()
|
||||
workload_main.stop()
|
||||
|
||||
# Create two timelines, archive one, offload the other
|
||||
timeline_id_archived = env.create_branch("archived_not_offloaded")
|
||||
timeline_id_offloaded = env.create_branch("archived_offloaded")
|
||||
|
||||
def timeline_id_set_for(list: list[dict[str, Any]]) -> set[TimelineId]:
|
||||
return set(
|
||||
map(
|
||||
lambda t: TimelineId(t["timeline_id"]),
|
||||
list,
|
||||
)
|
||||
)
|
||||
|
||||
expected_offloaded_set = {timeline_id_offloaded}
|
||||
expected_timeline_set = {timeline_id_main, timeline_id_archived}
|
||||
|
||||
with env.get_tenant_pageserver(tenant_id).http_client() as http_client:
|
||||
http_client.timeline_archival_config(
|
||||
tenant_id, timeline_id_archived, TimelineArchivalState.ARCHIVED
|
||||
)
|
||||
http_client.timeline_archival_config(
|
||||
tenant_id, timeline_id_offloaded, TimelineArchivalState.ARCHIVED
|
||||
)
|
||||
http_client.timeline_offload(tenant_id, timeline_id_offloaded)
|
||||
list = http_client.timeline_and_offloaded_list(tenant_id)
|
||||
assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
|
||||
assert timeline_id_set_for(list.timelines) == expected_timeline_set
|
||||
|
||||
# Do a full image layer generation before splitting
|
||||
http_client.timeline_checkpoint(
|
||||
tenant_id, timeline_id_main, force_image_layer_creation=True, wait_until_uploaded=True
|
||||
)
|
||||
|
||||
# Split one shard into two
|
||||
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
|
||||
|
||||
# Let all shards move into their stable locations, so that during subsequent steps we
|
||||
# don't have reconciles in progress (simpler to reason about what messages we expect in logs)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# Check we got the shard IDs we expected
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
|
||||
|
||||
workload_main.validate()
|
||||
workload_main.stop()
|
||||
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Ensure each shard has the same list of timelines and offloaded timelines
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
|
||||
list = ps.http_client().timeline_and_offloaded_list(shard)
|
||||
assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
|
||||
assert timeline_id_set_for(list.timelines) == expected_timeline_set
|
||||
|
||||
ps.http_client().timeline_compact(shard, timeline_id_main)
|
||||
|
||||
# Check that we can still read all the data
|
||||
workload_main.validate()
|
||||
|
||||
# Force a restart, which requires the state to be persisted.
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
# Ensure each shard has the same list of timelines and offloaded timelines
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
|
||||
list = ps.http_client().timeline_and_offloaded_list(shard)
|
||||
assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
|
||||
assert timeline_id_set_for(list.timelines) == expected_timeline_set
|
||||
|
||||
ps.http_client().timeline_compact(shard, timeline_id_main)
|
||||
|
||||
# Compaction shouldn't make anything unreadable
|
||||
workload_main.validate()
|
||||
|
||||
# Do sharded unarchival
|
||||
env.storage_controller.timeline_archival_config(
|
||||
tenant_id, timeline_id_offloaded, TimelineArchivalState.UNARCHIVED
|
||||
)
|
||||
env.storage_controller.timeline_archival_config(
|
||||
tenant_id, timeline_id_archived, TimelineArchivalState.UNARCHIVED
|
||||
)
|
||||
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
|
||||
list = ps.http_client().timeline_and_offloaded_list(shard)
|
||||
assert timeline_id_set_for(list.offloaded) == set()
|
||||
assert timeline_id_set_for(list.timelines) == {
|
||||
timeline_id_main,
|
||||
timeline_id_archived,
|
||||
timeline_id_offloaded,
|
||||
}
|
||||
|
||||
|
||||
def test_sharding_split_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.common_types import Lsn, TenantId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any
|
||||
@@ -20,10 +19,6 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.http_client()
|
||||
|
||||
# In this test we force 'Timed out while waiting for WAL record error' while
|
||||
# fetching basebackup and don't want any retries.
|
||||
os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1"
|
||||
|
||||
tenant_id, timeline_id = env.create_tenant()
|
||||
expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
|
||||
env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
|
||||
@@ -54,14 +49,11 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
|
||||
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
|
||||
# Trigger WAL wait timeout faster
|
||||
def customize_pageserver_toml(ps_cfg: dict[str, Any]):
|
||||
ps_cfg["wait_lsn_timeout"] = "2s"
|
||||
ps_cfg["wait_lsn_timeout"] = "1s"
|
||||
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
||||
tenant_config["walreceiver_connect_timeout"] = "2s"
|
||||
tenant_config["lagging_wal_timeout"] = "2s"
|
||||
|
||||
# In this test we force 'Timed out while waiting for WAL record error' while
|
||||
# fetching basebackup and don't want any retries.
|
||||
os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1"
|
||||
neon_env_builder.pageserver_config_override = customize_pageserver_toml
|
||||
|
||||
# Have notable SK ids to ensure we check logs for their presence, not some other random numbers
|
||||
@@ -72,6 +64,7 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
|
||||
|
||||
tenant_id, timeline_id = env.create_tenant()
|
||||
|
||||
elements_to_insert = 1_000_000
|
||||
expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
|
||||
env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
|
||||
# we configure wait_lsn_timeout to a shorter value than the lagging_wal_timeout / walreceiver_connect_timeout
|
||||
@@ -81,50 +74,45 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
|
||||
".*ingesting record with timestamp lagging more than wait_lsn_timeout.*"
|
||||
)
|
||||
|
||||
insert_test_elements(env, tenant_id, start=0, count=1)
|
||||
insert_test_elements(env, tenant_id, start=0, count=elements_to_insert)
|
||||
|
||||
def all_sks_in_wareceiver_state():
|
||||
try:
|
||||
trigger_wait_lsn_timeout(env, tenant_id)
|
||||
except Exception as e:
|
||||
exception_string = str(e)
|
||||
try:
|
||||
trigger_wait_lsn_timeout(env, tenant_id)
|
||||
except Exception as e:
|
||||
exception_string = str(e)
|
||||
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
|
||||
|
||||
for safekeeper in env.safekeepers:
|
||||
assert (
|
||||
expected_timeout_error in exception_string
|
||||
), "Should time out during waiting for WAL"
|
||||
|
||||
for safekeeper in env.safekeepers:
|
||||
assert (
|
||||
str(safekeeper.id) in exception_string
|
||||
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout"
|
||||
|
||||
wait_until(60, 0.5, all_sks_in_wareceiver_state)
|
||||
str(safekeeper.id) in exception_string
|
||||
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout"
|
||||
|
||||
stopped_safekeeper = env.safekeepers[-1]
|
||||
stopped_safekeeper_id = stopped_safekeeper.id
|
||||
log.info(f"Stopping safekeeper {stopped_safekeeper.id}")
|
||||
stopped_safekeeper.stop()
|
||||
# sleep until stopped safekeeper is removed from candidates
|
||||
time.sleep(2)
|
||||
|
||||
def all_but_stopped_sks_in_wareceiver_state():
|
||||
try:
|
||||
trigger_wait_lsn_timeout(env, tenant_id)
|
||||
except Exception as e:
|
||||
# Strip out the part before stdout, as it contains full command with the list of all safekeepers
|
||||
exception_string = str(e).split("stdout", 1)[-1]
|
||||
assert (
|
||||
expected_timeout_error in exception_string
|
||||
), "Should time out during waiting for WAL"
|
||||
# Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats.
|
||||
insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert)
|
||||
|
||||
for safekeeper in env.safekeepers:
|
||||
if safekeeper.id == stopped_safekeeper_id:
|
||||
assert (
|
||||
str(safekeeper.id) not in exception_string
|
||||
), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
|
||||
else:
|
||||
assert (
|
||||
str(safekeeper.id) in exception_string
|
||||
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
|
||||
try:
|
||||
trigger_wait_lsn_timeout(env, tenant_id)
|
||||
except Exception as e:
|
||||
# Strip out the part before stdout, as it contains full command with the list of all safekeepers
|
||||
exception_string = str(e).split("stdout", 1)[-1]
|
||||
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
|
||||
|
||||
wait_until(60, 0.5, all_but_stopped_sks_in_wareceiver_state)
|
||||
for safekeeper in env.safekeepers:
|
||||
if safekeeper.id == stopped_safekeeper_id:
|
||||
assert (
|
||||
str(safekeeper.id) not in exception_string
|
||||
), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
|
||||
else:
|
||||
assert (
|
||||
str(safekeeper.id) in exception_string
|
||||
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
|
||||
|
||||
|
||||
def insert_test_elements(env: NeonEnv, tenant_id: TenantId, start: int, count: int):
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 2199b83fb7...ecb1020ff7
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 22e580fe9f...a4830163a6
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: e131a9c027...cc36e03bd0
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 68b5038f27...37d5ead146
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.0",
|
||||
"68b5038f27e493bde6ae552fe066f10cbdfe6a14"
|
||||
"37d5ead146b028dd9a5c07e7a37068ec0df9f465"
|
||||
],
|
||||
"v16": [
|
||||
"16.4",
|
||||
"e131a9c027b202ce92bd7b9cf2569d48a6f9948e"
|
||||
"cc36e03bd0c927022cf3b3563e291e42d75366a1"
|
||||
],
|
||||
"v15": [
|
||||
"15.8",
|
||||
"22e580fe9ffcea7e02592110b1c9bf426d83cada"
|
||||
"a4830163a65811578824ce4022c1cd3daef33d4e"
|
||||
],
|
||||
"v14": [
|
||||
"14.13",
|
||||
"2199b83fb72680001ce0f43bf6187a21dfb8f45d"
|
||||
"ecb1020ff71927e9dd59c526254bb8846bb73ee1"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user