Compare commits

...

37 Commits

Author SHA1 Message Date
Vlad Lazar
9b9eff3973 test: add benchmark for tenants with large slrus
The new benchmark generates tenants with around 300 SLRU blocks
of 8KiB each. Both vectored get implementations are benchmarked.
For the vectored implementation, validation is disabled.
2024-02-16 17:10:08 +00:00
Vlad Lazar
2b24f5c33d pagebench: accept connstring for basebackup bench
(cherry picked from commit 49f4feee403eca3ddce3a468c98872767c4fee9c)
2024-02-16 17:07:04 +00:00
Vlad Lazar
707a2ef6f4 pageserver: add config for vectored get validation 2024-02-16 14:48:33 +00:00
Vlad Lazar
214be7e1e5 pageserver/delta_layer: use vectored blob read
This commit does a few things:
* update the delta layer to use vectored blob reads
* thread the start Lsn for the read down to the delta layer
* thread the max size of the vectored read to the delta layer
2024-02-15 18:41:05 +00:00
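To illustrate what the threaded max read size controls, here is a small sketch (made-up types and values, not code from this series; the actual coalescing logic lives in `VectoredRead::extend` in the `blob_io` changes further down): adjacent blobs are merged into a single read only while they stay contiguous and the combined read fits under the cap.

// Coalesce adjacent blob byte ranges into larger reads, never exceeding `max_read_size`.
fn plan_reads(blobs: &[(u64, u64)], max_read_size: u64) -> Vec<(u64, u64)> {
    let mut reads: Vec<(u64, u64)> = Vec::new();
    for &(start, end) in blobs {
        match reads.last_mut() {
            // Extend the current read if the blob is contiguous with it and the result fits the cap.
            Some(cur) if cur.1 == start && end - cur.0 <= max_read_size => cur.1 = end,
            _ => reads.push((start, end)),
        }
    }
    reads
}

fn main() {
    // Three contiguous 4 KiB blobs under an 8 KiB cap: the first two coalesce, the third starts a new read.
    let blobs = [(0, 4096), (4096, 8192), (8192, 12288)];
    assert_eq!(plan_reads(&blobs, 8192), vec![(0, 8192), (8192, 12288)]);
}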
Vlad Lazar
a6f49c7a08 pageserver/image_layer: use vectored blob read
This commit updates the image layer to use the vectored
disk blob read. The max size config is also threaded to
the image layer.
2024-02-15 18:41:05 +00:00
Vlad Lazar
92361e4acb pageserver/blob_io: add vectored blob read 2024-02-15 18:41:05 +00:00
Vlad Lazar
3556a2bedb pageserver/config: add a config for max size of vectored read 2024-02-15 18:41:05 +00:00
Vlad Lazar
f1616c3b9b pageserver/virtual_file: add read variant with byte count 2024-02-15 18:41:04 +00:00
Vlad Lazar
4a9b29a8fc utils: add a few VecMap utility functions 2024-02-15 18:38:31 +00:00
Vlad Lazar
cffb5a66da pageserver: count SLRU blocks in basebackup 2024-02-15 18:38:31 +00:00
Vlad Lazar
bf3240ccae Merge branch 'main' into vlad/get-vectored-read-path 2024-02-15 18:33:17 +00:00
Vlad Lazar
8b44003647 review: validate vectored get in release mode too 2024-02-09 14:54:14 +00:00
Vlad Lazar
188ffe717f review: revert to allow unused 2024-02-09 09:10:10 +00:00
Vlad Lazar
4e6574ed90 review: fix test build in release mode 2024-02-08 20:53:46 +00:00
Vlad Lazar
b30171fa59 review: special case sequential cancel for validation 2024-02-08 20:47:16 +00:00
Vlad Lazar
e6c04119c4 review: address round one of Christian's comments 2024-02-08 20:41:13 +00:00
Vlad Lazar
02f1b7905f tests: enable vectored get only in regress tests 2024-02-07 21:00:03 +00:00
Vlad Lazar
c67dfad1e0 pageserver: add vectored get unit test 2024-02-07 21:00:03 +00:00
Vlad Lazar
80134af36a pageserver: validate real get vectored in debug builds 2024-02-07 21:00:03 +00:00
Vlad Lazar
48bd531c74 pageserver: dispatch get vectored based on config 2024-02-07 21:00:03 +00:00
Vlad Lazar
37fac40d4a pageserver/config: add get_vectored_impl config 2024-02-07 21:00:03 +00:00
Vlad Lazar
b342c0f0d4 pageserver: lift the sequential vectored get impl
This is preparation for adding the vectored implementation.
2024-02-07 21:00:03 +00:00
Vlad Lazar
5e411a0e98 pageserver: fail vectored search on first missing key
This will be the behaviour of the real vectored search, so the
sequential implementation is changed to ease testing.
2024-02-07 21:00:03 +00:00
Vlad Lazar
ec6fb422bd pageserver: take key space as param for get vectored
Move the copying up front since the vectored read path will
need the key space in any case.
2024-02-07 21:00:03 +00:00
Vlad Lazar
2b41b02172 pageserver/timeline: add vectorized get real implementation 2024-02-07 21:00:03 +00:00
Vlad Lazar
09527a6386 pageserver/timeline: add vectored reconstruct 2024-02-07 21:00:03 +00:00
Vlad Lazar
c40ebce267 pageserver: add a fringe tracker for the vectored search
The fringe of a vectored search is the set of layers intersected
by the ranges in the search at any given point. The keyspace
intersecting each layer is tracked and updated as the search
dictates. The double indexing allows the search to continue from
the highest Lsn.
2024-02-07 21:00:03 +00:00
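For orientation, a stripped-down sketch of such a fringe (placeholder `LayerId`/`KeySpace` types, not the pageserver's; the structure actually added by this commit is `LayerFringe`, keyed by `ReadableLayerDesc`, and appears in the diff below): a max-heap ordered by end Lsn decides which layer to visit next, while a map accumulates the keyspace that still intersects each layer.

use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;

// Placeholders standing in for the pageserver's layer descriptors, Lsn and KeySpace types.
type LayerId = u64;
type Lsn = u64;
type KeySpace = Vec<Range<u32>>;

#[derive(Default)]
struct Fringe {
    // Max-heap on the layers' end Lsn: the search always descends from the highest Lsn.
    by_lsn: BinaryHeap<(Lsn, LayerId)>,
    // For every layer currently in the fringe, the keyspace that still intersects it.
    keyspaces: HashMap<LayerId, KeySpace>,
}

impl Fringe {
    // Record that `ranges` of the search intersect `layer`; extend the tracked keyspace
    // if the layer is already in the fringe.
    fn update(&mut self, layer: LayerId, end_lsn: Lsn, ranges: KeySpace) {
        match self.keyspaces.entry(layer) {
            Entry::Occupied(mut e) => e.get_mut().extend(ranges),
            Entry::Vacant(e) => {
                self.by_lsn.push((end_lsn, layer));
                e.insert(ranges);
            }
        }
    }

    // Hand back the layer with the highest end Lsn together with the keyspace to read from it.
    fn next_layer(&mut self) -> Option<(LayerId, KeySpace)> {
        let (_, layer) = self.by_lsn.pop()?;
        let keyspace = self.keyspaces.remove(&layer)?;
        Some((layer, keyspace))
    }
}

fn main() {
    let mut fringe = Fringe::default();
    fringe.update(1, 100, vec![0..10]);
    fringe.update(2, 200, vec![5..10]);
    fringe.update(1, 100, vec![10..20]);
    // Layer 2 has the higher end Lsn, so the search visits it first.
    assert_eq!(fringe.next_layer(), Some((2, vec![5..10])));
    assert_eq!(fringe.next_layer(), Some((1, vec![0..10, 10..20])));
}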
Vlad Lazar
02fb357a49 pageserver: add a cheap & universal layer desc type
`ReadableLayerDesc` contains enough info about any layer type
such that the layer map can resolve the actual layer from it.
Its main role will be to proxy `get_values_reconstruct_data` calls
down to the right layer.
2024-02-07 21:00:03 +00:00
Vlad Lazar
358d791d53 pageserver/layer_map: add an in memory layer handle type
`InMemoryLayerHandle` can be thought of as the counterpart
of `PersistentLayerDesc`. A future commit will exploit this
similarity and shove both in an enum.
2024-02-07 21:00:03 +00:00
Vlad Lazar
4e9f320b1c pageserver: thread vectored reconstruct to generic layer 2024-02-07 21:00:03 +00:00
Vlad Lazar
98ad71bdf1 pageserver/image_layer: add vectored reconstruct
This change is similar to the previous two patches, but employs an
optimisation. If all the keys in a range exist in the image layer,
then they should be contiguous on disk. Hence, we only need to query
the index for the first key in each range.

An assertion is also added to enforce the pre-condition above.
2024-02-07 21:00:03 +00:00
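A simplified illustration of the single index probe per range (plain integers stand in for keys and blob offsets; this is not the image layer code itself, and the assertion is only a stand-in for the pre-condition check mentioned above):

use std::ops::Range;

// A key-sorted index from key to blob offset, playing the role of the image layer's btree index.
fn offsets_for_range(index: &[(u32, u64)], range: Range<u32>) -> Vec<u64> {
    // One index probe per range: find the first entry at or after the range start...
    let first = index.partition_point(|(key, _)| *key < range.start);
    // ...then walk forward while the keys stay inside the range.
    let offsets: Vec<u64> = index[first..]
        .iter()
        .take_while(|(key, _)| *key < range.end)
        .map(|(_, offset)| *offset)
        .collect();
    // Stand-in for the pre-condition assertion: every key of the range is present,
    // i.e. the matching entries are contiguous in the index.
    assert_eq!(offsets.len() as u32, range.end - range.start);
    offsets
}

fn main() {
    let index = [(10, 0), (11, 100), (12, 200), (13, 300)];
    assert_eq!(offsets_for_range(&index, 11..13), vec![100, 200]);
}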
Vlad Lazar
5e6ef06e65 pageserver/delta_layer: add vectored reconstruct
Visit the btree index only once and collect all the offsets which
need to be read (stored in ascending order). Then issue the reads.
Again, read amplification is still an issue for now.
2024-02-07 21:00:03 +00:00
Vlad Lazar
294afcc0cc pageserver/inmemory_layer: add vectored reconstruct
Collect the values for a key space in one go.

Firstly, collect all the offsets at which a read is required and
order them. Secondly, perform all of the reads. Note that read
amplification is still present. A future patch set will deal
with this problem.
2024-02-07 21:00:03 +00:00
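A minimal sketch of this two-step shape with placeholder types (the actual change builds `BlockRead` entries and drains them in order, as shown later in this diff):

use std::collections::BTreeMap;

// Placeholder index: for every key, the (lsn, offset) pairs recorded in the layer file.
type Index = BTreeMap<u32, Vec<(u64, u64)>>;

fn collect_then_read(index: &Index, keys: &[u32], file: &[u8]) -> Vec<(u32, u64, u8)> {
    // Step 1: collect all offsets at which a read is required and order them.
    let mut reads: Vec<(u32, u64, u64)> = Vec::new();
    for key in keys {
        for &(lsn, offset) in index.get(key).into_iter().flatten() {
            reads.push((*key, lsn, offset));
        }
    }
    reads.sort();

    // Step 2: perform all of the reads (a single byte per offset stands in for a blob).
    reads
        .into_iter()
        .map(|(key, lsn, offset)| (key, lsn, file[offset as usize]))
        .collect()
}

fn main() {
    let mut index = Index::new();
    index.insert(7, vec![(10, 2), (20, 0)]);
    index.insert(3, vec![(15, 1)]);
    let file = [b'a', b'b', b'c'];
    // Reads come back ordered by (key, lsn), regardless of request order.
    assert_eq!(
        collect_then_read(&index, &[7, 3], &file),
        vec![(3, 15, b'b'), (7, 10, b'c'), (7, 20, b'a')]
    );
}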
Vlad Lazar
d985d21a7c pageserver: add data structure to track vectored reconstruct state 2024-02-07 21:00:03 +00:00
Vlad Lazar
dc69d5f37e pageserver_api: allow for merging of keyspaces 2024-02-07 21:00:03 +00:00
Vlad Lazar
a4889675e0 pageserver_api: consume random keyspace accumulator 2024-02-07 21:00:03 +00:00
Vlad Lazar
1b122adc5d layer_map: fix range search layer coalescing
Previously, the layer map range search returned a btree keyed
on an ordered search result type. That ordering was incorrect
since it was based solely on Lsn and treated layers with the
same Lsn floor as equal.

It turns out that the ordering is not actually required, so
this commit switches to a hash map instead for correctness.
2024-02-07 21:00:00 +00:00
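A toy reproduction of the described issue (illustrative types only, not the pageserver's): with ordering and equality derived solely from the Lsn floor, a `BTreeMap` silently collapses results for distinct layers that share a floor.

use std::cmp::Ordering;
use std::collections::BTreeMap;

// Stand-in for the old wrapper: ordering (and hence equality) based only on the Lsn floor,
// ignoring which layer the search result points at.
struct OrderedResult {
    lsn_floor: u64,
    layer: &'static str,
}

impl PartialEq for OrderedResult {
    fn eq(&self, other: &Self) -> bool {
        self.lsn_floor == other.lsn_floor
    }
}
impl Eq for OrderedResult {}
impl PartialOrd for OrderedResult {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for OrderedResult {
    fn cmp(&self, other: &Self) -> Ordering {
        self.lsn_floor.cmp(&other.lsn_floor)
    }
}

fn main() {
    let mut found: BTreeMap<OrderedResult, Vec<&str>> = BTreeMap::new();
    // Two distinct layers that happen to share the same Lsn floor...
    found
        .entry(OrderedResult { lsn_floor: 10, layer: "delta-A" })
        .or_default()
        .push("key range 1");
    found
        .entry(OrderedResult { lsn_floor: 10, layer: "delta-B" })
        .or_default()
        .push("key range 2");
    // ...collapse into a single entry: delta-B's key range ends up attributed to delta-A.
    assert_eq!(found.len(), 1);
    assert_eq!(found.keys().next().unwrap().layer, "delta-A");
}

The fix in this commit keys the results on the full `SearchResult` in a `HashMap` instead, so distinct layers can no longer collide.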
24 changed files with 2040 additions and 237 deletions

View File

@@ -472,6 +472,7 @@ jobs:
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
PAGESERVER_GET_VECTORED_IMPL: vectored
- name: Merge and upload coverage data
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'

Cargo.lock generated
View File

@@ -3537,6 +3537,7 @@ dependencies = [
"enum-map",
"hex",
"humantime-serde",
"itertools",
"postgres_ffi",
"rand 0.8.5",
"serde",

View File

@@ -21,6 +21,7 @@ hex.workspace = true
thiserror.workspace = true
humantime-serde.workspace = true
chrono.workspace = true
itertools.workspace = true
workspace_hack.workspace = true

View File

@@ -2,6 +2,7 @@ use postgres_ffi::BLCKSZ;
use std::ops::Range;
use crate::key::Key;
use itertools::Itertools;
///
/// Represents a set of Keys, in a compact form.
@@ -63,6 +64,34 @@ impl KeySpace {
KeyPartitioning { parts }
}
/// Merge another keyspace into the current one.
/// Note: the keyspaces must not overlap (enforced via assertions)
pub fn merge(&mut self, other: &KeySpace) {
let all_ranges = self
.ranges
.iter()
.merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
let mut accum = KeySpaceAccum::new();
let mut prev: Option<&Range<Key>> = None;
for range in all_ranges {
if let Some(prev) = prev {
let overlap =
std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
assert!(
!overlap,
"Attempt to merge ovelapping keyspaces: {:?} overlaps {:?}",
prev, range
);
}
accum.add_range(range.clone());
prev = Some(range);
}
self.ranges = accum.to_keyspace().ranges;
}
/// Update the keyspace such that it doesn't contain any range
/// that is overlapping with `other`. This can involve splitting or
/// removing of existing ranges.
@@ -279,6 +308,13 @@ impl KeySpaceRandomAccum {
}
KeySpace { ranges }
}
pub fn consume_keyspace(&mut self) -> KeySpace {
let mut prev_accum = KeySpaceRandomAccum::new();
std::mem::swap(self, &mut prev_accum);
prev_accum.to_keyspace()
}
}
pub fn key_range_size(key_range: &Range<Key>) -> u32 {

View File

@@ -20,10 +20,22 @@ impl<K: Ord, V> VecMap<K, V> {
self.0.is_empty()
}
pub fn last(&self) -> Option<&(K, V)> {
self.0.last()
}
pub fn as_slice(&self) -> &[(K, V)] {
self.0.as_slice()
}
pub fn inner(self) -> Vec<(K, V)> {
self.0
}
pub fn len(&self) -> usize {
self.0.len()
}
/// This function may panic if given a range where the lower bound is
/// greater than the upper bound.
pub fn slice_range<R: RangeBounds<K>>(&self, range: R) -> &[(K, V)] {
@@ -90,6 +102,10 @@ impl<K: Ord, V> VecMap<K, V> {
Ok((None, delta_size))
}
pub fn truncate(&mut self, pos: usize) {
self.0.truncate(pos)
}
/// Split the map into two.
///
/// The left map contains everything before `cutoff` (exclusive).

View File

@@ -8,7 +8,7 @@ use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument};
use tracing::{info, instrument};
use std::collections::HashMap;
use std::num::NonZeroUsize;
@@ -28,6 +28,8 @@ pub(crate) struct Args {
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
page_service_connstring: Option<String>,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
@@ -230,12 +232,17 @@ async fn client(
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
))
.await
.unwrap();
let connstr = match &args.page_service_connstring {
Some(connstr) => connstr.clone(),
None => crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
),
};
let client = pageserver_client::page_service::Client::new(connstr)
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
let start = Instant::now();
@@ -263,7 +270,7 @@ async fn client(
}
})
.await;
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {

View File

@@ -143,6 +143,7 @@ where
ar: &'a mut Builder<&'b mut W>,
buf: Vec<u8>,
current_segment: Option<(SlruKind, u32)>,
total_blocks: usize,
}
impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
@@ -154,6 +155,7 @@ where
ar,
buf: Vec::new(),
current_segment: None,
total_blocks: 0,
}
}
@@ -199,7 +201,8 @@ where
let header = new_tar_header(&segname, self.buf.len() as u64)?;
self.ar.append(&header, self.buf.as_slice()).await?;
trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
self.total_blocks += nblocks;
debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
self.buf.clear();
@@ -207,11 +210,15 @@ where
}
async fn finish(mut self) -> anyhow::Result<()> {
if self.current_segment.is_none() || self.buf.is_empty() {
return Ok(());
}
let res = if self.current_segment.is_none() || self.buf.is_empty() {
Ok(())
} else {
self.flush().await
};
self.flush().await
info!("Collected {} SLRU blocks", self.total_blocks);
res
}
}
@@ -261,10 +268,7 @@ where
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
for part in slru_partitions.parts {
let blocks = self
.timeline
.get_vectored(&part.ranges, self.lsn, self.ctx)
.await?;
let blocks = self.timeline.get_vectored(part, self.lsn, self.ctx).await?;
for (key, block) in blocks {
slru_builder.add_block(&key, block?).await?;

View File

@@ -33,6 +33,7 @@ use utils::{
use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
@@ -84,6 +85,12 @@ pub mod defaults {
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
pub const DEFAULT_MAX_VECTORED_READ_SIZE: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
///
/// Default built-in configuration file.
///
@@ -121,6 +128,12 @@ pub mod defaults {
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}'
#max_vectored_read_size = '{DEFAULT_MAX_VECTORED_READ_SIZE}'
#validate_vectored_get = '{DEFAULT_VALIDATE_VECTORED_GET}'
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -256,6 +269,12 @@ pub struct PageServerConf {
pub ingest_batch_size: u64,
pub virtual_file_io_engine: virtual_file::IoEngineKind,
pub get_vectored_impl: GetVectoredImpl,
pub max_vectored_read_size: usize,
pub validate_vectored_get: bool,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -342,6 +361,12 @@ struct PageServerConfigBuilder {
ingest_batch_size: BuilderValue<u64>,
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
get_vectored_impl: BuilderValue<GetVectoredImpl>,
max_vectored_read_size: BuilderValue<usize>,
validate_vectored_get: BuilderValue<bool>,
}
impl Default for PageServerConfigBuilder {
@@ -419,6 +444,10 @@ impl Default for PageServerConfigBuilder {
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()),
max_vectored_read_size: Set(DEFAULT_MAX_VECTORED_READ_SIZE),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
}
}
}
@@ -579,6 +608,18 @@ impl PageServerConfigBuilder {
self.virtual_file_io_engine = BuilderValue::Set(value);
}
pub fn get_vectored_impl(&mut self, value: GetVectoredImpl) {
self.get_vectored_impl = BuilderValue::Set(value);
}
pub fn get_max_vectored_read_size(&mut self, value: usize) {
self.max_vectored_read_size = BuilderValue::Set(value);
}
pub fn get_validate_vectored_get(&mut self, value: bool) {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
@@ -689,6 +730,15 @@ impl PageServerConfigBuilder {
virtual_file_io_engine: self
.virtual_file_io_engine
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
get_vectored_impl: self
.get_vectored_impl
.ok_or(anyhow!("missing get_vectored_impl"))?,
max_vectored_read_size: self
.max_vectored_read_size
.ok_or(anyhow!("missing max_vectored_read_size"))?,
validate_vectored_get: self
.validate_vectored_get
.ok_or(anyhow!("missing validate_vectored_get"))?,
})
}
}
@@ -943,6 +993,15 @@ impl PageServerConf {
"virtual_file_io_engine" => {
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
}
"get_vectored_impl" => {
builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
}
"max_vectored_read_size" => {
builder.get_max_vectored_read_size(parse_toml_u64("max_vectored_read_size", item)? as usize)
}
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1017,6 +1076,9 @@ impl PageServerConf {
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
}
}
}
@@ -1250,6 +1312,9 @@ background_task_maximum_delay = '334 s'
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
},
"Correct defaults should be used when no config values are provided"
);
@@ -1314,6 +1379,9 @@ background_task_maximum_delay = '334 s'
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(),
max_vectored_read_size: defaults::DEFAULT_MAX_VECTORED_READ_SIZE,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -4219,6 +4219,7 @@ mod tests {
use bytes::BytesMut;
use hex_literal::hex;
use once_cell::sync::Lazy;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tokio_util::sync::CancellationToken;
@@ -4910,6 +4911,61 @@ mod tests {
Ok(())
}
async fn bulk_insert_compact_gc(
timeline: Arc<Timeline>,
ctx: &RequestContext,
mut lsn: Lsn,
repeat: usize,
key_count: usize,
) -> anyhow::Result<()> {
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
// Enforce that key range is monotonically increasing
let mut keyspace = KeySpaceAccum::new();
for _ in 0..repeat {
for _ in 0..key_count {
test_key.field6 = blknum;
let writer = timeline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
keyspace.add_key(test_key);
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
}
let cutoff = timeline.get_last_record_lsn();
timeline
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
ctx,
)
.await?;
timeline.freeze_and_flush().await?;
timeline
.compact(&CancellationToken::new(), EnumSet::empty(), ctx)
.await?;
timeline.gc().await?;
}
Ok(())
}
//
// Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
// Repeat 50 times.
@@ -4922,49 +4978,98 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut lsn = Lsn(0x10);
let lsn = Lsn(0x10);
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
let mut keyspace = KeySpaceAccum::new();
Ok(())
}
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
for _ in 0..50 {
for _ in 0..10000 {
test_key.field6 = blknum;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(test_img(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
drop(writer);
// Test the vectored get real implementation against a simple sequential implementation.
//
// The test generates a keyspace by repeatedly flushing the in-memory layer and compacting.
// Projected to 2D the key space looks like below. Lsn grows upwards on the Y axis and keys
// grow to the right on the X axis.
// [Delta]
// [Delta]
// [Delta]
// [Delta]
// ------------ Image ---------------
//
// After layer generation we pick the ranges to query as follows:
// 1. The beginning of each delta layer
// 2. At the seam between two adjacent delta layers
//
// There's one major downside to this test: delta layers only contain images,
// so the search can stop at the first delta layer and doesn't traverse any deeper.
#[tokio::test]
async fn test_get_vectored() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
keyspace.add_key(test_key);
let lsn = Lsn(0x10);
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
let mut reads = Vec::new();
let mut prev = None;
guard.layer_map().iter_historic_layers().for_each(|desc| {
if !desc.is_delta() {
prev = Some(desc.clone());
return;
}
let cutoff = tline.get_last_record_lsn();
let start = desc.key_range.start;
let end = desc
.key_range
.start
.add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap());
reads.push(KeySpace {
ranges: vec![start..end],
});
if let Some(prev) = &prev {
if !prev.is_delta() {
return;
}
let first_range = Key {
field6: prev.key_range.end.field6 - 4,
..prev.key_range.end
}..prev.key_range.end;
let second_range = desc.key_range.start..Key {
field6: desc.key_range.start.field6 + 4,
..desc.key_range.start
};
reads.push(KeySpace {
ranges: vec![first_range, second_range],
});
};
prev = Some(desc.clone());
});
drop(guard);
// Pick a big LSN such that we query over all the changes.
// Technically, u64::MAX - 1 is the largest LSN supported by the read path,
// but there seems to be a bug on the non-vectored search path which surfaces
// in that case.
let reads_lsn = Lsn(u64::MAX - 1000);
for read in reads {
info!("Doing vectored read on {:?}", read);
let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await;
tline
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
.await;
}
Ok(())

View File

@@ -13,6 +13,7 @@
//!
use bytes::{BufMut, BytesMut};
use tokio_epoll_uring::{BoundedBuf, Slice};
use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -21,6 +22,17 @@ use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};
pub struct VectoredBlobMeta<Meta> {
pub start: usize,
pub end: usize,
pub meta: Meta,
}
pub struct VectoredBlobs<Meta> {
pub buf: BytesMut,
pub metas: Vec<VectoredBlobMeta<Meta>>,
}
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -93,6 +105,146 @@ impl<'a> BlockCursor<'a> {
}
}
#[derive(Debug)]
pub struct VectoredRead<Meta> {
pub start: u64,
pub end: u64,
pub blobs_at: VecMap<u64, Meta>,
max_read_size: usize,
}
#[derive(Eq, PartialEq)]
pub enum VectoredReadExtended {
Yes,
No,
}
impl<Meta> VectoredRead<Meta> {
pub fn new(start_offset: u64, end_offset: u64, meta: Meta, max_read_size: usize) -> Self {
let mut blobs_at = VecMap::default();
blobs_at
.append(start_offset, meta)
.expect("First insertion always succeeds");
Self {
start: start_offset,
end: end_offset,
blobs_at,
max_read_size,
}
}
pub fn extend(&mut self, start: u64, end: u64, meta: Meta) -> VectoredReadExtended {
let size = (end - start) as usize;
if self.end == start && self.size() + size <= self.max_read_size {
self.end = end;
self.blobs_at
.append(start, meta)
.expect("LSNs are ordered within vectored reads");
return VectoredReadExtended::Yes;
}
VectoredReadExtended::No
}
pub fn last_meta(&self) -> Option<&Meta> {
self.blobs_at.last().map(|(_, meta)| meta)
}
pub fn truncate_at<Pred>(&mut self, mut pred: Pred)
where
Pred: FnMut(&Meta) -> bool,
{
if let Some(pos) = self
.blobs_at
.as_slice()
.iter()
.position(|(_, meta)| pred(meta))
{
self.blobs_at.truncate(pos);
}
}
pub fn size(&self) -> usize {
(self.end - self.start) as usize
}
}
pub struct VectoredBlobReader {
file: VirtualFile,
max_vectored_read_size: usize,
}
impl VectoredBlobReader {
pub fn new(file: VirtualFile, max_vectored_read_size: usize) -> Self {
Self {
file,
max_vectored_read_size,
}
}
pub fn get_max_read_size(&self) -> usize {
self.max_vectored_read_size
}
pub async fn read_blobs<Meta: Copy>(
&self,
read: &VectoredRead<Meta>,
buf: BytesMut,
) -> Result<VectoredBlobs<Meta>, std::io::Error> {
// tracing::info!("read_blobs(read={read:?}, read_size={})", read.size());
assert!(read.size() > 0);
assert!(
read.size() <= buf.capacity(),
"{} > {}",
read.size(),
buf.capacity()
);
let buf = self
.file
.read_exact_at_n(buf, read.start, read.size())
.await?;
let blobs_at = read.blobs_at.as_slice();
let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
let mut metas = Vec::new();
let pairs = blobs_at.iter().zip(
blobs_at
.iter()
.map(Some)
.skip(1)
.chain(std::iter::once(None)),
);
for ((offset, meta), next) in pairs {
let offset_in_buf = offset - start_offset;
let first_len_byte = buf[offset_in_buf as usize];
// Each blob is prefixed by a header containing its size.
// Skip that header to find the start of the data.
// TODO: validate against the stored size
let size_offset = if first_len_byte < 0x80 { 1 } else { 4 };
let start = offset_in_buf + size_offset;
let end = match next {
Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
None => read.end - start_offset,
};
metas.push(VectoredBlobMeta {
start: start as usize,
end: end as usize,
meta: *meta,
})
}
Ok(VectoredBlobs { buf, metas })
}
}
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be

View File

@@ -52,8 +52,7 @@ use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use pageserver_api::keyspace::KeySpaceAccum;
use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
@@ -147,43 +146,22 @@ impl Drop for BatchedUpdates<'_> {
}
/// Return value of LayerMap::search
#[derive(Eq, PartialEq, Debug)]
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct SearchResult {
pub layer: Arc<PersistentLayerDesc>,
pub lsn_floor: Lsn,
}
pub struct OrderedSearchResult(SearchResult);
impl Ord for OrderedSearchResult {
fn cmp(&self, other: &Self) -> Ordering {
self.0.lsn_floor.cmp(&other.0.lsn_floor)
}
}
impl PartialOrd for OrderedSearchResult {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for OrderedSearchResult {
fn eq(&self, other: &Self) -> bool {
self.0.lsn_floor == other.0.lsn_floor
}
}
impl Eq for OrderedSearchResult {}
#[derive(Debug)]
pub struct RangeSearchResult {
pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>,
pub found: HashMap<SearchResult, KeySpaceAccum>,
pub not_found: KeySpaceAccum,
}
impl RangeSearchResult {
fn new() -> Self {
Self {
found: BTreeMap::new(),
found: HashMap::new(),
not_found: KeySpaceAccum::new(),
}
}
@@ -314,7 +292,7 @@ where
Some(search_result) => self
.result
.found
.entry(OrderedSearchResult(search_result))
.entry(search_result)
.or_default()
.add_range(covered_range),
None => self.pad_range(covered_range),
@@ -362,6 +340,35 @@ where
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub enum InMemoryLayerHandle {
Open {
lsn_floor: Lsn,
end_lsn: Lsn,
},
Frozen {
idx: usize,
lsn_floor: Lsn,
end_lsn: Lsn,
},
}
impl InMemoryLayerHandle {
pub fn get_lsn_floor(&self) -> Lsn {
match self {
InMemoryLayerHandle::Open { lsn_floor, .. } => *lsn_floor,
InMemoryLayerHandle::Frozen { lsn_floor, .. } => *lsn_floor,
}
}
pub fn get_end_lsn(&self) -> Lsn {
match self {
InMemoryLayerHandle::Open { end_lsn, .. } => *end_lsn,
InMemoryLayerHandle::Frozen { end_lsn, .. } => *end_lsn,
}
}
}
impl LayerMap {
///
/// Find the latest layer (by lsn.end) that covers the given
@@ -556,6 +563,43 @@ impl LayerMap {
self.historic.iter()
}
/// Get a handle for the first in memory layer that matches the provided predicate.
/// The handle should be used with [`Self::get_in_memory_layer`] to retrieve the actual layer.
///
/// Note: [`Self::find_in_memory_layer`] and [`Self::get_in_memory_layer`] should be called during
/// the same exclusive region established by holding the layer manager lock.
pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<InMemoryLayerHandle>
where
Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
{
if let Some(open) = &self.open_layer {
if pred(open) {
return Some(InMemoryLayerHandle::Open {
lsn_floor: open.get_lsn_range().start,
end_lsn: open.get_lsn_range().end,
});
}
}
let pos = self.frozen_layers.iter().rev().position(pred);
pos.map(|rev_idx| {
let idx = self.frozen_layers.len() - 1 - rev_idx;
InMemoryLayerHandle::Frozen {
idx,
lsn_floor: self.frozen_layers[idx].get_lsn_range().start,
end_lsn: self.frozen_layers[idx].get_lsn_range().end,
}
})
}
/// Get the layer pointed to by the provided handle.
pub fn get_in_memory_layer(&self, handle: &InMemoryLayerHandle) -> Option<Arc<InMemoryLayer>> {
match handle {
InMemoryLayerHandle::Open { .. } => self.open_layer.clone(),
InMemoryLayerHandle::Frozen { idx, .. } => self.frozen_layers.get(*idx).cloned(),
}
}
///
/// Divide the whole given range of keys into sub-ranges based on the latest
/// image layer that covers each range at the specified lsn (inclusive).
@@ -869,6 +913,8 @@ impl LayerMap {
#[cfg(test)]
mod tests {
use pageserver_api::keyspace::KeySpace;
use super::*;
#[derive(Clone)]
@@ -895,15 +941,15 @@ mod tests {
fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) {
assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace());
let lhs: Vec<_> = lhs
let lhs: HashMap<SearchResult, KeySpace> = lhs
.found
.into_iter()
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
.map(|(search_result, accum)| (search_result, accum.to_keyspace()))
.collect();
let rhs: Vec<_> = rhs
let rhs: HashMap<SearchResult, KeySpace> = rhs
.found
.into_iter()
.map(|(search_result, accum)| (search_result.0, accum.to_keyspace()))
.map(|(search_result, accum)| (search_result, accum.to_keyspace()))
.collect();
assert_eq!(lhs, rhs);
@@ -923,7 +969,7 @@ mod tests {
Some(res) => {
range_search_result
.found
.entry(OrderedSearchResult(res))
.entry(res)
.or_default()
.add_key(key);
}

View File

@@ -8,15 +8,21 @@ pub(crate) mod layer;
mod layer_desc;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;
use std::sync::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -34,6 +40,11 @@ pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
use super::layer_map::InMemoryLayerHandle;
use super::timeline::layer_manager::LayerManager;
use super::timeline::GetVectoredError;
use super::PageReconstructError;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -67,6 +78,287 @@ pub struct ValueReconstructState {
pub img: Option<(Lsn, Bytes)>,
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum ValueReconstructSituation {
Complete,
#[default]
Continue,
}
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
situation: ValueReconstructSituation,
}
impl VectoredValueReconstructState {
fn get_cached_lsn(&self) -> Option<Lsn> {
self.img.as_ref().map(|img| img.0)
}
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
ValueReconstructState {
records: state.records,
img: state.img,
}
}
}
/// Bag of data accumulated during a vectored get
pub(crate) struct ValuesReconstructState {
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
keys_done: KeySpaceRandomAccum,
}
impl ValuesReconstructState {
pub(crate) fn new() -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
}
}
/// Associate a key with the error which it encountered and mark it as done
pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
let previous = self.keys.insert(key, Err(err));
if let Some(Ok(state)) = previous {
if state.situation == ValueReconstructSituation::Continue {
self.keys_done.add_key(key);
}
}
}
/// Update the state collected for a given key.
/// Returns the key's reconstruct situation after the update: `Complete` if this was
/// the last value needed for the key, `Continue` otherwise.
///
/// If the key is done after the update, mark it as such.
pub(crate) fn update_key(
&mut self,
key: &Key,
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init || reached_cache
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
state.situation
} else {
ValueReconstructSituation::Complete
}
}
/// Returns the Lsn at which this key is cached if one exists.
/// The read path should go no further than this Lsn for the given key.
pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
self.keys
.get(key)
.and_then(|k| k.as_ref().ok())
.and_then(|state| state.get_cached_lsn())
}
/// Returns the key space describing the keys that have
/// been marked as completed since the last call to this function.
pub(crate) fn consume_done_keys(&mut self) -> KeySpace {
self.keys_done.consume_keyspace()
}
}
impl Default for ValuesReconstructState {
fn default() -> Self {
Self::new()
}
}
/// Description of layer to be read - the layer map can turn
/// this description into the actual layer.
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub(crate) enum ReadableLayerDesc {
Persistent {
desc: PersistentLayerDesc,
lsn_floor: Lsn,
lsn_ceil: Lsn,
},
InMemory {
handle: InMemoryLayerHandle,
lsn_ceil: Lsn,
},
}
/// Wrapper for 'ReadableLayerDesc' sorted by Lsn
#[derive(Debug)]
struct ReadableLayerDescOrdered(ReadableLayerDesc);
/// Data structure which maintains a fringe of layers for the
/// read path. The fringe is the set of layers which intersects
/// the current keyspace that the search is descending on.
/// Each layer tracks the keyspace that intersects it.
///
/// The fringe must appear sorted by Lsn. Hence, it uses
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
layers_by_lsn: BinaryHeap<ReadableLayerDescOrdered>,
layers: HashMap<ReadableLayerDesc, KeySpace>,
}
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
layers_by_lsn: BinaryHeap::new(),
layers: HashMap::new(),
}
}
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayerDesc, KeySpace)> {
let handle = match self.layers_by_lsn.pop() {
Some(h) => h,
None => return None,
};
let removed = self.layers.remove_entry(&handle.0);
match removed {
Some((layer, keyspace)) => Some((layer, keyspace)),
None => panic!(),
}
}
pub(crate) fn update(&mut self, layer: ReadableLayerDesc, keyspace: KeySpace) {
let entry = self.layers.entry(layer.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().merge(&keyspace);
}
Entry::Vacant(entry) => {
self.layers_by_lsn
.push(ReadableLayerDescOrdered(entry.key().clone()));
entry.insert(keyspace);
}
}
}
}
impl Default for LayerFringe {
fn default() -> Self {
Self::new()
}
}
impl Ord for ReadableLayerDescOrdered {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.0.get_lsn_ceil().cmp(&other.0.get_lsn_ceil());
if ord == std::cmp::Ordering::Equal {
self.0
.get_lsn_floor()
.cmp(&other.0.get_lsn_floor())
.reverse()
} else {
ord
}
}
}
impl PartialOrd for ReadableLayerDescOrdered {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ReadableLayerDescOrdered {
fn eq(&self, other: &Self) -> bool {
self.0.get_lsn_floor() == other.0.get_lsn_floor()
&& self.0.get_lsn_ceil() == other.0.get_lsn_ceil()
}
}
impl Eq for ReadableLayerDescOrdered {}
impl ReadableLayerDesc {
pub(crate) fn get_lsn_floor(&self) -> Lsn {
match self {
ReadableLayerDesc::Persistent { lsn_floor, .. } => *lsn_floor,
ReadableLayerDesc::InMemory { handle, .. } => handle.get_lsn_floor(),
}
}
pub(crate) fn get_lsn_ceil(&self) -> Lsn {
match self {
ReadableLayerDesc::Persistent { lsn_ceil, .. } => *lsn_ceil,
ReadableLayerDesc::InMemory { lsn_ceil, .. } => *lsn_ceil,
}
}
pub(crate) async fn get_values_reconstruct_data(
&self,
layer_manager: &LayerManager,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
match self {
ReadableLayerDesc::Persistent {
desc,
lsn_floor,
lsn_ceil,
} => {
let layer = layer_manager.get_from_desc(desc);
layer
.get_values_reconstruct_data(
keyspace,
*lsn_floor,
*lsn_ceil,
reconstruct_state,
ctx,
)
.await
}
ReadableLayerDesc::InMemory { handle, lsn_ceil } => {
let layer = layer_manager
.layer_map()
.get_in_memory_layer(handle)
.unwrap();
layer
.get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx)
.await
}
}
}
}
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {

View File

@@ -31,16 +31,19 @@ use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::blob_io::{BlobWriter, VectoredBlobReader, VectoredRead, VectoredReadExtended};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
@@ -59,7 +62,10 @@ use utils::{
lsn::Lsn,
};
use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
use super::{
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation,
ValuesReconstructState,
};
///
/// Header stored in the beginning of the file
@@ -207,6 +213,7 @@ pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
vectored_blob_reader: VectoredBlobReader,
/// Reader object for reading blocks from the file.
file: FileBlockReader,
@@ -242,7 +249,7 @@ impl DeltaLayer {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
inner.dump(ctx).await
}
@@ -278,20 +285,25 @@ impl DeltaLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(access_kind, ctx);
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
.await
.with_context(|| format!("Failed to load delta layer {}", self.path()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
async fn load_inner(
&self,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, ctx)
let loaded = DeltaLayerInner::load(&path, None, max_vectored_read_size, ctx)
.await
.and_then(|res| res)?;
@@ -685,6 +697,12 @@ impl DeltaLayer {
}
}
#[derive(Copy, Clone, Debug)]
struct DeltaVecBlobMeta {
pub key: Key,
pub lsn: Lsn,
}
impl DeltaLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
@@ -692,15 +710,16 @@ impl DeltaLayerInner {
pub(super) async fn load(
path: &Utf8Path,
summary: Option<Summary>,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let block_reader = FileBlockReader::new(file);
let summary_blk = match file.read_blk(0, ctx).await {
let summary_blk = match block_reader.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
@@ -722,8 +741,16 @@ impl DeltaLayerInner {
}
}
// TODO: don't open file twice
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
Ok(Ok(DeltaLayerInner {
file,
file: block_reader,
vectored_blob_reader,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
}))
@@ -818,6 +845,182 @@ impl DeltaLayerInner {
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
// If the key is cached, go no further than the cached Lsn.
//
// Currently, the index is visited for each range, but this
// can be further optimised to visit the index only once.
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
let mut reads = Vec::new();
for range in keyspace.ranges.iter() {
let mut current_read: Option<VectoredRead<DeltaVecBlobMeta>> = None;
let mut prev_idx = None;
// Scan the page versions forwards, starting from the first key in the range,
// to collect all the offsets which need to be read.
let start_key = DeltaKey::from_key_lsn(&range.start, start_lsn);
tree_reader
.visit(
&start_key.0,
VisitDirection::Forwards,
|raw_key, value| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
let blob_ref = BlobRef(value);
let (prev_key, prev_lsn, prev_blob) = match prev_idx {
None => {
prev_idx = Some((key, lsn, blob_ref));
return true;
}
Some(prev) => prev,
};
let key_start_lsn =
std::cmp::max(reconstruct_state.get_cached_lsn(&key), Some(start_lsn));
if Some(prev_lsn) < key_start_lsn || prev_lsn >= end_lsn {
prev_idx = Some((key, lsn, blob_ref));
return true;
}
if prev_key >= range.end {
if let Some(read) = current_read.take() {
reads.push(read);
}
return false;
}
let extended = match &mut current_read {
None => VectoredReadExtended::No,
Some(read) => {
// The index is traversed forwards. If the blob is `will_init`, it renders the already
// accumulated blobs obsolete.
if read.last_meta().map(|meta| meta.key) == Some(prev_key)
&& prev_blob.will_init()
{
read.truncate_at(|meta| meta.key == prev_key);
}
read.extend(
prev_blob.pos(),
blob_ref.pos(),
DeltaVecBlobMeta {
key: prev_key,
lsn: prev_lsn,
},
)
}
};
if extended == VectoredReadExtended::No {
let next_read = VectoredRead::new(
prev_blob.pos(),
blob_ref.pos(),
DeltaVecBlobMeta {
key: prev_key,
lsn: prev_lsn,
},
self.vectored_blob_reader.get_max_read_size(),
);
let prev_read = current_read.replace(next_read);
if let Some(read) = prev_read {
reads.push(read);
}
}
prev_idx = Some((key, lsn, blob_ref));
true
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
}
let mut ignore_key = None;
let mut buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
for read in reads.into_iter().rev() {
let res = self
.vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await;
let blobs = match res {
Ok(blobs) => blobs,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
file.file.path,
kind
)),
);
}
continue;
}
};
for meta in blobs.metas.iter().rev() {
if Some(meta.meta.key) == ignore_key {
continue;
}
let value = Value::des(&blobs.buf[meta.start..meta.end]);
if let Err(e) = value {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::from(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
file.file.path
))),
);
ignore_key = Some(meta.meta.key);
continue;
}
let key_situation =
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
ignore_key = Some(meta.meta.key);
}
}
buf = Some(blobs.buf);
}
Ok(())
}
pub(super) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,

View File

@@ -26,20 +26,22 @@
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, VectoredBlobReader, VectoredRead, VectoredReadExtended};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
@@ -59,7 +61,7 @@ use utils::{
};
use super::filename::ImageFileName;
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
///
/// Header stored in the beginning of the file
@@ -152,6 +154,7 @@ pub struct ImageLayerInner {
/// Reader object for reading blocks from the file.
file: FileBlockReader,
vectored_blob_reader: VectoredBlobReader,
}
impl std::fmt::Debug for ImageLayerInner {
@@ -208,7 +211,7 @@ impl ImageLayer {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
let inner = self.load(LayerAccessKind::Dump, 0, ctx).await?;
inner.dump(ctx).await?;
@@ -238,21 +241,32 @@ impl ImageLayer {
async fn load(
&self,
access_kind: LayerAccessKind,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<&ImageLayerInner> {
self.access_stats.record_access(access_kind, ctx);
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.get_or_try_init(|| self.load_inner(max_vectored_read_size, ctx))
.await
.with_context(|| format!("Failed to load image layer {}", self.path()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
async fn load_inner(
&self,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
.await
.and_then(|res| res)?;
let loaded = ImageLayerInner::load(
&path,
self.desc.image_layer_lsn(),
None,
max_vectored_read_size,
ctx,
)
.await
.and_then(|res| res)?;
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
@@ -351,6 +365,11 @@ impl ImageLayer {
}
}
#[derive(Copy, Clone, Debug)]
struct ImageVecBlobMeta {
pub key: Key,
}
impl ImageLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
@@ -359,14 +378,15 @@ impl ImageLayerInner {
path: &Utf8Path,
lsn: Lsn,
summary: Option<Summary>,
max_vectored_read_size: usize,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let summary_blk = match file.read_blk(0, ctx).await {
let block_reader = FileBlockReader::new(file);
let summary_blk = match block_reader.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
@@ -392,11 +412,19 @@ impl ImageLayerInner {
}
}
// TODO: don't open file twice
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let vectored_blob_reader = VectoredBlobReader::new(file, max_vectored_read_size);
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn,
file,
file: block_reader,
vectored_blob_reader,
}))
}
@@ -438,6 +466,120 @@ impl ImageLayerInner {
Ok(ValueReconstructResult::Missing)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let file = &self.file;
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
let mut reads = Vec::new();
for range in keyspace.ranges.iter() {
let mut current_read: Option<VectoredRead<ImageVecBlobMeta>> = None;
let mut prev_idx = None;
let mut raw_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut raw_key);
tree_reader
.visit(
&raw_key,
VisitDirection::Forwards,
|raw_key, offset| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let (prev_key, prev_offset) = match prev_idx {
None => {
prev_idx = Some((key, offset));
return true;
}
Some(prev) => prev,
};
if prev_key >= range.end {
if let Some(read) = current_read.take() {
reads.push(read);
}
return false;
}
let extended = match &mut current_read {
None => VectoredReadExtended::No,
Some(read) => {
read.extend(prev_offset, offset, ImageVecBlobMeta { key: prev_key })
}
};
if extended == VectoredReadExtended::No {
let next_read = VectoredRead::new(
prev_offset,
offset,
ImageVecBlobMeta { key: prev_key },
self.vectored_blob_reader.get_max_read_size(),
);
let prev_read = current_read.replace(next_read);
if let Some(read) = prev_read {
reads.push(read);
}
}
prev_idx = Some((key, offset));
true
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
}
let mut buf = Some(BytesMut::with_capacity(
self.vectored_blob_reader.get_max_read_size(),
));
for read in reads.into_iter().rev() {
let res = self
.vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await;
let blobs = match res {
Ok(blobs) => blobs,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
file.file.path,
kind
)),
);
}
continue;
}
};
for meta in blobs.metas.iter().rev() {
let img_buf = Bytes::copy_from_slice(&blobs.buf[meta.start..meta.end]);
reconstruct_state.update_key(&meta.meta.key, self.lsn, Value::Image(img_buf));
}
buf = Some(blobs.buf);
}
Ok(())
}
}
/// A builder object for constructing a new image layer.

View File

@@ -9,13 +9,15 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::walrecord;
use anyhow::{ensure, Result};
use anyhow::{anyhow, ensure, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, OnceLock};
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
@@ -25,7 +27,10 @@ use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{DeltaLayerWriter, ResidentLayer};
use super::{
DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
ValuesReconstructState,
};
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -67,6 +72,13 @@ pub struct InMemoryLayerInner {
file: EphemeralFile,
}
#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct BlockRead {
key: Key,
lsn: Lsn,
block_offset: u64,
}
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerInner").finish()
@@ -202,6 +214,85 @@ impl InMemoryLayer {
Ok(ValueReconstructResult::Complete)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
// If the key is cached, go no further than the cached Lsn.
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let inner = self.inner.read().await;
let reader = inner.file.block_cursor();
let mut completed_keys = HashSet::new();
let mut min_heap = BinaryHeap::new();
for range in keyspace.ranges.iter() {
let mut key = range.start;
while key < range.end {
if let Some(vec_map) = inner.index.get(&key) {
let range = match reconstruct_state.get_cached_lsn(&key) {
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
None => self.start_lsn..end_lsn,
};
let slice = vec_map.slice_range(range);
for (entry_lsn, pos) in slice.iter().rev() {
min_heap.push(BlockRead {
key,
lsn: *entry_lsn,
block_offset: *pos,
});
}
}
key = key.next();
}
}
let keyspace_size = keyspace.total_size();
while completed_keys.len() < keyspace_size && !min_heap.is_empty() {
let block_read = min_heap.pop().unwrap();
if completed_keys.contains(&block_read.key) {
continue;
}
let buf = reader.read_blob(block_read.block_offset, &ctx).await;
if let Err(e) = buf {
reconstruct_state
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
completed_keys.insert(block_read.key);
continue;
}
let value = Value::des(&buf.unwrap());
if let Err(e) = value {
reconstruct_state
.on_key_error(block_read.key, PageReconstructError::from(anyhow!(e)));
completed_keys.insert(block_read.key);
continue;
}
let key_situation =
reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
completed_keys.insert(block_read.key);
}
}
Ok(())
}
}
impl std::fmt::Display for InMemoryLayer {

View File

@@ -1,5 +1,6 @@
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
@@ -16,13 +17,14 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
use super::delta_layer::{self, DeltaEntry};
use super::image_layer;
use super::{
AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc,
ValueReconstructResult, ValueReconstructState,
ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
};
use utils::generation::Generation;
@@ -262,6 +264,37 @@ impl Layer {
.with_context(|| format!("get_value_reconstruct_data for layer {self}"))
}
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let layer = self
.0
.get_or_maybe_download(true, Some(ctx))
.await
.map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
self.0
.access_stats
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
layer
.get_values_reconstruct_data(
keyspace,
start_lsn,
end_lsn,
reconstruct_data,
&self.0,
ctx,
)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
}
/// Download the layer if evicted.
///
/// Will not error when the layer is already downloaded.
@@ -1177,7 +1210,7 @@ pub(crate) enum EvictionError {
/// Error internal to the [`LayerInner::get_or_maybe_download`]
#[derive(Debug, thiserror::Error)]
enum DownloadError {
pub(crate) enum DownloadError {
#[error("timeline has already shutdown")]
TimelineShutdown,
#[error("no remote storage configured")]
@@ -1274,9 +1307,14 @@ impl DownloadedLayer {
owner.desc.key_range.clone(),
owner.desc.lsn_range.clone(),
));
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
.await
.map(|res| res.map(LayerKind::Delta))
delta_layer::DeltaLayerInner::load(
&owner.path,
summary,
owner.conf.max_vectored_read_size,
ctx,
)
.await
.map(|res| res.map(LayerKind::Delta))
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
@@ -1285,9 +1323,15 @@ impl DownloadedLayer {
owner.desc.key_range.clone(),
lsn,
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
.await
.map(|res| res.map(LayerKind::Image))
image_layer::ImageLayerInner::load(
&owner.path,
lsn,
summary,
owner.conf.max_vectored_read_size,
ctx,
)
.await
.map(|res| res.map(LayerKind::Image))
};
match res {
@@ -1337,6 +1381,29 @@ impl DownloadedLayer {
}
}
async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
start_lsn: Lsn,
end_lsn: Lsn,
reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
use LayerKind::*;
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
Delta(d) => {
d.get_values_reconstruct_data(keyspace, start_lsn, end_lsn, reconstruct_data, ctx)
.await
}
Image(i) => {
i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
.await
}
}
}
async fn dump(&self, owner: &Arc<LayerInner>, ctx: &RequestContext) -> anyhow::Result<()> {
use LayerKind::*;
match self.get(owner, ctx).await? {


@@ -15,7 +15,7 @@ use utils::id::TenantId;
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
/// a unified way to generate layer information like file name.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)]
pub struct PersistentLayerDesc {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,


@@ -16,7 +16,7 @@ use futures::stream::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use pageserver_api::{
keyspace::{key_range_size, KeySpaceAccum},
keyspace::KeySpaceAccum,
models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
LayerMapInfo, TimelineState,
@@ -67,7 +67,7 @@ use crate::{
tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
ValueReconstructState, ValuesReconstructState,
},
};
use crate::{
@@ -111,11 +111,11 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -464,6 +464,15 @@ pub(crate) enum GetVectoredError {
#[error("Requested at invalid LSN: {0}")]
InvalidLsn(Lsn),
#[error("Requested key {0} not found")]
MissingKey(Key),
#[error(transparent)]
GetReadyAncestorError(GetReadyAncestorError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
@@ -571,6 +580,23 @@ impl From<GetReadyAncestorError> for PageReconstructError {
}
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetVectoredImpl {
Sequential,
Vectored,
}
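
// A short standalone sketch of how the config value round-trips, assuming the
// strum derives above behave as documented: with
// #[strum(serialize_all = "kebab-case")], the variants parse from and print as
// "sequential" and "vectored", which is the form used in the pageserver config
// and in the test fixtures further down.
#[cfg(test)]
mod get_vectored_impl_roundtrip {
    use super::GetVectoredImpl;
    use std::str::FromStr;

    #[test]
    fn parses_and_prints_kebab_case() {
        assert_eq!(
            GetVectoredImpl::from_str("vectored").unwrap(),
            GetVectoredImpl::Vectored
        );
        assert_eq!(GetVectoredImpl::Sequential.to_string(), "sequential");
    }
}
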
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -698,7 +724,7 @@ impl Timeline {
/// which actually vectorizes the read path.
pub(crate) async fn get_vectored(
&self,
key_ranges: &[Range<Key>],
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
@@ -706,41 +732,170 @@ impl Timeline {
return Err(GetVectoredError::InvalidLsn(lsn));
}
let key_count = key_ranges
.iter()
.map(|range| key_range_size(range) as u64)
.sum();
let key_count = keyspace.total_size().try_into().unwrap();
if key_count > Timeline::MAX_GET_VECTORED_KEYS {
return Err(GetVectoredError::Oversized(key_count));
}
for range in &keyspace.ranges {
let mut key = range.start;
while key != range.end {
assert!(!self.shard_identity.is_key_disposable(&key));
key = key.next();
}
}
trace!(
"get vectored request for {:?}@{} from task kind {:?} will use {} implementation",
keyspace,
lsn,
ctx.task_kind(),
self.conf.get_vectored_impl
);
let _timer = crate::metrics::GET_VECTORED_LATENCY
.for_task_kind(ctx.task_kind())
.map(|t| t.start_timer());
let mut values = BTreeMap::new();
for range in key_ranges {
let mut key = range.start;
while key != range.end {
assert!(!self.shard_identity.is_key_disposable(&key));
let block = self.get(key, lsn, ctx).await;
if matches!(
block,
Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
) {
return Err(GetVectoredError::Cancelled);
}
values.insert(key, block);
key = key.next();
}
}
match self.conf.get_vectored_impl {
GetVectoredImpl::Sequential => {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
vectored_res
}
}
}
pub(super) async fn get_vectored_sequential_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
let block = self.get(key, lsn, ctx).await;
use PageReconstructError::*;
match block {
Err(Cancelled | AncestorStopping(_)) => {
return Err(GetVectoredError::Cancelled)
}
Err(Other(err)) if err.to_string().contains("could not find data for key") => {
return Err(GetVectoredError::MissingKey(key))
}
_ => {
values.insert(key, block);
key = key.next();
}
}
}
}
Ok(values)
}
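
// A standalone sketch (u32 "keys" instead of the pageserver Key type) of the
// keyspace walk performed by the sequential implementation above: every key in
// every range gets its own get() call, so the cost grows with
// keyspace.total_size(), which is also what the MAX_GET_VECTORED_KEYS check
// earlier bounds.
#[allow(dead_code)]
mod keyspace_walk_sketch {
    pub struct KeySpaceSketch {
        pub ranges: Vec<std::ops::Range<u32>>,
    }

    impl KeySpaceSketch {
        pub fn total_size(&self) -> usize {
            self.ranges.iter().map(|r| (r.end - r.start) as usize).sum()
        }

        pub fn keys(&self) -> impl Iterator<Item = u32> + '_ {
            self.ranges.iter().flat_map(|r| r.clone())
        }
    }
}
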
pub(super) async fn get_vectored_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
.await?;
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
for (key, res) in reconstruct_state.keys {
match res {
Err(err) => {
results.insert(key, Err(err));
}
Ok(state) => {
let state = ValueReconstructState::from(state);
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
}
}
}
Ok(results)
}
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) {
let sequential_res = self
.get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
.await;
fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
use GetVectoredError::*;
match (lhs, rhs) {
(Cancelled, Cancelled) => true,
(_, Cancelled) => true,
(Oversized(l), Oversized(r)) => l == r,
(InvalidLsn(l), InvalidLsn(r)) => l == r,
(MissingKey(l), MissingKey(r)) => l == r,
(GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
(Other(_), Other(_)) => true,
_ => false,
}
}
match (&sequential_res, vectored_res) {
(Err(seq_err), Ok(_)) => {
panic!(concat!("Sequential get failed with {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
seq_err, keyspace, lsn) },
(Ok(_), Err(vec_err)) => {
panic!(concat!("Vectored get failed with {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
vec_err, keyspace, lsn) },
(Err(seq_err), Err(vec_err)) => {
assert!(errors_match(seq_err, vec_err),
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
(Ok(seq_values), Ok(vec_values)) => {
seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
assert_eq!(seq_key, vec_key);
match (seq_res, vec_res) {
(Ok(seq_blob), Ok(vec_blob)) => {
assert_eq!(seq_blob, vec_blob,
"Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}");
},
(Err(err), Ok(_)) => {
panic!(
concat!("Sequential get failed with {} for key {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Ok(_), Err(err)) => {
panic!(
concat!("Vectored get failed with {} for key {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Err(_), Err(_)) => {}
}
})
}
}
}
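
// A plain-std side note on the zip-based comparison above: Iterator::zip stops
// at the shorter of the two iterators, so trailing keys present in only one of
// the two results are never compared; key mismatches earlier in the iteration
// are still caught by the assert_eq! on the keys.
#[allow(dead_code)]
fn zip_stops_at_shorter_side() {
    let seq = [1, 2, 3];
    let vec = [1, 2];
    // Only two pairs are visited; seq's third element is never compared.
    assert_eq!(seq.iter().zip(vec.iter()).count(), 2);
}
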
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -2574,6 +2729,160 @@ impl Timeline {
}
}
/// Get the data needed to reconstruct all keys in the provided keyspace
///
/// The algorithm is as follows:
/// 1. While some keys are still not done and there's a timeline to visit:
/// 2. Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_inner`]):
/// 2.1. Build the fringe for the current keyspace
/// 2.2. Visit the newest layer from the fringe to collect all values for the range it
/// intersects
/// 2.3. Pop the layer from the fringe
/// 2.4. If the fringe is empty, go back to 1
async fn get_vectored_reconstruct_data(
&self,
mut keyspace: KeySpace,
request_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let mut timeline_owned: Arc<Timeline>;
let mut timeline = self;
let mut cont_lsn = Lsn(request_lsn.0 + 1);
loop {
if self.cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let completed = self
.get_vectored_reconstruct_data_inner(
timeline,
keyspace.clone(),
cont_lsn,
reconstruct_state,
ctx,
)
.await?;
keyspace.remove_overlapping_with(&completed);
if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
break;
}
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
timeline_owned = timeline
.get_ready_ancestor_timeline(ctx)
.await
.map_err(GetVectoredError::GetReadyAncestorError)?;
timeline = &*timeline_owned;
}
if keyspace.total_size() != 0 {
return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
}
Ok(())
}
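
// A condensed standalone sketch of the ancestor walk above, with hypothetical
// stand-in types (u64 LSNs, a Vec of remaining keys instead of a KeySpace, a
// parent pointer instead of get_ready_ancestor_timeline, and a `visit` callback
// standing in for get_vectored_reconstruct_data_inner): keep descending while
// keys remain, restart each ancestor visit just below its branch point, and
// report a missing key if the walk runs out of ancestors first.
#[allow(dead_code)]
mod ancestor_walk_sketch {
    pub struct TimelineSketch {
        pub ancestor: Option<Box<TimelineSketch>>,
        pub ancestor_lsn: u64,
    }

    pub fn walk(
        mut timeline: &TimelineSketch,
        mut cont_lsn: u64,
        remaining: &mut Vec<u32>,
        // Removes the keys it could satisfy below `cont_lsn` from `remaining`.
        visit: &mut dyn FnMut(&TimelineSketch, u64, &mut Vec<u32>),
    ) -> Result<(), u32> {
        loop {
            visit(timeline, cont_lsn, remaining);
            if remaining.is_empty() {
                return Ok(());
            }
            match timeline.ancestor.as_deref() {
                Some(parent) => {
                    // Continue just below the branch point on the ancestor.
                    cont_lsn = timeline.ancestor_lsn + 1;
                    timeline = parent;
                }
                None => return Err(remaining[0]),
            }
        }
    }
}
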
/// Collect the reconstruct data for a keyspace from the specified timeline.
///
/// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
/// the current keyspace. The current keyspace of the search at any given timeline
/// is the original keyspace minus all the keys that have been completed minus
/// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly,
/// but if you merge all the keyspaces in the fringe, you get the "current keyspace".
///
/// At each iteration pop the top of the fringe (the layer with the highest Lsn)
/// and get all the required reconstruct data from the layer in one go.
async fn get_vectored_reconstruct_data_inner(
&self,
timeline: &Timeline,
keyspace: KeySpace,
mut cont_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<KeySpace, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut fringe = LayerFringe::new();
let mut completed_keyspace = KeySpace { ranges: Vec::new() };
'outer: loop {
if self.cancel.is_cancelled() {
return Err(GetVectoredError::Cancelled);
}
let keys_done_last_step = reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
completed_keyspace.merge(&keys_done_last_step);
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
match in_memory_layer {
Some(l) => {
fringe.update(
ReadableLayerDesc::InMemory {
handle: l,
lsn_ceil: cont_lsn,
},
unmapped_keyspace.clone(),
);
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = match layers.range_search(range.clone(), cont_lsn) {
Some(res) => res,
None => {
break 'outer;
}
};
results
.found
.into_iter()
.map(|(res, accum)| {
(
ReadableLayerDesc::Persistent {
desc: (*res.layer).clone(),
lsn_floor: res.lsn_floor,
lsn_ceil: cont_lsn,
},
accum.to_keyspace(),
)
})
.for_each(|(layer, keyspace)| fringe.update(layer, keyspace));
}
}
}
if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() {
layer_to_read
.get_values_reconstruct_data(
&guard,
keyspace_to_read.clone(),
reconstruct_state,
ctx,
)
.await?;
unmapped_keyspace = keyspace_to_read;
cont_lsn = layer_to_read.get_lsn_floor();
} else {
break;
}
}
Ok(completed_keyspace)
}
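
// A minimal standalone sketch of the fringe idea referenced above (hypothetical
// types, not the real LayerFringe, which also dedups layers and carries the
// keyspace each layer still needs to serve): candidates are ordered by the top
// of their LSN range, so the next layer to read is always the newest one that
// intersects some unread keys.
#[allow(dead_code)]
mod layer_fringe_sketch {
    use std::collections::BinaryHeap;

    #[derive(Eq, PartialEq, Ord, PartialOrd)]
    pub struct FringeEntry {
        pub lsn_ceil: u64, // compared first: pop() returns the newest layer
        pub layer_id: u32, // stands in for the layer descriptor
    }

    pub struct FringeSketch {
        heap: BinaryHeap<FringeEntry>,
    }

    impl FringeSketch {
        pub fn new() -> Self {
            Self { heap: BinaryHeap::new() }
        }

        pub fn update(&mut self, entry: FringeEntry) {
            self.heap.push(entry);
        }

        pub fn next_layer(&mut self) -> Option<FringeEntry> {
            self.heap.pop()
        }
    }
}
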
/// # Cancel-safety
///
/// This method is cancellation-safe.
@@ -3310,11 +3619,7 @@ impl Timeline {
|| key.next() == range.end
{
let results = self
.get_vectored(
&key_request_accum.consume_keyspace().ranges,
lsn,
ctx,
)
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.await?;
for (img_key, img) in results {


@@ -562,7 +562,18 @@ impl VirtualFile {
B: IoBufMut + Send,
{
let (buf, res) =
read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await;
read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
res.map(|()| buf)
}
pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
self.read_at(buf, offset)
})
.await;
res.map(|()| buf)
}
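
// A plain std::io sketch of the read_exact_at_n semantics added above (the real
// implementation goes through tokio_epoll_uring slices, see read_exact_at_impl
// below): read exactly `count` bytes at `offset` into the front of a buffer
// that may be larger than `count`.
#[allow(dead_code)]
fn read_exact_at_n_sketch<R: std::io::Read + std::io::Seek>(
    reader: &mut R,
    buf: &mut [u8],
    offset: u64,
    count: usize,
) -> std::io::Result<()> {
    assert!(count > 0 && count <= buf.len());
    reader.seek(std::io::SeekFrom::Start(offset))?;
    reader.read_exact(&mut buf[..count])
}
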
@@ -696,6 +707,7 @@ impl VirtualFile {
pub async fn read_exact_at_impl<B, F, Fut>(
buf: B,
mut offset: u64,
count: Option<usize>,
mut read_at: F,
) -> (B, std::io::Result<()>)
where
@@ -703,7 +715,15 @@ where
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
let mut buf: tokio_epoll_uring::Slice<B> = match count {
Some(count) => {
assert!(count <= buf.bytes_total());
assert!(count > 0);
buf.slice(..count) // may include uninitialized memory
}
None => buf.slice_full(), // includes all the uninitialized memory
};
while buf.bytes_total() != 0 {
let res;
(buf, res) = read_at(buf, offset).await;
@@ -793,7 +813,7 @@ mod test_read_exact_at_impl {
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -802,13 +822,33 @@ mod test_read_exact_at_impl {
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
}
#[tokio::test]
async fn test_with_count() {
let buf = Vec::with_capacity(5);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
bytes_total: 3,
result: Ok(vec![b'a', b'b', b'c']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
assert_eq!(buf, vec![b'a', b'b', b'c']);
}
#[tokio::test]
async fn test_empty_buf_issues_no_syscall() {
let buf = Vec::new();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::new(),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -833,7 +873,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
@@ -864,7 +904,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})


@@ -486,6 +486,11 @@ class NeonEnvBuilder:
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_impl: Optional[str] = None
if os.getenv("PAGESERVER_GET_VECTORED_IMPL", "") == "vectored":
self.pageserver_get_vectored_impl = "vectored"
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1053,6 +1058,8 @@ class NeonEnv:
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_get_vectored_impl is not None:
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
# Create a corresponding NeonPageserver object
self.pageservers.append(


@@ -0,0 +1,196 @@
from pathlib import Path
from typing import Any, Dict, Tuple
import asyncio
import json
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn, Endpoint
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from performance.pageserver.util import (
ensure_pageserver_ready_for_benchmarking,
setup_pageserver_with_tenants,
)
from fixtures.utils import get_scale_for_db, humantime_to_ms
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(200)])
@pytest.mark.parametrize("n_tenants", [10])
@pytest.mark.parametrize("get_vectored_impl", ["sequential", "vectored"])
@pytest.mark.timeout(1000)
def test_basebackup_with_high_slru_count(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
get_vectored_impl: str,
n_tenants: int,
pgbench_scale: int,
duration: int,
):
def record(metric, **kwargs):
zenbenchmark.record(metric_name=f"pageserver_basebackup.{metric}", **kwargs)
params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
# params from fixtures
params.update(
{
"n_tenants": (n_tenants, {"unit": ""}),
"pgbench_scale": (pgbench_scale, {"unit": ""}),
"duration": (duration, {"unit": "s"}),
}
)
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}; "
f"get_vectored_impl='{get_vectored_impl}'; validate_vectored_get=false"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
}
)
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
n_txns = 100000
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, n_txns)
env = setup_pageserver_with_tenants(
neon_env_builder, f"large_slru_count-{n_tenants}-{n_txns}", n_tenants, setup_wrapper
)
run_benchmark(env, pg_bin, record, duration)
def setup_tenant_template(env: NeonEnv, n_txns: int):
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start(
"main", tenant_id=template_tenant, config_lines=["shared_buffers=1MB"]
) as ep:
rels = 10
with ep.cursor() as cur:
asyncio.run(run_updates(ep, n_txns, rels))
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
return (template_tenant, template_timeline, config)
# Takes about 5 minutes and produces tenants with around 300 SLRU blocks
# of 8 KiB each.
async def run_updates(ep: Endpoint, n_txns: int, workers_count: int):
workers = []
for i in range(workers_count):
workers.append(asyncio.create_task(run_update_loop_worker(ep, n_txns, i)))
await asyncio.gather(*workers)
async def run_update_loop_worker(ep: Endpoint, n_txns: int, idx: int):
table = f"t_{idx}"
conn = await ep.connect_async()
await conn.execute(f"CREATE TABLE {table} (pk integer PRIMARY KEY, x integer)")
await conn.execute(f"ALTER TABLE {table} SET (autovacuum_enabled = false)")
await conn.execute(f"INSERT INTO {table} VALUES (1, 0)")
await conn.execute(
"""
CREATE PROCEDURE updating{0}() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..{1} LOOP
UPDATE {0} SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
""".format(table, n_txns)
)
await conn.execute("SET statement_timeout=0")
await conn.execute(f"call updating{table}()")
def run_benchmark(env: NeonEnv, pg_bin: PgBin, record, duration_secs: int):
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"basebackup",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--gzip-probability",
"1",
"--runtime",
f"{duration_secs}s",
# don't specify the targets explicitly, let pagebench auto-discover them
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
total = results["total"]
metric = "request_count"
record(
metric,
metric_value=total[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "latency_mean"
record(
metric,
metric_value=humantime_to_ms(total[metric]),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "latency_percentiles"
for k, v in total[metric].items():
record(
f"{metric}.{k}",
metric_value=humantime_to_ms(v),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)


@@ -14,7 +14,10 @@ from fixtures.neon_fixtures import (
)
from fixtures.utils import get_scale_for_db, humantime_to_ms
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
from performance.pageserver.util import (
ensure_pageserver_ready_for_benchmarking,
setup_pageserver_with_tenants,
)
# For reference, the space usage of the snapshots:
@@ -75,10 +78,72 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
def setup_wrapper(env: NeonEnv):
return setup_tenant_template(env, pg_bin, pgbench_scale)
env = setup_pageserver_with_tenants(
neon_env_builder,
f"max_throughput_latest_lsn-{n_tenants}-{pgbench_scale}",
n_tenants,
setup_wrapper,
)
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
# use a config that makes production of on-disk state timing-insensitive
# as we ingest data into the tenant.
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
return (template_tenant, template_timeline, config)
def run_benchmark_max_throughput_latest_lsn(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int
):
@@ -133,78 +198,3 @@ def run_benchmark_max_throughput_latest_lsn(
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
def setup_pageserver_with_pgbench_tenants(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
n_tenants: int,
scale: int,
) -> NeonEnv:
"""
Utility function to set up a pageserver with a given number of identical tenants.
Each tenant is a pgbench tenant, initialized to a certain scale, and treated afterwards
with a repeated application of (pgbench simple-update workload, checkpoint, compact).
"""
def setup_template(env: NeonEnv):
# use a config that makes production of on-disk state timing-insensitive
# as we ingest data into the tenant.
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
return (template_tenant, template_timeline, config)
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants)
env = neon_env_builder.build_and_use_snapshot(
f"max_throughput_latest_lsn-{n_tenants}-{scale}", doit
)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env


@@ -2,9 +2,17 @@
Utilities used by all code in this sub-directory
"""
from typing import Any, Callable, Dict, Tuple
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
wait_for_last_flush_lsn,
)
import fixtures.pageserver.many_tenants as many_tenants
from fixtures.pageserver.utils import wait_until_all_tenants_state
from fixtures.types import TenantId, TimelineId
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
@@ -27,3 +35,24 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
assert not layer.remote
log.info("ready")
def setup_pageserver_with_tenants(
neon_env_builder: NeonEnvBuilder,
name: str,
n_tenants: int,
setup: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]],
) -> NeonEnv:
"""
Utility function to set up a pageserver with a given number of identical tenants.
Each tenant is created from a template produced by the provided `setup` callable
(for example a pgbench tenant initialized to a certain scale and then treated with
repeated checkpoint/compact cycles).
"""
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup, n_tenants)
env = neon_env_builder.build_and_use_snapshot(name, doit)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env


@@ -221,6 +221,10 @@ def test_forward_compatibility(
)
try:
# TODO: remove this once the previous pageserver version understands
# the 'get_vectored_impl' config
neon_env_builder.pageserver_get_vectored_impl = None
neon_env_builder.num_safekeepers = 3
neon_local_binpath = neon_env_builder.neon_binpath
env = neon_env_builder.from_repo_dir(