Compare commits

...

29 Commits

Author SHA1 Message Date
Christian Schwarz
02205e9191 measured BACKGROUND_RUNTIME performance using wrk
Launch wrk from command line 3-4 seconds after the load starts.
=> blocking of executor threads is clearly visible, my branch
  performs _much_ better.

baseline: commit 15b8618d25 (HEAD -> problame/loadtest-baseline, origin/problame/loadtest-baseline, main)
neon-main (compaction semaphore disabled!)

admin@ip-172-31-13-23:[~/neon]: wrk --latency http://localhost:2342
Running 10s test @ http://localhost:2342
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    71.42ms   15.97ms 125.18ms   70.82%
    Req/Sec    41.44     28.85   101.00     57.35%
  Latency Distribution
     50%   72.53ms
     75%   82.07ms
     90%   91.44ms
     99%  116.56ms
  291 requests in 10.01s, 22.73KB read
  Socket errors: connect 0, read 0, write 0, timeout 10
Requests/sec:     29.07
Transfer/sec:      2.27KB

this branch (compaction semaphore also disabled!):

admin@ip-172-31-13-23:[~/neon]: wrk --latency http://localhost:2342
Running 10s test @ http://localhost:2342
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    45.74ms   64.13ms 293.44ms   83.27%
    Req/Sec   442.81    258.18     1.32k    69.79%
  Latency Distribution
     50%    2.92ms
     75%   75.52ms
     90%  148.03ms
     99%  248.50ms
  8641 requests in 10.01s, 675.08KB read
Requests/sec:    862.81
Transfer/sec:     67.41KB
2023-08-31 08:02:17 +00:00
Christian Schwarz
66c501f5b8 HACK: BACKGROUND_RUNTIME webserver to measure response time using wrk 2023-08-31 07:50:13 +00:00
Christian Schwarz
887f464825 use hacked-together open_at for async VirtualFile open calls instead of spawn_blocking
This makes Delta/Image ::load fns fully tokio-epoll-uring
2023-08-29 19:14:35 +00:00
Christian Schwarz
0b8ff8dbe0 Revert "switch back to spawn_blocking to make the comparison"
This reverts commit 60971e282e.
2023-08-29 16:42:14 +00:00
Christian Schwarz
60971e282e switch back to spawn_blocking to make the comparison 2023-08-29 16:00:21 +00:00
Christian Schwarz
189aa1b077 with the page cache removed, we spend almost 0 time in futex 2023-08-29 15:53:41 +00:00
Christian Schwarz
9f087f93f8 buffer pool impl: re-use allocations 2023-08-29 15:49:05 +00:00
Christian Schwarz
fa1fb214b3 also rip out memoization code and make rest compile 2023-08-29 15:28:14 +00:00
Christian Schwarz
4db24c9de0 RIP out page cache, but keep memoization code (doesn't compile) 2023-08-29 15:27:55 +00:00
Christian Schwarz
5f920a9993 profile.release=debug 2023-08-29 15:05:02 +00:00
Christian Schwarz
2aac082385 disable concurrent compactions limiter 2023-08-29 13:05:18 +00:00
Christian Schwarz
99f8f87ba5 spawn_blocking-based file open for image and delta layer loading 2023-08-29 12:49:53 +00:00
Christian Schwarz
e7e1df2a79 tokio_epoll_uring for read path 2023-08-29 12:24:30 +00:00
Christian Schwarz
10ee8f7981 FileBlockReaderFile is not needed, was doing all the sync IO 2023-08-29 11:04:16 +00:00
Christian Schwarz
a5b6e32b01 PoC using spawn_blocking 2023-08-29 09:59:53 +00:00
Christian Schwarz
bb88e5bf57 try to convert to async, now lifetime errors because buffer lifetime is insufficient 2023-08-29 09:53:01 +00:00
Christian Schwarz
876efcfc0a REPRO the problem: , uses 430GB of space; 4 seconds load time; constant 20kIOPS after ~20s 2023-08-29 09:39:39 +00:00
Christian Schwarz
4d69192ae5 QUICK HACK: rip out virtual file cache 2023-08-29 09:37:08 +00:00
Christian Schwarz
2d5d046062 WIP switch to tokio::sync::RwLock 2023-08-29 11:21:29 +02:00
Christian Schwarz
d8b8a203a4 WIP: async with_file & read_at_async, problem is that FileSlotGuard is not Send 2023-08-29 11:14:07 +02:00
Christian Schwarz
11e9b25f2b refactor: get_file_guard & base with_file on it 2023-08-29 11:02:42 +02:00
Christian Schwarz
235baffbf4 make the read path async, except the read_at impl 2023-08-29 10:50:21 +02:00
Christian Schwarz
5be0f9d69a read_at need not be public 2023-08-29 10:48:29 +02:00
Christian Schwarz
e91e4d0b96 move code around to minimize diff 2023-08-29 10:42:23 +02:00
Arpad Müller
edbe3d2f76 Remove Read impl that was only used in one place 2023-08-29 01:52:39 +02:00
Arpad Müller
0d9fa95454 Move used FileExt functions to inherent impls 2023-08-29 01:52:39 +02:00
Arpad Müller
e983b3cc2e Don't use generics bounded by trait 2023-08-29 01:52:39 +02:00
Arpad Müller
a362ab9169 Move VirtualFile::seek to inherent function 2023-08-28 22:46:36 +02:00
Arpad Müller
0cfc9edcb8 Make read_blk and parts of the page cache async
The returned PageReadGuard is not Send so we change the locks used for
the SlotInner's in the page cache to the ones from tokio.

Also, make read_blk async.
2023-08-28 22:43:39 +02:00
21 changed files with 511 additions and 1634 deletions

83
Cargo.lock generated
View File

@@ -1996,6 +1996,26 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "io-uring"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "io-uring"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424"
dependencies = [
"bitflags",
"libc",
]
[[package]]
name = "ipnet"
version = "2.7.2"
@@ -2095,9 +2115,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.144"
version = "0.2.147"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
[[package]]
name = "libloading"
@@ -2663,6 +2683,7 @@ dependencies = [
"tenant_size_model",
"thiserror",
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
"tokio-postgres",
"tokio-tar",
@@ -2836,9 +2857,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.9"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
@@ -3725,6 +3746,12 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -4296,18 +4323,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
dependencies = [
"proc-macro2",
"quote",
@@ -4392,22 +4419,40 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.28.1"
version = "1.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
dependencies = [
"autocfg",
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.4.9",
"socket2 0.5.3",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=problame/hacky-openat#96e5a1f3a3d6921438002807475d01540e1211b2"
dependencies = [
"futures",
"io-uring 0.6.1",
"libc",
"once_cell",
"scopeguard",
"thiserror",
"tokio",
"tokio-uring",
"tokio-util",
"tracing",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
@@ -4547,6 +4592,20 @@ dependencies = [
"tungstenite 0.20.0",
]
[[package]]
name = "tokio-uring"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef"
dependencies = [
"io-uring 0.5.13",
"libc",
"scoped-tls",
"slab",
"socket2 0.4.9",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.8"

View File

@@ -200,58 +200,3 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
# Besides, debug info should not affect the performance.
debug = true
# disable debug symbols for all packages except this one to decrease binaries size
[profile.release.package."*"]
debug = false
[profile.release-line-debug]
inherits = "release"
debug = 1 # true = 2 = all symbols, 1 = line only
[profile.release-line-debug-lto]
inherits = "release"
debug = 1 # true = 2 = all symbols, 1 = line only
lto = true
[profile.release-line-debug-size]
inherits = "release"
debug = 1 # true = 2 = all symbols, 1 = line only
opt-level = "s"
[profile.release-line-debug-zize]
inherits = "release"
debug = 1 # true = 2 = all symbols, 1 = line only
opt-level = "z"
[profile.release-line-debug-size-lto]
inherits = "release"
debug = 1 # true = 2 = all symbols, 1 = line only
opt-level = "s"
lto = true
[profile.release-line-debug-zize-lto]
inherits = "release"
debug = 1 # true = 2 = all symbols, 1 = line only
opt-level = "z"
lto = true
[profile.release-no-debug]
inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
[profile.release-no-debug-size]
inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
opt-level = "s"
[profile.release-no-debug-zize]
inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
opt-level = "z"
[profile.release-no-debug-size-lto]
inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
opt-level = "s"
lto = true
[profile.release-no-debug-zize-lto]
inherits = "release"
debug = false # true = 2 = all symbols, 1 = line only
opt-level = "z"
lto = true

View File

@@ -80,6 +80,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
#tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/hacky-openat" }
[dev-dependencies]
criterion.workspace = true

View File

@@ -8,10 +8,9 @@ use std::collections::BinaryHeap;
use std::ops::Range;
use std::{fs, path::Path, str};
use pageserver::page_cache::PAGE_SZ;
use pageserver::repository::{Key, KEY_SIZE};
use pageserver::tenant::block_io::FileBlockReader;
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection, PAGE_SZ};
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
use pageserver::tenant::storage_layer::range_overlaps;
use pageserver::virtual_file::VirtualFile;
@@ -97,7 +96,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
@@ -135,10 +134,6 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let storage_path = &cmd.path;
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
// Initialize virtual_file (file descriptor cache) and page cache, which are needed to access the layers' persistent B-Tree.
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;
let mut total_image_layers = 0usize;
let mut total_excess_layers = 0usize;

View File

@@ -5,7 +5,6 @@ use clap::Subcommand;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
tenant::{
@@ -45,10 +44,8 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,

View File

@@ -12,10 +12,8 @@ use clap::{Parser, Subcommand};
use layers::LayerCmd;
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use std::path::{Path, PathBuf};
@@ -115,9 +113,6 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
}
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await
}

View File

@@ -20,11 +20,10 @@ use metrics::set_build_info_metric;
use pageserver::{
config::{defaults::*, PageServerConf},
context::{DownloadBehavior, RequestContext},
http, page_cache, page_service, task_mgr,
http, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
tenant::mgr,
virtual_file,
};
use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
@@ -124,10 +123,6 @@ fn main() -> anyhow::Result<()> {
// Set up failpoints support
let scenario = pageserver::failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
scenario.teardown();
@@ -581,6 +576,31 @@ fn start_pageserver(
);
}
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::BackgroundRuntimeTurnaroundMeasure,
None,
None,
"background runtime turnaround measure",
true,
async move {
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
let server = server
.serve(hyper::service::make_service_fn(|_| async move {
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
move |_: hyper::Request<hyper::Body>| async move {
Ok::<_, std::convert::Infallible>(hyper::Response::new(
hyper::Body::from(format!("alive")),
))
},
))
}))
.with_graceful_shutdown(task_mgr::shutdown_watcher());
server.await?;
Ok(())
},
);
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// All started up! Now just sit and wait for shutdown signal.

View File

@@ -0,0 +1,39 @@
use std::cell::RefCell;
use crate::tenant::disk_btree::PAGE_SZ;
/// A heap-allocated buffer of exactly `PAGE_SZ` bytes, obtained from [`get`].
///
/// The inner `Option` exists so that the `Drop` impl can `take()` the box out of `self`
/// and push it onto the thread-local `POOL` for re-use.
pub struct Buffer(Option<Box<[u8; PAGE_SZ]>>);
// Thread-local list of re-usable buffers.
// `get()` pops allocations off this list and `Buffer`'s `Drop` impl pushes them back,
// so an allocation ends up in the pool of whichever thread drops the `Buffer`.
// NOTE(review): nothing visible here bounds the length of this list — confirm that is intended.
thread_local! {
static POOL: RefCell<Vec<Box<[u8; PAGE_SZ]>>> = RefCell::new(Vec::new());
}
/// Hands out a page-sized [`Buffer`].
///
/// Re-uses an allocation from the calling thread's `POOL` when one is available and
/// otherwise allocates a fresh, zero-filled page. A recycled buffer is handed out
/// as-is: it still contains whatever bytes its previous user left in it.
pub(crate) fn get() -> Buffer {
    let page = POOL
        .with(|pool| pool.borrow_mut().pop())
        .unwrap_or_else(|| Box::new([0; PAGE_SZ]));
    Buffer(Some(page))
}
/// Returns the page allocation to the dropping thread's `POOL` so that a later
/// `get()` on that thread can re-use it.
impl Drop for Buffer {
    fn drop(&mut self) {
        let buf = self.0.take().unwrap();
        // `LocalKey::with` panics if the thread-local has already been destroyed, which
        // can happen when a `Buffer` is dropped while its thread is being torn down.
        // Panicking inside a destructor is never desirable, so use `try_with`: if the
        // pool is gone, the closure (and `buf`, which it owns) is simply dropped and
        // the allocation is freed instead of being recycled.
        let _ = POOL.try_with(|pool| pool.borrow_mut().push(buf));
    }
}
/// Read-only access to the page bytes held by the buffer.
impl std::ops::Deref for Buffer {
    type Target = [u8; PAGE_SZ];

    fn deref(&self) -> &Self::Target {
        // The inner option is only emptied by `Drop`, so this is expected to be `Some`.
        self.0.as_deref().unwrap()
    }
}
/// Mutable access to the page bytes held by the buffer.
impl std::ops::DerefMut for Buffer {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // The inner option is only emptied by `Drop`, so this is expected to be `Some`.
        self.0.as_deref_mut().unwrap()
    }
}

