mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
Merge 2025-04-09 main commit 'a04e33ceb638a3ee5fef8d642b57ffc3a4543c98' into yuchen/direct-io-delta-image-layer-write
This commit is contained in:
@@ -118,16 +118,18 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
pub set_disk_quota_for_fs: Option<String>,
|
||||
|
||||
#[arg(short = 's', long = "spec", group = "spec")]
|
||||
pub spec_json: Option<String>,
|
||||
|
||||
#[arg(short = 'S', long, group = "spec-path")]
|
||||
pub spec_path: Option<OsString>,
|
||||
|
||||
#[arg(short = 'i', long, group = "compute-id")]
|
||||
pub compute_id: String,
|
||||
|
||||
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
|
||||
#[arg(
|
||||
short = 'p',
|
||||
long,
|
||||
conflicts_with = "spec-path",
|
||||
value_name = "CONTROL_PLANE_API_BASE_URL"
|
||||
)]
|
||||
pub control_plane_uri: Option<String>,
|
||||
}
|
||||
|
||||
@@ -172,7 +174,6 @@ fn main() -> Result<()> {
|
||||
cgroup: cli.cgroup,
|
||||
#[cfg(target_os = "linux")]
|
||||
vm_monitor_addr: cli.vm_monitor_addr,
|
||||
live_config_allowed: cli_spec.live_config_allowed,
|
||||
},
|
||||
cli_spec.spec,
|
||||
cli_spec.compute_ctl_config,
|
||||
@@ -201,23 +202,12 @@ async fn init() -> Result<()> {
|
||||
}
|
||||
|
||||
fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
// First, try to get cluster spec from the cli argument
|
||||
if let Some(ref spec_json) = cli.spec_json {
|
||||
info!("got spec from cli argument {}", spec_json);
|
||||
return Ok(CliSpecParams {
|
||||
spec: Some(serde_json::from_str(spec_json)?),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
live_config_allowed: false,
|
||||
});
|
||||
}
|
||||
|
||||
// Second, try to read it from the file if path is provided
|
||||
// First, read spec from the path if provided
|
||||
if let Some(ref spec_path) = cli.spec_path {
|
||||
let file = File::open(Path::new(spec_path))?;
|
||||
return Ok(CliSpecParams {
|
||||
spec: Some(serde_json::from_reader(file)?),
|
||||
compute_ctl_config: ComputeCtlConfig::default(),
|
||||
live_config_allowed: true,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -225,11 +215,12 @@ fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
|
||||
panic!("must specify --control-plane-uri");
|
||||
};
|
||||
|
||||
// If the spec wasn't provided in the CLI arguments, then retrieve it from
|
||||
// the control plane
|
||||
match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
|
||||
Ok(resp) => Ok(CliSpecParams {
|
||||
spec: resp.0,
|
||||
compute_ctl_config: resp.1,
|
||||
live_config_allowed: true,
|
||||
}),
|
||||
Err(e) => {
|
||||
error!(
|
||||
@@ -247,7 +238,6 @@ struct CliSpecParams {
|
||||
spec: Option<ComputeSpec>,
|
||||
#[allow(dead_code)]
|
||||
compute_ctl_config: ComputeCtlConfig,
|
||||
live_config_allowed: bool,
|
||||
}
|
||||
|
||||
fn deinit_and_exit(exit_code: Option<i32>) -> ! {
|
||||
|
||||
@@ -93,20 +93,6 @@ pub struct ComputeNodeParams {
|
||||
|
||||
/// the address of extension storage proxy gateway
|
||||
pub ext_remote_storage: Option<String>,
|
||||
|
||||
/// We should only allow live re- / configuration of the compute node if
|
||||
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
||||
/// the latest configuration. Otherwise, there could be a case:
|
||||
/// - we start compute with some spec provided as argument
|
||||
/// - we push new spec and it does reconfiguration
|
||||
/// - but then something happens and compute pod / VM is destroyed,
|
||||
/// so k8s controller starts it again with the **old** spec
|
||||
///
|
||||
/// and the same for empty computes:
|
||||
/// - we started compute without any spec
|
||||
/// - we push spec and it does configuration
|
||||
/// - but then it is restarted without any spec again
|
||||
pub live_config_allowed: bool,
|
||||
}
|
||||
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
|
||||
@@ -6,20 +6,15 @@ use axum_extra::{
|
||||
TypedHeader,
|
||||
headers::{Authorization, authorization::Bearer},
|
||||
};
|
||||
use compute_api::requests::ComputeClaims;
|
||||
use futures::future::BoxFuture;
|
||||
use http::{Request, Response, StatusCode};
|
||||
use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet};
|
||||
use serde::Deserialize;
|
||||
use tower_http::auth::AsyncAuthorizeRequest;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::http::{JsonResponse, extract::RequestId};
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub(in crate::http) struct Claims {
|
||||
compute_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(in crate::http) struct Authorize {
|
||||
compute_id: String,
|
||||
@@ -112,7 +107,11 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
|
||||
impl Authorize {
|
||||
/// Verify the token using the JSON Web Key set and return the token data.
|
||||
fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result<TokenData<Claims>> {
|
||||
fn verify(
|
||||
jwks: &JwkSet,
|
||||
token: &str,
|
||||
validation: &Validation,
|
||||
) -> Result<TokenData<ComputeClaims>> {
|
||||
for jwk in jwks.keys.iter() {
|
||||
let decoding_key = match DecodingKey::from_jwk(jwk) {
|
||||
Ok(key) => key,
|
||||
@@ -127,7 +126,7 @@ impl Authorize {
|
||||
}
|
||||
};
|
||||
|
||||
match jsonwebtoken::decode::<Claims>(token, &decoding_key, validation) {
|
||||
match jsonwebtoken::decode::<ComputeClaims>(token, &decoding_key, validation) {
|
||||
Ok(data) => return Ok(data),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
|
||||
@@ -22,13 +22,6 @@ pub(in crate::http) async fn configure(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
request: Json<ConfigurationRequest>,
|
||||
) -> Response {
|
||||
if !compute.params.live_config_allowed {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"live configuration is not allowed for this compute node".to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
let pspec = match ParsedSpec::try_from(request.spec.clone()) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
|
||||
|
||||
@@ -15,7 +15,7 @@ use clap::ValueEnum;
|
||||
use postgres_backend::AuthType;
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::auth::{Claims, encode_from_key_file};
|
||||
use utils::auth::encode_from_key_file;
|
||||
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
|
||||
@@ -757,7 +757,7 @@ impl LocalEnv {
|
||||
}
|
||||
|
||||
// this function is used only for testing purposes in CLI e g generate tokens during init
|
||||
pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
|
||||
pub fn generate_auth_token<S: Serialize>(&self, claims: &S) -> anyhow::Result<String> {
|
||||
let private_key_path = self.get_private_key_path();
|
||||
let key_data = fs::read(private_key_path)?;
|
||||
encode_from_key_file(claims, &key_data)
|
||||
|
||||
@@ -5,6 +5,14 @@ use crate::privilege::Privilege;
|
||||
use crate::responses::ComputeCtlConfig;
|
||||
use crate::spec::{ComputeSpec, ExtVersion, PgIdent};
|
||||
|
||||
/// When making requests to the `compute_ctl` external HTTP server, the client
|
||||
/// must specify a set of claims in `Authorization` header JWTs such that
|
||||
/// `compute_ctl` can authorize the request.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct ComputeClaims {
|
||||
pub compute_id: String,
|
||||
}
|
||||
|
||||
/// Request of the /configure API
|
||||
///
|
||||
/// We now pass only `spec` in the configuration request, but later we can
|
||||
|
||||
@@ -173,7 +173,7 @@ impl std::fmt::Debug for JwtAuth {
|
||||
}
|
||||
|
||||
// this function is used only for testing purposes in CLI e g generate tokens during init
|
||||
pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String> {
|
||||
pub fn encode_from_key_file<S: Serialize>(claims: &S, key_data: &[u8]) -> Result<String> {
|
||||
let key = EncodingKey::from_ed_pem(key_data)?;
|
||||
Ok(encode(&Header::new(STORAGE_TOKEN_ALGORITHM), claims, &key)?)
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ use bytes::{Buf, Bytes};
|
||||
use criterion::{BenchmarkId, Criterion};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver::config::PageServerConf;
|
||||
use pageserver::walredo::PostgresRedoManager;
|
||||
use pageserver::walredo::{PostgresRedoManager, RedoAttemptType};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -223,7 +223,14 @@ impl Request {
|
||||
|
||||
// TODO: avoid these clones
|
||||
manager
|
||||
.request_redo(*key, *lsn, base_img.clone(), records.clone(), *pg_version)
|
||||
.request_redo(
|
||||
*key,
|
||||
*lsn,
|
||||
base_img.clone(),
|
||||
records.clone(),
|
||||
*pg_version,
|
||||
RedoAttemptType::ReadPage,
|
||||
)
|
||||
.await
|
||||
.context("request_redo")
|
||||
}
|
||||
|
||||
@@ -2274,6 +2274,7 @@ async fn timeline_compact_handler(
|
||||
if Some(true) == parse_query_param::<_, bool>(&request, "dry_run")? {
|
||||
flags |= CompactFlags::DryRun;
|
||||
}
|
||||
// Manual compaction does not yield for L0.
|
||||
|
||||
let wait_until_uploaded =
|
||||
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
|
||||
|
||||
@@ -100,7 +100,7 @@ use crate::tenant::timeline::delete::DeleteTimelineFlow;
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walingest::WalLagCooldown;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::walredo::{PostgresRedoManager, RedoAttemptType};
|
||||
use crate::{InitializationOrder, TEMP_FILE_SUFFIX, import_datadir, span, task_mgr, walredo};
|
||||
|
||||
static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
|
||||
@@ -473,15 +473,16 @@ impl WalRedoManager {
|
||||
base_img: Option<(Lsn, bytes::Bytes)>,
|
||||
records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
redo_attempt_type: RedoAttemptType,
|
||||
) -> Result<bytes::Bytes, walredo::Error> {
|
||||
match self {
|
||||
Self::Prod(_, mgr) => {
|
||||
mgr.request_redo(key, lsn, base_img, records, pg_version)
|
||||
mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type)
|
||||
.await
|
||||
}
|
||||
#[cfg(test)]
|
||||
Self::Test(mgr) => {
|
||||
mgr.request_redo(key, lsn, base_img, records, pg_version)
|
||||
mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -5879,6 +5880,7 @@ pub(crate) mod harness {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
_pg_version: u32,
|
||||
_redo_attempt_type: RedoAttemptType,
|
||||
) -> Result<Bytes, walredo::Error> {
|
||||
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
|
||||
if records_neon {
|
||||
|
||||
@@ -24,6 +24,7 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use anyhow::{Context, Result, anyhow, bail, ensure};
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
@@ -1293,6 +1294,12 @@ impl Timeline {
|
||||
};
|
||||
reconstruct_state.read_path = read_path;
|
||||
|
||||
let redo_attempt_type = if ctx.task_kind() == TaskKind::Compaction {
|
||||
RedoAttemptType::LegacyCompaction
|
||||
} else {
|
||||
RedoAttemptType::ReadPage
|
||||
};
|
||||
|
||||
let traversal_res: Result<(), _> = {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
@@ -1380,7 +1387,7 @@ impl Timeline {
|
||||
|
||||
let walredo_deltas = converted.num_deltas();
|
||||
let walredo_res = walredo_self
|
||||
.reconstruct_value(key, lsn, converted)
|
||||
.reconstruct_value(key, lsn, converted, redo_attempt_type)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
@@ -6351,37 +6358,21 @@ impl Timeline {
|
||||
|
||||
/// Reconstruct a value, using the given base image and WAL records in 'data'.
|
||||
async fn reconstruct_value(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
data: ValueReconstructState,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
self.reconstruct_value_inner(key, request_lsn, data, false)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Reconstruct a value, using the given base image and WAL records in 'data'. It does not fire critical errors because
|
||||
/// sometimes it is expected to fail due to unreplayable history described in <https://github.com/neondatabase/neon/issues/10395>.
|
||||
async fn reconstruct_value_wo_critical_error(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
data: ValueReconstructState,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
self.reconstruct_value_inner(key, request_lsn, data, true)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn reconstruct_value_inner(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
mut data: ValueReconstructState,
|
||||
no_critical_error: bool,
|
||||
redo_attempt_type: RedoAttemptType,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// Perform WAL redo if needed
|
||||
data.records.reverse();
|
||||
|
||||
let fire_critical_error = match redo_attempt_type {
|
||||
RedoAttemptType::ReadPage => true,
|
||||
RedoAttemptType::LegacyCompaction => true,
|
||||
RedoAttemptType::GcCompaction => false,
|
||||
};
|
||||
|
||||
// If we have a page image, and no WAL, we're all set
|
||||
if data.records.is_empty() {
|
||||
if let Some((img_lsn, img)) = &data.img {
|
||||
@@ -6428,13 +6419,20 @@ impl Timeline {
|
||||
.as_ref()
|
||||
.context("timeline has no walredo manager")
|
||||
.map_err(PageReconstructError::WalRedo)?
|
||||
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
|
||||
.request_redo(
|
||||
key,
|
||||
request_lsn,
|
||||
data.img,
|
||||
data.records,
|
||||
self.pg_version,
|
||||
redo_attempt_type,
|
||||
)
|
||||
.await;
|
||||
let img = match res {
|
||||
Ok(img) => img,
|
||||
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
|
||||
Err(walredo::Error::Other(err)) => {
|
||||
if !no_critical_error {
|
||||
if fire_critical_error {
|
||||
critical!("walredo failure during page reconstruction: {err:?}");
|
||||
}
|
||||
return Err(PageReconstructError::WalRedo(
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::layer_manager::LayerManager;
|
||||
use super::{
|
||||
@@ -16,6 +16,8 @@ use super::{
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::DeltaEntry;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use anyhow::{Context, anyhow};
|
||||
use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
@@ -315,6 +317,9 @@ impl GcCompactionQueue {
|
||||
flags: {
|
||||
let mut flags = EnumSet::new();
|
||||
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
|
||||
if timeline.get_compaction_l0_first() {
|
||||
flags |= CompactFlags::YieldForL0;
|
||||
}
|
||||
flags
|
||||
},
|
||||
sub_compaction: true,
|
||||
@@ -820,15 +825,16 @@ pub struct CompactionStatistics {
|
||||
time_acquire_lock_secs: f64,
|
||||
time_analyze_secs: f64,
|
||||
time_download_layer_secs: f64,
|
||||
time_to_first_kv_pair_secs: f64,
|
||||
time_main_loop_secs: f64,
|
||||
time_final_phase_secs: f64,
|
||||
time_total_secs: f64,
|
||||
|
||||
// Summary
|
||||
/// Ratio of the key-value size before/after gc-compaction.
|
||||
uncompressed_size_ratio: f64,
|
||||
/// Ratio of the physical size before/after gc-compaction.
|
||||
physical_size_ratio: f64,
|
||||
/// Ratio of the key-value size after/before gc-compaction.
|
||||
uncompressed_retention_ratio: f64,
|
||||
/// Ratio of the physical size after/before gc-compaction.
|
||||
compressed_retention_ratio: f64,
|
||||
}
|
||||
|
||||
impl CompactionStatistics {
|
||||
@@ -897,15 +903,15 @@ impl CompactionStatistics {
|
||||
fn finalize(&mut self) {
|
||||
let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size;
|
||||
let produced_key_value_size = self.image_produced.size + self.wal_produced.size;
|
||||
self.uncompressed_size_ratio =
|
||||
original_key_value_size as f64 / (produced_key_value_size as f64 + 1.0); // avoid div by 0
|
||||
self.uncompressed_retention_ratio =
|
||||
produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0
|
||||
let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size;
|
||||
let produced_physical_size = self.image_layer_produced.size
|
||||
+ self.delta_layer_produced.size
|
||||
+ self.image_layer_discarded.size
|
||||
+ self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate
|
||||
self.physical_size_ratio =
|
||||
original_physical_size as f64 / (produced_physical_size as f64 + 1.0); // avoid div by 0
|
||||
self.compressed_retention_ratio =
|
||||
produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2416,7 +2422,7 @@ impl Timeline {
|
||||
lsn_split_points[i]
|
||||
};
|
||||
let img = self
|
||||
.reconstruct_value_wo_critical_error(key, request_lsn, state)
|
||||
.reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction)
|
||||
.await?;
|
||||
Some((request_lsn, img))
|
||||
} else {
|
||||
@@ -3037,7 +3043,7 @@ impl Timeline {
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
let time_download_layer = timer.elapsed();
|
||||
let timer = Instant::now();
|
||||
let mut timer = Instant::now();
|
||||
|
||||
// Step 2: Produce images+deltas.
|
||||
let mut accumulated_values = Vec::new();
|
||||
@@ -3115,6 +3121,7 @@ impl Timeline {
|
||||
// Actually, we can decide not to write to the image layer at all at this point because
|
||||
// the key and LSN range are determined. However, to keep things simple here, we still
|
||||
// create this writer, and discard the writer in the end.
|
||||
let mut time_to_first_kv_pair = None;
|
||||
|
||||
while let Some(((key, lsn, val), desc)) = merge_iter
|
||||
.next_with_trace()
|
||||
@@ -3122,6 +3129,11 @@ impl Timeline {
|
||||
.context("failed to get next key-value pair")
|
||||
.map_err(CompactionError::Other)?
|
||||
{
|
||||
if time_to_first_kv_pair.is_none() {
|
||||
time_to_first_kv_pair = Some(timer.elapsed());
|
||||
timer = Instant::now();
|
||||
}
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
@@ -3463,6 +3475,9 @@ impl Timeline {
|
||||
let time_final_phase = timer.elapsed();
|
||||
|
||||
stat.time_final_phase_secs = time_final_phase.as_secs_f64();
|
||||
stat.time_to_first_kv_pair_secs = time_to_first_kv_pair
|
||||
.unwrap_or(Duration::ZERO)
|
||||
.as_secs_f64();
|
||||
stat.time_main_loop_secs = time_main_loop.as_secs_f64();
|
||||
stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64();
|
||||
stat.time_download_layer_secs = time_download_layer.as_secs_f64();
|
||||
@@ -3927,8 +3942,6 @@ impl CompactionLayer<Key> for OwnArc<DeltaLayer> {
|
||||
}
|
||||
}
|
||||
|
||||
use crate::tenant::timeline::DeltaEntry;
|
||||
|
||||
impl CompactionLayer<Key> for ResidentDeltaLayer {
|
||||
fn key_range(&self) -> &Range<Key> {
|
||||
&self.0.layer_desc().key_range
|
||||
|
||||
@@ -136,6 +136,16 @@ macro_rules! bail {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum RedoAttemptType {
|
||||
/// Used for the read path. Will fire critical errors and retry twice if failure.
|
||||
ReadPage,
|
||||
// Used for legacy compaction (only used in image compaction). Will fire critical errors and retry once if failure.
|
||||
LegacyCompaction,
|
||||
// Used for gc compaction. Will not fire critical errors and not retry.
|
||||
GcCompaction,
|
||||
}
|
||||
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
@@ -156,11 +166,18 @@ impl PostgresRedoManager {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
redo_attempt_type: RedoAttemptType,
|
||||
) -> Result<Bytes, Error> {
|
||||
if records.is_empty() {
|
||||
bail!("invalid WAL redo request with no records");
|
||||
}
|
||||
|
||||
let max_retry_attempts = match redo_attempt_type {
|
||||
RedoAttemptType::ReadPage => 2,
|
||||
RedoAttemptType::LegacyCompaction => 1,
|
||||
RedoAttemptType::GcCompaction => 0,
|
||||
};
|
||||
|
||||
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
|
||||
let mut img = base_img.map(|p| p.1);
|
||||
let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
|
||||
@@ -180,6 +197,7 @@ impl PostgresRedoManager {
|
||||
&records[batch_start..i],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
max_retry_attempts,
|
||||
)
|
||||
.await
|
||||
};
|
||||
@@ -201,6 +219,7 @@ impl PostgresRedoManager {
|
||||
&records[batch_start..],
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
max_retry_attempts,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -424,11 +443,11 @@ impl PostgresRedoManager {
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
max_retry_attempts: u32,
|
||||
) -> Result<Bytes, Error> {
|
||||
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
|
||||
|
||||
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let mut n_attempts = 0u32;
|
||||
loop {
|
||||
let base_img = &base_img;
|
||||
@@ -486,7 +505,7 @@ impl PostgresRedoManager {
|
||||
info!(n_attempts, "retried walredo succeeded");
|
||||
}
|
||||
n_attempts += 1;
|
||||
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
|
||||
if n_attempts > max_retry_attempts || result.is_ok() {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -560,6 +579,7 @@ mod tests {
|
||||
|
||||
use super::PostgresRedoManager;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping() {
|
||||
@@ -593,6 +613,7 @@ mod tests {
|
||||
None,
|
||||
short_records(),
|
||||
14,
|
||||
RedoAttemptType::ReadPage,
|
||||
)
|
||||
.instrument(h.span())
|
||||
.await
|
||||
@@ -621,6 +642,7 @@ mod tests {
|
||||
None,
|
||||
short_records(),
|
||||
14,
|
||||
RedoAttemptType::ReadPage,
|
||||
)
|
||||
.instrument(h.span())
|
||||
.await
|
||||
@@ -642,6 +664,7 @@ mod tests {
|
||||
None,
|
||||
short_records(),
|
||||
16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
|
||||
RedoAttemptType::ReadPage,
|
||||
)
|
||||
.instrument(h.span())
|
||||
.await
|
||||
|
||||
@@ -105,7 +105,7 @@ def parse_layer_file_name(file_name: str) -> LayerName:
|
||||
except InvalidFileName:
|
||||
pass
|
||||
|
||||
raise InvalidFileName("neither image nor delta layer")
|
||||
raise InvalidFileName(f"neither image nor delta layer: {file_name}")
|
||||
|
||||
|
||||
def is_future_layer(layer_file_name: LayerName, disk_consistent_lsn: Lsn):
|
||||
|
||||
@@ -40,6 +40,8 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
|
||||
for layer in info.historic_layers:
|
||||
assert not layer.remote
|
||||
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=60)
|
||||
|
||||
log.info("ready")
|
||||
|
||||
|
||||
|
||||
@@ -145,11 +145,14 @@ def run_database_maintenance(env: PgCompare):
|
||||
END $$;
|
||||
"""
|
||||
)
|
||||
|
||||
log.info("start REINDEX TABLE CONCURRENTLY transaction.transaction")
|
||||
with env.zenbenchmark.record_duration("reindex concurrently"):
|
||||
cur.execute("REINDEX TABLE CONCURRENTLY transaction.transaction;")
|
||||
log.info("finished REINDEX TABLE CONCURRENTLY transaction.transaction")
|
||||
# in production a customer would likely use reindex concurrently
|
||||
# but for our test we don't care about the downtime
|
||||
# and it would just about double the time we report in the test
|
||||
# because we need one more table scan for each index
|
||||
log.info("start REINDEX TABLE transaction.transaction")
|
||||
with env.zenbenchmark.record_duration("reindex"):
|
||||
cur.execute("REINDEX TABLE transaction.transaction;")
|
||||
log.info("finished REINDEX TABLE transaction.transaction")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("custom_scripts", get_custom_scripts())
|
||||
|
||||
@@ -38,12 +38,34 @@ PREEMPT_COMPACTION_TENANT_CONF = {
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 1,
|
||||
"image_creation_preempt_threshold": 1,
|
||||
# compact more frequently
|
||||
# Compact more frequently
|
||||
"compaction_threshold": 3,
|
||||
"compaction_upper_limit": 6,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
|
||||
PREEMPT_GC_COMPACTION_TENANT_CONF = {
|
||||
"gc_period": "5s",
|
||||
"compaction_period": "5s",
|
||||
# Small checkpoint distance to create many layers
|
||||
"checkpoint_distance": 1024**2,
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 10000, # Do not create image layers at all
|
||||
"image_creation_preempt_threshold": 10000,
|
||||
# Compact more frequently
|
||||
"compaction_threshold": 3,
|
||||
"compaction_upper_limit": 6,
|
||||
"lsn_lease_length": "0s",
|
||||
# Enable gc-compaction
|
||||
"gc_compaction_enabled": "true",
|
||||
"gc_compaction_initial_threshold_kb": 1024, # At a small threshold
|
||||
"gc_compaction_ratio_percent": 1,
|
||||
# No PiTR interval and small GC horizon
|
||||
"pitr_interval": "0s",
|
||||
"gc_horizon": f"{1024**2}",
|
||||
}
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
@@ -165,6 +187,41 @@ def test_pageserver_compaction_preempt(
|
||||
env.pageserver.assert_log_contains("resuming image layer creation")
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_gc_compaction_preempt(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
# Ideally we should be able to do unit tests for this, but we need real Postgres
|
||||
# WALs in order to do unit testing...
|
||||
|
||||
conf = PREEMPT_GC_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 200000
|
||||
churn_rounds = 10
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageserver.id)
|
||||
|
||||
log.info("Writing initial data ...")
|
||||
workload.write_rows(row_count, env.pageserver.id)
|
||||
|
||||
for i in range(1, churn_rounds + 1):
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
workload.churn_rows(row_count, env.pageserver.id, upload=False)
|
||||
workload.validate(env.pageserver.id)
|
||||
ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
# ensure gc_compaction gets preempted and then resumed
|
||||
env.pageserver.assert_log_contains("preempt gc-compaction")
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
@@ -61,7 +61,7 @@ def evict_random_layers(
|
||||
)
|
||||
client = pageserver.http_client()
|
||||
for layer in initial_local_layers:
|
||||
if "ephemeral" in layer.name or "temp_download" in layer.name:
|
||||
if "ephemeral" in layer.name or "temp_download" in layer.name or ".___temp" in layer.name:
|
||||
continue
|
||||
|
||||
layer_name = parse_layer_file_name(layer.name)
|
||||
|
||||
Reference in New Issue
Block a user