Compare commits

..

14 Commits

Author SHA1 Message Date
Konstantin Knizhnik
8852e8a766 Fix prewarming terminatoin condition 2024-10-27 14:14:06 +02:00
Konstantin Knizhnik
c04ae556c5 Fix comments and other minor refactoring 2024-10-27 09:08:40 +02:00
Konstantin Knizhnik
44b283e107 Increase timeout in test_lfc_prewarm 2024-10-26 08:22:53 +03:00
Konstantin Knizhnik
c7a3359edd Add prewarm_local_cache and get_local_cache_state functions 2024-10-25 23:05:07 +03:00
Konstantin Knizhnik
012a8a360f Fix get_prewarm_info() 2024-10-25 10:11:58 +03:00
Konstantin Knizhnik
0b42695983 Report prewarm progress 2024-10-25 08:26:02 +03:00
Konstantin Knizhnik
284d7b4da6 Report prewarm progress 2024-10-25 08:26:00 +03:00
Konstantin Knizhnik
361fc04cd6 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
12f635aa04 Fix warnings 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
ac6c53b94b Support prewarming of replica 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
df289738b8 Explain why we do not want to perform prewarm of LFC at replica and how it is avoided now 2024-10-25 08:24:45 +03:00
Konstantin Knizhnik
0d3503a187 Check for number of used pages rather than chunks in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f328d497e1 Wait LFC prewarm completion in the loop in test_lfc_prewarm.py 2024-10-25 08:24:44 +03:00
Konstantin Knizhnik
f971c3a786 Implement LFC prewarm 2024-10-25 08:24:44 +03:00
64 changed files with 1338 additions and 1853 deletions

2
.gitignore vendored
View File

@@ -6,8 +6,6 @@ __pycache__/
test_output/
.vscode
.idea
*.swp
tags
neon.iml
/.neon
/integration_tests/.neon

4
Cargo.lock generated
View File