View File

@@ -8,7 +8,6 @@ pub mod http;
pub mod import_datadir;
pub mod keyspace;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod repository;
@@ -28,6 +27,8 @@ use std::path::Path;
use crate::task_mgr::TaskKind;
use tracing::info;
pub mod buffer_pool;
/// Current storage format version
///
/// This is embedded in the header of all the layer files.

View File

@@ -1,848 +0,0 @@
//!
//! Global page cache
//!
//! The page cache uses up most of the memory in the page server. It is shared
//! by all tenants, and it is used to store different kinds of pages. Sharing
//! the cache allows memory to be dynamically allocated where it's needed the
//! most.
//!
//! The page cache consists of fixed-size buffers, 8 kB each to match the
//! PostgreSQL buffer size, and a Slot struct for each buffer to contain
//! information about what's stored in the buffer.
//!
//! # Types Of Pages
//!
//! [`PageCache`] only supports immutable pages.
//! Hence there is no need to worry about coherency.
//!
//! Two types of pages are supported:
//!
//! * **Materialized pages**, filled & used by page reconstruction
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
//!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
//! It uses the page cache only for the blocks that are already fully written and immutable.
//!
//! # Filling The Page Cache
//!
//! Page cache maps from a cache key to a buffer slot.
//! The cache key uniquely identifies the piece of data that is being cached.
//!
//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
//!
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
//! * Get a [`FileId`] using [`next_file_id`].
//! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
//! a read guard for the page. Just use it.
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
//! a write guard for the page. Fill the page with the contents of the on-disk file.
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
//! Then try again to [`PageCache::read_immutable_buf`].
//! Unless there's high cache pressure, the page should now be cached.
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
//!
//! # Locking
//!
//! There are two levels of locking involved: There's one lock for the "mapping"
//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
//! slot, and a separate lock on each slot. To read or write the contents of a
//! slot, you must hold the lock on the slot in read or write mode,
//! respectively. To change the mapping of a slot, i.e. to evict a page or to
//! assign a buffer for a page, you must hold the mapping lock and the lock on
//! the slot at the same time.
//!
//! Whenever you need to hold both locks simultaneously, the slot lock must be
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
//! in the cache, you would first look up the mapping, while holding the mapping
//! lock, and then lock the slot. You must release the mapping lock in between,
//! to obey the lock ordering and avoid deadlock.
//!
//! A slot can momentarily have invalid contents, even if it's already been
//! inserted to the mapping, but you must hold the write-lock on the slot until
//! the contents are valid. If you need to release the lock without initializing
//! the contents, you must remove the mapping first. We make that easy for the
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
//! page, the caller must explicitly call guard.mark_valid() after it has
//! initialized it. If the guard is dropped without calling mark_valid(), the
//! mapping is automatically removed and the slot is marked free.
//!
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
};
use anyhow::Context;
use once_cell::sync::OnceCell;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
// The process-wide page cache instance. Set once by `init()`; read through `get()`.
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
// Size passed to `PageCache::new` when `get()` lazily creates the cache under `cfg!(test)`.
const TEST_PAGE_CACHE_SIZE: usize = 50;
///
/// Create the global page cache via `PageCache::new(size)`. This must be called
/// exactly once at page server startup.
///
/// # Panics
///
/// Panics if the global page cache has already been set.
///
pub fn init(size: usize) {
    match PAGE_CACHE.set(PageCache::new(size)) {
        Ok(()) => {}
        Err(_) => panic!("page cache already initialized"),
    }
}
///
/// Obtain a reference to the global page cache.
///
/// # Panics
///
/// Outside of unit tests, panics if `init()` has not been called yet.
///
pub fn get() -> &'static PageCache {
    if !cfg!(test) {
        return PAGE_CACHE.get().expect("page cache not initialized");
    }
    //
    // Unit tests never go through page server startup, so nobody calls
    // page_cache::init() for them. Create a tiny cache on first use instead,
    // so that the page cache is usable in unit tests.
    //
    PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
}
/// Size in bytes of one page buffer; equal to `postgres_ffi::BLCKSZ`.
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
// Ceiling for a slot's `usage_count`; see `Slot::inc_usage_count`.
const MAX_USAGE_COUNT: u8 = 5;
/// See module-level comment.
///
/// Identifies an immutable file in the page cache; values are handed out by
/// [`next_file_id`] and, together with a block number, form the cache key for
/// immutable file pages.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);
// Counter backing `next_file_id()`; the first id handed out is 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// See module-level comment.
///
/// Every call returns a [`FileId`] wrapping the next value of a process-wide counter.
pub fn next_file_id() -> FileId {
    let raw_id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    FileId(raw_id)
}
///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
/// There is one variant per kind of page described in the module-level comment.
///
#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::enum_variant_names)]
enum CacheKey {
// A materialized page version: the (tenant, timeline, key) triple plus the LSN of the version.
MaterializedPage {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
// A block of an immutable on-disk file: the file's `FileId` plus the block number.
ImmutableFilePage {
file_id: FileId,
blkno: u32,
},
}
// The LSN-independent part of a materialized page's cache key. It is the key type of
// `PageCache::materialized_page_map`, whose values list the cached versions of the page.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct MaterializedPageHashKey {
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
}
// One element of the `Vec<Version>` stored per key in `PageCache::materialized_page_map`.
#[derive(Clone)]
struct Version {
// LSN of this cached page version.
lsn: Lsn,
// presumably an index into `PageCache::slots` — verify against the mapping code.
slot_idx: usize,
}
// One buffer slot of the page cache: the lock-protected buffer plus its usage counter.
struct Slot {
// Slot contents; must be locked in read or write mode to access the buffer.
inner: RwLock<SlotInner>,
// Capped at `MAX_USAGE_COUNT`; adjusted via `inc_usage_count` / `dec_usage_count`.
usage_count: AtomicU8,
}
// The lock-protected part of a `Slot`.
struct SlotInner {
// Cache key of the page currently assigned to this slot. `PageWriteGuard`'s `Drop`
// resets it to `None` when the guard is dropped without `mark_valid()`.
key: Option<CacheKey>,
// The page buffer itself, `PAGE_SZ` bytes.
buf: &'static mut [u8; PAGE_SZ],
}
impl Slot {
    /// Bump the buffer's usage count by one, unless it already sits at MAX_USAGE_COUNT.
    fn inc_usage_count(&self) {
        // An `Err` from `fetch_update` only means the closure declined to update because
        // the ceiling was reached; there is nothing to do in that case.
        let _ = self.usage_count.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |current| (current != MAX_USAGE_COUNT).then(|| current + 1),
        );
    }

    /// Lower the buffer's usage count by one, leaving it alone if it is already zero.
    /// Returns the usage count as it was before the call.
    fn dec_usage_count(&self) -> u8 {
        let outcome = self.usage_count.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |current| current.checked_sub(1),
        );
        // `fetch_update` carries the previous value in both the updated (`Ok`) and the
        // untouched (`Err`) case.
        match outcome {
            Ok(previous) | Err(previous) => previous,
        }
    }
}
/// The page cache: the key-to-slot mappings plus the buffer slots themselves.
/// See the module-level comment for the locking rules.
pub struct PageCache {
/// This contains the mapping from the cache key to buffer slot that currently
/// contains the page, if any.
///
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
/// this HashMap can be replaced with a more concurrent version, there are
/// plenty of such crates around.
///
/// If you add support for caching different kinds of objects, each object kind
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
/// Mapping for immutable file pages, keyed by `(FileId, block number)`.
/// The value is presumably the index of the slot holding the page — verify
/// against the mapping code.
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
/// Index of the next candidate to evict, for the Clock replacement algorithm.
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
/// Handle to the page cache size metrics.
size_metrics: &'static PageCacheSizeMetrics,
}
///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
/// It wraps the read guard of the slot's `RwLock` and exposes the page bytes
/// through `Deref` and `AsRef`.
///
pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>);
impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.0.buf
}
}
/// Same view of the locked page as the `Deref` impl, for APIs that take `AsRef`.
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
    fn as_ref(&self) -> &[u8; PAGE_SZ] {
        // Go through `Deref`, which hands out the slot's buffer.
        &**self
    }
}
///
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
/// until the guard is dropped.
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected
/// to initialize.
///
/// If the guard is dropped while still invalid, its `Drop` impl removes the
/// mapping for the page again.
///
pub struct PageWriteGuard<'i> {
// Write lock on the slot that holds the page.
inner: RwLockWriteGuard<'i, SlotInner>,
// Are the page contents currently valid?
valid: bool,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
}
}
impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
}
}
/// Same mutable view of the locked page as the `DerefMut` impl, for APIs that take `AsMut`.
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
    fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
        // Go through `DerefMut`, which hands out the slot's buffer.
        &mut **self
    }
}
impl PageWriteGuard<'_> {
/// Mark that the buffer contents are now valid.
///
/// Once marked valid, dropping the guard leaves the page's mapping in place
/// (see the `Drop` impl).
///
/// # Panics
///
/// Panics if the slot has no key assigned, or if the guard was already valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
}
}
impl Drop for PageWriteGuard<'_> {
    ///
    /// Cleans up after a caller that obtained a buffer for a page that was not
    /// yet in the cache (via lock_for_read/write()) but dropped the guard
    /// without ever calling mark_valid(): the page's mapping is removed from
    /// the page cache and the slot's key is cleared.
    ///
    fn drop(&mut self) {
        assert!(self.inner.key.is_some());
        if self.valid {
            // Contents were initialized; the mapping stays.
            return;
        }
        let stale_key = self.inner.key.as_ref().unwrap();
        PAGE_CACHE.get().unwrap().remove_mapping(stale_key);
        self.inner.key = None;
    }
}
/// lock_for_read() return value
pub enum ReadBufResult<'a> {
/// The page was already cached; the guard gives read access to it.
Found(PageReadGuard<'a>),
/// The page was not cached. The guard is a write lease on a newly assigned,
/// not-yet-valid buffer that the caller fills and then marks valid.
NotFound(PageWriteGuard<'a>),
}
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
/// The page was already cached; the guard is a write lease on the existing buffer.
Found(PageWriteGuard<'a>),
/// The page was not cached; the caller is expected to fill the buffer and call
/// `mark_valid()` (as `memorize_materialized_page` does).
NotFound(PageWriteGuard<'a>),
}
impl PageCache {
//
// Section 1.1: Public interface functions for looking up and memorizing materialized page
// versions in the page cache
//
/// Find a cached materialized version of a page.
///
/// `lsn` acts as an upper bound: the newest cached version of the given page
/// that is not newer than `lsn` is returned, together with the LSN that
/// version actually has. Returns `None` when no suitable version is cached.
///
/// Every call counts as a read access in the page cache metrics; a hit is
/// additionally counted as either "exact" or "older LSN".
pub fn lookup_materialized_page(
    &self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    key: &Key,
    lsn: Lsn,
) -> Option<(Lsn, PageReadGuard)> {
    crate::metrics::PAGE_CACHE
        .read_accesses_materialized_page
        .inc();

    let hash_key = MaterializedPageHashKey {
        tenant_id,
        timeline_id,
        key: *key,
    };
    let mut cache_key = CacheKey::MaterializedPage { hash_key, lsn };

    // On a hit, the lookup rewrites `cache_key` to the key of the version it found.
    let guard = self.try_lock_for_read(&mut cache_key)?;

    let available_lsn = match cache_key {
        CacheKey::MaterializedPage { lsn: found_lsn, .. } => found_lsn,
        CacheKey::ImmutableFilePage { .. } => panic!("unexpected key type in slot"),
    };

    if available_lsn == lsn {
        crate::metrics::PAGE_CACHE
            .read_hits_materialized_page_exact
            .inc();
    } else {
        crate::metrics::PAGE_CACHE
            .read_hits_materialized_page_older_lsn
            .inc();
    }

    Some((available_lsn, guard))
}
///
/// Store an image of the given page in the cache.
///
/// `img` must be exactly `PAGE_SZ` bytes long.
///
/// # Errors
///
/// Returns an error if `img` does not have the expected length, or if no
/// buffer could be obtained for the page.
///
/// # Panics
///
/// Panics if the page version is already cached with different contents.
///
pub fn memorize_materialized_page(
    &self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    key: Key,
    lsn: Lsn,
    img: &[u8],
) -> anyhow::Result<()> {
    // `copy_from_slice` below panics on a length mismatch. Reject a wrongly sized
    // image up front with a proper error, before a victim slot gets evicted for it.
    anyhow::ensure!(
        img.len() == PAGE_SZ,
        "materialized page image must be {} bytes, got {}",
        PAGE_SZ,
        img.len()
    );

    let cache_key = CacheKey::MaterializedPage {
        hash_key: MaterializedPageHashKey {
            tenant_id,
            timeline_id,
            key,
        },
        lsn,
    };

    match self.lock_for_write(&cache_key)? {
        WriteBufResult::Found(write_guard) => {
            // We already had it in cache. Another thread must've put it there
            // concurrently. Check that it had the same contents that we
            // replayed.
            assert!(*write_guard == img);
        }
        WriteBufResult::NotFound(mut write_guard) => {
            write_guard.copy_from_slice(img);
            write_guard.mark_valid();
        }
    }

    Ok(())
}
// Section 1.2: Public interface functions for working with immutable file pages.
/// Get the cached page for block `blkno` of the immutable file `file_id`, or a
/// write guard on a buffer to fill if the page is not cached yet. See the
/// module-level comment for the expected calling protocol.
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
    self.lock_for_read(&mut CacheKey::ImmutableFilePage { file_id, blkno })
}
//
// Section 2: Internal interface functions for lookup/update.
//
// To add support for a new kind of "thing" to cache, you will need
// to add public interface routines above, and code to deal with the
// "mappings" after this section. But the routines in this section should
// not require changes.
/// Try to find a page in the cache and read-lock it.
///
/// When the search criteria is not exact, *cache_key is overwritten with the
/// exact key of the page that is returned. (For materialized pages this means
/// the LSN in 'cache_key' becomes the LSN of the returned page version.)
///
/// Returns None if the page is not cached. If the slot found through the
/// mapping turns out to hold a different page by the time it is locked,
/// *cache_key is restored to the value it had on entry.
///
fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
    let requested_key = cache_key.clone();
    let slot_idx = self.search_mapping(cache_key)?;

    // The mapping pointed us at a slot. The mapping lock is no longer held, so
    // another thread may have evicted the page in the meantime: lock the slot
    // and verify that it still holds what we are looking for.
    let slot = &self.slots[slot_idx];
    let inner = slot.inner.read().unwrap();
    if inner.key.as_ref() != Some(&*cache_key) {
        // search_mapping might have modified the search key; restore it.
        *cache_key = requested_key;
        return None;
    }

    slot.inc_usage_count();
    Some(PageReadGuard(inner))
}
/// Return a locked buffer for given block.
///
/// Like try_lock_for_read(), if the search criteria is not exact and the
/// page is already found in the cache, *cache_key is updated.
///
/// If the page is not found in the cache, this allocates a new buffer for
/// it. The caller may then initialize the buffer with the contents, and
/// call mark_valid().
///
/// Returns an error if no victim buffer could be found for the new page.
///
/// Example usage:
///
/// ```ignore
/// let cache = page_cache::get();
///
/// match cache.lock_for_read(&mut key)? {
/// ReadBufResult::Found(read_guard) => {
/// // The page was found in cache. Use it
/// },
/// ReadBufResult::NotFound(mut write_guard) => {
/// // The page was not found in cache. Read it from disk into the
/// // buffer.
/// //read_my_page_from_disk(write_guard);
///
/// // The buffer contents are now valid. Tell the page cache.
/// write_guard.mark_valid();
/// },
/// }
/// ```
///
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
// Pick the access/hit counters that belong to this kind of key.
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
&crate::metrics::PAGE_CACHE.read_hits_immutable,
),
};
read_access.inc();
// Only a hit on the very first lookup is counted as a cache hit; a page found
// on a later iteration of the loop below is not.
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
is_first_iteration = false;
// Not found. Find a victim buffer
let (slot_idx, mut inner) =
self.find_victim().context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.usage_count.store(1, Ordering::Relaxed);
// Hand the slot out as not-yet-valid: the caller has to fill it and call
// mark_valid(), otherwise dropping the guard removes the mapping again.
return Ok(ReadBufResult::NotFound(PageWriteGuard {
inner,
valid: false,
}));
}
}
/// Look up a page in the cache and lock it in write mode.
///
/// Returns `None` if the key has no mapping, or if the mapped slot no longer
/// holds that key by the time its lock has been acquired.
///
/// When locking a page for writing, the search criteria is always "exact".
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
    let slot_idx = self.search_mapping_for_write(cache_key)?;

    // The mapping lock has already been released by the lookup above, so
    // another thread may have evicted the page before we got the slot lock.
    // Re-check the key under the slot lock before trusting the slot.
    let slot = &self.slots[slot_idx];
    let inner = slot.inner.write().unwrap();
    if inner.key.as_ref() != Some(cache_key) {
        return None;
    }

    slot.inc_usage_count();
    Some(PageWriteGuard { inner, valid: true })
}
/// Return a write-locked buffer for the given block.
///
/// Works like lock_for_read(), except that the buffer is write-locked in
/// both outcomes, so the caller may modify it even when the page was already
/// present in the cache (`WriteBufResult::Found`). On a miss the returned
/// guard has `valid: false`.
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
    loop {
        // Fast path: the page is already cached.
        if let Some(write_guard) = self.try_lock_for_write(cache_key) {
            return Ok(WriteBufResult::Found(write_guard));
        }

        // Miss: claim a victim slot (returned empty and write-locked).
        let (slot_idx, mut inner) = self
            .find_victim()
            .context("Failed to find evict victim")?;

        // Publish the mapping. If somebody else inserted the same key in the
        // meantime, our victim was evicted for nothing; drop it and start
        // over so that the lookup above picks up the other slot.
        // TODO: put the unused victim on a free list instead.
        if self.try_insert_mapping(cache_key, slot_idx).is_some() {
            continue;
        }

        // The slot is ours: tag it with the key and give it an initial
        // usage count.
        inner.key = Some(cache_key.clone());
        self.slots[slot_idx].usage_count.store(1, Ordering::Relaxed);
        return Ok(WriteBufResult::NotFound(PageWriteGuard {
            inner,
            valid: false,
        }));
    }
}
//
// Section 3: Mapping functions
//
/// Search for a page in the cache using the given search key.
///
/// Returns the slot index, if any.
///
/// For `MaterializedPage` keys the search is not exact: the version with the
/// greatest LSN that is less than or equal to the requested one is chosen,
/// and the `lsn` inside *cache_key is overwritten with that version's LSN.
/// `ImmutableFilePage` keys are looked up exactly and left untouched.
///
/// NOTE: No lock on the mapping is held on return, so the slot might get
/// recycled for an unrelated page immediately after this function returns.
/// The caller is responsible for re-checking that the slot still contains
/// the page with the same key before using it.
///
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
    match cache_key {
        CacheKey::ImmutableFilePage { file_id, blkno } => {
            let map = self.immutable_page_map.read().unwrap();
            map.get(&(*file_id, *blkno)).copied()
        }
        CacheKey::MaterializedPage { hash_key, lsn } => {
            let map = self.materialized_page_map.read().unwrap();
            let versions = map.get(hash_key)?;
            // An exact match is used as is. Otherwise the search yields the
            // insertion point, and the candidate is the element just before
            // it; an insertion point of 0 means every cached version is
            // newer than requested.
            let candidate_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
                Ok(exact_idx) => exact_idx,
                Err(insert_at) => insert_at.checked_sub(1)?,
            };
            let found = &versions[candidate_idx];
            *lsn = found.lsn;
            Some(found.slot_idx)
        }
    }
}
/// Search for a page in the cache using the given search key.
///
/// Like `search_mapping`, but performs an "exact" search: a
/// `MaterializedPage` key only matches a version with exactly the requested
/// LSN, and the key is never modified. Used for allocating a new buffer.
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
    match key {
        CacheKey::ImmutableFilePage { file_id, blkno } => {
            let map = self.immutable_page_map.read().unwrap();
            map.get(&(*file_id, *blkno)).copied()
        }
        CacheKey::MaterializedPage { hash_key, lsn } => {
            let map = self.materialized_page_map.read().unwrap();
            let versions = map.get(hash_key)?;
            versions
                .binary_search_by_key(lsn, |v| v.lsn)
                .ok()
                .map(|exact_idx| versions[exact_idx].slot_idx)
        }
    }
}
///
/// Remove the mapping for the given key and decrease the matching size
/// metric by one page.
///
/// Panics with "could not find old key in mapping" if an `ImmutableFilePage`
/// key is not mapped, or if there is no entry at all for the hash key of a
/// `MaterializedPage`. If that entry exists but has no version with the
/// given LSN, nothing is removed and no metric is changed.
///
fn remove_mapping(&self, old_key: &CacheKey) {
    match old_key {
        CacheKey::ImmutableFilePage { file_id, blkno } => {
            let mut map = self.immutable_page_map.write().unwrap();
            map.remove(&(*file_id, *blkno))
                .expect("could not find old key in mapping");
            self.size_metrics.current_bytes_immutable.sub_page_sz(1);
        }
        CacheKey::MaterializedPage { hash_key, lsn } => {
            let mut map = self.materialized_page_map.write().unwrap();
            match map.entry(hash_key.clone()) {
                Entry::Occupied(mut occupied) => {
                    let versions = occupied.get_mut();
                    if let Ok(pos) = versions.binary_search_by_key(lsn, |v| v.lsn) {
                        versions.remove(pos);
                        self.size_metrics
                            .current_bytes_materialized_page
                            .sub_page_sz(1);
                        // Don't leave an empty version list behind.
                        if versions.is_empty() {
                            occupied.remove_entry();
                        }
                    }
                }
                Entry::Vacant(_) => panic!("could not find old key in mapping"),
            }
        }
    }
}
///
/// Insert a mapping from the given key to `slot_idx`.
///
/// If a mapping already existed for the given key, returns the slot index
/// of the existing mapping and leaves it untouched. Otherwise the mapping is
/// added, the matching size metric grows by one page, and `None` is returned.
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
    match new_key {
        CacheKey::ImmutableFilePage { file_id, blkno } => {
            let mut map = self.immutable_page_map.write().unwrap();
            let vacant = match map.entry((*file_id, *blkno)) {
                Entry::Occupied(occupied) => return Some(*occupied.get()),
                Entry::Vacant(vacant) => vacant,
            };
            vacant.insert(slot_idx);
            self.size_metrics.current_bytes_immutable.add_page_sz(1);
            None
        }
        CacheKey::MaterializedPage { hash_key, lsn } => {
            let mut map = self.materialized_page_map.write().unwrap();
            let versions = map.entry(hash_key.clone()).or_default();
            // The version list is kept sorted by LSN: a failed search gives
            // the position at which the new version has to go.
            let insert_at = match versions.binary_search_by_key(lsn, |v| v.lsn) {
                Ok(existing_idx) => return Some(versions[existing_idx].slot_idx),
                Err(insert_at) => insert_at,
            };
            versions.insert(
                insert_at,
                Version {
                    lsn: *lsn,
                    slot_idx,
                },
            );
            self.size_metrics
                .current_bytes_materialized_page
                .add_page_sz(1);
            None
        }
    }
}
//
// Section 4: Misc internal helpers
//
/// Find a slot to evict.
///
/// Repeatedly advances the shared `next_evict_slot` cursor and calls
/// `dec_usage_count()` on the slot it lands on. The first slot for which that
/// call returns 0 and whose lock can be taken without blocking is chosen; any
/// mapping for the key it held is removed.
///
/// On return, the slot is empty and write-locked.
///
/// Fails if a slot lock is poisoned, or if a candidate slot turns out to be
/// locked after more than `10 * slots.len()` iterations.
fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
    let num_slots = self.slots.len();
    let max_iters = num_slots * 10;
    let mut iters = 0;
    loop {
        iters += 1;
        let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % num_slots;
        let slot = &self.slots[slot_idx];

        // Not a candidate yet; move on to the next slot.
        if slot.dec_usage_count() != 0 {
            continue;
        }

        let mut inner = match slot.inner.try_write() {
            Ok(guard) => guard,
            Err(TryLockError::WouldBlock) => {
                // If we have looped through the whole buffer pool 10 times
                // and still haven't found a victim buffer, something's
                // wrong. Maybe all the buffers were locked. That could
                // happen in theory, if you have more threads holding buffers
                // locked than there are buffers in the pool. In practice,
                // with a reasonably large buffer pool it really shouldn't
                // happen.
                if iters > max_iters {
                    anyhow::bail!("exceeded evict iter limit");
                }
                continue;
            }
            Err(TryLockError::Poisoned(err)) => {
                anyhow::bail!("buffer lock was poisoned: {err:?}")
            }
        };

        // Detach whatever page the slot held before handing it out.
        if let Some(old_key) = &inner.key {
            self.remove_mapping(old_key);
            inner.key = None;
        }
        return Ok((slot_idx, inner));
    }
}
/// Initialize a new page cache with room for `num_pages` pages.
///
/// Allocates one zeroed buffer of `num_pages * PAGE_SZ` bytes, splits it into
/// one `PAGE_SZ` chunk per slot, and resets the page cache size metrics.
///
/// This should be called only once at page server startup.
///
/// Panics if `num_pages` is 0.
fn new(num_pages: usize) -> Self {
    assert!(num_pages > 0, "page cache size must be > 0");

    // Box::leak gives up ownership of the allocation and hands back a plain
    // mutable slice, which is carved up into the per-slot buffers below.
    let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());

    let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
    size_metrics.max_bytes.set_page_sz(num_pages);
    size_metrics.current_bytes_immutable.set_page_sz(0);
    size_metrics.current_bytes_materialized_page.set_page_sz(0);

    let slots = page_buffer
        .chunks_exact_mut(PAGE_SZ)
        .map(|page| {
            // chunks_exact_mut yields chunks of exactly PAGE_SZ bytes, so
            // the conversion to a fixed-size array reference cannot fail.
            let buf = <&mut [u8; PAGE_SZ]>::try_from(page).unwrap();
            Slot {
                inner: RwLock::new(SlotInner { key: None, buf }),
                usage_count: AtomicU8::new(0),
            }
        })
        .collect();

    Self {
        materialized_page_map: Default::default(),
        immutable_page_map: Default::default(),
        slots,
        next_evict_slot: AtomicUsize::new(0),
        size_metrics,
    }
}
}
/// Operations on a metric whose arguments are given as a number of pages.
///
/// The implementation for `metrics::UIntGauge` in this file converts `count`
/// to bytes (`count * PAGE_SZ`) before applying it to the gauge.
trait PageSzBytesMetric {
    /// Set the metric to the size of `count` pages.
    fn set_page_sz(&self, count: usize);
    /// Increase the metric by the size of `count` pages.
    fn add_page_sz(&self, count: usize);
    /// Decrease the metric by the size of `count` pages.
    fn sub_page_sz(&self, count: usize);
}
/// Size in bytes of `count` pages, i.e. `count * PAGE_SZ` as a `u64`.
///
/// Panics if either factor does not fit into a `u64`.
#[inline(always)]
fn count_times_page_sz(count: usize) -> u64 {
    let pages: u64 = count.try_into().unwrap();
    let page_sz: u64 = PAGE_SZ.try_into().unwrap();
    pages * page_sz
}
/// Page-count based updates for a byte-valued gauge: every `count` is turned
/// into bytes with `count_times_page_sz` before it is applied.
impl PageSzBytesMetric for metrics::UIntGauge {
    /// Set the gauge to the byte size of `count` pages.
    fn set_page_sz(&self, count: usize) {
        let bytes = count_times_page_sz(count);
        self.set(bytes);
    }

