mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
# Problem
The timeout-based batching adds latency to unbatchable workloads.
We can choose a short batching timeout (e.g. 10us) but that requires
high-resolution timers, which tokio doesn't have.
I thoroughly explored options to use OS timers (see
[this](https://github.com/neondatabase/neon/pull/9822) abandoned PR).
In short, it's not an attractive option because any timer implementation
adds non-trivial overheads.
# Solution
The insight is that, in the steady state of a batchable workload, the
time we spend in `get_vectored` will be hundreds of microseconds anyway.
If we prepare the next batch concurrently with `get_vectored`, we will
have a sizeable batch ready once `get_vectored` of the current batch is
done, so we do not need an explicit timeout.
This can be reasonably described as **pipelining of the protocol
handler**.
# Implementation
We model the sub-protocol handler for pagestream requests
(`handle_pagerequests`) as two futures that form a pipeline:
1. Batching: read requests from the connection and fill the current
batch
2. Execution: `take` the current batch, execute it using `get_vectored`,
and send the response.
The Batching and Execution stages are connected through a new type of
channel called `spsc_fold`.
See the long comment in `handle_pagerequests_pipelined` for details.
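To make the idea of a folding channel more concrete, here is a minimal synchronous sketch. It is not the real `spsc_fold` API: `FoldSlot`, `send_fold`, and `recv` are hypothetical names, the real channel is async, and this sketch assumes that merging always succeeds. It only shows the core behaviour: while the receiver is busy, the sender keeps folding new items into the single pending value.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical single-slot channel: the sender folds new values into the
/// pending value until the receiver takes it.
struct FoldSlot<T> {
    slot: Mutex<Option<T>>,
    ready: Condvar,
}

impl<T> FoldSlot<T> {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            slot: Mutex::new(None),
            ready: Condvar::new(),
        })
    }

    /// Sender side (the Batching stage in the text): merge `value` into the
    /// pending value if there is one, otherwise make it the pending value.
    fn send_fold(&self, value: T, fold: impl FnOnce(&mut T, T)) {
        let mut guard = self.slot.lock().unwrap();
        if let Some(pending) = guard.as_mut() {
            fold(pending, value);
        } else {
            *guard = Some(value);
        }
        self.ready.notify_one();
    }

    /// Receiver side (the Execution stage in the text): block until a value
    /// is pending, then take it, leaving the slot empty for the next batch.
    fn recv(&self) -> T {
        let mut guard = self.slot.lock().unwrap();
        loop {
            if let Some(value) = guard.take() {
                return value;
            }
            guard = self.ready.wait(guard).unwrap();
        }
    }
}
```

With a `FoldSlot<Vec<u32>>`, two `send_fold` calls that append to the pending vector followed by one `recv` yield a single two-element batch, which is the effect the pipelined design relies on while the current batch is still executing.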
# Changes
- Refactor `handle_pagerequests`
    - separate functions for
        - reading one protocol message; produces a `BatchedFeMessage` with just
          one page request in it
        - batching; tries to merge an incoming `BatchedFeMessage` into an
          existing `BatchedFeMessage`; returns `None` on success and returns
          the incoming message if merging isn't possible
        - execution of a batched message
    - unify the timeline handle acquisition & request span construction; they
      now happen in the function that reads the protocol message
- Implement serial and pipelined models
    - serial: what we had before any of the batching changes
        - read one protocol message
        - execute protocol messages
    - pipelined: the design described above
    - optionality for execution of the pipeline: either via concurrent
      futures or tokio tasks
- Pageserver config
    - remove batching timeout field
    - add ability to configure pipelining mode
    - add ability to limit max batch size for pipelined configurations
      (required for the rollout, cf
      https://github.com/neondatabase/cloud/issues/20620 )
    - ability to configure execution mode
- Tests
    - remove `batch_timeout` parametrization
    - rename `test_getpage_merge_smoke` to `test_throughput`
    - add parametrization to test different max batch sizes and execution
      modes
    - rename `test_timer_precision` to `test_latency`
    - rename the test case file to `test_page_service_batching.py`
    - better descriptions of what the tests actually do
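The contract of the batching function described in the list above (return `None` when the merge succeeded, hand the incoming message back otherwise) can be sketched as follows. The `Batch` type, its fields, the `try_merge` name, and the merge conditions (same timeline, batch size limit) are assumptions made for illustration; the real `BatchedFeMessage` carries more state and may apply different conditions.

```rust
/// Hypothetical stand-in for a batched page request message.
#[derive(Debug, PartialEq)]
struct Batch {
    timeline_id: u32,
    pages: Vec<u64>,
}

/// Try to merge `incoming` into `current`.
/// Returns `None` if the merge succeeded, or `Some(incoming)` unchanged if
/// the two batches cannot be merged (assumed here: different timeline, or
/// the merged batch would exceed `max_batch_size`).
fn try_merge(current: &mut Batch, incoming: Batch, max_batch_size: usize) -> Option<Batch> {
    let same_timeline = current.timeline_id == incoming.timeline_id;
    let fits = current.pages.len() + incoming.pages.len() <= max_batch_size;
    if same_timeline && fits {
        current.pages.extend(incoming.pages);
        None
    } else {
        Some(incoming)
    }
}
```

Returning the unmerged message by value keeps ownership simple: the caller can start a new batch from it once the current one has been handed to the Execution stage.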
## On holding the `TimelineHandle` in the pending batch
While batching, we hold the `TimelineHandle` in the pending batch.
Therefore, the timeline will not finish shutting down while we're
batching.
This is not a problem in practice because the concurrently ongoing
`get_vectored` call will fail quickly with an error indicating that the
timeline is shutting down.
This results in the Execution stage returning a `QueryError::Shutdown`,
which causes the pipeline / entire page service connection to shut down.
This drops all references to the
`Arc<Mutex<Option<Box<BatchedFeMessage>>>>` object, thereby dropping the
contained `TimelineHandle`s.
- => fixes https://github.com/neondatabase/neon/issues/9850
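The ownership argument above can be checked with a small std-only sketch. `Handle` and `PendingBatch` are hypothetical stand-ins for `TimelineHandle` and `BatchedFeMessage`; the sketch only demonstrates that the contained handle is released once the last reference to the shared `Arc<Mutex<Option<Box<_>>>>` is dropped, not how the real pipeline shuts down.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `TimelineHandle`: records when it is dropped.
struct Handle {
    released: Arc<AtomicBool>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        self.released.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for `BatchedFeMessage`: owns a handle while pending.
struct PendingBatch {
    _handle: Handle,
}

/// Returns whether the handle was released after one pipeline stage dropped
/// its reference, and after both stages dropped theirs.
fn release_on_pipeline_exit() -> (bool, bool) {
    let released = Arc::new(AtomicBool::new(false));
    let batch = PendingBatch {
        _handle: Handle {
            released: Arc::clone(&released),
        },
    };

    // Both stages share the slot that holds the pending batch.
    let batching_side: Arc<Mutex<Option<Box<PendingBatch>>>> =
        Arc::new(Mutex::new(Some(Box::new(batch))));
    let execution_side = Arc::clone(&batching_side);

    // One stage exits: the batch (and its handle) is still alive.
    drop(execution_side);
    let after_one = released.load(Ordering::SeqCst);

    // The other stage exits: the last reference is gone, the handle drops.
    drop(batching_side);
    let after_both = released.load(Ordering::SeqCst);

    (after_one, after_both)
}
```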
# Performance
Local run of the benchmarks, results in [this empty
commit](1cf5b1463f)
in the PR branch.
Key take-aways:
* `concurrent-futures` and `tasks` deliver identical `batching_factor`
* tail latency impact unknown, cf
https://github.com/neondatabase/neon/issues/9837
* `concurrent-futures` has higher throughput than `tasks` in all
workloads (=lower `time` metric)
* In unbatchable workloads, `concurrent-futures` has 5% higher
`CPU-per-throughput` than that of `tasks`, and 15% higher than that of
`serial`.
* In batchable-32 workload, `concurrent-futures` has 8% lower
`CPU-per-throughput` than that of `tasks` (comparison to tput of
`serial` is irrelevant)
* In unbatchable workloads, mean and tail latencies of
`concurrent-futures` are practically identical to `serial`, whereas
`tasks` adds 20-30us of overhead
Overall, `concurrent-futures` seems like a slightly more attractive
choice.
# Rollout
This change is disabled-by-default.
Rollout plan:
- https://github.com/neondatabase/cloud/issues/20620
# Refs
- epic: https://github.com/neondatabase/neon/issues/9376
- this sub-task: https://github.com/neondatabase/neon/issues/9377
- the abandoned attempt to improve batching timeout resolution:
https://github.com/neondatabase/neon/pull/9820
- closes https://github.com/neondatabase/neon/issues/9850
- fixes https://github.com/neondatabase/neon/issues/9835
410 lines
13 KiB
Rust
#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]

mod auth;
pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod controller_upcall_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

extern crate hyper0 as hyper;

use futures::{stream::FuturesUnordered, StreamExt};
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod span;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walredo;

use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::{
    mgr::{BackgroundPurges, TenantManager},
    secondary,
};
use tracing::{info, info_span};

/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

pub const DEFAULT_PG_VERSION: u32 = 16;

// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

pub use crate::metrics::preinitialize_metrics;

pub struct CancellableTask {
    pub task: tokio::task::JoinHandle<()>,
    pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
    pub async fn shutdown(self) {
        self.cancel.cancel();
        self.task.await.unwrap();
    }
}

#[tracing::instrument(skip_all, fields(%exit_code))]
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
    http_listener: HttpEndpointListener,
    page_service: page_service::Listener,
    consumption_metrics_worker: ConsumptionMetricsTasks,
    disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
    tenant_manager: &TenantManager,
    background_purges: BackgroundPurges,
    mut deletion_queue: DeletionQueue,
    secondary_controller_tasks: secondary::GlobalTasks,
    exit_code: i32,
) {
    use std::time::Duration;

    let started_at = std::time::Instant::now();

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    //  that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to the lack
    // of scheduling priority features in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to get Poll::Pending once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(
        page_service.stop_accepting(),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant_manager.shutdown(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        remaining_connections.shutdown(),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects
    deletion_queue.shutdown(Duration::from_secs(5)).await;

    timed(
        consumption_metrics_worker.0.shutdown(),
        "shutdown consumption metrics",
        Duration::from_secs(1),
    )
    .await;

    timed(
        futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
        "shutdown disk usage eviction",
        Duration::from_secs(1),
    )
    .await;

    timed(
        background_purges.shutdown(),
        "shutdown background purges",
        Duration::from_secs(1),
    )
    .await;

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        http_listener.0.shutdown(),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    timed(
        secondary_controller_tasks.wait(), // cancellation happened in caller
        "secondary controller wait",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        "Shut down successfully completed"
    );
    std::process::exit(exit_code);
}

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub(crate) const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

pub(crate) const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";

pub fn is_temporary(path: &Utf8Path) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
        None => false,
    }
}

fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(suffix),
        None => false,
    }
}

// FIXME: DO NOT ADD new query methods like this, which will have a next step of parsing timelineid
// from the directory name. Instead create type "UninitMark(TimelineId)" and only parse it once
// from the name.

pub(crate) fn is_uninit_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
}

pub(crate) fn is_delete_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}

/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
/// blocking.
///
/// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
/// delaying is needed.
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of a background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}

/// Time the future with a warning when it exceeds a threshold.
async fn timed<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
    let started = std::time::Instant::now();

    let mut fut = std::pin::pin!(fut);

    match tokio::time::timeout(warn_at, &mut fut).await {
        Ok(ret) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed"
            );
            ret
        }
        Err(_) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "still waiting, taking longer than expected..."
            );

            let ret = fut.await;

            // this has a global allowed_errors
            tracing::warn!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed, took longer than expected"
            );

            ret
        }
    }
}

/// Like [`timed`], but the warning timeout only starts after `cancel` has been cancelled.
async fn timed_after_cancellation<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
    cancel: &CancellationToken,
) -> <Fut as std::future::Future>::Output {
    let mut fut = std::pin::pin!(fut);

    tokio::select! {
        _ = cancel.cancelled() => {
            timed(fut, name, warn_at).await
        }
        ret = &mut fut => {
            ret
        }
    }
}

#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(r1, 123);

        // A future that completes too slowly should also have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 1",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(r1, 456);
    }
}