diff --git a/Cargo.toml b/Cargo.toml index 963841e340..e038c0b4ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,7 +113,7 @@ md5 = "0.7.0" measured = { version = "0.0.22", features=["lasso"] } measured-process = { version = "0.0.22" } memoffset = "0.8" -nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] } +nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal", "poll"] } notify = "6.0.0" num_cpus = "1.15" num-traits = "0.2.15" diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 2fdd7de38f..77da58d63e 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -236,6 +236,15 @@ impl Key { field5: u8::MAX, field6: u32::MAX, }; + /// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers + pub const NON_L0_MAX: Key = Key { + field1: u8::MAX, + field2: u32::MAX, + field3: u32::MAX, + field4: u32::MAX, + field5: u8::MAX, + field6: u32::MAX - 1, + }; pub fn from_hex(s: &str) -> Result { if s.len() != 36 { diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index d55c06b685..4cab56771b 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -718,6 +718,7 @@ pub struct TimelineInfo { pub pg_version: u32, pub state: TimelineState, + pub is_archived: bool, pub walreceiver_status: String, diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index da0c11d9bf..7d404e50a5 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -126,10 +126,56 @@ fn main() -> anyhow::Result<()> { info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings"); info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access"); + // The tenants directory contains all the pageserver local disk state. + // Create if not exists and make sure all the contents are durable before proceeding. + // Ensuring durability eliminates a whole bug class where we come up after an unclean shutdown. + // After unclea shutdown, we don't know if all the filesystem content we can read via syscalls is actually durable or not. + // Examples for that: OOM kill, systemd killing us during shutdown, self abort due to unrecoverable IO error. let tenants_path = conf.tenants_path(); - if !tenants_path.exists() { - utils::crashsafe::create_dir_all(conf.tenants_path()) - .with_context(|| format!("Failed to create tenants root dir at '{tenants_path}'"))?; + { + let open = || { + nix::dir::Dir::open( + tenants_path.as_std_path(), + nix::fcntl::OFlag::O_DIRECTORY | nix::fcntl::OFlag::O_RDONLY, + nix::sys::stat::Mode::empty(), + ) + }; + let dirfd = match open() { + Ok(dirfd) => dirfd, + Err(e) => match e { + nix::errno::Errno::ENOENT => { + utils::crashsafe::create_dir_all(&tenants_path).with_context(|| { + format!("Failed to create tenants root dir at '{tenants_path}'") + })?; + open().context("open tenants dir after creating it")? + } + e => anyhow::bail!(e), + }, + }; + + let started = Instant::now(); + // Linux guarantees durability for syncfs. + // POSIX doesn't have syncfs, and further does not actually guarantee durability of sync(). + #[cfg(target_os = "linux")] + { + use std::os::fd::AsRawFd; + nix::unistd::syncfs(dirfd.as_raw_fd()).context("syncfs")?; + } + #[cfg(target_os = "macos")] + { + // macOS is not a production platform for Neon, don't even bother. + drop(dirfd); + } + #[cfg(not(any(target_os = "linux", target_os = "macos")))] + { + compile_error!("Unsupported OS"); + } + + let elapsed = started.elapsed(); + info!( + elapsed_ms = elapsed.as_millis(), + "made tenant directory contents durable" + ); } // Initialize up failpoints support diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 4635e76ea9..cbcc162b32 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -318,6 +318,24 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(value: crate::tenant::TimelineArchivalError) -> Self { + use crate::tenant::TimelineArchivalError::*; + match value { + NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()), + Timeout => ApiError::Timeout("hit pageserver internal timeout".into()), + HasUnarchivedChildren(children) => ApiError::PreconditionFailed( + format!( + "Cannot archive timeline which has non-archived child timelines: {children:?}" + ) + .into_boxed_str(), + ), + a @ AlreadyInProgress => ApiError::Conflict(a.to_string()), + Other(e) => ApiError::InternalServerError(e), + } + } +} + impl From for ApiError { fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self { use crate::tenant::mgr::DeleteTimelineError::*; @@ -405,6 +423,8 @@ async fn build_timeline_info_common( let current_logical_size = timeline.get_current_logical_size(logical_size_task_priority, ctx); let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); + // Report is_archived = false if the timeline is still loading + let is_archived = timeline.is_archived().unwrap_or(false); let remote_consistent_lsn_projected = timeline .get_remote_consistent_lsn_projected() .unwrap_or(Lsn(0)); @@ -445,6 +465,7 @@ async fn build_timeline_info_common( pg_version: timeline.pg_version, state, + is_archived, walreceiver_status, @@ -686,9 +707,7 @@ async fn timeline_archival_config_handler( tenant .apply_timeline_archival_config(timeline_id, request_data.state) - .await - .context("applying archival config") - .map_err(ApiError::InternalServerError)?; + .await?; Ok::<_, ApiError>(()) } .instrument(info_span!("timeline_archival_config", diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3a7afff211..0364d521b6 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -501,6 +501,38 @@ impl Debug for DeleteTimelineError { } } +#[derive(thiserror::Error)] +pub enum TimelineArchivalError { + #[error("NotFound")] + NotFound, + + #[error("Timeout")] + Timeout, + + #[error("HasUnarchivedChildren")] + HasUnarchivedChildren(Vec), + + #[error("Timeline archival is already in progress")] + AlreadyInProgress, + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl Debug for TimelineArchivalError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::NotFound => write!(f, "NotFound"), + Self::Timeout => write!(f, "Timeout"), + Self::HasUnarchivedChildren(c) => { + f.debug_tuple("HasUnarchivedChildren").field(c).finish() + } + Self::AlreadyInProgress => f.debug_tuple("AlreadyInProgress").finish(), + Self::Other(e) => f.debug_tuple("Other").field(e).finish(), + } + } +} + pub enum SetStoppingError { AlreadyStopping(completion::Barrier), Broken, @@ -1326,24 +1358,50 @@ impl Tenant { &self, timeline_id: TimelineId, state: TimelineArchivalState, - ) -> anyhow::Result<()> { - let timeline = self - .get_timeline(timeline_id, false) - .context("Cannot apply timeline archival config to inexistent timeline")?; + ) -> Result<(), TimelineArchivalError> { + info!("setting timeline archival config"); + let timeline = { + let timelines = self.timelines.lock().unwrap(); + + let timeline = match timelines.get(&timeline_id) { + Some(t) => t, + None => return Err(TimelineArchivalError::NotFound), + }; + + // Ensure that there are no non-archived child timelines + let children: Vec = timelines + .iter() + .filter_map(|(id, entry)| { + if entry.get_ancestor_timeline_id() != Some(timeline_id) { + return None; + } + if entry.is_archived() == Some(true) { + return None; + } + Some(*id) + }) + .collect(); + + if !children.is_empty() && state == TimelineArchivalState::Archived { + return Err(TimelineArchivalError::HasUnarchivedChildren(children)); + } + Arc::clone(timeline) + }; let upload_needed = timeline .remote_client .schedule_index_upload_for_timeline_archival_state(state)?; if upload_needed { + info!("Uploading new state"); const MAX_WAIT: Duration = Duration::from_secs(10); let Ok(v) = tokio::time::timeout(MAX_WAIT, timeline.remote_client.wait_completion()).await else { tracing::warn!("reached timeout for waiting on upload queue"); - bail!("reached timeout for upload queue flush"); + return Err(TimelineArchivalError::Timeout); }; - v?; + v.map_err(|e| TimelineArchivalError::Other(anyhow::anyhow!(e)))?; } Ok(()) } @@ -7013,18 +7071,14 @@ mod tests { vec![ // Image layer at GC horizon PersistentLayerKey { - key_range: { - let mut key = Key::MAX; - key.field6 -= 1; - Key::MIN..key - }, + key_range: Key::MIN..Key::NON_L0_MAX, lsn_range: Lsn(0x30)..Lsn(0x31), is_delta: false }, - // The delta layer that is cut in the middle + // The delta layer covers the full range (with the layer key hack to avoid being recognized as L0) PersistentLayerKey { - key_range: get_key(3)..get_key(4), - lsn_range: Lsn(0x30)..Lsn(0x41), + key_range: Key::MIN..Key::NON_L0_MAX, + lsn_range: Lsn(0x30)..Lsn(0x48), is_delta: true }, // The delta3 layer that should not be picked for the compaction @@ -8004,6 +8058,214 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_simple_bottom_most_compaction_with_retain_lsns_single_key() -> anyhow::Result<()> + { + let harness = + TenantHarness::create("test_simple_bottom_most_compaction_with_retain_lsns_single_key") + .await?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. + let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let img_layer = (0..10) + .map(|id| (get_key(id), Bytes::from(format!("value {id}@0x10")))) + .collect_vec(); + + let delta1 = vec![ + ( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append("@0x20")), + ), + ( + get_key(1), + Lsn(0x28), + Value::WalRecord(NeonWalRecord::wal_append("@0x28")), + ), + ]; + let delta2 = vec![ + ( + get_key(1), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append("@0x30")), + ), + ( + get_key(1), + Lsn(0x38), + Value::WalRecord(NeonWalRecord::wal_append("@0x38")), + ), + ]; + let delta3 = vec![ + ( + get_key(8), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ( + get_key(9), + Lsn(0x48), + Value::WalRecord(NeonWalRecord::wal_append("@0x48")), + ), + ]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![ + // delta1 and delta 2 only contain a single key but multiple updates + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x30), delta1), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x30)..Lsn(0x50), delta2), + DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x50), delta3), + ], // delta layers + vec![(Lsn(0x10), img_layer)], // image layers + Lsn(0x50), + ) + .await?; + { + // Update GC info + let mut guard = tline.gc_info.write().unwrap(); + *guard = GcInfo { + retain_lsns: vec![ + (Lsn(0x10), tline.timeline_id), + (Lsn(0x20), tline.timeline_id), + ], + cutoffs: GcCutoffs { + time: Lsn(0x30), + space: Lsn(0x30), + }, + leases: Default::default(), + within_ancestor_pitr: false, + }; + } + + let expected_result = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30@0x38"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10@0x48"), + Bytes::from_static(b"value 9@0x10@0x48"), + ]; + + let expected_result_at_gc_horizon = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20@0x28@0x30"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_20 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10@0x20"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let expected_result_at_lsn_10 = [ + Bytes::from_static(b"value 0@0x10"), + Bytes::from_static(b"value 1@0x10"), + Bytes::from_static(b"value 2@0x10"), + Bytes::from_static(b"value 3@0x10"), + Bytes::from_static(b"value 4@0x10"), + Bytes::from_static(b"value 5@0x10"), + Bytes::from_static(b"value 6@0x10"), + Bytes::from_static(b"value 7@0x10"), + Bytes::from_static(b"value 8@0x10"), + Bytes::from_static(b"value 9@0x10"), + ]; + + let verify_result = || async { + let gc_horizon = { + let gc_info = tline.gc_info.read().unwrap(); + gc_info.cutoffs.time + }; + for idx in 0..10 { + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x50), &ctx) + .await + .unwrap(), + &expected_result[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), gc_horizon, &ctx) + .await + .unwrap(), + &expected_result_at_gc_horizon[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x20), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_20[idx] + ); + assert_eq!( + tline + .get(get_key(idx as u32), Lsn(0x10), &ctx) + .await + .unwrap(), + &expected_result_at_lsn_10[idx] + ); + } + }; + + verify_result().await; + + let cancel = CancellationToken::new(); + let mut dryrun_flags = EnumSet::new(); + dryrun_flags.insert(CompactFlags::DryRun); + + tline + .compact_with_gc(&cancel, dryrun_flags, &ctx) + .await + .unwrap(); + // We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs + // cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests. + verify_result().await; + + tline + .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + verify_result().await; + + // compact again + tline + .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .await + .unwrap(); + verify_result().await; + + Ok(()) + } + #[tokio::test] async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> { let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 133b34b8b5..a1202ad507 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -8,7 +8,6 @@ mod layer_desc; mod layer_name; pub mod merge_iterator; -#[cfg(test)] pub mod split_writer; use crate::context::{AccessStatsBehavior, RequestContext}; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b1b5217f7f..f4a2957972 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -36,6 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi use crate::tenant::disk_btree::{ DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection, }; +use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, @@ -568,7 +569,6 @@ impl DeltaLayerWriterInner { // 5GB limit for objects without multipart upload (which we don't want to use) // Make it a little bit below to account for differing GB units // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html - const S3_UPLOAD_LIMIT: u64 = 4_500_000_000; ensure!( metadata.len() <= S3_UPLOAD_LIMIT, "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!", @@ -702,12 +702,10 @@ impl DeltaLayerWriter { self.inner.take().unwrap().finish(key_end, ctx).await } - #[cfg(test)] pub(crate) fn num_keys(&self) -> usize { self.inner.as_ref().unwrap().num_keys } - #[cfg(test)] pub(crate) fn estimated_size(&self) -> u64 { let inner = self.inner.as_ref().unwrap(); inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 94120a4e3e..3cb2b1c83a 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -716,10 +716,6 @@ struct ImageLayerWriterInner { } impl ImageLayerWriterInner { - fn size(&self) -> u64 { - self.tree.borrow_writer().size() + self.blob_writer.size() - } - /// /// Start building a new image layer. /// @@ -854,13 +850,19 @@ impl ImageLayerWriterInner { res?; } + let final_key_range = if let Some(end_key) = end_key { + self.key_range.start..end_key + } else { + self.key_range.clone() + }; + // Fill in the summary on blk 0 let summary = Summary { magic: IMAGE_FILE_MAGIC, format_version: STORAGE_FORMAT_VERSION, tenant_id: self.tenant_shard_id.tenant_id, timeline_id: self.timeline_id, - key_range: self.key_range.clone(), + key_range: final_key_range.clone(), lsn: self.lsn, index_start_blk, index_root_blk, @@ -881,11 +883,7 @@ impl ImageLayerWriterInner { let desc = PersistentLayerDesc::new_img( self.tenant_shard_id, self.timeline_id, - if let Some(end_key) = end_key { - self.key_range.start..end_key - } else { - self.key_range.clone() - }, + final_key_range, self.lsn, metadata.len(), ); @@ -974,14 +972,12 @@ impl ImageLayerWriter { self.inner.as_mut().unwrap().put_image(key, img, ctx).await } - #[cfg(test)] /// Estimated size of the image layer. pub(crate) fn estimated_size(&self) -> u64 { let inner = self.inner.as_ref().unwrap(); inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64 } - #[cfg(test)] pub(crate) fn num_keys(&self) -> usize { self.inner.as_ref().unwrap().num_keys } @@ -997,7 +993,6 @@ impl ImageLayerWriter { self.inner.take().unwrap().finish(timeline, ctx, None).await } - #[cfg(test)] /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive. pub(super) async fn finish_with_end_key( mut self, @@ -1011,10 +1006,6 @@ impl ImageLayerWriter { .finish(timeline, ctx, Some(end_key)) .await } - - pub(crate) fn size(&self) -> u64 { - self.inner.as_ref().unwrap().size() - } } impl Drop for ImageLayerWriter { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 2607b574e7..53bb66b95e 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -35,6 +35,8 @@ mod tests; #[cfg(test)] mod failpoints; +pub const S3_UPLOAD_LIMIT: u64 = 4_500_000_000; + /// A Layer contains all data in a "rectangle" consisting of a range of keys and /// range of LSNs. /// diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index e12e29cd45..df910b5ad9 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -1,4 +1,4 @@ -use std::{ops::Range, sync::Arc}; +use std::{future::Future, ops::Range, sync::Arc}; use bytes::Bytes; use pageserver_api::key::{Key, KEY_SIZE}; @@ -7,7 +7,32 @@ use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; use crate::tenant::storage_layer::Layer; use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline}; -use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; +use super::layer::S3_UPLOAD_LIMIT; +use super::{ + DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer, +}; + +pub(crate) enum SplitWriterResult { + Produced(ResidentLayer), + Discarded(PersistentLayerKey), +} + +#[cfg(test)] +impl SplitWriterResult { + fn into_resident_layer(self) -> ResidentLayer { + match self { + SplitWriterResult::Produced(layer) => layer, + SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"), + } + } + + fn into_discarded_layer(self) -> PersistentLayerKey { + match self { + SplitWriterResult::Produced(_) => panic!("unexpected produced layer"), + SplitWriterResult::Discarded(layer) => layer, + } + } +} /// An image writer that takes images and produces multiple image layers. The interface does not /// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files @@ -16,11 +41,12 @@ use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; pub struct SplitImageLayerWriter { inner: ImageLayerWriter, target_layer_size: u64, - generated_layers: Vec, + generated_layers: Vec, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, lsn: Lsn, + start_key: Key, } impl SplitImageLayerWriter { @@ -49,16 +75,22 @@ impl SplitImageLayerWriter { timeline_id, tenant_shard_id, lsn, + start_key, }) } - pub async fn put_image( + pub async fn put_image_with_discard_fn( &mut self, key: Key, img: Bytes, tline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result<()> { + discard: D, + ) -> anyhow::Result<()> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { // The current estimation is an upper bound of the space that the key/image could take // because we did not consider compression in this estimation. The resulting image layer // could be smaller than the target size. @@ -76,33 +108,87 @@ impl SplitImageLayerWriter { ) .await?; let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer); - self.generated_layers.push( - prev_image_writer - .finish_with_end_key(tline, key, ctx) - .await?, - ); + let layer_key = PersistentLayerKey { + key_range: self.start_key..key, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), + is_delta: false, + }; + self.start_key = key; + + if discard(&layer_key).await { + drop(prev_image_writer); + self.generated_layers + .push(SplitWriterResult::Discarded(layer_key)); + } else { + self.generated_layers.push(SplitWriterResult::Produced( + prev_image_writer + .finish_with_end_key(tline, key, ctx) + .await?, + )); + } } self.inner.put_image(key, img, ctx).await } - pub(crate) async fn finish( + #[cfg(test)] + pub async fn put_image( + &mut self, + key: Key, + img: Bytes, + tline: &Arc, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false }) + .await + } + + pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, end_key: Key, - ) -> anyhow::Result> { + discard: D, + ) -> anyhow::Result> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { let Self { mut generated_layers, inner, .. } = self; - generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?); + if inner.num_keys() == 0 { + return Ok(generated_layers); + } + let layer_key = PersistentLayerKey { + key_range: self.start_key..end_key, + lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn), + is_delta: false, + }; + if discard(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + generated_layers.push(SplitWriterResult::Produced( + inner.finish_with_end_key(tline, end_key, ctx).await?, + )); + } Ok(generated_layers) } + #[cfg(test)] + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) + .await + } + /// When split writer fails, the caller should call this function and handle partially generated layers. - #[allow(dead_code)] - pub(crate) async fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { + pub(crate) fn take(self) -> anyhow::Result<(Vec, ImageLayerWriter)> { Ok((self.generated_layers, self.inner)) } } @@ -110,15 +196,21 @@ impl SplitImageLayerWriter { /// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not /// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files /// to be cleaned up). +/// +/// Note that if updates of a single key exceed the target size limit, all of the updates will be batched +/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm +/// will split them into multiple files based on size. #[must_use] pub struct SplitDeltaLayerWriter { inner: DeltaLayerWriter, target_layer_size: u64, - generated_layers: Vec, + generated_layers: Vec, conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, lsn_range: Range, + last_key_written: Key, + start_key: Key, } impl SplitDeltaLayerWriter { @@ -147,9 +239,74 @@ impl SplitDeltaLayerWriter { timeline_id, tenant_shard_id, lsn_range, + last_key_written: Key::MIN, + start_key, }) } + /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end. + pub async fn put_value_with_discard_fn( + &mut self, + key: Key, + lsn: Lsn, + val: Value, + tline: &Arc, + ctx: &RequestContext, + discard: D, + ) -> anyhow::Result<()> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { + // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate + // number, and therefore the final layer size could be a little bit larger or smaller than the target. + // + // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction + // strategy. https://github.com/neondatabase/neon/issues/8837 + let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; + if self.inner.num_keys() >= 1 + && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size + { + if key != self.last_key_written { + let next_delta_writer = DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + key, + self.lsn_range.clone(), + ctx, + ) + .await?; + let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); + let layer_key = PersistentLayerKey { + key_range: self.start_key..key, + lsn_range: self.lsn_range.clone(), + is_delta: true, + }; + self.start_key = key; + if discard(&layer_key).await { + drop(prev_delta_writer); + self.generated_layers + .push(SplitWriterResult::Discarded(layer_key)); + } else { + let (desc, path) = prev_delta_writer.finish(key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + self.generated_layers + .push(SplitWriterResult::Produced(delta_layer)); + } + } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT { + // We have to produce a very large file b/c a key is updated too often. + anyhow::bail!( + "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced", + key, + self.inner.estimated_size() + ); + } + } + self.last_key_written = key; + self.inner.put_value(key, lsn, val, ctx).await + } + pub async fn put_value( &mut self, key: Key, @@ -158,56 +315,64 @@ impl SplitDeltaLayerWriter { tline: &Arc, ctx: &RequestContext, ) -> anyhow::Result<()> { - // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate - // number, and therefore the final layer size could be a little bit larger or smaller than the target. - let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */; - if self.inner.num_keys() >= 1 - && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size - { - let next_delta_writer = DeltaLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_shard_id, - key, - self.lsn_range.clone(), - ctx, - ) - .await?; - let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); - let (desc, path) = prev_delta_writer.finish(key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - self.generated_layers.push(delta_layer); - } - self.inner.put_value(key, lsn, val, ctx).await + self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false }) + .await } - pub(crate) async fn finish( + pub(crate) async fn finish_with_discard_fn( self, tline: &Arc, ctx: &RequestContext, end_key: Key, - ) -> anyhow::Result> { + discard: D, + ) -> anyhow::Result> + where + D: FnOnce(&PersistentLayerKey) -> F, + F: Future, + { let Self { mut generated_layers, inner, .. } = self; - - let (desc, path) = inner.finish(end_key, ctx).await?; - let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; - generated_layers.push(delta_layer); + if inner.num_keys() == 0 { + return Ok(generated_layers); + } + let layer_key = PersistentLayerKey { + key_range: self.start_key..end_key, + lsn_range: self.lsn_range.clone(), + is_delta: true, + }; + if discard(&layer_key).await { + generated_layers.push(SplitWriterResult::Discarded(layer_key)); + } else { + let (desc, path) = inner.finish(end_key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + generated_layers.push(SplitWriterResult::Produced(delta_layer)); + } Ok(generated_layers) } - /// When split writer fails, the caller should call this function and handle partially generated layers. #[allow(dead_code)] - pub(crate) async fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { + pub(crate) async fn finish( + self, + tline: &Arc, + ctx: &RequestContext, + end_key: Key, + ) -> anyhow::Result> { + self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false }) + .await + } + + /// When split writer fails, the caller should call this function and handle partially generated layers. + pub(crate) fn take(self) -> anyhow::Result<(Vec, DeltaLayerWriter)> { Ok((self.generated_layers, self.inner)) } } #[cfg(test)] mod tests { + use itertools::Itertools; use rand::{RngCore, SeedableRng}; use crate::{ @@ -302,9 +467,16 @@ mod tests { #[tokio::test] async fn write_split() { - let harness = TenantHarness::create("split_writer_write_split") - .await - .unwrap(); + write_split_helper("split_writer_write_split", false).await; + } + + #[tokio::test] + async fn write_split_discard() { + write_split_helper("split_writer_write_split_discard", false).await; + } + + async fn write_split_helper(harness_name: &'static str, discard: bool) { + let harness = TenantHarness::create(harness_name).await.unwrap(); let (tenant, ctx) = harness.load().await; let tline = tenant @@ -338,16 +510,19 @@ mod tests { for i in 0..N { let i = i as u32; image_writer - .put_image(get_key(i), get_large_img(), &tline, &ctx) + .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async { + discard + }) .await .unwrap(); delta_writer - .put_value( + .put_value_with_discard_fn( get_key(i), Lsn(0x20), Value::Image(get_large_img()), &tline, &ctx, + |_| async { discard }, ) .await .unwrap(); @@ -360,22 +535,39 @@ mod tests { .finish(&tline, &ctx, get_key(N as u32)) .await .unwrap(); - assert_eq!(image_layers.len(), N / 512 + 1); - assert_eq!(delta_layers.len(), N / 512 + 1); - for idx in 0..image_layers.len() { - assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); - assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); - assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); - if idx > 0 { - assert_eq!( - image_layers[idx - 1].layer_desc().key_range.end, - image_layers[idx].layer_desc().key_range.start - ); - assert_eq!( - delta_layers[idx - 1].layer_desc().key_range.end, - delta_layers[idx].layer_desc().key_range.start - ); + if discard { + for layer in image_layers { + layer.into_discarded_layer(); + } + for layer in delta_layers { + layer.into_discarded_layer(); + } + } else { + let image_layers = image_layers + .into_iter() + .map(|x| x.into_resident_layer()) + .collect_vec(); + let delta_layers = delta_layers + .into_iter() + .map(|x| x.into_resident_layer()) + .collect_vec(); + assert_eq!(image_layers.len(), N / 512 + 1); + assert_eq!(delta_layers.len(), N / 512 + 1); + for idx in 0..image_layers.len() { + assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX); + assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN); + assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX); + if idx > 0 { + assert_eq!( + image_layers[idx - 1].layer_desc().key_range.end, + image_layers[idx].layer_desc().key_range.start + ); + assert_eq!( + delta_layers[idx - 1].layer_desc().key_range.end, + delta_layers[idx].layer_desc().key_range.start + ); + } } } } @@ -456,4 +648,49 @@ mod tests { .unwrap(); assert_eq!(layers.len(), 2); } + + #[tokio::test] + async fn write_split_single_key() { + let harness = TenantHarness::create("split_writer_write_split_single_key") + .await + .unwrap(); + let (tenant, ctx) = harness.load().await; + + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await + .unwrap(); + + const N: usize = 2000; + let mut delta_writer = SplitDeltaLayerWriter::new( + tenant.conf, + tline.timeline_id, + tenant.tenant_shard_id, + get_key(0), + Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), + 4 * 1024 * 1024, + &ctx, + ) + .await + .unwrap(); + + for i in 0..N { + let i = i as u32; + delta_writer + .put_value( + get_key(0), + Lsn(i as u64 * 16 + 0x10), + Value::Image(get_large_img()), + &tline, + &ctx, + ) + .await + .unwrap(); + } + let delta_layers = delta_writer + .finish(&tline, &ctx, get_key(N as u32)) + .await + .unwrap(); + assert_eq!(delta_layers.len(), 1); + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b33e436fce..098c196ee8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5444,12 +5444,17 @@ impl Timeline { !(a.end <= b.start || b.end <= a.start) } - let guard = self.layers.read().await; - for layer in guard.layer_map()?.iter_historic_layers() { - if layer.is_delta() - && overlaps_with(&layer.lsn_range, &deltas.lsn_range) - && layer.lsn_range != deltas.lsn_range - { + if deltas.key_range.start.next() != deltas.key_range.end { + let guard = self.layers.read().await; + let mut invalid_layers = + guard.layer_map()?.iter_historic_layers().filter(|layer| { + layer.is_delta() + && overlaps_with(&layer.lsn_range, &deltas.lsn_range) + && layer.lsn_range != deltas.lsn_range + // skip single-key layer files + && layer.key_range.start.next() != layer.key_range.end + }); + if let Some(layer) = invalid_layers.next() { // If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic panic!( "inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}", diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 7370ec1386..aad75ac59c 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -14,7 +14,7 @@ use super::{ RecordedDuration, Timeline, }; -use anyhow::{anyhow, Context}; +use anyhow::{anyhow, bail, Context}; use bytes::Bytes; use enumset::EnumSet; use fail::fail_point; @@ -32,6 +32,9 @@ use crate::page_cache; use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD}; use crate::tenant::remote_timeline_client::WaitCompletionError; use crate::tenant::storage_layer::merge_iterator::MergeIterator; +use crate::tenant::storage_layer::split_writer::{ + SplitDeltaLayerWriter, SplitImageLayerWriter, SplitWriterResult, +}; use crate::tenant::storage_layer::{ AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState, }; @@ -71,15 +74,60 @@ pub(crate) struct KeyHistoryRetention { } impl KeyHistoryRetention { + /// Hack: skip delta layer if we need to produce a layer of a same key-lsn. + /// + /// This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range. + /// For example, consider the case where a single delta with range [0x10,0x50) exists. + /// And we have branches at LSN 0x10, 0x20, 0x30. + /// Then we delete branch @ 0x20. + /// Bottom-most compaction may now delete the delta [0x20,0x30). + /// And that wouldnt' change the shape of the layer. + /// + /// Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes. + /// + /// `discard_key` will only be called when the writer reaches its target (instead of for every key), so it's fine to grab a lock inside. + async fn discard_key(key: &PersistentLayerKey, tline: &Arc, dry_run: bool) -> bool { + if dry_run { + return true; + } + let guard = tline.layers.read().await; + if !guard.contains_key(key) { + return false; + } + let layer_generation = guard.get_from_key(key).metadata().generation; + drop(guard); + if layer_generation == tline.generation { + info!( + key=%key, + ?layer_generation, + "discard layer due to duplicated layer key in the same generation", + ); + true + } else { + false + } + } + + /// Pipe a history of a single key to the writers. + /// + /// If `image_writer` is none, the images will be placed into the delta layers. + /// The delta writer will contain all images and deltas (below and above the horizon) except the bottom-most images. + #[allow(clippy::too_many_arguments)] async fn pipe_to( self, key: Key, - delta_writer: &mut Vec<(Key, Lsn, Value)>, - mut image_writer: Option<&mut ImageLayerWriter>, + tline: &Arc, + delta_writer: &mut SplitDeltaLayerWriter, + mut image_writer: Option<&mut SplitImageLayerWriter>, stat: &mut CompactionStatistics, + dry_run: bool, ctx: &RequestContext, ) -> anyhow::Result<()> { let mut first_batch = true; + let discard = |key: &PersistentLayerKey| { + let key = key.clone(); + async move { Self::discard_key(&key, tline, dry_run).await } + }; for (cutoff_lsn, KeyLogAtLsn(logs)) in self.below_horizon { if first_batch { if logs.len() == 1 && logs[0].1.is_image() { @@ -88,28 +136,45 @@ impl KeyHistoryRetention { }; stat.produce_image_key(img); if let Some(image_writer) = image_writer.as_mut() { - image_writer.put_image(key, img.clone(), ctx).await?; + image_writer + .put_image_with_discard_fn(key, img.clone(), tline, ctx, discard) + .await?; } else { - delta_writer.push((key, cutoff_lsn, Value::Image(img.clone()))); + delta_writer + .put_value_with_discard_fn( + key, + cutoff_lsn, + Value::Image(img.clone()), + tline, + ctx, + discard, + ) + .await?; } } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.push((key, lsn, val)); + delta_writer + .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) + .await?; } } first_batch = false; } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.push((key, lsn, val)); + delta_writer + .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) + .await?; } } } let KeyLogAtLsn(above_horizon_logs) = self.above_horizon; for (lsn, val) in above_horizon_logs { stat.produce_key(&val); - delta_writer.push((key, lsn, val)); + delta_writer + .put_value_with_discard_fn(key, lsn, val, tline, ctx, discard) + .await?; } Ok(()) } @@ -1814,11 +1879,27 @@ impl Timeline { } let mut selected_layers = Vec::new(); drop(gc_info); + // Pick all the layers intersect or below the gc_cutoff, get the largest LSN in the selected layers. + let Some(max_layer_lsn) = layers + .iter_historic_layers() + .filter(|desc| desc.get_lsn_range().start <= gc_cutoff) + .map(|desc| desc.get_lsn_range().end) + .max() + else { + info!("no layers to compact with gc"); + return Ok(()); + }; + // Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key + // layers to compact. for desc in layers.iter_historic_layers() { - if desc.get_lsn_range().start <= gc_cutoff { + if desc.get_lsn_range().end <= max_layer_lsn { selected_layers.push(guard.get_from_desc(&desc)); } } + if selected_layers.is_empty() { + info!("no layers to compact with gc"); + return Ok(()); + } retain_lsns_below_horizon.sort(); (selected_layers, gc_cutoff, retain_lsns_below_horizon) }; @@ -1848,27 +1929,53 @@ impl Timeline { lowest_retain_lsn ); // Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs. - // Also, collect the layer information to decide when to split the new delta layers. - let mut downloaded_layers = Vec::new(); - let mut delta_split_points = BTreeSet::new(); + // Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point. + let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?) for layer in &layer_selection { - let resident_layer = layer.download_and_keep_resident().await?; - downloaded_layers.push(resident_layer); - let desc = layer.layer_desc(); if desc.is_delta() { - // TODO: is it correct to only record split points for deltas intersecting with the GC horizon? (exclude those below/above the horizon) - // so that we can avoid having too many small delta layers. - let key_range = desc.get_key_range(); - delta_split_points.insert(key_range.start); - delta_split_points.insert(key_range.end); + // ignore single-key layer files + if desc.key_range.start.next() != desc.key_range.end { + let lsn_range = &desc.lsn_range; + lsn_split_point.insert(lsn_range.start); + lsn_split_point.insert(lsn_range.end); + } stat.visit_delta_layer(desc.file_size()); } else { stat.visit_image_layer(desc.file_size()); } } + for layer in &layer_selection { + let desc = layer.layer_desc(); + let key_range = &desc.key_range; + if desc.is_delta() && key_range.start.next() != key_range.end { + let lsn_range = desc.lsn_range.clone(); + let intersects = lsn_split_point.range(lsn_range).collect_vec(); + if intersects.len() > 1 { + bail!( + "cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]", + desc.key(), + intersects.into_iter().map(|lsn| lsn.to_string()).join(", ") + ); + } + } + } + // The maximum LSN we are processing in this compaction loop + let end_lsn = layer_selection + .iter() + .map(|l| l.layer_desc().lsn_range.end) + .max() + .unwrap(); + // We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized + // as an L0 layer. + let hack_end_key = Key::NON_L0_MAX; let mut delta_layers = Vec::new(); let mut image_layers = Vec::new(); + let mut downloaded_layers = Vec::new(); + for layer in &layer_selection { + let resident_layer = layer.download_and_keep_resident().await?; + downloaded_layers.push(resident_layer); + } for resident_layer in &downloaded_layers { if resident_layer.layer_desc().is_delta() { let layer = resident_layer.get_as_delta(ctx).await?; @@ -1884,138 +1991,17 @@ impl Timeline { let mut accumulated_values = Vec::new(); let mut last_key: Option = None; - enum FlushDeltaResult { - /// Create a new resident layer - CreateResidentLayer(ResidentLayer), - /// Keep an original delta layer - KeepLayer(PersistentLayerKey), - } - - #[allow(clippy::too_many_arguments)] - async fn flush_deltas( - deltas: &mut Vec<(Key, Lsn, crate::repository::Value)>, - last_key: Key, - delta_split_points: &[Key], - current_delta_split_point: &mut usize, - tline: &Arc, - lowest_retain_lsn: Lsn, - ctx: &RequestContext, - stats: &mut CompactionStatistics, - dry_run: bool, - last_batch: bool, - ) -> anyhow::Result> { - // Check if we need to split the delta layer. We split at the original delta layer boundary to avoid - // overlapping layers. - // - // If we have a structure like this: - // - // | Delta 1 | | Delta 4 | - // |---------| Delta 2 |---------| - // | Delta 3 | | Delta 5 | - // - // And we choose to compact delta 2+3+5. We will get an overlapping delta layer with delta 1+4. - // A simple solution here is to split the delta layers using the original boundary, while this - // might produce a lot of small layers. This should be improved and fixed in the future. - let mut need_split = false; - while *current_delta_split_point < delta_split_points.len() - && last_key >= delta_split_points[*current_delta_split_point] - { - *current_delta_split_point += 1; - need_split = true; - } - if !need_split && !last_batch { - return Ok(None); - } - let deltas: Vec<(Key, Lsn, Value)> = std::mem::take(deltas); - if deltas.is_empty() { - return Ok(None); - } - let end_lsn = deltas.iter().map(|(_, lsn, _)| lsn).max().copied().unwrap() + 1; - let delta_key = PersistentLayerKey { - key_range: { - let key_start = deltas.first().unwrap().0; - let key_end = deltas.last().unwrap().0.next(); - key_start..key_end - }, - lsn_range: lowest_retain_lsn..end_lsn, - is_delta: true, - }; - { - // Hack: skip delta layer if we need to produce a layer of a same key-lsn. - // - // This can happen if we have removed some deltas in "the middle" of some existing layer's key-lsn-range. - // For example, consider the case where a single delta with range [0x10,0x50) exists. - // And we have branches at LSN 0x10, 0x20, 0x30. - // Then we delete branch @ 0x20. - // Bottom-most compaction may now delete the delta [0x20,0x30). - // And that wouldnt' change the shape of the layer. - // - // Note that bottom-most-gc-compaction never _adds_ new data in that case, only removes. - // That's why it's safe to skip. - let guard = tline.layers.read().await; - - if guard.contains_key(&delta_key) { - let layer_generation = guard.get_from_key(&delta_key).metadata().generation; - drop(guard); - if layer_generation == tline.generation { - stats.discard_delta_layer(); - // TODO: depending on whether we design this compaction process to run along with - // other compactions, there could be layer map modifications after we drop the - // layer guard, and in case it creates duplicated layer key, we will still error - // in the end. - info!( - key=%delta_key, - ?layer_generation, - "discard delta layer due to duplicated layer in the same generation" - ); - return Ok(Some(FlushDeltaResult::KeepLayer(delta_key))); - } - } - } - - let mut delta_layer_writer = DeltaLayerWriter::new( - tline.conf, - tline.timeline_id, - tline.tenant_shard_id, - delta_key.key_range.start, - lowest_retain_lsn..end_lsn, - ctx, - ) - .await?; - for (key, lsn, val) in deltas { - delta_layer_writer.put_value(key, lsn, val, ctx).await?; - } - - stats.produce_delta_layer(delta_layer_writer.size()); - if dry_run { - return Ok(None); - } - - let (desc, path) = delta_layer_writer - .finish(delta_key.key_range.end, ctx) - .await?; - let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?; - Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer))) - } - - // Hack the key range to be min..(max-1). Otherwise, the image layer will be - // interpreted as an L0 delta layer. - let hack_image_layer_range = { - let mut end_key = Key::MAX; - end_key.field6 -= 1; - Key::MIN..end_key - }; - // Only create image layers when there is no ancestor branches. TODO: create covering image layer // when some condition meet. let mut image_layer_writer = if self.ancestor_timeline.is_none() { Some( - ImageLayerWriter::new( + SplitImageLayerWriter::new( self.conf, self.timeline_id, self.tenant_shard_id, - &hack_image_layer_range, // covers the full key range + Key::MIN, lowest_retain_lsn, + self.get_compaction_target_size(), ctx, ) .await?, @@ -2024,6 +2010,17 @@ impl Timeline { None }; + let mut delta_layer_writer = SplitDeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_shard_id, + Key::MIN, + lowest_retain_lsn..end_lsn, + self.get_compaction_target_size(), + ctx, + ) + .await?; + /// Returns None if there is no ancestor branch. Throw an error when the key is not found. /// /// Currently, we always get the ancestor image for each key in the child branch no matter whether the image @@ -2044,47 +2041,11 @@ impl Timeline { let img = tline.get(key, tline.ancestor_lsn, ctx).await?; Ok(Some((key, tline.ancestor_lsn, img))) } - let image_layer_key = PersistentLayerKey { - key_range: hack_image_layer_range, - lsn_range: PersistentLayerDesc::image_layer_lsn_range(lowest_retain_lsn), - is_delta: false, - }; - - // Like with delta layers, it can happen that we re-produce an already existing image layer. - // This could happen when a user triggers force compaction and image generation. In this case, - // it's always safe to rewrite the layer. - let discard_image_layer = { - let guard = self.layers.read().await; - if guard.contains_key(&image_layer_key) { - let layer_generation = guard.get_from_key(&image_layer_key).metadata().generation; - drop(guard); - if layer_generation == self.generation { - // TODO: depending on whether we design this compaction process to run along with - // other compactions, there could be layer map modifications after we drop the - // layer guard, and in case it creates duplicated layer key, we will still error - // in the end. - info!( - key=%image_layer_key, - ?layer_generation, - "discard image layer due to duplicated layer key in the same generation", - ); - true - } else { - false - } - } else { - false - } - }; // Actually, we can decide not to write to the image layer at all at this point because // the key and LSN range are determined. However, to keep things simple here, we still // create this writer, and discard the writer in the end. - let mut delta_values = Vec::new(); - let delta_split_points = delta_split_points.into_iter().collect_vec(); - let mut current_delta_split_point = 0; - let mut delta_layers = Vec::new(); while let Some((key, lsn, val)) = merge_iter.next().await? { if cancel.is_cancelled() { return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error @@ -2115,27 +2076,14 @@ impl Timeline { retention .pipe_to( *last_key, - &mut delta_values, + self, + &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, + dry_run, ctx, ) .await?; - delta_layers.extend( - flush_deltas( - &mut delta_values, - *last_key, - &delta_split_points, - &mut current_delta_split_point, - self, - lowest_retain_lsn, - ctx, - &mut stat, - dry_run, - false, - ) - .await?, - ); accumulated_values.clear(); *last_key = key; accumulated_values.push((key, lsn, val)); @@ -2159,43 +2107,75 @@ impl Timeline { retention .pipe_to( last_key, - &mut delta_values, + self, + &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, + dry_run, ctx, ) .await?; - delta_layers.extend( - flush_deltas( - &mut delta_values, - last_key, - &delta_split_points, - &mut current_delta_split_point, - self, - lowest_retain_lsn, - ctx, - &mut stat, - dry_run, - true, - ) - .await?, - ); - assert!(delta_values.is_empty(), "unprocessed keys"); - let image_layer = if discard_image_layer { - stat.discard_image_layer(); - None - } else if let Some(writer) = image_layer_writer { - stat.produce_image_layer(writer.size()); + let discard = |key: &PersistentLayerKey| { + let key = key.clone(); + async move { KeyHistoryRetention::discard_key(&key, self, dry_run).await } + }; + + let produced_image_layers = if let Some(writer) = image_layer_writer { if !dry_run { - Some(writer.finish(self, ctx).await?) + writer + .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .await? } else { - None + let (layers, _) = writer.take()?; + assert!(layers.is_empty(), "image layers produced in dry run mode?"); + Vec::new() } } else { - None + Vec::new() }; + let produced_delta_layers = if !dry_run { + delta_layer_writer + .finish_with_discard_fn(self, ctx, hack_end_key, discard) + .await? + } else { + let (layers, _) = delta_layer_writer.take()?; + assert!(layers.is_empty(), "delta layers produced in dry run mode?"); + Vec::new() + }; + + let mut compact_to = Vec::new(); + let mut keep_layers = HashSet::new(); + let produced_delta_layers_len = produced_delta_layers.len(); + let produced_image_layers_len = produced_image_layers.len(); + for action in produced_delta_layers { + match action { + SplitWriterResult::Produced(layer) => { + stat.produce_delta_layer(layer.layer_desc().file_size()); + compact_to.push(layer); + } + SplitWriterResult::Discarded(l) => { + keep_layers.insert(l); + stat.discard_delta_layer(); + } + } + } + for action in produced_image_layers { + match action { + SplitWriterResult::Produced(layer) => { + stat.produce_image_layer(layer.layer_desc().file_size()); + compact_to.push(layer); + } + SplitWriterResult::Discarded(l) => { + keep_layers.insert(l); + stat.discard_image_layer(); + } + } + } + let mut layer_selection = layer_selection; + layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); + info!( "gc-compaction statistics: {}", serde_json::to_string(&stat)? @@ -2206,28 +2186,11 @@ impl Timeline { } info!( - "produced {} delta layers and {} image layers", - delta_layers.len(), - if image_layer.is_some() { 1 } else { 0 } + "produced {} delta layers and {} image layers, {} layers are kept", + produced_delta_layers_len, + produced_image_layers_len, + layer_selection.len() ); - let mut compact_to = Vec::new(); - let mut keep_layers = HashSet::new(); - for action in delta_layers { - match action { - FlushDeltaResult::CreateResidentLayer(layer) => { - compact_to.push(layer); - } - FlushDeltaResult::KeepLayer(l) => { - keep_layers.insert(l); - } - } - } - if discard_image_layer { - keep_layers.insert(image_layer_key); - } - let mut layer_selection = layer_selection; - layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key())); - compact_to.extend(image_layer); // Step 3: Place back to the layer map. { diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index ae72bc6de3..bb9a0ddffc 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -85,7 +85,7 @@ pub trait TestBackend: Send + Sync + 'static { impl std::fmt::Display for BackendType<'_, (), ()> { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Console(api, _) => match &**api { + Self::Console(api, ()) => match &**api { ConsoleBackend::Console(endpoint) => { fmt.debug_tuple("Console").field(&endpoint.url()).finish() } @@ -96,7 +96,7 @@ impl std::fmt::Display for BackendType<'_, (), ()> { #[cfg(test)] ConsoleBackend::Test(_) => fmt.debug_tuple("Test").finish(), }, - Self::Link(url, _) => fmt.debug_tuple("Link").field(&url.as_str()).finish(), + Self::Link(url, ()) => fmt.debug_tuple("Link").field(&url.as_str()).finish(), Self::Local(_) => fmt.debug_tuple("Local").finish(), } } @@ -324,21 +324,20 @@ async fn auth_quirks( }; let (cached_entry, secret) = cached_secret.take_value(); - let secret = match secret { - Some(secret) => config.check_rate_limit( + let secret = if let Some(secret) = secret { + config.check_rate_limit( ctx, config, secret, &info.endpoint, unauthenticated_password.is_some() || allow_cleartext, - )?, - None => { - // If we don't have an authentication secret, we mock one to - // prevent malicious probing (possible due to missing protocol steps). - // This mocked secret will never lead to successful authentication. - info!("authentication info not found, mocking it"); - AuthSecret::Scram(scram::ServerSecret::mock(rand::random())) - } + )? + } else { + // If we don't have an authentication secret, we mock one to + // prevent malicious probing (possible due to missing protocol steps). + // This mocked secret will never lead to successful authentication. + info!("authentication info not found, mocking it"); + AuthSecret::Scram(scram::ServerSecret::mock(rand::random())) }; match authenticate_with_secret( @@ -409,7 +408,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> { pub fn get_endpoint(&self) -> Option { match self { Self::Console(_, user_info) => user_info.endpoint_id.clone(), - Self::Link(_, _) => Some("link".into()), + Self::Link(_, ()) => Some("link".into()), Self::Local(_) => Some("local".into()), } } @@ -418,7 +417,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> { pub fn get_user(&self) -> &str { match self { Self::Console(_, user_info) => &user_info.user, - Self::Link(_, _) => "link", + Self::Link(_, ()) => "link", Self::Local(_) => "local", } } @@ -454,7 +453,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> { BackendType::Console(api, credentials) } // NOTE: this auth backend doesn't use client credentials. - Self::Link(url, _) => { + Self::Link(url, ()) => { info!("performing link authentication"); let info = link::authenticate(ctx, &url, client).await?; @@ -478,7 +477,7 @@ impl BackendType<'_, ComputeUserInfo, &()> { ) -> Result { match self { Self::Console(api, user_info) => api.get_role_secret(ctx, user_info).await, - Self::Link(_, _) => Ok(Cached::new_uncached(None)), + Self::Link(_, ()) => Ok(Cached::new_uncached(None)), Self::Local(_) => Ok(Cached::new_uncached(None)), } } @@ -489,7 +488,7 @@ impl BackendType<'_, ComputeUserInfo, &()> { ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { match self { Self::Console(api, user_info) => api.get_allowed_ips_and_secret(ctx, user_info).await, - Self::Link(_, _) => Ok((Cached::new_uncached(Arc::new(vec![])), None)), + Self::Link(_, ()) => Ok((Cached::new_uncached(Arc::new(vec![])), None)), Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)), } } @@ -525,7 +524,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> { ) -> Result { match self { Self::Console(api, creds) => api.wake_compute(ctx, &creds.info).await, - Self::Link(_, _) => unreachable!("link auth flow doesn't support waking the compute"), + Self::Link(_, ()) => unreachable!("link auth flow doesn't support waking the compute"), Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())), } } @@ -533,7 +532,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> { fn get_keys(&self) -> &ComputeCredentialKeys { match self { Self::Console(_, creds) => &creds.keys, - Self::Link(_, _) => &ComputeCredentialKeys::None, + Self::Link(_, ()) => &ComputeCredentialKeys::None, Self::Local(_) => &ComputeCredentialKeys::None, } } diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs index 49d5de16c3..61833e19ed 100644 --- a/proxy/src/auth/backend/jwt.rs +++ b/proxy/src/auth/backend/jwt.rs @@ -224,10 +224,10 @@ impl JwkCacheEntryLock { // where Signature = alg( || . || ); let (header_payload, signature) = jwt - .rsplit_once(".") + .rsplit_once('.') .context("Provided authentication token is not a valid JWT encoding")?; let (header, payload) = header_payload - .split_once(".") + .split_once('.') .context("Provided authentication token is not a valid JWT encoding")?; let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD) @@ -320,14 +320,11 @@ impl JwkCache { // try with just a read lock first let key = (endpoint, role_name.clone()); let entry = self.map.get(&key).as_deref().map(Arc::clone); - let entry = match entry { - Some(entry) => entry, - None => { - // acquire a write lock after to insert. - let entry = self.map.entry(key).or_default(); - Arc::clone(&*entry) - } - }; + let entry = entry.unwrap_or_else(|| { + // acquire a write lock after to insert. + let entry = self.map.entry(key).or_default(); + Arc::clone(&*entry) + }); entry .check_jwt(ctx, jwt, &self.client, role_name, fetch) diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index 849e7d65e8..cb06fcaf55 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -130,9 +130,12 @@ impl ComputeUserInfoMaybeEndpoint { })) } // Invariant: project name may not contain certain characters. - (a, b) => a.or(b).map(|name| match project_name_valid(name.as_ref()) { - false => Err(ComputeUserInfoParseError::MalformedProjectName(name)), - true => Ok(name), + (a, b) => a.or(b).map(|name| { + if project_name_valid(name.as_ref()) { + Ok(name) + } else { + Err(ComputeUserInfoParseError::MalformedProjectName(name)) + } }), } .transpose()?; diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 10cc4ceee1..eda886a7af 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -274,13 +274,13 @@ impl ProjectInfoCacheImpl { let ttl_disabled_since_us = self .ttl_disabled_since_us .load(std::sync::atomic::Ordering::Relaxed); - let ignore_cache_since = if ttl_disabled_since_us != u64::MAX { + let ignore_cache_since = if ttl_disabled_since_us == u64::MAX { + None + } else { let ignore_cache_since = self.start_time + Duration::from_micros(ttl_disabled_since_us); // We are fine if entry is not older than ttl or was added before we are getting notifications. valid_since = valid_since.min(ignore_cache_since); Some(ignore_cache_since) - } else { - None }; (valid_since, ignore_cache_since) } @@ -306,7 +306,7 @@ impl ProjectInfoCacheImpl { let mut removed = 0; let shard = self.project2ep.shards()[shard].write(); for (_, endpoints) in shard.iter() { - for endpoint in endpoints.get().iter() { + for endpoint in endpoints.get() { self.cache.remove(endpoint); removed += 1; } diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 34512e9f5b..ea8f7b4070 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -220,7 +220,8 @@ mod tests { #[tokio::test] async fn cancel_session_noop_regression() { - let handler = CancellationHandler::<()>::new(Default::default(), CancellationSource::Local); + let handler = + CancellationHandler::<()>::new(CancelMap::default(), CancellationSource::Local); handler .cancel_session( CancelKeyData { diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index c071a59d58..b6659f5dd0 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -286,7 +286,7 @@ impl ConnCfg { let client_config = if allow_self_signed_compute { // Allow all certificates for creating the connection - let verifier = Arc::new(AcceptEverythingVerifier) as Arc; + let verifier = Arc::new(AcceptEverythingVerifier); rustls::ClientConfig::builder() .dangerous() .with_custom_certificate_verifier(verifier) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index a280aa88ce..6c42fb8d19 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -318,7 +318,7 @@ impl CertResolver { // a) Instead of multi-cert approach use single cert with extra // domains listed in Subject Alternative Name (SAN). // b) Deploy separate proxy instances for extra domains. - self.default.as_ref().cloned() + self.default.clone() } } } diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index 2093da7562..4e8b7a9365 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -64,7 +64,7 @@ impl Api { tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?; tokio::spawn(connection); - let secret = match get_execute_postgres_query( + let secret = if let Some(entry) = get_execute_postgres_query( &client, "select rolpassword from pg_catalog.pg_authid where rolname = $1", &[&&*user_info.user], @@ -72,15 +72,12 @@ impl Api { ) .await? { - Some(entry) => { - info!("got a secret: {entry}"); // safe since it's not a prod scenario - let secret = scram::ServerSecret::parse(&entry).map(AuthSecret::Scram); - secret.or_else(|| parse_md5(&entry).map(AuthSecret::Md5)) - } - None => { - warn!("user '{}' does not exist", user_info.user); - None - } + info!("got a secret: {entry}"); // safe since it's not a prod scenario + let secret = scram::ServerSecret::parse(&entry).map(AuthSecret::Scram); + secret.or_else(|| parse_md5(&entry).map(AuthSecret::Md5)) + } else { + warn!("user '{}' does not exist", user_info.user); + None }; let allowed_ips = match get_execute_postgres_query( &client, @@ -142,12 +139,11 @@ async fn get_execute_postgres_query( let rows = client.query(query, params).await?; // We can get at most one row, because `rolname` is unique. - let row = match rows.first() { - Some(row) => row, + let Some(row) = rows.first() else { // This means that the user doesn't exist, so there can be no secret. // However, this is still a *valid* outcome which is very similar // to getting `404 Not found` from the Neon console. - None => return Ok(None), + return Ok(None); }; let entry = row.try_get(idx).map_err(MockApiError::PasswordNotSet)?; diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 7eda238b66..a6c0e233fc 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -38,9 +38,9 @@ impl Api { locks: &'static ApiLocks, wake_compute_endpoint_rate_limiter: Arc, ) -> Self { - let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") { + let jwt = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") { Ok(v) => v, - Err(_) => "".to_string(), + Err(_) => String::new(), }; Self { endpoint, @@ -96,10 +96,10 @@ impl Api { // Error 404 is special: it's ok not to have a secret. // TODO(anna): retry Err(e) => { - if e.get_reason().is_not_found() { - return Ok(AuthInfo::default()); + return if e.get_reason().is_not_found() { + Ok(AuthInfo::default()) } else { - return Err(e.into()); + Err(e.into()) } } }; diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8e1a4e4fa2..1e14ca59ec 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -12,6 +12,8 @@ // https://rust-lang.github.io/rust-clippy/master/index.html#?groups=restriction #![warn( clippy::undocumented_unsafe_blocks, + // TODO: Enable once all individual checks are enabled. + //clippy::as_conversions, clippy::dbg_macro, clippy::empty_enum_variants_with_brackets, clippy::exit, @@ -31,8 +33,15 @@ )] // List of permanently allowed lints. #![allow( - // It's ok to cast u8 to bool, etc. + // It's ok to cast bool to u8, etc. clippy::cast_lossless, + // Seems unavoidable. + clippy::multiple_crate_versions, + // While #[must_use] is a great feature this check is too noisy. + clippy::must_use_candidate, + // Inline consts, structs, fns, imports, etc. are ok if they're used by + // the following statement(s). + clippy::items_after_statements, )] // List of temporarily allowed lints. // TODO: Switch to except() once stable with 1.81. @@ -43,46 +52,26 @@ clippy::cast_possible_wrap, clippy::cast_precision_loss, clippy::cast_sign_loss, - clippy::default_trait_access, clippy::doc_markdown, - clippy::explicit_iter_loop, - clippy::float_cmp, - clippy::if_not_else, - clippy::ignored_unit_patterns, clippy::implicit_hasher, - clippy::inconsistent_struct_constructor, clippy::inline_always, - clippy::items_after_statements, - clippy::manual_assert, - clippy::manual_let_else, - clippy::manual_string_new, - clippy::match_bool, clippy::match_same_arms, clippy::match_wild_err_arm, clippy::missing_errors_doc, clippy::missing_panics_doc, clippy::module_name_repetitions, - clippy::multiple_crate_versions, - clippy::must_use_candidate, - clippy::needless_for_each, clippy::needless_pass_by_value, clippy::needless_raw_string_hashes, - clippy::option_as_ref_cloned, clippy::redundant_closure_for_method_calls, - clippy::redundant_else, clippy::return_self_not_must_use, clippy::similar_names, - clippy::single_char_pattern, clippy::single_match_else, clippy::struct_excessive_bools, clippy::struct_field_names, clippy::too_many_lines, - clippy::uninlined_format_args, - clippy::unnested_or_patterns, clippy::unreadable_literal, clippy::unused_async, clippy::unused_self, - clippy::used_underscore_binding, clippy::wildcard_imports )] // List of temporarily allowed lints to unblock beta/nightly. diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 2182f38fe7..aa1025a29f 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -254,7 +254,7 @@ pub async fn handle_client( let metrics = &Metrics::get().proxy; let proto = ctx.protocol(); - let _request_gauge = metrics.connection_requests.guard(proto); + let request_gauge = metrics.connection_requests.guard(proto); let tls = config.tls_config.as_ref(); @@ -283,7 +283,7 @@ pub async fn handle_client( let result = config .auth_backend .as_ref() - .map(|_| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, ¶ms, hostname, common_names)) + .map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, ¶ms, hostname, common_names)) .transpose(); let user_info = match result { @@ -340,7 +340,7 @@ pub async fn handle_client( client: stream, aux: node.aux.clone(), compute: node, - req: _request_gauge, + req: request_gauge, conn: conn_gauge, cancel: session, })) diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index e1a54a9c98..6305dc204e 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -30,9 +30,10 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo { if is_cached { warn!("invalidating stalled compute node info cache entry"); } - let label = match is_cached { - true => ConnectionFailureKind::ComputeCached, - false => ConnectionFailureKind::ComputeUncached, + let label = if is_cached { + ConnectionFailureKind::ComputeCached + } else { + ConnectionFailureKind::ComputeUncached }; Metrics::get().proxy.connection_failures_total.inc(label); diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 048523f69c..f8c8e8bc4b 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -230,11 +230,10 @@ impl CopyBuffer { io::ErrorKind::WriteZero, "write zero byte into writer", )))); - } else { - self.pos += i; - self.amt += i as u64; - self.need_flush = true; } + self.pos += i; + self.amt += i as u64; + self.need_flush = true; } // If pos larger than cap, this loop will never stop. diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index d8308c4f2a..21c0641a7f 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -433,7 +433,7 @@ impl ReportableError for TestConnectError { impl std::fmt::Display for TestConnectError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self) + write!(f, "{self:?}") } } @@ -475,7 +475,7 @@ impl ConnectMechanism for TestConnectMechanism { retryable: false, kind: ErrorKind::Compute, }), - x => panic!("expecting action {:?}, connect is called instead", x), + x => panic!("expecting action {x:?}, connect is called instead"), } } @@ -515,7 +515,7 @@ impl TestBackend for TestConnectMechanism { assert!(err.could_retry()); Err(console::errors::WakeComputeError::ApiError(err)) } - x => panic!("expecting action {:?}, wake_compute is called instead", x), + x => panic!("expecting action {x:?}, wake_compute is called instead"), } } diff --git a/proxy/src/proxy/tests/mitm.rs b/proxy/src/proxy/tests/mitm.rs index 2d752b9183..71f07f4682 100644 --- a/proxy/src/proxy/tests/mitm.rs +++ b/proxy/src/proxy/tests/mitm.rs @@ -115,9 +115,7 @@ where let mut buf = [0]; stream.read_exact(&mut buf).await.unwrap(); - if buf[0] != b'S' { - panic!("ssl not supported by server"); - } + assert!(buf[0] == b'S', "ssl not supported by server"); tls.connect(stream).await.unwrap() } diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index 2d5e056540..f184e18f4c 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -119,6 +119,7 @@ impl Default for LeakyBucketState { } #[cfg(test)] +#[allow(clippy::float_cmp)] mod tests { use std::time::Duration; diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index 80a62b2a76..bc16837f65 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -174,9 +174,8 @@ impl DynamicLimiter { let mut inner = self.inner.lock(); if inner.take(&self.ready).is_some() { break Ok(Token::new(self.clone())); - } else { - notified.set(self.ready.notified()); } + notified.set(self.ready.notified()); } notified.as_mut().await; ready = true; diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index ad69246443..31c0e62c2c 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -150,7 +150,7 @@ impl MessageHandler { } } } - _ => { + Notification::AllowedIpsUpdate { .. } | Notification::PasswordUpdate { .. } => { invalidate_cache(self.cache.clone(), msg.clone()); if matches!(msg, Notification::AllowedIpsUpdate { .. }) { Metrics::get() diff --git a/proxy/src/scram/messages.rs b/proxy/src/scram/messages.rs index 5ecbbf7004..54157e450d 100644 --- a/proxy/src/scram/messages.rs +++ b/proxy/src/scram/messages.rs @@ -89,7 +89,7 @@ impl<'a> ClientFirstMessage<'a> { write!(&mut message, "r={}", self.nonce).unwrap(); base64::encode_config_buf(nonce, base64::STANDARD, &mut message); let combined_nonce = 2..message.len(); - write!(&mut message, ",s={},i={}", salt_base64, iterations).unwrap(); + write!(&mut message, ",s={salt_base64},i={iterations}").unwrap(); // This design guarantees that it's impossible to create a // server-first-message without receiving a client-first-message diff --git a/proxy/src/scram/secret.rs b/proxy/src/scram/secret.rs index 44c4f9e44a..a08cb943c3 100644 --- a/proxy/src/scram/secret.rs +++ b/proxy/src/scram/secret.rs @@ -82,13 +82,7 @@ mod tests { let stored_key = "D5h6KTMBlUvDJk2Y8ELfC1Sjtc6k9YHjRyuRZyBNJns="; let server_key = "Pi3QHbcluX//NDfVkKlFl88GGzlJ5LkyPwcdlN/QBvI="; - let secret = format!( - "SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}", - iterations = iterations, - salt = salt, - stored_key = stored_key, - server_key = server_key, - ); + let secret = format!("SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}"); let parsed = ServerSecret::parse(&secret).unwrap(); assert_eq!(parsed.iterations, iterations); diff --git a/proxy/src/scram/threadpool.rs b/proxy/src/scram/threadpool.rs index fa3d3ccca2..8fbaecf93d 100644 --- a/proxy/src/scram/threadpool.rs +++ b/proxy/src/scram/threadpool.rs @@ -222,12 +222,11 @@ fn thread_rt(pool: Arc, worker: Worker, index: usize) { } for i in 0.. { - let mut job = match worker + let Some(mut job) = worker .pop() .or_else(|| pool.steal(&mut rng, index, &worker)) - { - Some(job) => job, - None => continue 'wait, + else { + continue 'wait; }; pool.metrics diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index ea65867293..d9a9019746 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -93,11 +93,11 @@ pub async fn task_main( let mut tls_server_config = rustls::ServerConfig::clone(&config.to_server_config()); // prefer http2, but support http/1.1 tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - Arc::new(tls_server_config) as Arc<_> + Arc::new(tls_server_config) } None => { warn!("TLS config is missing"); - Arc::new(NoTls) as Arc<_> + Arc::new(NoTls) } }; diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index b44ecb76e3..9cc271c588 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -44,7 +44,11 @@ impl PoolingBackend { password: &[u8], ) -> Result { let user_info = user_info.clone(); - let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone()); + let backend = self + .config + .auth_backend + .as_ref() + .map(|()| user_info.clone()); let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?; if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) { return Err(AuthError::ip_address_not_allowed(ctx.peer_addr())); @@ -101,10 +105,10 @@ impl PoolingBackend { jwt: &str, ) -> Result { match &self.config.auth_backend { - crate::auth::BackendType::Console(_, _) => { + crate::auth::BackendType::Console(_, ()) => { Err(AuthError::auth_failed("JWT login is not yet supported")) } - crate::auth::BackendType::Link(_, _) => Err(AuthError::auth_failed( + crate::auth::BackendType::Link(_, ()) => Err(AuthError::auth_failed( "JWT login over link proxy is not supported", )), crate::auth::BackendType::Local(cache) => { @@ -138,12 +142,12 @@ impl PoolingBackend { keys: ComputeCredentials, force_new: bool, ) -> Result, HttpConnError> { - let maybe_client = if !force_new { - info!("pool: looking for an existing connection"); - self.pool.get(ctx, &conn_info)? - } else { + let maybe_client = if force_new { info!("pool: pool is disabled"); None + } else { + info!("pool: looking for an existing connection"); + self.pool.get(ctx, &conn_info)? }; if let Some(client) = maybe_client { @@ -152,7 +156,7 @@ impl PoolingBackend { let conn_id = uuid::Uuid::new_v4(); tracing::Span::current().record("conn_id", display(conn_id)); info!(%conn_id, "pool: opening a new connection '{conn_info}'"); - let backend = self.config.auth_backend.as_ref().map(|_| keys); + let backend = self.config.auth_backend.as_ref().map(|()| keys); crate::proxy::connect_compute::connect_to_compute( ctx, &TokioMechanism { diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 6ed694af58..476083d71e 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -339,9 +339,9 @@ impl GlobalConnPool { } = pool.get_mut(); // ensure that closed clients are removed - pools.iter_mut().for_each(|(_, db_pool)| { + for db_pool in pools.values_mut() { clients_removed += db_pool.clear_closed_clients(total_conns); - }); + } // we only remove this pool if it has no active connections if *total_conns == 0 { @@ -405,21 +405,20 @@ impl GlobalConnPool { if client.is_closed() { info!("pool: cached connection '{conn_info}' is closed, opening a new one"); return Ok(None); - } else { - tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id)); - tracing::Span::current().record( - "pid", - tracing::field::display(client.inner.get_process_id()), - ); - info!( - cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "pool: reusing connection '{conn_info}'" - ); - client.session.send(ctx.session_id())?; - ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); - ctx.success(); - return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); } + tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id)); + tracing::Span::current().record( + "pid", + tracing::field::display(client.inner.get_process_id()), + ); + info!( + cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), + "pool: reusing connection '{conn_info}'" + ); + client.session.send(ctx.session_id())?; + ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); + ctx.success(); + return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); } Ok(None) } @@ -660,7 +659,7 @@ impl Client { span: _, } = self; let inner = inner.as_mut().expect("client inner should not be removed"); - (&mut inner.inner, Discard { pool, conn_info }) + (&mut inner.inner, Discard { conn_info, pool }) } } @@ -722,7 +721,9 @@ impl Drop for Client { mod tests { use std::{mem, sync::atomic::AtomicBool}; - use crate::{serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId}; + use crate::{ + proxy::NeonOptions, serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId, + }; use super::*; @@ -781,7 +782,7 @@ mod tests { user_info: ComputeUserInfo { user: "user".into(), endpoint: "endpoint".into(), - options: Default::default(), + options: NeonOptions::default(), }, dbname: "dbname".into(), auth: AuthData::Password("password".as_bytes().into()), @@ -839,7 +840,7 @@ mod tests { user_info: ComputeUserInfo { user: "user".into(), endpoint: "endpoint-2".into(), - options: Default::default(), + options: NeonOptions::default(), }, dbname: "dbname".into(), auth: AuthData::Password("password".as_bytes().into()), diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs index c22c63e85b..3776971fa1 100644 --- a/proxy/src/serverless/json.rs +++ b/proxy/src/serverless/json.rs @@ -55,7 +55,7 @@ fn json_array_to_pg_array(value: &Value) -> Option { .collect::>() .join(","); - Some(format!("{{{}}}", vals)) + Some(format!("{{{vals}}}")) } } } diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 79baef45f6..9143469eea 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -207,12 +207,12 @@ fn get_conn_info( .ok_or(ConnInfoError::MalformedEndpoint)? } else { hostname - .split_once(".") + .split_once('.') .map_or(hostname, |(prefix, _)| prefix) .into() } } - Some(url::Host::Ipv4(_)) | Some(url::Host::Ipv6(_)) | None => { + Some(url::Host::Ipv4(_) | url::Host::Ipv6(_)) | None => { return Err(ConnInfoError::MissingHostname) } }; diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index 7809d2e574..ef13f5fc1a 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -67,7 +67,7 @@ impl PqStream { FeMessage::PasswordMessage(msg) => Ok(msg), bad => Err(io::Error::new( io::ErrorKind::InvalidData, - format!("unexpected message type: {:?}", bad), + format!("unexpected message type: {bad:?}"), )), } } diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index a8735fe0bb..4cf6da7e2d 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -450,12 +450,9 @@ async fn upload_events_chunk( remote_path: &RemotePath, cancel: &CancellationToken, ) -> anyhow::Result<()> { - let storage = match storage { - Some(storage) => storage, - None => { - error!("no remote storage configured"); - return Ok(()); - } + let Some(storage) = storage else { + error!("no remote storage configured"); + return Ok(()); }; let data = serde_json::to_vec(&chunk).context("serialize metrics")?; let mut encoder = GzipEncoder::new(Vec::new()); diff --git a/proxy/src/waiters.rs b/proxy/src/waiters.rs index 3bd8f4c8ef..9f78242ed3 100644 --- a/proxy/src/waiters.rs +++ b/proxy/src/waiters.rs @@ -31,7 +31,7 @@ pub struct Waiters(pub(self) Mutex>>); impl Default for Waiters { fn default() -> Self { - Waiters(Default::default()) + Waiters(Mutex::default()) } } diff --git a/test_runner/fixtures/common_types.py b/test_runner/fixtures/common_types.py index 7cadcbb4c2..8eda19d1e2 100644 --- a/test_runner/fixtures/common_types.py +++ b/test_runner/fixtures/common_types.py @@ -1,5 +1,6 @@ import random from dataclasses import dataclass +from enum import Enum from functools import total_ordering from typing import Any, Dict, Type, TypeVar, Union @@ -213,3 +214,9 @@ class TenantShardId: def __hash__(self) -> int: return hash(self._tuple()) + + +# TODO: Replace with `StrEnum` when we upgrade to python 3.11 +class TimelineArchivalState(str, Enum): + ARCHIVED = "Archived" + UNARCHIVED = "Unarchived" diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index cd4261f1b8..582f9c0264 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -10,7 +10,7 @@ import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId +from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineArchivalState, TimelineId from fixtures.log_helper import log from fixtures.metrics import Metrics, MetricsGetter, parse_metrics from fixtures.pg_version import PgVersion @@ -621,6 +621,22 @@ class PageserverHttpClient(requests.Session, MetricsGetter): ) self.verbose_error(res) + def timeline_archival_config( + self, + tenant_id: Union[TenantId, TenantShardId], + timeline_id: TimelineId, + state: TimelineArchivalState, + ): + config = {"state": state.value} + log.info( + f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}" + ) + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config", + json=config, + ) + self.verbose_error(res) + def timeline_get_lsn_by_timestamp( self, tenant_id: Union[TenantId, TenantShardId], diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py new file mode 100644 index 0000000000..b774c7c9fe --- /dev/null +++ b/test_runner/regress/test_timeline_archive.py @@ -0,0 +1,96 @@ +import pytest +from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId +from fixtures.neon_fixtures import ( + NeonEnv, +) +from fixtures.pageserver.http import PageserverApiException + + +def test_timeline_archive(neon_simple_env: NeonEnv): + env = neon_simple_env + + env.pageserver.allowed_errors.extend( + [ + ".*Timeline .* was not found.*", + ".*timeline not found.*", + ".*Cannot archive timeline which has unarchived child timelines.*", + ".*Precondition failed: Requested tenant is missing.*", + ] + ) + + ps_http = env.pageserver.http_client() + + # first try to archive non existing timeline + # for existing tenant: + invalid_timeline_id = TimelineId.generate() + with pytest.raises(PageserverApiException, match="timeline not found") as exc: + ps_http.timeline_archival_config( + tenant_id=env.initial_tenant, + timeline_id=invalid_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + + assert exc.value.status_code == 404 + + # for non existing tenant: + invalid_tenant_id = TenantId.generate() + with pytest.raises( + PageserverApiException, + match=f"NotFound: tenant {invalid_tenant_id}", + ) as exc: + ps_http.timeline_archival_config( + tenant_id=invalid_tenant_id, + timeline_id=invalid_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + + assert exc.value.status_code == 404 + + # construct pair of branches to validate that pageserver prohibits + # archival of ancestor timelines when they have non-archived child branches + parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_archive_parent", "empty") + + leaf_timeline_id = env.neon_cli.create_branch( + "test_ancestor_branch_archive_branch1", "test_ancestor_branch_archive_parent" + ) + + timeline_path = env.pageserver.timeline_dir(env.initial_tenant, parent_timeline_id) + + with pytest.raises( + PageserverApiException, + match="Cannot archive timeline which has non-archived child timelines", + ) as exc: + assert timeline_path.exists() + + ps_http.timeline_archival_config( + tenant_id=env.initial_tenant, + timeline_id=parent_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + + assert exc.value.status_code == 412 + + # Test timeline_detail + leaf_detail = ps_http.timeline_detail( + tenant_id=env.initial_tenant, + timeline_id=leaf_timeline_id, + ) + assert leaf_detail["is_archived"] is False + + # Test that archiving the leaf timeline and then the parent works + ps_http.timeline_archival_config( + tenant_id=env.initial_tenant, + timeline_id=leaf_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) + leaf_detail = ps_http.timeline_detail( + tenant_id=env.initial_tenant, + timeline_id=leaf_timeline_id, + ) + assert leaf_detail["is_archived"] is True + + ps_http.timeline_archival_config( + tenant_id=env.initial_tenant, + timeline_id=parent_timeline_id, + state=TimelineArchivalState.ARCHIVED, + ) diff --git a/vm-image-spec.yaml b/vm-image-spec.yaml index 622004b931..c94f95f447 100644 --- a/vm-image-spec.yaml +++ b/vm-image-spec.yaml @@ -326,15 +326,13 @@ files: SELECT checkpoints_timed FROM pg_stat_bgwriter; - metric_name: compute_logical_snapshot_files - type: guage + type: gauge help: 'Number of snapshot files in pg_logical/snapshot' key_labels: - - tenant_id - timeline_id values: [num_logical_snapshot_files] query: | SELECT - (SELECT setting FROM pg_settings WHERE name = 'neon.tenant_id') AS tenant_id, (SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id, -- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These -- temporary snapshot files are renamed to the actual snapshot files after they are @@ -356,6 +354,17 @@ files: from pg_replication_slots where slot_type = 'logical'; + - metric_name: compute_subscriptions_count + type: gauge + help: 'Number of logical replication subscriptions grouped by enabled/disabled' + key_labels: + - enabled + values: [subscriptions_count] + query: | + select subenabled::text as enabled, count(*) as subscriptions_count + from pg_subscription + group by subenabled; + - metric_name: retained_wal type: gauge help: 'Retained WAL in inactive replication slots'