    /// Add the byte size of `count` pages to the gauge.
    fn add_page_sz(&self, count: usize) {
        let bytes = count_times_page_sz(count);
        self.add(bytes);
    }

    /// Subtract the byte size of `count` pages from the gauge.
    fn sub_page_sz(&self, count: usize) {
        let bytes = count_times_page_sz(count);
        self.sub(bytes);
    }
}

View File

@@ -292,6 +292,8 @@ pub enum TaskKind {
DebugTool,
BackgroundRuntimeTurnaroundMeasure,
#[cfg(test)]
UnitTest,
}

View File

@@ -11,11 +11,12 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use std::cmp::min;
use std::io::{Error, ErrorKind};
use super::disk_btree::PAGE_SZ;
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
@@ -33,7 +34,7 @@ impl<'a> BlockCursor<'a> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;
let mut buf = self.read_blk(blknum)?;
let mut buf = self.read_blk(blknum).await?;
// peek at the first byte, to determine if it's a 1- or 4-byte length
let first_len_byte = buf[off];
@@ -49,7 +50,7 @@ impl<'a> BlockCursor<'a> {
// it is split across two pages
len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
blknum += 1;
buf = self.read_blk(blknum)?;
buf = self.read_blk(blknum).await?;
len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
off = 4 - thislen;
} else {
@@ -70,7 +71,7 @@ impl<'a> BlockCursor<'a> {
if page_remain == 0 {
// continue on next page
blknum += 1;
buf = self.read_blk(blknum)?;
buf = self.read_blk(blknum).await?;
off = 0;
page_remain = PAGE_SZ;
}

View File

@@ -4,7 +4,7 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::tenant::disk_btree::PAGE_SZ;
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::fs::File;
@@ -36,22 +36,22 @@ where
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
PageReadGuard(crate::buffer_pool::Buffer),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Rc(std::rc::Rc<[u8; PAGE_SZ]>),
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
impl From<crate::buffer_pool::Buffer> for BlockLease<'static> {
fn from(value: crate::buffer_pool::Buffer) -> BlockLease<'static> {
BlockLease::PageReadGuard(value)
}
}
#[cfg(test)]
impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Rc(value)
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Arc(value)
}
}
@@ -63,7 +63,7 @@ impl<'a> Deref for BlockLease<'a> {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Rc(v) => v.deref(),
BlockLease::Arc(v) => v.deref(),
}
}
}
@@ -74,7 +74,7 @@ impl<'a> Deref for BlockLease<'a> {
/// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> {
FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
// FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
#[cfg(test)]
@@ -83,13 +83,13 @@ pub(crate) enum BlockReaderRef<'a> {
impl<'a> BlockReaderRef<'a> {
#[inline(always)]
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum),
FileBlockReaderFile(r) => r.read_blk(blknum),
EphemeralFile(r) => r.read_blk(blknum),
Adapter(r) => r.read_blk(blknum),
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
// FileBlockReaderFile(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
}
@@ -134,8 +134,8 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum)
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum).await
}
}
@@ -145,61 +145,51 @@ impl<'a> BlockCursor<'a> {
/// for modifying the file, nor for invalidating the cache if it is modified.
pub struct FileBlockReader<F> {
pub file: F,
/// Unique ID of this file, used as key in the page cache.
file_id: page_cache::FileId,
}
impl<F> FileBlockReader<F>
where
F: FileExt,
{
impl<F> FileBlockReader<F> {
pub fn new(file: F) -> Self {
let file_id = page_cache::next_file_id();
FileBlockReader { file_id, file }
FileBlockReader { file }
}
}
/// Read a page from the underlying file into given buffer.
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
}
/// Read a block.
///
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
macro_rules! impls {
(FileBlockReader<$ty:ty>) => {
impl FileBlockReader<$ty> {
/// Read a page from the underlying file into given buffer.
async fn fill_buffer(
&self,
buf: crate::buffer_pool::Buffer,
blkno: u32,
) -> Result<crate::buffer_pool::Buffer, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at_async(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.
///
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let buf = crate::buffer_pool::get();
// Read the page from disk into the buffer
let mut write_guard = self.fill_buffer(buf, blknum).await?;
Ok(BlockLease::PageReadGuard(write_guard))
}
}
}
};
}
impl BlockReader for FileBlockReader<File> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
}
}
// impls!(FileBlockReader<File>);
impls!(FileBlockReader<VirtualFile>);
// impl BlockReader for FileBlockReader<File> {
// fn block_cursor(&self) -> BlockCursor<'_> {
// BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
// }
// }
impl BlockReader for FileBlockReader<VirtualFile> {
fn block_cursor(&self) -> BlockCursor<'_> {

View File

@@ -262,7 +262,7 @@ where
let block_cursor = self.reader.block_cursor();
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum)?;
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
@@ -357,7 +357,7 @@ where
let block_cursor = self.reader.block_cursor();
while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
let blk = block_cursor.read_blk(self.start_blk + blknum)?;
let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
let buf: &[u8] = blk.as_ref();
let node = OnDiskNode::<L>::deparse(buf)?;
@@ -704,7 +704,7 @@ pub(crate) mod tests {
pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
let mut buf = [0u8; PAGE_SZ];
buf.copy_from_slice(&self.blocks[blknum as usize]);
Ok(std::rc::Rc::new(buf).into())
Ok(std::sync::Arc::new(buf).into())
}
}
impl BlockReader for TestDisk {

View File

@@ -2,22 +2,19 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::tenant::disk_btree::PAGE_SZ;
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use tracing::*;
use utils::id::{TenantId, TimelineId};
pub struct EphemeralFile {
page_cache_file_id: page_cache::FileId,
_tenant_id: TenantId,
_timeline_id: TimelineId,
file: VirtualFile,
@@ -48,7 +45,6 @@ impl EphemeralFile {
)?;
Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
_tenant_id: tenant_id,
_timeline_id: timeline_id,
file,
@@ -61,40 +57,17 @@ impl EphemeralFile {
self.len
}
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
let mut write_guard: crate::buffer_pool::Buffer = crate::buffer_pool::get();
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
let mut buf = self
.file
.read_exact_at_async(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
Ok(BlockLease::PageReadGuard(buf))
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -132,29 +105,6 @@ impl EphemeralFile {
self.blknum as u64 * PAGE_SZ as u64,
) {
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
let cache = page_cache::get();
match cache.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
) {
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
}
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
}
// Zero the buffer for re-use.
// Zeroing is critical for correctness because the write_blob code below
// and similarly read_blk expect zeroed pages.

View File

@@ -26,7 +26,7 @@
//! recovered from this file. This is tracked in
//! <https://github.com/neondatabase/neon/issues/4418>
use std::io::{self, Read, Write};
use std::io::{self, Write};
use crate::virtual_file::VirtualFile;
use anyhow::Result;
@@ -151,11 +151,12 @@ impl Manifest {
/// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
/// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
/// backup the current one.
pub fn load(
mut file: VirtualFile,
pub async fn load(
file: VirtualFile,
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
let mut buf = vec![];
file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;
file.read_exact_at(&mut buf, 0)
.map_err(ManifestLoadError::Io)?;
// Read manifest header
let mut buf = Bytes::from(buf);
@@ -241,8 +242,8 @@ mod tests {
use super::*;
#[test]
fn test_read_manifest() {
#[tokio::test]
async fn test_read_manifest() {
let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
std::fs::create_dir_all(&testdir).unwrap();
let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
@@ -274,7 +275,7 @@ mod tests {
.truncate(false),
)
.unwrap();
let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
let (mut manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0);
assert_eq!(operations.len(), 2);
assert_eq!(
@@ -306,7 +307,7 @@ mod tests {
.truncate(false),
)
.unwrap();
let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
let (_manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0);
assert_eq!(operations.len(), 3);
assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));

View File

@@ -29,7 +29,7 @@
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::disk_btree::PAGE_SZ;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
@@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
@@ -467,7 +467,7 @@ impl DeltaLayer {
PathOrConf::Path(_) => None,
};
let loaded = DeltaLayerInner::load(&path, summary)?;
let loaded = DeltaLayerInner::load(&path, summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -841,12 +841,16 @@ impl Drop for DeltaLayerWriter {
}
impl DeltaLayerInner {
pub(super) fn load(path: &std::path::Path, summary: Option<Summary>) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
pub(super) async fn load(
path: &std::path::Path,
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open_async(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
@@ -854,11 +858,11 @@ impl DeltaLayerInner {
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}
@@ -1028,7 +1032,7 @@ impl<'a> ValueRef<'a> {
pub(crate) struct Adapter<T>(T);
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum)
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum).await
}
}

View File

@@ -25,7 +25,7 @@
//! actual page images are stored in the "values" part.
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::disk_btree::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
@@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
@@ -349,7 +349,8 @@ impl ImageLayer {
PathOrConf::Path(_) => None,
};
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -432,15 +433,16 @@ impl ImageLayer {
}
impl ImageLayerInner {
pub(super) fn load(
pub(super) async fn load(
path: &std::path::Path,
lsn: Lsn,
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
let file = VirtualFile::open_async(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
@@ -449,11 +451,11 @@ impl ImageLayerInner {
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}

View File

@@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::tenant::disk_btree::PAGE_SZ;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
@@ -73,7 +74,6 @@ use utils::{
simple_rcu::{Rcu, RcuReadGuard},
};
use crate::page_cache;
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr;
@@ -465,7 +465,7 @@ impl Timeline {
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn) {
let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
@@ -494,6 +494,7 @@ impl Timeline {
RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
.await
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
@@ -630,38 +631,38 @@ impl Timeline {
) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
let permits = usize::max(
1,
// while a lot of the work is done on spawn_blocking, we still do
// repartitioning in the async context. this should give leave us some workers
// unblocked to be blocked on other work, hopefully easing any outside visible
// effects of restarts.
//
// 6/8 is a guess; previously we ran with unlimited 8 and more from
// spawn_blocking.
(total_threads * 3).checked_div(4).unwrap_or(0),
);
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(
permits < total_threads,
"need threads avail for shorter work"
);
tokio::sync::Semaphore::new(permits)
});
// static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
// once_cell::sync::Lazy::new(|| {
// let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
// let permits = usize::max(
// 1,
// // while a lot of the work is done on spawn_blocking, we still do
// // repartitioning in the async context. this should give leave us some workers
// // unblocked to be blocked on other work, hopefully easing any outside visible
// // effects of restarts.
// //
// // 6/8 is a guess; previously we ran with unlimited 8 and more from
// // spawn_blocking.
// (total_threads * 3).checked_div(4).unwrap_or(0),
// );
// assert_ne!(permits, 0, "we will not be adding in permits later");
// assert!(
// permits < total_threads,
// "need threads avail for shorter work"
// );
// tokio::sync::Semaphore::new(permits)
// });
// this wait probably never needs any "long time spent" logging, because we already nag if
// compaction task goes over it's period (20s) which is quite often in production.
let _permit = tokio::select! {
permit = CONCURRENT_COMPACTIONS.acquire() => {
permit
},
_ = cancel.cancelled() => {
return Ok(());
}
};
// // this wait probably never needs any "long time spent" logging, because we already nag if
// // compaction task goes over it's period (20s) which is quite often in production.
// let _permit = tokio::select! {
// permit = CONCURRENT_COMPACTIONS.acquire() => {
// permit
// },
// _ = cancel.cancelled() => {
// return Ok(());
// }
// };
let last_record_lsn = self.get_last_record_lsn();
@@ -2443,15 +2444,8 @@ impl Timeline {
}
}
fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) =
cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
None
}
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
@@ -3366,7 +3360,7 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_range = (target_file_size / PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
@@ -3602,7 +3596,7 @@ impl Timeline {
// Add two pages for potential overhead. This should in theory be already
// accounted for in the target calculation, but for very small targets,
// we still might easily hit the limit otherwise.
let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
let warn_limit = target_file_size * 2 + PAGE_SZ as u64 * 2;
for layer in new_layers.iter() {
if layer.layer_desc().file_size > warn_limit {
warn!(
@@ -4131,7 +4125,7 @@ impl Timeline {
///
/// Reconstruct a value, using the given base image and WAL records in 'data'.
///
fn reconstruct_value(
async fn reconstruct_value(
&self,
key: Key,
request_lsn: Lsn,
@@ -4190,22 +4184,6 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::from(e)),
};
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();
if let Err(e) = cache
.memorize_materialized_page(
self.tenant_id,
self.timeline_id,
key,
last_rec_lsn,
&img,
)
.context("Materialized page memoization failed")
{
return Err(PageReconstructError::from(e));
}
}
Ok(img)
}
}

View File

@@ -11,13 +11,15 @@
//! src/backend/storage/file/fd.c
//!
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::os::fd::OwnedFd;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -39,7 +41,7 @@ pub struct VirtualFile {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
/// belongs to a different VirtualFile.
handle: RwLock<SlotHandle>,
handle: Arc<Mutex<Option<File>>>, // only transiently None
/// Current file position
pos: u64,
@@ -51,7 +53,6 @@ pub struct VirtualFile {
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
pub path: PathBuf,
open_options: OpenOptions,
// These are strings because we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
@@ -60,118 +61,6 @@ pub struct VirtualFile {
timeline_id: String,
}
/// Identifies one occupancy of a slot in `OPEN_FILES`: the slot's position plus the
/// tag value the slot had when the file descriptor was stored there. A handle is
/// only valid while the slot's current tag still equals `tag`.
#[derive(Debug, PartialEq, Clone, Copy)]
struct SlotHandle {
/// Index into OPEN_FILES.slots
index: usize,
/// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
/// been recycled and no longer contains the FD for this virtual file.
tag: u64,
}
/// OPEN_FILES is the global array that holds the physical file descriptors that
/// are currently open. Each slot in the array is protected by a separate lock,
/// so that different files can be accessed independently. The lock must be held
/// in write mode to replace the slot with a different file, but a read mode
/// is enough to operate on the file, whether you're reading or writing to it.
///
/// OPEN_FILES starts in uninitialized state, and it's initialized by
/// the virtual_file::init() function. It must be called exactly once at page
/// server startup.
static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
/// The table behind `OPEN_FILES`: a fixed array of slots plus the position of the
/// clock hand used when choosing a slot to evict.
struct OpenFiles {
/// The slots holding the cached physical files. Each slot carries its own lock.
slots: &'static [Slot],
/// clock arm for the clock algorithm
next: AtomicUsize,
}
/// One entry of `OpenFiles::slots`.
struct Slot {
/// The slot's contents (tag and optional file), guarded by a per-slot lock.
inner: RwLock<SlotInner>,
/// has this file been used since last clock sweep?
recently_used: AtomicBool,
}
/// The lock-protected part of a `Slot`.
struct SlotInner {
/// Counter that's incremented every time a different file is stored here.
/// To avoid the ABA problem.
tag: u64,
/// the underlying file
// `None` while the slot holds no open file.
file: Option<File>,
}
impl OpenFiles {
    /// Choose a slot for a new file descriptor, closing the file the slot held
    /// before, if any.
    ///
    /// Returns the handle describing the chosen slot together with the write
    /// guard on it. When this returns, the slot's `tag` has already been bumped
    /// and its `recently_used` flag has been set, so the slot is ready for the
    /// caller to store a file in it.
    fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
        let num_slots = self.slots.len();
        let mut retries = 0;

        // Clock algorithm: advance the hand until a usable slot turns up.
        let (index, slot, mut slot_guard) = loop {
            let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
            let candidate = &self.slots[next];

            // After two unsuccessful trips around the array, stop being picky:
            // take whatever slot the hand points at and wait for its lock. This
            // avoids spinning in the extreme case that every slot is busy with
            // an I/O operation.
            if retries >= num_slots * 2 {
                break (next, candidate, candidate.inner.write().unwrap());
            }

            // A slot whose recently_used flag was set gets a second chance (the
            // flag is cleared as we pass). Otherwise try to grab the slot; if
            // its lock is contended, move on as well.
            if !candidate.recently_used.swap(false, Ordering::Release) {
                if let Ok(guard) = candidate.inner.try_write() {
                    break (next, candidate, guard);
                }
            }
            retries += 1;
        };

        // The victim slot is locked now. Close the file it held before, if any.
        if let Some(old_file) = slot_guard.file.take() {
            // Dropping a VirtualFile records its close under "close"; use
            // "close-by-replace" here so the two can be told apart.
            STORAGE_IO_TIME
                .with_label_values(&["close-by-replace"])
                .observe_closure_duration(|| drop(old_file));
        }

        // Get the slot ready for its new occupant.
        slot_guard.tag += 1;
        slot.recently_used.store(true, Ordering::Relaxed);

        let handle = SlotHandle {
            index,
            tag: slot_guard.tag,
        };
        (handle, slot_guard)
    }
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
@@ -207,7 +96,6 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.observe_closure_duration(|| open_options.open(path))?;
@@ -223,15 +111,76 @@ impl VirtualFile {
reopen_options.truncate(false);
let vfile = VirtualFile {
handle: RwLock::new(handle),
handle: Arc::new(Mutex::new(Some(file))),
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
timeline_id,
};
slot_guard.file.replace(file);
Ok(vfile)
}
/// Asynchronously open `path` for reading only. Like File::open.
pub async fn open_async(path: &Path) -> Result<VirtualFile, std::io::Error> {
    let mut read_only = tokio_epoll_uring::ops::open_at::OpenOptions::new();
    read_only.read(true);
    Self::open_with_options_async(path, read_only).await
}
/// Open a file with the given options through tokio-epoll-uring.
///
/// The tenant and timeline ids used as metric labels are derived from the path
/// when it has the shape `.../tenants/<tenant_id>/<dir>/<timeline_id>/<file>`;
/// for any other path both labels are set to `"*"`.
///
/// The time spent opening is recorded under the `"open"` label of
/// `STORAGE_IO_TIME` whether or not the open succeeds, matching the synchronous
/// open path.
///
/// # Errors
///
/// Returns the error reported for the open operation itself unchanged. A failure
/// of the tokio-epoll-uring system is wrapped in an error of kind
/// `ErrorKind::Other`.
pub async fn open_with_options_async(
    path: &Path,
    open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
    let path_str = path.to_string_lossy();
    let parts = path_str.split('/').collect::<Vec<&str>>();
    let (tenant_id, timeline_id) = if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
        (
            parts[parts.len() - 4].to_string(),
            parts[parts.len() - 2].to_string(),
        )
    } else {
        ("*".to_string(), "*".to_string())
    };

    let start = std::time::Instant::now();
    let system = tokio_epoll_uring::thread_local_system().await;
    let open_result = system.open(path, &open_options).await;
    // Record the latency before propagating any error, so that failed opens
    // show up in the metric too.
    STORAGE_IO_TIME
        .with_label_values(&["open"])
        .observe(start.elapsed().as_secs_f64());
    let fd: OwnedFd = open_result.map_err(|e| match e {
        tokio_epoll_uring::Error::Op(e) => e,
        tokio_epoll_uring::Error::System(system) => {
            std::io::Error::new(std::io::ErrorKind::Other, system)
        }
    })?;

    // The open options are not kept: the VirtualFile owns the opened File
    // directly and has no field for options to re-open it with.
    Ok(VirtualFile {
        handle: Arc::new(Mutex::new(Some(File::from(fd)))),
        pos: 0,
        path: path.to_path_buf(),
        tenant_id,
        timeline_id,
    })
}
@@ -244,7 +193,9 @@ impl VirtualFile {
pub fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file("metadata", |file| file.metadata())?
}
}
impl VirtualFile {
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
@@ -252,68 +203,9 @@ impl VirtualFile {
where
F: FnMut(&File) -> R,
{
let open_files = get_open_files();
let mut handle_guard = {
// Read the cached slot handle, and see if the slot that it points to still
// contains our File.
//
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME
.with_label_values(&[op])
.observe_closure_duration(|| func(file)));
}
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
if *handle_guard != handle {
handle = *handle_guard;
continue;
}
break handle_guard;
}
};
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();
// Open the physical file
let file = STORAGE_IO_TIME
.with_label_values(&["open"])
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME
return Ok(STORAGE_IO_TIME
.with_label_values(&[op])
.observe_closure_duration(|| func(&file));
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
slot_guard.file.replace(file);
*handle_guard = handle;
Ok(result)
.observe_closure_duration(|| func(&*self.handle.lock().unwrap().as_ref().unwrap())));
}
pub fn remove(self) {
@@ -323,35 +215,6 @@ impl VirtualFile {
}
}
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Read for VirtualFile {
    /// Read from the current position and advance the position by the number of
    /// bytes that were read.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        let bytes_read = self.read_at(buf, self.pos)?;
        self.pos += bytes_read as u64;
        Ok(bytes_read)
    }
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
@@ -367,8 +230,8 @@ impl Write for VirtualFile {
}
}
impl Seek for VirtualFile {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
impl VirtualFile {
pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
@@ -392,11 +255,113 @@ impl Seek for VirtualFile {
}
Ok(self.pos)
}
}
impl FileExt for VirtualFile {
// Adapted from the standard library's `FileExt::read_exact_at`:
// https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
//
// Calls `read_at` repeatedly until `buf` has been filled. Interrupted reads are
// retried; hitting end-of-file before the buffer is full is an `UnexpectedEof`
// error.
pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
    loop {
        if buf.is_empty() {
            return Ok(());
        }
        let bytes_read = match self.read_at(buf, offset) {
            Ok(0) => {
                return Err(Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(n) => n,
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        // Continue with the part of the buffer that is still unfilled.
        buf = &mut buf[bytes_read..];
        offset += bytes_read as u64;
    }
}
/// Read from the file at `offset` into `write_guard` using tokio-epoll-uring and
/// hand the buffer back once it has been filled completely.
///
/// A single read sized to the whole buffer is issued. The underlying `File` is
/// moved out of `self.handle` for the duration of the read and is put back
/// before this function returns, on success and on error alike.
///
/// # Errors
///
/// - `ErrorKind::UnexpectedEof` if the read returned fewer bytes than the buffer
///   holds. Such a short read used to be reported as success with a partially
///   filled buffer.
/// - `ErrorKind::Other`, wrapping the tokio-epoll-uring error, if the read failed.
///
/// # Panics
///
/// - If `self.handle` does not currently hold the file, e.g. because another
///   call of this function on the same `VirtualFile` is still in flight.
/// - If this future is dropped before the file has been put back into
///   `self.handle` (the scope guard below fires).
pub async fn read_exact_at_async(
    &self,
    write_guard: crate::buffer_pool::Buffer,
    offset: u64,
) -> Result<crate::buffer_pool::Buffer, Error> {
    // Take the file out of the handle; the lock is released again right away
    // and is not held across any await point.
    let file = self.handle.lock().unwrap().take().unwrap();

    // Tripwire: if we leave this function without having returned the file to
    // `self.handle`, the VirtualFile would be left without a file, so panic.
    let put_back = AtomicBool::new(false);
    let put_back_ref = &put_back;
    scopeguard::defer! {
        if !put_back_ref.load(std::sync::atomic::Ordering::Relaxed) {
            panic!("must put self.handle back")
        }
    };

    let system = tokio_epoll_uring::thread_local_system().await;

    /// Adapter that lets a buffer-pool `Buffer` be used as the target of a
    /// tokio-epoll-uring read, tracking how much of it has been initialized.
    struct PageWriteGuardBuf {
        buf: crate::buffer_pool::Buffer,
        init_up_to: usize,
    }
    // SAFETY: NOTE(review) -- this relies on the memory behind `Buffer` staying
    // at a stable address while the wrapper is owned by the in-flight
    // operation; confirm against `buffer_pool::Buffer`.
    unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
        fn stable_ptr(&self) -> *const u8 {
            self.buf.as_ptr()
        }
        fn bytes_init(&self) -> usize {
            self.init_up_to
        }
        fn bytes_total(&self) -> usize {
            self.buf.len()
        }
    }
    // SAFETY: NOTE(review) -- same address-stability assumption as above.
    // `set_init` additionally refuses positions beyond the end of the buffer.
    unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
        fn stable_mut_ptr(&mut self) -> *mut u8 {
            self.buf.as_mut_ptr()
        }
        unsafe fn set_init(&mut self, pos: usize) {
            assert!(pos <= self.buf.len());
            self.init_up_to = pos;
        }
    }

    // The read is asked to fill the whole buffer (see `bytes_total`).
    let expected_len = write_guard.len();
    let buf = PageWriteGuardBuf {
        buf: write_guard,
        init_up_to: 0,
    };
    let ((file, buf), res) = system.read(file.into(), offset, buf).await;
    let PageWriteGuardBuf {
        buf: write_guard,
        init_up_to,
    } = buf;
    if let Ok(num_read) = res {
        assert!(init_up_to <= num_read);
    }

    // Return the file to the handle before looking at the outcome, so that the
    // VirtualFile stays usable after a failed read.
    let replaced = self.handle.lock().unwrap().replace(File::from(file));
    assert!(replaced.is_none());
    put_back.store(true, std::sync::atomic::Ordering::Relaxed);

    match res {
        Ok(num_read) if num_read == expected_len => Ok(write_guard),
        // "Exact" means the whole buffer: a short read is an error.
        Ok(_) => Err(Error::new(
            ErrorKind::UnexpectedEof,
            "failed to fill whole buffer",
        )),
        Err(e) => Err(Error::new(ErrorKind::Other, e)),
    }
}
// Adapted from the standard library's `FileExt::write_all_at`:
// https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
//
// Calls `write_at` repeatedly until all of `buf` has been written. Interrupted
// writes are retried; a write that makes no progress is a `WriteZero` error.
pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
    loop {
        if buf.is_empty() {
            return Ok(());
        }
        let bytes_written = match self.write_at(buf, offset) {
            Ok(0) => {
                return Err(Error::new(
                    std::io::ErrorKind::WriteZero,
                    "failed to write whole buffer",
                ));
            }
            Ok(n) => n,
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        // Continue with the part of the buffer that has not been written yet.
        buf = &buf[bytes_written..];
        offset += bytes_written as u64;
    }
}
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
let result = self.with_file("read", |file| {
tracing::info!("sync read\n{}", std::backtrace::Backtrace::force_capture());
file.read_at(buf, offset)
})?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
@@ -405,7 +370,7 @@ impl FileExt for VirtualFile {
result
}
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result {
STORAGE_IO_SIZE
@@ -415,256 +380,3 @@ impl FileExt for VirtualFile {
result
}
}
impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots));
for _ in 0..num_slots {
let slot = Slot {
recently_used: AtomicBool::new(false),
inner: RwLock::new(SlotInner { tag: 0, file: None }),
};
slots.push(slot);
}
OpenFiles {
next: AtomicUsize::new(0),
slots: Box::leak(slots),
}
}
}
///
/// Set up the virtual file module with a table of `num_slots` slots. This
/// must be called once at page server startup.
///
/// # Panics
///
/// Panics if the table has already been initialized.
///
pub fn init(num_slots: usize) {
    let table = OpenFiles::new(num_slots);
    OPEN_FILES
        .set(table)
        .unwrap_or_else(|_| panic!("virtual_file::init called twice"));
}
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
// Returns the global slots array.
//
// Outside of tests the array must have been set up by virtual_file::init()
// already; otherwise this panics.
fn get_open_files() -> &'static OpenFiles {
    if !cfg!(test) {
        return OPEN_FILES.get().expect("virtual_file::init not called yet");
    }

    //
    // Unit tests never go through page server startup, so nobody calls
    // virtual_file::init() for them. Create the array lazily here instead,
    // with a small number of slots.
    //
    // This covers the virtual file tests below as well as every other unit
    // test, so the virtual file facility is always usable in unit tests.
    //
    OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
}
#[cfg(test)]
mod tests {
use super::*;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
use std::sync::Arc;
use std::thread;
// Helper that reads everything from the current position up to the end of the
// file and returns it as a string
fn read_string<FD>(vfile: &mut FD) -> Result<String, Error>
where
    FD: Read,
{
    let mut contents = String::new();
    vfile.read_to_string(&mut contents).map(|_| contents)
}
// Helper that reads exactly `len` bytes starting at offset `pos` and returns
// them as a string (panics if the bytes are not valid UTF-8)
fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error>
where
    FD: FileExt,
{
    let mut bytes = vec![0u8; len];
    vfile.read_exact_at(&mut bytes, pos)?;
    Ok(String::from_utf8(bytes).unwrap())
}
/// Runs the shared `test_files` suite with files opened through
/// `VirtualFile::open_with_options`.
#[test]
fn test_virtual_files() -> Result<(), Error> {
// The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them,
// but this allows us to verify that the operations return the same
// results with VirtualFiles as with native Files. (Except that with
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| {
VirtualFile::open_with_options(path, open_options)
})
}
/// Runs the shared `test_files` suite with files opened directly through
/// `OpenOptions::open`, as the reference to compare VirtualFile against.
#[test]
fn test_physical_files() -> Result<(), Error> {
test_files("physical_files", |path, open_options| {
open_options.open(path)
})
}
// Shared test body, generic over how a file gets opened.
//
// `testname` names the test's directory (obtained from
// `PageServerConf::test_repo_dir`); `openfunc` opens a path with the given
// options and returns the file type under test. The checks cover sequential
// reads and writes, seeking (including invalid seeks), the positioned
// `FileExt` operations, and continued use of files after many more files have
// been opened.
fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error>
where
FD: Read + Write + Seek + FileExt,
OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?;
let path_a = testdir.join("file_a");
let mut file_a = openfunc(
&path_a,
OpenOptions::new().write(true).create(true).truncate(true),
)?;
file_a.write_all(b"foobar")?;
// cannot read from a file opened in write-only mode
assert!(read_string(&mut file_a).is_err());
// Close the file and re-open for reading
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
// cannot write to a file opened in read-only mode
assert!(file_a.write(b"bar").is_err());
// Try simple read
assert_eq!("foobar", read_string(&mut file_a)?);
// It's positioned at the EOF now.
assert_eq!("", read_string(&mut file_a)?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?);
assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
assert_eq!("ar", read_string(&mut file_a)?);
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
assert_eq!("bar", read_string(&mut file_a)?);
assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?);
// Test erroneous seeks to before byte 0
assert!(file_a.seek(SeekFrom::End(-7)).is_err());
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", read_string(&mut file_a)?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
let mut file_b = openfunc(
&path_b,
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true),
)?;
// Written out of order on purpose: the two positioned writes together
// produce "FOOBAR".
file_b.write_all_at(b"BAR", 3)?;
file_b.write_all_at(b"FOO", 0)?;
assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA");
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
//
// leave file_a positioned at offset 1 before we start
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
assert_eq!("FOOBAR", read_string(&mut vfile)?);
vfiles.push(vfile);
}
// make sure we opened enough files to definitely cause evictions.
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", read_string(&mut file_a)?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?);
}
Ok(())
}
/// Exercises VirtualFiles from many threads at once. Opening far more
/// VirtualFiles than there are slots forces evictions, and every thread picks
/// files at random, so the same VirtualFile also gets used by several threads
/// concurrently.
#[test]
fn test_vfile_concurrency() -> Result<(), Error> {
    const SIZE: usize = 8 * 1024;
    const VIRTUAL_FILES: usize = 100;
    const THREADS: usize = 100;
    const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];

