Compare commits

...

9 Commits

Author SHA1 Message Date
Alex Chi Z
d33feb0c8e resolve conflicts
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-08-07 20:01:20 +08:00
Alex Chi Z
674bdcb7c6 Revert "cleanup(compact_level0_phase1): some commentary and wrapping into block expressions (#8544)"
This reverts commit d95b46f3f3.
2024-08-07 19:58:48 +08:00
Alex Chi Z
a3b6c4b0c0 Revert "compaction_level0_phase1: bypass PS PageCache for data blocks (#8543)"
This reverts commit 4825b0fec3.
2024-08-07 19:58:12 +08:00
Arpad Müller
00c981576a Lower level for timeline cancellations during gc (#8626)
Timeline cancellation running in parallel with gc yields error log lines
like:

```
Gc failed 1 times, retrying in 2s: TimelineCancelled
```

They are completely harmless though and normal to occur. Therefore, only
print those messages at an info level. Still print them at all so that
we know what is going on if we focus on a single timeline.
2024-08-07 09:29:52 +02:00
Arpad Müller
c3f2240fbd storage broker: only print one line for version and build tag in init (#8624)
This makes it more consistent with pageserver and safekeeper. Also, it
is easier to collect the two values into one data point.
2024-08-07 09:14:26 +02:00
Yuchen Liang
ed5724d79d scrubber: clean up scan_metadata before prod (#8565)
Part of #8128.

## Problem
Currently, scrubber `scan_metadata` command will return with an error
code if the metadata on remote storage is corrupted with fatal errors.
To safely deploy this command in a cronjob, we want to differentiate
between failures while running scrubber command and the erroneous
metadata. At the same time, we also want our regression tests to catch
corrupted metadata using the scrubber command.

## Summary of changes

- Return with error code only when the scrubber command fails
- Uses explicit checks on errors and warnings to determine metadata
health in regression tests.

**Resolve conflict with `tenant-snapshot` command (after shard split):**
[`test_scrubber_tenant_snapshot`](https://github.com/neondatabase/neon/blob/yuchen/scrubber-scan-cleanup-before-prod/test_runner/regress/test_storage_scrubber.py#L23)
failed before applying 422a8443dd
- When taking a snapshot, the old `index_part.json` in the unsharded
tenant directory is not kept.
- The current `list_timeline_blobs` implementation consider no
`index_part.json` as a parse error.
- During the scan, we are only analyzing shards with highest shard
count, so we will not get a parse error. but we do need to add the
layers to tenant object listing, otherwise we will get index is
referencing a layer that is not in remote storage error.
- **Action:** Add s3_layers from `list_timeline_blobs` regardless of
parsing error

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-06 18:55:42 +01:00
John Spray
ca5390a89d pageserver: add bench_ingest (#7409)
## Problem

We lack a rust bench for the inmemory layer and delta layer write paths:
it is useful to benchmark these components independent of postgres & WAL
decoding.

Related: https://github.com/neondatabase/neon/issues/8452

## Summary of changes

- Refactor DeltaLayerWriter to avoid carrying a Timeline, so that it can
be cleanly tested + benched without a Tenant/Timeline test harness. It
only needed the Timeline for building `Layer`, so this can be done in a
separate step.
- Add `bench_ingest`, which exercises a variety of workload "shapes"
(big values, small values, sequential keys, random keys)
- Include a small uncontroversial optimization: in `freeze`, only
exhaustively walk values to assert ordering relative to end_lsn in debug
mode.

These benches are limited by drive performance on a lot of machines, but
still useful as a local tool for iterating on CPU/memory improvements
around this code path.

Anecdotal measurements on Hetzner AX102 (Ryzen 7950xd):

```

ingest-small-values/ingest 128MB/100b seq
                        time:   [1.1160 s 1.1230 s 1.1289 s]
                        thrpt:  [113.38 MiB/s 113.98 MiB/s 114.70 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low mild
Benchmarking ingest-small-values/ingest 128MB/100b rand: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 18.9s.
ingest-small-values/ingest 128MB/100b rand
                        time:   [1.9001 s 1.9056 s 1.9110 s]
                        thrpt:  [66.982 MiB/s 67.171 MiB/s 67.365 MiB/s]
Benchmarking ingest-small-values/ingest 128MB/100b rand-1024keys: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 11.0s.
ingest-small-values/ingest 128MB/100b rand-1024keys
                        time:   [1.0715 s 1.0828 s 1.0937 s]
                        thrpt:  [117.04 MiB/s 118.21 MiB/s 119.46 MiB/s]
ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [425.49 ms 429.07 ms 432.04 ms]
                        thrpt:  [296.27 MiB/s 298.32 MiB/s 300.83 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low mild

ingest-big-values/ingest 128MB/8k seq
                        time:   [373.03 ms 375.84 ms 379.17 ms]
                        thrpt:  [337.58 MiB/s 340.57 MiB/s 343.13 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
ingest-big-values/ingest 128MB/8k seq, no delta
                        time:   [81.534 ms 82.811 ms 83.364 ms]
                        thrpt:  [1.4994 GiB/s 1.5095 GiB/s 1.5331 GiB/s]
Found 1 outliers among 10 measurements (10.00%)


```
2024-08-06 16:39:40 +00:00
John Spray
3727c6fbbe pageserver: use layer visibility when composing heatmap (#8616)
## Problem

Sometimes, a layer is Covered by hasn't yet been evicted from local disk
(e.g. shortly after image layer generation). It is not good use of
resources to download these to a secondary location, as there's a good
chance they will never be read.

This follows the previous change that added layer visibility:
- #8511 

Part of epic:
- https://github.com/neondatabase/neon/issues/8398

## Summary of changes

- When generating heatmaps, only include Visible layers
- Update test_secondary_downloads to filter to visible layers when
listing layers from an attached location
2024-08-06 17:15:40 +01:00
John Spray
42229aacf6 pageserver: fixes for layer visibility metric (#8603)
## Problem

In staging, we could see that occasionally tenants were wrapping their
pageserver_visible_physical_size metric past zero to 2^64.

This is harmless right now, but will matter more later when we start
using visible size in things like the /utilization endpoint.

## Summary of changes

- Add debug asserts that detect this case. `test_gc_of_remote_layers`
works as a reproducer for this issue once the asserts are added.
- Tighten up the interface around access_stats so that only Layer can
mutate it.
- In Layer, wrap calls to `record_access` in code that will update the
visible size statistic if the access implicitly marks the layer visible
(this was what caused the bug)
- In LayerManager::rewrite_layers, use the proper set_visibility layer
function instead of directly using access_stats (this is an additional
path where metrics could go bad.)
- Removed unused instances of LayerAccessStats in DeltaLayer and
ImageLayer which I noticed while reviewing the code paths that call
record_access.
2024-08-06 14:47:01 +01:00
33 changed files with 584 additions and 457 deletions

View File

@@ -108,3 +108,7 @@ harness = false
[[bench]]
name = "bench_walredo"
harness = false
[[bench]]
name = "bench_ingest"
harness = false

View File

@@ -0,0 +1,235 @@
use std::{env, num::NonZeroUsize};
use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
l0_flush::{L0FlushConfig, L0FlushGlobalState},
page_cache,
repository::Value,
task_mgr::TaskKind,
tenant::storage_layer::InMemoryLayer,
virtual_file::{self, api::IoEngineKind},
};
use pageserver_api::{key::Key, shard::TenantShardId};
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
};
// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}
enum KeyLayout {
/// Sequential unique keys
Sequential,
/// Random unique keys
Random,
/// Random keys, but only use the bits from the mask of them
RandomReuse(u32),
}
enum WriteDelta {
Yes,
No,
}
async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) -> anyhow::Result<()> {
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);
let timeline_id = TimelineId::generate();
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;
let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
let ctx = RequestContext::new(
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
pageserver::context::DownloadBehavior::Download,
);
for i in 0..put_count {
lsn += put_size as u64;
// Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people
// usually care the most about write performance when they're blasting a huge batch of data into a huge table.
match key_layout {
KeyLayout::Sequential => {
// Use sequential order to illustrate the experience a user is likely to have
// when ingesting bulk data.
key.field6 = i as u32;
}
KeyLayout::Random => {
// Use random-order keys to avoid giving a false advantage to data structures that are
// faster when inserting on the end.
key.field6 = murmurhash32(i as u32);
}
KeyLayout::RandomReuse(mask) => {
// Use low bits only, to limit cardinality
key.field6 = murmurhash32(i as u32) & mask;
}
}
layer.put_value(key, lsn, &data, &ctx).await?;
}
layer.freeze(lsn + 1).await;
if matches!(write_delta, WriteDelta::Yes) {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;
}
Ok(())
}
/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
if let Err(e) = r {
panic!("{e:?}");
}
});
}
/// Declare a series of benchmarks for the Pageserver's ingest write path.
///
/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either
/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set).
///
/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on
/// a fast disk, CPU is the bottleneck at time of writing.
fn criterion_benchmark(c: &mut Criterion) {
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap();
eprintln!("Data directory: {}", temp_dir.path());
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(16384, IoEngineKind::TokioEpollUring);
page_cache::init(conf.page_cache_size);
{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -125,7 +125,6 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {

View File

@@ -29,7 +29,6 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -296,10 +295,6 @@ pub struct PageServerConf {
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: L0FlushConfig,
/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -406,8 +401,6 @@ struct PageServerConfigBuilder {
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
l0_flush: BuilderValue<L0FlushConfig>,
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
}
impl PageServerConfigBuilder {
@@ -497,7 +490,6 @@ impl PageServerConfigBuilder {
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
}
}
}
@@ -681,10 +673,6 @@ impl PageServerConfigBuilder {
self.l0_flush = BuilderValue::Set(value);
}
pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
}
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -742,7 +730,6 @@ impl PageServerConfigBuilder {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
compact_level0_phase1_value_access,
}
CUSTOM LOGIC
{
@@ -1015,9 +1002,6 @@ impl PageServerConf {
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
"compact_level0_phase1_value_access" => {
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1102,7 +1086,6 @@ impl PageServerConf {
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
}
}
}
@@ -1344,7 +1327,6 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1419,7 +1401,6 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -24,7 +24,7 @@ impl Default for L0FlushConfig {
#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);
pub(crate) enum Inner {
pub enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
@@ -40,7 +40,7 @@ impl L0FlushGlobalState {
}
}
pub(crate) fn inner(&self) -> &Arc<Inner> {
pub fn inner(&self) -> &Arc<Inner> {
&self.0
}
}

View File

@@ -8,7 +8,8 @@ use std::time::Duration;
pub use pageserver_api::key::{Key, KEY_SIZE};
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),

View File

@@ -296,19 +296,13 @@ where
let mut stack = Vec::new();
stack.push((self.root_blk, None));
let block_cursor = self.reader.block_cursor();
let mut node_buf = [0_u8; PAGE_SZ];
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Read the node, through the PS PageCache, into local variable `node_buf`.
// We could keep the page cache read guard alive, but, at the time of writing,
// we run quite small PS PageCache s => can't risk running out of
// PageCache space because this stream isn't consumed fast enough.
let page_read_guard = block_cursor
// Locate the node.
let node_buf = block_cursor
.read_blk(self.start_blk + node_blknum, ctx)
.await?;
node_buf.copy_from_slice(page_read_guard.as_ref());
drop(page_read_guard); // drop page cache read guard early
let node = OnDiskNode::deparse(&node_buf)?;
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;
@@ -351,7 +345,6 @@ where
Either::Left(idx..node.num_children.into())
};
// idx points to the first match now. Keep going from there
while let Some(idx) = iter.next() {
let key_off = idx * suffix_len;

View File

@@ -539,19 +539,25 @@ impl LayerAccessStats {
self.record_residence_event_at(SystemTime::now())
}
pub(crate) fn record_access_at(&self, now: SystemTime) {
fn record_access_at(&self, now: SystemTime) -> bool {
let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
// A layer which is accessed must be visible.
mask |= 0x1 << Self::VISIBILITY_SHIFT;
value |= 0x1 << Self::VISIBILITY_SHIFT;
self.write_bits(mask, value);
let old_bits = self.write_bits(mask, value);
!matches!(
self.decode_visibility(old_bits),
LayerVisibilityHint::Visible
)
}
pub(crate) fn record_access(&self, ctx: &RequestContext) {
/// Returns true if we modified the layer's visibility to set it to Visible implicitly
/// as a result of this access
pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
return;
return false;
}
self.record_access_at(SystemTime::now())

View File

@@ -36,13 +36,12 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::Layer;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::PageReconstructError;
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
@@ -72,10 +71,7 @@ use utils::{
lsn::Lsn,
};
use super::{
AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
};
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
///
/// Header stored in the beginning of the file
@@ -200,7 +196,6 @@ impl DeltaKey {
pub struct DeltaLayer {
path: Utf8PathBuf,
pub desc: PersistentLayerDesc,
access_stats: LayerAccessStats,
inner: OnceCell<Arc<DeltaLayerInner>>,
}
@@ -299,7 +294,6 @@ impl DeltaLayer {
/// not loaded already.
///
async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(ctx);
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner(ctx))
@@ -350,7 +344,6 @@ impl DeltaLayer {
summary.lsn_range,
metadata.len(),
),
access_stats: Default::default(),
inner: OnceCell::new(),
})
}
@@ -373,7 +366,6 @@ impl DeltaLayer {
/// 3. Call `finish`.
///
struct DeltaLayerWriterInner {
conf: &'static PageServerConf,
pub path: Utf8PathBuf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -420,7 +412,6 @@ impl DeltaLayerWriterInner {
let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(Self {
conf,
path,
timeline_id,
tenant_shard_id,
@@ -495,11 +486,10 @@ impl DeltaLayerWriterInner {
async fn finish(
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, timeline, ctx).await;
let result = self.finish0(key_end, ctx).await;
if result.is_err() {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
if let Err(e) = std::fs::remove_file(&temp_path) {
@@ -512,9 +502,8 @@ impl DeltaLayerWriterInner {
async fn finish0(
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -579,11 +568,9 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all().await?;
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
trace!("created delta layer {}", self.path);
trace!("created delta layer {}", layer.local_path());
Ok(layer)
Ok((desc, self.path))
}
}
@@ -684,14 +671,9 @@ impl DeltaLayerWriter {
pub(crate) async fn finish(
mut self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
self.inner
.take()
.unwrap()
.finish(key_end, timeline, ctx)
.await
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(key_end, ctx).await
}
#[cfg(test)]
@@ -1598,8 +1580,9 @@ pub(crate) mod test {
use super::*;
use crate::repository::Value;
use crate::tenant::harness::TIMELINE_ID;
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::Tenant;
use crate::tenant::{Tenant, Timeline};
use crate::{
context::DownloadBehavior,
task_mgr::TaskKind,
@@ -1893,9 +1876,8 @@ pub(crate) mod test {
res?;
}
let resident = writer
.finish(entries_meta.key_range.end, &timeline, &ctx)
.await?;
let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
let inner = resident.get_as_delta(&ctx).await?;
@@ -2084,7 +2066,8 @@ pub(crate) mod test {
.await
.unwrap();
let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
copied_layer.get_as_delta(ctx).await.unwrap();
@@ -2212,7 +2195,9 @@ pub(crate) mod test {
for (key, lsn, value) in deltas {
writer.put_value(key, lsn, value, ctx).await?;
}
let delta_layer = writer.finish(key_end, tline, ctx).await?;
let (desc, path) = writer.finish(key_end, ctx).await?;
let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
Ok::<_, anyhow::Error>(delta_layer)
}

View File

@@ -32,7 +32,6 @@ use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::LayerAccessStats;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
@@ -135,7 +134,6 @@ pub struct ImageLayer {
pub desc: PersistentLayerDesc,
// This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
pub lsn: Lsn,
access_stats: LayerAccessStats,
inner: OnceCell<ImageLayerInner>,
}
@@ -253,7 +251,6 @@ impl ImageLayer {
/// not loaded already.
///
async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
self.access_stats.record_access(ctx);
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.await
@@ -304,7 +301,6 @@ impl ImageLayer {
metadata.len(),
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: summary.lsn,
access_stats: Default::default(),
inner: OnceCell::new(),
})
}

View File

@@ -11,9 +11,10 @@ use crate::repository::{Key, Value};
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::PageReconstructError;
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
@@ -32,7 +33,9 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -410,8 +413,7 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub(crate) async fn put_value(
pub async fn put_value(
&self,
key: Key,
lsn: Lsn,
@@ -476,8 +478,6 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
pub async fn freeze(&self, end_lsn: Lsn) {
let inner = self.inner.write().await;
assert!(
self.start_lsn < end_lsn,
"{} >= {}",
@@ -495,9 +495,13 @@ impl InMemoryLayer {
})
.expect("frozen_local_path_str set only once");
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
}
}
}
@@ -507,12 +511,12 @@ impl InMemoryLayer {
/// if there are no matching keys.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub(crate) async fn write_to_disk(
pub async fn write_to_disk(
&self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
key_range: Option<Range<Key>>,
) -> Result<Option<ResidentLayer>> {
l0_flush_global_state: &l0_flush::Inner,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -524,9 +528,8 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().await;
let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
use l0_flush::Inner;
let _concurrency_permit = match &*l0_flush_global_state {
let _concurrency_permit = match l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
@@ -556,7 +559,7 @@ impl InMemoryLayer {
)
.await?;
match &*l0_flush_global_state {
match l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
@@ -621,7 +624,7 @@ impl InMemoryLayer {
}
// MAX is used here because we identify L0 layers by full key range
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
//
@@ -633,6 +636,6 @@ impl InMemoryLayer {
// we dirtied when writing to the filesystem have been flushed and marked !dirty.
drop(_concurrency_permit);
Ok(Some(delta_layer))
Ok(Some((desc, path)))
}
}

View File

@@ -316,7 +316,7 @@ impl Layer {
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
self.0.access_stats.record_access(ctx);
self.record_access(ctx);
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
@@ -396,8 +396,12 @@ impl Layer {
self.0.info(reset)
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.0.access_stats
pub(crate) fn latest_activity(&self) -> SystemTime {
self.0.access_stats.latest_activity()
}
pub(crate) fn visibility(&self) -> LayerVisibilityHint {
self.0.access_stats.visibility()
}
pub(crate) fn local_path(&self) -> &Utf8Path {
@@ -447,13 +451,31 @@ impl Layer {
}
}
fn record_access(&self, ctx: &RequestContext) {
if self.0.access_stats.record_access(ctx) {
// Visibility was modified to Visible
tracing::info!(
"Layer {} became visible as a result of access",
self.0.desc.key()
);
if let Some(tl) = self.0.timeline.upgrade() {
tl.metrics
.visible_physical_size_gauge
.add(self.0.desc.file_size)
}
}
}
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
let old_visibility = self.access_stats().set_visibility(visibility.clone());
let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
use LayerVisibilityHint::*;
match (old_visibility, visibility) {
(Visible, Covered) => {
// Subtract this layer's contribution to the visible size metric
if let Some(tl) = self.0.timeline.upgrade() {
debug_assert!(
tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
);
tl.metrics
.visible_physical_size_gauge
.sub(self.0.desc.file_size)
@@ -671,6 +693,9 @@ impl Drop for LayerInner {
}
if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
debug_assert!(
timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
);
timeline
.metrics
.visible_physical_size_gauge
@@ -1810,7 +1835,7 @@ impl ResidentLayer {
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
owner.access_stats.record_access(ctx);
self.owner.record_access(ctx);
delta_layer::DeltaLayerInner::load_keys(d, ctx)
.await

View File

@@ -4,6 +4,7 @@ use bytes::Bytes;
use pageserver_api::key::{Key, KEY_SIZE};
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
use crate::tenant::storage_layer::Layer;
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
@@ -173,8 +174,9 @@ impl SplitDeltaLayerWriter {
)
.await?;
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
self.generated_layers
.push(prev_delta_writer.finish(key, tline, ctx).await?);
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers.push(delta_layer);
}
self.inner.put_value(key, lsn, val, ctx).await
}
@@ -190,7 +192,10 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
let (desc, path) = inner.finish(end_key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(delta_layer);
Ok(generated_layers)
}

View File

@@ -407,9 +407,16 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
}
wait_duration
}
}

View File

@@ -59,7 +59,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::atomic::AtomicU64,
};
use std::{cmp::min, ops::ControlFlow};
use std::{cmp::min, cmp::Ordering, ops::ControlFlow};
use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
@@ -137,7 +137,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{config::TenantConf, upload_queue::NotInitialized};
use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
@@ -180,6 +180,25 @@ impl std::fmt::Display for ImageLayerCreationMode {
}
}
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
other.coverage_size.cmp(&self.coverage_size) // inverse order
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
@@ -2919,14 +2938,22 @@ impl Timeline {
let guard = self.layers.read().await;
let resident = guard.likely_resident_layers().map(|layer| {
let last_activity_ts = layer.access_stats().latest_activity();
HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
)
let resident = guard.likely_resident_layers().filter_map(|layer| {
match layer.visibility() {
LayerVisibilityHint::Visible => {
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
let last_activity_ts = layer.latest_activity();
Some(HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
))
}
LayerVisibilityHint::Covered => {
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
None
}
}
});
let layers = resident.collect();
@@ -3701,12 +3728,14 @@ impl Timeline {
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
let work = async move {
let Some(new_delta) = frozen_layer
.write_to_disk(&self_clone, &ctx, key_range)
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.await?
else {
return Ok(None);
};
let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?;
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
@@ -5182,7 +5211,7 @@ impl Timeline {
let file_size = layer.layer_desc().file_size;
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let last_activity_ts = layer.access_stats().latest_activity();
let last_activity_ts = layer.latest_activity();
EvictionCandidate {
layer: layer.into(),
@@ -5339,9 +5368,8 @@ impl Timeline {
for (key, lsn, val) in deltas.data {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let delta_layer = delta_layer_writer
.finish(deltas.key_range.end, self, ctx)
.await?;
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
{
let mut guard = self.layers.write().await;

View File

@@ -35,8 +35,8 @@ use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
};
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
@@ -752,230 +752,66 @@ impl Timeline {
.read_lock_held_spawn_blocking_startup_micros
.till_now();
// TODO: replace with streaming k-merge
let all_keys = {
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
all_keys
};
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
}
// FIXME: should spawn_blocking the rest of this function
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
//
// A hole is a key range for which this compaction doesn't have any WAL records.
// Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
// cover the hole, but actually don't contain any WAL records for that key range.
// The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
// That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
//
// The algorithm chooses holes as follows.
// - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
// - Filter: min threshold on range length
// - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
//
// For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
#[derive(PartialEq, Eq)]
struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
let holes: Vec<Hole> = {
use std::cmp::Ordering;
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
self.coverage_size.cmp(&other.coverage_size).reverse()
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size =
layers.image_coverage(&key_range, last_record_lsn).len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
prev = Some(next_key.next());
}
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
holes
};
prev = Some(next_key.next());
}
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_rlock(guard);
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
}
};
let all_values_iter = all_keys.iter();
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = all_keys
@@ -1046,13 +882,12 @@ impl Timeline {
let mut key_values_total_size = 0u64;
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
let mut next_hole = 0; // index of next hole in holes vector
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.await
.map_err(CompactionError::Other)?
for &DeltaEntry {
key, lsn, ref val, ..
} in all_values_iter
{
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -1104,14 +939,16 @@ impl Timeline {
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(
writer
.take()
.unwrap()
.finish(prev_key.unwrap().next(), self, ctx)
.await
.map_err(CompactionError::Other)?,
);
let (desc, path) = writer
.take()
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
writer = None;
if contains_hole {
@@ -1174,12 +1011,13 @@ impl Timeline {
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(
writer
.finish(prev_key.unwrap().next(), self, ctx)
.await
.map_err(CompactionError::Other)?,
);
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
}
// Sync layers
@@ -1237,10 +1075,6 @@ impl Timeline {
}
}
// Without this, rustc complains about deltas_to_compact still
// being borrowed when we `.into_iter()` below.
drop(all_values_iter);
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: deltas_to_compact
@@ -1348,43 +1182,6 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}
impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}
impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
@@ -1966,13 +1763,16 @@ impl Timeline {
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
stats.produce_delta_layer(delta_layer_writer.size());
if dry_run {
return Ok(None);
}
let delta_layer = delta_layer_writer
.finish(delta_key.key_range.end, tline, ctx)
let (desc, path) = delta_layer_writer
.finish(delta_key.key_range.end, ctx)
.await?;
let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
}
@@ -2413,9 +2213,9 @@ impl CompactionJobExecutor for TimelineAdaptor {
))
});
let new_delta_layer = writer
.finish(prev.unwrap().0.next(), &self.timeline, ctx)
.await?;
let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
let new_delta_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_deltas.push(new_delta_layer);
Ok(())

View File

@@ -488,10 +488,12 @@ async fn copy_lsn_prefix(
// reuse the key instead of adding more holes between layers by using the real
// highest key in the layer.
let reused_highest_key = layer.layer_desc().key_range.end;
let copied = writer
.finish(reused_highest_key, target_timeline, ctx)
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(CopyDeltaPrefix)?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(CopyDeltaPrefix)?;
tracing::debug!(%layer, %copied, "new layer produced");

View File

@@ -225,7 +225,7 @@ impl Timeline {
continue;
}
let last_activity_ts = layer.access_stats().latest_activity();
let last_activity_ts = layer.latest_activity();
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,

View File

@@ -259,13 +259,10 @@ impl LayerManager {
new_layer.layer_desc().lsn_range
);
// Transfer visibilty hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
// always marking rewritten layers as visible.
new_layer
.as_ref()
.access_stats()
.set_visibility(old_layer.access_stats().visibility());
new_layer.as_ref().set_visibility(old_layer.visibility());
// Safety: we may never rewrite the same file in-place. Callers are responsible
// for ensuring that they only rewrite layers after something changes the path,

View File

@@ -642,8 +642,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("version: {GIT_VERSION}");
info!("build_tag: {BUILD_TAG}");
info!("version: {GIT_VERSION} build_tag: {BUILD_TAG}");
metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
// On any shutdown signal, log receival and exit.

View File

@@ -172,8 +172,11 @@ pub(crate) async fn branch_cleanup_and_check_errors(
}
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
parse_errors
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => result.errors.extend(
errors
.into_iter()
.map(|error| format!("parse error: {error}")),
),
@@ -300,7 +303,10 @@ pub(crate) enum BlobDataParseResult {
},
/// The remains of a deleted Timeline (i.e. an initdb archive only)
Relic,
Incorrect(Vec<String>),
Incorrect {
errors: Vec<String>,
s3_layers: HashSet<(LayerName, Generation)>,
},
}
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
@@ -443,7 +449,7 @@ pub(crate) async fn list_timeline_blobs(
}
Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Incorrect(errors),
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,
})

View File

@@ -208,21 +208,21 @@ async fn main() -> anyhow::Result<()> {
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
tracing::error!("Fatal scrub errors detected");
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
tracing::error!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
);
}
Ok(())
}
}
}

View File

@@ -389,10 +389,13 @@ async fn gc_ancestor(
// Post-deletion tenant location: don't try and GC it.
continue;
}
BlobDataParseResult::Incorrect(reasons) => {
BlobDataParseResult::Incorrect {
errors,
s3_layers: _, // TODO(yuchen): could still check references to these s3 layers?
} => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!(
"Skipping ancestor GC for timeline {ttid}, bad metadata: {reasons:?}"
"Skipping ancestor GC for timeline {ttid}, bad metadata: {errors:?}"
);
continue;
}
@@ -518,9 +521,12 @@ pub async fn pageserver_physical_gc(
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
}
BlobDataParseResult::Incorrect(reasons) => {
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
return Ok(summary);
}
};

View File

@@ -290,13 +290,21 @@ pub async fn scan_metadata(
}
}
if let BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} = &data.blob_data
{
tenant_objects.push(ttid, s3_layers.clone());
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
BlobDataParseResult::Relic => (),
BlobDataParseResult::Incorrect {
errors: _,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
}
tenant_timeline_results.push((ttid, data));
}

View File

@@ -269,7 +269,7 @@ impl SnapshotDownloader {
.context("Downloading timeline")?;
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(_) => {
BlobDataParseResult::Incorrect { .. } => {
tracing::error!("Bad metadata in timeline {ttid}");
}
};

View File

@@ -978,7 +978,10 @@ class NeonEnvBuilder:
and self.enable_scrub_on_exit
):
try:
self.env.storage_scrubber.scan_metadata()
healthy, _ = self.env.storage_scrubber.scan_metadata()
if not healthy:
e = Exception("Remote storage metadata corrupted")
cleanup_error = e
except Exception as e:
log.error(f"Error during remote storage scrub: {e}")
cleanup_error = e
@@ -4411,14 +4414,19 @@ class StorageScrubber:
assert stdout is not None
return stdout
def scan_metadata(self, post_to_storage_controller: bool = False) -> Any:
def scan_metadata(self, post_to_storage_controller: bool = False) -> Tuple[bool, Any]:
"""
Returns the health status and the metadata summary.
"""
args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
if post_to_storage_controller:
args.append("--post")
stdout = self.scrubber_cli(args, timeout=30)
try:
return json.loads(stdout)
summary = json.loads(stdout)
healthy = not summary["with_errors"] and not summary["with_warnings"]
return healthy, summary
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
log.error(stdout)

View File

@@ -61,6 +61,7 @@ class HistoricLayerInfo:
remote: bool
# None for image layers, true if pageserver thinks this is an L0 delta layer
l0: Optional[bool]
visible: bool
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
@@ -79,6 +80,7 @@ class HistoricLayerInfo:
lsn_end=d.get("lsn_end"),
remote=d["remote"],
l0=l0_ness,
visible=d["access_stats"]["visible"],
)

View File

@@ -496,11 +496,10 @@ def test_historic_storage_formats(
# Check the scrubber handles this old data correctly (can read it and doesn't consider it corrupt)
#
# Do this _before_ importing to the pageserver, as that import may start writing immediately
metadata_summary = env.storage_scrubber.scan_metadata()
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
assert healthy
assert metadata_summary["tenant_count"] >= 1
assert metadata_summary["timeline_count"] >= 1
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
env.neon_cli.import_tenant(dataset.tenant_id)

View File

@@ -214,12 +214,11 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
# Having written a mixture of generation-aware and legacy index_part.json,
# ensure the scrubber handles the situation as expected.
metadata_summary = env.storage_scrubber.scan_metadata()
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["timeline_count"] == 1
assert metadata_summary["timeline_shard_count"] == 1
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
assert healthy
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):

View File

@@ -2,10 +2,11 @@ import json
import os
import random
import time
from typing import Any, Dict, Optional
from pathlib import Path
from typing import Any, Dict, Optional, Union
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.common_types import parse_layer_file_name
@@ -437,6 +438,35 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
validate_heatmap(heatmap_second)
def list_elegible_layers(
pageserver, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> list[Path]:
"""
The subset of layer filenames that are elegible for secondary download: at time of writing this
is all resident layers which are also visible.
"""
candidates = pageserver.list_layers(tenant_id, timeline_id)
layer_map = pageserver.http_client().layer_map_info(tenant_id, timeline_id)
# Map of layer filenames to their visibility the "layer name" is not the same as the filename: add suffix to resolve one to the other
visible_map = dict(
(f"{layer.layer_file_name}-v1-00000001", layer.visible)
for layer in layer_map.historic_layers
)
def is_visible(layer_file_name):
try:
return visible_map[str(layer_file_name)]
except KeyError:
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
# matches what's on disk.
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
raise
return list(c for c in candidates if is_visible(c))
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
"""
Test the overall data flow in secondary mode:
@@ -491,7 +521,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
assert list_elegible_layers(ps_attached, tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
@@ -509,9 +539,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
try:
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
assert list_elegible_layers(
ps_attached, tenant_id, timeline_id
) == ps_secondary.list_layers(tenant_id, timeline_id)
except:
# Do a full listing of the secondary location on errors, to help debug of
# https://github.com/neondatabase/neon/issues/6966
@@ -532,8 +562,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# ==================================================================
try:
log.info("Evicting a layer...")
layer_to_evict = ps_attached.list_layers(tenant_id, timeline_id)[0]
some_other_layer = ps_attached.list_layers(tenant_id, timeline_id)[1]
layer_to_evict = list_elegible_layers(ps_attached, tenant_id, timeline_id)[0]
some_other_layer = list_elegible_layers(ps_attached, tenant_id, timeline_id)[1]
log.info(f"Victim layer: {layer_to_evict.name}")
ps_attached.http_client().evict_layer(
tenant_id, timeline_id, layer_name=layer_to_evict.name
@@ -551,9 +581,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert layer_to_evict not in ps_attached.list_layers(tenant_id, timeline_id)
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
assert list_elegible_layers(
ps_attached, tenant_id, timeline_id
) == ps_secondary.list_layers(tenant_id, timeline_id)
except:
# On assertion failures, log some details to help with debugging
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
@@ -563,7 +593,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# Scrub the remote storage
# ========================
# This confirms that the scrubber isn't upset by the presence of the heatmap
env.storage_scrubber.scan_metadata()
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
# Detach secondary and delete tenant
# ===================================

View File

@@ -124,7 +124,8 @@ def test_sharding_smoke(
# Check the scrubber isn't confused by sharded content, then disable
# it during teardown because we'll have deleted by then
env.storage_scrubber.scan_metadata()
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
assert_prefix_empty(

View File

@@ -516,9 +516,8 @@ def test_scrubber_scan_pageserver_metadata(
assert len(index.layer_metadata) > 0
it = iter(index.layer_metadata.items())
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
assert not scan_summary["with_warnings"]
assert not scan_summary["with_errors"]
healthy, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
assert healthy
assert env.storage_controller.metadata_health_is_healthy()
@@ -532,16 +531,18 @@ def test_scrubber_scan_pageserver_metadata(
log.info(f"delete response: {delete_response}")
# Check scan summary without posting to storage controller. Expect it to be a L0 layer so only emit warnings.
scan_summary = env.storage_scrubber.scan_metadata()
_, scan_summary = env.storage_scrubber.scan_metadata()
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
assert env.storage_controller.metadata_health_is_healthy()
# Now post to storage controller, expect seeing one unhealthy health record
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
_, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
neon_env_builder.disable_scrub_on_exit()

View File

@@ -341,13 +341,13 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
env.stop()
result = env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
env.start()
ps_http = env.pageserver.http_client()
ps_http.tenant_delete(tenant_id)
env.stop()
env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy