feat(pageserver): separate sparse and dense keyspace (#7503)

extracted (and tested) from
https://github.com/neondatabase/neon/pull/7468, part of
https://github.com/neondatabase/neon/issues/7462.

The current codebase assumes the keyspace is dense -- which means that
if we have a keyspace of 0x00-0x100, we assume every key (e.g., 0x00,
0x01, 0x02, ...) exists in the storage engine. However, the assumption
does not hold any more in metadata keyspace. The metadata keyspace is
sparse. It is impossible to do per-key check.

Ideally, we should not have the assumption of dense keyspace at all, but
this would incur a lot of refactors. Therefore, we split the keyspaces
we have to dense/sparse and handle them differently in the code for now.
At some point in the future, we should assume all keyspaces are sparse.

## Summary of changes

* Split collect_keyspace to return dense+sparse keyspace.
* Do not allow generating image layers for sparse keyspace (for now --
will fix this next week, we need image layers anyways).
* Generate delta layers for sparse keyspace.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2024-04-30 09:39:10 -04:00
committed by GitHub
parent 84b6b95783
commit 45c625fb34
8 changed files with 269 additions and 103 deletions

View File

@@ -17,6 +17,10 @@ pub struct KeySpace {
pub ranges: Vec<Range<Key>>,
}
/// A wrapper type for sparse keyspaces.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);
/// Represents a contiguous half-open range of the keyspace, masked according to a particular
/// ShardNumber's stripes: within this range of keys, only some "belong" to the current
/// shard.
@@ -435,10 +439,33 @@ pub struct KeyPartitioning {
pub parts: Vec<KeySpace>,
}
/// Represents a partitioning of the sparse key space.
#[derive(Clone, Debug, Default)]
pub struct SparseKeyPartitioning {
pub parts: Vec<SparseKeySpace>,
}
impl KeyPartitioning {
pub fn new() -> Self {
KeyPartitioning { parts: Vec::new() }
}
/// Convert a key partitioning to a sparse partition.
pub fn into_sparse(self) -> SparseKeyPartitioning {
SparseKeyPartitioning {
parts: self.parts.into_iter().map(SparseKeySpace).collect(),
}
}
}
impl SparseKeyPartitioning {
/// Note: use this function with caution. Attempt to handle a sparse keyspace in the same way as a dense keyspace will
/// cause long/dead loops.
pub fn into_dense(self) -> KeyPartitioning {
KeyPartitioning {
parts: self.parts.into_iter().map(|x| x.0).collect(),
}
}
}
///

View File

@@ -1,9 +1,11 @@
use utils::lsn::Lsn;
use crate::keyspace::SparseKeySpace;
#[derive(Debug, PartialEq, Eq)]
pub struct Partitioning {
pub keys: crate::keyspace::KeySpace,
pub sparse_keys: crate::keyspace::SparseKeySpace,
pub at_lsn: Lsn,
}
@@ -32,6 +34,8 @@ impl serde::Serialize for Partitioning {
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("sparse_keys")?;
map.serialize_value(&KeySpace(&self.sparse_keys.0))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
@@ -99,6 +103,7 @@ impl<'a> serde::Deserialize<'a> for Partitioning {
#[derive(serde::Deserialize)]
struct De {
keys: KeySpace,
sparse_keys: KeySpace,
#[serde_as(as = "serde_with::DisplayFromStr")]
at_lsn: Lsn,
}
@@ -107,6 +112,7 @@ impl<'a> serde::Deserialize<'a> for Partitioning {
Ok(Self {
at_lsn: de.at_lsn,
keys: de.keys.0,
sparse_keys: SparseKeySpace(de.sparse_keys.0),
})
}
}
@@ -133,6 +139,12 @@ mod tests {
"030000000000000000000000000000000003"
]
],
"sparse_keys": [
[
"620000000000000000000000000000000000",
"620000000000000000000000000000000003"
]
],
"at_lsn": "0/2240160"
}
"#;

View File