    let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
    std::fs::create_dir_all(&testdir)?;

    // Create the test file and fill it with the sample pattern. The temporary
    // File is closed again at the end of the statement.
    let test_file_path = testdir.join("concurrency_test_file");
    File::create(&test_file_path)?.write_all_at(&SAMPLE, 0)?;

    // Open the same file many times over.
    let files = (0..VIRTUAL_FILES)
        .map(|_| VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true)))
        .collect::<Result<Vec<_>, _>>()?;
    let files = Arc::new(files);

    // Start all the threads; each one reads from randomly chosen virtual files
    // and checks that it gets the sample pattern back.
    let workers: Vec<_> = (0..THREADS)
        .map(|threadno| {
            let files = Arc::clone(&files);
            thread::Builder::new()
                .name(format!("test_vfile_concurrency thread {}", threadno))
                .spawn(move || {
                    let mut buf = [0u8; SIZE];
                    let mut rng = rand::thread_rng();
                    for _ in 1..1000 {
                        let f = &files[rng.gen_range(0..files.len())];
                        f.read_exact_at(&mut buf, 0).unwrap();
                        assert!(buf == SAMPLE);
                    }
                })
                .unwrap()
        })
        .collect();

    // Wait for all of them; a panic in any thread fails the test.
    for worker in workers {
        worker.join().unwrap();
    }
    Ok(())
}
}