@@ -6272,7 +6272,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#cb2dcea2058034bc209e7917b01c5097712a3168"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#08ccfa94ff5507727bf4d8d006666b5b192e04c6"
dependencies = [
"futures",
"nix 0.26.4",
@@ -6788,7 +6788,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#cb2dcea2058034bc209e7917b01c5097712a3168"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#08ccfa94ff5507727bf4d8d006666b5b192e04c6"
dependencies = [
"bytes",
"io-uring",

View File

@@ -1073,10 +1073,10 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
tenant_id,
TimelineCreateRequest {
new_timeline_id,
mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
ancestor_timeline_id: None,
ancestor_start_lsn: None,
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
)
.await?;
@@ -1133,10 +1133,10 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
mode: pageserver_api::models::TimelineCreateRequestMode::Bootstrap {
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
ancestor_timeline_id: None,
existing_initdb_timeline_id: None,
ancestor_start_lsn: None,
pg_version: Some(args.pg_version),
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
@@ -1189,11 +1189,10 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
mode: pageserver_api::models::TimelineCreateRequestMode::Branch {
ancestor_timeline_id,
ancestor_start_lsn: start_lsn,
pg_version: None,
},
ancestor_timeline_id: Some(ancestor_timeline_id),
existing_initdb_timeline_id: None,
ancestor_start_lsn: start_lsn,
pg_version: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)

View File

@@ -529,6 +529,28 @@ impl PageServerNode {
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
}
pub async fn timeline_create(
&self,
tenant_shard_id: TenantShardId,
new_timeline_id: TimelineId,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
existing_initdb_timeline_id: Option<TimelineId>,
) -> anyhow::Result<TimelineInfo> {
let req = models::TimelineCreateRequest {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
pg_version,
existing_initdb_timeline_id,
};
Ok(self
.http_client
.timeline_create(tenant_shard_id, &req)
.await?)
}
/// Import a basebackup prepared using either:
/// a) `pg_basebackup -F tar`, or
/// b) The `fullbackup` pageserver endpoint

View File

@@ -19,7 +19,6 @@ use once_cell::sync::Lazy;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
pub use prometheus::local::LocalHistogram;
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;

View File

@@ -211,30 +211,13 @@ pub enum TimelineState {
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
#[serde(flatten)]
pub mode: TimelineCreateRequestMode,
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum TimelineCreateRequestMode {
Branch {
ancestor_timeline_id: TimelineId,
#[serde(default)]
ancestor_start_lsn: Option<Lsn>,
// TODO: cplane sets this, but, the branching code always
// inherits the ancestor's pg_version. Earlier code wasn't
// using a flattened enum, so, it was an accepted field, and
// we continue to accept it by having it here.
pg_version: Option<u32>,
},
// NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
// (serde picks the first matching enum variant, in declaration order).
Bootstrap {
#[serde(default)]
existing_initdb_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
},
#[serde(default)]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde(default)]
pub existing_initdb_timeline_id: Option<TimelineId>,
#[serde(default)]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}
#[derive(Serialize, Deserialize, Clone)]
@@ -1068,12 +1051,6 @@ pub mod virtual_file {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScanDisposableKeysResponse {
pub disposable_count: usize,
pub not_disposable_count: usize,
}
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {

View File

@@ -357,20 +357,22 @@ impl RemoteStorage for LocalFs {
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
let mut objects = Vec::with_capacity(keys.len());
for key in keys {
let path = key.with_base(&self.storage_root);
let metadata = file_metadata(&path).await?;
if metadata.is_dir() {
continue;
}
objects.push(ListingObject {
key: key.clone(),
last_modified: metadata.modified()?,
size: metadata.len(),
});
}
let objects = objects;
let objects = keys
.into_iter()
.filter_map(|k| {
let path = k.with_base(&self.storage_root);
if path.is_dir() {
None
} else {
Some(ListingObject {
key: k.clone(),
// LocalFs is just for testing, so just specify a dummy time
last_modified: SystemTime::now(),
size: 0,
})
}
})
.collect();
if let ListingMode::NoDelimiter = mode {
result.keys = objects;
@@ -408,8 +410,9 @@ impl RemoteStorage for LocalFs {
} else {
result.keys.push(ListingObject {
key: RemotePath::from_string(&relative_key).unwrap(),
last_modified: object.last_modified,
size: object.size,
// LocalFs is just for testing
last_modified: SystemTime::now(),
size: 0,
});
}
}

View File

@@ -345,6 +345,7 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(4, 1) => AuxFileV2::Recognized("lfc.state", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,

View File

@@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_LFC_STATE: u8 = 0x04;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// Encode the aux file into a fixed-size key.
@@ -75,6 +76,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("lfc.state") {
aux_hash_to_metadata_key(AUX_DIR_LFC_STATE, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(

View File

@@ -597,10 +597,6 @@ paths:
Create a timeline. Returns new timeline id on success.
Recreating the same timeline will succeed if the parameters match the existing timeline.
If no pg_version is specified, assume DEFAULT_PG_VERSION hardcoded in the pageserver.
To ensure durability, the caller must retry the creation until success.
Just because the timeline is visible via other endpoints does not mean it is durable.
Future versions may stop showing timelines that are not yet durable.
requestBody:
content:
application/json:

View File

@@ -38,7 +38,6 @@ use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TimelineArchivalConfigRequest;
use pageserver_api::models::TimelineCreateRequestMode;
use pageserver_api::models::TimelinesInfoAndOffloaded;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::TopTenantShardsRequest;
@@ -86,7 +85,6 @@ use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::OffloadedTimeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
@@ -549,26 +547,6 @@ async fn timeline_create_handler(
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let new_timeline_id = request_data.new_timeline_id;
// fill in the default pg_version if not provided & convert request into domain model
let params: tenant::CreateTimelineParams = match request_data.mode {
TimelineCreateRequestMode::Bootstrap {
existing_initdb_timeline_id,
pg_version,
} => tenant::CreateTimelineParams::Bootstrap(tenant::CreateTimelineParamsBootstrap {
new_timeline_id,
existing_initdb_timeline_id,
pg_version: pg_version.unwrap_or(DEFAULT_PG_VERSION),
}),
TimelineCreateRequestMode::Branch {
ancestor_timeline_id,
ancestor_start_lsn,
pg_version: _,
} => tenant::CreateTimelineParams::Branch(tenant::CreateTimelineParamsBranch {
new_timeline_id,
ancestor_timeline_id,
ancestor_start_lsn,
}),
};
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
@@ -581,12 +559,22 @@ async fn timeline_create_handler(
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// earlier versions of the code had pg_version and ancestor_lsn in the span
// => continue to provide that information, but, through a log message that doesn't require us to destructure
tracing::info!(?params, "creating timeline");
if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
tracing::info!(%ancestor_id, "starting to branch");
} else {
tracing::info!("bootstrapping");
}
match tenant
.create_timeline(params, state.broker_client.clone(), &ctx)
.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id,
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
request_data.existing_initdb_timeline_id,
state.broker_client.clone(),
&ctx,
)
.await
{
Ok(new_timeline) => {
@@ -637,6 +625,8 @@ async fn timeline_create_handler(
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
timeline_id = %new_timeline_id,
lsn=?request_data.ancestor_start_lsn,
pg_version=?request_data.pg_version
))
.await
}
@@ -1293,99 +1283,6 @@ async fn layer_map_info_handler(
json_response(StatusCode::OK, layer_map_info)
}
#[instrument(skip_all, fields(tenant_id, shard_id, timeline_id, layer_name))]
async fn timeline_layer_scan_disposable_keys(
request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let layer_name: LayerName = parse_request_param(&request, "layer_name")?;
tracing::Span::current().record(
"tenant_id",
tracing::field::display(&tenant_shard_id.tenant_id),
);
tracing::Span::current().record(
"shard_id",
tracing::field::display(tenant_shard_id.shard_slug()),
);
tracing::Span::current().record("timeline_id", tracing::field::display(&timeline_id));
tracing::Span::current().record("layer_name", tracing::field::display(&layer_name));
let state = get_state(&request);
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
// technically the timeline need not be active for this scan to complete
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let guard = timeline.layers.read().await;
let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Layer {tenant_shard_id}/{timeline_id}/{layer_name} not found").into(),
));
};
let resident_layer = layer
.download_and_keep_resident()
.await
.map_err(|err| match err {
tenant::storage_layer::layer::DownloadError::TimelineShutdown
| tenant::storage_layer::layer::DownloadError::DownloadCancelled => {
ApiError::ShuttingDown
}
tenant::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
| tenant::storage_layer::layer::DownloadError::DownloadRequired
| tenant::storage_layer::layer::DownloadError::NotFile(_)
| tenant::storage_layer::layer::DownloadError::DownloadFailed
| tenant::storage_layer::layer::DownloadError::PreStatFailed(_) => {
ApiError::InternalServerError(err.into())
}
#[cfg(test)]
tenant::storage_layer::layer::DownloadError::Failpoint(_) => {
ApiError::InternalServerError(err.into())
}
})?;
let keys = resident_layer
.load_keys(&ctx)
.await
.map_err(ApiError::InternalServerError)?;
let shard_identity = timeline.get_shard_identity();
let mut disposable_count = 0;
let mut not_disposable_count = 0;
let cancel = cancel.clone();
for (i, key) in keys.into_iter().enumerate() {
if shard_identity.is_key_disposable(&key) {
disposable_count += 1;
tracing::debug!(key = %key, key.dbg=?key, "disposable key");
} else {
not_disposable_count += 1;
}
#[allow(clippy::collapsible_if)]
if i % 10000 == 0 {
if cancel.is_cancelled() || timeline.cancel.is_cancelled() || timeline.is_stopping() {
return Err(ApiError::ShuttingDown);
}
}
}
json_response(
StatusCode::OK,
pageserver_api::models::ScanDisposableKeysResponse {
disposable_count,
not_disposable_count,
},
)
}
async fn layer_download_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3248,10 +3145,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|r| api_handler(r, evict_timeline_layer_handler),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_name/scan_disposable_keys",
|r| testing_api_handler("timeline_layer_scan_disposable_keys", r, timeline_layer_scan_disposable_keys),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
|r| api_handler(r, timeline_gc_blocking_handler),

View File

@@ -3040,111 +3040,13 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}
pub mod tokio_epoll_uring {
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use metrics::{register_histogram, register_int_counter, Histogram, LocalHistogram, UIntGauge};
use metrics::{register_int_counter, UIntGauge};
use once_cell::sync::Lazy;
/// Shared storage for tokio-epoll-uring thread local metrics.
pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy<ThreadLocalMetricsStorage> =
Lazy::new(|| {
let slots_submission_queue_depth = register_histogram!(
"pageserver_tokio_epoll_uring_slots_submission_queue_depth",
"The slots waiters queue depth of each tokio_epoll_uring system",
vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
)
.expect("failed to define a metric");
ThreadLocalMetricsStorage {
observers: Mutex::new(HashMap::new()),
slots_submission_queue_depth,
}
});
pub struct ThreadLocalMetricsStorage {
/// List of thread local metrics observers.
observers: Mutex<HashMap<u64, Arc<ThreadLocalMetrics>>>,
/// A histogram shared between all thread local systems
/// for collecting slots submission queue depth.
slots_submission_queue_depth: Histogram,
}
/// Each thread-local [`tokio_epoll_uring::System`] gets one of these as its
/// [`tokio_epoll_uring::metrics::PerSystemMetrics`] generic.
///
/// The System makes observations into [`Self`] and periodically, the collector
/// comes along and flushes [`Self`] into the shared storage [`THREAD_LOCAL_METRICS_STORAGE`].
///
/// [`LocalHistogram`] is `!Send`, so, we need to put it behind a [`Mutex`].
/// But except for the periodic flush, the lock is uncontended so there's no waiting
/// for cache coherence protocol to get an exclusive cache line.
pub struct ThreadLocalMetrics {
/// Local observer of thread local tokio-epoll-uring system's slots waiters queue depth.
slots_submission_queue_depth: Mutex<LocalHistogram>,
}
impl ThreadLocalMetricsStorage {
/// Registers a new thread local system. Returns a thread local metrics observer.
pub fn register_system(&self, id: u64) -> Arc<ThreadLocalMetrics> {
let per_system_metrics = Arc::new(ThreadLocalMetrics::new(
self.slots_submission_queue_depth.local(),
));
let mut g = self.observers.lock().unwrap();
g.insert(id, Arc::clone(&per_system_metrics));
per_system_metrics
}
/// Removes metrics observer for a thread local system.
/// This should be called before dropping a thread local system.
pub fn remove_system(&self, id: u64) {
let mut g = self.observers.lock().unwrap();
g.remove(&id);
}
/// Flush all thread local metrics to the shared storage.
pub fn flush_thread_local_metrics(&self) {
let g = self.observers.lock().unwrap();
g.values().for_each(|local| {
local.flush();
});
}
}
impl ThreadLocalMetrics {
pub fn new(slots_submission_queue_depth: LocalHistogram) -> Self {
ThreadLocalMetrics {
slots_submission_queue_depth: Mutex::new(slots_submission_queue_depth),
}
}
/// Flushes the thread local metrics to shared aggregator.
pub fn flush(&self) {
let Self {
slots_submission_queue_depth,
} = self;
slots_submission_queue_depth.lock().unwrap().flush();
}
}
impl tokio_epoll_uring::metrics::PerSystemMetrics for ThreadLocalMetrics {
fn observe_slots_submission_queue_depth(&self, queue_depth: u64) {
let Self {
slots_submission_queue_depth,
} = self;
slots_submission_queue_depth
.lock()
.unwrap()
.observe(queue_depth as f64);
}
}
pub struct Collector {
descs: Vec<metrics::core::Desc>,
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
thread_local_metrics_storage: &'static ThreadLocalMetricsStorage,
}
impl metrics::core::Collector for Collector {
@@ -3154,7 +3056,7 @@ pub mod tokio_epoll_uring {
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(Self::NMETRICS);
let tokio_epoll_uring::metrics::GlobalMetrics {
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
@@ -3162,21 +3064,12 @@ pub mod tokio_epoll_uring {
mfs.extend(self.systems_created.collect());
self.systems_destroyed.set(systems_destroyed);
mfs.extend(self.systems_destroyed.collect());
self.thread_local_metrics_storage
.flush_thread_local_metrics();
mfs.extend(
self.thread_local_metrics_storage
.slots_submission_queue_depth
.collect(),
);
mfs
}
}
impl Collector {
const NMETRICS: usize = 3;
const NMETRICS: usize = 2;
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
@@ -3208,7 +3101,6 @@ pub mod tokio_epoll_uring {
descs,
systems_created,
systems_destroyed,
thread_local_metrics_storage: &THREAD_LOCAL_METRICS_STORAGE,
}
}
}
@@ -3568,7 +3460,6 @@ pub fn preinitialize_metrics() {
Lazy::force(&RECONSTRUCT_TIME);
Lazy::force(&BASEBACKUP_QUERY_TIME);
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);
tenant_throttling::preinitialize_global_metrics();
}

View File

@@ -1506,42 +1506,35 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
/// Drop some relations
pub(crate) async fn put_rel_drops(
&mut self,
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
/// Drop a relation.
pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let mut dirty = false;
for rel_tag in rel_tags {
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
// Remove it from the directory entry
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
}
}
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
}
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
} else {
warn!("dropped rel {} did not exist in rel directory", rel);
}
// update logical size
let size_key = rel_size_to_key(rel);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// Remove enty from relation size cache
self.tline.remove_cached_rel_size(&rel);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel));
Ok(())
}

View File

@@ -294,11 +294,11 @@ pub struct Tenant {
/// During timeline creation, we first insert the TimelineId to the
/// creating map, then `timelines`, then remove it from the creating map.
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
/// **Lock order**: if acquiring both, acquire`timelines` before `timelines_creating`
timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
/// Possibly offloaded and archived timelines
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
/// **Lock order**: if acquiring both, acquire`timelines` before `timelines_offloaded`
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
// This mutex prevents creation of new timelines during GC.
@@ -584,40 +584,30 @@ impl OffloadedTimeline {
}
}
impl fmt::Debug for OffloadedTimeline {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "OffloadedTimeline<{}>", self.timeline_id)
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum MaybeOffloaded {
Yes,
No,
}
#[derive(Clone, Debug)]
#[derive(Clone)]
pub enum TimelineOrOffloaded {
Timeline(Arc<Timeline>),
Offloaded(Arc<OffloadedTimeline>),
}
impl TimelineOrOffloaded {
pub fn arc_ref(&self) -> TimelineOrOffloadedArcRef<'_> {
pub fn tenant_shard_id(&self) -> TenantShardId {
match self {
TimelineOrOffloaded::Timeline(timeline) => {
TimelineOrOffloadedArcRef::Timeline(timeline)
}
TimelineOrOffloaded::Offloaded(offloaded) => {
TimelineOrOffloadedArcRef::Offloaded(offloaded)
}
TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
}
}
pub fn tenant_shard_id(&self) -> TenantShardId {
self.arc_ref().tenant_shard_id()
}
pub fn timeline_id(&self) -> TimelineId {
self.arc_ref().timeline_id()
match self {
TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
}
}
pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
match self {
@@ -625,7 +615,7 @@ impl TimelineOrOffloaded {
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
}
}
fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
pub fn remote_client_maybe_construct(&self, tenant: &Tenant) -> Arc<RemoteTimelineClient> {
match self {
TimelineOrOffloaded::Timeline(timeline) => timeline.remote_client.clone(),
TimelineOrOffloaded::Offloaded(offloaded) => match offloaded.remote_client.clone() {
@@ -642,38 +632,6 @@ impl TimelineOrOffloaded {
}
}
pub enum TimelineOrOffloadedArcRef<'a> {
Timeline(&'a Arc<Timeline>),
Offloaded(&'a Arc<OffloadedTimeline>),
}
impl TimelineOrOffloadedArcRef<'_> {
pub fn tenant_shard_id(&self) -> TenantShardId {
match self {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
}
}
pub fn timeline_id(&self) -> TimelineId {
match self {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
}
}
}
impl<'a> From<&'a Arc<Timeline>> for TimelineOrOffloadedArcRef<'a> {
fn from(timeline: &'a Arc<Timeline>) -> Self {
Self::Timeline(timeline)
}
}
impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
fn from(timeline: &'a Arc<OffloadedTimeline>) -> Self {
Self::Offloaded(timeline)
}
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline is shutting down")]
@@ -779,99 +737,6 @@ impl Debug for SetStoppingError {
}
}
/// Arguments to [`Tenant::create_timeline`].
///
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
/// is `None`, the result of the timeline create call is not deterministic.
///
/// See [`CreateTimelineIdempotency`] for an idempotency key.
#[derive(Debug)]
pub(crate) enum CreateTimelineParams {
Bootstrap(CreateTimelineParamsBootstrap),
Branch(CreateTimelineParamsBranch),
}
#[derive(Debug)]
pub(crate) struct CreateTimelineParamsBootstrap {
pub(crate) new_timeline_id: TimelineId,
pub(crate) existing_initdb_timeline_id: Option<TimelineId>,
pub(crate) pg_version: u32,
}
/// NB: See comment on [`CreateTimelineIdempotency::Branch`] for why there's no `pg_version` here.
#[derive(Debug)]
pub(crate) struct CreateTimelineParamsBranch {
pub(crate) new_timeline_id: TimelineId,
pub(crate) ancestor_timeline_id: TimelineId,
pub(crate) ancestor_start_lsn: Option<Lsn>,
}
/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`].
///
/// Each [`Timeline`] object holds [`Self`] as an immutable property in [`Timeline::create_idempotency`].
///
/// We lower timeline creation requests to [`Self`], and then use [`PartialEq::eq`] to compare [`Timeline::create_idempotency`] with the request.
/// If they are equal, we return a reference to the existing timeline, otherwise it's an idempotency conflict.
///
/// There is special treatment for [`Self::FailWithConflict`] to always return an idempotency conflict.
/// It would be nice to have more advanced derive macros to make that special treatment declarative.
///
/// Notes:
/// - Unlike [`CreateTimelineParams`], ancestor LSN is fixed, so, branching will be at a deterministic LSN.
/// - We make some trade-offs though, e.g., [`CreateTimelineParamsBootstrap::existing_initdb_timeline_id`]
/// is not considered for idempotency. We can improve on this over time if we deem it necessary.
///
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum CreateTimelineIdempotency {
/// NB: special treatment, see comment in [`Self`].
FailWithConflict,
Bootstrap {
pg_version: u32,
},
/// NB: branches always have the same `pg_version` as their ancestor.
/// While [`pageserver_api::models::TimelineCreateRequestMode::Branch::pg_version`]
/// exists as a field, and is set by cplane, it has always been ignored by pageserver when
/// determining the child branch pg_version.
Branch {
ancestor_timeline_id: TimelineId,
ancestor_start_lsn: Lsn,
},
}
/// What is returned by [`Tenant::start_creating_timeline`].
#[must_use]
enum StartCreatingTimelineResult<'t> {
CreateGuard(TimelineCreateGuard<'t>),
Idempotent(Arc<Timeline>),
}
/// What is returned by [`Tenant::create_timeline`].
enum CreateTimelineResult {
Created(Arc<Timeline>),
Idempotent(Arc<Timeline>),
}
impl CreateTimelineResult {
fn discriminant(&self) -> &'static str {
match self {
Self::Created(_) => "Created",
Self::Idempotent(_) => "Idempotent",
}
}
fn timeline(&self) -> &Arc<Timeline> {
match self {
Self::Created(t) | Self::Idempotent(t) => t,
}
}
/// Unit test timelines aren't activated, test has to do it if it needs to.
#[cfg(test)]
fn into_timeline_for_test(self) -> Arc<Timeline> {
match self {
Self::Created(t) | Self::Idempotent(t) => t,
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum CreateTimelineError {
#[error("creation of timeline with the given ID is in progress")]
@@ -1011,24 +876,12 @@ impl Tenant {
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
let idempotency = if metadata.ancestor_timeline().is_none() {
CreateTimelineIdempotency::Bootstrap {
pg_version: metadata.pg_version(),
}
} else {
CreateTimelineIdempotency::Branch {
ancestor_timeline_id: metadata.ancestor_timeline().unwrap(),
ancestor_start_lsn: metadata.ancestor_lsn(),
}
};
let timeline = self.create_timeline_struct(
timeline_id,
&metadata,
ancestor.clone(),
resources,
CreateTimelineCause::Load,
idempotency.clone(),
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
@@ -1821,8 +1674,6 @@ impl Tenant {
}
/// Loads the specified (offloaded) timeline from S3 and attaches it as a loaded timeline
///
/// Counterpart to [`offload_timeline`].
async fn unoffload_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
@@ -1831,24 +1682,6 @@ impl Tenant {
) -> Result<Arc<Timeline>, TimelineArchivalError> {
info!("unoffloading timeline");
let cancel = self.cancel.clone();
// Protect against concurrent attempts to use this TimelineId
// We don't care much about idempotency, as it's ensured a layer above.
let allow_offloaded = true;
let _create_guard = self
.create_timeline_create_guard(
timeline_id,
CreateTimelineIdempotency::FailWithConflict,
allow_offloaded,
)
.map_err(|err| match err {
TimelineExclusionError::AlreadyCreating => TimelineArchivalError::AlreadyInProgress,
TimelineExclusionError::AlreadyExists { .. } => {
TimelineArchivalError::Other(anyhow::anyhow!("Timeline already exists"))
}
TimelineExclusionError::Other(e) => TimelineArchivalError::Other(e),
})?;
let timeline_preload = self
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel.clone())
.await;
@@ -2115,17 +1948,16 @@ impl Tenant {
self.timelines.lock().unwrap().keys().cloned().collect()
}
/// This is used by tests & import-from-basebackup.
/// This is used to create the initial 'main' timeline during bootstrapping,
/// or when importing a new base backup. The caller is expected to load an
/// initial image of the datadir to the new timeline after this.
///
/// The returned [`UninitializedTimeline`] contains no data nor metadata and it is in
/// a state that will fail [`Tenant::load_remote_timeline`] because `disk_consistent_lsn=Lsn(0)`.
/// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
/// and the timeline will fail to load at a restart.
///
/// The caller is responsible for getting the timeline into a state that will be accepted
/// by [`Tenant::load_remote_timeline`] / [`Tenant::attach`].
/// Then they may call [`UninitializedTimeline::finish_creation`] to add the timeline
/// to the [`Tenant::timelines`].
///
/// Tests should use `Tenant::create_test_timeline` to set up the minimum required metadata keys.
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
pub(crate) async fn create_empty_timeline(
&self,
new_timeline_id: TimelineId,
@@ -2139,15 +1971,7 @@ impl Tenant {
);
// Protect against concurrent attempts to use this TimelineId
let create_guard = match self
.start_creating_timeline(new_timeline_id, CreateTimelineIdempotency::FailWithConflict)
.await?
{
StartCreatingTimelineResult::CreateGuard(guard) => guard,
StartCreatingTimelineResult::Idempotent(_) => {
unreachable!("FailWithConflict implies we get an error instead")
}
};
let create_guard = self.create_timeline_create_guard(new_timeline_id)?;
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to
@@ -2266,7 +2090,11 @@ impl Tenant {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_timeline(
self: &Arc<Tenant>,
params: CreateTimelineParams,
new_timeline_id: TimelineId,
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
@@ -2285,25 +2113,54 @@ impl Tenant {
.enter()
.map_err(|_| CreateTimelineError::ShuttingDown)?;
let result: CreateTimelineResult = match params {
CreateTimelineParams::Bootstrap(CreateTimelineParamsBootstrap {
new_timeline_id,
existing_initdb_timeline_id,
pg_version,
}) => {
self.bootstrap_timeline(
new_timeline_id,
pg_version,
existing_initdb_timeline_id,
ctx,
)
.await?
// Get exclusive access to the timeline ID: this ensures that it does not already exist,
// and that no other creation attempts will be allowed in while we are working.
let create_guard = match self.create_timeline_create_guard(new_timeline_id) {
Ok(m) => m,
Err(TimelineExclusionError::AlreadyCreating) => {
// Creation is in progress, we cannot create it again, and we cannot
// check if this request matches the existing one, so caller must try
// again later.
return Err(CreateTimelineError::AlreadyCreating);
}
CreateTimelineParams::Branch(CreateTimelineParamsBranch {
new_timeline_id,
ancestor_timeline_id,
mut ancestor_start_lsn,
}) => {
Err(TimelineExclusionError::Other(e)) => {
return Err(CreateTimelineError::Other(e));
}
Err(TimelineExclusionError::AlreadyExists(existing)) => {
debug!("timeline {new_timeline_id} already exists");
// Idempotency: creating the same timeline twice is not an error, unless
// the second creation has different parameters.
if existing.get_ancestor_timeline_id() != ancestor_timeline_id
|| existing.pg_version != pg_version
|| (ancestor_start_lsn.is_some()
&& ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
{
return Err(CreateTimelineError::Conflict);
}
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
existing
.remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
return Ok(existing);
}
};
pausable_failpoint!("timeline-creation-after-uninit");
let loaded_timeline = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = self
.get_timeline(ancestor_timeline_id, false)
.context("Cannot branch off the timeline that's not present in pageserver")?;
@@ -2350,48 +2207,43 @@ impl Tenant {
})?;
}
self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
.await?
self.branch_timeline(
&ancestor_timeline,
new_timeline_id,
ancestor_start_lsn,
create_guard,
ctx,
)
.await?
}
None => {
self.bootstrap_timeline(
new_timeline_id,
pg_version,
load_existing_initdb,
create_guard,
ctx,
)
.await?
}
};
// At this point we have dropped our guard on [`Self::timelines_creating`], and
// the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
// not send a success to the caller until it is. The same applies to idempotent retries.
//
// TODO: the timeline is already visible in [`Self::timelines`]; a caller could incorrectly
// assume that, because they can see the timeline via API, that the creation is done and
// that it is durable. Ideally, we would keep the timeline hidden (in [`Self::timelines_creating`])
// until it is durable, e.g., by extending the time we hold the creation guard. This also
// interacts with UninitializedTimeline and is generally a bit tricky.
//
// To re-emphasize: the only correct way to create a timeline is to repeat calling the
// creation API until it returns success. Only then is durability guaranteed.
info!(creation_result=%result.discriminant(), "waiting for timeline to be durable");
result
.timeline()
// not send a success to the caller until it is. The same applies to handling retries,
// see the handling of [`TimelineExclusionError::AlreadyExists`] above.
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
loaded_timeline
.remote_client
.wait_completion()
.await
.context("wait for timeline initial uploads to complete")?;
.with_context(|| format!("wait for {} timeline initial uploads to complete", kind))?;
// The creating task is responsible for activating the timeline.
// We do this after `wait_completion()` so that we don't spin up tasks that start
// doing stuff before the IndexPart is durable in S3, which is done by the previous section.
let activated_timeline = match result {
CreateTimelineResult::Created(timeline) => {
timeline.activate(self.clone(), broker_client, None, ctx);
timeline
}
CreateTimelineResult::Idempotent(timeline) => {
info!(
"request was deemed idempotent, activation will be done by the creating task"
);
timeline
}
};
loaded_timeline.activate(self.clone(), broker_client, None, ctx);
Ok(activated_timeline)
Ok(loaded_timeline)
}
pub(crate) async fn delete_timeline(
@@ -3048,58 +2900,33 @@ impl Tenant {
&self,
child_shards: &Vec<TenantShardId>,
) -> anyhow::Result<()> {
let (timelines, offloaded) = {
let timelines = self.timelines.lock().unwrap();
let offloaded = self.timelines_offloaded.lock().unwrap();
(timelines.clone(), offloaded.clone())
};
let timelines_iter = timelines
.values()
.map(TimelineOrOffloadedArcRef::<'_>::from)
.chain(
offloaded
.values()
.map(TimelineOrOffloadedArcRef::<'_>::from),
);
for timeline in timelines_iter {
let timelines = self.timelines.lock().unwrap().clone();
for timeline in timelines.values() {
// We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
// to ensure that they do not start a split if currently in the process of doing these.
let timeline_id = timeline.timeline_id();
if let TimelineOrOffloadedArcRef::Timeline(timeline) = timeline {
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tracing::info!(%timeline_id, "Uploading index");
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
}
let remote_client = match timeline {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.remote_client.clone(),
TimelineOrOffloadedArcRef::Offloaded(offloaded) => {
let remote_client = self
.build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
Arc::new(remote_client)
}
};
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index");
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tracing::info!(%timeline_id, "Shutting down remote storage client");
remote_client.shutdown().await;
tracing::info!(timeline_id=%timeline.timeline_id, "Shutting down remote storage client");
timeline.remote_client.shutdown().await;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
// we use here really is the remotely persistent one).
tracing::info!(%timeline_id, "Downloading index_part from parent");
let result = remote_client
tracing::info!(timeline_id=%timeline.timeline_id, "Downloading index_part from parent");
let result = timeline.remote_client
.download_index_file(&self.cancel)
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
.await?;
let index_part = match result {
MaybeDeletedIndexPart::Deleted(_) => {
@@ -3109,11 +2936,11 @@ impl Tenant {
};
for child_shard in child_shards {
tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
tracing::info!(timeline_id=%timeline.timeline_id, "Uploading index_part for child {}", child_shard.to_index());
upload_index_part(
&self.remote_storage,
child_shard,
&timeline_id,
&timeline.timeline_id,
self.generation,
&index_part,
&self.cancel,
@@ -3122,6 +2949,8 @@ impl Tenant {
}
}
// TODO: also copy index files of offloaded timelines
let tenant_manifest = self.tenant_manifest();
// TODO: generation support
let generation = remote_timeline_client::TENANT_MANIFEST_GENERATION;
@@ -3404,7 +3233,6 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
create_idempotency: CreateTimelineIdempotency,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
@@ -3434,7 +3262,6 @@ impl Tenant {
pg_version,
state,
self.attach_wal_lag_cooldown.clone(),
create_idempotency,
self.cancel.child_token(),
);
@@ -3920,16 +3747,16 @@ impl Tenant {
/// timeline background tasks are launched, except the flush loop.
#[cfg(test)]
async fn branch_timeline_test(
self: &Arc<Self>,
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
ancestor_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let create_guard = self.create_timeline_create_guard(dst_id).unwrap();
let tl = self
.branch_timeline_impl(src_timeline, dst_id, ancestor_lsn, ctx)
.await?
.into_timeline_for_test();
.branch_timeline_impl(src_timeline, dst_id, ancestor_lsn, create_guard, ctx)
.await?;
tl.set_state(TimelineState::Active);
Ok(tl)
}
@@ -3938,7 +3765,7 @@ impl Tenant {
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
pub async fn branch_timeline_test_with_layers(
self: &Arc<Self>,
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
ancestor_lsn: Option<Lsn>,
@@ -3986,24 +3813,28 @@ impl Tenant {
}
/// Branch an existing timeline.
///
/// The caller is responsible for activating the returned timeline.
async fn branch_timeline(
self: &Arc<Self>,
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext,
) -> Result<CreateTimelineResult, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
.await
}
async fn branch_timeline_impl(
self: &Arc<Self>,
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>,
_ctx: &RequestContext,
) -> Result<CreateTimelineResult, CreateTimelineError> {
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
// We will validate our ancestor LSN in this function. Acquire the GC lock so that
@@ -4018,23 +3849,6 @@ impl Tenant {
lsn
});
// we finally have determined the ancestor_start_lsn, so we can get claim exclusivity now
let timeline_create_guard = match self
.start_creating_timeline(
dst_id,
CreateTimelineIdempotency::Branch {
ancestor_timeline_id: src_timeline.timeline_id,
ancestor_start_lsn: start_lsn,
},
)
.await?
{
StartCreatingTimelineResult::CreateGuard(guard) => guard,
StartCreatingTimelineResult::Idempotent(timeline) => {
return Ok(CreateTimelineResult::Idempotent(timeline));
}
};
// Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
// horizon on the source timeline
//
@@ -4120,92 +3934,28 @@ impl Tenant {
.schedule_index_upload_for_full_metadata_update(&metadata)
.context("branch initial metadata upload")?;
// Callers are responsible to wait for uploads to complete and for activating the timeline.
Ok(CreateTimelineResult::Created(new_timeline))
Ok(new_timeline)
}
/// For unit tests, make this visible so that other modules can directly create timelines
#[cfg(test)]
#[tracing::instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn bootstrap_timeline_test(
self: &Arc<Self>,
&self,
timeline_id: TimelineId,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
self.bootstrap_timeline(timeline_id, pg_version, load_existing_initdb, ctx)
.await
.map_err(anyhow::Error::new)
.map(|r| r.into_timeline_for_test())
}
/// Get exclusive access to the timeline ID for creation.
///
/// Timeline-creating code paths must use this function before making changes
/// to in-memory or persistent state.
///
/// The `state` parameter is a description of the timeline creation operation
/// we intend to perform.
/// If the timeline was already created in the meantime, we check whether this
/// request conflicts or is idempotent , based on `state`.
async fn start_creating_timeline(
&self,
new_timeline_id: TimelineId,
idempotency: CreateTimelineIdempotency,
) -> Result<StartCreatingTimelineResult<'_>, CreateTimelineError> {
let allow_offloaded = false;
match self.create_timeline_create_guard(new_timeline_id, idempotency, allow_offloaded) {
Ok(create_guard) => {
pausable_failpoint!("timeline-creation-after-uninit");
Ok(StartCreatingTimelineResult::CreateGuard(create_guard))
}
Err(TimelineExclusionError::AlreadyCreating) => {
// Creation is in progress, we cannot create it again, and we cannot
// check if this request matches the existing one, so caller must try
// again later.
Err(CreateTimelineError::AlreadyCreating)
}
Err(TimelineExclusionError::Other(e)) => Err(CreateTimelineError::Other(e)),
Err(TimelineExclusionError::AlreadyExists {
existing: TimelineOrOffloaded::Offloaded(_existing),
..
}) => {
info!("timeline already exists but is offloaded");
Err(CreateTimelineError::Conflict)
}
Err(TimelineExclusionError::AlreadyExists {
existing: TimelineOrOffloaded::Timeline(existing),
arg,
}) => {
{
let existing = &existing.create_idempotency;
let _span = info_span!("idempotency_check", ?existing, ?arg).entered();
debug!("timeline already exists");
match (existing, &arg) {
// FailWithConflict => no idempotency check
(CreateTimelineIdempotency::FailWithConflict, _)
| (_, CreateTimelineIdempotency::FailWithConflict) => {
warn!("timeline already exists, failing request");
return Err(CreateTimelineError::Conflict);
}
// Idempotent <=> CreateTimelineIdempotency is identical
(x, y) if x == y => {
info!("timeline already exists and idempotency matches, succeeding request");
// fallthrough
}
(_, _) => {
warn!("idempotency conflict, failing request");
return Err(CreateTimelineError::Conflict);
}
}
}
Ok(StartCreatingTimelineResult::Idempotent(existing))
}
}
let create_guard = self.create_timeline_create_guard(timeline_id).unwrap();
self.bootstrap_timeline(
timeline_id,
pg_version,
load_existing_initdb,
create_guard,
ctx,
)
.await
}
async fn upload_initdb(
@@ -4259,26 +4009,16 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization completes, tar up the temp dir and upload it to S3.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
self: &Arc<Self>,
&self,
timeline_id: TimelineId,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext,
) -> Result<CreateTimelineResult, CreateTimelineError> {
let timeline_create_guard = match self
.start_creating_timeline(
timeline_id,
CreateTimelineIdempotency::Bootstrap { pg_version },
)
.await?
{
StartCreatingTimelineResult::CreateGuard(guard) => guard,
StartCreatingTimelineResult::Idempotent(timeline) => {
return Ok(CreateTimelineResult::Idempotent(timeline))
}
};
) -> anyhow::Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
@@ -4342,9 +4082,7 @@ impl Tenant {
.context("extract initdb tar")?;
} else {
// Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel)
.await
.context("run initdb")?;
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
// Upload the created data dir to S3
if self.tenant_shard_id().is_shard_zero() {
@@ -4398,9 +4136,7 @@ impl Tenant {
})?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
Err(CreateTimelineError::Other(anyhow::anyhow!(
"failpoint before-checkpoint-new-timeline"
)))
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
unfinished_timeline
@@ -4415,9 +4151,7 @@ impl Tenant {
// All done!
let timeline = raw_timeline.finish_creation()?;
// Callers are responsible to wait for uploads to complete and for activating the timeline.
Ok(CreateTimelineResult::Created(timeline))
Ok(timeline)
}
fn build_timeline_remote_client(&self, timeline_id: TimelineId) -> RemoteTimelineClient {
@@ -4467,7 +4201,6 @@ impl Tenant {
ancestor,
resources,
CreateTimelineCause::Load,
create_guard.idempotency.clone(),
)
.context("Failed to create timeline data structure")?;
@@ -4505,26 +4238,15 @@ impl Tenant {
/// Get a guard that provides exclusive access to the timeline directory, preventing
/// concurrent attempts to create the same timeline.
///
/// The `allow_offloaded` parameter controls whether to tolerate the existence of
/// offloaded timelines or not.
fn create_timeline_create_guard(
&self,
timeline_id: TimelineId,
idempotency: CreateTimelineIdempotency,
allow_offloaded: bool,
) -> Result<TimelineCreateGuard, TimelineExclusionError> {
let tenant_shard_id = self.tenant_shard_id;
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
let create_guard = TimelineCreateGuard::new(
self,
timeline_id,
timeline_path.clone(),
idempotency,
allow_offloaded,
)?;
let create_guard = TimelineCreateGuard::new(self, timeline_id, timeline_path.clone())?;
// At this stage, we have got exclusive access to in-memory state for this timeline ID
// for creation.
@@ -5160,10 +4882,7 @@ mod tests {
.await
{
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
"timeline already exists with different parameters".to_string()
),
Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
}
Ok(())

View File

@@ -1278,14 +1278,10 @@ impl RemoteTimelineClient {
let fut = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = match &mut *guard {
UploadQueue::Stopped(_) => {
scopeguard::ScopeGuard::into_inner(sg);
return;
}
UploadQueue::Stopped(_) => return,
UploadQueue::Uninitialized => {
// transition into Stopped state
self.stop_impl(&mut guard);
scopeguard::ScopeGuard::into_inner(sg);
return;
}
UploadQueue::Initialized(ref mut init) => init,

View File

@@ -187,8 +187,6 @@ pub(super) async fn gather_inputs(
// but it is unlikely to cause any issues. In the worst case,
// the calculation will error out.
timelines.retain(|t| t.is_active());
// Also filter out archived timelines.
timelines.retain(|t| t.is_archived() != Some(true));
// Build a map of branch points.
let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();

View File

@@ -1084,7 +1084,7 @@ impl DeltaLayerInner {
}
}
pub(crate) async fn index_entries<'a>(
pub(super) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,
) -> Result<Vec<DeltaEntry<'a>>> {
@@ -1346,7 +1346,7 @@ impl DeltaLayerInner {
tree_reader.dump().await?;
let keys = self.index_entries(ctx).await?;
let keys = self.load_keys(ctx).await?;
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
let buf = val.load_raw(ctx).await?;
@@ -1453,16 +1453,6 @@ impl DeltaLayerInner {
),
}
}
/// NB: not super efficient, but not terrible either. Should prob be an iterator.
//
// We're reusing the index traversal logical in plan_reads; would be nice to
// factor that out.
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
self.index_entries(ctx)
.await
.map(|entries| entries.into_iter().map(|entry| entry.key).collect())
}
}
/// A set of data associated with a delta layer key and its value

View File

@@ -673,21 +673,6 @@ impl ImageLayerInner {
),
}
}
/// NB: not super efficient, but not terrible either. Should prob be an iterator.
//
// We're reusing the index traversal logical in plan_reads; would be nice to
// factor that out.
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
let plan = self
.plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
.await?;
Ok(plan
.into_iter()
.flat_map(|read| read.blobs_at)
.map(|(_, blob_meta)| blob_meta.key)
.collect())
}
}
/// A builder object for constructing a new image layer.

View File

@@ -19,7 +19,7 @@ use crate::task_mgr::TaskKind;
use crate::tenant::timeline::{CompactionError, GetVectoredError};
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
use super::delta_layer::{self};
use super::delta_layer::{self, DeltaEntry};
use super::image_layer::{self};
use super::{
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
@@ -1841,22 +1841,23 @@ impl ResidentLayer {
pub(crate) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<pageserver_api::key::Key>> {
) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
use LayerKind::*;
let owner = &self.owner.0;
let inner = self.downloaded.get(owner, ctx).await?;
match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => {
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
self.owner.record_access(ctx);
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
self.owner.record_access(ctx);
let res = match inner {
Delta(ref d) => delta_layer::DeltaLayerInner::load_keys(d, ctx).await,
Image(ref i) => image_layer::ImageLayerInner::load_keys(i, ctx).await,
};
res.with_context(|| format!("Layer index is corrupted for {self}"))
delta_layer::DeltaLayerInner::load_keys(d, ctx)
.await
.with_context(|| format!("Layer index is corrupted for {self}"))
}
Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
}
}
/// Read all they keys in this layer which match the ShardIdentity, and write them all to

View File

@@ -57,34 +57,6 @@ impl std::fmt::Display for PersistentLayerKey {
}
}
impl From<ImageLayerName> for PersistentLayerKey {
fn from(image_layer_name: ImageLayerName) -> Self {
Self {
key_range: image_layer_name.key_range,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer_name.lsn),
is_delta: false,
}
}
}
impl From<DeltaLayerName> for PersistentLayerKey {
fn from(delta_layer_name: DeltaLayerName) -> Self {
Self {
key_range: delta_layer_name.key_range,
lsn_range: delta_layer_name.lsn_range,
is_delta: true,
}
}
}
impl From<LayerName> for PersistentLayerKey {
fn from(layer_name: LayerName) -> Self {
match layer_name {
LayerName::Image(i) => i.into(),
LayerName::Delta(d) => d.into(),
}
}
}
impl PersistentLayerDesc {
pub fn key(&self) -> PersistentLayerKey {
PersistentLayerKey {

View File

@@ -424,9 +424,6 @@ pub struct Timeline {
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
pub(crate) attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
/// Cf. [`crate::tenant::CreateTimelineIdempotency`].
pub(crate) create_idempotency: crate::tenant::CreateTimelineIdempotency,
}
pub type TimelineDeleteProgress = Arc<tokio::sync::Mutex<DeleteTimelineFlow>>;
@@ -2139,7 +2136,6 @@ impl Timeline {
pg_version: u32,
state: TimelineState,
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
create_idempotency: crate::tenant::CreateTimelineIdempotency,
cancel: CancellationToken,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -2278,8 +2274,6 @@ impl Timeline {
handles: Default::default(),
attach_wal_lag_cooldown,
create_idempotency,
};
result.repartition_threshold =

View File

@@ -834,12 +834,7 @@ impl Timeline {
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
let keys = delta
.index_entries(ctx)
.await
.map_err(CompactionError::Other)?;
all_keys.extend(keys);
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
@@ -2443,7 +2438,7 @@ impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
type DeltaEntry<'a> = DeltaEntry<'a>;
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
self.0.get_as_delta(ctx).await?.index_entries(ctx).await
self.0.load_keys(ctx).await
}
}

View File

@@ -313,7 +313,6 @@ impl DeleteTimelineFlow {
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,
crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here
)
.context("create_timeline_struct")?;

View File

@@ -45,16 +45,13 @@ impl LayerManager {
pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.try_get_from_key(key)
self.layers()
.get(key)
.with_context(|| format!("get layer from key: {key}"))
.expect("not found")
.clone()
}
pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
self.layers().get(key)
}
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.get_from_key(&desc.key())
}

View File

@@ -5,11 +5,7 @@ use camino::Utf8PathBuf;
use tracing::{error, info, info_span};
use utils::{fs_ext, id::TimelineId, lsn::Lsn};
use crate::{
context::RequestContext,
import_datadir,
tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
};
use crate::{context::RequestContext, import_datadir, tenant::Tenant};
use super::Timeline;
@@ -169,17 +165,13 @@ pub(crate) struct TimelineCreateGuard<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
pub(crate) timeline_path: Utf8PathBuf,
pub(crate) idempotency: CreateTimelineIdempotency,
}
/// Errors when acquiring exclusive access to a timeline ID for creation
#[derive(thiserror::Error, Debug)]
pub(crate) enum TimelineExclusionError {
#[error("Already exists")]
AlreadyExists {
existing: TimelineOrOffloaded,
arg: CreateTimelineIdempotency,
},
AlreadyExists(Arc<Timeline>),
#[error("Already creating")]
AlreadyCreating,
@@ -193,42 +185,27 @@ impl<'t> TimelineCreateGuard<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
timeline_path: Utf8PathBuf,
idempotency: CreateTimelineIdempotency,
allow_offloaded: bool,
) -> Result<Self, TimelineExclusionError> {
// Lock order: this is the only place we take both locks. During drop() we only
// lock creating_timelines
let timelines = owning_tenant.timelines.lock().unwrap();
let timelines_offloaded = owning_tenant.timelines_offloaded.lock().unwrap();
let mut creating_timelines: std::sync::MutexGuard<
'_,
std::collections::HashSet<TimelineId>,
> = owning_tenant.timelines_creating.lock().unwrap();
if let Some(existing) = timelines.get(&timeline_id) {
return Err(TimelineExclusionError::AlreadyExists {
existing: TimelineOrOffloaded::Timeline(existing.clone()),
arg: idempotency,
});
Err(TimelineExclusionError::AlreadyExists(existing.clone()))
} else if creating_timelines.contains(&timeline_id) {
Err(TimelineExclusionError::AlreadyCreating)
} else {
creating_timelines.insert(timeline_id);
Ok(Self {
owning_tenant,
timeline_id,
timeline_path,
})
}
if !allow_offloaded {
if let Some(existing) = timelines_offloaded.get(&timeline_id) {
return Err(TimelineExclusionError::AlreadyExists {
existing: TimelineOrOffloaded::Offloaded(existing.clone()),
arg: idempotency,
});
}
}
if creating_timelines.contains(&timeline_id) {
return Err(TimelineExclusionError::AlreadyCreating);
}
creating_timelines.insert(timeline_id);
Ok(Self {
owning_tenant,
timeline_id,
timeline_path,
idempotency,
})
}
}

View File

@@ -16,24 +16,18 @@ use tokio_epoll_uring::{System, SystemHandle};
use crate::virtual_file::on_fatal_io_error;
use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};
use crate::metrics::tokio_epoll_uring as metrics;
#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);
struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
cell: tokio::sync::OnceCell<SystemHandle>,
launch_attempts: AtomicU32,
/// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
thread_local_state_id: u64,
}
impl Drop for ThreadLocalStateInner {
fn drop(&mut self) {
THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
}
}
impl ThreadLocalState {
pub fn new() -> Self {
Self(Arc::new(ThreadLocalStateInner {
@@ -77,8 +71,7 @@ pub async fn thread_local_system() -> Handle {
&fake_cancel,
)
.await;
let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
let res = System::launch_with_metrics(per_system_metrics)
let res = System::launch()
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {
@@ -93,7 +86,6 @@ pub async fn thread_local_system() -> Handle {
emit_launch_failure_process_stats();
});
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
Err(())
}
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
@@ -123,7 +115,7 @@ fn emit_launch_failure_process_stats() {
// number of threads
// rss / system memory usage generally
let tokio_epoll_uring::metrics::GlobalMetrics {
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
@@ -190,7 +182,7 @@ fn emit_launch_failure_process_stats() {
pub struct Handle(ThreadLocalState);
impl std::ops::Deref for Handle {
type Target = SystemHandle<metrics::ThreadLocalMetrics>;
type Target = SystemHandle;
fn deref(&self) -> &Self::Target {
self.0

View File

@@ -21,7 +21,6 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoken Rust code.
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
@@ -1621,12 +1620,6 @@ impl WalIngest {
},
)?;
// Group relations to drop by dbNode. This map will contain all relations that _might_
// exist, we will reduce it to which ones really exist later. This map can be huge if
// the transaction touches a huge number of relations (there is no bound on this in
// postgres).
let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();
for xnode in &parsed.xnodes {
for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
let rel = RelTag {
@@ -1635,16 +1628,15 @@ impl WalIngest {
dbnode: xnode.dbnode,
relnode: xnode.relnode,
};
drop_relations
.entry((xnode.spcnode, xnode.dbnode))
.or_default()
.push(rel);
if modification
.tline
.get_rel_exists(rel, Version::Modified(modification), ctx)
.await?
{
self.put_rel_drop(modification, rel, ctx).await?;
}
}
}
// Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
modification.put_rel_drops(drop_relations, ctx).await?;
if origin_id != 0 {
modification
.set_replorigin(origin_id, parsed.origin_lsn)
@@ -2354,6 +2346,16 @@ impl WalIngest {
Ok(())
}
async fn put_rel_drop(
&mut self,
modification: &mut DatadirModification<'_>,
rel: RelTag,
ctx: &RequestContext,
) -> Result<()> {
modification.put_rel_drop(rel, ctx).await?;
Ok(())
}
async fn handle_rel_extend(
&mut self,
modification: &mut DatadirModification<'_>,
@@ -2417,59 +2419,6 @@ impl WalIngest {
WAL_INGEST
.gap_blocks_zeroed_on_rel_extend
.inc_by(gap_blocks_filled);
// Log something when relation extends cause use to fill gaps
// with zero pages. Logging is rate limited per pg version to
// avoid skewing.
if gap_blocks_filled > 0 {
use once_cell::sync::Lazy;
use std::sync::Mutex;
use utils::rate_limit::RateLimit;
struct RateLimitPerPgVersion {
rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
}
impl RateLimitPerPgVersion {
const fn new() -> Self {
Self {
rate_limiters: [const {
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
}; 4],
}
}
const fn rate_limiter(
&self,
pg_version: u32,
) -> Option<&Lazy<Mutex<RateLimit>>> {
const MIN_PG_VERSION: u32 = 14;
const MAX_PG_VERSION: u32 = 17;
if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
return None;
}
Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
}
}
static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
if let Ok(mut locked) = rate_limiter.try_lock() {
locked.call(|| {
info!(
lsn=%modification.get_lsn(),
pg_version=%modification.tline.pg_version,
rel=%rel,
"Filled {} gap blocks on rel extend to {} from {}",
gap_blocks_filled,
new_nblocks,
old_nblocks);
});
}
}
}
}
Ok(())
}
@@ -2867,9 +2816,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
let mut rel_drops = HashMap::new();
rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
m.put_rel_drops(rel_drops, &ctx).await?;
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit(&ctx).await?;
// Check that rel is not visible anymore

View File

@@ -8,7 +8,6 @@ OBJS = \
file_cache.o \
hll.o \
libpagestore.o \
logical_replication_monitor.o \
neon.o \
neon_pgversioncompat.o \
neon_perf_counters.o \
@@ -33,6 +32,8 @@ DATA = \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \

View File

@@ -22,6 +22,7 @@
#include "neon_pgversioncompat.h"
#include "access/parallel.h"
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
@@ -30,22 +31,28 @@
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include RELFILEINFO_HDR
#include "replication/message.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogrecovery.h"
#endif
#include "hll.h"
#include "bitmap.h"
#include "neon.h"
#include "neon_perf_counters.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -100,7 +107,9 @@ typedef struct FileCacheEntry
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 access_count : 30;
uint32 prewarm_requested : 1; /* entry should be filled by prewarm */
uint32 prewarm_started : 1; /* chunk is written by lfc_prewarm */
uint32 bitmap[CHUNK_BITMAP_SIZE];
dlist_node list_node; /* LRU/holes list node */
} FileCacheEntry;
@@ -118,26 +127,57 @@ typedef struct FileCacheControl
uint64 writes; /* number of writes issued */
uint64 time_read; /* time spent reading (us) */
uint64 time_write; /* time spent writing (us) */
uint32 prewarm_total_chunks;
uint32 prewarm_curr_chunk;
uint32 prewarmed_pages;
uint32 skipped_pages;
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
} FileCacheControl;
typedef struct FileCacheStateEntry
{
BufferTag key;
uint32 bitmap[CHUNK_BITMAP_SIZE];
} FileCacheStateEntry;
static HTAB *lfc_hash;
static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static char *lfc_path;
static FileCacheControl *lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static CustomCheckpointHookType PrevCheckpointHook;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
PGDLLEXPORT void LfcPrewarmMain(Datum main_arg);
static void
LfcCheckpointHook(int flags)
{
if (flags & CHECKPOINT_IS_SHUTDOWN)
{
lfc_save_state();
}
if (PrevCheckpointHook)
{
PrevCheckpointHook(flags);
}
}
/*
* Local file cache is optional and Neon can work without it.
* In case of any any errors with this cache, we should disable it but to not throw error.
@@ -149,7 +189,7 @@ lfc_disable(char const *op)
{
int fd;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -184,7 +224,7 @@ lfc_disable(char const *op)
pgstat_report_wait_end();
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to truncate local file cache %s: %m", lfc_path);
}
}
@@ -196,7 +236,7 @@ lfc_disable(char const *op)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);
@@ -236,6 +276,17 @@ lfc_ensure_opened(void)
return enabled;
}
PGDLLEXPORT void
LfcPrewarmMain(Datum main_arg)
{
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
lfc_load_pages();
}
static void
lfc_shmem_startup(void)
{
@@ -267,14 +318,7 @@ lfc_shmem_startup(void)
n_chunks + 1, n_chunks + 1,
&info,
HASH_ELEM | HASH_BLOBS);
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
lfc_ctl->time_read = 0;
lfc_ctl->time_write = 0;
memset(lfc_ctl, 0, sizeof *lfc_ctl);
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -285,7 +329,7 @@ lfc_shmem_startup(void)
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
@@ -295,6 +339,9 @@ lfc_shmem_startup(void)
}
}
LWLockRelease(AddinShmemInitLock);
PrevCheckpointHook = CustomCheckpointHook;
CustomCheckpointHook = LfcCheckpointHook;
}
static void
@@ -327,7 +374,7 @@ lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
if (*newval > lfc_max_size)
{
elog(ERROR, "neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
elog(ERROR, "LFC: neon.file_cache_size_limit can not be larger than neon.max_file_cache_size");
return false;
}
return true;
@@ -436,6 +483,32 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed pages",
NULL,
&lfc_prewarm_limit,
0, /* disabled by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -447,8 +520,326 @@ lfc_init(void)
#else
lfc_shmem_request();
#endif
if (lfc_prewarm_limit != 0)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LfcPrewarmMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "LFC prewarm");
snprintf(bgw.bgw_type, BGW_MAXLEN, "LFC prewarm");
RegisterBackgroundWorker(&bgw);
}
}
static FileCacheStateEntry*
lfc_get_state(size_t* n_entries)
{
size_t max_entries = *n_entries;
size_t i = 0;
FileCacheStateEntry* fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
memcpy(&fs[i].key, &entry->key, sizeof entry->key);
memcpy(fs[i].bitmap, entry->bitmap, sizeof entry->bitmap);
if (++i == max_entries)
break;
}
elog(LOG, "LFC: save state of %ld chunks", (long)i);
}
LWLockRelease(lfc_lock);
*n_entries = i;
return fs;
}
/*
* Save state of local file cache as AUX file. Size of saved state is limited by lfc_prewarm_limit.
* This function saves first mostrecently used pages.
* It is expected to be called at shutdown checkpoint by checkpointer.
*/
void
lfc_save_state(void)
{
size_t n_entries = lfc_prewarm_limit;
FileCacheStateEntry* fs;
if (n_entries == 0)
return;
fs = lfc_get_state(&n_entries);
if (n_entries != 0)
{
#if PG_MAJORVERSION_NUM < 17
XLogFlush(LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false));
#else
LogLogicalMessage("neon-file:lfc.state", (char const*)fs, sizeof(FileCacheStateEntry) * n_entries, false, true);
#endif
}
pfree(fs);
}
/*
* Prewarm LFC cache to the specified state.
*
* Prewarming can interfere with accesses to the pages by other backends. Usually access to LFC is protected by shared buffers: when Postgres
* is reading page, it pins shared buffer and enforces that only one backend is reading it, while other are waiting for read completion.
*
* But it is not true for prewarming: backend can fetch page itself, modify and then write it to LFC. At the
* same time `lfc_prewarm` tries to write deteriorated image of this page in LFC. To increase concurrency, access to LFC files (both read and write)
* is performed without holding locks. So it can happen that two or more processes write different content to the same location in the LFC file.
* Certainly we can not rely on disk content in this case.
*
* To solve this problem we use two flags in LFC entry: `prewarm_requested` and `prewarm_started`. First is set before prewarm is actually started.
* `lfc_prewarm` writes to LFC file only if this flag is set. This flag is cleared if any other backend performs write to this LFC chunk.
* In this case data loaded by `lfc_prewarm` is considered to be deteriorated and should be just ignored.
*
* But as far as write to LFC is performed without holding lock, there is no guarantee that no such write is in progress.
* This is why second flag is used: `prewarm_started`. It is set by `lfc_prewarm` when is starts writing page and cleared when write is completed.
* Any other backend writing to LFC should abandon it's write to LFC file (just not mark page as loaded in bitmap) if this flag is set.
* So neither `lfc_prewarm`, neither backend are saving page in LFC in this case - it is just skipped.
*/
static void
lfc_prewarm(FileCacheStateEntry* fs, size_t n_entries)
{
ssize_t rc;
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
FileCacheEntry *entry;
uint64 generation;
uint32 entry_offset;
uint32 hash;
size_t i;
bool found;
int shard_no;
if (!lfc_ensure_opened())
return;
if (n_entries == 0 || fs == NULL)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
LWLockRelease(lfc_lock);
return;
}
if (n_entries > lfc_ctl->limit - lfc_ctl->size)
{
n_entries = lfc_ctl->limit - lfc_ctl->size;
}
/* Initialize fields used to track prewarming progress */
lfc_ctl->prewarm_total_chunks = n_entries;
lfc_ctl->prewarm_curr_chunk = 0;
/*
* Load LFC state and add entries in hash table.
* It is needed to track modification of prewarmed pages.
* All such entries have `prewarm_requested` flag set. When entry is updated (some backed reads or writes
* some pages from this chunk), then `prewarm_requested` flag is cleared, prohibiting prewarm of this chunk.
* It prevents overwritting page updated or loaded by backend with older one, loaded by prewarm.
*/
for (i = 0; i < n_entries; i++)
{
hash = get_hash_value(lfc_hash, &fs[i].key);
entry = hash_search_with_hash_value(lfc_hash, &fs[i].key, hash, HASH_ENTER, &found);
/* Do not prewarm chunks which are already present in LFC */
if (!found)
{
entry->offset = lfc_ctl->size++;
entry->hash = hash;
entry->access_count = 0;
entry->prewarm_requested = true;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
/* Most recently visted pages are stored first */
dlist_push_head(&lfc_ctl->lru, &entry->list_node);
lfc_ctl->used += 1;
}
}
LWLockRelease(lfc_lock);
elog(LOG, "LFC: start loading %ld chunks", (long)n_entries);
while (true)
{
size_t chunk_no = snd_idx / BLOCKS_PER_CHUNK;
size_t offs_in_chunk = snd_idx % BLOCKS_PER_CHUNK;
if (chunk_no < n_entries)
{
if (fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31)))
{
/*
* In case of prewarming replica we should be careful not to load too new version
* of the page - with LSN larger than current replay LSN.
* At primary we are always loading latest version.
*/
XLogRecPtr req_lsn = RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : UINT64_MAX;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(fs[chunk_no].key),
.forknum = fs[chunk_no].key.forkNum,
.blkno = fs[chunk_no].key.blockNum + offs_in_chunk,
.req.lsn = req_lsn,
.req.not_modified_since = 0
};
shard_no = get_shard_number(&fs[chunk_no].key);
while (!page_server->send(shard_no, (NeonRequest *) &request)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
n_sent += 1;
}
snd_idx += 1;
}
if (n_sent >= n_received + lfc_prewarm_batch || chunk_no == n_entries)
{
NeonResponse * resp;
do
{
chunk_no = rcv_idx / BLOCKS_PER_CHUNK;
offs_in_chunk = rcv_idx % BLOCKS_PER_CHUNK;
rcv_idx += 1;
} while (!(fs[chunk_no].bitmap[offs_in_chunk >> 5] & (1 << (offs_in_chunk & 31))));
shard_no = get_shard_number(&fs[chunk_no].key);
resp = page_server->receive(shard_no);
lfc_ctl->prewarm_curr_chunk = chunk_no;
if (resp->tag != T_NeonGetPageResponse)
{
elog(LOG, "LFC: unexpected response type: %d", resp->tag);
return;
}
hash = get_hash_value(lfc_hash, &fs[chunk_no].key);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &fs[chunk_no].key, hash, HASH_FIND, NULL);
if (entry != NULL && entry->prewarm_requested)
{
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
Assert(!entry->prewarm_started);
entry->prewarm_started = true;
LWLockRelease(lfc_lock);
rc = pwrite(lfc_desc, ((NeonGetPageResponse*)resp)->page, BLCKSZ, ((off_t) entry_offset * BLOCKS_PER_CHUNK + offs_in_chunk) * BLCKSZ);
if (rc != BLCKSZ)
{
lfc_disable("write");
break;
}
else
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
CriticalAssert(LFC_ENABLED());
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
if (entry->prewarm_requested)
{
lfc_ctl->used_pages += 1 - ((entry->bitmap[offs_in_chunk >> 5] >> (offs_in_chunk & 31)) & 1);
entry->bitmap[offs_in_chunk >> 5] |= 1 << (offs_in_chunk & 31);
lfc_ctl->prewarmed_pages += 1;
}
else
{
lfc_ctl->skipped_pages += 1;
}
Assert(entry->prewarm_started);
entry->prewarm_started = false;
}
LWLockRelease(lfc_lock);
}
}
else
{
Assert(!entry || !entry->prewarm_started);
lfc_ctl->skipped_pages += 1;
LWLockRelease(lfc_lock);
}
if (++n_received == n_sent && snd_idx >= n_entries * BLOCKS_PER_CHUNK)
{
break;
}
}
}
Assert(n_sent == n_received);
lfc_ctl->prewarm_curr_chunk = n_entries;
elog(LOG, "LFC: complete prewarming: loaded %ld pages", (long)n_received);
}
/*
* Load pages from LFC state saved in AUX file.
*/
void
lfc_load_pages(void)
{
int fd;
FileCacheStateEntry *fs;
ssize_t rc;
size_t max_entries = lfc_prewarm_limit;
fd = OpenTransientFile("lfc.state", O_RDONLY | PG_BINARY);
if (fd < 0)
{
elog(LOG, "LFC: state file is missing");
return;
}
fs = (FileCacheStateEntry*)palloc(sizeof(FileCacheStateEntry) * max_entries);
rc = read(fd, fs, sizeof(FileCacheStateEntry) * max_entries);
if (rc <= 0)
{
elog(LOG, "LFC: Failed to read state file: %m");
CloseTransientFile(fd);
}
else
{
CloseTransientFile(fd);
elog(LOG, "LFC: read state with %lu entries", (long)(rc / sizeof(FileCacheStateEntry)));
lfc_prewarm(fs, rc / sizeof(FileCacheStateEntry));
}
pfree(fs);
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -616,6 +1007,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
entry->prewarm_requested = false; /* prohibit prewarm of this LFC entry */
if (entry->access_count == 0)
{
@@ -861,7 +1253,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
/*
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
* 2. Check if the chunk actually has the blocks we're interested in
@@ -899,6 +1291,17 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (found)
{
if (entry->prewarm_started)
{
/*
* Some page of this chunk is currently written by `lfc_prewarm`.
* We should give-up not to interfere with it.
* But clearing `prewarm_requested` flag also will not allow `lfc_prewarm` to fix it result.
*/
entry->prewarm_requested = false;
LWLockRelease(lfc_lock);
return;
}
/*
* Unlink entry from LRU list to pin it for the duration of IO
* operation
@@ -928,7 +1331,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
@@ -944,10 +1347,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;
hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found);
CriticalAssert(hole_found);
lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
}
@@ -959,9 +1362,11 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
entry->access_count = 1;
entry->hash = hash;
entry->prewarm_started = false;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
entry->prewarm_requested = false; /* prohibit prewarm if LFC entry is updated by some backend */
generation = lfc_ctl->generation;
entry_offset = entry->offset;
LWLockRelease(lfc_lock);
@@ -1334,3 +1739,74 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(save_local_cache_state);
Datum
save_local_cache_state(PG_FUNCTION_ARGS)
{
lfc_save_state();
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum
get_local_cache_state(PG_FUNCTION_ARGS)
{
size_t n_entries = PG_ARGISNULL(0) ? lfc_prewarm_limit : PG_GETARG_INT32(0);
FileCacheStateEntry* fs = lfc_get_state(&n_entries);
size_t size_in_bytes = sizeof(FileCacheStateEntry) * n_entries;
bytea* res = (bytea*)palloc(VARHDRSZ + size_in_bytes);
SET_VARSIZE(res, VARHDRSZ + size_in_bytes);
memcpy(VARDATA(res), fs, size_in_bytes);
pfree(fs);
PG_RETURN_BYTEA_P(res);
}
PG_FUNCTION_INFO_V1(prewarm_local_cache);
Datum
prewarm_local_cache(PG_FUNCTION_ARGS)
{
bytea* state = PG_GETARG_BYTEA_PP(0);
uint32 n_entries = VARSIZE_ANY_EXHDR(state);
FileCacheStateEntry* fs = (FileCacheStateEntry*)VARDATA_ANY(state);
lfc_prewarm(fs, n_entries);
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(get_prewarm_info);
Datum
get_prewarm_info(PG_FUNCTION_ARGS)
{
Datum values[4];
bool nulls[4];
TupleDesc tupdesc;
if (lfc_size_limit == 0)
PG_RETURN_NULL();
tupdesc = CreateTemplateTupleDesc(4);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "total_chunks", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "curr_chunk", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prewarmed_pages", INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "skipped_pages", INT4OID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
MemSet(nulls, 0, sizeof(nulls));
LWLockAcquire(lfc_lock, LW_SHARED);
values[0] = Int32GetDatum(lfc_ctl->prewarm_total_chunks);
values[1] = Int32GetDatum(lfc_ctl->prewarm_curr_chunk);
values[2] = Int32GetDatum(lfc_ctl->prewarmed_pages);
values[3] = Int32GetDatum(lfc_ctl->skipped_pages);
LWLockRelease(lfc_lock);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -1,253 +0,0 @@
#include <limits.h>
#include <string.h>
#include <dirent.h>
#include <signal.h>
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "storage/fd.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
#include "logical_replication_monitor.h"
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
static int logical_replication_max_snap_files = 300;
PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
static int
LsnDescComparator(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return 1;
else if (lsn1 == lsn2)
return 0;
else
return -1;
}
/*
* Look at .snap files and calculate minimum allowed restart_lsn of slot so that
* next gc would leave not more than logical_replication_max_snap_files; all
* slots having lower restart_lsn should be dropped.
*/
static XLogRecPtr
get_num_snap_files_lsn_threshold(void)
{
DIR *dirdesc;
struct dirent *de;
char *snap_path = "pg_logical/snapshots/";
int lsns_allocated = 1024;
int lsns_num = 0;
XLogRecPtr *lsns;
XLogRecPtr cutoff;
if (logical_replication_max_snap_files < 0)
return 0;
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
/* find all .snap files and get their lsns */
dirdesc = AllocateDir(snap_path);
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
{
XLogRecPtr lsn;
uint32 hi;
uint32 lo;
if (strcmp(de->d_name, ".") == 0 ||
strcmp(de->d_name, "..") == 0)
continue;
if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2)
{
ereport(LOG,
(errmsg("could not parse file name as .snap file \"%s\"", de->d_name)));
continue;
}
lsn = ((uint64) hi) << 32 | lo;
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
if (lsns_allocated == lsns_num)
{
lsns_allocated *= 2;
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
}
lsns[lsns_num++] = lsn;
}
/* sort by lsn desc */
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
/* and take cutoff at logical_replication_max_snap_files */
if (logical_replication_max_snap_files > lsns_num)
cutoff = 0;
/* have less files than cutoff */
else
{
cutoff = lsns[logical_replication_max_snap_files - 1];
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
}
pfree(lsns);
FreeDir(dirdesc);
return cutoff;
}
void
InitLogicalReplicationMonitor(void)
{
BackgroundWorker bgw;
DefineCustomIntVariable(
"neon.logical_replication_max_snap_files",
"Maximum allowed logical replication .snap files. When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
NULL,
&logical_replication_max_snap_files,
300, -1, INT_MAX,
PGC_SIGHUP,
0,
NULL, NULL, NULL);
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LogicalSlotsMonitorMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Logical replication monitor");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Logical replication monitor");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
/*
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
* WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which
* need too many .snap files.
*/
void
LogicalSlotsMonitorMain(Datum main_arg)
{
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
for (;;)
{
XLogRecPtr cutoff_lsn;
/* In case of a SIGHUP, just reload the configuration. */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* If there are too many .snap files, just drop all logical slots to
* prevent aux files bloat.
*/
cutoff_lsn = get_num_snap_files_lsn_threshold();
if (cutoff_lsn > 0)
{
for (int i = 0; i < max_replication_slots; i++)
{
char slot_name[NAMEDATALEN];
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
/* find the name */
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
/* Consider only logical repliction slots */
if (!s->in_use || !SlotIsLogical(s))
{
LWLockRelease(ReplicationSlotControlLock);
continue;
}
/* do we need to drop it? */
SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn;
SpinLockRelease(&s->mutex);
if (restart_lsn >= cutoff_lsn)
{
LWLockRelease(ReplicationSlotControlLock);
continue;
}
strlcpy(slot_name, s->data.name.data, NAMEDATALEN);
elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X",
slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn));
LWLockRelease(ReplicationSlotControlLock);
/* now try to drop it, killing owner before if any */
for (;;)
{
pid_t active_pid;
SpinLockAcquire(&s->mutex);
active_pid = s->active_pid;
SpinLockRelease(&s->mutex);
if (active_pid == 0)
{
/*
* Slot is releasted, try to drop it. Though of course
* it could have been reacquired, so drop can ERROR
* out. Similarly it could have been dropped in the
* meanwhile.
*
* In principle we could remove pg_try/pg_catch, that
* would restart the whole bgworker.
*/
ConditionVariableCancelSleep();
PG_TRY();
{
ReplicationSlotDrop(slot_name, true);
elog(LOG, "ls_monitor: slot %s dropped", slot_name);
}
PG_CATCH();
{
/* log ERROR and reset elog stack */
EmitErrorReport();
FlushErrorState();
elog(LOG, "ls_monitor: failed to drop slot %s", slot_name);
}
PG_END_TRY();
break;
}
else
{
/* kill the owner and wait for release */
elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid);
(void) kill(active_pid, SIGTERM);
/* We shouldn't get stuck, but to be safe add timeout. */
ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP);
}
}
}
}
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
LS_MONITOR_CHECK_INTERVAL,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}

View File

@@ -1,6 +0,0 @@
#ifndef __NEON_LOGICAL_REPLICATION_MONITOR_H__
#define __NEON_LOGICAL_REPLICATION_MONITOR_H__
void InitLogicalReplicationMonitor(void);
#endif

View File

@@ -0,0 +1,28 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.6'" to load this file. \quit
CREATE FUNCTION save_local_cache_state()
RETURNS void
AS 'MODULE_PATHNAME', 'save_local_cache_state'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer)
RETURNS record
AS 'MODULE_PATHNAME', 'get_prewarm_info'
LANGUAGE C STRICT
PARALLEL SAFE;
CREATE FUNCTION get_local_cache_state(max_chunks integer default null)
RETURNS bytea
AS 'MODULE_PATHNAME', 'get_local_cache_state'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION prewarm_local_cache(state bytea)
RETURNS void
AS 'MODULE_PATHNAME', 'prewarm_local_cache'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -0,0 +1,9 @@
DROP FUNCTION IF EXISTS save_local_cache_state();
DROP FUNCTION IF EXISTS get_prewarm_info(out total_chunks integer, out curr_chunk integer, out prewarmed_pages integer, out skipped_pages integer);
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea);

View File

@@ -14,22 +14,32 @@
#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "catalog/pg_type.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/wait_event.h"
#include "extension_server.h"
#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"
#include "control_plane_connector.h"
#include "logical_replication_monitor.h"
#include "walsender_hooks.h"
#if PG_MAJORVERSION_NUM >= 16
#include "storage/ipc.h"
@@ -38,6 +48,7 @@
PG_MODULE_MAGIC;
void _PG_init(void);
static int logical_replication_max_snap_files = 300;
static int running_xacts_overflow_policy;
@@ -71,6 +82,237 @@ static const struct config_enum_entry running_xacts_overflow_policies[] = {
{NULL, 0, false}
};
static void
InitLogicalReplicationMonitor(void)
{
BackgroundWorker bgw;
DefineCustomIntVariable(
"neon.logical_replication_max_snap_files",
"Maximum allowed logical replication .snap files. When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
NULL,
&logical_replication_max_snap_files,
300, -1, INT_MAX,
PGC_SIGHUP,
0,
NULL, NULL, NULL);
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LogicalSlotsMonitorMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Logical replication monitor");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Logical replication monitor");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
static int
LsnDescComparator(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return 1;
else if (lsn1 == lsn2)
return 0;
else
return -1;
}
/*
* Look at .snap files and calculate minimum allowed restart_lsn of slot so that
* next gc would leave not more than logical_replication_max_snap_files; all
* slots having lower restart_lsn should be dropped.
*/
static XLogRecPtr
get_num_snap_files_lsn_threshold(void)
{
DIR *dirdesc;
struct dirent *de;
char *snap_path = "pg_logical/snapshots/";
int lsns_allocated = 1024;
int lsns_num = 0;
XLogRecPtr *lsns;
XLogRecPtr cutoff;
if (logical_replication_max_snap_files < 0)
return 0;
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
/* find all .snap files and get their lsns */
dirdesc = AllocateDir(snap_path);
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
{
XLogRecPtr lsn;
uint32 hi;
uint32 lo;
if (strcmp(de->d_name, ".") == 0 ||
strcmp(de->d_name, "..") == 0)
continue;
if (sscanf(de->d_name, "%X-%X.snap", &hi, &lo) != 2)
{
ereport(LOG,
(errmsg("could not parse file name as .snap file \"%s\"", de->d_name)));
continue;
}
lsn = ((uint64) hi) << 32 | lo;
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
if (lsns_allocated == lsns_num)
{
lsns_allocated *= 2;
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
}
lsns[lsns_num++] = lsn;
}
/* sort by lsn desc */
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
/* and take cutoff at logical_replication_max_snap_files */
if (logical_replication_max_snap_files > lsns_num)
cutoff = 0;
/* have less files than cutoff */
else
{
cutoff = lsns[logical_replication_max_snap_files - 1];
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
}
pfree(lsns);
FreeDir(dirdesc);
return cutoff;
}
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
/*
* Unused logical replication slots pins WAL and prevents deletion of snapshots.
* WAL bloat is guarded by max_slot_wal_keep_size; this bgw removes slots which
* need too many .snap files.
*/
PGDLLEXPORT void
LogicalSlotsMonitorMain(Datum main_arg)
{
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
for (;;)
{
XLogRecPtr cutoff_lsn;
/* In case of a SIGHUP, just reload the configuration. */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* If there are too many .snap files, just drop all logical slots to
* prevent aux files bloat.
*/
cutoff_lsn = get_num_snap_files_lsn_threshold();
if (cutoff_lsn > 0)
{
for (int i = 0; i < max_replication_slots; i++)
{
char slot_name[NAMEDATALEN];
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
/* find the name */
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
/* Consider only logical repliction slots */
if (!s->in_use || !SlotIsLogical(s))
{
LWLockRelease(ReplicationSlotControlLock);
continue;
}
/* do we need to drop it? */
SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn;
SpinLockRelease(&s->mutex);
if (restart_lsn >= cutoff_lsn)
{
LWLockRelease(ReplicationSlotControlLock);
continue;
}
strlcpy(slot_name, s->data.name.data, NAMEDATALEN);
elog(LOG, "ls_monitor: dropping slot %s with restart_lsn %X/%X below horizon %X/%X",
slot_name, LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(cutoff_lsn));
LWLockRelease(ReplicationSlotControlLock);
/* now try to drop it, killing owner before if any */
for (;;)
{
pid_t active_pid;
SpinLockAcquire(&s->mutex);
active_pid = s->active_pid;
SpinLockRelease(&s->mutex);
if (active_pid == 0)
{
/*
* Slot is releasted, try to drop it. Though of course
* it could have been reacquired, so drop can ERROR
* out. Similarly it could have been dropped in the
* meanwhile.
*
* In principle we could remove pg_try/pg_catch, that
* would restart the whole bgworker.
*/
ConditionVariableCancelSleep();
PG_TRY();
{
ReplicationSlotDrop(slot_name, true);
elog(LOG, "ls_monitor: slot %s dropped", slot_name);
}
PG_CATCH();
{
/* log ERROR and reset elog stack */
EmitErrorReport();
FlushErrorState();
elog(LOG, "ls_monitor: failed to drop slot %s", slot_name);
}
PG_END_TRY();
break;
}
else
{
/* kill the owner and wait for release */
elog(LOG, "ls_monitor: killing slot %s owner %d", slot_name, active_pid);
(void) kill(active_pid, SIGTERM);
/* We shouldn't get stuck, but to be safe add timeout. */
ConditionVariableTimedSleep(&s->active_cv, 1000, WAIT_EVENT_REPLICATION_SLOT_DROP);
}
}
}
}
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
LS_MONITOR_CHECK_INTERVAL,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}
/*
* XXX: These private to procarray.c, but we need them here.
*/
@@ -425,6 +667,7 @@ _PG_init(void)
SlotFuncs_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
InitLogicalReplicationMonitor();
InitControlPlaneConnector();
pg_init_extension_server();

View File

@@ -276,6 +276,8 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern void lfc_save_state(void);
extern void lfc_load_pages(void);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

10
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -3118,13 +3118,13 @@ files = [
[[package]]
name = "werkzeug"
version = "3.0.6"
version = "3.0.3"
description = "The comprehensive WSGI web application library."
optional = false
python-versions = ">=3.8"
files = [
{file = "werkzeug-3.0.6-py3-none-any.whl", hash = "sha256:1bc0c2310d2fbb07b1dd1105eba2f7af72f322e1e455f2f93c993bee8c8a5f17"},
{file = "werkzeug-3.0.6.tar.gz", hash = "sha256:a8dd59d4de28ca70471a34cba79bed5f7ef2e036a76b3ab0835474246eb41f8d"},
{file = "werkzeug-3.0.3-py3-none-any.whl", hash = "sha256:fc9645dc43e03e4d630d23143a04a7f947a9a3b5727cd535fdfe155a17cc48c8"},
{file = "werkzeug-3.0.3.tar.gz", hash = "sha256:097e5bfda9f0aba8da6b8545146def481d06aa7d3266e7448e2cccf67dd8bd18"},
]
[package.dependencies]
@@ -3406,4 +3406,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "0f4804119f417edf8e1fbd6d715d2e8d70ad731334fa9570304a2203f83339cf"
content-hash = "f52632571e34b0e51b059c280c35d6ff6f69f6a8c9586caca78282baf635be91"

View File

@@ -5,7 +5,6 @@ use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use jose_jwk::crypto::KeyInfo;
use reqwest::{redirect, Client};
use serde::de::Visitor;
use serde::{Deserialize, Deserializer};
use signature::Verifier;
@@ -25,7 +24,6 @@ const MIN_RENEW: Duration = Duration::from_secs(30);
const AUTO_RENEW: Duration = Duration::from_secs(300);
const MAX_RENEW: Duration = Duration::from_secs(3600);
const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
const JWKS_USER_AGENT: &str = "neon-proxy";
/// How to get the JWT auth rules
pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
@@ -52,6 +50,7 @@ pub(crate) struct AuthRule {
pub(crate) role_names: Vec<RoleNameInt>,
}
#[derive(Default)]
pub struct JwkCache {
client: reqwest::Client,
@@ -358,20 +357,6 @@ impl JwkCache {
}
}
impl Default for JwkCache {
fn default() -> Self {
let client = Client::builder()
.user_agent(JWKS_USER_AGENT)
.redirect(redirect::Policy::none())
.build()
.expect("using &str and standard redirect::Policy");
JwkCache {
client,
map: DashMap::default(),
}
}
}
fn verify_ec_signature(data: &[u8], sig: &[u8], key: &jose_jwk::Ec) -> Result<(), JwtError> {
use ecdsa::Signature;
use signature::Verifier;

View File

@@ -32,7 +32,6 @@ use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;
use rand::rngs::StdRng;
use rand::SeedableRng;
use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;
@@ -310,18 +309,7 @@ async fn connection_handler(
hyper_util::rt::TokioIo::new(conn),
hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
// First HTTP request shares the same session ID
let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
// take session_id from request, if given.
if let Some(id) = req
.headers()
.get(&NEON_REQUEST_ID)
.and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
{
session_id = id;
}
}
let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
// Cancel the current inflight HTTP request if the requets stream is closed.
// This is slightly different to `_cancel_connection` in that
@@ -347,15 +335,8 @@ async fn connection_handler(
.map_ok_or_else(api_error_into_response, |r| r),
);
async move {
let mut res = handler.await;
let res = handler.await;
cancel_request.disarm();
// add the session ID to the response
if let Ok(resp) = &mut res {
resp.headers_mut()
.append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
}
res
}
}),

View File

@@ -23,7 +23,6 @@ use typed_json::json;
use url::Url;
use urlencoding;
use utils::http::error::ApiError;
use uuid::Uuid;
use super::backend::{LocalProxyConnError, PoolingBackend};
use super::conn_pool::{AuthData, ConnInfoWithAuth};
@@ -64,8 +63,6 @@ enum Payload {
Batch(BatchQueryData),
}
pub(super) static NEON_REQUEST_ID: HeaderName = HeaderName::from_static("neon-request-id");
static CONN_STRING: HeaderName = HeaderName::from_static("neon-connection-string");
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -709,12 +706,6 @@ static HEADERS_TO_FORWARD: &[&HeaderName] = &[
&TXN_DEFERRABLE,
];
pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue {
let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH];
HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..]))
.expect("uuid hyphenated format should be all valid header characters")
}
async fn handle_auth_broker_inner(
ctx: &RequestMonitoring,
request: Request<Incoming>,
@@ -741,7 +732,6 @@ async fn handle_auth_broker_inner(
req = req.header(h, hv);
}
}
req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id()));
let req = req
.body(body)

View File

@@ -23,7 +23,7 @@ backoff = "^2.2.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^3.0.6"
Werkzeug = "^3.0.3"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"
pytest-asyncio = "^0.21.0"

View File

@@ -193,8 +193,6 @@ struct Args {
/// Usually, timeline eviction has to wait for `partial_backup_timeout` before being eligible for eviction,
/// but if a timeline is un-evicted and then _not_ written to, it would immediately flap to evicting again,
/// if it weren't for `eviction_min_resident` preventing that.
///
/// Also defines interval for eviction retries.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_EVICTION_MIN_RESIDENT)]
eviction_min_resident: Duration,
}

View File

@@ -14,10 +14,12 @@ use std::path::Path;
use std::time::Instant;
use crate::control_file_upgrade::downgrade_v9_to_v8;
use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::state::{EvictionState, TimelinePersistentState};
use utils::bin_ser::LeSer;
use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
use utils::{bin_ser::LeSer, id::TenantTimelineId};
use crate::SafeKeeperConf;
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 9;
@@ -52,12 +54,13 @@ pub struct FileStorage {
impl FileStorage {
/// Initialize storage by loading state from disk.
pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
let state = Self::load_control_file_from_dir(timeline_dir)?;
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
let timeline_dir = get_timeline_dir(conf, ttid);
let state = Self::load_control_file_from_dir(&timeline_dir)?;
Ok(FileStorage {
timeline_dir: timeline_dir.to_path_buf(),
no_sync,
timeline_dir,
no_sync: conf.no_sync,
state,
last_persist_at: Instant::now(),
})
@@ -68,16 +71,16 @@ impl FileStorage {
/// Note: we normally call this in temp directory for atomic init, so
/// interested in FileStorage as a result only in tests.
pub async fn create_new(
timeline_dir: &Utf8Path,
dir: Utf8PathBuf,
conf: &SafeKeeperConf,
state: TimelinePersistentState,
no_sync: bool,
) -> Result<FileStorage> {
// we don't support creating new timelines in offloaded state
assert!(matches!(state.eviction_state, EvictionState::Present));
let mut store = FileStorage {
timeline_dir: timeline_dir.to_path_buf(),
no_sync,
timeline_dir: dir,
no_sync: conf.no_sync,
state: state.clone(),
last_persist_at: Instant::now(),
};
@@ -236,46 +239,89 @@ mod test {
use tokio::fs;
use utils::lsn::Lsn;
const NO_SYNC: bool = true;
fn stub_conf() -> SafeKeeperConf {
let workdir = camino_tempfile::tempdir().unwrap().into_path();
SafeKeeperConf {
workdir,
..SafeKeeperConf::dummy()
}
}
#[tokio::test]
async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
let tempdir = camino_tempfile::tempdir()?;
let mut state = TimelinePersistentState::empty();
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
async fn load_from_control_file(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
Ok((
FileStorage::restore_new(ttid, conf)?,
FileStorage::load_control_file_from_dir(&timeline_dir)?,
))
}
// Make a change.
state.commit_lsn = Lsn(42);
storage.persist(&state).await?;
// Reload the state. It should match the previously persisted state.
let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
assert_eq!(loaded_state, state);
Ok(())
async fn create(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
let state = TimelinePersistentState::empty();
let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
Ok((storage, state))
}
#[tokio::test]
async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
let tempdir = camino_tempfile::tempdir()?;
let mut state = TimelinePersistentState::empty();
let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
// Make a change.
state.commit_lsn = Lsn(42);
storage.persist(&state).await?;
// Change the first byte to fail checksum validation.
let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
let mut data = fs::read(&ctrl_path).await?;
data[0] += 1;
fs::write(&ctrl_path, &data).await?;
// Loading the file should fail checksum validation.
if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
assert!(err.to_string().contains("control file checksum mismatch"))
} else {
panic!("expected checksum error")
async fn test_read_write_safekeeper_state() {
let conf = stub_conf();
let ttid = TenantTimelineId::generate();
{
let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to create state");
// change something
state.commit_lsn = Lsn(42);
storage
.persist(&state)
.await
.expect("failed to persist state");
}
let (_, state) = load_from_control_file(&conf, &ttid)
.await
.expect("failed to read state");
assert_eq!(state.commit_lsn, Lsn(42));
}
#[tokio::test]
async fn test_safekeeper_state_checksum_mismatch() {
let conf = stub_conf();
let ttid = TenantTimelineId::generate();
{
let (mut storage, mut state) =
create(&conf, &ttid).await.expect("failed to read state");
// change something
state.commit_lsn = Lsn(42);
storage
.persist(&state)
.await
.expect("failed to persist state");
}
let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).await.unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data)
.await
.expect("failed to write control file");
match load_from_control_file(&conf, &ttid).await {
Err(err) => assert!(err
.to_string()
.contains("safekeeper control file checksum mismatch")),
Ok(_) => panic!("expected error"),
}
Ok(())
}
}

View File

@@ -154,7 +154,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
new_state.peer_horizon_lsn = request.until_lsn;
new_state.backup_lsn = new_backup_lsn;
FileStorage::create_new(&tli_dir_path, new_state.clone(), conf.no_sync).await?;
FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?;
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;

View File

@@ -113,7 +113,6 @@ impl SafeKeeperConf {
impl SafeKeeperConf {
#[cfg(test)]
#[allow(unused)]
fn dummy() -> Self {
SafeKeeperConf {
workdir: Utf8PathBuf::from("./"),

View File

@@ -143,8 +143,8 @@ impl TimelinePersistentState {
TimelinePersistentState::new(
&TenantTimelineId::empty(),
ServerInfo {
pg_version: 170000, /* Postgres server version (major * 10000) */
system_id: 0, /* Postgres system identifier */
pg_version: 17, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 16 * 1024 * 1024,
},
vec![],

View File

@@ -328,19 +328,15 @@ impl SharedState {
/// Restore SharedState from control file. If file doesn't exist, bails out.
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
let timeline_dir = get_timeline_dir(conf, ttid);
let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
if control_store.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
let sk = match control_store.eviction_state {
EvictionState::Present => {
let wal_store = wal_storage::PhysicalStorage::new(
ttid,
&timeline_dir,
&control_store,
conf.no_sync,
)?;
let wal_store =
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
StateSK::Loaded(SafeKeeper::new(
TimelineState::new(control_store),
wal_store,
@@ -1050,9 +1046,9 @@ impl ManagerTimeline {
// trying to restore WAL storage
let wal_store = wal_storage::PhysicalStorage::new(
&self.ttid,
&self.timeline_dir,
self.timeline_dir.clone(),
&conf,
shared.sk.state(),
conf.no_sync,
)?;
// updating control file

View File

@@ -66,15 +66,15 @@ impl Manager {
ready
}
/// Evict the timeline to remote storage. Returns whether the eviction was successful.
/// Evict the timeline to remote storage.
#[instrument(name = "evict_timeline", skip_all)]
pub(crate) async fn evict_timeline(&mut self) -> bool {
pub(crate) async fn evict_timeline(&mut self) {
assert!(!self.is_offloaded);
let partial_backup_uploaded = match &self.partial_backup_uploaded {
Some(p) => p.clone(),
None => {
warn!("no partial backup uploaded, skipping eviction");
return false;
return;
}
};
@@ -91,12 +91,11 @@ impl Manager {
if let Err(e) = do_eviction(self, &partial_backup_uploaded).await {
warn!("failed to evict timeline: {:?}", e);
return false;
return;
}
info!("successfully evicted timeline");
NUM_EVICTED_TIMELINES.inc();
true
}
/// Attempt to restore evicted timeline from remote storage; it must be

View File

@@ -297,12 +297,7 @@ pub async fn main_task(
match mgr.global_rate_limiter.try_acquire_eviction() {
Some(_permit) => {
mgr.set_status(Status::EvictTimeline);
if !mgr.evict_timeline().await {
// eviction failed, try again later
mgr.evict_not_before =
Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
update_next_event(&mut next_event, mgr.evict_not_before);
}
mgr.evict_timeline().await;
}
None => {
// we can't evict timeline now, will try again later

View File

@@ -244,7 +244,7 @@ impl GlobalTimelines {
// immediately initialize first WAL segment as well.
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?;
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
Ok(timeline)
}
@@ -596,7 +596,7 @@ pub async fn validate_temp_timeline(
bail!("wal_seg_size is not set");
}
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
let commit_lsn = control_store.commit_lsn;
let flush_lsn = wal_store.flush_lsn();

View File

@@ -29,6 +29,7 @@ use crate::metrics::{
};
use crate::state::TimelinePersistentState;
use crate::wal_backup::{read_object, remote_timeline_path};
use crate::SafeKeeperConf;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ;
@@ -86,9 +87,7 @@ pub trait Storage {
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
timeline_dir: Utf8PathBuf,
/// Disables fsync if true.
no_sync: bool,
conf: SafeKeeperConf,
/// Size of WAL segment in bytes.
wal_seg_size: usize,
@@ -152,9 +151,9 @@ impl PhysicalStorage {
/// the disk. Otherwise, all LSNs are set to zero.
pub fn new(
ttid: &TenantTimelineId,
timeline_dir: &Utf8Path,
timeline_dir: Utf8PathBuf,
conf: &SafeKeeperConf,
state: &TimelinePersistentState,
no_sync: bool,
) -> Result<PhysicalStorage> {
let wal_seg_size = state.server.wal_seg_size as usize;
@@ -199,8 +198,8 @@ impl PhysicalStorage {
Ok(PhysicalStorage {
metrics: WalStorageMetrics::default(),
timeline_dir: timeline_dir.to_path_buf(),
no_sync,
timeline_dir,
conf: conf.clone(),
wal_seg_size,
pg_version: state.server.pg_version,
system_id: state.server.system_id,
@@ -225,7 +224,7 @@ impl PhysicalStorage {
/// Call fdatasync if config requires so.
async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
if !self.no_sync {
if !self.conf.no_sync {
self.metrics
.observe_flush_seconds(time_io_closure(file.sync_data()).await?);
}
@@ -264,7 +263,9 @@ impl PhysicalStorage {
// Note: this doesn't get into observe_flush_seconds metric. But
// segment init should be separate metric, if any.
if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
if let Err(e) =
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
{
// Probably rename succeeded, but fsync of it failed. Remove
// the file then to avoid using it.
remove_file(wal_file_partial_path)

View File

@@ -3130,11 +3130,9 @@ impl Service {
.await?;
// Propagate the LSN that shard zero picked, if caller didn't provide one
match &mut create_req.mode {
models::TimelineCreateRequestMode::Branch { ancestor_start_lsn, .. } if ancestor_start_lsn.is_none() => {
*ancestor_start_lsn = timeline_info.ancestor_lsn;
},
_ => {}
if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none()
{
create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
}
// Create timeline on remaining shards with number >0

View File

@@ -150,7 +150,6 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
counter("pageserver_tenant_throttling_count_accounted_finish_global"),
counter("pageserver_tenant_throttling_wait_usecs_sum_global"),
counter("pageserver_tenant_throttling_count_global"),
*histogram("pageserver_tokio_epoll_uring_slots_submission_queue_depth"),
)
PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (

View File

@@ -44,14 +44,7 @@ from urllib3.util.retry import Retry
from fixtures import overlayfs
from fixtures.auth_tokens import AuthKeys, TokenScope
from fixtures.common_types import (
Lsn,
NodeId,
TenantId,
TenantShardId,
TimelineArchivalState,
TimelineId,
)
from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
@@ -61,11 +54,7 @@ from fixtures.pageserver.allowed_errors import (
DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS,
)
from fixtures.pageserver.common_types import LayerName, parse_layer_file_name
from fixtures.pageserver.http import (
HistoricLayerInfo,
PageserverHttpClient,
ScanDisposableKeysResponse,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
)
@@ -2143,24 +2132,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
return response.json()
def timeline_archival_config(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
state: TimelineArchivalState,
):
config = {"state": state.value}
log.info(
f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
)
res = self.request(
"PUT",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
json=config,
headers=self.headers(TokenScope.ADMIN),
)
return res.json()
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
@@ -2674,51 +2645,6 @@ class NeonPageserver(PgProtocol, LogUtils):
layers = self.list_layers(tenant_id, timeline_id)
return layer_name in [parse_layer_file_name(p.name) for p in layers]
def timeline_scan_no_disposable_keys(
self, tenant_shard_id: TenantShardId, timeline_id: TimelineId
) -> TimelineAssertNoDisposableKeysResult:
"""
Scan all keys in all layers of the tenant/timeline for disposable keys.
Disposable keys are keys that are present in a layer referenced by the shard
but are not going to be accessed by the shard.
For example, after shard split, the child shards will reference the parent's layer
files until new data is ingested and/or compaction rewrites the layers.
"""
ps_http = self.http_client()
tally = ScanDisposableKeysResponse(0, 0)
per_layer = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futs = []
shard_layer_map = ps_http.layer_map_info(tenant_shard_id, timeline_id)
for layer in shard_layer_map.historic_layers:
def do_layer(
shard_ps_http: PageserverHttpClient,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
layer: HistoricLayerInfo,
) -> tuple[HistoricLayerInfo, ScanDisposableKeysResponse]:
return (
layer,
shard_ps_http.timeline_layer_scan_disposable_keys(
tenant_shard_id, timeline_id, layer.layer_file_name
),
)
futs.append(executor.submit(do_layer, ps_http, tenant_shard_id, timeline_id, layer))
for fut in futs:
layer, result = fut.result()
tally += result
per_layer.append((layer, result))
return TimelineAssertNoDisposableKeysResult(tally, per_layer)
@dataclass
class TimelineAssertNoDisposableKeysResult:
tally: ScanDisposableKeysResponse
per_layer: list[tuple[HistoricLayerInfo, ScanDisposableKeysResponse]]
class PgBin:
"""A helper class for executing postgres binaries"""

View File

@@ -129,26 +129,6 @@ class LayerMapInfo:
return set(x.layer_file_name for x in self.historic_layers)
@dataclass
class ScanDisposableKeysResponse:
disposable_count: int
not_disposable_count: int
def __add__(self, b):
a = self
assert isinstance(a, ScanDisposableKeysResponse)
assert isinstance(b, ScanDisposableKeysResponse)
return ScanDisposableKeysResponse(
a.disposable_count + b.disposable_count, a.not_disposable_count + b.not_disposable_count
)
@classmethod
def from_json(cls, d: dict[str, Any]) -> ScanDisposableKeysResponse:
disposable_count = d["disposable_count"]
not_disposable_count = d["not_disposable_count"]
return ScanDisposableKeysResponse(disposable_count, not_disposable_count)
@dataclass
class TenantConfig:
tenant_specific_overrides: dict[str, Any]
@@ -162,19 +142,6 @@ class TenantConfig:
)
@dataclass
class TimelinesInfoAndOffloaded:
timelines: list[dict[str, Any]]
offloaded: list[dict[str, Any]]
@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelinesInfoAndOffloaded:
return TimelinesInfoAndOffloaded(
timelines=d["timelines"],
offloaded=d["offloaded"],
)
class PageserverHttpClient(requests.Session, MetricsGetter):
def __init__(
self,
@@ -497,18 +464,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, list)
return res_json
def timeline_and_offloaded_list(
self,
tenant_id: Union[TenantId, TenantShardId],
) -> TimelinesInfoAndOffloaded:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline_and_offloaded",
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return TimelinesInfoAndOffloaded.from_json(res_json)
def timeline_create(
self,
pg_version: PgVersion,
@@ -521,13 +476,12 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
) -> dict[Any, Any]:
body: dict[str, Any] = {
"new_timeline_id": str(new_timeline_id),
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
"existing_initdb_timeline_id": str(existing_initdb_timeline_id)
if existing_initdb_timeline_id
else None,
}
if ancestor_timeline_id:
body["ancestor_timeline_id"] = str(ancestor_timeline_id)
if ancestor_start_lsn:
body["ancestor_start_lsn"] = str(ancestor_start_lsn)
if existing_initdb_timeline_id:
body["existing_initdb_timeline_id"] = str(existing_initdb_timeline_id)
if pg_version != PgVersion.NOT_SET:
body["pg_version"] = int(pg_version)
@@ -925,16 +879,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
self.verbose_error(res)
return LayerMapInfo.from_json(res.json())
def timeline_layer_scan_disposable_keys(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
) -> ScanDisposableKeysResponse:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}/scan_disposable_keys",
)
self.verbose_error(res)
assert res.status_code == 200
return ScanDisposableKeysResponse.from_json(res.json())
def download_layer(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
):

View File

@@ -0,0 +1,52 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
def test_lfc_prewarm(neon_simple_env: NeonEnv):
env = neon_simple_env
n_records = 1000000
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
endpoint.stop()
endpoint.start()
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.6'")
for _ in range(60):
time.sleep(1) # give prewarm BGW some time to proceed
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
cur.execute("select * from get_prewarm_info()")
prewarm_info = cur.fetchall()[0]
log.info(f"Prewarm info: {prewarm_info}")
if prewarm_info[0] > 0:
log.info(f"Prewarm progress: {prewarm_info[1]*100//prewarm_info[0]}%")
if prewarm_info[0] == prewarm_info[1]:
break
assert lfc_used_pages > 10000
assert prewarm_info[0] > 0 and prewarm_info[0] == prewarm_info[1]
cur.execute("select sum(pk) from t")
assert cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
assert prewarm_info[1] > 0

View File

@@ -3,13 +3,10 @@
#
from __future__ import annotations
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import TYPE_CHECKING, cast
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
@@ -327,97 +324,3 @@ def test_sql_regress(
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
post_checks(env, test_output_dir, DBNAME, endpoint)
@pytest.mark.skipif(os.environ.get("BUILD_TYPE") == "debug", reason="only run with release build")
def test_tx_abort_with_many_relations(
neon_env_builder: NeonEnvBuilder,
):
"""
This is not a pg_regress test as such, but perhaps it should be -- this test exercises postgres
behavior when aborting a transaction with lots of relations.
Reproducer for https://github.com/neondatabase/neon/issues/9505
"""
env = neon_env_builder.init_start()
ep = env.endpoints.create_start(
"main",
tenant_id=env.initial_tenant,
config_lines=[
"shared_buffers=1000MB",
"max_locks_per_transaction=16384",
],
)
# How many relations: this number is tuned to be long enough to take tens of seconds
# if the rollback code path is buggy, tripping the test's timeout.
n = 4000
def create():
# Create many relations
log.info(f"Creating {n} relations...")
ep.safe_psql_many(
[
"BEGIN",
f"""DO $$
DECLARE
i INT;
table_name TEXT;
BEGIN
FOR i IN 1..{n} LOOP
table_name := 'table_' || i;
EXECUTE 'CREATE TABLE IF NOT EXISTS ' || table_name || ' (id SERIAL PRIMARY KEY, data TEXT)';
END LOOP;
END $$;
""",
"COMMIT",
]
)
def truncate():
# Truncate relations, then roll back the transaction containing the truncations
log.info(f"Truncating {n} relations...")
ep.safe_psql_many(
[
"BEGIN",
f"""DO $$
DECLARE
i INT;
table_name TEXT;
BEGIN
FOR i IN 1..{n} LOOP
table_name := 'table_' || i;
EXECUTE 'TRUNCATE ' || table_name ;
END LOOP;
END $$;
""",
]
)
def rollback_and_wait():
log.info(f"Rolling back after truncating {n} relations...")
ep.safe_psql("ROLLBACK")
# Restart the endpoint: this ensures that we can read back what we just wrote, i.e. pageserver
# ingest has caught up.
ep.stop()
log.info(f"Starting endpoint after truncating {n} relations...")
ep.start()
log.info(f"Started endpoint after truncating {n} relations...")
# Actual create & truncate phases may be slow, these involves lots of WAL records. We do not
# apply a special timeout, they are expected to complete within general test timeout
create()
truncate()
# Run in a thread because the failure case is to take pathologically long time, and we don't want
# to block the test executor on that.
with ThreadPoolExecutor(max_workers=1) as exec:
try:
# Rollback phase should be fast: this is one WAL record that we should process efficiently
fut = exec.submit(rollback_and_wait)
fut.result(timeout=5)
except:
exec.shutdown(wait=False, cancel_futures=True)
raise

View File

@@ -169,24 +169,23 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
)
return last_flush_lsn
def trigger_gc_and_select(env: NeonEnv, ep_static: Endpoint, ctx: str):
def trigger_gc_and_select(env: NeonEnv, ep_static: Endpoint):
"""
Trigger GC manually on all pageservers. Then run an `SELECT` query.
"""
for shard, ps in tenant_get_shards(env, env.initial_tenant):
client = ps.http_client()
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
# Note: cannot assert on `layers_removed` here because it could be layers
# not guarded by the lease. Rely on successful execution of the query instead.
log.info(f"{gc_result=}")
assert (
gc_result["layers_removed"] == 0
), "No layers should be removed, old layers are guarded by leases."
with ep_static.cursor() as cur:
# Following query should succeed if pages are properly guarded by leases.
cur.execute("SELECT count(*) FROM t0")
assert cur.fetchone() == (ROW_COUNT,)
log.info(f"`SELECT` query succeed after GC, {ctx=}")
# Insert some records on main branch
with env.endpoints.create_start("main") as ep_main:
with ep_main.cursor() as cur:
@@ -211,9 +210,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
# Wait for static compute to renew lease at least once.
time.sleep(LSN_LEASE_LENGTH / 2)
generate_updates_on_main(env, ep_main, 3, end=100)
generate_updates_on_main(env, ep_main, i, end=100)
trigger_gc_and_select(env, ep_static, ctx="Before pageservers restart")
trigger_gc_and_select(env, ep_static)
# Trigger Pageserver restarts
for ps in env.pageservers:
@@ -222,7 +221,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
time.sleep(LSN_LEASE_LENGTH / 2)
ps.start()
trigger_gc_and_select(env, ep_static, ctx="After pageservers restart")
trigger_gc_and_select(env, ep_static)
# Reconfigure pageservers
env.pageservers[0].stop()
@@ -231,7 +230,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
)
env.storage_controller.reconcile_until_idle()
trigger_gc_and_select(env, ep_static, ctx="After putting pageserver 0 offline")
trigger_gc_and_select(env, ep_static)
# Do some update so we can increment latest_gc_cutoff
generate_updates_on_main(env, ep_main, i, end=100)

View File

@@ -3,11 +3,11 @@ from __future__ import annotations
import os
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
@@ -188,9 +188,7 @@ def test_sharding_split_unsharded(
"compact-shard-ancestors-persistent",
],
)
def test_sharding_split_compaction(
neon_env_builder: NeonEnvBuilder, failpoint: Optional[str], build_type: str
):
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: Optional[str]):
"""
Test that after a split, we clean up parent layer data in the child shards via compaction.
"""
@@ -324,19 +322,9 @@ def test_sharding_split_compaction(
# Physical size should shrink because layers are smaller
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]
# Validate filtering compaction actually happened
# Validate size statistics
for shard in shards:
ps = env.get_tenant_pageserver(shard)
log.info("scan all layer files for disposable keys, there shouldn't be any")
result = ps.timeline_scan_no_disposable_keys(shard, timeline_id)
tally = result.tally
raw_page_count = tally.not_disposable_count + tally.disposable_count
assert tally.not_disposable_count > (
raw_page_count // 2
), "compaction doesn't rewrite layers that are >=50pct local"
log.info("check sizes")
timeline_info = ps.http_client().timeline_detail(shard, timeline_id)
reported_size = timeline_info["current_physical_size"]
layer_paths = ps.list_layers(shard, timeline_id)
@@ -365,145 +353,6 @@ def test_sharding_split_compaction(
workload.validate()
def test_sharding_split_offloading(neon_env_builder: NeonEnvBuilder):
"""
Test that during a split, we don't miss archived and offloaded timelines.
"""
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": 128 * 1024,
"compaction_threshold": 1,
"compaction_target_size": 128 * 1024,
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "3600s",
# disable background compaction, GC and offloading. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# Disable automatic creation of image layers, as we will create them explicitly when we want them
"image_creation_threshold": 9999,
"image_layer_creation_check_threshold": 0,
"lsn_lease_length": "0s",
}
neon_env_builder.storage_controller_config = {
# Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
"max_offline": "30s",
"max_warming_up": "300s",
}
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id_main = env.initial_timeline
# Check that we created with an unsharded TenantShardId: this is the default,
# but check it in case we change the default in future
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None
workload_main = Workload(env, tenant_id, timeline_id_main, branch_name="main")
workload_main.init()
workload_main.write_rows(256)
workload_main.validate()
workload_main.stop()
# Create two timelines, archive one, offload the other
timeline_id_archived = env.create_branch("archived_not_offloaded")
timeline_id_offloaded = env.create_branch("archived_offloaded")
def timeline_id_set_for(list: list[dict[str, Any]]) -> set[TimelineId]:
return set(
map(
lambda t: TimelineId(t["timeline_id"]),
list,
)
)
expected_offloaded_set = {timeline_id_offloaded}
expected_timeline_set = {timeline_id_main, timeline_id_archived}
with env.get_tenant_pageserver(tenant_id).http_client() as http_client:
http_client.timeline_archival_config(
tenant_id, timeline_id_archived, TimelineArchivalState.ARCHIVED
)
http_client.timeline_archival_config(
tenant_id, timeline_id_offloaded, TimelineArchivalState.ARCHIVED
)
http_client.timeline_offload(tenant_id, timeline_id_offloaded)
list = http_client.timeline_and_offloaded_list(tenant_id)
assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
assert timeline_id_set_for(list.timelines) == expected_timeline_set
# Do a full image layer generation before splitting
http_client.timeline_checkpoint(
tenant_id, timeline_id_main, force_image_layer_creation=True, wait_until_uploaded=True
)
# Split one shard into two
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
# Let all shards move into their stable locations, so that during subsequent steps we
# don't have reconciles in progress (simpler to reason about what messages we expect in logs)
env.storage_controller.reconcile_until_idle()
# Check we got the shard IDs we expected
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
workload_main.validate()
workload_main.stop()
env.storage_controller.consistency_check()
# Ensure each shard has the same list of timelines and offloaded timelines
for shard in shards:
ps = env.get_tenant_pageserver(shard)
list = ps.http_client().timeline_and_offloaded_list(shard)
assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
assert timeline_id_set_for(list.timelines) == expected_timeline_set
ps.http_client().timeline_compact(shard, timeline_id_main)
# Check that we can still read all the data
workload_main.validate()
# Force a restart, which requires the state to be persisted.
env.pageserver.stop()
env.pageserver.start()
# Ensure each shard has the same list of timelines and offloaded timelines
for shard in shards:
ps = env.get_tenant_pageserver(shard)
list = ps.http_client().timeline_and_offloaded_list(shard)
assert timeline_id_set_for(list.offloaded) == expected_offloaded_set
assert timeline_id_set_for(list.timelines) == expected_timeline_set
ps.http_client().timeline_compact(shard, timeline_id_main)
# Compaction shouldn't make anything unreadable
workload_main.validate()
# Do sharded unarchival
env.storage_controller.timeline_archival_config(
tenant_id, timeline_id_offloaded, TimelineArchivalState.UNARCHIVED
)
env.storage_controller.timeline_archival_config(
tenant_id, timeline_id_archived, TimelineArchivalState.UNARCHIVED
)
for shard in shards:
ps = env.get_tenant_pageserver(shard)
list = ps.http_client().timeline_and_offloaded_list(shard)
assert timeline_id_set_for(list.offloaded) == set()
assert timeline_id_set_for(list.timelines) == {
timeline_id_main,
timeline_id_archived,
timeline_id_offloaded,
}
def test_sharding_split_smoke(
neon_env_builder: NeonEnvBuilder,
):

View File

@@ -1,12 +1,11 @@
from __future__ import annotations
import os
import time
from typing import TYPE_CHECKING
from fixtures.common_types import Lsn, TenantId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.utils import wait_until
if TYPE_CHECKING:
from typing import Any
@@ -20,10 +19,6 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.http_client()
# In this test we force 'Timed out while waiting for WAL record error' while
# fetching basebackup and don't want any retries.
os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1"
tenant_id, timeline_id = env.create_tenant()
expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
@@ -54,14 +49,11 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
# Trigger WAL wait timeout faster
def customize_pageserver_toml(ps_cfg: dict[str, Any]):
ps_cfg["wait_lsn_timeout"] = "2s"
ps_cfg["wait_lsn_timeout"] = "1s"
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config["walreceiver_connect_timeout"] = "2s"
tenant_config["lagging_wal_timeout"] = "2s"
# In this test we force 'Timed out while waiting for WAL record error' while
# fetching basebackup and don't want any retries.
os.environ["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = "1"
neon_env_builder.pageserver_config_override = customize_pageserver_toml
# Have notable SK ids to ensure we check logs for their presence, not some other random numbers
@@ -72,6 +64,7 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
tenant_id, timeline_id = env.create_tenant()
elements_to_insert = 1_000_000
expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive"
env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*")
# we configure wait_lsn_timeout to a shorter value than the lagging_wal_timeout / walreceiver_connect_timeout
@@ -81,50 +74,45 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
".*ingesting record with timestamp lagging more than wait_lsn_timeout.*"
)
insert_test_elements(env, tenant_id, start=0, count=1)
insert_test_elements(env, tenant_id, start=0, count=elements_to_insert)
def all_sks_in_wareceiver_state():
try:
trigger_wait_lsn_timeout(env, tenant_id)
except Exception as e:
exception_string = str(e)
try:
trigger_wait_lsn_timeout(env, tenant_id)
except Exception as e:
exception_string = str(e)
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
for safekeeper in env.safekeepers:
assert (
expected_timeout_error in exception_string
), "Should time out during waiting for WAL"
for safekeeper in env.safekeepers:
assert (
str(safekeeper.id) in exception_string
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout"
wait_until(60, 0.5, all_sks_in_wareceiver_state)
str(safekeeper.id) in exception_string
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout"
stopped_safekeeper = env.safekeepers[-1]
stopped_safekeeper_id = stopped_safekeeper.id
log.info(f"Stopping safekeeper {stopped_safekeeper.id}")
stopped_safekeeper.stop()
# sleep until stopped safekeeper is removed from candidates
time.sleep(2)
def all_but_stopped_sks_in_wareceiver_state():
try:
trigger_wait_lsn_timeout(env, tenant_id)
except Exception as e:
# Strip out the part before stdout, as it contains full command with the list of all safekeepers
exception_string = str(e).split("stdout", 1)[-1]
assert (
expected_timeout_error in exception_string
), "Should time out during waiting for WAL"
# Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats.
insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert)
for safekeeper in env.safekeepers:
if safekeeper.id == stopped_safekeeper_id:
assert (
str(safekeeper.id) not in exception_string
), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
else:
assert (
str(safekeeper.id) in exception_string
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
try:
trigger_wait_lsn_timeout(env, tenant_id)
except Exception as e:
# Strip out the part before stdout, as it contains full command with the list of all safekeepers
exception_string = str(e).split("stdout", 1)[-1]
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
wait_until(60, 0.5, all_but_stopped_sks_in_wareceiver_state)
for safekeeper in env.safekeepers:
if safekeeper.id == stopped_safekeeper_id:
assert (
str(safekeeper.id) not in exception_string
), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
else:
assert (
str(safekeeper.id) in exception_string
), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout"
def insert_test_elements(env: NeonEnv, tenant_id: TenantId, start: int, count: int):

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.0",
"68b5038f27e493bde6ae552fe066f10cbdfe6a14"
"37d5ead146b028dd9a5c07e7a37068ec0df9f465"
],
"v16": [
"16.4",
"e131a9c027b202ce92bd7b9cf2569d48a6f9948e"
"cc36e03bd0c927022cf3b3563e291e42d75366a1"
],
"v15": [
"15.8",
"22e580fe9ffcea7e02592110b1c9bf426d83cada"
"a4830163a65811578824ce4022c1cd3daef33d4e"
],
"v14": [
"14.13",
"2199b83fb72680001ce0f43bf6187a21dfb8f45d"
"ecb1020ff71927e9dd59c526254bb8846bb73ee1"
]
}