diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index bd77d177b0..4324a6e9a0 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -549,25 +549,10 @@ impl DeltaLayer {
             &self.layer_name(),
         )
     }
-
-    /// Obtains all keys and value references stored in the layer
+    /// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
     ///
     /// The value can be obtained via the [`ValueRef::load`] function.
-    pub async fn load_val_refs(
-        &self,
-        ctx: &RequestContext,
-    ) -> Result<Vec<(Key, Lsn, ValueRef<Arc<DeltaLayerInner>>)>> {
-        let inner = self
-            .load(LayerAccessKind::Iter, ctx)
-            .await
-            .context("load delta layer")?;
-        DeltaLayerInner::load_val_refs(inner)
-            .await
-            .context("Layer index is corrupted")
-    }
-
-    /// Loads all keys stored in the layer. Returns key, lsn and value size.
-    pub async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, u64)>> {
+    pub async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry>> {
         let inner = self
             .load(LayerAccessKind::KeyIter, ctx)
             .await
@@ -711,6 +696,17 @@ impl DeltaLayerWriterInner {
             .metadata()
             .context("get file metadata to determine size")?;
 
+        // 5GB limit for objects without multipart upload (which we don't want to use)
+        // Make it a little bit below to account for differing GB units
+        // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
+        const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
+        ensure!(
+            metadata.len() <= S3_UPLOAD_LIMIT,
+            "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
+            file.path.display(),
+            metadata.len()
+        );
+
         // Note: Because we opened the file in write-only mode, we cannot
         // reuse the same VirtualFile for reading later. That's why we don't
         // set inner.file here. The first read will have to re-open it.
@@ -955,15 +951,17 @@ impl DeltaLayerInner {
         }
     }
 
-    pub(super) async fn load_val_refs<T: AsRef<DeltaLayerInner> + Clone>(
+    pub(super) async fn load_keys<T: AsRef<DeltaLayerInner> + Clone>(
         this: &T,
-    ) -> Result<Vec<(Key, Lsn, ValueRef<T>)>> {
+    ) -> Result<Vec<DeltaEntry>> {
         let dl = this.as_ref();
         let file = &dl.file;
+
         let tree_reader =
             DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
 
-        let mut all_offsets = Vec::<(Key, Lsn, ValueRef<T>)>::new();
+        let mut all_keys: Vec<DeltaEntry> = Vec::new();
+
         tree_reader
             .visit(
                 &[0u8; DELTA_KEY_SIZE],
@@ -972,56 +970,45 @@ impl DeltaLayerInner {
                     let delta_key = DeltaKey::from_slice(key);
                     let val_ref = ValueRef {
                         blob_ref: BlobRef(value),
-                        reader: BlockCursor::new(Adapter(this.clone())),
+                        reader: BlockCursor::new(Adapter(dl)),
                     };
-                    all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref));
-                    true
-                },
-            )
-            .await?;
-
-        Ok(all_offsets)
-    }
-
-    pub(super) async fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
-        let file = &self.file;
-        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
-            self.index_start_blk,
-            self.index_root_blk,
-            file,
-        );
-
-        let mut all_keys: Vec<(Key, Lsn, u64)> = Vec::new();
-        tree_reader
-            .visit(
-                &[0u8; DELTA_KEY_SIZE],
-                VisitDirection::Forwards,
-                |key, value| {
-                    let delta_key = DeltaKey::from_slice(key);
                     let pos = BlobRef(value).pos();
                     if let Some(last) = all_keys.last_mut() {
-                        if last.0 == delta_key.key() {
-                            return true;
-                        } else {
-                            // subtract offset of new key BLOB and first blob of this key
-                            // to get total size if values associated with this key
-                            let first_pos = last.2;
-                            last.2 = pos - first_pos;
-                        }
+                        // subtract offset of the current and last entries to get the size
+                        // of the value associated with this (key, lsn) tuple
+                        let first_pos = last.size;
+                        last.size = pos - first_pos;
                     }
-                    all_keys.push((delta_key.key(), delta_key.lsn(), pos));
+                    let entry = DeltaEntry {
+                        key: delta_key.key(),
+                        lsn: delta_key.lsn(),
+                        size: pos,
+                        val: val_ref,
+                    };
+                    all_keys.push(entry);
                     true
                 },
             )
             .await?;
         if let Some(last) = all_keys.last_mut() {
-            // Last key occupies all space till end of layer
-            last.2 = std::fs::metadata(&file.file.path)?.len() - last.2;
+            // Last key occupies all space till end of value storage,
+            // which corresponds to beginning of the index
+            last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
         }
         Ok(all_keys)
     }
 }
 
+/// A set of data associated with a delta layer key and its value
+pub struct DeltaEntry<'a> {
+    pub key: Key,
+    pub lsn: Lsn,
+    /// Size of the stored value
+    pub size: u64,
+    /// Reference to the on-disk value
+    pub val: ValueRef<&'a DeltaLayerInner>,
+}
+
 /// Reference to an on-disk value
 pub struct ValueRef<T: AsRef<DeltaLayerInner>> {
     blob_ref: BlobRef,
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 2deeacdc64..502e5ed44e 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -39,6 +39,7 @@ use crate::context::{
     AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
 };
 use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
+use crate::tenant::storage_layer::delta_layer::DeltaEntry;
 use crate::tenant::storage_layer::{
     DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
     LayerAccessStats, LayerFileName, RemoteLayer,
@@ -3312,10 +3313,10 @@ struct CompactLevel0Phase1StatsBuilder {
     timeline_id: Option<TimelineId>,
     read_lock_acquisition_micros: DurationRecorder,
     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
+    read_lock_held_key_sort_micros: DurationRecorder,
     read_lock_held_prerequisites_micros: DurationRecorder,
     read_lock_held_compute_holes_micros: DurationRecorder,
     read_lock_drop_micros: DurationRecorder,
-    prepare_iterators_micros: DurationRecorder,
     write_layer_files_micros: DurationRecorder,
     level0_deltas_count: Option<usize>,
     new_deltas_count: Option<usize>,
@@ -3332,10 +3333,10 @@ struct CompactLevel0Phase1Stats {
     timeline_id: TimelineId,
     read_lock_acquisition_micros: RecordedDuration,
     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
+    read_lock_held_key_sort_micros: RecordedDuration,
     read_lock_held_prerequisites_micros: RecordedDuration,
     read_lock_held_compute_holes_micros: RecordedDuration,
     read_lock_drop_micros: RecordedDuration,
-    prepare_iterators_micros: RecordedDuration,
     write_layer_files_micros: RecordedDuration,
     level0_deltas_count: usize,
     new_deltas_count: usize,
@@ -3362,6 +3363,10 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
                 .read_lock_held_spawn_blocking_startup_micros
                 .into_recorded()
                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
+            read_lock_held_key_sort_micros: value
+                .read_lock_held_key_sort_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
             read_lock_held_prerequisites_micros: value
                 .read_lock_held_prerequisites_micros
                 .into_recorded()
                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
@@ -3374,10 +3379,6 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
                 .read_lock_drop_micros
                 .into_recorded()
                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
-            prepare_iterators_micros: value
-                .prepare_iterators_micros
-                .into_recorded()
-                .ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?,
             write_layer_files_micros: value
                 .write_layer_files_micros
                 .into_recorded()
@@ -3547,28 +3548,24 @@ impl Timeline {
         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
         let mut prev: Option<Key> = None;
 
-        let mut all_value_refs = Vec::new();
         let mut all_keys = Vec::new();
-        for l in deltas_to_compact.iter() {
+        let downcast_deltas: Vec<_> = deltas_to_compact
+            .iter()
+            .map(|l| l.clone().downcast_delta_layer().expect("delta layer"))
+            .collect();
+        for dl in downcast_deltas.iter() {
             // TODO: replace this with an await once we fully go async
-            let delta = l.clone().downcast_delta_layer().expect("delta layer");
-            Handle::current().block_on(async {
-                all_value_refs.extend(delta.load_val_refs(ctx).await?);
-                all_keys.extend(delta.load_keys(ctx).await?);
-                anyhow::Ok(())
-            })?;
+            all_keys.extend(Handle::current().block_on(DeltaLayer::load_keys(dl, ctx))?);
         }
 
         // The current stdlib sorting implementation is designed in a way where it is
        // particularly fast where the slice is made up of sorted sub-ranges.
-        all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn));
+        all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
 
-        // The current stdlib sorting implementation is designed in a way where it is
-        // particularly fast where the slice is made up of sorted sub-ranges.
-        all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn));
+        stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
 
-        for (next_key, _next_lsn, _size) in all_keys.iter() {
+        for DeltaEntry { key: next_key, .. } in all_keys.iter() {
             let next_key = *next_key;
             if let Some(prev_key) = prev {
                 // just first fast filter
@@ -3592,8 +3589,7 @@ impl Timeline {
             }
             prev = Some(next_key.next());
         }
-        stats.read_lock_held_compute_holes_micros =
-            stats.read_lock_held_prerequisites_micros.till_now();
+        stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
         drop_rlock(guard);
         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
         let mut holes = heap.into_vec();
@@ -3602,12 +3598,26 @@ impl Timeline {
 
         // This iterator walks through all key-value pairs from all the layers
         // we're compacting, in key, LSN order.
-        let all_values_iter = all_value_refs.into_iter();
+        let all_values_iter = all_keys.iter();
 
         // This iterator walks through all keys and is needed to calculate size used by each key
-        let mut all_keys_iter = all_keys.into_iter();
-
-        stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now();
+        let mut all_keys_iter = all_keys
+            .iter()
+            .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
+            .coalesce(|mut prev, cur| {
+                // Coalesce keys that belong to the same key pair.
+                // This ensures that compaction doesn't put them
+                // into different layer files.
+                // Still limit this by the target file size,
+                // so that we keep the size of the files in
+                // check.
+                if prev.0 == cur.0 && prev.2 < target_file_size {
+                    prev.2 += cur.2;
+                    Ok(prev)
+                } else {
+                    Err((prev, cur))
+                }
+            });
 
         // Merge the contents of all the input delta layers into a new set
         // of delta layers, based on the current partitioning.
@@ -3662,8 +3672,11 @@ impl Timeline {
 
         // TODO remove this block_on wrapper once we fully go async
         Handle::current().block_on(async {
-            for (key, lsn, value_ref) in all_values_iter {
-                let value = value_ref.load().await?;
+            for &DeltaEntry {
+                key, lsn, ref val, ..
+            } in all_values_iter
+            {
+                let value = val.load().await?;
                 let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
                 // We need to check key boundaries once we reach next key or end of layer with the same key
                 if !same_key || lsn == dup_end_lsn {
@@ -3764,6 +3777,16 @@ impl Timeline {
 
         // Sync layers
         if !new_layers.is_empty() {
+            // Print a warning if the created layer is larger than double the target size
+            let warn_limit = target_file_size * 2;
+            for layer in new_layers.iter() {
+                if layer.desc.file_size > warn_limit {
+                    warn!(
+                        %layer,
+                        "created delta file of size {} larger than double of target of {target_file_size}", layer.desc.file_size
+                    );
+                }
+            }
             let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
 
             // Fsync all the layer files and directory using multiple threads to
@@ -3776,12 +3799,10 @@ impl Timeline {
             layer_paths.pop().unwrap();
         }
 
-        stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now();
+        stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
         stats.new_deltas_count = Some(new_layers.len());
         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());
 
-        drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
-
         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
         {
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 714118a015..4f5b193ce2 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -270,7 +270,7 @@ def test_remote_storage_upload_queue_retries(
             f"""
             INSERT INTO foo (id, val)
             SELECT g, '{data}'
-            FROM generate_series(1, 10000) g
+            FROM generate_series(1, 20000) g
             ON CONFLICT (id) DO UPDATE
             SET val = EXCLUDED.val
             """,
@@ -371,7 +371,7 @@ def test_remote_storage_upload_queue_retries(
     log.info("restarting postgres to validate")
     endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
     with endpoint.cursor() as cur:
-        assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000
+        assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000
 
 
 @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
@@ -419,7 +419,7 @@ def test_remote_timeline_client_calls_started_metric(
             f"""
             INSERT INTO foo (id, val)
             SELECT g, '{data}'
-            FROM generate_series(1, 10000) g
+            FROM generate_series(1, 20000) g
             ON CONFLICT (id) DO UPDATE
             SET val = EXCLUDED.val
             """,
@@ -510,7 +510,7 @@ def test_remote_timeline_client_calls_started_metric(
     log.info("restarting postgres to validate")
    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
     with endpoint.cursor() as cur:
-        assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 10000
+        assert query_scalar(cur, "SELECT COUNT(*) FROM foo WHERE val = 'd'") == 20000
 
     # ensure that we updated the calls_started download metric
     fetch_calls_started()
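
Note (not part of the patch): the coalesce call introduced in compact_level0_phase1 is the behavioural core of this change; consecutive entries for the same key are folded together so that one key's deltas are not split across output layer files, but only until the accumulated size reaches the target file size. Below is a minimal standalone sketch of that grouping, assuming the itertools crate; plain u64 tuples stand in for the pageserver's Key and Lsn types, and the function name is hypothetical.

use itertools::Itertools;

// Group consecutive (key, lsn, size) tuples, already sorted by (key, lsn),
// the same way the patched all_keys_iter does.
fn group_same_key(entries: Vec<(u64, u64, u64)>, target_file_size: u64) -> Vec<(u64, u64, u64)> {
    entries
        .into_iter()
        .coalesce(|mut prev, cur| {
            // Fold `cur` into `prev` while the key matches and the accumulated
            // size is still below the target, mirroring the compaction logic.
            if prev.0 == cur.0 && prev.2 < target_file_size {
                prev.2 += cur.2;
                Ok(prev)
            } else {
                Err((prev, cur))
            }
        })
        .collect()
}

fn main() {
    let entries = vec![(1, 10, 100), (1, 20, 200), (1, 30, 300), (2, 10, 50)];
    // Key 1's three deltas collapse into one 600-byte entry; key 2 stays separate.
    assert_eq!(group_same_key(entries, 1000), vec![(1, 10, 600), (2, 10, 50)]);
}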