View File

@@ -0,0 +1,32 @@
import queue
import threading
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.types import TenantId
# Builds a repository that contains a very large number of tenant directories
# by cloning the initial tenant's on-disk directory.
#
# Visible steps: start the environment, write a few rows through one endpoint,
# wait for the last flush LSN and checkpoint the timeline, stop the endpoint,
# the pageserver and all safekeepers, then copy the tenant directory 20,000
# times under freshly generated tenant ids.
#
# NOTE(review): despite the test's name, nothing visible here starts the
# pageserver again after the copies are made -- confirm whether that step is
# still to be added. `pg_bin` is only referenced by the commented-out pgbench
# call.
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
# below doesn't work because summaries contain tenant and timeline ids and we check for them
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
pshttp = env.pageserver.http_client()
ep = env.endpoints.create_start("main")
ep.safe_psql("create table foo(b text)")
# A handful of small rows is all the data the template tenant gets.
for i in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
# pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
pshttp.timeline_checkpoint(tenant_id, timeline_id)
# Shut everything down before touching the tenant directory on disk.
ep.stop_and_destroy()
env.pageserver.stop()
for sk in env.safekeepers:
sk.stop()
tenant_dir = env.repo_dir / "tenants" / str(env.initial_tenant)
# Clone the template tenant directory under new, randomly generated tenant ids.
for i in range(0, 20_000):
import shutil
shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))