mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Compaction with on-demand download (#3598)
Repeatedly (twice) try to download the compaction targeted layers before actual compaction. Adds tests for both L0 compaction downloading layers and image creation downloading layers. Image creation support existed already. Fixes #3591 Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
@@ -571,14 +571,15 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Launch a delete operation in the background.
|
||||
///
|
||||
/// The operation does not modify local state but assumes the local files have already been
|
||||
/// deleted, and is used to mirror those changes to remote.
|
||||
///
|
||||
/// Note: This schedules an index file upload before the deletions. The
|
||||
/// deletion won't actually be performed, until any previously scheduled
|
||||
/// upload operations, and the index file upload, have completed
|
||||
/// succesfully.
|
||||
///
|
||||
pub fn schedule_layer_file_deletion(
|
||||
self: &Arc<Self>,
|
||||
names: &[LayerFileName],
|
||||
|
||||
@@ -613,7 +613,10 @@ impl Timeline {
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
@@ -622,6 +625,86 @@ impl Timeline {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// retry two times to allow first round to find layers which need to be downloaded, then
|
||||
// download them, then retry compaction
|
||||
for round in 0..ROUNDS {
|
||||
// should we error out with the most specific error?
|
||||
let last_round = round == ROUNDS - 1;
|
||||
|
||||
let res = self.compact_inner(ctx).await;
|
||||
|
||||
// If `create_image_layers' or `compact_level0` scheduled any
|
||||
// uploads or deletions, but didn't update the index file yet,
|
||||
// do it now.
|
||||
//
|
||||
// This isn't necessary for correctness, the remote state is
|
||||
// consistent without the uploads and deletions, and we would
|
||||
// update the index file on next flush iteration too. But it
|
||||
// could take a while until that happens.
|
||||
//
|
||||
// Additionally, only do this on the terminal round before sleeping.
|
||||
if last_round {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_index_upload_for_file_changes()?;
|
||||
}
|
||||
}
|
||||
|
||||
let rls = match res {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(CompactionError::DownloadRequired(rls)) if !last_round => {
|
||||
// this can be done at most one time before exiting, waiting
|
||||
rls
|
||||
}
|
||||
Err(CompactionError::DownloadRequired(rls)) => {
|
||||
anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
|
||||
}
|
||||
Err(CompactionError::Other(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
// this path can be visited in the second round of retrying, if first one found that we
|
||||
// must first download some remote layers
|
||||
let total = rls.len();
|
||||
|
||||
let mut downloads = rls
|
||||
.into_iter()
|
||||
.map(|rl| self.download_remote_layer(rl))
|
||||
.collect::<futures::stream::FuturesUnordered<_>>();
|
||||
|
||||
let mut failed = 0;
|
||||
|
||||
let cancelled = task_mgr::shutdown_watcher();
|
||||
tokio::pin!(cancelled);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut cancelled => anyhow::bail!("Cancelled while downloading remote layers"),
|
||||
res = downloads.next() => {
|
||||
match res {
|
||||
Some(Ok(())) => {},
|
||||
Some(Err(e)) => {
|
||||
warn!("Downloading remote layer for compaction failed: {e:#}");
|
||||
failed += 1;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if failed != 0 {
|
||||
anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
|
||||
}
|
||||
|
||||
// if everything downloaded fine, lets try again
|
||||
}
|
||||
|
||||
unreachable!("retry loop exits")
|
||||
}
|
||||
|
||||
/// Compaction which might need to be retried after downloading remote layers.
|
||||
async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> {
|
||||
//
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
@@ -660,7 +743,7 @@ impl Timeline {
|
||||
// Is the timeline being deleted?
|
||||
let state = *self.state.borrow();
|
||||
if state == TimelineState::Stopping {
|
||||
anyhow::bail!("timeline is Stopping");
|
||||
return Err(anyhow::anyhow!("timeline is Stopping").into());
|
||||
}
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
@@ -680,7 +763,8 @@ impl Timeline {
|
||||
// "enough".
|
||||
let layer_paths_to_upload = self
|
||||
.create_image_layers(&partitioning, lsn, false, ctx)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(anyhow::Error::from)?;
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for (path, layer_metadata) in layer_paths_to_upload {
|
||||
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
|
||||
@@ -692,18 +776,6 @@ impl Timeline {
|
||||
self.compact_level0(&layer_removal_cs, target_file_size, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
// If `create_image_layers' or `compact_level0` scheduled any
|
||||
// uploads or deletions, but didn't update the index file yet,
|
||||
// do it now.
|
||||
//
|
||||
// This isn't necessary for correctness, the remote state is
|
||||
// consistent without the uploads and deletions, and we would
|
||||
// update the index file on next flush iteration too. But it
|
||||
// could take a while until that happens.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_index_upload_for_file_changes()?;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
@@ -2541,10 +2613,13 @@ impl Timeline {
|
||||
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
|
||||
{
|
||||
let partitioning_guard = self.partitioning.lock().unwrap();
|
||||
if partitioning_guard.1 != Lsn(0)
|
||||
&& lsn.0 - partitioning_guard.1 .0 <= self.repartition_threshold
|
||||
{
|
||||
// no repartitioning needed
|
||||
let distance = lsn.0 - partitioning_guard.1 .0;
|
||||
if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
|
||||
debug!(
|
||||
distance,
|
||||
threshold = self.repartition_threshold,
|
||||
"no repartitioning needed"
|
||||
);
|
||||
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
|
||||
}
|
||||
}
|
||||
@@ -2562,8 +2637,12 @@ impl Timeline {
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
let mut max_deltas = 0;
|
||||
|
||||
for part_range in &partition.ranges {
|
||||
let image_coverage = layers.image_coverage(part_range, lsn)?;
|
||||
for (img_range, last_img) in image_coverage {
|
||||
@@ -2585,21 +2664,25 @@ impl Timeline {
|
||||
// are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
|
||||
// after we read last_record_lsn, which is passed here in the 'lsn' argument.
|
||||
if img_lsn < lsn {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
let num_deltas =
|
||||
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
|
||||
|
||||
debug!(
|
||||
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
|
||||
img_range.start, img_range.end, num_deltas, img_lsn, lsn
|
||||
);
|
||||
max_deltas = max_deltas.max(num_deltas);
|
||||
if num_deltas >= threshold {
|
||||
debug!(
|
||||
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
|
||||
img_range.start, img_range.end, num_deltas, img_lsn, lsn
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
max_deltas,
|
||||
"none of the partitioned ranges had >= {threshold} deltas"
|
||||
);
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
@@ -2712,25 +2795,55 @@ impl Timeline {
|
||||
Ok(layer_paths_to_upload)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct CompactLevel0Phase1Result {
|
||||
new_layers: Vec<DeltaLayer>,
|
||||
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
|
||||
}
|
||||
|
||||
/// Top-level failure to compact.
|
||||
#[derive(Debug)]
|
||||
enum CompactionError {
|
||||
/// L0 compaction requires layers to be downloaded.
|
||||
///
|
||||
/// This should not happen repeatedly, but will be retried once by top-level
|
||||
/// `Timeline::compact`.
|
||||
DownloadRequired(Vec<Arc<RemoteLayer>>),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<anyhow::Error> for CompactionError {
|
||||
fn from(value: anyhow::Error) -> Self {
|
||||
CompactionError::Other(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
|
||||
///
|
||||
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
|
||||
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
|
||||
/// start of level0 files compaction, the on-demand download should be revisited as well.
|
||||
async fn compact_level0_phase1(
|
||||
&self,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<CompactLevel0Phase1Result> {
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
|
||||
// Only compact if enough layers have accumulated.
|
||||
if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() {
|
||||
return Ok(Default::default());
|
||||
let threshold = self.get_compaction_threshold();
|
||||
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
|
||||
debug!(
|
||||
level0_deltas = level0_deltas.len(),
|
||||
threshold, "too few deltas to compact"
|
||||
);
|
||||
return Ok(CompactLevel0Phase1Result::default());
|
||||
}
|
||||
|
||||
// Gather the files to compact in this iteration.
|
||||
@@ -2766,6 +2879,24 @@ impl Timeline {
|
||||
end: deltas_to_compact.last().unwrap().get_lsn_range().end,
|
||||
};
|
||||
|
||||
let remotes = deltas_to_compact
|
||||
.iter()
|
||||
.filter(|l| l.is_remote_layer())
|
||||
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
|
||||
.map(|l| {
|
||||
l.clone()
|
||||
.downcast_remote_layer()
|
||||
.expect("just checked it is remote layer")
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if !remotes.is_empty() {
|
||||
// caller is holding the lock to layer_removal_cs, and we don't want to download while
|
||||
// holding that; in future download_remote_layer might take it as well. this is
|
||||
// regardless of earlier image creation downloading on-demand, while holding the lock.
|
||||
return Err(CompactionError::DownloadRequired(remotes));
|
||||
}
|
||||
|
||||
info!(
|
||||
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
|
||||
lsn_range.start,
|
||||
@@ -2773,9 +2904,11 @@ impl Timeline {
|
||||
deltas_to_compact.len(),
|
||||
level0_deltas.len()
|
||||
);
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
info!("compact includes {}", l.filename().file_name());
|
||||
}
|
||||
|
||||
// We don't need the original list of layers anymore. Drop it so that
|
||||
// we don't accidentally use it later in the function.
|
||||
drop(level0_deltas);
|
||||
@@ -2945,7 +3078,9 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fail_point!("delta-layer-writer-fail-before-finish", |_| {
|
||||
anyhow::bail!("failpoint delta-layer-writer-fail-before-finish");
|
||||
return Err(
|
||||
anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into(),
|
||||
);
|
||||
});
|
||||
|
||||
writer.as_mut().unwrap().put_value(key, lsn, value)?;
|
||||
@@ -2964,7 +3099,7 @@ impl Timeline {
|
||||
|
||||
// Fsync all the layer files and directory using multiple threads to
|
||||
// minimize latency.
|
||||
par_fsync::par_fsync(&layer_paths)?;
|
||||
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
|
||||
|
||||
layer_paths.pop().unwrap();
|
||||
}
|
||||
@@ -2986,11 +3121,13 @@ impl Timeline {
|
||||
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), CompactionError> {
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
} = self.compact_level0_phase1(target_file_size, ctx).await?;
|
||||
} = self
|
||||
.compact_level0_phase1(layer_removal_cs, target_file_size, ctx)
|
||||
.await?;
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
// nothing to do
|
||||
@@ -3014,7 +3151,12 @@ impl Timeline {
|
||||
for l in new_layers {
|
||||
let new_delta_path = l.path();
|
||||
|
||||
let metadata = new_delta_path.metadata()?;
|
||||
let metadata = new_delta_path.metadata().with_context(|| {
|
||||
format!(
|
||||
"read file metadata for new created layer {}",
|
||||
new_delta_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_upload(
|
||||
@@ -3248,7 +3390,7 @@ impl Timeline {
|
||||
|
||||
let mut layers_to_remove = Vec::new();
|
||||
|
||||
// Scan all on-disk layers in the timeline.
|
||||
// Scan all layers in the timeline (remote or on-disk).
|
||||
//
|
||||
// Garbage collect the layer if all conditions are satisfied:
|
||||
// 1. it is older than cutoff LSN;
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
# It's possible to run any regular test with the local fs remote storage via
|
||||
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
|
||||
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Any, DefaultDict, Dict
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -10,6 +13,7 @@ from fixtures.neon_fixtures import (
|
||||
RemoteStorageKind,
|
||||
assert_tenant_status,
|
||||
available_remote_storages,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_sk_commit_lsn_to_reach_remote_storage,
|
||||
wait_for_upload,
|
||||
@@ -449,3 +453,167 @@ def test_download_remote_layers_api(
|
||||
pg_old = env.postgres.create_start(branch_name="main")
|
||||
with pg_old.cursor() as cur:
|
||||
assert query_scalar(cur, "select count(*) from testtab") == table_len
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.MOCK_S3])
|
||||
def test_compaction_downloads_on_demand_without_image_creation(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
"""
|
||||
Create a few layers, then evict, then make sure compaction runs successfully.
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_compaction_downloads_on_demand_without_image_creation",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
conf = {
|
||||
# Disable background GC & compaction
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# unused, because manual will be called after each table
|
||||
"checkpoint_distance": 100 * 1024**2,
|
||||
# this will be updated later on to allow manual compaction outside of checkpoints
|
||||
"compaction_threshold": 100,
|
||||
# repartitioning parameter, not required here
|
||||
"image_creation_threshold": 100,
|
||||
# repartitioning parameter, not required here
|
||||
"compaction_target_size": 128 * 1024**2,
|
||||
# pitr_interval and gc_horizon are not interesting because we dont run gc
|
||||
}
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
|
||||
env.initial_tenant = tenant_id
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
with env.postgres.create_start("main") as pg:
|
||||
# no particular reason to create the layers like this, but we are sure
|
||||
# not to hit the image_creation_threshold here.
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("create table a as select id::bigint from generate_series(1, 204800) s(id)")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("update a set id = -id")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
assert not layers.in_memory_layers, "no inmemory layers expected after post-commit checkpoint"
|
||||
assert len(layers.historic_layers) == 1 + 2, "should have inidb layer and 2 deltas"
|
||||
|
||||
for layer in layers.historic_layers:
|
||||
log.info(f"pre-compact: {layer}")
|
||||
pageserver_http.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
|
||||
|
||||
env.neon_cli.config_tenant(tenant_id, {"compaction_threshold": "3"})
|
||||
|
||||
pageserver_http.timeline_compact(tenant_id, timeline_id)
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
for layer in layers.historic_layers:
|
||||
log.info(f"post compact: {layer}")
|
||||
assert len(layers.historic_layers) == 1, "should have compacted to single layer"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.MOCK_S3])
|
||||
def test_compaction_downloads_on_demand_with_image_creation(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
"""
|
||||
Create layers, compact with high image_creation_threshold, then run final compaction with all layers evicted.
|
||||
|
||||
Due to current implementation, this will make image creation on-demand download layers, but we cannot really
|
||||
directly test for it.
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_compaction_downloads_on_demand",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
conf = {
|
||||
# Disable background GC & compaction
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# repartitioning threshold is this / 10, but it doesn't really seem to matter
|
||||
"checkpoint_distance": 50 * 1024**2,
|
||||
"compaction_threshold": 3,
|
||||
# important: keep this high for the data ingestion
|
||||
"image_creation_threshold": 100,
|
||||
# repartitioning parameter, unused
|
||||
"compaction_target_size": 128 * 1024**2,
|
||||
# pitr_interval and gc_horizon are not interesting because we dont run gc
|
||||
}
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
|
||||
env.initial_tenant = tenant_id
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
with env.postgres.create_start("main") as pg:
|
||||
# no particular reason to create the layers like this, but we are sure
|
||||
# not to hit the image_creation_threshold here.
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
|
||||
cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
for _ in range(0, 2):
|
||||
for i in range(0, 3):
|
||||
# create a minimal amount of "delta difficulty" for this table
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("update a set some_value = -some_value + %s", (i,))
|
||||
|
||||
with pg.cursor() as cur:
|
||||
# vacuuming should aid to reuse keys, though it's not really important
|
||||
# with image_creation_threshold=1 which we will use on the last compaction
|
||||
cur.execute("vacuum")
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# images should not yet be created, because threshold is too high,
|
||||
# but these will be reshuffled to L1 layers
|
||||
pageserver_http.timeline_compact(tenant_id, timeline_id)
|
||||
|
||||
for _ in range(0, 20):
|
||||
# loop in case flushing is still in progress
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
if not layers.in_memory_layers:
|
||||
break
|
||||
time.sleep(0.2)
|
||||
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
assert not layers.in_memory_layers, "no inmemory layers expected after post-commit checkpoint"
|
||||
|
||||
kinds_before: DefaultDict[str, int] = defaultdict(int)
|
||||
|
||||
for layer in layers.historic_layers:
|
||||
kinds_before[layer.kind] += 1
|
||||
pageserver_http.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
|
||||
|
||||
assert dict(kinds_before) == {"Delta": 4}
|
||||
|
||||
# now having evicted all layers, reconfigure to have lower image creation
|
||||
# threshold to expose image creation to downloading all of the needed
|
||||
# layers -- threshold of 2 would sound more reasonable, but keeping it as 1
|
||||
# to be less flaky
|
||||
env.neon_cli.config_tenant(tenant_id, {"image_creation_threshold": "1"})
|
||||
|
||||
pageserver_http.timeline_compact(tenant_id, timeline_id)
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
kinds_after: DefaultDict[str, int] = defaultdict(int)
|
||||
for layer in layers.historic_layers:
|
||||
kinds_after[layer.kind] += 1
|
||||
|
||||
assert dict(kinds_after) == {"Delta": 4, "Image": 1}
|
||||
|
||||
|
||||
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
|
||||
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
|
||||
|
||||
Reference in New Issue
Block a user