@@ -1918,12 +1918,14 @@ async fn timeline_collect_keyspace(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
let keys = timeline
let (dense_ks, sparse_ks) = timeline
.collect_keyspace(at_lsn, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let res = pageserver_api::models::partitioning::Partitioning { keys, at_lsn };
// This API is currently used by pagebench. Pagebench will iterate all keys within the keyspace.
// Therefore, we split dense/sparse keys in this API.
let res = pageserver_api::models::partitioning::Partitioning { keys: dense_ks, sparse_keys: sparse_ks, at_lsn };
json_response(StatusCode::OK, res)
}

View File

@@ -23,6 +23,7 @@ use pageserver_api::key::{
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -730,11 +731,13 @@ impl Timeline {
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed maybe removed from the underlying storage (from
/// that LSN forwards).
///
/// The return value is (dense keyspace, sparse keyspace).
pub(crate) async fn collect_keyspace(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<KeySpace, CollectKeySpaceError> {
) -> Result<(KeySpace, SparseKeySpace), CollectKeySpaceError> {
// Iterate through key ranges, greedily packing them into partitions
let mut result = KeySpaceAccum::new();
@@ -806,7 +809,12 @@ impl Timeline {
if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
result.add_key(AUX_FILES_KEY);
}
Ok(result.to_keyspace())
Ok((
result.to_keyspace(),
/* AUX sparse key space */
SparseKeySpace(KeySpace::single(Key::metadata_aux_key_range())),
))
}
/// Get cached size of relation if it not updated after specified LSN

View File

@@ -916,6 +916,7 @@ mod tests {
assert_eq!(lhs, rhs);
}
#[cfg(test)]
fn brute_force_range_search(
layer_map: &LayerMap,
key_range: Range<Key>,

View File

@@ -597,14 +597,17 @@ impl InMemoryLayer {
}
}
/// Write this frozen in-memory layer to disk.
/// Write this frozen in-memory layer to disk. If `key_range` is set, the delta
/// layer will only contain the key range the user specifies, and may return `None`
/// if there are no matching keys.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub(crate) async fn write_to_disk(
&self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<ResidentLayer> {
key_range: Option<Range<Key>>,
) -> Result<Option<ResidentLayer>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -618,6 +621,21 @@ impl InMemoryLayer {
let end_lsn = *self.end_lsn.get().unwrap();
let keys: Vec<_> = if let Some(key_range) = key_range {
inner
.index
.iter()
.filter(|(k, _)| key_range.contains(k))
.map(|(k, m)| (k.to_i128(), m))
.collect()
} else {
inner.index.iter().map(|(k, m)| (k.to_i128(), m)).collect()
};
if keys.is_empty() {
return Ok(None);
}
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
@@ -649,6 +667,6 @@ impl InMemoryLayer {
// MAX is used here because we identify L0 layers by full key range
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline).await?;
Ok(delta_layer)
Ok(Some(delta_layer))
}
}

View File

@@ -17,7 +17,7 @@ use fail::fail_point;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{AUX_FILES_KEY, NON_INHERITED_RANGE},
keyspace::KeySpaceAccum,
keyspace::{KeySpaceAccum, SparseKeyPartitioning},
models::{
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, TimelineState,
@@ -55,7 +55,6 @@ use std::{
ops::ControlFlow,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
@@ -66,6 +65,7 @@ use crate::{
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
@@ -86,7 +86,7 @@ use crate::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{
GetKind, TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
@@ -137,6 +137,25 @@ pub(super) enum FlushLoopState {
Exited,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ImageLayerCreationMode {
/// Try to create image layers based on `time_for_new_image_layer`. Used in compaction code path.
Try,
/// Force creating the image layers if possible. For now, no image layers will be created
/// for metadata keys. Used in compaction code path with force flag enabled.
Force,
/// Initial ingestion of the data, and no data should be dropped in this function. This
/// means that no metadata keys should be included in the partitions. Used in flush frozen layer
/// code path.
Initial,
}
impl std::fmt::Display for ImageLayerCreationMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Hole {
@@ -317,7 +336,7 @@ pub struct Timeline {
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning?
partitioning: tokio::sync::Mutex<(KeyPartitioning, Lsn)>,
partitioning: tokio::sync::Mutex<((KeyPartitioning, SparseKeyPartitioning), Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -2104,7 +2123,10 @@ impl Timeline {
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
partitioning: tokio::sync::Mutex::new((
(KeyPartitioning::new(), KeyPartitioning::new().into_sparse()),
Lsn(0),
)),
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
@@ -3106,7 +3128,6 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
let layer = guard.get_from_desc(&layer);
drop(guard);
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -3227,7 +3248,7 @@ impl Timeline {
Ok(())
}
/// Collect the reconstruct data for a ketspace from the specified timeline.
/// Collect the reconstruct data for a keyspace from the specified timeline.
///
/// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
/// the current keyspace. The current keyspace of the search at any given timeline
@@ -3656,66 +3677,103 @@ impl Timeline {
// files instead. This is possible as long as *all* the data imported into the
// repository have the same LSN.
let lsn_range = frozen_layer.get_lsn_range();
let (layers_to_upload, delta_layer_to_add) =
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
#[cfg(test)]
match &mut *self.flush_loop_state.lock().unwrap() {
FlushLoopState::NotStarted | FlushLoopState::Exited => {
panic!("flush loop not running")
}
FlushLoopState::Running {
initdb_optimization_count,
..
} => {
// Whether to directly create image layers for this flush, or flush them as delta layers
let create_image_layer =
lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1);
#[cfg(test)]
{
match &mut *self.flush_loop_state.lock().unwrap() {
FlushLoopState::NotStarted | FlushLoopState::Exited => {
panic!("flush loop not running")
}
FlushLoopState::Running {
expect_initdb_optimization,
initdb_optimization_count,
..
} => {
if create_image_layer {
*initdb_optimization_count += 1;
}
}
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
EnumSet::empty(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
// For image layers, we add them immediately into the layer map.
(
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
.await?,
None,
)
} else {
#[cfg(test)]
match &mut *self.flush_loop_state.lock().unwrap() {
FlushLoopState::NotStarted | FlushLoopState::Exited => {
panic!("flush loop not running")
}
FlushLoopState::Running {
expect_initdb_optimization,
..
} => {
} else {
assert!(!*expect_initdb_optimization, "expected initdb optimization");
}
}
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
(
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
vec![layer.clone()],
Some(layer),
}
}
let (layers_to_upload, delta_layer_to_add) = if create_image_layer {
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let ((rel_partition, metadata_partition), _lsn) = self
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
EnumSet::empty(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
// For metadata, always create delta layers.
let delta_layer = if !metadata_partition.parts.is_empty() {
assert_eq!(
metadata_partition.parts.len(),
1,
"currently sparse keyspace should only contain a single aux file keyspace"
);
let metadata_keyspace = &metadata_partition.parts[0];
assert_eq!(
metadata_keyspace.0.ranges.len(),
1,
"aux file keyspace should be a single range"
);
self.create_delta_layer(
&frozen_layer,
ctx,
Some(metadata_keyspace.0.ranges[0].clone()),
)
.await?
} else {
None
};
// For image layers, we add them immediately into the layer map.
let mut layers_to_upload = Vec::new();
layers_to_upload.extend(
self.create_image_layers(
&rel_partition,
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
if let Some(delta_layer) = delta_layer {
layers_to_upload.push(delta_layer.clone());
(layers_to_upload, Some(delta_layer))
} else {
(layers_to_upload, None)
}
} else {
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let Some(layer) = self.create_delta_layer(&frozen_layer, ctx, None).await? else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
// FIXME: even though we have a single image and single delta layer assumption
// we push them to vec
vec![layer.clone()],
Some(layer),
)
};
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
if self.cancel.is_cancelled() {
@@ -3835,12 +3893,18 @@ impl Timeline {
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
key_range: Option<Range<Key>>,
) -> anyhow::Result<Option<ResidentLayer>> {
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
let work = async move {
let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
let Some(new_delta) = frozen_layer
.write_to_disk(&self_clone, &ctx, key_range)
.await?
else {
return Ok(None);
};
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
@@ -3859,7 +3923,7 @@ impl Timeline {
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
anyhow::Ok(new_delta)
anyhow::Ok(Some(new_delta))
};
// Before tokio-epoll-uring, we ran write_to_disk & the sync_all inside spawn_blocking.
// Preserve that behavior to maintain the same behavior for `virtual_file_io_engine=std-fs`.
@@ -3886,19 +3950,20 @@ impl Timeline {
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
anyhow::bail!("repartition() called concurrently, this should not happen");
};
if lsn < partitioning_guard.1 {
let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
if lsn < *partition_lsn {
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
}
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0)
let distance = lsn.0 - partition_lsn.0;
if *partition_lsn != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
@@ -3907,13 +3972,18 @@ impl Timeline {
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
return Ok((
(dense_partition.clone(), sparse_partition.clone()),
*partition_lsn,
));
}
let keyspace = self.collect_keyspace(lsn, ctx).await?;
let partitioning = keyspace.partition(&self.shard_identity, partition_size);
*partitioning_guard = (partitioning, lsn);
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
let sparse_partitioning = SparseKeyPartitioning {
parts: vec![sparse_ks],
}; // no partitioning for metadata keys for now
*partitioning_guard = ((dense_partitioning, sparse_partitioning), lsn);
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
}
@@ -3969,12 +4039,12 @@ impl Timeline {
false
}
#[tracing::instrument(skip_all, fields(%lsn, %force))]
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,
lsn: Lsn,
force: bool,
mode: ImageLayerCreationMode,
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
let timer = self.metrics.create_images_time_histo.start_timer();
@@ -4011,19 +4081,26 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
let do_it = if force {
true
} else if check_for_image_layers {
// [`Self::time_for_new_image_layer`] is CPU expensive,
// so skip if we've not collected enough WAL since the last time
self.time_for_new_image_layer(partition, lsn).await
} else {
false
};
if !do_it {
start = img_range.end;
continue;
if partition.overlaps(&Key::metadata_key_range()) {
// TODO(chi): The next patch will correctly create image layers for metadata keys, and it would be a
// rather big change. Keep this patch small for now.
match mode {
ImageLayerCreationMode::Force | ImageLayerCreationMode::Try => {
// skip image layer creation anyways for metadata keys.
start = img_range.end;
continue;
}
ImageLayerCreationMode::Initial => {
return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
}
}
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip
// check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate
if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await {
start = img_range.end;
continue;
}
}
let mut image_layer_writer = ImageLayerWriter::new(

View File

@@ -9,7 +9,7 @@ use std::ops::{Deref, Range};
use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{CompactFlags, DurationRecorder, RecordedDuration, Timeline};
use super::{CompactFlags, DurationRecorder, ImageLayerCreationMode, RecordedDuration, Timeline};
use anyhow::{anyhow, Context};
use enumset::EnumSet;
@@ -102,7 +102,7 @@ impl Timeline {
)
.await
{
Ok((partitioning, lsn)) => {
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
@@ -115,17 +115,37 @@ impl Timeline {
// 3. Create new image layers for partitions that have been modified
// "enough".
let layers = self
let dense_layers = self
.create_image_layers(
&partitioning,
&dense_partitioning,
lsn,
flags.contains(CompactFlags::ForceImageLayerCreation),
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await
.map_err(anyhow::Error::from)?;
self.upload_new_image_layers(layers)?;
// For now, nothing will be produced...
let sparse_layers = self
.create_image_layers(
&sparse_partitioning.clone().into_dense(),
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await
.map_err(anyhow::Error::from)?;
assert!(sparse_layers.is_empty());
self.upload_new_image_layers(dense_layers)?;
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -758,8 +778,9 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
let keyspace = self.collect_keyspace(end_lsn, ctx).await?;
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, keyspace));
let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
// TODO(chi): ignore sparse_keyspace for now, compact it in the future.
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
pageserver_compaction::compact_tiered::compact_tiered(
&mut adaptor,