mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-05 03:30:36 +00:00
Compare commits
29 Commits
add_audit_
...
problame/l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
02205e9191 | ||
|
|
66c501f5b8 | ||
|
|
887f464825 | ||
|
|
0b8ff8dbe0 | ||
|
|
60971e282e | ||
|
|
189aa1b077 | ||
|
|
9f087f93f8 | ||
|
|
fa1fb214b3 | ||
|
|
4db24c9de0 | ||
|
|
5f920a9993 | ||
|
|
2aac082385 | ||
|
|
99f8f87ba5 | ||
|
|
e7e1df2a79 | ||
|
|
10ee8f7981 | ||
|
|
a5b6e32b01 | ||
|
|
bb88e5bf57 | ||
|
|
876efcfc0a | ||
|
|
4d69192ae5 | ||
|
|
2d5d046062 | ||
|
|
d8b8a203a4 | ||
|
|
11e9b25f2b | ||
|
|
235baffbf4 | ||
|
|
5be0f9d69a | ||
|
|
e91e4d0b96 | ||
|
|
edbe3d2f76 | ||
|
|
0d9fa95454 | ||
|
|
e983b3cc2e | ||
|
|
a362ab9169 | ||
|
|
0cfc9edcb8 |
83
Cargo.lock
generated
83
Cargo.lock
generated
@@ -1996,6 +1996,26 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd1e1a01cfb924fd8c5c43b6827965db394f5a3a16c599ce03452266e1cf984c"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.7.2"
|
||||
@@ -2095,9 +2115,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.144"
|
||||
version = "0.2.147"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
||||
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@@ -2663,6 +2683,7 @@ dependencies = [
|
||||
"tenant_size_model",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-tar",
|
||||
@@ -2836,9 +2857,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.9"
|
||||
version = "0.2.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
|
||||
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
|
||||
|
||||
[[package]]
|
||||
name = "pin-utils"
|
||||
@@ -3725,6 +3746,12 @@ dependencies = [
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scoped-tls"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@@ -4296,18 +4323,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
|
||||
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
|
||||
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -4392,22 +4419,40 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.28.1"
|
||||
version = "1.32.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"backtrace",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2 0.4.9",
|
||||
"socket2 0.5.3",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=problame/hacky-openat#96e5a1f3a3d6921438002807475d01540e1211b2"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"io-uring 0.6.1",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"scopeguard",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-uring",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-io-timeout"
|
||||
version = "1.2.0"
|
||||
@@ -4547,6 +4592,20 @@ dependencies = [
|
||||
"tungstenite 0.20.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-uring"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d5e02bb137e030b3a547c65a3bd2f1836d66a97369fdcc69034002b10e155ef"
|
||||
dependencies = [
|
||||
"io-uring 0.5.13",
|
||||
"libc",
|
||||
"scoped-tls",
|
||||
"slab",
|
||||
"socket2 0.4.9",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.8"
|
||||
|
||||
55
Cargo.toml
55
Cargo.toml
@@ -200,58 +200,3 @@ tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", re
|
||||
# Besides, debug info should not affect the performance.
|
||||
debug = true
|
||||
|
||||
# disable debug symbols for all packages except this one to decrease binaries size
|
||||
[profile.release.package."*"]
|
||||
debug = false
|
||||
|
||||
[profile.release-line-debug]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
[profile.release-line-debug-lto]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
lto = true
|
||||
|
||||
[profile.release-line-debug-size]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
[profile.release-line-debug-zize]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
[profile.release-line-debug-size-lto]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
lto = true
|
||||
[profile.release-line-debug-zize-lto]
|
||||
inherits = "release"
|
||||
debug = 1 # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
lto = true
|
||||
|
||||
[profile.release-no-debug]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
|
||||
[profile.release-no-debug-size]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
[profile.release-no-debug-zize]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
|
||||
[profile.release-no-debug-size-lto]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "s"
|
||||
lto = true
|
||||
|
||||
[profile.release-no-debug-zize-lto]
|
||||
inherits = "release"
|
||||
debug = false # true = 2 = all symbols, 1 = line only
|
||||
opt-level = "z"
|
||||
lto = true
|
||||
|
||||
@@ -80,6 +80,8 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
#tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/hacky-openat" }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -8,10 +8,9 @@ use std::collections::BinaryHeap;
|
||||
use std::ops::Range;
|
||||
use std::{fs, path::Path, str};
|
||||
|
||||
use pageserver::page_cache::PAGE_SZ;
|
||||
use pageserver::repository::{Key, KEY_SIZE};
|
||||
use pageserver::tenant::block_io::FileBlockReader;
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection, PAGE_SZ};
|
||||
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
|
||||
use pageserver::tenant::storage_layer::range_overlaps;
|
||||
use pageserver::virtual_file::VirtualFile;
|
||||
@@ -97,7 +96,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
|
||||
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
|
||||
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
actual_summary.index_start_blk,
|
||||
@@ -135,10 +134,6 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let storage_path = &cmd.path;
|
||||
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
let mut total_image_layers = 0usize;
|
||||
let mut total_excess_layers = 0usize;
|
||||
|
||||
@@ -5,7 +5,6 @@ use clap::Subcommand;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
repository::{Key, KEY_SIZE},
|
||||
tenant::{
|
||||
@@ -45,10 +44,8 @@ pub(crate) enum LayerCmd {
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
let path = path.as_ref();
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path)?);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
actual_summary.index_start_blk,
|
||||
|
||||
@@ -12,10 +12,8 @@ use clap::{Parser, Subcommand};
|
||||
use layers::LayerCmd;
|
||||
use pageserver::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -115,9 +113,6 @@ fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
}
|
||||
|
||||
@@ -20,11 +20,10 @@ use metrics::set_build_info_metric;
|
||||
use pageserver::{
|
||||
config::{defaults::*, PageServerConf},
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
http, page_cache, page_service, task_mgr,
|
||||
http, page_service, task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
|
||||
tenant::mgr,
|
||||
virtual_file,
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
@@ -124,10 +123,6 @@ fn main() -> anyhow::Result<()> {
|
||||
// Initialize up failpoints support
|
||||
let scenario = pageserver::failpoint_support::init();
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
scenario.teardown();
|
||||
@@ -581,6 +576,31 @@ fn start_pageserver(
|
||||
);
|
||||
}
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::BackgroundRuntimeTurnaroundMeasure,
|
||||
None,
|
||||
None,
|
||||
"background runtime turnaround measure",
|
||||
true,
|
||||
async move {
|
||||
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
|
||||
let server = server
|
||||
.serve(hyper::service::make_service_fn(|_| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
|
||||
move |_: hyper::Request<hyper::Body>| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::Response::new(
|
||||
hyper::Body::from(format!("alive")),
|
||||
))
|
||||
},
|
||||
))
|
||||
}))
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
server.await?;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
|
||||
39
pageserver/src/buffer_pool.rs
Normal file
39
pageserver/src/buffer_pool.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use std::cell::RefCell;
|
||||
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
|
||||
pub struct Buffer(Option<Box<[u8; PAGE_SZ]>>);
|
||||
|
||||
// Thread-local list of re-usable buffers.
|
||||
thread_local! {
|
||||
static POOL: RefCell<Vec<Box<[u8; PAGE_SZ]>>> = RefCell::new(Vec::new());
|
||||
}
|
||||
|
||||
pub(crate) fn get() -> Buffer {
|
||||
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
|
||||
match maybe {
|
||||
Some(buf) => Buffer(Some(buf)),
|
||||
None => Buffer(Some(Box::new([0; PAGE_SZ]))),
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Buffer {
|
||||
fn drop(&mut self) {
|
||||
let buf = self.0.take().unwrap();
|
||||
POOL.with(|rc| rc.borrow_mut().push(buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for Buffer {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref().unwrap().as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for Buffer {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_mut().unwrap().as_mut()
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,6 @@ pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub mod keyspace;
|
||||
pub mod metrics;
|
||||
pub mod page_cache;
|
||||
pub mod page_service;
|
||||
pub mod pgdatadir_mapping;
|
||||
pub mod repository;
|
||||
@@ -28,6 +27,8 @@ use std::path::Path;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use tracing::info;
|
||||
|
||||
pub mod buffer_pool;
|
||||
|
||||
/// Current storage format version
|
||||
///
|
||||
/// This is embedded in the header of all the layer files.
|
||||
|
||||
@@ -1,848 +0,0 @@
|
||||
//!
|
||||
//! Global page cache
|
||||
//!
|
||||
//! The page cache uses up most of the memory in the page server. It is shared
|
||||
//! by all tenants, and it is used to store different kinds of pages. Sharing
|
||||
//! the cache allows memory to be dynamically allocated where it's needed the
|
||||
//! most.
|
||||
//!
|
||||
//! The page cache consists of fixed-size buffers, 8 kB each to match the
|
||||
//! PostgreSQL buffer size, and a Slot struct for each buffer to contain
|
||||
//! information about what's stored in the buffer.
|
||||
//!
|
||||
//! # Types Of Pages
|
||||
//!
|
||||
//! [`PageCache`] only supports immutable pages.
|
||||
//! Hence there is no need to worry about coherency.
|
||||
//!
|
||||
//! Two types of pages are supported:
|
||||
//!
|
||||
//! * **Materialized pages**, filled & used by page reconstruction
|
||||
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
|
||||
//!
|
||||
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
|
||||
//! It uses the page cache only for the blocks that are already fully written and immutable.
|
||||
//!
|
||||
//! # Filling The Page Cache
|
||||
//!
|
||||
//! Page cache maps from a cache key to a buffer slot.
|
||||
//! The cache key uniquely identifies the piece of data that is being cached.
|
||||
//!
|
||||
//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
|
||||
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
|
||||
//!
|
||||
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
|
||||
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
|
||||
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
|
||||
//! * Get a [`FileId`] using [`next_file_id`].
|
||||
//! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
|
||||
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
|
||||
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
|
||||
//! a read guard for the page. Just use it.
|
||||
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
|
||||
//! a write guard for the page. Fill the page with the contents of the on-disk file.
|
||||
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
|
||||
//! Then try again to [`PageCache::read_immutable_buf`].
|
||||
//! Unless there's high cache pressure, the page should now be cached.
|
||||
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
|
||||
//!
|
||||
//! # Locking
|
||||
//!
|
||||
//! There are two levels of locking involved: There's one lock for the "mapping"
|
||||
//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
|
||||
//! slot, and a separate lock on each slot. To read or write the contents of a
|
||||
//! slot, you must hold the lock on the slot in read or write mode,
|
||||
//! respectively. To change the mapping of a slot, i.e. to evict a page or to
|
||||
//! assign a buffer for a page, you must hold the mapping lock and the lock on
|
||||
//! the slot at the same time.
|
||||
//!
|
||||
//! Whenever you need to hold both locks simultaneously, the slot lock must be
|
||||
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
|
||||
//! in the cache, you would first look up the mapping, while holding the mapping
|
||||
//! lock, and then lock the slot. You must release the mapping lock in between,
|
||||
//! to obey the lock ordering and avoid deadlock.
|
||||
//!
|
||||
//! A slot can momentarily have invalid contents, even if it's already been
|
||||
//! inserted to the mapping, but you must hold the write-lock on the slot until
|
||||
//! the contents are valid. If you need to release the lock without initializing
|
||||
//! the contents, you must remove the mapping first. We make that easy for the
|
||||
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
|
||||
//! page, the caller must explicitly call guard.mark_valid() after it has
|
||||
//! initialized it. If the guard is dropped without calling mark_valid(), the
|
||||
//! mapping is automatically removed and the slot is marked free.
|
||||
//!
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
|
||||
|
||||
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
|
||||
const TEST_PAGE_CACHE_SIZE: usize = 50;
|
||||
|
||||
///
|
||||
/// Initialize the page cache. This must be called once at page server startup.
|
||||
///
|
||||
pub fn init(size: usize) {
|
||||
if PAGE_CACHE.set(PageCache::new(size)).is_err() {
|
||||
panic!("page cache already initialized");
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Get a handle to the page cache.
|
||||
///
|
||||
pub fn get() -> &'static PageCache {
|
||||
//
|
||||
// In unit tests, page server startup doesn't happen and no one calls
|
||||
// page_cache::init(). Initialize it here with a tiny cache, so that the
|
||||
// page cache is usable in unit tests.
|
||||
//
|
||||
if cfg!(test) {
|
||||
PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
|
||||
} else {
|
||||
PAGE_CACHE.get().expect("page cache not initialized")
|
||||
}
|
||||
}
|
||||
|
||||
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
|
||||
const MAX_USAGE_COUNT: u8 = 5;
|
||||
|
||||
/// See module-level comment.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct FileId(u64);
|
||||
|
||||
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
/// See module-level comment.
|
||||
pub fn next_file_id() -> FileId {
|
||||
FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
|
||||
///
|
||||
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
|
||||
///
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum CacheKey {
|
||||
MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey,
|
||||
lsn: Lsn,
|
||||
},
|
||||
ImmutableFilePage {
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||
struct MaterializedPageHashKey {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Version {
|
||||
lsn: Lsn,
|
||||
slot_idx: usize,
|
||||
}
|
||||
|
||||
struct Slot {
|
||||
inner: RwLock<SlotInner>,
|
||||
usage_count: AtomicU8,
|
||||
}
|
||||
|
||||
struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
buf: &'static mut [u8; PAGE_SZ],
|
||||
}
|
||||
|
||||
impl Slot {
|
||||
/// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
|
||||
fn inc_usage_count(&self) {
|
||||
let _ = self
|
||||
.usage_count
|
||||
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
|
||||
if val == MAX_USAGE_COUNT {
|
||||
None
|
||||
} else {
|
||||
Some(val + 1)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Decrement usage count on the buffer, unless it's already zero. Returns
|
||||
/// the old usage count.
|
||||
fn dec_usage_count(&self) -> u8 {
|
||||
let count_res =
|
||||
self.usage_count
|
||||
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
|
||||
if val == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(val - 1)
|
||||
}
|
||||
});
|
||||
|
||||
match count_res {
|
||||
Ok(usage_count) => usage_count,
|
||||
Err(usage_count) => usage_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
/// This contains the mapping from the cache key to buffer slot that currently
|
||||
/// contains the page, if any.
|
||||
///
|
||||
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
|
||||
/// this HashMap can be replaced with a more concurrent version, there are
|
||||
/// plenty of such crates around.
|
||||
///
|
||||
/// If you add support for caching different kinds of objects, each object kind
|
||||
/// can have a separate mapping map, next to this field.
|
||||
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
|
||||
|
||||
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
slots: Box<[Slot]>,
|
||||
|
||||
/// Index of the next candidate to evict, for the Clock replacement algorithm.
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>);
|
||||
|
||||
impl std::ops::Deref for PageReadGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.0.buf
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||
/// is expected to fill in the page contents and call mark_valid(). Similarly
|
||||
/// lock_for_write() can return an invalid buffer that the caller is expected to
|
||||
/// to initialize.
|
||||
///
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: RwLockWriteGuard<'i, SlotInner>,
|
||||
|
||||
// Are the page contents currently valid?
|
||||
valid: bool,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl PageWriteGuard<'_> {
|
||||
/// Mark that the buffer contents are now valid.
|
||||
pub fn mark_valid(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
assert!(
|
||||
!self.valid,
|
||||
"mark_valid called on a buffer that was already valid"
|
||||
);
|
||||
self.valid = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageWriteGuard<'_> {
|
||||
///
|
||||
/// If the buffer was allocated for a page that was not already in the
|
||||
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
||||
/// initializing it, remove the mapping from the page cache.
|
||||
///
|
||||
fn drop(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
if !self.valid {
|
||||
let self_key = self.inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
self.inner.key = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// lock_for_read() return value
|
||||
pub enum ReadBufResult<'a> {
|
||||
Found(PageReadGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
/// lock_for_write() return value
|
||||
pub enum WriteBufResult<'a> {
|
||||
Found(PageWriteGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
//
|
||||
// Section 1.1: Public interface functions for looking up and memorizing materialized page
|
||||
// versions in the page cache
|
||||
//
|
||||
|
||||
/// Look up a materialized page version.
|
||||
///
|
||||
/// The 'lsn' is an upper bound, this will return the latest version of
|
||||
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
|
||||
/// returned page.
|
||||
pub fn lookup_materialized_page(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_accesses_materialized_page
|
||||
.inc();
|
||||
|
||||
let mut cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
key: *key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
} = cache_key
|
||||
{
|
||||
if available_lsn == lsn {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_hits_materialized_page_exact
|
||||
.inc();
|
||||
} else {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_hits_materialized_page_older_lsn
|
||||
.inc();
|
||||
}
|
||||
Some((available_lsn, guard))
|
||||
} else {
|
||||
panic!("unexpected key type in slot");
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub fn memorize_materialized_page(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key)? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(*write_guard == img);
|
||||
}
|
||||
WriteBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(img);
|
||||
write_guard.mark_valid();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key)
|
||||
}
|
||||
|
||||
//
|
||||
// Section 2: Internal interface functions for lookup/update.
|
||||
//
|
||||
// To add support for a new kind of "thing" to cache, you will need
|
||||
// to add public interface routines above, and code to deal with the
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
/// Look up a page in the cache.
|
||||
///
|
||||
/// If the search criteria is not exact, *cache_key is updated with the key
|
||||
/// for exact key of the returned page. (For materialized pages, that means
|
||||
/// that the LSN in 'cache_key' is updated with the LSN of the returned page
|
||||
/// version.)
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.read().unwrap();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard(inner));
|
||||
} else {
|
||||
// search_mapping might have modified the search key; restore it.
|
||||
*cache_key = cache_key_orig;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return a locked buffer for given block.
|
||||
///
|
||||
/// Like try_lock_for_read(), if the search criteria is not exact and the
|
||||
/// page is already found in the cache, *cache_key is updated.
|
||||
///
|
||||
/// If the page is not found in the cache, this allocates a new buffer for
|
||||
/// it. The caller may then initialize the buffer with the contents, and
|
||||
/// call mark_valid().
|
||||
///
|
||||
/// Example usage:
|
||||
///
|
||||
/// ```ignore
|
||||
/// let cache = page_cache::get();
|
||||
///
|
||||
/// match cache.lock_for_read(&key) {
|
||||
/// ReadBufResult::Found(read_guard) => {
|
||||
/// // The page was found in cache. Use it
|
||||
/// },
|
||||
/// ReadBufResult::NotFound(write_guard) => {
|
||||
/// // The page was not found in cache. Read it from disk into the
|
||||
/// // buffer.
|
||||
/// //read_my_page_from_disk(write_guard);
|
||||
///
|
||||
/// // The buffer contents are now valid. Tell the page cache.
|
||||
/// write_guard.mark_valid();
|
||||
/// },
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
}
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
|
||||
&crate::metrics::PAGE_CACHE.read_hits_immutable,
|
||||
),
|
||||
};
|
||||
read_access.inc();
|
||||
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache and lock it in write mode. If it's not
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().unwrap();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return a write-locked buffer for given block.
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
}
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Section 3: Mapping functions
|
||||
//
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Returns the slot index, if any. If the search criteria is not exact,
|
||||
/// *cache_key is updated with the actual key of the found page.
|
||||
///
|
||||
/// NOTE: We don't hold any lock on the mapping on return, so the slot might
|
||||
/// get recycled for an unrelated page immediately after this function
|
||||
/// returns. The caller is responsible for re-checking that the slot still
|
||||
/// contains the page with the same key before using it.
|
||||
///
|
||||
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => version_idx,
|
||||
Err(0) => return None,
|
||||
Err(version_idx) => version_idx - 1,
|
||||
};
|
||||
let version = &versions[version_idx];
|
||||
*lsn = version.lsn;
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Like 'search_mapping, but performs an "exact" search. Used for
|
||||
/// allocating a new buffer.
|
||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Some(versions[version_idx].slot_idx)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove mapping for given key.
|
||||
///
|
||||
fn remove_mapping(&self, old_key: &CacheKey) {
|
||||
match old_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: old_hash_key,
|
||||
lsn: old_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
|
||||
let versions = old_entry.get_mut();
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
|
||||
versions.remove(version_idx);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.sub_page_sz(1);
|
||||
if versions.is_empty() {
|
||||
old_entry.remove_entry();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
panic!("could not find old key in mapping")
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
self.size_metrics.current_bytes_immutable.sub_page_sz(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Insert mapping for given key.
|
||||
///
|
||||
/// If a mapping already existed for the given key, returns the slot index
|
||||
/// of the existing mapping and leaves it untouched.
|
||||
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
|
||||
match new_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: new_key,
|
||||
lsn: new_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let versions = map.entry(new_key.clone()).or_default();
|
||||
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => Some(versions[version_idx].slot_idx),
|
||||
Err(version_idx) => {
|
||||
versions.insert(
|
||||
version_idx,
|
||||
Version {
|
||||
lsn: *new_lsn,
|
||||
slot_idx,
|
||||
},
|
||||
);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(slot_idx);
|
||||
self.size_metrics.current_bytes_immutable.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Section 4: Misc internal helpers
|
||||
//
|
||||
|
||||
/// Find a slot to evict.
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
iters += 1;
|
||||
let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();
|
||||
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
if slot.dec_usage_count() == 0 {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(TryLockError::Poisoned(err)) => {
|
||||
anyhow::bail!("buffer lock was poisoned: {err:?}")
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
// and still haven't found a victim buffer, something's wrong.
|
||||
// Maybe all the buffers were in locked. That could happen in
|
||||
// theory, if you have more threads holding buffers locked than
|
||||
// there are buffers in the pool. In practice, with a reasonably
|
||||
// large buffer pool it really shouldn't happen.
|
||||
if iters > iter_limit {
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Some(old_key) = &inner.key {
|
||||
// remove mapping for old buffer
|
||||
self.remove_mapping(old_key);
|
||||
inner.key = None;
|
||||
}
|
||||
return Ok((slot_idx, inner));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize a new page cache
|
||||
///
|
||||
/// This should be called only once at page server startup.
|
||||
fn new(num_pages: usize) -> Self {
|
||||
assert!(num_pages > 0, "page cache size must be > 0");
|
||||
|
||||
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
size_metrics.current_bytes_materialized_page.set_page_sz(0);
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
|
||||
Slot {
|
||||
inner: RwLock::new(SlotInner { key: None, buf }),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait PageSzBytesMetric {
|
||||
fn set_page_sz(&self, count: usize);
|
||||
fn add_page_sz(&self, count: usize);
|
||||
fn sub_page_sz(&self, count: usize);
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn count_times_page_sz(count: usize) -> u64 {
|
||||
u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
|
||||
}
|
||||
|
||||
impl PageSzBytesMetric for metrics::UIntGauge {
|
||||
fn set_page_sz(&self, count: usize) {
|
||||
self.set(count_times_page_sz(count));
|
||||
}
|
||||
fn add_page_sz(&self, count: usize) {
|
||||
self.add(count_times_page_sz(count));
|
||||
}
|
||||
fn sub_page_sz(&self, count: usize) {
|
||||
self.sub(count_times_page_sz(count));
|
||||
}
|
||||
}
|
||||
@@ -292,6 +292,8 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
BackgroundRuntimeTurnaroundMeasure,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
@@ -11,11 +11,12 @@
|
||||
//! len < 128: 0XXXXXXX
|
||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
use super::disk_btree::PAGE_SZ;
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
|
||||
@@ -33,7 +34,7 @@ impl<'a> BlockCursor<'a> {
|
||||
let mut blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let mut off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
let mut buf = self.read_blk(blknum)?;
|
||||
let mut buf = self.read_blk(blknum).await?;
|
||||
|
||||
// peek at the first byte, to determine if it's a 1- or 4-byte length
|
||||
let first_len_byte = buf[off];
|
||||
@@ -49,7 +50,7 @@ impl<'a> BlockCursor<'a> {
|
||||
// it is split across two pages
|
||||
len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
|
||||
blknum += 1;
|
||||
buf = self.read_blk(blknum)?;
|
||||
buf = self.read_blk(blknum).await?;
|
||||
len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
|
||||
off = 4 - thislen;
|
||||
} else {
|
||||
@@ -70,7 +71,7 @@ impl<'a> BlockCursor<'a> {
|
||||
if page_remain == 0 {
|
||||
// continue on next page
|
||||
blknum += 1;
|
||||
buf = self.read_blk(blknum)?;
|
||||
buf = self.read_blk(blknum).await?;
|
||||
off = 0;
|
||||
page_remain = PAGE_SZ;
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::fs::File;
|
||||
@@ -36,22 +36,22 @@ where
|
||||
|
||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||
pub enum BlockLease<'a> {
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
PageReadGuard(crate::buffer_pool::Buffer),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Rc(std::rc::Rc<[u8; PAGE_SZ]>),
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
|
||||
impl From<crate::buffer_pool::Buffer> for BlockLease<'static> {
|
||||
fn from(value: crate::buffer_pool::Buffer) -> BlockLease<'static> {
|
||||
BlockLease::PageReadGuard(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
|
||||
fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
|
||||
BlockLease::Rc(value)
|
||||
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
|
||||
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
|
||||
BlockLease::Arc(value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ impl<'a> Deref for BlockLease<'a> {
|
||||
BlockLease::PageReadGuard(v) => v.deref(),
|
||||
BlockLease::EphemeralFileMutableTail(v) => v,
|
||||
#[cfg(test)]
|
||||
BlockLease::Rc(v) => v.deref(),
|
||||
BlockLease::Arc(v) => v.deref(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -74,7 +74,7 @@ impl<'a> Deref for BlockLease<'a> {
|
||||
/// Unlike traits, we also support the read function to be async though.
|
||||
pub(crate) enum BlockReaderRef<'a> {
|
||||
FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
|
||||
FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
|
||||
// FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
|
||||
EphemeralFile(&'a EphemeralFile),
|
||||
Adapter(Adapter<&'a DeltaLayerInner>),
|
||||
#[cfg(test)]
|
||||
@@ -83,13 +83,13 @@ pub(crate) enum BlockReaderRef<'a> {
|
||||
|
||||
impl<'a> BlockReaderRef<'a> {
|
||||
#[inline(always)]
|
||||
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
use BlockReaderRef::*;
|
||||
match self {
|
||||
FileBlockReaderVirtual(r) => r.read_blk(blknum),
|
||||
FileBlockReaderFile(r) => r.read_blk(blknum),
|
||||
EphemeralFile(r) => r.read_blk(blknum),
|
||||
Adapter(r) => r.read_blk(blknum),
|
||||
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
|
||||
// FileBlockReaderFile(r) => r.read_blk(blknum).await,
|
||||
EphemeralFile(r) => r.read_blk(blknum).await,
|
||||
Adapter(r) => r.read_blk(blknum).await,
|
||||
#[cfg(test)]
|
||||
TestDisk(r) => r.read_blk(blknum),
|
||||
}
|
||||
@@ -134,8 +134,8 @@ impl<'a> BlockCursor<'a> {
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
#[inline(always)]
|
||||
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.reader.read_blk(blknum)
|
||||
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.reader.read_blk(blknum).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,61 +145,51 @@ impl<'a> BlockCursor<'a> {
|
||||
/// for modifying the file, nor for invalidating the cache if it is modified.
|
||||
pub struct FileBlockReader<F> {
|
||||
pub file: F,
|
||||
|
||||
/// Unique ID of this file, used as key in the page cache.
|
||||
file_id: page_cache::FileId,
|
||||
}
|
||||
|
||||
impl<F> FileBlockReader<F>
|
||||
where
|
||||
F: FileExt,
|
||||
{
|
||||
impl<F> FileBlockReader<F> {
|
||||
pub fn new(file: F) -> Self {
|
||||
let file_id = page_cache::next_file_id();
|
||||
|
||||
FileBlockReader { file_id, file }
|
||||
FileBlockReader { file }
|
||||
}
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
/// Returns a "lease" object that can be used to
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum)
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => break Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
macro_rules! impls {
|
||||
(FileBlockReader<$ty:ty>) => {
|
||||
impl FileBlockReader<$ty> {
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(
|
||||
&self,
|
||||
buf: crate::buffer_pool::Buffer,
|
||||
blkno: u32,
|
||||
) -> Result<crate::buffer_pool::Buffer, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at_async(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
/// Returns a "lease" object that can be used to
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
let buf = crate::buffer_pool::get();
|
||||
// Read the page from disk into the buffer
|
||||
let mut write_guard = self.fill_buffer(buf, blknum).await?;
|
||||
Ok(BlockLease::PageReadGuard(write_guard))
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl BlockReader for FileBlockReader<File> {
|
||||
fn block_cursor(&self) -> BlockCursor<'_> {
|
||||
BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
|
||||
}
|
||||
}
|
||||
// impls!(FileBlockReader<File>);
|
||||
impls!(FileBlockReader<VirtualFile>);
|
||||
|
||||
// impl BlockReader for FileBlockReader<File> {
|
||||
// fn block_cursor(&self) -> BlockCursor<'_> {
|
||||
// BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
|
||||
// }
|
||||
// }
|
||||
|
||||
impl BlockReader for FileBlockReader<VirtualFile> {
|
||||
fn block_cursor(&self) -> BlockCursor<'_> {
|
||||
|
||||
@@ -262,7 +262,7 @@ where
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Locate the node.
|
||||
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum)?;
|
||||
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
|
||||
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
@@ -357,7 +357,7 @@ where
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
|
||||
while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
|
||||
let blk = block_cursor.read_blk(self.start_blk + blknum)?;
|
||||
let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
|
||||
let buf: &[u8] = blk.as_ref();
|
||||
let node = OnDiskNode::<L>::deparse(buf)?;
|
||||
|
||||
@@ -704,7 +704,7 @@ pub(crate) mod tests {
|
||||
pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
|
||||
let mut buf = [0u8; PAGE_SZ];
|
||||
buf.copy_from_slice(&self.blocks[blknum as usize]);
|
||||
Ok(std::rc::Rc::new(buf).into())
|
||||
Ok(std::sync::Arc::new(buf).into())
|
||||
}
|
||||
}
|
||||
impl BlockReader for TestDisk {
|
||||
|
||||
@@ -2,22 +2,19 @@
|
||||
//! used to keep in-memory layers spilled on disk.
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use std::cmp::min;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::DerefMut;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use tracing::*;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub struct EphemeralFile {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
|
||||
_tenant_id: TenantId,
|
||||
_timeline_id: TimelineId,
|
||||
file: VirtualFile,
|
||||
@@ -48,7 +45,6 @@ impl EphemeralFile {
|
||||
)?;
|
||||
|
||||
Ok(EphemeralFile {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
_tenant_id: tenant_id,
|
||||
_timeline_id: timeline_id,
|
||||
file,
|
||||
@@ -61,40 +57,17 @@ impl EphemeralFile {
|
||||
self.len
|
||||
}
|
||||
|
||||
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
|
||||
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum)
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.file.path.display(),
|
||||
e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
let mut write_guard: crate::buffer_pool::Buffer = crate::buffer_pool::get();
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
let mut buf = self
|
||||
.file
|
||||
.read_exact_at_async(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
Ok(BlockLease::PageReadGuard(buf))
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||
@@ -132,29 +105,6 @@ impl EphemeralFile {
|
||||
self.blknum as u64 * PAGE_SZ as u64,
|
||||
) {
|
||||
Ok(_) => {
|
||||
// Pre-warm the page cache with what we just wrote.
|
||||
// This isn't necessary for coherency/correctness, but it's how we've always done it.
|
||||
let cache = page_cache::get();
|
||||
match cache.read_immutable_buf(
|
||||
self.ephemeral_file.page_cache_file_id,
|
||||
self.blknum,
|
||||
) {
|
||||
Ok(page_cache::ReadBufResult::Found(_guard)) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
|
||||
}
|
||||
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
Err(e) => {
|
||||
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
|
||||
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
|
||||
}
|
||||
}
|
||||
// Zero the buffer for re-use.
|
||||
// Zeroing is critical for correcntess because the write_blob code below
|
||||
// and similarly read_blk expect zeroed pages.
|
||||
|
||||
@@ -26,7 +26,7 @@
|
||||
//! recovered from this file. This is tracked in
|
||||
//! <https://github.com/neondatabase/neon/issues/4418>
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::io::{self, Write};
|
||||
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use anyhow::Result;
|
||||
@@ -151,11 +151,12 @@ impl Manifest {
|
||||
/// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
|
||||
/// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
|
||||
/// backup the current one.
|
||||
pub fn load(
|
||||
mut file: VirtualFile,
|
||||
pub async fn load(
|
||||
file: VirtualFile,
|
||||
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
|
||||
let mut buf = vec![];
|
||||
file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;
|
||||
file.read_exact_at(&mut buf, 0)
|
||||
.map_err(ManifestLoadError::Io)?;
|
||||
|
||||
// Read manifest header
|
||||
let mut buf = Bytes::from(buf);
|
||||
@@ -241,8 +242,8 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_read_manifest() {
|
||||
#[tokio::test]
|
||||
async fn test_read_manifest() {
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
|
||||
std::fs::create_dir_all(&testdir).unwrap();
|
||||
let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
|
||||
@@ -274,7 +275,7 @@ mod tests {
|
||||
.truncate(false),
|
||||
)
|
||||
.unwrap();
|
||||
let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
|
||||
let (mut manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
|
||||
assert!(!corrupted.0);
|
||||
assert_eq!(operations.len(), 2);
|
||||
assert_eq!(
|
||||
@@ -306,7 +307,7 @@ mod tests {
|
||||
.truncate(false),
|
||||
)
|
||||
.unwrap();
|
||||
let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
|
||||
let (_manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
|
||||
assert!(!corrupted.0);
|
||||
assert_eq!(operations.len(), 3);
|
||||
assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));
|
||||
|
||||
@@ -29,7 +29,7 @@
|
||||
//!
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
|
||||
@@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{self, File};
|
||||
use std::io::SeekFrom;
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -467,7 +467,7 @@ impl DeltaLayer {
|
||||
PathOrConf::Path(_) => None,
|
||||
};
|
||||
|
||||
let loaded = DeltaLayerInner::load(&path, summary)?;
|
||||
let loaded = DeltaLayerInner::load(&path, summary).await?;
|
||||
|
||||
if let PathOrConf::Path(ref path) = self.path_or_conf {
|
||||
// not production code
|
||||
@@ -841,12 +841,16 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
pub(super) fn load(path: &std::path::Path, summary: Option<Summary>) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path)
|
||||
pub(super) async fn load(
|
||||
path: &std::path::Path,
|
||||
summary: Option<Summary>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open_async(path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
@@ -854,11 +858,11 @@ impl DeltaLayerInner {
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1028,7 +1032,7 @@ impl<'a> ValueRef<'a> {
|
||||
pub(crate) struct Adapter<T>(T);
|
||||
|
||||
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
|
||||
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.0.as_ref().file.read_blk(blknum)
|
||||
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
|
||||
self.0.as_ref().file.read_blk(blknum).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
//! actual page images are stored in the "values" part.
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
@@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{self, File};
|
||||
use std::io::SeekFrom;
|
||||
use std::io::Write;
|
||||
use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -349,7 +349,8 @@ impl ImageLayer {
|
||||
PathOrConf::Path(_) => None,
|
||||
};
|
||||
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
|
||||
let loaded =
|
||||
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?;
|
||||
|
||||
if let PathOrConf::Path(ref path) = self.path_or_conf {
|
||||
// not production code
|
||||
@@ -432,15 +433,16 @@ impl ImageLayer {
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
pub(super) fn load(
|
||||
pub(super) async fn load(
|
||||
path: &std::path::Path,
|
||||
lsn: Lsn,
|
||||
summary: Option<Summary>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path)
|
||||
let file = VirtualFile::open_async(path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let summary_blk = file.read_blk(0).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
@@ -449,11 +451,11 @@ impl ImageLayerInner {
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -73,7 +74,6 @@ use utils::{
|
||||
simple_rcu::{Rcu, RcuReadGuard},
|
||||
};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::repository::GcResult;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr;
|
||||
@@ -465,7 +465,7 @@ impl Timeline {
|
||||
// The cached image can be returned directly if there is no WAL between the cached image
|
||||
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
|
||||
// for redo.
|
||||
let cached_page_img = match self.lookup_cached_page(&key, lsn) {
|
||||
let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
|
||||
Some((cached_lsn, cached_img)) => {
|
||||
match cached_lsn.cmp(&lsn) {
|
||||
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
|
||||
@@ -494,6 +494,7 @@ impl Timeline {
|
||||
|
||||
RECONSTRUCT_TIME
|
||||
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
@@ -630,38 +631,38 @@ impl Timeline {
|
||||
) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
// static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
// once_cell::sync::Lazy::new(|| {
|
||||
// let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
// let permits = usize::max(
|
||||
// 1,
|
||||
// // while a lot of the work is done on spawn_blocking, we still do
|
||||
// // repartitioning in the async context. this should give leave us some workers
|
||||
// // unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// // effects of restarts.
|
||||
// //
|
||||
// // 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// // spawn_blocking.
|
||||
// (total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
// );
|
||||
// assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
// assert!(
|
||||
// permits < total_threads,
|
||||
// "need threads avail for shorter work"
|
||||
// );
|
||||
// tokio::sync::Semaphore::new(permits)
|
||||
// });
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
// // this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// // compaction task goes over it's period (20s) which is quite often in production.
|
||||
// let _permit = tokio::select! {
|
||||
// permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
// permit
|
||||
// },
|
||||
// _ = cancel.cancelled() => {
|
||||
// return Ok(());
|
||||
// }
|
||||
// };
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
@@ -2443,15 +2444,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) =
|
||||
cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
|
||||
None
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
@@ -3366,7 +3360,7 @@ impl Timeline {
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_range = (target_file_size / PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
@@ -3602,7 +3596,7 @@ impl Timeline {
|
||||
// Add two pages for potential overhead. This should in theory be already
|
||||
// accounted for in the target calculation, but for very small targets,
|
||||
// we still might easily hit the limit otherwise.
|
||||
let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
|
||||
let warn_limit = target_file_size * 2 + PAGE_SZ as u64 * 2;
|
||||
for layer in new_layers.iter() {
|
||||
if layer.layer_desc().file_size > warn_limit {
|
||||
warn!(
|
||||
@@ -4131,7 +4125,7 @@ impl Timeline {
|
||||
///
|
||||
/// Reconstruct a value, using the given base image and WAL records in 'data'.
|
||||
///
|
||||
fn reconstruct_value(
|
||||
async fn reconstruct_value(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
@@ -4190,22 +4184,6 @@ impl Timeline {
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
let cache = page_cache::get();
|
||||
if let Err(e) = cache
|
||||
.memorize_materialized_page(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
)
|
||||
.context("Materialized page memoization failed")
|
||||
{
|
||||
return Err(PageReconstructError::from(e));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(img)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,13 +11,15 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
|
||||
|
||||
use std::os::fd::OwnedFd;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
@@ -39,7 +41,7 @@ pub struct VirtualFile {
|
||||
/// Lazy handle to the global file descriptor cache. The slot that this points to
|
||||
/// might contain our File, or it may be empty, or it may contain a File that
|
||||
/// belongs to a different VirtualFile.
|
||||
handle: RwLock<SlotHandle>,
|
||||
handle: Arc<Mutex<Option<File>>>, // only transiently None
|
||||
|
||||
/// Current file position
|
||||
pos: u64,
|
||||
@@ -51,7 +53,6 @@ pub struct VirtualFile {
|
||||
/// opened, in the VirtualFile::create() function, and strip the flag before
|
||||
/// storing it here.
|
||||
pub path: PathBuf,
|
||||
open_options: OpenOptions,
|
||||
|
||||
// These are strings becase we only use them for metrics, and those expect strings.
|
||||
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
|
||||
@@ -60,118 +61,6 @@ pub struct VirtualFile {
|
||||
timeline_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
struct SlotHandle {
|
||||
/// Index into OPEN_FILES.slots
|
||||
index: usize,
|
||||
|
||||
/// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
|
||||
/// been recycled and no longer contains the FD for this virtual file.
|
||||
tag: u64,
|
||||
}
|
||||
|
||||
/// OPEN_FILES is the global array that holds the physical file descriptors that
|
||||
/// are currently open. Each slot in the array is protected by a separate lock,
|
||||
/// so that different files can be accessed independently. The lock must be held
|
||||
/// in write mode to replace the slot with a different file, but a read mode
|
||||
/// is enough to operate on the file, whether you're reading or writing to it.
|
||||
///
|
||||
/// OPEN_FILES starts in uninitialized state, and it's initialized by
|
||||
/// the virtual_file::init() function. It must be called exactly once at page
|
||||
/// server startup.
|
||||
static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
|
||||
|
||||
struct OpenFiles {
|
||||
slots: &'static [Slot],
|
||||
|
||||
/// clock arm for the clock algorithm
|
||||
next: AtomicUsize,
|
||||
}
|
||||
|
||||
struct Slot {
|
||||
inner: RwLock<SlotInner>,
|
||||
|
||||
/// has this file been used since last clock sweep?
|
||||
recently_used: AtomicBool,
|
||||
}
|
||||
|
||||
struct SlotInner {
|
||||
/// Counter that's incremented every time a different file is stored here.
|
||||
/// To avoid the ABA problem.
|
||||
tag: u64,
|
||||
|
||||
/// the underlying file
|
||||
file: Option<File>,
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
/// Find a slot to use, evicting an existing file descriptor if needed.
|
||||
///
|
||||
/// On return, we hold a lock on the slot, and its 'tag' has been updated
|
||||
/// recently_used has been set. It's all ready for reuse.
|
||||
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
|
||||
//
|
||||
// Run the clock algorithm to find a slot to replace.
|
||||
//
|
||||
let num_slots = self.slots.len();
|
||||
let mut retries = 0;
|
||||
let mut slot;
|
||||
let mut slot_guard;
|
||||
let index;
|
||||
loop {
|
||||
let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
|
||||
slot = &self.slots[next];
|
||||
|
||||
// If the recently_used flag on this slot is set, continue the clock
|
||||
// sweep. Otherwise try to use this slot. If we cannot acquire the
|
||||
// lock, also continue the clock sweep.
|
||||
//
|
||||
// We only continue in this manner for a while, though. If we loop
|
||||
// through the array twice without finding a victim, just pick the
|
||||
// next slot and wait until we can reuse it. This way, we avoid
|
||||
// spinning in the extreme case that all the slots are busy with an
|
||||
// I/O operation.
|
||||
if retries < num_slots * 2 {
|
||||
if !slot.recently_used.swap(false, Ordering::Release) {
|
||||
if let Ok(guard) = slot.inner.try_write() {
|
||||
slot_guard = guard;
|
||||
index = next;
|
||||
break;
|
||||
}
|
||||
}
|
||||
retries += 1;
|
||||
} else {
|
||||
slot_guard = slot.inner.write().unwrap();
|
||||
index = next;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// We now have the victim slot locked. If it was in use previously, close the
|
||||
// old file.
|
||||
//
|
||||
if let Some(old_file) = slot_guard.file.take() {
|
||||
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
|
||||
// distinguish the two.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close-by-replace"])
|
||||
.observe_closure_duration(|| drop(old_file));
|
||||
}
|
||||
|
||||
// Prepare the slot for reuse and return it
|
||||
slot_guard.tag += 1;
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
(
|
||||
SlotHandle {
|
||||
index,
|
||||
tag: slot_guard.tag,
|
||||
},
|
||||
slot_guard,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
|
||||
@@ -207,7 +96,6 @@ impl VirtualFile {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| open_options.open(path))?;
|
||||
@@ -223,15 +111,76 @@ impl VirtualFile {
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFile {
|
||||
handle: RwLock::new(handle),
|
||||
handle: Arc::new(Mutex::new(Some(file))),
|
||||
pos: 0,
|
||||
path: path.to_path_buf(),
|
||||
open_options: reopen_options,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
slot_guard.file.replace(file);
|
||||
Ok(vfile)
|
||||
}
|
||||
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open_async(path: &Path) -> Result<VirtualFile, std::io::Error> {
|
||||
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
|
||||
options.read(true);
|
||||
Self::open_with_options_async(path, options).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
///
|
||||
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
|
||||
/// they will be applied also when the file is subsequently re-opened, not only
|
||||
/// on the first time. Make sure that's sane!
|
||||
pub async fn open_with_options_async(
|
||||
path: &Path,
|
||||
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
let path_str = path.to_string_lossy();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
let tenant_id;
|
||||
let timeline_id;
|
||||
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
|
||||
tenant_id = parts[parts.len() - 4].to_string();
|
||||
timeline_id = parts[parts.len() - 2].to_string();
|
||||
} else {
|
||||
tenant_id = "*".to_string();
|
||||
timeline_id = "*".to_string();
|
||||
}
|
||||
let start = std::time::Instant::now();
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let file: OwnedFd = system
|
||||
.open(path, &open_options)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
})?;
|
||||
let file = File::from(file);
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.observe(start.elapsed().as_secs_f64());
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
// It would perhaps be nicer to check just for the read and write flags
|
||||
// explicitly, but OpenOptions doesn't contain any functions to read flags,
|
||||
// only to set them.
|
||||
let mut reopen_options = open_options;
|
||||
reopen_options.create(false);
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
|
||||
let vfile = VirtualFile {
|
||||
handle: Arc::new(Mutex::new(Some(file))),
|
||||
pos: 0,
|
||||
path: path.to_path_buf(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
Ok(vfile)
|
||||
}
|
||||
@@ -244,7 +193,9 @@ impl VirtualFile {
|
||||
pub fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
self.with_file("metadata", |file| file.metadata())?
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Helper function that looks up the underlying File for this VirtualFile,
|
||||
/// opening it and evicting some other File if necessary. It calls 'func'
|
||||
/// with the physical File.
|
||||
@@ -252,68 +203,9 @@ impl VirtualFile {
|
||||
where
|
||||
F: FnMut(&File) -> R,
|
||||
{
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
// Read the cached slot handle, and see if the slot that it points to still
|
||||
// contains our File.
|
||||
//
|
||||
// We only need to hold the handle lock while we read the current handle. If
|
||||
// another thread closes the file and recycles the slot for a different file,
|
||||
// we will notice that the handle we read is no longer valid and retry.
|
||||
let mut handle = *self.handle.read().unwrap();
|
||||
loop {
|
||||
// Check if the slot contains our File
|
||||
{
|
||||
let slot = &open_files.slots[handle.index];
|
||||
let slot_guard = slot.inner.read().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
if let Some(file) = &slot_guard.file {
|
||||
// Found a cached file descriptor.
|
||||
slot.recently_used.store(true, Ordering::Relaxed);
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(file)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The slot didn't contain our File. We will have to open it ourselves,
|
||||
// but before that, grab a write lock on handle in the VirtualFile, so
|
||||
// that no other thread will try to concurrently open the same file.
|
||||
let handle_guard = self.handle.write().unwrap();
|
||||
|
||||
// If another thread changed the handle while we were not holding the lock,
|
||||
// then the handle might now be valid again. Loop back to retry.
|
||||
if *handle_guard != handle {
|
||||
handle = *handle_guard;
|
||||
continue;
|
||||
}
|
||||
break handle_guard;
|
||||
}
|
||||
};
|
||||
|
||||
// We need to open the file ourselves. The handle in the VirtualFile is
|
||||
// now locked in write-mode. Find a free slot to put it in.
|
||||
let (handle, mut slot_guard) = open_files.find_victim_slot();
|
||||
|
||||
// Open the physical file
|
||||
let file = STORAGE_IO_TIME
|
||||
.with_label_values(&["open"])
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
let result = STORAGE_IO_TIME
|
||||
return Ok(STORAGE_IO_TIME
|
||||
.with_label_values(&[op])
|
||||
.observe_closure_duration(|| func(&file));
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
slot_guard.file.replace(file);
|
||||
|
||||
*handle_guard = handle;
|
||||
|
||||
Ok(result)
|
||||
.observe_closure_duration(|| func(&*self.handle.lock().unwrap().as_ref().unwrap())));
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
@@ -323,35 +215,6 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
let handle = self.handle.get_mut().unwrap();
|
||||
|
||||
// We could check with a read-lock first, to avoid waiting on an
|
||||
// unrelated I/O.
|
||||
let slot = &get_open_files().slots[handle.index];
|
||||
let mut slot_guard = slot.inner.write().unwrap();
|
||||
if slot_guard.tag == handle.tag {
|
||||
slot.recently_used.store(false, Ordering::Relaxed);
|
||||
// there is also operation "close-by-replace" for closes done on eviction for
|
||||
// comparison.
|
||||
STORAGE_IO_TIME
|
||||
.with_label_values(&["close"])
|
||||
.observe_closure_duration(|| drop(slot_guard.file.take()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for VirtualFile {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
|
||||
let pos = self.pos;
|
||||
let n = self.read_at(buf, pos)?;
|
||||
self.pos += n as u64;
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for VirtualFile {
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||
let pos = self.pos;
|
||||
@@ -367,8 +230,8 @@ impl Write for VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
impl Seek for VirtualFile {
|
||||
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
impl VirtualFile {
|
||||
pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
match pos {
|
||||
SeekFrom::Start(offset) => {
|
||||
self.pos = offset;
|
||||
@@ -392,11 +255,113 @@ impl Seek for VirtualFile {
|
||||
}
|
||||
Ok(self.pos)
|
||||
}
|
||||
}
|
||||
|
||||
impl FileExt for VirtualFile {
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset) {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at_async(
|
||||
&self,
|
||||
mut write_guard: crate::buffer_pool::Buffer,
|
||||
offset: u64,
|
||||
) -> Result<crate::buffer_pool::Buffer, Error> {
|
||||
let file = self.handle.lock().unwrap().take().unwrap();
|
||||
let put_back = AtomicBool::new(false);
|
||||
let put_back_ref = &put_back;
|
||||
scopeguard::defer! {
|
||||
if !put_back_ref.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
panic!("mut put self.handle back")
|
||||
}
|
||||
};
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
struct PageWriteGuardBuf {
|
||||
buf: crate::buffer_pool::Buffer,
|
||||
init_up_to: usize,
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.buf.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.buf.len()
|
||||
}
|
||||
}
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.buf.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.buf.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
let buf = PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let ((file, buf), res) = system.read(file.into(), offset, buf).await;
|
||||
let PageWriteGuardBuf {
|
||||
buf: write_guard,
|
||||
init_up_to,
|
||||
} = buf;
|
||||
if let Ok(num_read) = res {
|
||||
assert!(init_up_to <= num_read);
|
||||
}
|
||||
let replaced = self.handle.lock().unwrap().replace(File::from(file));
|
||||
assert!(replaced.is_none());
|
||||
put_back.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
res.map(|_| write_guard)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
pub fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.write_at(buf, offset) {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
));
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self.with_file("read", |file| file.read_at(buf, offset))?;
|
||||
let result = self.with_file("read", |file| {
|
||||
tracing::info!("sync read\n{}", std::backtrace::Backtrace::force_capture());
|
||||
file.read_at(buf, offset)
|
||||
})?;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
@@ -405,7 +370,7 @@ impl FileExt for VirtualFile {
|
||||
result
|
||||
}
|
||||
|
||||
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = self.with_file("write", |file| file.write_at(buf, offset))?;
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
@@ -415,256 +380,3 @@ impl FileExt for VirtualFile {
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
fn new(num_slots: usize) -> OpenFiles {
|
||||
let mut slots = Box::new(Vec::with_capacity(num_slots));
|
||||
for _ in 0..num_slots {
|
||||
let slot = Slot {
|
||||
recently_used: AtomicBool::new(false),
|
||||
inner: RwLock::new(SlotInner { tag: 0, file: None }),
|
||||
};
|
||||
slots.push(slot);
|
||||
}
|
||||
|
||||
OpenFiles {
|
||||
next: AtomicUsize::new(0),
|
||||
slots: Box::leak(slots),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Initialize the virtual file module. This must be called once at page
|
||||
/// server startup.
|
||||
///
|
||||
pub fn init(num_slots: usize) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
}
|
||||
|
||||
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
|
||||
|
||||
// Get a handle to the global slots array.
|
||||
fn get_open_files() -> &'static OpenFiles {
|
||||
//
|
||||
// In unit tests, page server startup doesn't happen and no one calls
|
||||
// virtual_file::init(). Initialize it here, with a small array.
|
||||
//
|
||||
// This applies to the virtual file tests below, but all other unit
|
||||
// tests too, so the virtual file facility is always usable in
|
||||
// unit tests.
|
||||
//
|
||||
if cfg!(test) {
|
||||
OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
|
||||
} else {
|
||||
OPEN_FILES.get().expect("virtual_file::init not called yet")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use rand::Rng;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
// Helper function to slurp contents of a file, starting at the current position,
|
||||
// into a string
|
||||
fn read_string<FD>(vfile: &mut FD) -> Result<String, Error>
|
||||
where
|
||||
FD: Read,
|
||||
{
|
||||
let mut buf = String::new();
|
||||
vfile.read_to_string(&mut buf)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error>
|
||||
where
|
||||
FD: FileExt,
|
||||
{
|
||||
let mut buf = Vec::new();
|
||||
buf.resize(len, 0);
|
||||
vfile.read_exact_at(&mut buf, pos)?;
|
||||
Ok(String::from_utf8(buf).unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_virtual_files() -> Result<(), Error> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
// but this allows us to verify that the operations return the same
|
||||
// results with VirtualFiles as with native Files. (Except that with
|
||||
// native files, you will run out of file descriptors if the ulimit
|
||||
// is low enough.)
|
||||
test_files("virtual_files", |path, open_options| {
|
||||
VirtualFile::open_with_options(path, open_options)
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_physical_files() -> Result<(), Error> {
|
||||
test_files("physical_files", |path, open_options| {
|
||||
open_options.open(path)
|
||||
})
|
||||
}
|
||||
|
||||
fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error>
|
||||
where
|
||||
FD: Read + Write + Seek + FileExt,
|
||||
OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
|
||||
{
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
|
||||
std::fs::create_dir_all(&testdir)?;
|
||||
|
||||
let path_a = testdir.join("file_a");
|
||||
let mut file_a = openfunc(
|
||||
&path_a,
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
)?;
|
||||
file_a.write_all(b"foobar")?;
|
||||
|
||||
// cannot read from a file opened in write-only mode
|
||||
assert!(read_string(&mut file_a).is_err());
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
assert!(file_a.write(b"bar").is_err());
|
||||
|
||||
// Try simple read
|
||||
assert_eq!("foobar", read_string(&mut file_a)?);
|
||||
|
||||
// It's positioned at the EOF now.
|
||||
assert_eq!("", read_string(&mut file_a)?);
|
||||
|
||||
// Test seeks.
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
|
||||
assert_eq!("ar", read_string(&mut file_a)?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
|
||||
assert_eq!("bar", read_string(&mut file_a)?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
// Test erroneous seeks to before byte 0
|
||||
assert!(file_a.seek(SeekFrom::End(-7)).is_err());
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
|
||||
|
||||
// the erroneous seek should have left the position unchanged
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
// Create another test file, and try FileExt functions on it.
|
||||
let path_b = testdir.join("file_b");
|
||||
let mut file_b = openfunc(
|
||||
&path_b,
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true),
|
||||
)?;
|
||||
file_b.write_all_at(b"BAR", 3)?;
|
||||
file_b.write_all_at(b"FOO", 0)?;
|
||||
|
||||
assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA");
|
||||
|
||||
// Open a lot of files, enough to cause some evictions. (Or to be precise,
|
||||
// open the same file many times. The effect is the same.)
|
||||
//
|
||||
// leave file_a positioned at offset 1 before we start
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
|
||||
assert_eq!("FOOBAR", read_string(&mut vfile)?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
|
||||
// make sure we opened enough files to definitely cause evictions.
|
||||
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
|
||||
|
||||
// The underlying file descriptor for 'file_a' should be closed now. Try to read
|
||||
// from it again. We left the file positioned at offset 1 above.
|
||||
assert_eq!("oobar", read_string(&mut file_a)?);
|
||||
|
||||
// Check that all the other FDs still work too. Use them in random order for
|
||||
// good measure.
|
||||
vfiles.as_mut_slice().shuffle(&mut thread_rng());
|
||||
for vfile in vfiles.iter_mut() {
|
||||
assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test using VirtualFiles from many threads concurrently. This tests both using
|
||||
/// a lot of VirtualFiles concurrently, causing evictions, and also using the same
|
||||
/// VirtualFile from multiple threads concurrently.
|
||||
#[test]
|
||||
fn test_vfile_concurrency() -> Result<(), Error> {
|
||||
const SIZE: usize = 8 * 1024;
|
||||
const VIRTUAL_FILES: usize = 100;
|
||||
const THREADS: usize = 100;
|
||||
const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
|
||||
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
|
||||
std::fs::create_dir_all(&testdir)?;
|
||||
|
||||
// Create a test file.
|
||||
let test_file_path = testdir.join("concurrency_test_file");
|
||||
{
|
||||
let file = File::create(&test_file_path)?;
|
||||
file.write_all_at(&SAMPLE, 0)?;
|
||||
}
|
||||
|
||||
// Open the file many times.
|
||||
let mut files = Vec::new();
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
|
||||
files.push(f);
|
||||
}
|
||||
let files = Arc::new(files);
|
||||
|
||||
// Launch many threads, and use the virtual files concurrently in random order.
|
||||
let mut threads = Vec::new();
|
||||
for threadno in 0..THREADS {
|
||||
let builder =
|
||||
thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno));
|
||||
|
||||
let files = files.clone();
|
||||
let thread = builder
|
||||
.spawn(move || {
|
||||
let mut buf = [0u8; SIZE];
|
||||
let mut rng = rand::thread_rng();
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
f.read_exact_at(&mut buf, 0).unwrap();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
for thread in threads {
|
||||
thread.join().unwrap();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
import queue
|
||||
import threading
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
|
||||
from fixtures.types import TenantId
|
||||
|
||||
|
||||
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# below doesn't work because summaries contain tenant and timeline ids and we check for them
|
||||
|
||||
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
|
||||
pshttp = env.pageserver.http_client()
|
||||
ep = env.endpoints.create_start("main")
|
||||
ep.safe_psql("create table foo(b text)")
|
||||
for i in range(0, 8):
|
||||
ep.safe_psql("insert into foo(b) values ('some text')")
|
||||
# pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
|
||||
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
|
||||
pshttp.timeline_checkpoint(tenant_id, timeline_id)
|
||||
ep.stop_and_destroy()
|
||||
|
||||
env.pageserver.stop()
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
tenant_dir = env.repo_dir / "tenants" / str(env.initial_tenant)
|
||||
|
||||
for i in range(0, 20_000):
|
||||
import shutil
|
||||
|
||||
shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))
|
||||
Reference in New Issue
Block a user