pageserver: issue concurrent IO on the read path (#9353)

## Refs

- Epic: https://github.com/neondatabase/neon/issues/9378

Co-authored-by: Vlad Lazar <vlad@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>

## Problem

The read path does its IOs sequentially.
This means that if N values need to be read to reconstruct a page,
we will do N IOs and getpage latency is `O(N*IoLatency)`.

## Solution

With this PR we gain the ability to issue IO concurrently within one
layer visit **and** to move on to the next layer without waiting for IOs
from the previous visit to complete.

This is an evolved version of the work done at the Lisbon hackathon,
cf https://github.com/neondatabase/neon/pull/9002.

## Design

### `will_init` now sourced from disk btree index keys

On the algorithmic level, the only change is that
`get_values_reconstruct_data` now sources `will_init` from the disk btree
index key (which is PS-page_cache'd), instead of from the `Value`, which is
only available after the IO completes.
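
A minimal sketch of the idea (placeholder types; the real `BlobMeta` in the
vectored read path carries more detail):

```rust
// Minimal sketch, not the actual pageserver types; placeholder Key/Lsn
// aliases keep it self-contained.
type Key = u128;
type Lsn = u64;

/// Per-value metadata that traversal gets from the disk btree index,
/// i.e. before the value itself has been read from disk.
struct BlobMeta {
    key: Key,
    lsn: Lsn,
    /// True if this value (a page image or an "init" record) is sufficient
    /// to reconstruct the page on its own.
    will_init: bool,
}

/// Traversal can stop descending to older layers for a key as soon as it
/// has planned a value with `will_init == true`, without waiting for the
/// value IO to complete.
fn traversal_done_for_key(planned_newest_to_oldest: &[BlobMeta]) -> bool {
    planned_newest_to_oldest.iter().any(|m| m.will_init)
}

fn main() {
    let planned = [
        BlobMeta { key: 1, lsn: 0x40, will_init: false }, // delta record
        BlobMeta { key: 1, lsn: 0x30, will_init: true },  // image
    ];
    assert!(traversal_done_for_key(&planned));
}
```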

### Concurrent IOs, Submission & Completion 

To separate IO submission from waiting for its completion, while
simultaneously
feature-gating the change, we introduce the notion of an `IoConcurrency`
struct
through which IO futures are "spawned".

An IO is an opaque future, and waiting for completion is handled through
`tokio::sync::oneshot` channels.
The oneshot `Receiver`s take the place of the `img` and `records` fields
inside `VectoredValueReconstructState`.
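
Schematically, the per-key reconstruct state changes shape roughly like this
(a simplified sketch of the types introduced by this PR; the real ones wrap
the receiver in `OnDiskValueIoWaiter` / `OnDiskValueIo` helpers):

```rust
// Simplified sketch of the before/after state; see the storage_layer changes
// in this PR for the real definitions.
use bytes::Bytes;
use tokio::sync::oneshot;

type Lsn = u64;             // placeholder for utils::lsn::Lsn
type NeonWalRecord = Bytes; // placeholder

/// On-disk representation of a value loaded into a buffer.
enum OnDiskValue {
    RawImage(Bytes),
    WalRecordOrImage(Bytes),
}

/// Before: values were buffered directly, once their (sequential) IO was done.
struct VectoredValueReconstructStateBefore {
    records: Vec<(Lsn, NeonWalRecord)>,
    img: Option<(Lsn, Bytes)>,
}

/// After: traversal only records pending IOs; each receiver resolves once the
/// sidecar task (or the in-place await in Sequential mode) completes the IO.
struct VectoredValueReconstructStateAfter {
    on_disk_values: Vec<(Lsn, oneshot::Receiver<Result<OnDiskValue, std::io::Error>>)>,
}
```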

When we're done visiting all the layers and submitting all the IOs along the
way, we concurrently `collect_pending_ios` for each value: for each value
there is a future that awaits all of its oneshot receivers and then calls
into walredo to reconstruct the page image.
Walredo is now invoked concurrently for each value instead of sequentially.
Walredo itself remains unchanged.
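
The collection step looks roughly like this (a simplified sketch; the real
code lives in `get_vectored_impl` / `collect_pending_ios` and feeds a
`ValueReconstructState` into walredo):

```rust
// Simplified sketch of per-value collection: await each value's IOs, then
// reconstruct, with different keys overlapping inside a FuturesUnordered.
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::oneshot;

type Key = u128;     // placeholder
type Page = Vec<u8>; // placeholder for the reconstructed page image

async fn walredo(_key: Key, values: Vec<Vec<u8>>) -> Page {
    // stand-in for the (unchanged) walredo invocation
    values.into_iter().flatten().collect()
}

async fn collect(pending: Vec<(Key, Vec<oneshot::Receiver<Vec<u8>>>)>) -> Vec<(Key, Page)> {
    let mut per_key = FuturesUnordered::new();
    for (key, receivers) in pending {
        per_key.push(async move {
            // Await all IOs for this key, then reconstruct it.
            let mut values = Vec::with_capacity(receivers.len());
            for rx in receivers {
                values.push(rx.await.expect("IO dropped"));
            }
            (key, walredo(key, values).await)
        });
    }
    let mut out = Vec::new();
    while let Some(kv) = per_key.next().await {
        out.push(kv);
    }
    out
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    tx.send(b"page image".to_vec()).unwrap();
    assert_eq!(collect(vec![(1, vec![rx])]).await.len(), 1);
}
```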

The spawned IO futures are driven to completion by a sidecar tokio task that
is separate from the task that performs all the layer visiting and spawning
of IOs.
That task receives the IO futures via an unbounded mpsc channel and drives
them to completion inside a `FuturesUnordered`.

(The behavior from before this PR is available through
`IoConcurrency::Sequential`, which awaits the IO futures in place, without
"spawning" or "submitting" them anywhere.)
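
A condensed sketch of the sidecar loop (the real implementation keeps an
explicit Waiting/Executing/ShuttingDown state machine and holds a `GateGuard`
so shutdown waits for in-flight IOs):

```rust
// Condensed sketch of the sidecar task: receive opaque IO futures over an
// unbounded mpsc channel and drive them inside a FuturesUnordered.
use std::{future::Future, pin::Pin};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

type IoFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

fn spawn_sidecar() -> mpsc::UnboundedSender<IoFuture> {
    let (tx, mut rx) = mpsc::unbounded_channel::<IoFuture>();
    tokio::spawn(async move {
        let mut inflight = FuturesUnordered::new();
        loop {
            tokio::select! {
                new = rx.recv() => match new {
                    Some(fut) => inflight.push(fut), // newly submitted IO
                    None => break,                   // all senders dropped
                },
                Some(()) = inflight.next(), if !inflight.is_empty() => {
                    // One IO future completed; its result was already delivered
                    // through the oneshot sender captured inside the future.
                }
            }
        }
        // Drain remaining IOs before exiting.
        while inflight.next().await.is_some() {}
    });
    tx
}

#[tokio::main]
async fn main() {
    let ios = spawn_sidecar();
    let (done_tx, done_rx) = tokio::sync::oneshot::channel();
    ios.send(Box::pin(async move {
        // "the IO": completion is signalled through a oneshot, as in the PR
        let _ = done_tx.send(());
    }))
    .unwrap();
    done_rx.await.unwrap();
}
```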

#### Alternatives Explored

A few words on the rationale behind having a sidecar *task* and what
alternatives were considered.

One option is to queue up all IO futures in a FuturesUnordered that is polled
for the first time when we `collect_pending_ios`. That doesn't work well, for
two reasons.

First, the IO futures are opaque, compiler-generated futures that need to be
polled at least once to submit their IO ("at least once" because
tokio-epoll-uring may not be able to submit the IO to the kernel on the first
poll right away). If nothing polls them before `collect_pending_ios`, no IO
overlaps with the layer visits, which defeats the purpose.

Second, there are deadlocks if we don't drive the IO futures to completion
independently of the spawning task.
The reason is that both the IO futures and the spawning task may hold some
shared, limited resources _and_ try to acquire _more_ of them.
For example, both the spawning task and an IO future may try to acquire
* a VirtualFile file descriptor cache slot async mutex (observed during impl)
* a tokio-epoll-uring submission slot (observed during impl)
* a PageCache slot (currently this is not the case, but we may move more
  code into the IO futures in the future)

Another option is to spawn a short-lived `tokio::task` for each IO
future.
We implemented and benchmarked it during development, but found little
throughput improvement and moderate mean & tail latency degradation.
Concerns about pressure on the tokio scheduler made us discard this
variant.

The sidecar task could be made obsolete if the IOs were not arbitrary code
but a well-defined struct.
However,
1. the opaque-futures approach taken in this PR allows leaving the existing
   code unchanged, which in turn
2. allows us to implement the `IoConcurrency::Sequential` mode for
   feature-gating the change.

Once the new sidecar-task implementation is rolled out everywhere and
`::Sequential` is removed, we can think about a descriptive submission &
completion interface.
The deadlock problems pointed out earlier will need to be solved then.
For example, we could eliminate the VirtualFile file descriptor cache and
tokio-epoll-uring slots.
The latter has been drafted in
https://github.com/neondatabase/tokio-epoll-uring/pull/63.

See the lengthy doc comment on `spawn_io()` for more details.

### Error Handling

There are two error classes during reconstruct data retrieval:
* traversal errors: index lookup, move to next layer, and the like
* value read IO errors

A traversal error fails the entire get_vectored request, as before this
PR.
A value read error only fails that value.
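
Concretely, the two classes correspond to the two `Result` levels in the
return type of `get_vectored`. A sketch with placeholder types (the real
error types are `GetVectoredError` and `PageReconstructError`):

```rust
// Sketch of the two error levels; placeholder types keep it self-contained.
use std::collections::BTreeMap;

type Key = u128;
type Page = Vec<u8>;

#[derive(Debug)]
struct GetVectoredError;     // traversal error: index lookup, next layer, ...
#[derive(Debug)]
struct PageReconstructError; // per-value error: value read IO, walredo input, ...

/// Outer Result: a traversal error fails the whole request.
/// Inner Result: a value read error fails only that key.
type GetVectoredResult =
    Result<BTreeMap<Key, Result<Page, PageReconstructError>>, GetVectoredError>;
```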

In any case, we preserve the existing behavior that once `get_vectored`
returns, all IOs are done. Panicking or failing to poll `get_vectored` to
completion will leave the IOs dangling, which is safe but shouldn't happen,
so a rate-limited log statement is emitted at warning level.
There is a doc comment on `collect_pending_ios` giving more code-level
details and rationale.

### Feature Gating

The new behavior is opt-in via pageserver config; `Sequential` mode is the
default.
The only significant change in `Sequential` mode compared to before this PR
is that results are now buffered in the `oneshot`s.
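
For reference, the opt-in looks roughly like this in `pageserver.toml`
(a sketch; the field name and kebab-case `mode` tag follow the
`GetVectoredConcurrentIo` config type added by this PR):

```toml
# Default is "sequential"; opt in to the new read path behavior with:
[get_vectored_concurrent_io]
mode = "sidecar-task"
```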

## Code-Level Changes

Prep work:
  * Make `GateGuard` clonable.

Core Feature:
* Traversal code: track `will_init` in `BlobMeta` and source it from
  the Delta/Image/InMemory layer index, instead of determining `will_init`
  after we've read the value. This avoids having to read the value to
  determine whether traversal can stop.
* Introduce `IoConcurrency` & its sidecar task.
  * `IoConcurrency` is the clonable handle.
  * It connects to the sidecar task via an `mpsc`.
* Plumb `IoConcurrency` through from high-level code to the
  individual layer implementations' `get_values_reconstruct_data`.
  We piggy-back on the `ValuesReconstructState` for this.
   * The sidecar task should be long-lived, so `IoConcurrency` needs
     to be rooted up "high" in the call stack.
   * Roots as of this PR:
     * `page_service`: outside of pagestream loop
     * `create_image_layers`: when it is called
     * `basebackup` (only auxfiles + replorigin + SLRU segments)
   * Code with no roots that uses `IoConcurrency::sequential`:
     * any `Timeline::get` call
       * `collect_keyspace` is a good example
       * follow-up: https://github.com/neondatabase/neon/issues/10460
     * `TimelineAdaptor` code used by the compaction simulator, unused in
       practice
     * `ingest_xlog_dbase_create`
* Transform Delta/Image/InMemoryLayer to
  * do their values IO in a distinct `async {}` block
  * extend the residence of the Delta/Image layer until the IO is done
  * buffer their results in a `oneshot` channel instead of straight
    in `ValuesReconstructState`
  * the `oneshot` channel is wrapped in `OnDiskValueIo` / `OnDiskValueIoWaiter`
    types that aid in expressiveness and are used to keep track of
    in-flight IOs so we can print warnings if we leave them dangling.
* Change `ValuesReconstructState` to hold the receiving end of the
  `oneshot` channel, i.e., the `OnDiskValueIoWaiter`.
* Change `get_vectored_impl` to `collect_pending_ios` for each value and
  issue walredo concurrently, in a `FuturesUnordered` (see the sketch in
  the Design section above).

Testing / Benchmarking:
* Support queue depth in pagebench for manual benchmarking.
* Add test suite support for setting the concurrent-IO pageserver config
  field via a) an env var and b) NeonEnvBuilder (see the example below).
* Hacky helper to have sidecar-based IoConcurrency in tests.
  This will be cleaned up later.
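
For example, the unit-test matrix added to CI in this PR exercises both modes
via an env var; the equivalent local invocation is roughly:

```bash
# Run pageserver unit tests in both concurrency modes (and both IO engines),
# mirroring the CI matrix change further down in this commit.
for get_vectored_concurrent_io in sequential sidecar-task; do
  for io_engine in std-fs tokio-epoll-uring; do
    NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \
    NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \
    cargo nextest run -E 'package(pageserver)'
  done
done
```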

More benchmarking will happen post-merge in nightly benchmarks, plus in
staging/pre-prod.
Some intermediate helpers for manual benchmarking have been preserved in
https://github.com/neondatabase/neon/pull/10466 and will be landed in
later PRs.
(L0 layer stack generator!)

Drive-By:
* test suite actually didn't enable batching by default because
  `config.compatibility_neon_binpath` is always truthy in our CI environment
  => https://neondb.slack.com/archives/C059ZC138NR/p1737490501941309
* initial logical size calculation wasn't always polled to completion,
which was
  surfaced through the added WARN logs emitted when dropping a 
  `ValuesReconstructState` that still has inflight IOs.
* remove the timing histograms `pageserver_getpage_get_reconstruct_data_seconds`
  and `pageserver_getpage_reconstruct_seconds`: with planning, value read IO,
  and walredo happening concurrently, one can no longer attribute latency to
  any one of them; we'll revisit this when Vlad's work on tracing/sampling
  through RequestContext lands.
* remove code related to `get_cached_lsn()`.
  The logic around this has been dead at runtime for a long time,
  ever since the removal of the materialized page cache in #8105.

## Testing

Unit tests use the sidecar task by default and run both modes in CI.
Python regression tests and benchmarks also use the sidecar task by
default.
We'll test more in staging and possibly preprod.

## Future Work

Please refer to the parent epic for the full plan.

The next step will be to fold the plumbing of IoConcurrency
into RequestContext so that the function signatures get cleaned up.

Once `Sequential` isn't used anymore, we can take the next big leap:
replacing the opaque IOs with structs that have well-defined semantics.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
Author: Vlad Lazar
Date: 2025-01-22 15:30:23 +00:00
Committed by: GitHub
Parent: 881e351f69
Commit: 414ed82c1f
29 changed files with 1490 additions and 556 deletions


@@ -229,8 +229,13 @@ jobs:
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E '!package(pageserver)'
# run pageserver tests with different settings
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
for get_vectored_concurrent_io in sequential sidecar-task; do
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
done
done
# Run separate tests for real S3
@@ -314,6 +319,7 @@ jobs:
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
# Temporary disable this step until we figure out why it's so flaky

Cargo.lock (generated)

@@ -6774,7 +6774,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#33e00106a268644d02ba0461bbd64476073b0ee1"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"futures",
"nix 0.26.4",
@@ -7369,7 +7369,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#33e00106a268644d02ba0461bbd64476073b0ee1"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"bytes",
"io-uring",


@@ -120,6 +120,7 @@ pub struct ConfigToml {
pub no_sync: Option<bool>,
pub wal_receiver_protocol: PostgresClientProtocol,
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -158,6 +159,25 @@ pub enum PageServiceProtocolPipelinedExecutionStrategy {
Tasks,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited
/// one after the other and IOs are issued and waited upon
/// from the same task that traverses the layers.
Sequential,
/// The read path still traverses layers sequentially, and
/// index blocks will be read into the PS PageCache from
/// that task, with waiting.
/// But data IOs are dispatched and waited upon from a sidecar
/// task so that the traversing task can continue to traverse
/// layers while the IOs are in flight.
/// If the PS PageCache miss rate is low, this improves
/// throughput dramatically.
SidecarTask,
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -464,6 +484,11 @@ impl Default for ConfigToml {
execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures,
})
},
get_vectored_concurrent_io: if !cfg!(test) {
GetVectoredConcurrentIo::Sequential
} else {
GetVectoredConcurrentIo::SidecarTask
},
}
}
}


@@ -2,6 +2,7 @@
use std::{fmt::Display, str::FromStr};
/// For types `V` that implement [`FromStr`].
pub fn var<V, E>(varname: &str) -> Option<V>
where
V: FromStr<Err = E>,
@@ -10,7 +11,9 @@ where
match std::env::var(varname) {
Ok(s) => Some(
s.parse()
.map_err(|e| format!("failed to parse env var {varname}: {e:#}"))
.map_err(|e| {
format!("failed to parse env var {varname} using FromStr::parse: {e:#}")
})
.unwrap(),
),
Err(std::env::VarError::NotPresent) => None,
@@ -19,3 +22,24 @@ where
}
}
}
/// For types `V` that implement [`serde::de::DeserializeOwned`].
pub fn var_serde_json_string<V>(varname: &str) -> Option<V>
where
V: serde::de::DeserializeOwned,
{
match std::env::var(varname) {
Ok(s) => Some({
let value = serde_json::Value::String(s);
serde_json::from_value(value)
.map_err(|e| {
format!("failed to parse env var {varname} as a serde_json json string: {e:#}")
})
.unwrap()
}),
Err(std::env::VarError::NotPresent) => None,
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {varname} is not unicode")
}
}
}


@@ -64,6 +64,12 @@ pub struct GateGuard {
gate: Arc<GateInner>,
}
impl GateGuard {
pub fn try_clone(&self) -> Result<Self, GateError> {
Gate::enter_impl(self.gate.clone())
}
}
impl Drop for GateGuard {
fn drop(&mut self) {
if self.gate.closing.load(Ordering::Relaxed) {
@@ -107,11 +113,11 @@ impl Gate {
/// to avoid blocking close() indefinitely: typically types that contain a Gate will
/// also contain a CancellationToken.
pub fn enter(&self) -> Result<GateGuard, GateError> {
let permit = self
.inner
.sem
.try_acquire()
.map_err(|_| GateError::GateClosed)?;
Self::enter_impl(self.inner.clone())
}
fn enter_impl(gate: Arc<GateInner>) -> Result<GateGuard, GateError> {
let permit = gate.sem.try_acquire().map_err(|_| GateError::GateClosed)?;
// we now have the permit, let's disable the normal raii functionality and leave
// "returning" the permit to our GateGuard::drop.
@@ -122,7 +128,7 @@ impl Gate {
Ok(GateGuard {
span_at_enter: tracing::Span::current(),
gate: self.inner.clone(),
gate,
})
}
@@ -252,4 +258,39 @@ mod tests {
// Attempting to enter() is still forbidden
gate.enter().expect_err("enter should fail finishing close");
}
#[tokio::test(start_paused = true)]
async fn clone_gate_guard() {
let gate = Gate::default();
let forever = Duration::from_secs(24 * 7 * 365);
let guard1 = gate.enter().expect("gate isn't closed");
let guard2 = guard1.try_clone().expect("gate isn't closed");
let mut close_fut = std::pin::pin!(gate.close());
tokio::time::timeout(forever, &mut close_fut)
.await
.unwrap_err();
// we polled close_fut once, that should prevent all later enters and clones
gate.enter().unwrap_err();
guard1.try_clone().unwrap_err();
guard2.try_clone().unwrap_err();
// guard2 keeps gate open even if guard1 is closed
drop(guard1);
tokio::time::timeout(forever, &mut close_fut)
.await
.unwrap_err();
drop(guard2);
// now that the last guard is dropped, closing should complete
close_fut.await;
// entering is still forbidden
gate.enter().expect_err("enter should still fail");
}
}


@@ -13,7 +13,7 @@ use rand::prelude::*;
use tokio::task::JoinSet;
use tracing::info;
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
@@ -63,6 +63,10 @@ pub(crate) struct Args {
#[clap(long)]
set_io_mode: Option<pageserver_api::models::virtual_file::IoMode>,
/// Queue depth generated in each client.
#[clap(long, default_value = "1")]
queue_depth: NonZeroUsize,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -298,6 +302,7 @@ async fn main_impl(
start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
@@ -311,31 +316,37 @@ async fn main_impl(
ticks_processed = periods_passed_until_now;
}
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
while inflight.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage(req).await.unwrap();
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let end = Instant::now();
live_stats.request_done();
ticks_processed += 1;


@@ -25,6 +25,7 @@ use tokio_tar::{Builder, EntryType, Header};
use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -123,6 +124,13 @@ where
full_backup,
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
timeline
.gate
.enter()
.map_err(|e| BasebackupError::Server(e.into()))?,
),
};
basebackup
.send_tarball()
@@ -144,6 +152,7 @@ where
full_backup: bool,
replica: bool,
ctx: &'a RequestContext,
io_concurrency: IoConcurrency,
}
/// A sink that accepts SLRU blocks ordered by key and forwards
@@ -303,7 +312,7 @@ where
for part in slru_partitions.parts {
let blocks = self
.timeline
.get_vectored(part, self.lsn, self.ctx)
.get_vectored(part, self.lsn, self.io_concurrency.clone(), self.ctx)
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
@@ -358,7 +367,7 @@ where
let start_time = Instant::now();
let aux_files = self
.timeline
.list_aux_files(self.lsn, self.ctx)
.list_aux_files(self.lsn, self.ctx, self.io_concurrency.clone())
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
let aux_scan_time = start_time.elapsed();
@@ -422,7 +431,7 @@ where
}
let repl_origins = self
.timeline
.get_replorigins(self.lsn, self.ctx)
.get_replorigins(self.lsn, self.ctx, self.io_concurrency.clone())
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
let n_origins = repl_origins.len();
@@ -489,7 +498,13 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), self.ctx)
.get_rel_page_at_lsn(
src,
blknum,
Version::Lsn(self.lsn),
self.ctx,
self.io_concurrency.clone(),
)
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
segment_data.extend_from_slice(&img[..]);


@@ -135,6 +135,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");
// The tenants directory contains all the pageserver local disk state.
// Create if not exists and make sure all the contents are durable before proceeding.


@@ -191,6 +191,8 @@ pub struct PageServerConf {
pub wal_receiver_protocol: PostgresClientProtocol,
pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
pub get_vectored_concurrent_io: pageserver_api::config::GetVectoredConcurrentIo,
}
/// Token for authentication to safekeepers
@@ -352,6 +354,7 @@ impl PageServerConf {
no_sync,
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
} = config_toml;
let mut conf = PageServerConf {
@@ -396,6 +399,7 @@ impl PageServerConf {
import_pgdata_aws_endpoint_url,
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
// ------------------------------------------------------------
// fields that require additional validation or custom handling


@@ -84,6 +84,7 @@ use crate::tenant::remote_timeline_client::list_remote_tenant_shards;
use crate::tenant::remote_timeline_client::list_remote_timelines;
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::import_pgdata;
@@ -2938,8 +2939,15 @@ async fn list_aux_files(
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let io_concurrency = IoConcurrency::spawn_from_conf(
state.conf,
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let files = timeline.list_aux_files(body.lsn, &ctx).await?;
let files = timeline
.list_aux_files(body.lsn, &ctx, io_concurrency)
.await?;
json_response(StatusCode::OK, files)
}


@@ -126,73 +126,6 @@ pub(crate) static INITDB_RUN_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define metric")
});
// Metrics collected on operations on the storage repository.
#[derive(
Clone, Copy, enum_map::Enum, strum_macros::EnumString, strum_macros::Display, IntoStaticStr,
)]
pub(crate) enum GetKind {
Singular,
Vectored,
}
pub(crate) struct ReconstructTimeMetrics {
singular: Histogram,
vectored: Histogram,
}
pub(crate) static RECONSTRUCT_TIME: Lazy<ReconstructTimeMetrics> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value (reconstruct a page from deltas)",
&["get_kind"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric");
ReconstructTimeMetrics {
singular: inner.with_label_values(&[GetKind::Singular.into()]),
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
}
});
impl ReconstructTimeMetrics {
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
match get_kind {
GetKind::Singular => &self.singular,
GetKind::Vectored => &self.vectored,
}
}
}
pub(crate) struct ReconstructDataTimeMetrics {
singular: Histogram,
vectored: Histogram,
}
impl ReconstructDataTimeMetrics {
pub(crate) fn for_get_kind(&self, get_kind: GetKind) -> &Histogram {
match get_kind {
GetKind::Singular => &self.singular,
GetKind::Vectored => &self.vectored,
}
}
}
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<ReconstructDataTimeMetrics> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_getpage_get_reconstruct_data_seconds",
"Time spent in get_reconstruct_value_data",
&["get_kind"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric");
ReconstructDataTimeMetrics {
singular: inner.with_label_values(&[GetKind::Singular.into()]),
vectored: inner.with_label_values(&[GetKind::Vectored.into()]),
}
});
pub(crate) struct GetVectoredLatency {
map: EnumMap<TaskKind, Option<Histogram>>,
}
@@ -3934,7 +3867,6 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
});
// Custom
Lazy::force(&RECONSTRUCT_TIME);
Lazy::force(&BASEBACKUP_QUERY_TIME);
Lazy::force(&COMPUTE_COMMANDS_COUNTERS);
Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE);


@@ -39,6 +39,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use utils::{
auth::{Claims, Scope, SwappableJwtAuth},
@@ -61,6 +62,7 @@ use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
use crate::tenant::mgr::ShardSelector;
use crate::tenant::mgr::TenantManager;
use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{self, WaitLsnError};
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
@@ -90,6 +92,7 @@ pub struct Listener {
pub struct Connections {
cancel: CancellationToken,
tasks: tokio::task::JoinSet<ConnectionHandlerResult>,
gate: Gate,
}
pub fn spawn(
@@ -110,6 +113,7 @@ pub fn spawn(
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
libpq_listener_main(
conf,
tenant_manager,
pg_auth,
tcp_listener,
@@ -134,11 +138,16 @@ impl Listener {
}
impl Connections {
pub(crate) async fn shutdown(self) {
let Self { cancel, mut tasks } = self;
let Self {
cancel,
mut tasks,
gate,
} = self;
cancel.cancel();
while let Some(res) = tasks.join_next().await {
Self::handle_connection_completion(res);
}
gate.close().await;
}
fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
@@ -158,7 +167,9 @@ impl Connections {
/// Returns Ok(()) upon cancellation via `cancel`, returning the set of
/// open connections.
///
#[allow(clippy::too_many_arguments)]
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
listener: tokio::net::TcpListener,
@@ -168,9 +179,15 @@ pub async fn libpq_listener_main(
listener_cancel: CancellationToken,
) -> Connections {
let connections_cancel = CancellationToken::new();
let connections_gate = Gate::default();
let mut connection_handler_tasks = tokio::task::JoinSet::default();
loop {
let gate_guard = match connections_gate.enter() {
Ok(guard) => guard,
Err(_) => break,
};
let accepted = tokio::select! {
biased;
_ = listener_cancel.cancelled() => break,
@@ -190,6 +207,7 @@ pub async fn libpq_listener_main(
let connection_ctx = listener_ctx
.detached_child(TaskKind::PageRequestHandler, DownloadBehavior::Download);
connection_handler_tasks.spawn(page_service_conn_main(
conf,
tenant_manager.clone(),
local_auth,
socket,
@@ -197,6 +215,7 @@ pub async fn libpq_listener_main(
pipelining_config.clone(),
connection_ctx,
connections_cancel.child_token(),
gate_guard,
));
}
Err(err) => {
@@ -211,13 +230,16 @@ pub async fn libpq_listener_main(
Connections {
cancel: connections_cancel,
tasks: connection_handler_tasks,
gate: connections_gate,
}
}
type ConnectionHandlerResult = anyhow::Result<()>;
#[instrument(skip_all, fields(peer_addr))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
@@ -225,6 +247,7 @@ async fn page_service_conn_main(
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
) -> ConnectionHandlerResult {
let _guard = LIVE_CONNECTIONS
.with_label_values(&["page_service"])
@@ -274,11 +297,13 @@ async fn page_service_conn_main(
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
tenant_manager,
auth,
pipelining_config,
connection_ctx,
cancel.clone(),
gate_guard,
);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
@@ -310,6 +335,7 @@ async fn page_service_conn_main(
}
struct PageServerHandler {
conf: &'static PageServerConf,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -325,6 +351,8 @@ struct PageServerHandler {
timeline_handles: Option<TimelineHandles>,
pipelining_config: PageServicePipeliningConfig,
gate_guard: GateGuard,
}
struct TimelineHandles {
@@ -634,19 +662,23 @@ impl BatchedFeMessage {
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
) -> Self {
PageServerHandler {
conf,
auth,
claims: None,
connection_ctx,
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
gate_guard,
}
}
@@ -1015,6 +1047,7 @@ impl PageServerHandler {
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
io_concurrency: IoConcurrency,
cancel: &CancellationToken,
protocol_version: PagestreamProtocolVersion,
ctx: &RequestContext,
@@ -1084,6 +1117,7 @@ impl PageServerHandler {
&*shard.upgrade()?,
effective_request_lsn,
pages,
io_concurrency,
ctx,
)
.instrument(span.clone())
@@ -1288,6 +1322,17 @@ impl PageServerHandler {
}
}
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
match self.gate_guard.try_clone() {
Ok(guard) => guard,
Err(_) => {
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown);
}
},
);
let pgb_reader = pgb
.split()
.context("implementation error: split pgb into reader and writer")?;
@@ -1309,6 +1354,7 @@ impl PageServerHandler {
request_span,
pipelining_config,
protocol_version,
io_concurrency,
&ctx,
)
.await
@@ -1322,6 +1368,7 @@ impl PageServerHandler {
timeline_handles,
request_span,
protocol_version,
io_concurrency,
&ctx,
)
.await
@@ -1349,6 +1396,7 @@ impl PageServerHandler {
mut timeline_handles: TimelineHandles,
request_span: Span,
protocol_version: PagestreamProtocolVersion,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> (
(PostgresBackendReader<IO>, TimelineHandles),
@@ -1383,7 +1431,14 @@ impl PageServerHandler {
};
let err = self
.pagesteam_handle_batched_message(pgb_writer, msg, &cancel, protocol_version, ctx)
.pagesteam_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
&cancel,
protocol_version,
ctx,
)
.await;
match err {
Ok(()) => {}
@@ -1407,6 +1462,7 @@ impl PageServerHandler {
request_span: Span,
pipelining_config: PageServicePipeliningConfigPipelined,
protocol_version: PagestreamProtocolVersion,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> (
(PostgresBackendReader<IO>, TimelineHandles),
@@ -1550,6 +1606,7 @@ impl PageServerHandler {
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
@@ -1806,6 +1863,7 @@ impl PageServerHandler {
timeline: &Timeline,
effective_lsn: Lsn,
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -1832,6 +1890,7 @@ impl PageServerHandler {
.get_rel_page_at_lsn_batched(
requests.iter().map(|p| (&p.req.rel, &p.req.blkno)),
effective_lsn,
io_concurrency,
ctx,
)
.await;


@@ -17,6 +17,7 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::GetVectoredError;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
@@ -200,6 +201,7 @@ impl Timeline {
blknum: BlockNumber,
version: Version<'_>,
ctx: &RequestContext,
io_concurrency: IoConcurrency,
) -> Result<Bytes, PageReconstructError> {
match version {
Version::Lsn(effective_lsn) => {
@@ -208,6 +210,7 @@ impl Timeline {
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| (tag, blknum)),
effective_lsn,
io_concurrency.clone(),
ctx,
)
.await;
@@ -246,6 +249,7 @@ impl Timeline {
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber)>,
effective_lsn: Lsn,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -309,7 +313,10 @@ impl Timeline {
acc.to_keyspace()
};
match self.get_vectored(keyspace, effective_lsn, ctx).await {
match self
.get_vectored(keyspace, effective_lsn, io_concurrency, ctx)
.await
{
Ok(results) => {
for (key, res) in results {
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
@@ -889,9 +896,15 @@ impl Timeline {
&self,
lsn: Lsn,
ctx: &RequestContext,
io_concurrency: IoConcurrency,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let kv = self
.scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
.scan(
KeySpace::single(Key::metadata_aux_key_range()),
lsn,
ctx,
io_concurrency,
)
.await?;
let mut result = HashMap::new();
let mut sz = 0;
@@ -914,8 +927,9 @@ impl Timeline {
&self,
lsn: Lsn,
ctx: &RequestContext,
io_concurrency: IoConcurrency,
) -> Result<(), PageReconstructError> {
self.list_aux_files_v2(lsn, ctx).await?;
self.list_aux_files_v2(lsn, ctx, io_concurrency).await?;
Ok(())
}
@@ -923,17 +937,24 @@ impl Timeline {
&self,
lsn: Lsn,
ctx: &RequestContext,
io_concurrency: IoConcurrency,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
self.list_aux_files_v2(lsn, ctx).await
self.list_aux_files_v2(lsn, ctx, io_concurrency).await
}
pub(crate) async fn get_replorigins(
&self,
lsn: Lsn,
ctx: &RequestContext,
io_concurrency: IoConcurrency,
) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
let kv = self
.scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
.scan(
KeySpace::single(repl_origin_key_range()),
lsn,
ctx,
io_concurrency,
)
.await?;
let mut result = HashMap::new();
for (k, v) in kv {
@@ -2432,7 +2453,11 @@ mod tests {
("foo/bar2".to_string(), Bytes::from_static(b"content2")),
]);
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
let io_concurrency = IoConcurrency::spawn_for_test();
let readback = tline
.list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
.await?;
assert_eq!(readback, expect_1008);
// Second modification: update one key, remove the other
@@ -2444,11 +2469,15 @@ mod tests {
let expect_2008 =
HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
let readback = tline
.list_aux_files(Lsn(0x2008), &ctx, io_concurrency.clone())
.await?;
assert_eq!(readback, expect_2008);
// Reading back in time works
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
let readback = tline
.list_aux_files(Lsn(0x1008), &ctx, io_concurrency.clone())
.await?;
assert_eq!(readback, expect_1008);
Ok(())


@@ -5714,7 +5714,7 @@ mod tests {
use pageserver_api::value::Value;
use pageserver_compaction::helpers::overlaps_with;
use rand::{thread_rng, Rng};
use storage_layer::PersistentLayerKey;
use storage_layer::{IoConcurrency, PersistentLayerKey};
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::{CompactOptions, DeltaLayerTestDesc};
@@ -6495,6 +6495,7 @@ mod tests {
async fn test_get_vectored() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored").await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -6559,7 +6560,7 @@ mod tests {
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
.await;
@@ -6606,6 +6607,7 @@ mod tests {
let harness = TenantHarness::create("test_get_vectored_aux_files").await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -6640,7 +6642,7 @@ mod tests {
.get_vectored_impl(
aux_keyspace.clone(),
read_lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
.await;
@@ -6688,6 +6690,7 @@ mod tests {
)
.await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let mut current_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let gap_at_key = current_key.add(100);
@@ -6788,7 +6791,7 @@ mod tests {
.get_vectored_impl(
read.clone(),
current_lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
.await?;
@@ -6831,6 +6834,7 @@ mod tests {
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis").await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_key = start_key.add(1000);
@@ -6923,7 +6927,7 @@ mod tests {
ranges: vec![child_gap_at_key..child_gap_at_key.next()],
},
query_lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
.await;
@@ -7369,6 +7373,7 @@ mod tests {
async fn test_metadata_scan() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_scan").await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -7422,7 +7427,7 @@ mod tests {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::default(),
&mut ValuesReconstructState::new(io_concurrency.clone()),
&ctx,
)
.await?
@@ -7537,6 +7542,7 @@ mod tests {
let harness = TenantHarness::create("test_aux_file_e2e").await.unwrap();
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let mut lsn = Lsn(0x08);
@@ -7556,7 +7562,10 @@ mod tests {
}
// we can read everything from the storage
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
let files = tline
.list_aux_files(lsn, &ctx, io_concurrency.clone())
.await
.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
@@ -7572,7 +7581,10 @@ mod tests {
modification.commit(&ctx).await.unwrap();
}
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
let files = tline
.list_aux_files(lsn, &ctx, io_concurrency.clone())
.await
.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
@@ -7583,7 +7595,10 @@ mod tests {
.await
.unwrap();
let files = child.list_aux_files(lsn, &ctx).await.unwrap();
let files = child
.list_aux_files(lsn, &ctx, io_concurrency.clone())
.await
.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);
assert_eq!(files.get("pg_logical/mappings/test2"), None);
}
@@ -7592,6 +7607,7 @@ mod tests {
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation").await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -7611,8 +7627,9 @@ mod tests {
keyspace: &KeySpace,
lsn: Lsn,
ctx: &RequestContext,
io_concurrency: IoConcurrency,
) -> anyhow::Result<(BTreeMap<Key, Result<Bytes, PageReconstructError>>, usize)> {
let mut reconstruct_state = ValuesReconstructState::default();
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let res = tline
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await?;
@@ -7660,7 +7677,8 @@ mod tests {
if iter % 5 == 0 {
let (_, before_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx).await?;
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
.await?;
tline
.compact(
&cancel,
@@ -7674,7 +7692,8 @@ mod tests {
)
.await?;
let (_, after_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx).await?;
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
.await?;
assert!(after_delta_file_accessed < before_delta_file_accessed, "after_delta_file_accessed={after_delta_file_accessed}, before_delta_file_accessed={before_delta_file_accessed}");
// Given that we already produced an image layer, there should be no delta layer needed for the scan, but still setting a low threshold there for unforeseen circumstances.
assert!(
@@ -7763,6 +7782,7 @@ mod tests {
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads").await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
let base_key_child = Key::from_hex("620000000033333333444444445500000001").unwrap();
@@ -7901,7 +7921,7 @@ mod tests {
);
// test vectored scan on parent timeline
let mut reconstruct_state = ValuesReconstructState::new();
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
let res = tline
.get_vectored_impl(
KeySpace::single(Key::metadata_key_range()),
@@ -7927,7 +7947,7 @@ mod tests {
);
// test vectored scan on child timeline
let mut reconstruct_state = ValuesReconstructState::new();
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
let res = child
.get_vectored_impl(
KeySpace::single(Key::metadata_key_range()),
@@ -7965,7 +7985,9 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let mut reconstruct_state = ValuesReconstructState::new();
let io_concurrency =
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let mut res = tline
.get_vectored_impl(
KeySpace::single(key..key.next()),
@@ -8066,6 +8088,7 @@ mod tests {
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
@@ -8125,7 +8148,7 @@ mod tests {
// Image layers are created at last_record_lsn
let images = tline
.inspect_image_layers(Lsn(0x40), &ctx)
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
.await
.unwrap()
.into_iter()
@@ -8140,6 +8163,7 @@ mod tests {
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
let key2 = Key::from_hex("620000000033333333444444445500000002").unwrap();
@@ -8190,7 +8214,7 @@ mod tests {
// Image layers are created at last_record_lsn
let images = tline
.inspect_image_layers(Lsn(0x30), &ctx)
.inspect_image_layers(Lsn(0x30), &ctx, io_concurrency.clone())
.await
.unwrap()
.into_iter()
@@ -8203,6 +8227,7 @@ mod tests {
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images").await?;
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
fn get_key(id: u32) -> Key {
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
@@ -8344,7 +8369,7 @@ mod tests {
// Check if the image layer at the GC horizon contains exactly what we want
let image_at_gc_horizon = tline
.inspect_image_layers(Lsn(0x30), &ctx)
.inspect_image_layers(Lsn(0x30), &ctx, io_concurrency.clone())
.await
.unwrap()
.into_iter()
@@ -10057,7 +10082,12 @@ mod tests {
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let results = tline
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
.get_vectored(
keyspace,
delta_layer_end_lsn,
IoConcurrency::sequential(),
&ctx,
)
.await
.expect("No vectored errors");
for (key, res) in results {


@@ -10,18 +10,26 @@ mod layer_desc;
mod layer_name;
pub mod merge_iterator;
use crate::config::PageServerConf;
use crate::context::{AccessStatsBehavior, RequestContext};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
use pageserver_api::value::Value;
use std::cmp::{Ordering, Reverse};
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::{trace, Instrument};
use utils::sync::gate::GateGuard;
use utils::lsn::Lsn;
@@ -78,30 +86,151 @@ pub(crate) enum ValueReconstructSituation {
Continue,
}
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
situation: ValueReconstructSituation,
/// On disk representation of a value loaded in a buffer
#[derive(Debug)]
pub(crate) enum OnDiskValue {
/// Unencoded [`Value::Image`]
RawImage(Bytes),
/// Encoded [`Value`]. Can deserialize into an image or a WAL record
WalRecordOrImage(Bytes),
}
impl VectoredValueReconstructState {
fn get_cached_lsn(&self) -> Option<Lsn> {
self.img.as_ref().map(|img| img.0)
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
pub(crate) situation: ValueReconstructSituation,
}
#[derive(Debug)]
pub(crate) struct OnDiskValueIoWaiter {
rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
}
#[derive(Debug)]
#[must_use]
pub(crate) enum OnDiskValueIo {
/// Traversal identified this IO as required to complete the vectored get.
Required {
num_active_ios: Arc<AtomicUsize>,
tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>,
},
/// Sparse keyspace reads always read all the values for a given key,
/// even though only the first value is needed.
///
/// This variant represents the unnecessary IOs for those values at lower LSNs
/// that aren't needed, but are currently still being done.
///
/// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
/// We added this explicit representation here so that we can drop
/// unnecessary IO results immediately, instead of buffering them in
/// `oneshot` channels inside [`VectoredValueReconstructState`] until
/// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
Unnecessary,
}
type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
impl OnDiskValueIo {
pub(crate) fn complete(self, res: OnDiskValueIoResult) {
match self {
OnDiskValueIo::Required { num_active_ios, tx } => {
num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
let _ = tx.send(res);
}
OnDiskValueIo::Unnecessary => {
// Nobody cared, see variant doc comment.
}
}
}
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitCompletionError {
#[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
IoDropped,
}
ValueReconstructState {
records: state.records,
img: state.img,
impl OnDiskValueIoWaiter {
pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
// NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
self.rx.await.map_err(|_| WaitCompletionError::IoDropped)
}
}
impl VectoredValueReconstructState {
/// # Cancel-Safety
///
/// Technically fine to stop polling this future, but the IOs will still
/// be executed to completion by the sidecar task and hold on to / consume resources.
/// Better not to do it, to make reasoning about the system easier.
pub(crate) async fn collect_pending_ios(
self,
) -> Result<ValueReconstructState, PageReconstructError> {
use utils::bin_ser::BeSer;
let mut res = Ok(ValueReconstructState::default());
// We should try hard not to bail early, so that by the time we return from this
// function, all IO for this value is done. It's not required -- we could totally
// stop polling the IO futures in the sidecar task, they need to support that,
// but just stopping to poll doesn't reduce the IO load on the disk. It's easier
// to reason about the system if we just wait for all IO to complete, even if
// we're no longer interested in the result.
//
// Revisit this when IO futures are replaced with a more sophisticated IO system
// and an IO scheduler, where we know which IOs were submitted and which ones
// just queued. Cf the comment on IoConcurrency::spawn_io.
for (lsn, waiter) in self.on_disk_values {
let value_recv_res = waiter
.wait_completion()
// we rely on the caller to poll us to completion, so this is not a bail point
.await;
// Force not bailing early by wrapping the code into a closure.
#[allow(clippy::redundant_closure_call)]
let _: () = (|| {
match (&mut res, value_recv_res) {
(Err(_), _) => {
// We've already failed, no need to process more.
}
(Ok(_), Err(wait_err)) => {
// This shouldn't happen - likely the sidecar task panicked.
res = Err(PageReconstructError::Other(wait_err.into()));
}
(Ok(_), Ok(Err(err))) => {
let err: std::io::Error = err;
// TODO: returning IO error here will fail a compute query.
// Probably not what we want, we're not doing `maybe_fatal_err`
// in the IO futures.
// But it's been like that for a long time, not changing it
// as part of concurrent IO.
// => https://github.com/neondatabase/neon/issues/10454
res = Err(PageReconstructError::Other(err.into()));
}
(Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
assert!(ok.img.is_none());
ok.img = Some((lsn, img));
}
(Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
match Value::des(&buf) {
Ok(Value::WalRecord(rec)) => {
ok.records.push((lsn, rec));
}
Ok(Value::Image(img)) => {
assert!(ok.img.is_none());
ok.img = Some((lsn, img));
}
Err(err) => {
res = Err(PageReconstructError::Other(err.into()));
}
}
}
}
})();
}
res
}
}
@@ -109,7 +238,7 @@ impl From<VectoredValueReconstructState> for ValueReconstructState {
pub(crate) struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
/// The keys which are already retrieved
keys_done: KeySpaceRandomAccum,
@@ -119,27 +248,365 @@ pub(crate) struct ValuesReconstructState {
// Statistics that are still accessible as a caller of `get_vectored_impl`.
layers_visited: u32,
delta_layers_visited: u32,
pub(crate) io_concurrency: IoConcurrency,
num_active_ios: Arc<AtomicUsize>,
}
/// The level of IO concurrency to be used on the read path
///
/// The desired end state is that we always do parallel IO.
/// This struct and the dispatching in the impl will be removed once
/// we've built enough confidence.
pub(crate) enum IoConcurrency {
Sequential,
SidecarTask {
task_id: usize,
ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
},
}
type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
pub(crate) enum SelectedIoConcurrency {
Sequential,
SidecarTask(GateGuard),
}
impl std::fmt::Debug for IoConcurrency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IoConcurrency::Sequential => write!(f, "Sequential"),
IoConcurrency::SidecarTask { .. } => write!(f, "SidecarTask"),
}
}
}
impl std::fmt::Debug for SelectedIoConcurrency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SelectedIoConcurrency::Sequential => write!(f, "Sequential"),
SelectedIoConcurrency::SidecarTask(_) => write!(f, "SidecarTask"),
}
}
}
impl IoConcurrency {
/// Force sequential IO. This is a temporary workaround until we have
/// moved plumbing-through-the-call-stack
/// of IoConcurrency into `RequestContext`.
///
/// DO NOT USE for new code.
///
/// Tracking issue: <https://github.com/neondatabase/neon/issues/10460>.
pub(crate) fn sequential() -> Self {
Self::spawn(SelectedIoConcurrency::Sequential)
}
pub(crate) fn spawn_from_conf(
conf: &'static PageServerConf,
gate_guard: GateGuard,
) -> IoConcurrency {
use pageserver_api::config::GetVectoredConcurrentIo;
let selected = match conf.get_vectored_concurrent_io {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
};
Self::spawn(selected)
}
pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
match io_concurrency {
SelectedIoConcurrency::Sequential => IoConcurrency::Sequential,
SelectedIoConcurrency::SidecarTask(gate_guard) => {
let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
static TASK_ID: AtomicUsize = AtomicUsize::new(0);
let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
let span =
tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
trace!(task_id, "spawning sidecar task");
tokio::spawn(async move {
trace!("start");
scopeguard::defer!{ trace!("end") };
type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
enum State {
Waiting {
// invariant: is_empty(), but we recycle the allocation
empty_futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
},
Executing {
futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
},
ShuttingDown {
futures: FuturesUnordered<IoFuture>,
},
}
let mut state = State::Waiting {
empty_futures: FuturesUnordered::new(),
ios_rx,
};
loop {
match state {
State::Waiting {
empty_futures,
mut ios_rx,
} => {
assert!(empty_futures.is_empty());
tokio::select! {
fut = ios_rx.recv() => {
if let Some(fut) = fut {
trace!("received new io future");
empty_futures.push(fut);
state = State::Executing { futures: empty_futures, ios_rx };
} else {
state = State::ShuttingDown { futures: empty_futures }
}
}
}
}
State::Executing {
mut futures,
mut ios_rx,
} => {
tokio::select! {
res = futures.next() => {
trace!("io future completed");
assert!(res.is_some());
if futures.is_empty() {
state = State::Waiting { empty_futures: futures, ios_rx};
} else {
state = State::Executing { futures, ios_rx };
}
}
fut = ios_rx.recv() => {
if let Some(fut) = fut {
trace!("received new io future");
futures.push(fut);
state = State::Executing { futures, ios_rx};
} else {
state = State::ShuttingDown { futures };
}
}
}
}
State::ShuttingDown {
mut futures,
} => {
trace!("shutting down");
while let Some(()) = futures.next().await {
trace!("io future completed (shutdown)");
// drain
}
trace!("shutdown complete");
break;
}
}
}
drop(gate_guard); // drop it right before we exit
}.instrument(span));
IoConcurrency::SidecarTask { task_id, ios_tx }
}
}
}
pub(crate) fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
task_id: *task_id,
ios_tx: ios_tx.clone(),
},
}
}
/// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
///
/// The IO is represented as an opaque future.
/// IO completion must be handled inside the future, e.g., through a oneshot channel.
///
/// The API seems simple but there are multiple **pitfalls** involving
/// DEADLOCK RISK.
///
/// First, there are no guarantees about the execution of the IO.
/// It may be `await`ed in-place before this function returns.
/// It may be polled partially by this task and handed off to another task to be finished.
/// It may be polled and then dropped before returning ready.
///
/// This means that submitted IOs must not be interdependent (see the illustrative sketch below).
/// Interdependence may be through shared limited resources, e.g.,
/// - VirtualFile file descriptor cache slot acquisition
/// - tokio-epoll-uring slot
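///
/// As an illustration only (a hypothetical sketch, not code that exists in the
/// pageserver), two submitted IOs that share a single-permit semaphore and also
/// wait on each other can never both complete, regardless of how they are polled:
///
/// ```ignore
/// let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(1));
/// let (signal_tx, signal_rx) = tokio::sync::oneshot::channel::<()>();
/// let sem_a = std::sync::Arc::clone(&semaphore);
/// io_concurrency
///     .spawn_io(async move {
///         let _permit = sem_a.acquire().await.unwrap(); // takes the only permit
///         signal_rx.await.unwrap(); // waits for the second IO, which is stuck below
///     })
///     .await;
/// io_concurrency
///     .spawn_io(async move {
///         // blocked forever: the only permit is held by the first IO
///         let _permit = semaphore.acquire().await.unwrap();
///         signal_tx.send(()).unwrap(); // never reached
///     })
///     .await;
/// ```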
///
/// # Why current usage is safe from deadlocks
///
/// Textbook condition for a deadlock is that _all_ of the following be given
/// - Mutual exclusion
/// - Hold and wait
/// - No preemption
/// - Circular wait
///
/// The current usage is safe because:
/// - Mutual exclusion: IO futures definitely use mutexes, no way around that for now
/// - Hold and wait: IO futures currently hold two kinds of locks/resources while waiting
/// for acquisition of other resources:
/// - VirtualFile file descriptor cache slot tokio mutex
/// - tokio-epoll-uring slot (uses tokio notify => wait queue, much like mutex)
/// - No preemption: there's no taking-away of acquired locks/resources => given
/// - Circular wait: this is the part of the condition that isn't met: all IO futures
/// first acquire VirtualFile mutex, then tokio-epoll-uring slot.
/// There is no IO future that acquires slot before VirtualFile.
/// Hence there can be no circular waiting.
/// Hence there cannot be a deadlock.
///
/// This is a very fragile situation and must be revisited whenever any code called from
/// inside the IO futures is changed.
///
/// We will move away from opaque IO futures towards well-defined IOs at some point in
/// the future when we have shipped this first version of concurrent IO to production
/// and are ready to retire the Sequential mode which runs the futures in place.
/// Right now, while brittle, the opaque IO approach allows us to ship the feature
/// with minimal changes to the code and minimal changes to existing behavior in Sequential mode.
///
/// Also read the comment in `collect_pending_ios`.
pub(crate) async fn spawn_io<F>(&mut self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
{
match self {
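// Sequential mode: await the IO future in place before returning, i.e., the behavior from before concurrent IO.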
IoConcurrency::Sequential => fut.await,
IoConcurrency::SidecarTask { ios_tx, .. } => {
let fut = Box::pin(fut);
// NB: experiments showed that an opportunistic poll of `fut` here was bad for throughput
// while having an insignificant effect on latency.
// It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
// a submission here, but never poll the future. That way, io_uring can make progress while
// the future sits in the ios_tx queue.
match ios_tx.send(fut) {
Ok(()) => {}
Err(_) => {
unreachable!("the io task must have exited, likely it panicked")
}
}
}
}
}
#[cfg(test)]
pub(crate) fn spawn_for_test() -> impl std::ops::DerefMut<Target = Self> {
use std::ops::{Deref, DerefMut};
use tracing::info;
use utils::sync::gate::Gate;
// Spawn needs a Gate, give it one.
struct Wrapper {
inner: IoConcurrency,
#[allow(dead_code)]
gate: Box<Gate>,
}
impl Deref for Wrapper {
type Target = IoConcurrency;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for Wrapper {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
let gate = Box::new(Gate::default());
// When running Rust unit tests without any further flags, the default is to
// use the new (sidecar task) behavior.
// The CI uses the following environment variable to unit test both the old
// and the new behavior.
// NB: the Python regression & perf tests take the `else` branch
// below and have their own defaults management.
let selected = {
// The pageserver_api::config type is unsuitable because it's internally tagged.
#[derive(serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
enum TestOverride {
Sequential,
SidecarTask,
}
use once_cell::sync::Lazy;
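// Hedged example of how the override might be set (the exact value encoding is
// whatever `utils::env::var_serde_json_string` expects; the kebab-case variants
// here would read as JSON strings):
//   NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO='"sequential"'
//   NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO='"sidecar-task"'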
static TEST_OVERRIDE: Lazy<TestOverride> = Lazy::new(|| {
utils::env::var_serde_json_string(
"NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO",
)
.unwrap_or(TestOverride::SidecarTask)
});
match *TEST_OVERRIDE {
TestOverride::Sequential => SelectedIoConcurrency::Sequential,
TestOverride::SidecarTask => {
SelectedIoConcurrency::SidecarTask(gate.enter().expect("just created it"))
}
}
};
info!(?selected, "get_vectored_concurrent_io test");
Wrapper {
inner: Self::spawn(selected),
gate,
}
}
}
/// Make noise in case the [`ValuesReconstructState`] gets dropped while
/// there are still IOs in flight.
/// Refer to `collect_pending_ios` for why we prefer not to do that.
///
/// We log from here instead of from the sidecar task because the [`ValuesReconstructState`]
/// gets dropped in a tracing span with more context.
/// We repeat the sidecar task's `task_id` so we can correlate what we emit here with
/// the logs / panic handler logs from the sidecar task, which also logs the `task_id`.
impl Drop for ValuesReconstructState {
fn drop(&mut self) {
let num_active_ios = self
.num_active_ios
.load(std::sync::atomic::Ordering::Acquire);
if num_active_ios == 0 {
return;
}
let sidecar_task_id = match &self.io_concurrency {
IoConcurrency::Sequential => None,
IoConcurrency::SidecarTask { task_id, .. } => Some(*task_id),
};
tracing::warn!(
num_active_ios,
?sidecar_task_id,
backtrace=%std::backtrace::Backtrace::force_capture(),
"dropping ValuesReconstructState while some IOs have not been completed",
);
}
}
impl ValuesReconstructState {
pub(crate) fn new() -> Self {
pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency,
num_active_ios: Arc::new(AtomicUsize::new(0)),
}
}
/// Associate a key with the error which it encountered and mark it as done
pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
let previous = self.keys.insert(key, Err(err));
if let Some(Ok(state)) = previous {
if state.situation == ValueReconstructSituation::Continue {
self.keys_done.add_key(key);
}
}
/// Absolutely read [`IoConcurrency::spawn_io`] to learn about assumptions & pitfalls.
pub(crate) async fn spawn_io<F>(&mut self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
{
self.io_concurrency.spawn_io(fut).await;
}
pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
@@ -159,29 +626,6 @@ impl ValuesReconstructState {
self.layers_visited
}
/// This function is called after reading a keyspace from a layer.
/// It checks if the read path has now moved past the cached Lsn for any keys.
///
/// Implementation note: We intentionally iterate over the keys for which we've
/// already collected some reconstruct data. This avoids scaling complexity with
/// the size of the search space.
pub(crate) fn on_lsn_advanced(&mut self, keyspace: &KeySpace, advanced_to: Lsn) {
for (key, value) in self.keys.iter_mut() {
if !keyspace.contains(key) {
continue;
}
if let Ok(state) = value {
if state.situation != ValueReconstructSituation::Complete
&& state.get_cached_lsn() >= Some(advanced_to)
{
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
}
}
}
/// On hitting an image layer, we can mark all keys in this range as done, because
/// if the image layer does not contain a key, it is deleted/never added.
pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
@@ -199,70 +643,42 @@ impl ValuesReconstructState {
///
/// If the key is in the sparse keyspace (i.e., aux files), we do not track it in
/// `keys_done`.
pub(crate) fn update_key(
&mut self,
key: &Key,
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
// TODO: rename this method & update description.
pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
let state = self.keys.entry(*key).or_default();
let is_sparse_key = key.is_sparse();
if let Ok(state) = state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => {
if is_sparse_key {
// Sparse keyspace might be visited multiple times because
// we don't track unmapped keyspaces.
return ValueReconstructSituation::Complete;
} else {
unreachable!()
}
}
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
debug_assert!(
Some(lsn) > state.get_cached_lsn(),
"Attempt to collect a record below cached LSN for walredo: {} < {}",
lsn,
state
.get_cached_lsn()
.expect("Assertion can only fire if a cached lsn is present")
);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
if !is_sparse_key {
self.keys_done.add_key(*key);
let required_io = match state.situation {
ValueReconstructSituation::Complete => {
if is_sparse_key {
// Sparse keyspace might be visited multiple times because
// we don't track unmapped keyspaces.
return OnDiskValueIo::Unnecessary;
} else {
unreachable!()
}
}
ValueReconstructSituation::Continue => {
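// Register a pending on-disk IO for this key+lsn: the receiver half is stored
// in the reconstruct state (awaited later by collect_pending_ios), the sender
// half is returned to the caller, and num_active_ios tracks outstanding IOs
// so Drop can warn if the state is dropped with IOs still in flight.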
self.num_active_ios
.fetch_add(1, std::sync::atomic::Ordering::Release);
let (tx, rx) = tokio::sync::oneshot::channel();
state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
OnDiskValueIo::Required {
tx,
num_active_ios: Arc::clone(&self.num_active_ios),
}
}
};
state.situation
} else {
ValueReconstructSituation::Complete
if completes && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
if !is_sparse_key {
self.keys_done.add_key(*key);
}
}
}
/// Returns the Lsn at which this key is cached if one exists.
/// The read path should go no further than this Lsn for the given key.
pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
self.keys
.get(key)
.and_then(|k| k.as_ref().ok())
.and_then(|state| state.get_cached_lsn())
required_io
}
/// Returns the key space describing the keys that have
@@ -276,12 +692,6 @@ impl ValuesReconstructState {
}
}
impl Default for ValuesReconstructState {
fn default() -> Self {
Self::new()
}
}
/// A key that uniquely identifies a layer in a timeline
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub(crate) enum LayerId {
@@ -720,3 +1130,78 @@ impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
write!(f, "{}..{}", self.0.start, self.0.end)
}
}
#[cfg(test)]
mod tests2 {
use pageserver_api::key::DBDIR_KEY;
use tracing::info;
use super::*;
use crate::tenant::storage_layer::IoConcurrency;
/// TODO: currently this test relies on manual visual inspection of the --no-capture output.
/// Should look like so:
/// ```text
/// RUST_LOG=trace cargo nextest run --features testing --no-capture test_io_concurrency_noise
/// running 1 test
/// 2025-01-21T17:42:01.335679Z INFO get_vectored_concurrent_io test selected=SidecarTask
/// 2025-01-21T17:42:01.335680Z TRACE spawning sidecar task task_id=0
/// 2025-01-21T17:42:01.335937Z TRACE IoConcurrency_sidecar{task_id=0}: start
/// 2025-01-21T17:42:01.335972Z TRACE IoConcurrency_sidecar{task_id=0}: received new io future
/// 2025-01-21T17:42:01.335999Z INFO IoConcurrency_sidecar{task_id=0}: waiting for signal to complete IO
/// 2025-01-21T17:42:01.336229Z WARN dropping ValuesReconstructState while some IOs have not been completed num_active_ios=1 sidecar_task_id=Some(0) backtrace= 0: <pageserver::tenant::storage_layer::ValuesReconstructState as core::ops::drop::Drop>::drop
/// at ./src/tenant/storage_layer.rs:553:24
/// 1: core::ptr::drop_in_place<pageserver::tenant::storage_layer::ValuesReconstructState>
/// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:521:1
/// 2: core::mem::drop
/// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:942:24
/// 3: pageserver::tenant::storage_layer::tests2::test_io_concurrency_noise::{{closure}}
/// at ./src/tenant/storage_layer.rs:1159:9
/// ...
/// 49: <unknown>
/// 2025-01-21T17:42:01.452293Z INFO IoConcurrency_sidecar{task_id=0}: completing IO
/// 2025-01-21T17:42:01.452357Z TRACE IoConcurrency_sidecar{task_id=0}: io future completed
/// 2025-01-21T17:42:01.452473Z TRACE IoConcurrency_sidecar{task_id=0}: end
/// test tenant::storage_layer::tests2::test_io_concurrency_noise ... ok
///
/// ```
#[tokio::test]
async fn test_io_concurrency_noise() {
crate::tenant::harness::setup_logging();
let io_concurrency = IoConcurrency::spawn_for_test();
match *io_concurrency {
IoConcurrency::Sequential => {
// This test asserts behavior in sidecar mode, doesn't make sense in sequential mode.
return;
}
IoConcurrency::SidecarTask { .. } => {}
}
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
let (io_fut_is_waiting_tx, io_fut_is_waiting) = tokio::sync::oneshot::channel();
let (do_complete_io, should_complete_io) = tokio::sync::oneshot::channel();
let (io_fut_exiting_tx, io_fut_exiting) = tokio::sync::oneshot::channel();
let io = reconstruct_state.update_key(&DBDIR_KEY, Lsn(8), true);
reconstruct_state
.spawn_io(async move {
info!("waiting for signal to complete IO");
io_fut_is_waiting_tx.send(()).unwrap();
should_complete_io.await.unwrap();
info!("completing IO");
io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
io_fut_exiting_tx.send(()).unwrap();
})
.await;
io_fut_is_waiting.await.unwrap();
// this is what makes the noise
drop(reconstruct_state);
do_complete_io.send(()).unwrap();
io_fut_exiting.await.unwrap();
}
}


@@ -41,13 +41,12 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -60,7 +59,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
@@ -77,7 +76,10 @@ use utils::{
lsn::Lsn,
};
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
};
///
/// Header stored in the beginning of the file
@@ -226,7 +228,7 @@ pub struct DeltaLayerInner {
index_start_blk: u32,
index_root_blk: u32,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
layer_key_range: Range<Key>,
@@ -795,9 +797,11 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
@@ -842,12 +846,11 @@ impl DeltaLayerInner {
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
// If the key is cached, go no further than the cached Lsn.
//
// Currently, the index is visited for each range, but this
// can be further optimised to visit the index only once.
pub(super) async fn get_values_reconstruct_data(
&self,
this: ResidentLayer,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValuesReconstructState,
@@ -875,17 +878,14 @@ impl DeltaLayerInner {
data_end_offset,
index_reader,
planner,
reconstruct_state,
ctx,
)
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
Ok(())
}
@@ -895,7 +895,6 @@ impl DeltaLayerInner {
data_end_offset: u64,
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
mut planner: VectoredReadPlanner,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>>
where
@@ -922,10 +921,9 @@ impl DeltaLayerInner {
assert!(key >= range.start);
let outside_lsn_range = !lsn_range.contains(&lsn);
let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
let flag = {
if outside_lsn_range || below_cached_lsn {
if outside_lsn_range {
BlobFlag::Ignore
} else if blob_ref.will_init() {
BlobFlag::ReplaceAll
@@ -994,98 +992,78 @@ impl DeltaLayerInner {
async fn do_reads_and_update_state(
&self,
this: ResidentLayer,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
let max_vectored_read_bytes = self
.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(IoBufferMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
.await;
let blobs_buf = match res {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path(),
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(IoBufferMut::with_capacity(buf_size));
continue;
}
};
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let blob_read = meta.read(&view).await;
let blob_read = match blob_read {
Ok(buf) => buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path(),
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
let value = Value::des(&blob_read);
let value = match value {
Ok(v) => v,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path(),
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
// state, no further updates shall be made to it. The call below will
// panic if the invariant is violated.
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
let io = reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
blob_meta.will_init,
);
ios.insert((blob_meta.key, blob_meta.lsn), io);
}
buf = Some(blobs_buf.buf);
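// The vectored read, blob decoding, and completion of the per-(key, lsn)
// OnDiskValueIo senders all happen inside the spawned IO future below,
// so that, in sidecar mode, the layer visit can move on without waiting for this read.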
let read_extend_residency = this.clone();
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
reconstruct_state
.spawn_io(async move {
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
let buf = IoBufferMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter().rev() {
let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
let blob_read = meta.read(&view).await;
let blob_read = match blob_read {
Ok(buf) => buf,
Err(e) => {
io.complete(Err(e));
continue;
}
};
io.complete(Ok(OnDiskValue::WalRecordOrImage(
blob_read.into_bytes(),
)));
}
assert!(ios.is_empty());
}
Err(err) => {
for (_, sender) in ios {
sender.complete(Err(std::io::Error::new(
err.kind(),
"vec read failed",
)));
}
}
}
// keep layer resident until this IO is done; this spawned IO future generally outlives the
// call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
drop(read_extend_residency);
})
.await;
}
}
@@ -1224,7 +1202,14 @@ impl DeltaLayerInner {
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset))
Some((
BlobMeta {
key,
lsn,
will_init: false,
},
start_offset..end_offset,
))
} else {
None
};
@@ -1560,7 +1545,9 @@ impl DeltaLayerIterator<'_> {
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let offset = blob_ref.pos();
if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
if let Some(batch_plan) =
self.planner.handle(key, lsn, offset, blob_ref.will_init())
{
break batch_plan;
}
} else {
@@ -1673,7 +1660,6 @@ pub(crate) mod test {
.expect("In memory disk finish should never fail");
let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
let planner = VectoredReadPlanner::new(100);
let mut reconstruct_state = ValuesReconstructState::new();
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let keyspace = KeySpace {
@@ -1691,7 +1677,6 @@ pub(crate) mod test {
disk_offset,
reader,
planner,
&mut reconstruct_state,
&ctx,
)
.await
@@ -1935,7 +1920,6 @@ pub(crate) mod test {
);
let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
let mut reconstruct_state = ValuesReconstructState::new();
let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
@@ -1945,7 +1929,6 @@ pub(crate) mod test {
data_end_offset,
index_reader,
planner,
&mut reconstruct_state,
&ctx,
)
.await?;


@@ -38,12 +38,11 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -56,12 +55,13 @@ use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
@@ -73,7 +73,10 @@ use utils::{
};
use super::layer_name::ImageLayerName;
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
};
///
/// Header stored in the beginning of the file
@@ -164,7 +167,7 @@ pub struct ImageLayerInner {
key_range: Range<Key>,
lsn: Lsn,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
@@ -391,9 +394,11 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open_v2(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader
@@ -439,6 +444,7 @@ impl ImageLayerInner {
// the reconstruct state with whatever is found.
pub(super) async fn get_values_reconstruct_data(
&self,
this: ResidentLayer,
keyspace: KeySpace,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
@@ -448,7 +454,7 @@ impl ImageLayerInner {
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_image_layer_visited(&self.key_range);
@@ -570,6 +576,7 @@ impl ImageLayerInner {
async fn do_reads_and_update_state(
&self,
this: ResidentLayer,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
@@ -580,8 +587,13 @@ impl ImageLayerInner {
.0
.into();
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
for read in reads.into_iter() {
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
ios.insert((blob_meta.key, blob_meta.lsn), io);
}
let buf_size = read.size();
if buf_size > max_vectored_read_bytes {
@@ -611,50 +623,51 @@ impl ImageLayerInner {
}
}
let buf = IoBufferMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
let read_extend_residency = this.clone();
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
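// As in the delta layer read path, the vectored read and the completion of the
// per-(key, lsn) OnDiskValueIo senders happen inside the spawned IO future.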
reconstruct_state
.spawn_io(async move {
let buf = IoBufferMut::with_capacity(buf_size);
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let img_buf = meta.read(&view).await;
match res {
Ok(blobs_buf) => {
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let io: OnDiskValueIo =
ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
let img_buf = meta.read(&view).await;
let img_buf = match img_buf {
Ok(img_buf) => img_buf,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to decompress blob from virtual file {}",
self.file.path(),
))),
);
let img_buf = match img_buf {
Ok(img_buf) => img_buf,
Err(e) => {
io.complete(Err(e));
continue;
}
};
continue;
io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
}
};
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf.into_bytes()),
);
assert!(ios.is_empty());
}
Err(err) => {
for (_, io) in ios {
io.complete(Err(std::io::Error::new(
err.kind(),
"vec read failed",
)));
}
}
}
}
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path(),
kind
)),
);
}
}
};
// keep layer resident until this IO is done; this spawned IO future generally outlives the
// call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
drop(read_extend_residency);
})
.await;
}
}
@@ -1069,6 +1082,7 @@ impl ImageLayerIterator<'_> {
Key::from_slice(&raw_key[..KEY_SIZE]),
self.image_layer.lsn,
offset,
true,
) {
break batch_plan;
}


@@ -8,23 +8,22 @@ use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{OnDiskValue, OnDiskValueIo};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Result};
use anyhow::Result;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
use utils::{id::TimelineId, lsn::Lsn, vec_map::VecMap};
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
@@ -36,9 +35,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::RwLock;
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
pub(crate) mod vectored_dio_read;
@@ -415,10 +412,8 @@ impl InMemoryLayer {
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
// If the key is cached, go no further than the cached Lsn.
pub(crate) async fn get_values_reconstruct_data(
&self,
self: &Arc<InMemoryLayer>,
keyspace: KeySpace,
end_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
@@ -435,6 +430,9 @@ impl InMemoryLayer {
read: vectored_dio_read::LogicalRead<Vec<u8>>,
}
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
let lsn_range = self.start_lsn..end_lsn;
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
@@ -442,12 +440,7 @@ impl InMemoryLayer {
.range(range.start.to_compact()..range.end.to_compact())
{
let key = Key::from_compact(*key);
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
None => self.start_lsn..end_lsn,
};
let slice = vec_map.slice_range(lsn_range);
let slice = vec_map.slice_range(lsn_range.clone());
for (entry_lsn, index_entry) in slice.iter().rev() {
let IndexEntryUnpacked {
@@ -463,55 +456,59 @@ impl InMemoryLayer {
Vec::with_capacity(len as usize),
),
});
let io = reconstruct_state.update_key(&key, *entry_lsn, will_init);
ios.insert((key, *entry_lsn), io);
if will_init {
break;
}
}
}
}
drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
.spawn_io(async move {
let inner = read_from.inner.read().await;
let f = vectored_dio_read::execute(
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&read_ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
// Execute the reads.
let f = vectored_dio_read::execute(
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
match read.into_result().expect("we run execute() above") {
Err(e) => {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
}
Ok(value_buf) => {
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
let io = ios.remove(&(key, entry_lsn)).expect("sender must exist");
match read.into_result().expect("we run execute() above") {
Err(e) => {
io.complete(Err(std::io::Error::new(
e.kind(),
"dio vec read failed",
)));
}
Ok(value_buf) => {
io.complete(Ok(OnDiskValue::WalRecordOrImage(value_buf.into())));
}
}
let key_situation =
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
// TODO: metric to see if we fetched more values than necessary
continue 'next_key;
}
// process the next value in the next iteration of the loop
}
}
}
}
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
assert!(ios.is_empty());
// Keep the layer alive until this IO is done;
// this is effectively forced for InMemoryLayer because we need inner.read() anyway,
// but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
// drop for consistency across all three layer types.
drop(inner);
drop(read_from);
})
.await;
Ok(())
}
@@ -606,6 +603,7 @@ impl InMemoryLayer {
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.


@@ -308,7 +308,7 @@ impl Layer {
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let layer = self
let downloaded = self
.0
.get_or_maybe_download(true, Some(ctx))
.await
@@ -318,11 +318,15 @@ impl Layer {
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
let this = ResidentLayer {
downloaded: downloaded.clone(),
owner: self.clone(),
};
self.record_access(ctx);
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
downloaded
.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
.map_err(|err| match err {
@@ -1768,25 +1772,25 @@ impl DownloadedLayer {
async fn get_values_reconstruct_data(
&self,
this: ResidentLayer,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
use LayerKind::*;
match self
.get(owner, ctx)
.get(&this.owner.0, ctx)
.await
.map_err(GetVectoredError::Other)?
{
Delta(d) => {
d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
d.get_values_reconstruct_data(this, keyspace, lsn_range, reconstruct_data, ctx)
.await
}
Image(i) => {
i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
i.get_values_reconstruct_data(this, keyspace, reconstruct_data, ctx)
.await
}
}


@@ -11,7 +11,10 @@ use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::{
context::DownloadBehavior,
tenant::{harness::test_img, storage_layer::LayerVisibilityHint},
tenant::{
harness::test_img,
storage_layer::{IoConcurrency, LayerVisibilityHint},
},
};
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
@@ -31,6 +34,7 @@ async fn smoke_test() {
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let (tenant, _) = h.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
@@ -89,7 +93,7 @@ async fn smoke_test() {
};
let img_before = {
let mut data = ValuesReconstructState::default();
let mut data = ValuesReconstructState::new(io_concurrency.clone());
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
@@ -99,10 +103,13 @@ async fn smoke_test() {
)
.await
.unwrap();
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.collect_pending_ios()
.await
.expect("must not error")
.img
.take()
.expect("tenant harness writes the control file")
@@ -121,7 +128,7 @@ async fn smoke_test() {
// on accesses when the layer is evicted, it will automatically be downloaded.
let img_after = {
let mut data = ValuesReconstructState::default();
let mut data = ValuesReconstructState::new(io_concurrency.clone());
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
@@ -135,7 +142,9 @@ async fn smoke_test() {
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.collect_pending_ios()
.await
.expect("must not error")
.img
.take()
.expect("tenant harness writes the control file")


@@ -20,6 +20,7 @@ use camino::Utf8Path;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use futures::{stream::FuturesUnordered, StreamExt};
use handle::ShardTimelineId;
use offload::OffloadError;
use once_cell::sync::Lazy;
@@ -74,6 +75,7 @@ use std::{
ops::{Deref, Range},
};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::{
aux_file::AuxFileSizeEstimator,
page_service::TenantManagerTypes,
@@ -81,7 +83,10 @@ use crate::{
config::AttachmentMode,
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
storage_layer::{
inmemory_layer::IndexEntry, IoConcurrency, PersistentLayerDesc,
ValueReconstructSituation,
},
},
walingest::WalLagCooldown,
walredo,
@@ -102,10 +107,6 @@ use crate::{
use crate::{
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
};
use crate::{
l0_flush::{self, L0FlushGlobalState},
metrics::GetKind,
};
use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
@@ -1005,9 +1006,7 @@ impl Timeline {
ranges: vec![key..key.next()],
};
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
let mut reconstruct_state = ValuesReconstructState::new(IoConcurrency::sequential());
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
@@ -1050,6 +1049,7 @@ impl Timeline {
&self,
keyspace: KeySpace,
lsn: Lsn,
io_concurrency: super::storage_layer::IoConcurrency,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() {
@@ -1084,7 +1084,7 @@ impl Timeline {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
&mut ValuesReconstructState::new(io_concurrency),
ctx,
)
.await;
@@ -1109,6 +1109,7 @@ impl Timeline {
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
io_concurrency: super::storage_layer::IoConcurrency,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if !lsn.is_valid() {
return Err(GetVectoredError::InvalidLsn(lsn));
@@ -1140,7 +1141,7 @@ impl Timeline {
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::default(),
&mut ValuesReconstructState::new(io_concurrency),
ctx,
)
.await;
@@ -1159,39 +1160,56 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let get_kind = if keyspace.total_raw_size() == 1 {
GetKind::Singular
} else {
GetKind::Vectored
let traversal_res: Result<(), _> = self
.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
.await;
if let Err(err) = traversal_res {
// Wait for all the spawned IOs to complete.
// See comments on `spawn_io` inside `storage_layer` for more details.
let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
.into_values()
.map(|state| state.collect_pending_ios())
.collect::<FuturesUnordered<_>>();
while collect_futs.next().await.is_some() {}
return Err(err);
};
let get_data_timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(get_kind)
.start_timer();
self.get_vectored_reconstruct_data(keyspace.clone(), lsn, reconstruct_state, ctx)
.await?;
get_data_timer.stop_and_record();
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
match res {
Err(err) => {
results.insert(key, Err(err));
}
Ok(state) => {
let state = ValueReconstructState::from(state);
let futs = FuturesUnordered::new();
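// For each key, build a future that first awaits the key's pending on-disk
// value IOs (collect_pending_ios) and then runs walredo; the per-key futures
// are driven concurrently by the FuturesUnordered below.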
for (key, state) in std::mem::take(&mut reconstruct_state.keys) {
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
async move {
assert_eq!(state.situation, ValueReconstructSituation::Complete);
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
let converted = match state.collect_pending_ios().await {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
}
};
// The walredo module expects the records to be in descending Lsn order.
// We submit the IOs in that order, so there should be no need to sort here.
debug_assert!(
converted
.records
.is_sorted_by_key(|(lsn, _)| std::cmp::Reverse(*lsn)),
"{converted:?}"
);
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
}
}
});
}
reconstruct_timer.stop_and_record();
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
// For aux file keys (v1 or v2) the vectored read path does not return an error
// when they're missing. Instead they are omitted from the resulting btree
@@ -2873,6 +2891,14 @@ impl Timeline {
crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
};
let io_concurrency = IoConcurrency::spawn_from_conf(
self_ref.conf,
self_ref
.gate
.enter()
.map_err(|_| CalculateLogicalSizeError::Cancelled)?,
);
let calculated_size = self_ref
.logical_size_calculation_task(
initial_part_end,
@@ -2882,7 +2908,11 @@ impl Timeline {
.await?;
self_ref
.trigger_aux_file_size_computation(initial_part_end, background_ctx)
.trigger_aux_file_size_computation(
initial_part_end,
background_ctx,
io_concurrency,
)
.await?;
// TODO: add aux file size to logical size
@@ -4115,6 +4145,7 @@ impl Timeline {
/// Create image layers for Postgres data. Assumes the caller passes a partition that is not too large,
/// so that at most one image layer will be produced from this function.
#[allow(clippy::too_many_arguments)]
async fn create_image_layer_for_rel_blocks(
self: &Arc<Self>,
partition: &KeySpace,
@@ -4123,6 +4154,7 @@ impl Timeline {
ctx: &RequestContext,
img_range: Range<Key>,
start: Key,
io_concurrency: IoConcurrency,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
let mut wrote_keys = false;
@@ -4151,7 +4183,12 @@ impl Timeline {
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.get_vectored(
key_request_accum.consume_keyspace(),
lsn,
io_concurrency.clone(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
@@ -4230,9 +4267,10 @@ impl Timeline {
img_range: Range<Key>,
mode: ImageLayerCreationMode,
start: Key,
io_concurrency: IoConcurrency,
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
// Metadata keys image layer creation.
let mut reconstruct_state = ValuesReconstructState::default();
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let begin = Instant::now();
let data = self
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
@@ -4449,6 +4487,13 @@ impl Timeline {
)))
});
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.gate
.enter()
.map_err(|_| CreateImageLayersError::Cancelled)?,
);
if !compact_metadata {
let ImageLayerCreationOutcome {
image,
@@ -4461,6 +4506,7 @@ impl Timeline {
ctx,
img_range,
start,
io_concurrency,
)
.await?;
@@ -4479,6 +4525,7 @@ impl Timeline {
img_range,
mode,
start,
io_concurrency,
)
.await?;
start = next_start_key;
@@ -5746,13 +5793,14 @@ impl Timeline {
self: &Arc<Timeline>,
lsn: Lsn,
ctx: &RequestContext,
io_concurrency: IoConcurrency,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
let mut reconstruct_data = ValuesReconstructState::new(io_concurrency.clone());
layer
.get_values_reconstruct_data(
KeySpace::single(Key::MIN..Key::MAX),
@@ -5761,8 +5809,9 @@ impl Timeline {
ctx,
)
.await?;
for (k, v) in reconstruct_data.keys {
all_data.push((k, v?.img.unwrap().1));
for (k, v) in std::mem::take(&mut reconstruct_data.keys) {
let v = v.collect_pending_ios().await?;
all_data.push((k, v.img.unwrap().1));
}
}
}


@@ -42,8 +42,8 @@ use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
};
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{ImageLayerCreationOutcome, IoConcurrency};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::{gc_block, DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
@@ -3170,6 +3170,7 @@ impl TimelineAdaptor {
ctx,
key_range.clone(),
start,
IoConcurrency::sequential(),
)
.await?;


@@ -35,6 +35,7 @@ use crate::virtual_file::{self, VirtualFile};
pub struct BlobMeta {
pub key: Key,
pub lsn: Lsn,
pub will_init: bool,
}
/// A view into the vectored blobs read buffer.
@@ -310,7 +311,15 @@ pub enum BlobFlag {
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
// Track all the blob offsets. Start offsets must be ordered.
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
// Values in the value tuples are:
// (
// lsn of the blob,
// start offset of the blob in the underlying file,
// end offset of the blob in the underlying file,
// whether the blob initializes the page image or not
// see [`pageserver_api::record::NeonWalRecord::will_init`]
// )
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>,
@@ -371,12 +380,12 @@ impl VectoredReadPlanner {
match flag {
BlobFlag::None => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, false));
}
BlobFlag::ReplaceAll => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.clear();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, true));
}
BlobFlag::Ignore => {}
}
@@ -387,11 +396,17 @@ impl VectoredReadPlanner {
let mut reads = Vec::new();
for (key, blobs_for_key) in self.blobs {
for (lsn, start_offset, end_offset) in blobs_for_key {
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
let extended = match &mut current_read_builder {
Some(read_builder) => {
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
}
Some(read_builder) => read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init,
},
),
None => VectoredReadExtended::No,
};
@@ -399,7 +414,11 @@ impl VectoredReadPlanner {
let next_read_builder = ChunkedVectoredReadBuilder::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init,
},
self.max_read_size,
);
@@ -527,7 +546,7 @@ impl<'a> VectoredBlobReader<'a> {
pub struct StreamingVectoredReadPlanner {
read_builder: Option<ChunkedVectoredReadBuilder>,
// Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64)>,
prev: Option<(Key, Lsn, u64, bool)>,
/// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
/// we will produce a single batch instead of split them.
max_read_size: u64,
@@ -550,27 +569,47 @@ impl StreamingVectoredReadPlanner {
}
}
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
pub fn handle(
&mut self,
key: Key,
lsn: Lsn,
offset: u64,
will_init: bool,
) -> Option<VectoredRead> {
// Implementation note: internally lag behind by one blob such that
// we have a start and end offset when initialising [`VectoredRead`]
let (prev_key, prev_lsn, prev_offset) = match self.prev {
let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
None => {
self.prev = Some((key, lsn, offset));
self.prev = Some((key, lsn, offset, will_init));
return None;
}
Some(prev) => prev,
};
let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
let res = self.add_blob(
prev_key,
prev_lsn,
prev_offset,
offset,
false,
prev_will_init,
);
self.prev = Some((key, lsn, offset));
self.prev = Some((key, lsn, offset, will_init));
res
}
pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
self.add_blob(
prev_key,
prev_lsn,
prev_offset,
offset,
true,
prev_will_init,
)
} else {
None
};
@@ -587,10 +626,19 @@ impl StreamingVectoredReadPlanner {
start_offset: u64,
end_offset: u64,
is_last_blob_in_read: bool,
will_init: bool,
) -> Option<VectoredRead> {
match &mut self.read_builder {
Some(read_builder) => {
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
let extended = read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init,
},
);
assert_eq!(extended, VectoredReadExtended::Yes);
}
None => {
@@ -598,7 +646,11 @@ impl StreamingVectoredReadPlanner {
Some(ChunkedVectoredReadBuilder::new_streaming(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init,
},
))
};
}
@@ -812,7 +864,7 @@ mod tests {
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
let mut reads = Vec::new();
for (key, lsn, offset, _) in blob_descriptions.clone() {
reads.extend(planner.handle(key, lsn, offset));
reads.extend(planner.handle(key, lsn, offset, false));
}
reads.extend(planner.handle_range_end(652 * 1024));
@@ -850,7 +902,7 @@ mod tests {
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
let mut reads = Vec::new();
for (key, lsn, offset, _) in blob_descriptions.clone() {
reads.extend(planner.handle(key, lsn, offset));
reads.extend(planner.handle(key, lsn, offset, false));
}
reads.extend(planner.handle_range_end(652 * 1024));
@@ -875,7 +927,7 @@ mod tests {
{
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
let mut reads = Vec::new();
reads.extend(planner.handle(key, lsn, 0));
reads.extend(planner.handle(key, lsn, 0, false));
reads.extend(planner.handle_range_end(652 * 1024));
assert_eq!(reads.len(), 1);
validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
@@ -883,8 +935,8 @@ mod tests {
{
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
let mut reads = Vec::new();
reads.extend(planner.handle(key, lsn, 0));
reads.extend(planner.handle(key, lsn, 128 * 1024));
reads.extend(planner.handle(key, lsn, 0, false));
reads.extend(planner.handle(key, lsn, 128 * 1024, false));
reads.extend(planner.handle_range_end(652 * 1024));
assert_eq!(reads.len(), 2);
validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
@@ -893,8 +945,8 @@ mod tests {
{
let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
let mut reads = Vec::new();
reads.extend(planner.handle(key, lsn, 0));
reads.extend(planner.handle(key, lsn, 128 * 1024));
reads.extend(planner.handle(key, lsn, 0, false));
reads.extend(planner.handle(key, lsn, 128 * 1024, false));
reads.extend(planner.handle_range_end(652 * 1024));
assert_eq!(reads.len(), 1);
validate_read(
@@ -923,6 +975,7 @@ mod tests {
let meta = BlobMeta {
key: Key::MIN,
lsn: Lsn(0),
will_init: false,
};
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {


@@ -499,7 +499,13 @@ impl WalIngest {
let content = modification
.tline
.get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
.get_rel_page_at_lsn(
src_rel,
blknum,
Version::Modified(modification),
ctx,
crate::tenant::storage_layer::IoConcurrency::sequential(),
)
.await?;
modification.put_rel_page_image(dst_rel, blknum, content)?;
num_blocks_copied += 1;
@@ -1489,6 +1495,7 @@ mod tests {
use super::*;
use crate::tenant::harness::*;
use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
use crate::tenant::storage_layer::IoConcurrency;
use postgres_ffi::RELSEG_SIZE;
use crate::DEFAULT_PG_VERSION;
@@ -1532,6 +1539,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -1599,7 +1607,13 @@ mod tests {
// Check page contents at each LSN
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x20)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 2")
@@ -1607,7 +1621,13 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x30)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
@@ -1615,14 +1635,26 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1 at 4")
@@ -1630,21 +1662,39 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 2 at 5")
@@ -1667,14 +1717,26 @@ mod tests {
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1 at 4")
@@ -1689,7 +1751,13 @@ mod tests {
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 2 at 5")
@@ -1722,14 +1790,26 @@ mod tests {
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1")
@@ -1750,7 +1830,13 @@ mod tests {
for blk in 2..1500 {
assert_eq!(
tline
-.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
+.get_rel_page_at_lsn(
+    TESTREL_A,
+    blk,
+    Version::Lsn(Lsn(0x80)),
+    &ctx,
+    io_concurrency.clone()
+)
.instrument(test_span.clone())
.await?,
ZERO_PAGE
@@ -1758,7 +1844,13 @@ mod tests {
}
assert_eq!(
tline
-.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
+.get_rel_page_at_lsn(
+    TESTREL_A,
+    1500,
+    Version::Lsn(Lsn(0x80)),
+    &ctx,
+    io_concurrency.clone()
+)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1500")
@@ -1851,6 +1943,7 @@ mod tests {
.await?
.load()
.await;
+let io_concurrency = IoConcurrency::spawn_for_test();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -1903,7 +1996,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
-.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
+.get_rel_page_at_lsn(
+    TESTREL_A,
+    blkno,
+    Version::Lsn(lsn),
+    &ctx,
+    io_concurrency.clone()
+)
.instrument(test_span.clone())
.await?,
test_img(&data)
@@ -1931,7 +2030,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
-.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
+.get_rel_page_at_lsn(
+    TESTREL_A,
+    blkno,
+    Version::Lsn(Lsn(0x60)),
+    &ctx,
+    io_concurrency.clone()
+)
.instrument(test_span.clone())
.await?,
test_img(&data)
@@ -1950,7 +2055,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
-.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
+.get_rel_page_at_lsn(
+    TESTREL_A,
+    blkno,
+    Version::Lsn(Lsn(0x50)),
+    &ctx,
+    io_concurrency.clone()
+)
.instrument(test_span.clone())
.await?,
test_img(&data)
@@ -1987,7 +2098,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
-.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
+.get_rel_page_at_lsn(
+    TESTREL_A,
+    blkno,
+    Version::Lsn(Lsn(0x80)),
+    &ctx,
+    io_concurrency.clone()
+)
.instrument(test_span.clone())
.await?,
test_img(&data)


@@ -208,6 +208,10 @@ class ShardIndex:
shard_count=int(input[2:4], 16),
)
+@property
+def is_sharded(self) -> bool:
+    return self.shard_count != 0
class TenantShardId:
def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int):
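The new `is_sharded` helper makes the existing convention explicit: `shard_count == 0` denotes an unsharded tenant. A minimal sketch of the intended behaviour, assuming `ShardIndex` can be constructed directly from its two fields (the keyword construction and the import path below are illustrative, not part of this diff):

```python
# Illustrative only: direct construction and the import path are assumptions.
from fixtures.common_types import ShardIndex

unsharded = ShardIndex(shard_number=0, shard_count=0)  # shard_count == 0 encodes "not sharded"
sharded = ShardIndex(shard_number=1, shard_count=4)

assert not unsharded.is_sharded
assert sharded.is_sharded
```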


@@ -126,12 +126,8 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
"pageserver_page_cache_read_accesses_total",
"pageserver_page_cache_size_current_bytes",
"pageserver_page_cache_size_max_bytes",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_smgr_query_seconds_global"),
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),
*histogram("pageserver_io_operations_seconds"),


@@ -313,6 +313,10 @@ class PgProtocol:
"""
return self.safe_psql(query, log_query=log_query)[0][0]
+def show_timeline_id(self) -> TimelineId:
+    """SHOW neon.timeline_id"""
+    return TimelineId(cast("str", self.safe_psql("show neon.timeline_id")[0][0]))
class PageserverWalReceiverProtocol(StrEnum):
VANILLA = "vanilla"
@@ -387,6 +391,7 @@ class NeonEnvBuilder:
storage_controller_port_override: int | None = None,
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
+pageserver_get_vectored_concurrent_io: str | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -426,6 +431,9 @@ class NeonEnvBuilder:
self.storage_controller_config: dict[Any, Any] | None = None
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
+self.pageserver_get_vectored_concurrent_io: str | None = (
+    pageserver_get_vectored_concurrent_io
+)
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
@@ -452,6 +460,7 @@ class NeonEnvBuilder:
self.test_name = test_name
self.compatibility_neon_binpath = compatibility_neon_binpath
self.compatibility_pg_distrib_dir = compatibility_pg_distrib_dir
+self.test_may_use_compatibility_snapshot_binaries = False
self.version_combination = combination
self.mixdir = self.test_output_dir / "mixdir_neon"
if self.version_combination is not None:
@@ -463,6 +472,7 @@ class NeonEnvBuilder:
), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
self.mixdir.mkdir(mode=0o755, exist_ok=True)
self._mix_versions()
+self.test_may_use_compatibility_snapshot_binaries = True
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -1062,6 +1072,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
+self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1121,6 +1132,20 @@ class NeonEnv:
"max_batch_size": 32,
}
+# Concurrent IO (https://github.com/neondatabase/neon/issues/9378):
+# enable concurrent IO by default in tests and benchmarks.
+# Compat tests are exempt because old versions fail to parse the new config.
+get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
+if config.test_may_use_compatibility_snapshot_binaries:
+    log.info(
+        "Forcing use of binary-built-in default to avoid forward-compatibility related test failures"
+    )
+    get_vectored_concurrent_io = None
+if get_vectored_concurrent_io is not None:
+    ps_cfg["get_vectored_concurrent_io"] = {
+        "mode": self.pageserver_get_vectored_concurrent_io,
+    }
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
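Taken together, the plumbing above lets a single test pin the concurrent IO mode instead of relying on the suite-wide default: if the builder attribute is set, it ends up in `ps_cfg["get_vectored_concurrent_io"]` unless the compat-snapshot exemption resets it. A hedged sketch of such an override; the import path and the mode string are assumptions (the accepted mode values are defined by the pageserver config, which is not shown in this diff):

```python
# Sketch only: "sidecar-task" is an assumed mode name; leaving the attribute as None
# keeps the env-var-driven or binary-built-in default.
from fixtures.neon_fixtures import NeonEnvBuilder  # import path assumed


def test_getpage_with_explicit_concurrent_io(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.pageserver_get_vectored_concurrent_io = "sidecar-task"
    env = neon_env_builder.init_start()
    # ... drive getpage traffic against env here ...
```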
@@ -1455,6 +1480,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine: str,
pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None,
pageserver_virtual_file_io_mode: str | None,
+pageserver_get_vectored_concurrent_io: str | None,
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with 1 safekeeper and 1 pageserver. No authentication, no fsync.
@@ -1487,6 +1513,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
+pageserver_get_vectored_concurrent_io=pageserver_get_vectored_concurrent_io,
combination=combination,
) as builder:
env = builder.init_start()
@@ -1513,6 +1540,7 @@ def neon_env_builder(
pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None,
record_property: Callable[[str, object], None],
pageserver_virtual_file_io_mode: str | None,
+pageserver_get_vectored_concurrent_io: str | None,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1555,6 +1583,7 @@ def neon_env_builder(
test_overlay_dir=test_overlay_dir,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
+pageserver_get_vectored_concurrent_io=pageserver_get_vectored_concurrent_io,
) as builder:
yield builder
# Propagate `preserve_database_files` to make it possible to use in other fixtures,


@@ -44,6 +44,11 @@ def pageserver_virtual_file_io_mode() -> str | None:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
+@pytest.fixture(scope="function", autouse=True)
+def pageserver_get_vectored_concurrent_io() -> str | None:
+    return os.getenv("PAGESERVER_GET_VECTORED_CONCURRENT_IO")
def get_pageserver_default_tenant_config_compaction_algorithm() -> dict[str, Any] | None:
toml_table = os.getenv("PAGESERVER_DEFAULT_TENANT_CONFIG_COMPACTION_ALGORITHM")
if toml_table is None:
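The autouse fixture above just mirrors an environment variable, so CI or a developer can flip the default for an entire run without editing individual tests. A small sketch of that flow; the mode name is an assumption (the sequential behaviour matches the pre-existing code path):

```python
# Sketch: set the variable before the fixtures are evaluated and every NeonEnvBuilder
# picks it up as its default, except where the compat-snapshot exemption applies.
import os

os.environ.setdefault("PAGESERVER_GET_VECTORED_CONCURRENT_IO", "sequential")  # mode name assumed
```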


@@ -251,6 +251,8 @@ def test_forward_compatibility(
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
+neon_env_builder.test_may_use_compatibility_snapshot_binaries = True
try:
neon_env_builder.num_safekeepers = 3