Compare commits


1 Commit

Author         SHA1         Message                                                Date
Arpad Müller   a3ca7eeb23   Set set_launch_timestamp_metric also in safekeepers    2024-08-06 16:23:10 +02:00
38 changed files with 472 additions and 606 deletions

View File

@@ -933,8 +933,7 @@ COPY --from=pgjwt-pg-build /pgjwt.tar.gz /ext-src
#COPY --from=pg-tiktoken-pg-build /home/nonroot/pg_tiktoken.tar.gz /ext-src
COPY --from=hypopg-pg-build /hypopg.tar.gz /ext-src
COPY --from=pg-hashids-pg-build /pg_hashids.tar.gz /ext-src
COPY --from=rum-pg-build /rum.tar.gz /ext-src
COPY patches/rum.patch /ext-src
#COPY --from=rum-pg-build /rum.tar.gz /ext-src
#COPY --from=pgtap-pg-build /pgtap.tar.gz /ext-src
COPY --from=ip4r-pg-build /ip4r.tar.gz /ext-src
COPY --from=prefix-pg-build /prefix.tar.gz /ext-src
@@ -946,7 +945,7 @@ COPY patches/pg_hintplan.patch /ext-src
COPY --from=pg-cron-pg-build /pg_cron.tar.gz /ext-src
COPY patches/pg_cron.patch /ext-src
#COPY --from=pg-pgx-ulid-build /home/nonroot/pgx_ulid.tar.gz /ext-src
#COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
COPY --from=pg-uuidv7-pg-build /pg_uuidv7.tar.gz /ext-src
COPY --from=pg-roaringbitmap-pg-build /pg_roaringbitmap.tar.gz /ext-src
COPY --from=pg-semver-pg-build /pg_semver.tar.gz /ext-src
@@ -961,7 +960,6 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
RUN cd /ext-src/rum-src && patch -p1 <../rum.patch
# cmake is required for the h3 test
RUN apt-get update && apt-get install -y cmake
RUN patch -p1 < /ext-src/pg_hintplan.patch

View File

@@ -78,7 +78,7 @@ for pg_version in 14 15 16; do
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
rm -rf $TMPDIR
# We are running tests now
if docker exec -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,pg_graphql-src,kq_imcx-src,wal2json_2_5-src \
if docker exec -e SKIP=rum-src,timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,pg_graphql-src,kq_imcx-src,wal2json_2_5-src \
$TEST_CONTAINER_NAME /run-tests.sh | tee testout.txt
then
cleanup

View File

@@ -1,15 +1,15 @@
#!/bin/bash
set -x
cd /ext-src || exit 2
cd /ext-src
FAILED=
LIST=$( (echo "${SKIP//","/"\n"}"; ls -d -- *-src) | sort | uniq -u)
LIST=$((echo ${SKIP} | sed 's/,/\n/g'; ls -d *-src) | sort | uniq -u)
for d in ${LIST}
do
[ -d "${d}" ] || continue
[ -d ${d} ] || continue
psql -c "select 1" >/dev/null || break
USE_PGXS=1 make -C "${d}" installcheck || FAILED="${d} ${FAILED}"
make -C ${d} installcheck || FAILED="${d} ${FAILED}"
done
[ -z "${FAILED}" ] && exit 0
echo "${FAILED}"
echo ${FAILED}
exit 1

View File

@@ -108,7 +108,3 @@ harness = false
[[bench]]
name = "bench_walredo"
harness = false
[[bench]]
name = "bench_ingest"
harness = false

View File

@@ -1,235 +0,0 @@
use std::{env, num::NonZeroUsize};
use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
l0_flush::{L0FlushConfig, L0FlushGlobalState},
page_cache,
repository::Value,
task_mgr::TaskKind,
tenant::storage_layer::InMemoryLayer,
virtual_file::{self, api::IoEngineKind},
};
use pageserver_api::{key::Key, shard::TenantShardId};
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
};
// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}
enum KeyLayout {
/// Sequential unique keys
Sequential,
/// Random unique keys
Random,
/// Random keys, but only use the bits from the mask of them
RandomReuse(u32),
}
enum WriteDelta {
Yes,
No,
}
async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) -> anyhow::Result<()> {
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);
let timeline_id = TimelineId::generate();
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;
let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
let ctx = RequestContext::new(
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
pageserver::context::DownloadBehavior::Download,
);
for i in 0..put_count {
lsn += put_size as u64;
// Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people
// usually care the most about write performance when they're blasting a huge batch of data into a huge table.
match key_layout {
KeyLayout::Sequential => {
// Use sequential order to illustrate the experience a user is likely to have
// when ingesting bulk data.
key.field6 = i as u32;
}
KeyLayout::Random => {
// Use random-order keys to avoid giving a false advantage to data structures that are
// faster when inserting on the end.
key.field6 = murmurhash32(i as u32);
}
KeyLayout::RandomReuse(mask) => {
// Use low bits only, to limit cardinality
key.field6 = murmurhash32(i as u32) & mask;
}
}
layer.put_value(key, lsn, &data, &ctx).await?;
}
layer.freeze(lsn + 1).await;
if matches!(write_delta, WriteDelta::Yes) {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;
}
Ok(())
}
/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
if let Err(e) = r {
panic!("{e:?}");
}
});
}
/// Declare a series of benchmarks for the Pageserver's ingest write path.
///
/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either
/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set).
///
/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on
/// a fast disk, CPU is the bottleneck at time of writing.
fn criterion_benchmark(c: &mut Criterion) {
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap();
eprintln!("Data directory: {}", temp_dir.path());
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(16384, IoEngineKind::TokioEpollUring);
page_cache::init(conf.page_cache_size);
{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
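
Aside: the removed benchmark builds its key patterns from the murmur3 finalizer defined near the top of the file. The self-contained sketch below (plain Rust, no pageserver dependencies) reproduces the three KeyLayout modes; the printed values are only illustrative.

fn murmurhash32(mut h: u32) -> u32 {
    h ^= h >> 16;
    h = h.wrapping_mul(0x85ebca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2ae35);
    h ^= h >> 16;
    h
}

fn main() {
    for i in 0..4u32 {
        let sequential = i;                          // KeyLayout::Sequential
        let random = murmurhash32(i);                // KeyLayout::Random
        let random_reuse = murmurhash32(i) & 0x3ff;  // KeyLayout::RandomReuse(0x3ff): at most 1024 distinct keys
        println!("{i}: seq={sequential} rand={random:#010x} rand-1024keys={random_reuse}");
    }
}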

View File

@@ -125,6 +125,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {

View File

@@ -29,6 +29,7 @@ use utils::{
logging::LogFormat,
};
use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -295,6 +296,10 @@ pub struct PageServerConf {
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: L0FlushConfig,
/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -401,6 +406,8 @@ struct PageServerConfigBuilder {
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
l0_flush: BuilderValue<L0FlushConfig>,
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
}
impl PageServerConfigBuilder {
@@ -490,6 +497,7 @@ impl PageServerConfigBuilder {
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
}
}
}
@@ -673,6 +681,10 @@ impl PageServerConfigBuilder {
self.l0_flush = BuilderValue::Set(value);
}
pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
}
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -730,6 +742,7 @@ impl PageServerConfigBuilder {
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
compact_level0_phase1_value_access,
}
CUSTOM LOGIC
{
@@ -1002,6 +1015,9 @@ impl PageServerConf {
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
"compact_level0_phase1_value_access" => {
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1086,6 +1102,7 @@ impl PageServerConf {
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
}
}
}
@@ -1327,6 +1344,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1401,6 +1419,7 @@ background_task_maximum_delay = '334 s'
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
},
"Should be able to parse all basic config values correctly"
);
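
For reference, a minimal sketch of how the new compact_level0_phase1_value_access item could be written in pageserver.toml and deserialized. Assumptions: serde with the derive feature and the plain toml crate (the pageserver itself goes through utils::toml_edit_ext::deserialize_item); the enum definitions are copied from the timeline::compaction diff further down, and the TOML shape is inferred from their serde attributes rather than taken from any documentation.

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
    PageCachedBlobIo,
    StreamingKmerge {
        validate: Option<CompactL0BypassPageCacheValidation>,
    },
}

#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
    KeyLsn,
    KeyLsnValue,
}

fn main() {
    // One plausible shape for the config item, inferred from the serde attributes above:
    let item = r#"
        mode = "streaming-kmerge"
        validate = "key-lsn-value"
    "#;
    let parsed: CompactL0Phase1ValueAccess = toml::from_str(item).expect("valid config");
    assert_eq!(
        parsed,
        CompactL0Phase1ValueAccess::StreamingKmerge {
            validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
        }
    );
}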

View File

@@ -24,7 +24,7 @@ impl Default for L0FlushConfig {
#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);
pub enum Inner {
pub(crate) enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
@@ -40,7 +40,7 @@ impl L0FlushGlobalState {
}
}
pub fn inner(&self) -> &Arc<Inner> {
pub(crate) fn inner(&self) -> &Arc<Inner> {
&self.0
}
}

View File

@@ -8,8 +8,7 @@ use std::time::Duration;
pub use pageserver_api::key::{Key, KEY_SIZE};
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum Value {
/// An Image value contains a full copy of the value
Image(Bytes),

View File

@@ -296,13 +296,19 @@ where
let mut stack = Vec::new();
stack.push((self.root_blk, None));
let block_cursor = self.reader.block_cursor();
let mut node_buf = [0_u8; PAGE_SZ];
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor
// Read the node, through the PS PageCache, into local variable `node_buf`.
// We could keep the page cache read guard alive, but, at the time of writing,
// we run quite small PS PageCache s => can't risk running out of
// PageCache space because this stream isn't consumed fast enough.
let page_read_guard = block_cursor
.read_blk(self.start_blk + node_blknum, ctx)
.await?;
node_buf.copy_from_slice(page_read_guard.as_ref());
drop(page_read_guard); // drop page cache read guard early
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let node = OnDiskNode::deparse(&node_buf)?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;
@@ -345,6 +351,7 @@ where
Either::Left(idx..node.num_children.into())
};
// idx points to the first match now. Keep going from there
while let Some(idx) = iter.next() {
let key_off = idx * suffix_len;
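
The pattern above (copy the page into a local buffer, then drop the PageCache read guard before continuing) can be shown in isolation. A toy sketch with a stand-in guard type, not the real PageReadGuard:

const PAGE_SZ: usize = 8192;

/// Stand-in for the page cache read guard: holding it pins one cache slot.
struct PageReadGuard<'a>(&'a [u8; PAGE_SZ]);

impl AsRef<[u8]> for PageReadGuard<'_> {
    fn as_ref(&self) -> &[u8] {
        self.0
    }
}

fn read_node(cache_slot: &[u8; PAGE_SZ]) -> [u8; PAGE_SZ] {
    let mut node_buf = [0_u8; PAGE_SZ];
    let guard = PageReadGuard(cache_slot);
    node_buf.copy_from_slice(guard.as_ref());
    drop(guard); // release the cache slot early; the consumer of the data may be slow
    node_buf
}

fn main() {
    let slot = [7u8; PAGE_SZ];
    assert_eq!(read_node(&slot)[0], 7);
}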

View File

@@ -539,25 +539,19 @@ impl LayerAccessStats {
self.record_residence_event_at(SystemTime::now())
}
fn record_access_at(&self, now: SystemTime) -> bool {
pub(crate) fn record_access_at(&self, now: SystemTime) {
let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
// A layer which is accessed must be visible.
mask |= 0x1 << Self::VISIBILITY_SHIFT;
value |= 0x1 << Self::VISIBILITY_SHIFT;
let old_bits = self.write_bits(mask, value);
!matches!(
self.decode_visibility(old_bits),
LayerVisibilityHint::Visible
)
self.write_bits(mask, value);
}
/// Returns true if we modified the layer's visibility to set it to Visible implicitly
/// as a result of this access
pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
pub(crate) fn record_access(&self, ctx: &RequestContext) {
if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
return false;
return;
}
self.record_access_at(SystemTime::now())
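
The mask/value update above packs a coarse access timestamp and a visibility bit into one atomic word. Below is a standalone sketch of that update style; the shift positions and field widths are made up for illustration and are not the real LayerAccessStats layout.

use std::sync::atomic::{AtomicU64, Ordering};

const ATIME_SHIFT: u32 = 0; // illustrative layout
const ATIME_BITS: u32 = 27;
const VISIBILITY_SHIFT: u32 = 27;

struct Stats(AtomicU64);

impl Stats {
    // Overwrite only the bits selected by `mask` with the matching bits of `value`.
    fn write_bits(&self, mask: u64, value: u64) -> u64 {
        self.0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
                Some((old & !mask) | (value & mask))
            })
            .unwrap()
    }

    fn record_access_at(&self, coarse_ts: u64) {
        let atime_mask = ((1u64 << ATIME_BITS) - 1) << ATIME_SHIFT;
        let mut mask = atime_mask;
        let mut value = (coarse_ts << ATIME_SHIFT) & atime_mask;
        // a layer which is accessed must be visible
        mask |= 0x1 << VISIBILITY_SHIFT;
        value |= 0x1 << VISIBILITY_SHIFT;
        self.write_bits(mask, value);
    }
}

fn main() {
    let stats = Stats(AtomicU64::new(0));
    stats.record_access_at(12_345);
    let word = stats.0.load(Ordering::Relaxed);
    assert_ne!(word & (1 << VISIBILITY_SHIFT), 0, "visibility bit set on access");
    println!("packed word: {word:#018x}");
}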

View File

@@ -36,12 +36,13 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::Layer;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
@@ -71,7 +72,10 @@ use utils::{
lsn::Lsn,
};
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
use super::{
AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
};
///
/// Header stored in the beginning of the file
@@ -196,6 +200,7 @@ impl DeltaKey {
pub struct DeltaLayer {
path: Utf8PathBuf,
pub desc: PersistentLayerDesc,
access_stats: LayerAccessStats,
inner: OnceCell<Arc<DeltaLayerInner>>,
}
@@ -294,6 +299,7 @@ impl DeltaLayer {
/// not loaded already.
///
async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(ctx);
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner(ctx))
@@ -344,6 +350,7 @@ impl DeltaLayer {
summary.lsn_range,
metadata.len(),
),
access_stats: Default::default(),
inner: OnceCell::new(),
})
}
@@ -366,6 +373,7 @@ impl DeltaLayer {
/// 3. Call `finish`.
///
struct DeltaLayerWriterInner {
conf: &'static PageServerConf,
pub path: Utf8PathBuf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -412,6 +420,7 @@ impl DeltaLayerWriterInner {
let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(Self {
conf,
path,
timeline_id,
tenant_shard_id,
@@ -486,10 +495,11 @@ impl DeltaLayerWriterInner {
async fn finish(
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> anyhow::Result<ResidentLayer> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
let result = self.finish0(key_end, timeline, ctx).await;
if result.is_err() {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
if let Err(e) = std::fs::remove_file(&temp_path) {
@@ -502,8 +512,9 @@ impl DeltaLayerWriterInner {
async fn finish0(
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
) -> anyhow::Result<ResidentLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -568,9 +579,11 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all().await?;
trace!("created delta layer {}", self.path);
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
Ok((desc, self.path))
trace!("created delta layer {}", layer.local_path());
Ok(layer)
}
}
@@ -671,9 +684,14 @@ impl DeltaLayerWriter {
pub(crate) async fn finish(
mut self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(key_end, ctx).await
) -> anyhow::Result<ResidentLayer> {
self.inner
.take()
.unwrap()
.finish(key_end, timeline, ctx)
.await
}
#[cfg(test)]
@@ -1580,9 +1598,8 @@ pub(crate) mod test {
use super::*;
use crate::repository::Value;
use crate::tenant::harness::TIMELINE_ID;
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{Tenant, Timeline};
use crate::tenant::Tenant;
use crate::{
context::DownloadBehavior,
task_mgr::TaskKind,
@@ -1876,8 +1893,9 @@ pub(crate) mod test {
res?;
}
let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
let resident = writer
.finish(entries_meta.key_range.end, &timeline, &ctx)
.await?;
let inner = resident.get_as_delta(&ctx).await?;
@@ -2066,8 +2084,7 @@ pub(crate) mod test {
.await
.unwrap();
let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
copied_layer.get_as_delta(ctx).await.unwrap();
@@ -2195,9 +2212,7 @@ pub(crate) mod test {
for (key, lsn, value) in deltas {
writer.put_value(key, lsn, value, ctx).await?;
}
let (desc, path) = writer.finish(key_end, ctx).await?;
let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
let delta_layer = writer.finish(key_end, tline, ctx).await?;
Ok::<_, anyhow::Error>(delta_layer)
}
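
The recurring change in this file (and in the callers further down) is that DeltaLayerWriter::finish now takes the Timeline and returns a ResidentLayer directly, instead of handing back a (desc, path) pair that every caller had to feed through Layer::finish_creating. A stub sketch of the two call shapes, using stand-in types rather than the real pageserver ones:

struct PersistentLayerDesc;
struct ResidentLayer;
struct Timeline;
struct RequestContext;

struct OldWriter;
impl OldWriter {
    fn finish(self, _ctx: &RequestContext) -> (PersistentLayerDesc, String) {
        (PersistentLayerDesc, "tenant/timeline/temp-layer".to_string())
    }
}
fn finish_creating(_timeline: &Timeline, _desc: PersistentLayerDesc, _path: &str) -> ResidentLayer {
    ResidentLayer
}

struct NewWriter;
impl NewWriter {
    fn finish(self, _timeline: &Timeline, _ctx: &RequestContext) -> ResidentLayer {
        // internally still "finish the file, then register it with the timeline",
        // but callers can no longer forget the second step
        ResidentLayer
    }
}

fn main() {
    let (timeline, ctx) = (Timeline, RequestContext);
    // before: two steps at every call site
    let (desc, path) = OldWriter.finish(&ctx);
    let _resident = finish_creating(&timeline, desc, &path);
    // after: one call
    let _resident = NewWriter.finish(&timeline, &ctx);
}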

View File

@@ -32,6 +32,7 @@ use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::LayerAccessStats;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
@@ -134,6 +135,7 @@ pub struct ImageLayer {
pub desc: PersistentLayerDesc,
// This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
pub lsn: Lsn,
access_stats: LayerAccessStats,
inner: OnceCell<ImageLayerInner>,
}
@@ -251,6 +253,7 @@ impl ImageLayer {
/// not loaded already.
///
async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
self.access_stats.record_access(ctx);
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.await
@@ -301,6 +304,7 @@ impl ImageLayer {
metadata.len(),
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: summary.lsn,
access_stats: Default::default(),
inner: OnceCell::new(),
})
}

View File

@@ -11,10 +11,9 @@ use crate::repository::{Key, Value};
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
@@ -33,9 +32,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -413,7 +410,8 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub async fn put_value(
pub(crate) async fn put_value(
&self,
key: Key,
lsn: Lsn,
@@ -478,6 +476,8 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
pub async fn freeze(&self, end_lsn: Lsn) {
let inner = self.inner.write().await;
assert!(
self.start_lsn < end_lsn,
"{} >= {}",
@@ -495,13 +495,9 @@ impl InMemoryLayer {
})
.expect("frozen_local_path_str set only once");
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
}
}
@@ -511,12 +507,12 @@ impl InMemoryLayer {
/// if there are no matching keys.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub async fn write_to_disk(
pub(crate) async fn write_to_disk(
&self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
key_range: Option<Range<Key>>,
l0_flush_global_state: &l0_flush::Inner,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
) -> Result<Option<ResidentLayer>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -528,8 +524,9 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().await;
let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
let _concurrency_permit = match &*l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
@@ -559,7 +556,7 @@ impl InMemoryLayer {
)
.await?;
match l0_flush_global_state {
match &*l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
@@ -624,7 +621,7 @@ impl InMemoryLayer {
}
// MAX is used here because we identify L0 layers by full key range
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
//
@@ -636,6 +633,6 @@ impl InMemoryLayer {
// we dirtied when writing to the filesystem have been flushed and marked !dirty.
drop(_concurrency_permit);
Ok(Some((desc, path)))
Ok(Some(delta_layer))
}
}
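
write_to_disk now pulls the l0_flush global state off the Timeline and holds a semaphore permit across the whole flush, including the final fsync. A minimal tokio sketch of that permit pattern (assumes a tokio dependency with the rt, macros, sync, and time features; the names are illustrative, not the pageserver's):

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

async fn flush_one(id: usize, gate: Arc<Semaphore>) {
    // acquire before doing any I/O for this layer
    let _permit = gate.acquire().await.expect("semaphore is never closed");
    // ... write blobs, write the index, fsync ...
    tokio::time::sleep(Duration::from_millis(10)).await;
    println!("flushed layer {id}");
    // _permit dropped here, only after all I/O for this layer completed
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // e.g. L0FlushConfig::Direct { max_concurrency: 1 } means at most one flush at a time
    let gate = Arc::new(Semaphore::new(1));
    let tasks: Vec<_> = (0..4)
        .map(|id| tokio::spawn(flush_one(id, gate.clone())))
        .collect();
    for t in tasks {
        t.await.expect("flush task panicked");
    }
}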

View File

@@ -316,7 +316,7 @@ impl Layer {
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
self.record_access(ctx);
self.0.access_stats.record_access(ctx);
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
@@ -396,12 +396,8 @@ impl Layer {
self.0.info(reset)
}
pub(crate) fn latest_activity(&self) -> SystemTime {
self.0.access_stats.latest_activity()
}
pub(crate) fn visibility(&self) -> LayerVisibilityHint {
self.0.access_stats.visibility()
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.0.access_stats
}
pub(crate) fn local_path(&self) -> &Utf8Path {
@@ -451,31 +447,13 @@ impl Layer {
}
}
fn record_access(&self, ctx: &RequestContext) {
if self.0.access_stats.record_access(ctx) {
// Visibility was modified to Visible
tracing::info!(
"Layer {} became visible as a result of access",
self.0.desc.key()
);
if let Some(tl) = self.0.timeline.upgrade() {
tl.metrics
.visible_physical_size_gauge
.add(self.0.desc.file_size)
}
}
}
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
let old_visibility = self.access_stats().set_visibility(visibility.clone());
use LayerVisibilityHint::*;
match (old_visibility, visibility) {
(Visible, Covered) => {
// Subtract this layer's contribution to the visible size metric
if let Some(tl) = self.0.timeline.upgrade() {
debug_assert!(
tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
);
tl.metrics
.visible_physical_size_gauge
.sub(self.0.desc.file_size)
@@ -693,9 +671,6 @@ impl Drop for LayerInner {
}
if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
debug_assert!(
timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
);
timeline
.metrics
.visible_physical_size_gauge
@@ -1835,7 +1810,7 @@ impl ResidentLayer {
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
self.owner.record_access(ctx);
owner.access_stats.record_access(ctx);
delta_layer::DeltaLayerInner::load_keys(d, ctx)
.await

View File

@@ -4,7 +4,6 @@ use bytes::Bytes;
use pageserver_api::key::{Key, KEY_SIZE};
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
use crate::tenant::storage_layer::Layer;
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
@@ -174,9 +173,8 @@ impl SplitDeltaLayerWriter {
)
.await?;
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers.push(delta_layer);
self.generated_layers
.push(prev_delta_writer.finish(key, tline, ctx).await?);
}
self.inner.put_value(key, lsn, val, ctx).await
}
@@ -192,10 +190,7 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
let (desc, path) = inner.finish(end_key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(delta_layer);
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
Ok(generated_layers)
}

View File

@@ -407,16 +407,9 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
}
error!(
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
wait_duration
}
}

View File

@@ -59,7 +59,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::atomic::AtomicU64,
};
use std::{cmp::min, cmp::Ordering, ops::ControlFlow};
use std::{cmp::min, ops::ControlFlow};
use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
@@ -137,7 +137,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized};
use super::{config::TenantConf, upload_queue::NotInitialized};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
@@ -180,25 +180,6 @@ impl std::fmt::Display for ImageLayerCreationMode {
}
}
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
other.coverage_size.cmp(&self.coverage_size) // inverse order
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
@@ -2938,22 +2919,14 @@ impl Timeline {
let guard = self.layers.read().await;
let resident = guard.likely_resident_layers().filter_map(|layer| {
match layer.visibility() {
LayerVisibilityHint::Visible => {
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
let last_activity_ts = layer.latest_activity();
Some(HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
))
}
LayerVisibilityHint::Covered => {
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
None
}
}
let resident = guard.likely_resident_layers().map(|layer| {
let last_activity_ts = layer.access_stats().latest_activity();
HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
)
});
let layers = resident.collect();
@@ -3728,14 +3701,12 @@ impl Timeline {
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
let work = async move {
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
let Some(new_delta) = frozen_layer
.write_to_disk(&self_clone, &ctx, key_range)
.await?
else {
return Ok(None);
};
let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?;
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
@@ -5211,7 +5182,7 @@ impl Timeline {
let file_size = layer.layer_desc().file_size;
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let last_activity_ts = layer.latest_activity();
let last_activity_ts = layer.access_stats().latest_activity();
EvictionCandidate {
layer: layer.into(),
@@ -5368,8 +5339,9 @@ impl Timeline {
for (key, lsn, val) in deltas.data {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
let delta_layer = delta_layer_writer
.finish(deltas.key_range.end, self, ctx)
.await?;
{
let mut guard = self.layers.write().await;

View File

@@ -35,8 +35,8 @@ use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
};
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
@@ -752,66 +752,230 @@ impl Timeline {
.read_lock_held_spawn_blocking_startup_micros
.till_now();
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
}
// FIXME: should spawn_blocking the rest of this function
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
// TODO: replace with streaming k-merge
let all_keys = {
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
all_keys
};
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
//
// A hole is a key range for which this compaction doesn't have any WAL records.
// Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
// cover the hole, but actually don't contain any WAL records for that key range.
// The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
// That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
//
// The algorithm chooses holes as follows.
// - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
// - Filter: min threshold on range length
// - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
//
// For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
#[derive(PartialEq, Eq)]
struct Hole {
key_range: Range<Key>,
coverage_size: usize,
}
let holes: Vec<Hole> = {
use std::cmp::Ordering;
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
self.coverage_size.cmp(&other.coverage_size).reverse()
}
}
impl PartialOrd for Hole {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
if let Some(prev_key) = prev {
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
// compaction is the gap between data key and metadata keys.
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
&& !Key::is_metadata_key(&prev_key)
{
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size =
layers.image_coverage(&key_range, last_record_lsn).len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
}
prev = Some(next_key.next());
}
prev = Some(next_key.next());
}
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
holes
};
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_rlock(guard);
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = all_keys.iter();
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
// then the Value::Image is ordered before Value::WalRecord.
//
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
// option and validation code once we've reached confidence.
enum AllValuesIter<'a> {
PageCachedBlobIo {
all_keys_iter: VecIter<'a>,
},
StreamingKmergeBypassingPageCache {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
iter: &mut VecIter<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
let Some(DeltaEntry {
key,
lsn,
val: value_ref,
..
}) = iter.next()
else {
return Ok(None);
};
let value = value_ref.load(ctx).await?;
Ok(Some((*key, *lsn, value)))
}
async fn next(
&mut self,
ctx: &RequestContext,
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
match self {
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
if cfg!(debug_assertions) || cfg!(feature = "testing") {
warn!($($arg)*);
panic!("CompactL0BypassPageCacheValidation failure, check logs");
}
use once_cell::sync::Lazy;
use utils::rate_limit::RateLimit;
use std::sync::Mutex;
use std::time::Duration;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!($($arg)*);
});
}}
}
match (&all_keys_iter_item, &merge_iter_item) {
(Err(_), Err(_)) => {
// don't bother asserting equivality of the errors
}
(Err(all_keys), Ok(merge)) => {
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
},
(Ok(all_keys), Err(merge)) => {
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
},
(Ok(None), Ok(None)) => { }
(Ok(Some(all_keys)), Ok(None)) => {
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
}
(Ok(None), Ok(Some(merge))) => {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
}
}
}
}
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}
}
}
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
all_keys_iter: all_keys.iter(),
},
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
let merge_iter = {
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.iter() {
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
deltas.push(l);
}
MergeIterator::create(&deltas, &[], ctx)
};
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
merge_iter,
all_keys_iter: all_keys.iter(),
},
}
}
};
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = all_keys
@@ -882,12 +1046,13 @@ impl Timeline {
let mut key_values_total_size = 0u64;
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
let mut next_hole = 0; // index of next hole in holes vector
for &DeltaEntry {
key, lsn, ref val, ..
} in all_values_iter
while let Some((key, lsn, value)) = all_values_iter
.next(ctx)
.await
.map_err(CompactionError::Other)?
{
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -939,16 +1104,14 @@ impl Timeline {
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
let (desc, path) = writer
.take()
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
new_layers.push(
writer
.take()
.unwrap()
.finish(prev_key.unwrap().next(), self, ctx)
.await
.map_err(CompactionError::Other)?,
);
writer = None;
if contains_hole {
@@ -1011,13 +1174,12 @@ impl Timeline {
prev_key = Some(key);
}
if let Some(writer) = writer {
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
new_layers.push(
writer
.finish(prev_key.unwrap().next(), self, ctx)
.await
.map_err(CompactionError::Other)?,
);
}
// Sync layers
@@ -1075,6 +1237,10 @@ impl Timeline {
}
}
// Without this, rustc complains about deltas_to_compact still
// being borrowed when we `.into_iter()` below.
drop(all_values_iter);
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: deltas_to_compact
@@ -1182,6 +1348,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
}
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum CompactL0Phase1ValueAccess {
/// The old way.
PageCachedBlobIo,
/// The new way.
StreamingKmerge {
/// If set, we run both the old way and the new way, validate that
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
/// and if the validation fails,
/// - in tests: fail them with a panic or
/// - in prod, log a rate-limited warning and use the old way's results.
///
/// If not set, we only run the new way and trust its results.
validate: Option<CompactL0BypassPageCacheValidation>,
},
}
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum CompactL0BypassPageCacheValidation {
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
}
impl Default for CompactL0Phase1ValueAccess {
fn default() -> Self {
CompactL0Phase1ValueAccess::StreamingKmerge {
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
}
}
}
impl Timeline {
/// Entry point for new tiered compaction algorithm.
///
@@ -1763,16 +1966,13 @@ impl Timeline {
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
stats.produce_delta_layer(delta_layer_writer.size());
if dry_run {
return Ok(None);
}
let (desc, path) = delta_layer_writer
.finish(delta_key.key_range.end, ctx)
let delta_layer = delta_layer_writer
.finish(delta_key.key_range.end, tline, ctx)
.await?;
let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
}
@@ -2213,9 +2413,9 @@ impl CompactionJobExecutor for TimelineAdaptor {
))
});
let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
let new_delta_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
let new_delta_layer = writer
.finish(prev.unwrap().0.next(), &self.timeline, ctx)
.await?;
self.new_deltas.push(new_delta_layer);
Ok(())
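
The hole selection above keeps only the N largest candidates by pushing each Hole into a min-heap (a BinaryHeap with a reversed Ord) and evicting the smallest element whenever the heap exceeds N. A self-contained sketch of that trick, with made-up data and a u64 standing in for Key:

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;

#[derive(Debug, PartialEq, Eq)]
struct Hole {
    key_range: Range<u64>, // stand-in for Range<Key>
    coverage_size: usize,
}

// Reverse the ordering so the max-heap behaves as a min-heap by coverage_size.
impl Ord for Hole {
    fn cmp(&self, other: &Self) -> Ordering {
        self.coverage_size.cmp(&other.coverage_size).reverse()
    }
}
impl PartialOrd for Hole {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let max_holes = 2;
    let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    for (key_range, coverage_size) in [(0..10, 3), (20..25, 7), (40..90, 5), (95..99, 4)] {
        heap.push(Hole { key_range, coverage_size });
        if heap.len() > max_holes {
            heap.pop(); // drop the currently smallest hole
        }
    }
    let mut holes = heap.into_vec();
    holes.sort_unstable_by_key(|hole| hole.key_range.start);
    // Only the two largest holes remain, ordered by key range start.
    assert_eq!(
        holes.iter().map(|h| h.coverage_size).collect::<Vec<_>>(),
        vec![7, 5]
    );
}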

View File

@@ -488,12 +488,10 @@ async fn copy_lsn_prefix(
// reuse the key instead of adding more holes between layers by using the real
// highest key in the layer.
let reused_highest_key = layer.layer_desc().key_range.end;
let (desc, path) = writer
.finish(reused_highest_key, ctx)
let copied = writer
.finish(reused_highest_key, target_timeline, ctx)
.await
.map_err(CopyDeltaPrefix)?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(CopyDeltaPrefix)?;
tracing::debug!(%layer, %copied, "new layer produced");

View File

@@ -225,7 +225,7 @@ impl Timeline {
continue;
}
let last_activity_ts = layer.latest_activity();
let last_activity_ts = layer.access_stats().latest_activity();
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,

View File

@@ -259,10 +259,13 @@ impl LayerManager {
new_layer.layer_desc().lsn_range
);
// Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// Transfer visibilty hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
// always marking rewritten layers as visible.
new_layer.as_ref().set_visibility(old_layer.visibility());
new_layer
.as_ref()
.access_stats()
.set_visibility(old_layer.access_stats().visibility());
// Safety: we may never rewrite the same file in-place. Callers are responsible
// for ensuring that they only rewrite layers after something changes the path,

View File

@@ -7,6 +7,7 @@ use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use remote_storage::RemoteStorageConfig;
use sd_notify::NotifyState;
use tokio::runtime::Handle;
@@ -204,6 +205,7 @@ fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
// We want to allow multiple occurences of the same arg (taking the last) so
// that neon_local could generate command with defaults + overrides without
// getting 'argument cannot be used multiple times' error. This seems to be
@@ -356,14 +358,14 @@ async fn main() -> anyhow::Result<()> {
Some(GIT_VERSION.into()),
&[("node_id", &conf.my_id.to_string())],
);
start_safekeeper(conf).await
start_safekeeper(launch_ts, conf).await
}
/// Result of joining any of main tasks: upper error means task failed to
/// complete, e.g. panicked, inner is error produced by task itself.
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
async fn start_safekeeper(launch_ts: &'static LaunchTimestamp, conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
@@ -491,6 +493,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
tasks_handles.push(Box::pin(broker_task_handle));
set_build_info_metric(GIT_VERSION, BUILD_TAG);
set_launch_timestamp_metric(launch_ts);
// TODO: update tokio-stream, convert to real async Stream with
// SignalStream, map it to obtain missing signal name, combine streams into
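
The launch timestamp is created once and handed out as a &'static reference via Box::leak so it can later be passed to set_launch_timestamp_metric. A tiny sketch of that pattern with stand-in items (the real LaunchTimestamp and set_launch_timestamp_metric live in the metrics crate):

use std::time::SystemTime;

struct LaunchTimestamp(SystemTime);

fn set_launch_timestamp_metric(ts: &'static LaunchTimestamp) {
    // a real implementation would export a gauge; this just prints the value
    println!("launch timestamp: {:?}", ts.0);
}

fn main() {
    // Leak a one-time allocation to obtain a &'static reference for the process lifetime.
    let launch_ts: &'static LaunchTimestamp =
        Box::leak(Box::new(LaunchTimestamp(SystemTime::now())));
    set_launch_timestamp_metric(launch_ts);
}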

View File

@@ -642,7 +642,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("version: {GIT_VERSION} build_tag: {BUILD_TAG}");
info!("version: {GIT_VERSION}");
info!("build_tag: {BUILD_TAG}");
metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
// On any shutdown signal, log receival and exit.

View File

@@ -172,11 +172,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
}
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => result.errors.extend(
errors
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
parse_errors
.into_iter()
.map(|error| format!("parse error: {error}")),
),
@@ -303,10 +300,7 @@ pub(crate) enum BlobDataParseResult {
},
/// The remains of a deleted Timeline (i.e. an initdb archive only)
Relic,
Incorrect {
errors: Vec<String>,
s3_layers: HashSet<(LayerName, Generation)>,
},
Incorrect(Vec<String>),
}
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
@@ -449,7 +443,7 @@ pub(crate) async fn list_timeline_blobs(
}
Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
blob_data: BlobDataParseResult::Incorrect(errors),
unused_index_keys: index_part_keys,
unknown_keys,
})

View File

@@ -208,21 +208,21 @@ async fn main() -> anyhow::Result<()> {
}
if summary.is_fatal() {
tracing::error!("Fatal scrub errors detected");
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
tracing::error!(
Err(anyhow::anyhow!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
);
))
} else {
Ok(())
}
Ok(())
}
}
}

View File

@@ -389,13 +389,10 @@ async fn gc_ancestor(
// Post-deletion tenant location: don't try and GC it.
continue;
}
BlobDataParseResult::Incorrect {
errors,
s3_layers: _, // TODO(yuchen): could still check references to these s3 layers?
} => {
BlobDataParseResult::Incorrect(reasons) => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!(
"Skipping ancestor GC for timeline {ttid}, bad metadata: {errors:?}"
"Skipping ancestor GC for timeline {ttid}, bad metadata: {reasons:?}"
);
continue;
}
@@ -521,12 +518,9 @@ pub async fn pageserver_physical_gc(
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
}
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => {
BlobDataParseResult::Incorrect(reasons) => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
return Ok(summary);
}
};

View File

@@ -290,21 +290,13 @@ pub async fn scan_metadata(
}
}
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
BlobDataParseResult::Relic => (),
BlobDataParseResult::Incorrect {
errors: _,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
if let BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} = &data.blob_data
{
tenant_objects.push(ttid, s3_layers.clone());
}
tenant_timeline_results.push((ttid, data));
}

View File

@@ -269,7 +269,7 @@ impl SnapshotDownloader {
.context("Downloading timeline")?;
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect { .. } => {
BlobDataParseResult::Incorrect(_) => {
tracing::error!("Bad metadata in timeline {ttid}");
}
};

View File

@@ -978,10 +978,7 @@ class NeonEnvBuilder:
and self.enable_scrub_on_exit
):
try:
healthy, _ = self.env.storage_scrubber.scan_metadata()
if not healthy:
e = Exception("Remote storage metadata corrupted")
cleanup_error = e
self.env.storage_scrubber.scan_metadata()
except Exception as e:
log.error(f"Error during remote storage scrub: {e}")
cleanup_error = e
@@ -4414,19 +4411,14 @@ class StorageScrubber:
assert stdout is not None
return stdout
def scan_metadata(self, post_to_storage_controller: bool = False) -> Tuple[bool, Any]:
"""
Returns the health status and the metadata summary.
"""
def scan_metadata(self, post_to_storage_controller: bool = False) -> Any:
args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
if post_to_storage_controller:
args.append("--post")
stdout = self.scrubber_cli(args, timeout=30)
try:
summary = json.loads(stdout)
healthy = not summary["with_errors"] and not summary["with_warnings"]
return healthy, summary
return json.loads(stdout)
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
log.error(stdout)
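
For context, a minimal sketch of how a test consumes the simplified fixture API in this hunk, where scan_metadata() returns only the parsed JSON summary. The assert_scan_clean helper and its env argument are hypothetical illustration, not part of the diff; the summary field names are taken from the hunks above.

from typing import Any, Dict

def assert_scan_clean(env) -> Dict[str, Any]:
    # Run the scrubber; the decoded --json output is returned as a plain dict.
    summary = env.storage_scrubber.scan_metadata()
    # Health is judged from the summary fields rather than a separate boolean.
    assert not summary["with_errors"]
    assert not summary["with_warnings"]
    return summary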

View File

@@ -61,7 +61,6 @@ class HistoricLayerInfo:
remote: bool
# None for image layers, true if pageserver thinks this is an L0 delta layer
l0: Optional[bool]
visible: bool
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
@@ -80,7 +79,6 @@ class HistoricLayerInfo:
lsn_end=d.get("lsn_end"),
remote=d["remote"],
l0=l0_ness,
visible=d["access_stats"]["visible"],
)
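
With the visible field removed from HistoricLayerInfo, a test that still needs layer visibility would have to read it from the raw layer-map JSON itself. A hypothetical sketch under that assumption; the helper name is invented here, and the exact JSON key for the layer file name is assumed to match the dataclass attribute.

from typing import Any, Dict, Set

def visible_layer_names(raw_layer_map: Dict[str, Any]) -> Set[str]:
    # Collect the names of historic layers whose access stats mark them visible.
    return {
        layer["layer_file_name"]
        for layer in raw_layer_map.get("historic_layers", [])
        if layer.get("access_stats", {}).get("visible")
    }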

View File

@@ -217,11 +217,7 @@ def test_storage_controller_many_tenants(
# A reconciler operation: migrate a shard.
shard_number = rng.randint(0, shard_count - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
# Migrate it to its secondary location
desc = env.storage_controller.tenant_describe(tenant_id)
dest_ps_id = desc["shards"][shard_number]["node_secondary"][0]
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
f = executor.submit(
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
)
@@ -235,11 +231,7 @@ def test_storage_controller_many_tenants(
for f in futs:
f.result()
# Some of the operations above (notably migrations) might leave the controller in a state where it has
# some work to do, for example optimizing shard placement after we do a random migration. Wait for the system
# to reach a quiescent state before doing following checks.
env.storage_controller.reconcile_until_idle()
# Consistency check is safe here: all the previous operations waited for reconcile before completing
env.storage_controller.consistency_check()
check_memory()
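
A minimal sketch of the simplified migration step above: the destination is any pageserver chosen at random rather than the shard's secondary location. The wrapper function and the use of the module-level random (instead of the test's seeded rng) are illustration-only assumptions.

import random
from concurrent.futures import Executor

def submit_random_migration(env, executor: Executor, tenant_shard_id, futs: list) -> None:
    # Any pageserver is a valid target; no tenant_describe() lookup is needed.
    dest_ps_id = random.choice([ps.id for ps in env.pageservers])
    futs.append(
        executor.submit(
            env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
        )
    )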

View File

@@ -496,10 +496,11 @@ def test_historic_storage_formats(
# Check the scrubber handles this old data correctly (can read it and doesn't consider it corrupt)
#
# Do this _before_ importing to the pageserver, as that import may start writing immediately
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
assert healthy
metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] >= 1
assert metadata_summary["timeline_count"] >= 1
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
env.neon_cli.import_tenant(dataset.tenant_id)

View File

@@ -214,11 +214,12 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
# Having written a mixture of generation-aware and legacy index_part.json,
# ensure the scrubber handles the situation as expected.
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["timeline_count"] == 1
assert metadata_summary["timeline_shard_count"] == 1
assert healthy
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):

View File

@@ -2,11 +2,10 @@ import json
import os
import random
import time
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.common_types import parse_layer_file_name
@@ -438,35 +437,6 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
validate_heatmap(heatmap_second)
def list_elegible_layers(
pageserver, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> list[Path]:
"""
The subset of layer filenames that are elegible for secondary download: at time of writing this
is all resident layers which are also visible.
"""
candidates = pageserver.list_layers(tenant_id, timeline_id)
layer_map = pageserver.http_client().layer_map_info(tenant_id, timeline_id)
# Map of layer filenames to their visibility the "layer name" is not the same as the filename: add suffix to resolve one to the other
visible_map = dict(
(f"{layer.layer_file_name}-v1-00000001", layer.visible)
for layer in layer_map.historic_layers
)
def is_visible(layer_file_name):
try:
return visible_map[str(layer_file_name)]
except KeyError:
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
# matches what's on disk.
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
raise
return list(c for c in candidates if is_visible(c))
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
"""
Test the overall data flow in secondary mode:
@@ -521,7 +491,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert list_elegible_layers(ps_attached, tenant_id, timeline_id) == ps_secondary.list_layers(
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
@@ -539,9 +509,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
try:
assert list_elegible_layers(
ps_attached, tenant_id, timeline_id
) == ps_secondary.list_layers(tenant_id, timeline_id)
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
except:
# Do a full listing of the secondary location on errors, to help debug of
# https://github.com/neondatabase/neon/issues/6966
@@ -562,8 +532,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# ==================================================================
try:
log.info("Evicting a layer...")
layer_to_evict = list_elegible_layers(ps_attached, tenant_id, timeline_id)[0]
some_other_layer = list_elegible_layers(ps_attached, tenant_id, timeline_id)[1]
layer_to_evict = ps_attached.list_layers(tenant_id, timeline_id)[0]
some_other_layer = ps_attached.list_layers(tenant_id, timeline_id)[1]
log.info(f"Victim layer: {layer_to_evict.name}")
ps_attached.http_client().evict_layer(
tenant_id, timeline_id, layer_name=layer_to_evict.name
@@ -581,9 +551,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert layer_to_evict not in ps_attached.list_layers(tenant_id, timeline_id)
assert list_elegible_layers(
ps_attached, tenant_id, timeline_id
) == ps_secondary.list_layers(tenant_id, timeline_id)
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
except:
# On assertion failures, log some details to help with debugging
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
@@ -593,8 +563,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# Scrub the remote storage
# ========================
# This confirms that the scrubber isn't upset by the presence of the heatmap
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
env.storage_scrubber.scan_metadata()
# Detach secondary and delete tenant
# ===================================
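
The comparisons reverted throughout this test reduce to one pattern: the attached and secondary locations should hold identical layer listings, with no visibility filtering. A hypothetical helper capturing that pattern (the function name is invented; ps_attached and ps_secondary stand in for the pageserver fixtures used above):

def assert_layers_match(ps_attached, ps_secondary, tenant_id, timeline_id) -> None:
    attached = ps_attached.list_layers(tenant_id, timeline_id)
    secondary = ps_secondary.list_layers(tenant_id, timeline_id)
    # Both locations are expected to agree exactly once downloads have completed.
    assert attached == secondary, f"attached={attached} secondary={secondary}"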

View File

@@ -124,8 +124,7 @@ def test_sharding_smoke(
# Check the scrubber isn't confused by sharded content, then disable
# it during teardown because we'll have deleted by then
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
env.storage_scrubber.scan_metadata()
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
assert_prefix_empty(

View File

@@ -516,8 +516,9 @@ def test_scrubber_scan_pageserver_metadata(
assert len(index.layer_metadata) > 0
it = iter(index.layer_metadata.items())
healthy, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
assert healthy
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
assert not scan_summary["with_warnings"]
assert not scan_summary["with_errors"]
assert env.storage_controller.metadata_health_is_healthy()
@@ -531,18 +532,16 @@ def test_scrubber_scan_pageserver_metadata(
log.info(f"delete response: {delete_response}")
# Check scan summary without posting to storage controller. Expect it to be a L0 layer so only emit warnings.
_, scan_summary = env.storage_scrubber.scan_metadata()
scan_summary = env.storage_scrubber.scan_metadata()
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
assert env.storage_controller.metadata_health_is_healthy()
# Now post to storage controller, expect seeing one unhealthy health record
_, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
neon_env_builder.disable_scrub_on_exit()
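
A sketch of the post-to-controller flow exercised in this test, assuming only the fixture and controller methods visible in the hunk (scan_metadata(post_to_storage_controller=True), metadata_health_is_healthy, metadata_health_list_unhealthy); the wrapper function itself is hypothetical.

def scan_and_check_health(env, tenant_shard_id) -> None:
    summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
    if summary["with_warnings"] or summary["with_errors"]:
        # The scrubber posted its findings, so the controller should now list
        # this shard as unhealthy.
        unhealthy = env.storage_controller.metadata_health_list_unhealthy()[
            "unhealthy_tenant_shards"
        ]
        assert str(tenant_shard_id) in unhealthy
    else:
        assert env.storage_controller.metadata_health_is_healthy()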

View File

@@ -341,13 +341,13 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
env.stop()
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
result = env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []
env.start()
ps_http = env.pageserver.http_client()
ps_http.tenant_delete(tenant_id)
env.stop()
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []
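
A condensed sketch of the shape this final test takes after the change: scrub before and after the tenant deletion and require a warning-free summary both times. The env start/stop steps of the real test are omitted, and the wrapper function is hypothetical.

def scrub_around_tenant_delete(env, tenant_id) -> None:
    before = env.storage_scrubber.scan_metadata()
    assert before["with_warnings"] == []
    env.pageserver.http_client().tenant_delete(tenant_id)
    after = env.storage_scrubber.scan_metadata()
    assert after["with_warnings"] == []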