From 0bdbc39cb192a344e7933bcde5b93b06a7c2102f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 16 Aug 2023 17:27:18 +0200 Subject: [PATCH] Compaction: unify key and value reference vecs (#4888) ## Problem PR #4839 has already reduced the number of b-tree traversals and vec creations from 3 to 2, but as pointed out in https://github.com/neondatabase/neon/pull/4839#discussion_r1279167815 , we would ideally just traverse the b-tree once during compaction. Afer #4836, the two vecs created are one for the list of keys, lsns and sizes, and one for the list of `(key, lsn, value reference)`. However, they are not equal, as pointed out in https://github.com/neondatabase/neon/pull/4839#issuecomment-1660418012 and the following comment: the key vec creation combines multiple entries for which the lsn is changing but the key stays the same into one, with the size being the sum of the sub-sizes. In SQL, this would correspond to something like `SELECT key, lsn, SUM(size) FROM b_tree GROUP BY key;` and `SELECT key, lsn, val_ref FROM b_tree;`. Therefore, the join operation is non-trivial. ## Summary of changes This PR merges the two lists of keys and value references into one. It's not a trivial change and affects the size pattern of the resulting files, which is why this is in a separate PR from #4839 . The key vec is used in compaction for determining when to start a new layer file. The loop uses various thresholds to come to this conclusion, but the grouping via the key has led to the behaviour that regardless of the threshold, it only starts a new file when either a new key is encountered, or a new delta file. The new code now does the combination after the merging and sorting of the various keys from the delta files. This *mostly* does the same as the old code, except for a detail: with the grouping done on a per-delta-layer basis, the sorted and merged vec would still have multiple entries for multiple delta files, but now, we don't have an easy way to tell when a new input delta layer file is encountered, so we cannot create multiple entries on that basis easily. To prevent possibly infinite growth, our new grouping code compares the combined size with the threshold, and if it is exceeded, it cuts a new entry so that the downstream code can cut a new output file. Here, we perform a tradeoff however, as if the threshold is too small, we risk putting entries for the same key into multiple layer files, but if the threshold is too big, we can in some instances exceed the target size. Currently, we set the threshold to the target size, so in theory we would stay below or roughly at double the `target_file_size`. We also fix the way the size was calculated for the last key. The calculation was wrong and accounted for the old layer's btree, even though we already account for the overhead of the in-construction btree. Builds on top of #4839 . --- .../src/tenant/storage_layer/delta_layer.rs | 99 ++++++++----------- pageserver/src/tenant/timeline.rs | 81 +++++++++------ test_runner/regress/test_remote_storage.py | 8 +- 3 files changed, 98 insertions(+), 90 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index bd77d177b0..4324a6e9a0 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -549,25 +549,10 @@ impl DeltaLayer { &self.layer_name(), ) } - - /// Obtains all keys and value references stored in the layer + /// Loads all keys stored in the layer. Returns key, lsn, value size and value reference. /// /// The value can be obtained via the [`ValueRef::load`] function. - pub async fn load_val_refs( - &self, - ctx: &RequestContext, - ) -> Result>)>> { - let inner = self - .load(LayerAccessKind::Iter, ctx) - .await - .context("load delta layer")?; - DeltaLayerInner::load_val_refs(inner) - .await - .context("Layer index is corrupted") - } - - /// Loads all keys stored in the layer. Returns key, lsn and value size. - pub async fn load_keys(&self, ctx: &RequestContext) -> Result> { + pub async fn load_keys(&self, ctx: &RequestContext) -> Result> { let inner = self .load(LayerAccessKind::KeyIter, ctx) .await @@ -711,6 +696,17 @@ impl DeltaLayerWriterInner { .metadata() .context("get file metadata to determine size")?; + // 5GB limit for objects without multipart upload (which we don't want to use) + // Make it a little bit below to account for differing GB units + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html + const S3_UPLOAD_LIMIT: u64 = 4_500_000_000; + ensure!( + metadata.len() <= S3_UPLOAD_LIMIT, + "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!", + file.path.display(), + metadata.len() + ); + // Note: Because we opened the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't // set inner.file here. The first read will have to re-open it. @@ -955,15 +951,17 @@ impl DeltaLayerInner { } } - pub(super) async fn load_val_refs + Clone>( + pub(super) async fn load_keys + Clone>( this: &T, - ) -> Result)>> { + ) -> Result> { let dl = this.as_ref(); let file = &dl.file; + let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file); - let mut all_offsets = Vec::<(Key, Lsn, ValueRef)>::new(); + let mut all_keys: Vec = Vec::new(); + tree_reader .visit( &[0u8; DELTA_KEY_SIZE], @@ -972,56 +970,45 @@ impl DeltaLayerInner { let delta_key = DeltaKey::from_slice(key); let val_ref = ValueRef { blob_ref: BlobRef(value), - reader: BlockCursor::new(Adapter(this.clone())), + reader: BlockCursor::new(Adapter(dl)), }; - all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref)); - true - }, - ) - .await?; - - Ok(all_offsets) - } - - pub(super) async fn load_keys(&self) -> Result> { - let file = &self.file; - let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - self.index_start_blk, - self.index_root_blk, - file, - ); - - let mut all_keys: Vec<(Key, Lsn, u64)> = Vec::new(); - tree_reader - .visit( - &[0u8; DELTA_KEY_SIZE], - VisitDirection::Forwards, - |key, value| { - let delta_key = DeltaKey::from_slice(key); let pos = BlobRef(value).pos(); if let Some(last) = all_keys.last_mut() { - if last.0 == delta_key.key() { - return true; - } else { - // subtract offset of new key BLOB and first blob of this key - // to get total size if values associated with this key - let first_pos = last.2; - last.2 = pos - first_pos; - } + // subtract offset of the current and last entries to get the size + // of the value associated with this (key, lsn) tuple + let first_pos = last.size; + last.size = pos - first_pos; } - all_keys.push((delta_key.key(), delta_key.lsn(), pos)); + let entry = DeltaEntry { + key: delta_key.key(), + lsn: delta_key.lsn(), + size: pos, + val: val_ref, + }; + all_keys.push(entry); true }, ) .await?; if let Some(last) = all_keys.last_mut() { - // Last key occupies all space till end of layer - last.2 = std::fs::metadata(&file.file.path)?.len() - last.2; + // Last key occupies all space till end of value storage, + // which corresponds to beginning of the index + last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size; } Ok(all_keys) } } +/// A set of data associated with a delta layer key and its value +pub struct DeltaEntry<'a> { + pub key: Key, + pub lsn: Lsn, + /// Size of the stored value + pub size: u64, + /// Reference to the on-disk value + pub val: ValueRef<&'a DeltaLayerInner>, +} + /// Reference to an on-disk value pub struct ValueRef> { blob_ref: BlobRef, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2deeacdc64..502e5ed44e 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -39,6 +39,7 @@ use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, }; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; +use crate::tenant::storage_layer::delta_layer::DeltaEntry; use crate::tenant::storage_layer::{ DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer, @@ -3312,10 +3313,10 @@ struct CompactLevel0Phase1StatsBuilder { timeline_id: Option, read_lock_acquisition_micros: DurationRecorder, read_lock_held_spawn_blocking_startup_micros: DurationRecorder, + read_lock_held_key_sort_micros: DurationRecorder, read_lock_held_prerequisites_micros: DurationRecorder, read_lock_held_compute_holes_micros: DurationRecorder, read_lock_drop_micros: DurationRecorder, - prepare_iterators_micros: DurationRecorder, write_layer_files_micros: DurationRecorder, level0_deltas_count: Option, new_deltas_count: Option, @@ -3332,10 +3333,10 @@ struct CompactLevel0Phase1Stats { timeline_id: TimelineId, read_lock_acquisition_micros: RecordedDuration, read_lock_held_spawn_blocking_startup_micros: RecordedDuration, + read_lock_held_key_sort_micros: RecordedDuration, read_lock_held_prerequisites_micros: RecordedDuration, read_lock_held_compute_holes_micros: RecordedDuration, read_lock_drop_micros: RecordedDuration, - prepare_iterators_micros: RecordedDuration, write_layer_files_micros: RecordedDuration, level0_deltas_count: usize, new_deltas_count: usize, @@ -3362,6 +3363,10 @@ impl TryFrom for CompactLevel0Phase1Stats { .read_lock_held_spawn_blocking_startup_micros .into_recorded() .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?, + read_lock_held_key_sort_micros: value + .read_lock_held_key_sort_micros + .into_recorded() + .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?, read_lock_held_prerequisites_micros: value .read_lock_held_prerequisites_micros .into_recorded() @@ -3374,10 +3379,6 @@ impl TryFrom for CompactLevel0Phase1Stats { .read_lock_drop_micros .into_recorded() .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?, - prepare_iterators_micros: value - .prepare_iterators_micros - .into_recorded() - .ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?, write_layer_files_micros: value .write_layer_files_micros .into_recorded() @@ -3547,28 +3548,24 @@ impl Timeline { let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); let mut prev: Option = None; - let mut all_value_refs = Vec::new(); let mut all_keys = Vec::new(); - for l in deltas_to_compact.iter() { + let downcast_deltas: Vec<_> = deltas_to_compact + .iter() + .map(|l| l.clone().downcast_delta_layer().expect("delta layer")) + .collect(); + for dl in downcast_deltas.iter() { // TODO: replace this with an await once we fully go async - let delta = l.clone().downcast_delta_layer().expect("delta layer"); - Handle::current().block_on(async { - all_value_refs.extend(delta.load_val_refs(ctx).await?); - all_keys.extend(delta.load_keys(ctx).await?); - anyhow::Ok(()) - })?; + all_keys.extend(Handle::current().block_on(DeltaLayer::load_keys(dl, ctx))?); } // The current stdlib sorting implementation is designed in a way where it is // particularly fast where the slice is made up of sorted sub-ranges. - all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn)); + all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn)); - // The current stdlib sorting implementation is designed in a way where it is - // particularly fast where the slice is made up of sorted sub-ranges. - all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn)); + stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now(); - for (next_key, _next_lsn, _size) in all_keys.iter() { + for DeltaEntry { key: next_key, .. } in all_keys.iter() { let next_key = *next_key; if let Some(prev_key) = prev { // just first fast filter @@ -3592,8 +3589,7 @@ impl Timeline { } prev = Some(next_key.next()); } - stats.read_lock_held_compute_holes_micros = - stats.read_lock_held_prerequisites_micros.till_now(); + stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now(); drop_rlock(guard); stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now(); let mut holes = heap.into_vec(); @@ -3602,12 +3598,26 @@ impl Timeline { // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. - let all_values_iter = all_value_refs.into_iter(); + let all_values_iter = all_keys.iter(); // This iterator walks through all keys and is needed to calculate size used by each key - let mut all_keys_iter = all_keys.into_iter(); - - stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now(); + let mut all_keys_iter = all_keys + .iter() + .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size)) + .coalesce(|mut prev, cur| { + // Coalesce keys that belong to the same key pair. + // This ensures that compaction doesn't put them + // into different layer files. + // Still limit this by the target file size, + // so that we keep the size of the files in + // check. + if prev.0 == cur.0 && prev.2 < target_file_size { + prev.2 += cur.2; + Ok(prev) + } else { + Err((prev, cur)) + } + }); // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. @@ -3662,8 +3672,11 @@ impl Timeline { // TODO remove this block_on wrapper once we fully go async Handle::current().block_on(async { - for (key, lsn, value_ref) in all_values_iter { - let value = value_ref.load().await?; + for &DeltaEntry { + key, lsn, ref val, .. + } in all_values_iter + { + let value = val.load().await?; let same_key = prev_key.map_or(false, |prev_key| prev_key == key); // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn { @@ -3764,6 +3777,16 @@ impl Timeline { // Sync layers if !new_layers.is_empty() { + // Print a warning if the created layer is larger than double the target size + let warn_limit = target_file_size * 2; + for layer in new_layers.iter() { + if layer.desc.file_size > warn_limit { + warn!( + %layer, + "created delta file of size {} larger than double of target of {target_file_size}", layer.desc.file_size + ); + } + } let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); // Fsync all the layer files and directory using multiple threads to @@ -3776,12 +3799,10 @@ impl Timeline { layer_paths.pop().unwrap(); } - stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now(); + stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now(); stats.new_deltas_count = Some(new_layers.len()); stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); - drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed - match TryInto::::try_into(stats) .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string")) { diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 714118a015..4f5b193ce2 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -270,7 +270,7 @@ def test_remote_storage_upload_queue_retries( f""" INSERT INTO foo (id, val) SELECT g, '{data}' - FROM generate_series(1, 10000) g + FROM generate_series(1, 20000) g ON CONFLICT (id) DO UPDATE SET val = EXCLUDED.val """, @@ -371,7 +371,7 @@ def test_remote_storage_upload_queue_retries( log.info("restarting postgres to validate") endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) with endpoint.cursor() as cur: - assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000 + assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000 @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS]) @@ -419,7 +419,7 @@ def test_remote_timeline_client_calls_started_metric( f""" INSERT INTO foo (id, val) SELECT g, '{data}' - FROM generate_series(1, 10000) g + FROM generate_series(1, 20000) g ON CONFLICT (id) DO UPDATE SET val = EXCLUDED.val """, @@ -510,7 +510,7 @@ def test_remote_timeline_client_calls_started_metric( log.info("restarting postgres to validate") endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) with endpoint.cursor() as cur: - assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000 + assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000 # ensure that we updated the calls_started download metric fetch_calls_started()