mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-11 06:30:37 +00:00
Compare commits
1 Commits
skyzh/reve
...
arpad/safe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a3ca7eeb23 |
@@ -933,8 +933,7 @@ COPY --from=pgjwt-pg-build /pgjwt.tar.gz /ext-src
|
||||
#COPY --from=pg-tiktoken-pg-build /home/nonroot/pg_tiktoken.tar.gz /ext-src
|
||||
COPY --from=hypopg-pg-build /hypopg.tar.gz /ext-src
|
||||
COPY --from=pg-hashids-pg-build /pg_hashids.tar.gz /ext-src
|
||||
COPY --from=rum-pg-build /rum.tar.gz /ext-src
|
||||
COPY patches/rum.patch /ext-src
|
||||
#COPY --from=rum-pg-build /rum.tar.gz /ext-src
|
||||
#COPY --from=pgtap-pg-build /pgtap.tar.gz /ext-src
|
||||
COPY --from=ip4r-pg-build /ip4r.tar.gz /ext-src
|
||||
COPY --from=prefix-pg-build /prefix.tar.gz /ext-src
|
||||
@@ -946,7 +945,7 @@ COPY patches/pg_hintplan.patch /ext-src
|
||||
COPY --from=pg-cron-pg-build /pg_cron.tar.gz /ext-src
|
||||
COPY patches/pg_cron.patch /ext-src
|
||||
#COPY --from=pg-pgx-ulid-build /home/nonroot/pgx_ulid.tar.gz /ext-src
|
||||
#COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
|
||||
COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
|
||||
COPY --from=pg-uuidv7-pg-build /pg_uuidv7.tar.gz /ext-src
|
||||
COPY --from=pg-roaringbitmap-pg-build /pg_roaringbitmap.tar.gz /ext-src
|
||||
COPY --from=pg-semver-pg-build /pg_semver.tar.gz /ext-src
|
||||
@@ -961,7 +960,6 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
|
||||
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|
||||
|| exit 1; rm -f $f; done
|
||||
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
|
||||
RUN cd /ext-src/rum-src && patch -p1 <../rum.patch
|
||||
# cmake is required for the h3 test
|
||||
RUN apt-get update && apt-get install -y cmake
|
||||
RUN patch -p1 < /ext-src/pg_hintplan.patch
|
||||
|
||||
@@ -78,7 +78,7 @@ for pg_version in 14 15 16; do
|
||||
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
|
||||
rm -rf $TMPDIR
|
||||
# We are running tests now
|
||||
if docker exec -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,pg_graphql-src,kq_imcx-src,wal2json_2_5-src \
|
||||
if docker exec -e SKIP=rum-src,timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,pg_graphql-src,kq_imcx-src,wal2json_2_5-src \
|
||||
$TEST_CONTAINER_NAME /run-tests.sh | tee testout.txt
|
||||
then
|
||||
cleanup
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
#!/bin/bash
|
||||
set -x
|
||||
|
||||
cd /ext-src || exit 2
|
||||
cd /ext-src
|
||||
FAILED=
|
||||
LIST=$( (echo "${SKIP//","/"\n"}"; ls -d -- *-src) | sort | uniq -u)
|
||||
LIST=$((echo ${SKIP} | sed 's/,/\n/g'; ls -d *-src) | sort | uniq -u)
|
||||
for d in ${LIST}
|
||||
do
|
||||
[ -d "${d}" ] || continue
|
||||
[ -d ${d} ] || continue
|
||||
psql -c "select 1" >/dev/null || break
|
||||
USE_PGXS=1 make -C "${d}" installcheck || FAILED="${d} ${FAILED}"
|
||||
make -C ${d} installcheck || FAILED="${d} ${FAILED}"
|
||||
done
|
||||
[ -z "${FAILED}" ] && exit 0
|
||||
echo "${FAILED}"
|
||||
echo ${FAILED}
|
||||
exit 1
|
||||
@@ -108,7 +108,3 @@ harness = false
|
||||
[[bench]]
|
||||
name = "bench_walredo"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_ingest"
|
||||
harness = false
|
||||
|
||||
@@ -1,235 +0,0 @@
|
||||
use std::{env, num::NonZeroUsize};
|
||||
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use pageserver::{
|
||||
config::PageServerConf,
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
l0_flush::{L0FlushConfig, L0FlushGlobalState},
|
||||
page_cache,
|
||||
repository::Value,
|
||||
task_mgr::TaskKind,
|
||||
tenant::storage_layer::InMemoryLayer,
|
||||
virtual_file::{self, api::IoEngineKind},
|
||||
};
|
||||
use pageserver_api::{key::Key, shard::TenantShardId};
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
id::{TenantId, TimelineId},
|
||||
};
|
||||
|
||||
// A very cheap hash for generating non-sequential keys.
|
||||
fn murmurhash32(mut h: u32) -> u32 {
|
||||
h ^= h >> 16;
|
||||
h = h.wrapping_mul(0x85ebca6b);
|
||||
h ^= h >> 13;
|
||||
h = h.wrapping_mul(0xc2b2ae35);
|
||||
h ^= h >> 16;
|
||||
h
|
||||
}
|
||||
|
||||
enum KeyLayout {
|
||||
/// Sequential unique keys
|
||||
Sequential,
|
||||
/// Random unique keys
|
||||
Random,
|
||||
/// Random keys, but only use the bits from the mask of them
|
||||
RandomReuse(u32),
|
||||
}
|
||||
|
||||
enum WriteDelta {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
async fn ingest(
|
||||
conf: &'static PageServerConf,
|
||||
put_size: usize,
|
||||
put_count: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut lsn = utils::lsn::Lsn(1000);
|
||||
let mut key = Key::from_i128(0x0);
|
||||
|
||||
let timeline_id = TimelineId::generate();
|
||||
let tenant_id = TenantId::generate();
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
|
||||
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;
|
||||
|
||||
let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
|
||||
let ctx = RequestContext::new(
|
||||
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
|
||||
pageserver::context::DownloadBehavior::Download,
|
||||
);
|
||||
|
||||
for i in 0..put_count {
|
||||
lsn += put_size as u64;
|
||||
|
||||
// Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people
|
||||
// usually care the most about write performance when they're blasting a huge batch of data into a huge table.
|
||||
match key_layout {
|
||||
KeyLayout::Sequential => {
|
||||
// Use sequential order to illustrate the experience a user is likely to have
|
||||
// when ingesting bulk data.
|
||||
key.field6 = i as u32;
|
||||
}
|
||||
KeyLayout::Random => {
|
||||
// Use random-order keys to avoid giving a false advantage to data structures that are
|
||||
// faster when inserting on the end.
|
||||
key.field6 = murmurhash32(i as u32);
|
||||
}
|
||||
KeyLayout::RandomReuse(mask) => {
|
||||
// Use low bits only, to limit cardinality
|
||||
key.field6 = murmurhash32(i as u32) & mask;
|
||||
}
|
||||
}
|
||||
|
||||
layer.put_value(key, lsn, &data, &ctx).await?;
|
||||
}
|
||||
layer.freeze(lsn + 1).await;
|
||||
|
||||
if matches!(write_delta, WriteDelta::Yes) {
|
||||
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
|
||||
max_concurrency: NonZeroUsize::new(1).unwrap(),
|
||||
});
|
||||
let (_desc, path) = layer
|
||||
.write_to_disk(&ctx, None, l0_flush_state.inner())
|
||||
.await?
|
||||
.unwrap();
|
||||
tokio::fs::remove_file(path).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Wrapper to instantiate a tokio runtime
|
||||
fn ingest_main(
|
||||
conf: &'static PageServerConf,
|
||||
put_size: usize,
|
||||
put_count: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
) {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
runtime.block_on(async move {
|
||||
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
|
||||
if let Err(e) = r {
|
||||
panic!("{e:?}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Declare a series of benchmarks for the Pageserver's ingest write path.
|
||||
///
|
||||
/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either
|
||||
/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set).
|
||||
///
|
||||
/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on
|
||||
/// a fast disk, CPU is the bottleneck at time of writing.
|
||||
fn criterion_benchmark(c: &mut Criterion) {
|
||||
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
|
||||
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap();
|
||||
eprintln!("Data directory: {}", temp_dir.path());
|
||||
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(
|
||||
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
|
||||
));
|
||||
virtual_file::init(16384, IoEngineKind::TokioEpollUring);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
{
|
||||
let mut group = c.benchmark_group("ingest-small-values");
|
||||
let put_size = 100usize;
|
||||
let put_count = 128 * 1024 * 1024 / put_size;
|
||||
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
|
||||
group.sample_size(10);
|
||||
group.bench_function("ingest 128MB/100b seq", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/100b rand", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Random,
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::RandomReuse(0x3ff),
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::No,
|
||||
)
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let mut group = c.benchmark_group("ingest-big-values");
|
||||
let put_size = 8192usize;
|
||||
let put_count = 128 * 1024 * 1024 / put_size;
|
||||
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
|
||||
group.sample_size(10);
|
||||
group.bench_function("ingest 128MB/8k seq", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::No,
|
||||
)
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
criterion_main!(benches);
|
||||
@@ -125,6 +125,7 @@ fn main() -> anyhow::Result<()> {
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.get_impl, "starting with get page implementation");
|
||||
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
|
||||
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
|
||||
|
||||
let tenants_path = conf.tenants_path();
|
||||
if !tenants_path.exists() {
|
||||
|
||||
@@ -29,6 +29,7 @@ use utils::{
|
||||
logging::LogFormat,
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
|
||||
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
|
||||
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
|
||||
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
@@ -295,6 +296,10 @@ pub struct PageServerConf {
|
||||
pub ephemeral_bytes_per_memory_kb: usize,
|
||||
|
||||
pub l0_flush: L0FlushConfig,
|
||||
|
||||
/// This flag is temporary and will be removed after gradual rollout.
|
||||
/// See <https://github.com/neondatabase/neon/issues/8184>.
|
||||
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -401,6 +406,8 @@ struct PageServerConfigBuilder {
|
||||
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
|
||||
|
||||
l0_flush: BuilderValue<L0FlushConfig>,
|
||||
|
||||
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
|
||||
}
|
||||
|
||||
impl PageServerConfigBuilder {
|
||||
@@ -490,6 +497,7 @@ impl PageServerConfigBuilder {
|
||||
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
|
||||
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
l0_flush: Set(L0FlushConfig::default()),
|
||||
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -673,6 +681,10 @@ impl PageServerConfigBuilder {
|
||||
self.l0_flush = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
|
||||
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
|
||||
let default = Self::default_values();
|
||||
|
||||
@@ -730,6 +742,7 @@ impl PageServerConfigBuilder {
|
||||
image_compression,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
l0_flush,
|
||||
compact_level0_phase1_value_access,
|
||||
}
|
||||
CUSTOM LOGIC
|
||||
{
|
||||
@@ -1002,6 +1015,9 @@ impl PageServerConf {
|
||||
"l0_flush" => {
|
||||
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
|
||||
}
|
||||
"compact_level0_phase1_value_access" => {
|
||||
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
|
||||
}
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -1086,6 +1102,7 @@ impl PageServerConf {
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
l0_flush: L0FlushConfig::default(),
|
||||
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1327,6 +1344,7 @@ background_task_maximum_delay = '334 s'
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
l0_flush: L0FlushConfig::default(),
|
||||
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1401,6 +1419,7 @@ background_task_maximum_delay = '334 s'
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
l0_flush: L0FlushConfig::default(),
|
||||
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
@@ -24,7 +24,7 @@ impl Default for L0FlushConfig {
|
||||
#[derive(Clone)]
|
||||
pub struct L0FlushGlobalState(Arc<Inner>);
|
||||
|
||||
pub enum Inner {
|
||||
pub(crate) enum Inner {
|
||||
PageCached,
|
||||
Direct { semaphore: tokio::sync::Semaphore },
|
||||
}
|
||||
@@ -40,7 +40,7 @@ impl L0FlushGlobalState {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &Arc<Inner> {
|
||||
pub(crate) fn inner(&self) -> &Arc<Inner> {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,7 @@ use std::time::Duration;
|
||||
pub use pageserver_api::key::{Key, KEY_SIZE};
|
||||
|
||||
/// A 'value' stored for a one Key.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[cfg_attr(test, derive(PartialEq))]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum Value {
|
||||
/// An Image value contains a full copy of the value
|
||||
Image(Bytes),
|
||||
|
||||
@@ -296,13 +296,19 @@ where
|
||||
let mut stack = Vec::new();
|
||||
stack.push((self.root_blk, None));
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
let mut node_buf = [0_u8; PAGE_SZ];
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Locate the node.
|
||||
let node_buf = block_cursor
|
||||
// Read the node, through the PS PageCache, into local variable `node_buf`.
|
||||
// We could keep the page cache read guard alive, but, at the time of writing,
|
||||
// we run quite small PS PageCache s => can't risk running out of
|
||||
// PageCache space because this stream isn't consumed fast enough.
|
||||
let page_read_guard = block_cursor
|
||||
.read_blk(self.start_blk + node_blknum, ctx)
|
||||
.await?;
|
||||
node_buf.copy_from_slice(page_read_guard.as_ref());
|
||||
drop(page_read_guard); // drop page cache read guard early
|
||||
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let node = OnDiskNode::deparse(&node_buf)?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
|
||||
@@ -345,6 +351,7 @@ where
|
||||
Either::Left(idx..node.num_children.into())
|
||||
};
|
||||
|
||||
|
||||
// idx points to the first match now. Keep going from there
|
||||
while let Some(idx) = iter.next() {
|
||||
let key_off = idx * suffix_len;
|
||||
|
||||
@@ -539,25 +539,19 @@ impl LayerAccessStats {
|
||||
self.record_residence_event_at(SystemTime::now())
|
||||
}
|
||||
|
||||
fn record_access_at(&self, now: SystemTime) -> bool {
|
||||
pub(crate) fn record_access_at(&self, now: SystemTime) {
|
||||
let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
|
||||
|
||||
// A layer which is accessed must be visible.
|
||||
mask |= 0x1 << Self::VISIBILITY_SHIFT;
|
||||
value |= 0x1 << Self::VISIBILITY_SHIFT;
|
||||
|
||||
let old_bits = self.write_bits(mask, value);
|
||||
!matches!(
|
||||
self.decode_visibility(old_bits),
|
||||
LayerVisibilityHint::Visible
|
||||
)
|
||||
self.write_bits(mask, value);
|
||||
}
|
||||
|
||||
/// Returns true if we modified the layer's visibility to set it to Visible implicitly
|
||||
/// as a result of this access
|
||||
pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
|
||||
pub(crate) fn record_access(&self, ctx: &RequestContext) {
|
||||
if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
|
||||
self.record_access_at(SystemTime::now())
|
||||
|
||||
@@ -36,12 +36,13 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
|
||||
use crate::tenant::disk_btree::{
|
||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
};
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadPlanner,
|
||||
};
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
@@ -71,7 +72,10 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
|
||||
use super::{
|
||||
AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -196,6 +200,7 @@ impl DeltaKey {
|
||||
pub struct DeltaLayer {
|
||||
path: Utf8PathBuf,
|
||||
pub desc: PersistentLayerDesc,
|
||||
access_stats: LayerAccessStats,
|
||||
inner: OnceCell<Arc<DeltaLayerInner>>,
|
||||
}
|
||||
|
||||
@@ -294,6 +299,7 @@ impl DeltaLayer {
|
||||
/// not loaded already.
|
||||
///
|
||||
async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
|
||||
self.access_stats.record_access(ctx);
|
||||
// Quick exit if already loaded
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
@@ -344,6 +350,7 @@ impl DeltaLayer {
|
||||
summary.lsn_range,
|
||||
metadata.len(),
|
||||
),
|
||||
access_stats: Default::default(),
|
||||
inner: OnceCell::new(),
|
||||
})
|
||||
}
|
||||
@@ -366,6 +373,7 @@ impl DeltaLayer {
|
||||
/// 3. Call `finish`.
|
||||
///
|
||||
struct DeltaLayerWriterInner {
|
||||
conf: &'static PageServerConf,
|
||||
pub path: Utf8PathBuf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -412,6 +420,7 @@ impl DeltaLayerWriterInner {
|
||||
let tree_builder = DiskBtreeBuilder::new(block_buf);
|
||||
|
||||
Ok(Self {
|
||||
conf,
|
||||
path,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
@@ -486,10 +495,11 @@ impl DeltaLayerWriterInner {
|
||||
async fn finish(
|
||||
self,
|
||||
key_end: Key,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let temp_path = self.path.clone();
|
||||
let result = self.finish0(key_end, ctx).await;
|
||||
let result = self.finish0(key_end, timeline, ctx).await;
|
||||
if result.is_err() {
|
||||
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
|
||||
if let Err(e) = std::fs::remove_file(&temp_path) {
|
||||
@@ -502,8 +512,9 @@ impl DeltaLayerWriterInner {
|
||||
async fn finish0(
|
||||
self,
|
||||
key_end: Key,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -568,9 +579,11 @@ impl DeltaLayerWriterInner {
|
||||
// fsync the file
|
||||
file.sync_all().await?;
|
||||
|
||||
trace!("created delta layer {}", self.path);
|
||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
||||
|
||||
Ok((desc, self.path))
|
||||
trace!("created delta layer {}", layer.local_path());
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -671,9 +684,14 @@ impl DeltaLayerWriter {
|
||||
pub(crate) async fn finish(
|
||||
mut self,
|
||||
key_end: Key,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
self.inner.take().unwrap().finish(key_end, ctx).await
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
self.inner
|
||||
.take()
|
||||
.unwrap()
|
||||
.finish(key_end, timeline, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1580,9 +1598,8 @@ pub(crate) mod test {
|
||||
use super::*;
|
||||
use crate::repository::Value;
|
||||
use crate::tenant::harness::TIMELINE_ID;
|
||||
use crate::tenant::storage_layer::{Layer, ResidentLayer};
|
||||
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
use crate::tenant::Tenant;
|
||||
use crate::{
|
||||
context::DownloadBehavior,
|
||||
task_mgr::TaskKind,
|
||||
@@ -1876,8 +1893,9 @@ pub(crate) mod test {
|
||||
res?;
|
||||
}
|
||||
|
||||
let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
|
||||
let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
|
||||
let resident = writer
|
||||
.finish(entries_meta.key_range.end, &timeline, &ctx)
|
||||
.await?;
|
||||
|
||||
let inner = resident.get_as_delta(&ctx).await?;
|
||||
|
||||
@@ -2066,8 +2084,7 @@ pub(crate) mod test {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
|
||||
let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
|
||||
let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
|
||||
|
||||
copied_layer.get_as_delta(ctx).await.unwrap();
|
||||
|
||||
@@ -2195,9 +2212,7 @@ pub(crate) mod test {
|
||||
for (key, lsn, value) in deltas {
|
||||
writer.put_value(key, lsn, value, ctx).await?;
|
||||
}
|
||||
|
||||
let (desc, path) = writer.finish(key_end, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
|
||||
let delta_layer = writer.finish(key_end, tline, ctx).await?;
|
||||
|
||||
Ok::<_, anyhow::Error>(delta_layer)
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{
|
||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
};
|
||||
use crate::tenant::storage_layer::LayerAccessStats;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
@@ -134,6 +135,7 @@ pub struct ImageLayer {
|
||||
pub desc: PersistentLayerDesc,
|
||||
// This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
|
||||
pub lsn: Lsn,
|
||||
access_stats: LayerAccessStats,
|
||||
inner: OnceCell<ImageLayerInner>,
|
||||
}
|
||||
|
||||
@@ -251,6 +253,7 @@ impl ImageLayer {
|
||||
/// not loaded already.
|
||||
///
|
||||
async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
|
||||
self.access_stats.record_access(ctx);
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner(ctx))
|
||||
.await
|
||||
@@ -301,6 +304,7 @@ impl ImageLayer {
|
||||
metadata.len(),
|
||||
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
lsn: summary.lsn,
|
||||
access_stats: Default::default(),
|
||||
inner: OnceCell::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,10 +11,9 @@ use crate::repository::{Key, Value};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::{l0_flush, page_cache, walrecord};
|
||||
use anyhow::{anyhow, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -33,9 +32,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
use super::{
|
||||
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
|
||||
};
|
||||
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
|
||||
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
|
||||
@@ -413,7 +410,8 @@ impl InMemoryLayer {
|
||||
|
||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||
/// Adds the page version to the in-memory tree
|
||||
pub async fn put_value(
|
||||
|
||||
pub(crate) async fn put_value(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
@@ -478,6 +476,8 @@ impl InMemoryLayer {
|
||||
/// Records the end_lsn for non-dropped layers.
|
||||
/// `end_lsn` is exclusive
|
||||
pub async fn freeze(&self, end_lsn: Lsn) {
|
||||
let inner = self.inner.write().await;
|
||||
|
||||
assert!(
|
||||
self.start_lsn < end_lsn,
|
||||
"{} >= {}",
|
||||
@@ -495,13 +495,9 @@ impl InMemoryLayer {
|
||||
})
|
||||
.expect("frozen_local_path_str set only once");
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
let inner = self.inner.write().await;
|
||||
for vec_map in inner.index.values() {
|
||||
for (lsn, _pos) in vec_map.as_slice() {
|
||||
assert!(*lsn < end_lsn);
|
||||
}
|
||||
for vec_map in inner.index.values() {
|
||||
for (lsn, _pos) in vec_map.as_slice() {
|
||||
assert!(*lsn < end_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -511,12 +507,12 @@ impl InMemoryLayer {
|
||||
/// if there are no matching keys.
|
||||
///
|
||||
/// Returns a new delta layer with all the same data as this in-memory layer
|
||||
pub async fn write_to_disk(
|
||||
pub(crate) async fn write_to_disk(
|
||||
&self,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
key_range: Option<Range<Key>>,
|
||||
l0_flush_global_state: &l0_flush::Inner,
|
||||
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
|
||||
) -> Result<Option<ResidentLayer>> {
|
||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||
// layer is not writeable anymore, no one should be trying to acquire the
|
||||
// write lock on it, so we shouldn't block anyone. There's one exception
|
||||
@@ -528,8 +524,9 @@ impl InMemoryLayer {
|
||||
// rare though, so we just accept the potential latency hit for now.
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
|
||||
use l0_flush::Inner;
|
||||
let _concurrency_permit = match l0_flush_global_state {
|
||||
let _concurrency_permit = match &*l0_flush_global_state {
|
||||
Inner::PageCached => None,
|
||||
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
|
||||
};
|
||||
@@ -559,7 +556,7 @@ impl InMemoryLayer {
|
||||
)
|
||||
.await?;
|
||||
|
||||
match l0_flush_global_state {
|
||||
match &*l0_flush_global_state {
|
||||
l0_flush::Inner::PageCached => {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
@@ -624,7 +621,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
// MAX is used here because we identify L0 layers by full key range
|
||||
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
|
||||
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
|
||||
|
||||
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
|
||||
//
|
||||
@@ -636,6 +633,6 @@ impl InMemoryLayer {
|
||||
// we dirtied when writing to the filesystem have been flushed and marked !dirty.
|
||||
drop(_concurrency_permit);
|
||||
|
||||
Ok(Some((desc, path)))
|
||||
Ok(Some(delta_layer))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -316,7 +316,7 @@ impl Layer {
|
||||
other => GetVectoredError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
self.record_access(ctx);
|
||||
self.0.access_stats.record_access(ctx);
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
|
||||
@@ -396,12 +396,8 @@ impl Layer {
|
||||
self.0.info(reset)
|
||||
}
|
||||
|
||||
pub(crate) fn latest_activity(&self) -> SystemTime {
|
||||
self.0.access_stats.latest_activity()
|
||||
}
|
||||
|
||||
pub(crate) fn visibility(&self) -> LayerVisibilityHint {
|
||||
self.0.access_stats.visibility()
|
||||
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.0.access_stats
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> &Utf8Path {
|
||||
@@ -451,31 +447,13 @@ impl Layer {
|
||||
}
|
||||
}
|
||||
|
||||
fn record_access(&self, ctx: &RequestContext) {
|
||||
if self.0.access_stats.record_access(ctx) {
|
||||
// Visibility was modified to Visible
|
||||
tracing::info!(
|
||||
"Layer {} became visible as a result of access",
|
||||
self.0.desc.key()
|
||||
);
|
||||
if let Some(tl) = self.0.timeline.upgrade() {
|
||||
tl.metrics
|
||||
.visible_physical_size_gauge
|
||||
.add(self.0.desc.file_size)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
|
||||
let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
|
||||
let old_visibility = self.access_stats().set_visibility(visibility.clone());
|
||||
use LayerVisibilityHint::*;
|
||||
match (old_visibility, visibility) {
|
||||
(Visible, Covered) => {
|
||||
// Subtract this layer's contribution to the visible size metric
|
||||
if let Some(tl) = self.0.timeline.upgrade() {
|
||||
debug_assert!(
|
||||
tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
|
||||
);
|
||||
tl.metrics
|
||||
.visible_physical_size_gauge
|
||||
.sub(self.0.desc.file_size)
|
||||
@@ -693,9 +671,6 @@ impl Drop for LayerInner {
|
||||
}
|
||||
|
||||
if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
|
||||
debug_assert!(
|
||||
timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
|
||||
);
|
||||
timeline
|
||||
.metrics
|
||||
.visible_physical_size_gauge
|
||||
@@ -1835,7 +1810,7 @@ impl ResidentLayer {
|
||||
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
|
||||
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
|
||||
// while it's being held.
|
||||
self.owner.record_access(ctx);
|
||||
owner.access_stats.record_access(ctx);
|
||||
|
||||
delta_layer::DeltaLayerInner::load_keys(d, ctx)
|
||||
.await
|
||||
|
||||
@@ -4,7 +4,6 @@ use bytes::Bytes;
|
||||
use pageserver_api::key::{Key, KEY_SIZE};
|
||||
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
|
||||
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
|
||||
|
||||
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
|
||||
@@ -174,9 +173,8 @@ impl SplitDeltaLayerWriter {
|
||||
)
|
||||
.await?;
|
||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
self.generated_layers.push(delta_layer);
|
||||
self.generated_layers
|
||||
.push(prev_delta_writer.finish(key, tline, ctx).await?);
|
||||
}
|
||||
self.inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
@@ -192,10 +190,7 @@ impl SplitDeltaLayerWriter {
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
|
||||
let (desc, path) = inner.finish(end_key, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
generated_layers.push(delta_layer);
|
||||
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
|
||||
@@ -407,16 +407,9 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
|
||||
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
|
||||
// Timeline was cancelled during gc. We might either be in an event
|
||||
// that affects the entire tenant (tenant deletion, pageserver shutdown),
|
||||
// or in one that affects the timeline only (timeline deletion).
|
||||
// Therefore, don't exit the loop.
|
||||
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
|
||||
} else {
|
||||
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
|
||||
}
|
||||
|
||||
error!(
|
||||
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
);
|
||||
wait_duration
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
sync::atomic::AtomicU64,
|
||||
};
|
||||
use std::{cmp::min, cmp::Ordering, ops::ControlFlow};
|
||||
use std::{cmp::min, ops::ControlFlow};
|
||||
use std::{
|
||||
collections::btree_map::Entry,
|
||||
ops::{Deref, Range},
|
||||
@@ -137,7 +137,7 @@ use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized};
|
||||
use super::{config::TenantConf, upload_queue::NotInitialized};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{
|
||||
@@ -180,25 +180,6 @@ impl std::fmt::Display for ImageLayerCreationMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct Hole {
|
||||
key_range: Range<Key>,
|
||||
coverage_size: usize,
|
||||
}
|
||||
|
||||
impl Ord for Hole {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
other.coverage_size.cmp(&self.coverage_size) // inverse order
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for Hole {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
|
||||
/// Can be removed after all refactors are done.
|
||||
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
|
||||
@@ -2938,22 +2919,14 @@ impl Timeline {
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
|
||||
let resident = guard.likely_resident_layers().filter_map(|layer| {
|
||||
match layer.visibility() {
|
||||
LayerVisibilityHint::Visible => {
|
||||
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
|
||||
let last_activity_ts = layer.latest_activity();
|
||||
Some(HeatMapLayer::new(
|
||||
layer.layer_desc().layer_name(),
|
||||
layer.metadata(),
|
||||
last_activity_ts,
|
||||
))
|
||||
}
|
||||
LayerVisibilityHint::Covered => {
|
||||
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
|
||||
None
|
||||
}
|
||||
}
|
||||
let resident = guard.likely_resident_layers().map(|layer| {
|
||||
let last_activity_ts = layer.access_stats().latest_activity();
|
||||
|
||||
HeatMapLayer::new(
|
||||
layer.layer_desc().layer_name(),
|
||||
layer.metadata(),
|
||||
last_activity_ts,
|
||||
)
|
||||
});
|
||||
|
||||
let layers = resident.collect();
|
||||
@@ -3728,14 +3701,12 @@ impl Timeline {
|
||||
let frozen_layer = Arc::clone(frozen_layer);
|
||||
let ctx = ctx.attached_child();
|
||||
let work = async move {
|
||||
let Some((desc, path)) = frozen_layer
|
||||
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
|
||||
let Some(new_delta) = frozen_layer
|
||||
.write_to_disk(&self_clone, &ctx, key_range)
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?;
|
||||
|
||||
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
|
||||
// We just need to fsync the directory in which these inodes are linked,
|
||||
// which we know to be the timeline directory.
|
||||
@@ -5211,7 +5182,7 @@ impl Timeline {
|
||||
let file_size = layer.layer_desc().file_size;
|
||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||
|
||||
let last_activity_ts = layer.latest_activity();
|
||||
let last_activity_ts = layer.access_stats().latest_activity();
|
||||
|
||||
EvictionCandidate {
|
||||
layer: layer.into(),
|
||||
@@ -5368,8 +5339,9 @@ impl Timeline {
|
||||
for (key, lsn, val) in deltas.data {
|
||||
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
||||
}
|
||||
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
let delta_layer = delta_layer_writer
|
||||
.finish(deltas.key_range.end, self, ctx)
|
||||
.await?;
|
||||
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
@@ -35,8 +35,8 @@ use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::timeline::ImageLayerCreationOutcome;
|
||||
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
|
||||
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
|
||||
use crate::tenant::timeline::{Layer, ResidentLayer};
|
||||
use crate::tenant::DeltaLayer;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
@@ -752,66 +752,230 @@ impl Timeline {
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.till_now();
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
|
||||
let mut all_keys = Vec::new();
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
|
||||
}
|
||||
|
||||
// FIXME: should spawn_blocking the rest of this function
|
||||
|
||||
// The current stdlib sorting implementation is designed in a way where it is
|
||||
// particularly fast where the slice is made up of sorted sub-ranges.
|
||||
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
|
||||
// TODO: replace with streaming k-merge
|
||||
let all_keys = {
|
||||
let mut all_keys = Vec::new();
|
||||
for l in deltas_to_compact.iter() {
|
||||
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
|
||||
}
|
||||
// The current stdlib sorting implementation is designed in a way where it is
|
||||
// particularly fast where the slice is made up of sorted sub-ranges.
|
||||
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
|
||||
all_keys
|
||||
};
|
||||
|
||||
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
|
||||
|
||||
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
|
||||
// compaction is the gap between data key and metadata keys.
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
|
||||
&& !Key::is_metadata_key(&prev_key)
|
||||
{
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
|
||||
//
|
||||
// A hole is a key range for which this compaction doesn't have any WAL records.
|
||||
// Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
|
||||
// cover the hole, but actually don't contain any WAL records for that key range.
|
||||
// The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
|
||||
// That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
|
||||
//
|
||||
// The algorithm chooses holes as follows.
|
||||
// - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
|
||||
// - Filter: min threshold on range length
|
||||
// - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
|
||||
//
|
||||
// For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
|
||||
#[derive(PartialEq, Eq)]
|
||||
struct Hole {
|
||||
key_range: Range<Key>,
|
||||
coverage_size: usize,
|
||||
}
|
||||
let holes: Vec<Hole> = {
|
||||
use std::cmp::Ordering;
|
||||
impl Ord for Hole {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.coverage_size.cmp(&other.coverage_size).reverse()
|
||||
}
|
||||
}
|
||||
impl PartialOrd for Hole {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
|
||||
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
|
||||
// compaction is the gap between data key and metadata keys.
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
|
||||
&& !Key::is_metadata_key(&prev_key)
|
||||
{
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size =
|
||||
layers.image_coverage(&key_range, last_record_lsn).len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
holes
|
||||
};
|
||||
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
|
||||
drop_rlock(guard);
|
||||
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
|
||||
// This iterator walks through all key-value pairs from all the layers
|
||||
// we're compacting, in key, LSN order.
|
||||
let all_values_iter = all_keys.iter();
|
||||
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
|
||||
// then the Value::Image is ordered before Value::WalRecord.
|
||||
//
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
|
||||
// option and validation code once we've reached confidence.
|
||||
enum AllValuesIter<'a> {
|
||||
PageCachedBlobIo {
|
||||
all_keys_iter: VecIter<'a>,
|
||||
},
|
||||
StreamingKmergeBypassingPageCache {
|
||||
merge_iter: MergeIterator<'a>,
|
||||
},
|
||||
ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: CompactL0BypassPageCacheValidation,
|
||||
merge_iter: MergeIterator<'a>,
|
||||
all_keys_iter: VecIter<'a>,
|
||||
},
|
||||
}
|
||||
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
|
||||
impl AllValuesIter<'_> {
|
||||
async fn next_all_keys_iter(
|
||||
iter: &mut VecIter<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
|
||||
let Some(DeltaEntry {
|
||||
key,
|
||||
lsn,
|
||||
val: value_ref,
|
||||
..
|
||||
}) = iter.next()
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let value = value_ref.load(ctx).await?;
|
||||
Ok(Some((*key, *lsn, value)))
|
||||
}
|
||||
async fn next(
|
||||
&mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
|
||||
match self {
|
||||
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
|
||||
Self::next_all_keys_iter(iter, ctx).await
|
||||
}
|
||||
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
|
||||
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
|
||||
// advance both iterators
|
||||
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
|
||||
let merge_iter_item = merge_iter.next().await;
|
||||
// compare results & log warnings as needed
|
||||
macro_rules! rate_limited_warn {
|
||||
($($arg:tt)*) => {{
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
warn!($($arg)*);
|
||||
panic!("CompactL0BypassPageCacheValidation failure, check logs");
|
||||
}
|
||||
use once_cell::sync::Lazy;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
static LOGGED: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
|
||||
let mut rate_limit = LOGGED.lock().unwrap();
|
||||
rate_limit.call(|| {
|
||||
warn!($($arg)*);
|
||||
});
|
||||
}}
|
||||
}
|
||||
match (&all_keys_iter_item, &merge_iter_item) {
|
||||
(Err(_), Err(_)) => {
|
||||
// don't bother asserting equivality of the errors
|
||||
}
|
||||
(Err(all_keys), Ok(merge)) => {
|
||||
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
|
||||
},
|
||||
(Ok(all_keys), Err(merge)) => {
|
||||
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
|
||||
},
|
||||
(Ok(None), Ok(None)) => { }
|
||||
(Ok(Some(all_keys)), Ok(None)) => {
|
||||
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
|
||||
}
|
||||
(Ok(None), Ok(Some(merge))) => {
|
||||
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
|
||||
}
|
||||
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
|
||||
match mode {
|
||||
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
|
||||
CompactL0BypassPageCacheValidation::KeyLsn => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn);
|
||||
let merge = (merge_key, merge_lsn);
|
||||
if all_keys != merge {
|
||||
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
|
||||
}
|
||||
}
|
||||
CompactL0BypassPageCacheValidation::KeyLsnValue => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
|
||||
let merge = (merge_key, merge_lsn, merge_value);
|
||||
if all_keys != merge {
|
||||
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// in case of mismatch, trust the legacy all_keys_iter_item
|
||||
all_keys_iter_item
|
||||
}.instrument(info_span!("next")).await
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
|
||||
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
|
||||
all_keys_iter: all_keys.iter(),
|
||||
},
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
|
||||
let merge_iter = {
|
||||
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact.iter() {
|
||||
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
deltas.push(l);
|
||||
}
|
||||
MergeIterator::create(&deltas, &[], ctx)
|
||||
};
|
||||
match validate {
|
||||
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
|
||||
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: validate.clone(),
|
||||
merge_iter,
|
||||
all_keys_iter: all_keys.iter(),
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// This iterator walks through all keys and is needed to calculate size used by each key
|
||||
let mut all_keys_iter = all_keys
|
||||
@@ -882,12 +1046,13 @@ impl Timeline {
|
||||
let mut key_values_total_size = 0u64;
|
||||
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
|
||||
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
|
||||
for &DeltaEntry {
|
||||
key, lsn, ref val, ..
|
||||
} in all_values_iter
|
||||
while let Some((key, lsn, value)) = all_values_iter
|
||||
.next(ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?
|
||||
{
|
||||
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
|
||||
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
|
||||
// We need to check key boundaries once we reach next key or end of layer with the same key
|
||||
if !same_key || lsn == dup_end_lsn {
|
||||
@@ -939,16 +1104,14 @@ impl Timeline {
|
||||
|| contains_hole
|
||||
{
|
||||
// ... if so, flush previous layer and prepare to write new one
|
||||
let (desc, path) = writer
|
||||
.take()
|
||||
.unwrap()
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
new_layers.push(new_delta);
|
||||
new_layers.push(
|
||||
writer
|
||||
.take()
|
||||
.unwrap()
|
||||
.finish(prev_key.unwrap().next(), self, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
writer = None;
|
||||
|
||||
if contains_hole {
|
||||
@@ -1011,13 +1174,12 @@ impl Timeline {
|
||||
prev_key = Some(key);
|
||||
}
|
||||
if let Some(writer) = writer {
|
||||
let (desc, path) = writer
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
new_layers.push(new_delta);
|
||||
new_layers.push(
|
||||
writer
|
||||
.finish(prev_key.unwrap().next(), self, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
|
||||
// Sync layers
|
||||
@@ -1075,6 +1237,10 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
// Without this, rustc complains about deltas_to_compact still
|
||||
// being borrowed when we `.into_iter()` below.
|
||||
drop(all_values_iter);
|
||||
|
||||
Ok(CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact: deltas_to_compact
|
||||
@@ -1182,6 +1348,43 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum CompactL0Phase1ValueAccess {
|
||||
/// The old way.
|
||||
PageCachedBlobIo,
|
||||
/// The new way.
|
||||
StreamingKmerge {
|
||||
/// If set, we run both the old way and the new way, validate that
|
||||
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
|
||||
/// and if the validation fails,
|
||||
/// - in tests: fail them with a panic or
|
||||
/// - in prod, log a rate-limited warning and use the old way's results.
|
||||
///
|
||||
/// If not set, we only run the new way and trust its results.
|
||||
validate: Option<CompactL0BypassPageCacheValidation>,
|
||||
},
|
||||
}
|
||||
|
||||
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum CompactL0BypassPageCacheValidation {
|
||||
/// Validate that the series of (key, lsn) pairs are the same.
|
||||
KeyLsn,
|
||||
/// Validate that the entire output of old and new way is identical.
|
||||
KeyLsnValue,
|
||||
}
|
||||
|
||||
impl Default for CompactL0Phase1ValueAccess {
|
||||
fn default() -> Self {
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge {
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
|
||||
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Entry point for new tiered compaction algorithm.
|
||||
///
|
||||
@@ -1763,16 +1966,13 @@ impl Timeline {
|
||||
for (key, lsn, val) in deltas {
|
||||
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
||||
}
|
||||
|
||||
stats.produce_delta_layer(delta_layer_writer.size());
|
||||
if dry_run {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let (desc, path) = delta_layer_writer
|
||||
.finish(delta_key.key_range.end, ctx)
|
||||
let delta_layer = delta_layer_writer
|
||||
.finish(delta_key.key_range.end, tline, ctx)
|
||||
.await?;
|
||||
let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
|
||||
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
|
||||
}
|
||||
|
||||
@@ -2213,9 +2413,9 @@ impl CompactionJobExecutor for TimelineAdaptor {
|
||||
))
|
||||
});
|
||||
|
||||
let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
|
||||
let new_delta_layer =
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||
let new_delta_layer = writer
|
||||
.finish(prev.unwrap().0.next(), &self.timeline, ctx)
|
||||
.await?;
|
||||
|
||||
self.new_deltas.push(new_delta_layer);
|
||||
Ok(())
|
||||
|
||||
@@ -488,12 +488,10 @@ async fn copy_lsn_prefix(
|
||||
// reuse the key instead of adding more holes between layers by using the real
|
||||
// highest key in the layer.
|
||||
let reused_highest_key = layer.layer_desc().key_range.end;
|
||||
let (desc, path) = writer
|
||||
.finish(reused_highest_key, ctx)
|
||||
let copied = writer
|
||||
.finish(reused_highest_key, target_timeline, ctx)
|
||||
.await
|
||||
.map_err(CopyDeltaPrefix)?;
|
||||
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
|
||||
.map_err(CopyDeltaPrefix)?;
|
||||
|
||||
tracing::debug!(%layer, %copied, "new layer produced");
|
||||
|
||||
|
||||
@@ -225,7 +225,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let last_activity_ts = layer.latest_activity();
|
||||
let last_activity_ts = layer.access_stats().latest_activity();
|
||||
|
||||
let no_activity_for = match now.duration_since(last_activity_ts) {
|
||||
Ok(d) => d,
|
||||
|
||||
@@ -259,10 +259,13 @@ impl LayerManager {
|
||||
new_layer.layer_desc().lsn_range
|
||||
);
|
||||
|
||||
// Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
|
||||
// Transfer visibilty hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
|
||||
// be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
|
||||
// always marking rewritten layers as visible.
|
||||
new_layer.as_ref().set_visibility(old_layer.visibility());
|
||||
new_layer
|
||||
.as_ref()
|
||||
.access_stats()
|
||||
.set_visibility(old_layer.access_stats().visibility());
|
||||
|
||||
// Safety: we may never rewrite the same file in-place. Callers are responsible
|
||||
// for ensuring that they only rewrite layers after something changes the path,
|
||||
|
||||
@@ -7,6 +7,7 @@ use clap::{ArgAction, Parser};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use sd_notify::NotifyState;
|
||||
use tokio::runtime::Handle;
|
||||
@@ -204,6 +205,7 @@ fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
|
||||
// We want to allow multiple occurences of the same arg (taking the last) so
|
||||
// that neon_local could generate command with defaults + overrides without
|
||||
// getting 'argument cannot be used multiple times' error. This seems to be
|
||||
@@ -356,14 +358,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
Some(GIT_VERSION.into()),
|
||||
&[("node_id", &conf.my_id.to_string())],
|
||||
);
|
||||
start_safekeeper(conf).await
|
||||
start_safekeeper(launch_ts, conf).await
|
||||
}
|
||||
|
||||
/// Result of joining any of main tasks: upper error means task failed to
|
||||
/// complete, e.g. panicked, inner is error produced by task itself.
|
||||
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
||||
|
||||
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
async fn start_safekeeper(launch_ts: &'static LaunchTimestamp, conf: SafeKeeperConf) -> Result<()> {
|
||||
// Prevent running multiple safekeepers on the same directory
|
||||
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
|
||||
let lock_file =
|
||||
@@ -491,6 +493,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
tasks_handles.push(Box::pin(broker_task_handle));
|
||||
|
||||
set_build_info_metric(GIT_VERSION, BUILD_TAG);
|
||||
set_launch_timestamp_metric(launch_ts);
|
||||
|
||||
// TODO: update tokio-stream, convert to real async Stream with
|
||||
// SignalStream, map it to obtain missing signal name, combine streams into
|
||||
|
||||
@@ -642,7 +642,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
info!("version: {GIT_VERSION} build_tag: {BUILD_TAG}");
|
||||
info!("version: {GIT_VERSION}");
|
||||
info!("build_tag: {BUILD_TAG}");
|
||||
metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
|
||||
|
||||
// On any shutdown signal, log receival and exit.
|
||||
|
||||
@@ -172,11 +172,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
}
|
||||
}
|
||||
BlobDataParseResult::Relic => {}
|
||||
BlobDataParseResult::Incorrect {
|
||||
errors,
|
||||
s3_layers: _,
|
||||
} => result.errors.extend(
|
||||
errors
|
||||
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
|
||||
parse_errors
|
||||
.into_iter()
|
||||
.map(|error| format!("parse error: {error}")),
|
||||
),
|
||||
@@ -303,10 +300,7 @@ pub(crate) enum BlobDataParseResult {
|
||||
},
|
||||
/// The remains of a deleted Timeline (i.e. an initdb archive only)
|
||||
Relic,
|
||||
Incorrect {
|
||||
errors: Vec<String>,
|
||||
s3_layers: HashSet<(LayerName, Generation)>,
|
||||
},
|
||||
Incorrect(Vec<String>),
|
||||
}
|
||||
|
||||
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
|
||||
@@ -449,7 +443,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
}
|
||||
|
||||
Ok(S3TimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
|
||||
blob_data: BlobDataParseResult::Incorrect(errors),
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
})
|
||||
|
||||
@@ -208,21 +208,21 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
if summary.is_fatal() {
|
||||
tracing::error!("Fatal scrub errors detected");
|
||||
Err(anyhow::anyhow!("Fatal scrub errors detected"))
|
||||
} else if summary.is_empty() {
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
// at all then it's likely due to some configuration issues like a bad prefix
|
||||
tracing::error!(
|
||||
Err(anyhow::anyhow!(
|
||||
"No timelines found in bucket {} prefix {}",
|
||||
bucket_config.bucket,
|
||||
bucket_config
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
);
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -389,13 +389,10 @@ async fn gc_ancestor(
|
||||
// Post-deletion tenant location: don't try and GC it.
|
||||
continue;
|
||||
}
|
||||
BlobDataParseResult::Incorrect {
|
||||
errors,
|
||||
s3_layers: _, // TODO(yuchen): could still check references to these s3 layers?
|
||||
} => {
|
||||
BlobDataParseResult::Incorrect(reasons) => {
|
||||
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
|
||||
tracing::warn!(
|
||||
"Skipping ancestor GC for timeline {ttid}, bad metadata: {errors:?}"
|
||||
"Skipping ancestor GC for timeline {ttid}, bad metadata: {reasons:?}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
@@ -521,12 +518,9 @@ pub async fn pageserver_physical_gc(
|
||||
// Post-deletion tenant location: don't try and GC it.
|
||||
return Ok(summary);
|
||||
}
|
||||
BlobDataParseResult::Incorrect {
|
||||
errors,
|
||||
s3_layers: _,
|
||||
} => {
|
||||
BlobDataParseResult::Incorrect(reasons) => {
|
||||
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
|
||||
tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
|
||||
tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
|
||||
return Ok(summary);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -290,21 +290,13 @@ pub async fn scan_metadata(
|
||||
}
|
||||
}
|
||||
|
||||
match &data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part: _index_part,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers,
|
||||
} => {
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
BlobDataParseResult::Relic => (),
|
||||
BlobDataParseResult::Incorrect {
|
||||
errors: _,
|
||||
s3_layers,
|
||||
} => {
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part: _index_part,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers,
|
||||
} = &data.blob_data
|
||||
{
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
tenant_timeline_results.push((ttid, data));
|
||||
}
|
||||
|
||||
@@ -269,7 +269,7 @@ impl SnapshotDownloader {
|
||||
.context("Downloading timeline")?;
|
||||
}
|
||||
BlobDataParseResult::Relic => {}
|
||||
BlobDataParseResult::Incorrect { .. } => {
|
||||
BlobDataParseResult::Incorrect(_) => {
|
||||
tracing::error!("Bad metadata in timeline {ttid}");
|
||||
}
|
||||
};
|
||||
|
||||
@@ -978,10 +978,7 @@ class NeonEnvBuilder:
|
||||
and self.enable_scrub_on_exit
|
||||
):
|
||||
try:
|
||||
healthy, _ = self.env.storage_scrubber.scan_metadata()
|
||||
if not healthy:
|
||||
e = Exception("Remote storage metadata corrupted")
|
||||
cleanup_error = e
|
||||
self.env.storage_scrubber.scan_metadata()
|
||||
except Exception as e:
|
||||
log.error(f"Error during remote storage scrub: {e}")
|
||||
cleanup_error = e
|
||||
@@ -4414,19 +4411,14 @@ class StorageScrubber:
|
||||
assert stdout is not None
|
||||
return stdout
|
||||
|
||||
def scan_metadata(self, post_to_storage_controller: bool = False) -> Tuple[bool, Any]:
|
||||
"""
|
||||
Returns the health status and the metadata summary.
|
||||
"""
|
||||
def scan_metadata(self, post_to_storage_controller: bool = False) -> Any:
|
||||
args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
|
||||
if post_to_storage_controller:
|
||||
args.append("--post")
|
||||
stdout = self.scrubber_cli(args, timeout=30)
|
||||
|
||||
try:
|
||||
summary = json.loads(stdout)
|
||||
healthy = not summary["with_errors"] and not summary["with_warnings"]
|
||||
return healthy, summary
|
||||
return json.loads(stdout)
|
||||
except:
|
||||
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
|
||||
log.error(stdout)
|
||||
|
||||
@@ -61,7 +61,6 @@ class HistoricLayerInfo:
|
||||
remote: bool
|
||||
# None for image layers, true if pageserver thinks this is an L0 delta layer
|
||||
l0: Optional[bool]
|
||||
visible: bool
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
|
||||
@@ -80,7 +79,6 @@ class HistoricLayerInfo:
|
||||
lsn_end=d.get("lsn_end"),
|
||||
remote=d["remote"],
|
||||
l0=l0_ness,
|
||||
visible=d["access_stats"]["visible"],
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -217,11 +217,7 @@ def test_storage_controller_many_tenants(
|
||||
# A reconciler operation: migrate a shard.
|
||||
shard_number = rng.randint(0, shard_count - 1)
|
||||
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
|
||||
# Migrate it to its secondary location
|
||||
desc = env.storage_controller.tenant_describe(tenant_id)
|
||||
dest_ps_id = desc["shards"][shard_number]["node_secondary"][0]
|
||||
|
||||
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
|
||||
f = executor.submit(
|
||||
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
|
||||
)
|
||||
@@ -235,11 +231,7 @@ def test_storage_controller_many_tenants(
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
# Some of the operations above (notably migrations) might leave the controller in a state where it has
|
||||
# some work to do, for example optimizing shard placement after we do a random migration. Wait for the system
|
||||
# to reach a quiescent state before doing following checks.
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# Consistency check is safe here: all the previous operations waited for reconcile before completing
|
||||
env.storage_controller.consistency_check()
|
||||
check_memory()
|
||||
|
||||
|
||||
@@ -496,10 +496,11 @@ def test_historic_storage_formats(
|
||||
# Check the scrubber handles this old data correctly (can read it and doesn't consider it corrupt)
|
||||
#
|
||||
# Do this _before_ importing to the pageserver, as that import may start writing immediately
|
||||
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
|
||||
assert healthy
|
||||
metadata_summary = env.storage_scrubber.scan_metadata()
|
||||
assert metadata_summary["tenant_count"] >= 1
|
||||
assert metadata_summary["timeline_count"] >= 1
|
||||
assert not metadata_summary["with_errors"]
|
||||
assert not metadata_summary["with_warnings"]
|
||||
|
||||
env.neon_cli.import_tenant(dataset.tenant_id)
|
||||
|
||||
|
||||
@@ -214,11 +214,12 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Having written a mixture of generation-aware and legacy index_part.json,
|
||||
# ensure the scrubber handles the situation as expected.
|
||||
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
|
||||
metadata_summary = env.storage_scrubber.scan_metadata()
|
||||
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
|
||||
assert metadata_summary["timeline_count"] == 1
|
||||
assert metadata_summary["timeline_shard_count"] == 1
|
||||
assert healthy
|
||||
assert not metadata_summary["with_errors"]
|
||||
assert not metadata_summary["with_warnings"]
|
||||
|
||||
|
||||
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -2,11 +2,10 @@ import json
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Union
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.common_types import TenantId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
@@ -438,35 +437,6 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
validate_heatmap(heatmap_second)
|
||||
|
||||
|
||||
def list_elegible_layers(
|
||||
pageserver, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> list[Path]:
|
||||
"""
|
||||
The subset of layer filenames that are elegible for secondary download: at time of writing this
|
||||
is all resident layers which are also visible.
|
||||
"""
|
||||
candidates = pageserver.list_layers(tenant_id, timeline_id)
|
||||
|
||||
layer_map = pageserver.http_client().layer_map_info(tenant_id, timeline_id)
|
||||
|
||||
# Map of layer filenames to their visibility the "layer name" is not the same as the filename: add suffix to resolve one to the other
|
||||
visible_map = dict(
|
||||
(f"{layer.layer_file_name}-v1-00000001", layer.visible)
|
||||
for layer in layer_map.historic_layers
|
||||
)
|
||||
|
||||
def is_visible(layer_file_name):
|
||||
try:
|
||||
return visible_map[str(layer_file_name)]
|
||||
except KeyError:
|
||||
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
|
||||
# matches what's on disk.
|
||||
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
|
||||
raise
|
||||
|
||||
return list(c for c in candidates if is_visible(c))
|
||||
|
||||
|
||||
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test the overall data flow in secondary mode:
|
||||
@@ -521,7 +491,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
assert list_elegible_layers(ps_attached, tenant_id, timeline_id) == ps_secondary.list_layers(
|
||||
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
|
||||
tenant_id, timeline_id
|
||||
)
|
||||
|
||||
@@ -539,9 +509,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
try:
|
||||
assert list_elegible_layers(
|
||||
ps_attached, tenant_id, timeline_id
|
||||
) == ps_secondary.list_layers(tenant_id, timeline_id)
|
||||
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
|
||||
tenant_id, timeline_id
|
||||
)
|
||||
except:
|
||||
# Do a full listing of the secondary location on errors, to help debug of
|
||||
# https://github.com/neondatabase/neon/issues/6966
|
||||
@@ -562,8 +532,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
# ==================================================================
|
||||
try:
|
||||
log.info("Evicting a layer...")
|
||||
layer_to_evict = list_elegible_layers(ps_attached, tenant_id, timeline_id)[0]
|
||||
some_other_layer = list_elegible_layers(ps_attached, tenant_id, timeline_id)[1]
|
||||
layer_to_evict = ps_attached.list_layers(tenant_id, timeline_id)[0]
|
||||
some_other_layer = ps_attached.list_layers(tenant_id, timeline_id)[1]
|
||||
log.info(f"Victim layer: {layer_to_evict.name}")
|
||||
ps_attached.http_client().evict_layer(
|
||||
tenant_id, timeline_id, layer_name=layer_to_evict.name
|
||||
@@ -581,9 +551,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
assert layer_to_evict not in ps_attached.list_layers(tenant_id, timeline_id)
|
||||
assert list_elegible_layers(
|
||||
ps_attached, tenant_id, timeline_id
|
||||
) == ps_secondary.list_layers(tenant_id, timeline_id)
|
||||
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
|
||||
tenant_id, timeline_id
|
||||
)
|
||||
except:
|
||||
# On assertion failures, log some details to help with debugging
|
||||
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
@@ -593,8 +563,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
# Scrub the remote storage
|
||||
# ========================
|
||||
# This confirms that the scrubber isn't upset by the presence of the heatmap
|
||||
healthy, _ = env.storage_scrubber.scan_metadata()
|
||||
assert healthy
|
||||
env.storage_scrubber.scan_metadata()
|
||||
|
||||
# Detach secondary and delete tenant
|
||||
# ===================================
|
||||
|
||||
@@ -124,8 +124,7 @@ def test_sharding_smoke(
|
||||
|
||||
# Check the scrubber isn't confused by sharded content, then disable
|
||||
# it during teardown because we'll have deleted by then
|
||||
healthy, _ = env.storage_scrubber.scan_metadata()
|
||||
assert healthy
|
||||
env.storage_scrubber.scan_metadata()
|
||||
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
assert_prefix_empty(
|
||||
|
||||
@@ -516,8 +516,9 @@ def test_scrubber_scan_pageserver_metadata(
|
||||
assert len(index.layer_metadata) > 0
|
||||
it = iter(index.layer_metadata.items())
|
||||
|
||||
healthy, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
|
||||
assert healthy
|
||||
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
|
||||
assert not scan_summary["with_warnings"]
|
||||
assert not scan_summary["with_errors"]
|
||||
|
||||
assert env.storage_controller.metadata_health_is_healthy()
|
||||
|
||||
@@ -531,18 +532,16 @@ def test_scrubber_scan_pageserver_metadata(
|
||||
log.info(f"delete response: {delete_response}")
|
||||
|
||||
# Check scan summary without posting to storage controller. Expect it to be a L0 layer so only emit warnings.
|
||||
_, scan_summary = env.storage_scrubber.scan_metadata()
|
||||
scan_summary = env.storage_scrubber.scan_metadata()
|
||||
log.info(f"{pprint.pformat(scan_summary)}")
|
||||
assert len(scan_summary["with_warnings"]) > 0
|
||||
|
||||
assert env.storage_controller.metadata_health_is_healthy()
|
||||
|
||||
# Now post to storage controller, expect seeing one unhealthy health record
|
||||
_, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
|
||||
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
|
||||
log.info(f"{pprint.pformat(scan_summary)}")
|
||||
assert len(scan_summary["with_warnings"]) > 0
|
||||
|
||||
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
|
||||
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
|
||||
|
||||
neon_env_builder.disable_scrub_on_exit()
|
||||
|
||||
@@ -341,13 +341,13 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
env.stop()
|
||||
|
||||
healthy, _ = env.storage_scrubber.scan_metadata()
|
||||
assert healthy
|
||||
result = env.storage_scrubber.scan_metadata()
|
||||
assert result["with_warnings"] == []
|
||||
|
||||
env.start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
ps_http.tenant_delete(tenant_id)
|
||||
env.stop()
|
||||
|
||||
healthy, _ = env.storage_scrubber.scan_metadata()
|
||||
assert healthy
|
||||
env.storage_scrubber.scan_metadata()
|
||||
assert result["with_warnings"] == []
|
||||
|
||||
Reference in New Issue
Block a user