mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
rip out page cache
This commit is contained in:
committed by
Christian Schwarz
parent
b9b7670a3a
commit
a608667301
@@ -12,10 +12,9 @@ use std::collections::BinaryHeap;
|
||||
use std::ops::Range;
|
||||
use std::{fs, str};
|
||||
|
||||
use pageserver::page_cache::PAGE_SZ;
|
||||
use pageserver::repository::{Key, KEY_SIZE};
|
||||
use pageserver::tenant::block_io::FileBlockReader;
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection, PAGE_SZ};
|
||||
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
|
||||
use pageserver::tenant::storage_layer::range_overlaps;
|
||||
use pageserver::virtual_file::{self, VirtualFile};
|
||||
@@ -143,7 +142,6 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
let mut total_image_layers = 0usize;
|
||||
|
||||
@@ -11,7 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
|
||||
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::virtual_file;
|
||||
use pageserver::{
|
||||
repository::{Key, KEY_SIZE},
|
||||
tenant::{
|
||||
@@ -60,7 +60,6 @@ pub(crate) enum LayerCmd {
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path).await?);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
@@ -188,7 +187,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@ use index_part::IndexPartCmd;
|
||||
use layers::LayerCmd;
|
||||
use pageserver::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
page_cache,
|
||||
task_mgr::TaskKind,
|
||||
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
|
||||
virtual_file,
|
||||
@@ -124,7 +123,6 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use pageserver::{
|
||||
config::{defaults::*, PageServerConf},
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
deletion_queue::DeletionQueue,
|
||||
http, page_cache, page_service, task_mgr,
|
||||
http, page_service, task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
|
||||
tenant::mgr,
|
||||
@@ -131,7 +131,6 @@ fn main() -> anyhow::Result<()> {
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
|
||||
82
pageserver/src/buffer_pool.rs
Normal file
82
pageserver/src/buffer_pool.rs
Normal file
@@ -0,0 +1,82 @@
|
||||
use std::cell::RefCell;
|
||||
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
|
||||
pub struct Buffer(Option<Box<[u8; PAGE_SZ]>>);
|
||||
|
||||
// Thread-local list of re-usable buffers.
|
||||
thread_local! {
|
||||
static POOL: RefCell<Vec<Box<[u8; PAGE_SZ]>>> = RefCell::new(Vec::new());
|
||||
}
|
||||
|
||||
pub(crate) fn get() -> Buffer {
|
||||
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
|
||||
match maybe {
|
||||
Some(buf) => Buffer(Some(buf)),
|
||||
None => Buffer(Some(Box::new([0; PAGE_SZ]))),
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Buffer {
|
||||
fn drop(&mut self) {
|
||||
let buf = self.0.take().unwrap();
|
||||
POOL.with(|rc| rc.borrow_mut().push(buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for Buffer {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref().unwrap().as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for Buffer {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_mut().unwrap().as_mut()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PageWriteGuardBuf {
|
||||
page: Buffer,
|
||||
init_up_to: usize,
|
||||
}
|
||||
impl PageWriteGuardBuf {
|
||||
pub fn new(buf: Buffer) -> Self {
|
||||
PageWriteGuardBuf {
|
||||
page: buf,
|
||||
init_up_to: 0,
|
||||
}
|
||||
}
|
||||
pub fn assume_init(self) -> Buffer {
|
||||
assert_eq!(self.init_up_to, PAGE_SZ);
|
||||
self.page
|
||||
}
|
||||
}
|
||||
|
||||
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
|
||||
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.page.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.page.len()
|
||||
}
|
||||
}
|
||||
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
|
||||
// hence it's safe to hand out the `stable_mut_ptr()`.
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.page.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.page.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
@@ -282,8 +282,4 @@ impl RequestContext {
|
||||
pub(crate) fn access_stats_behavior(&self) -> AccessStatsBehavior {
|
||||
self.access_stats_behavior
|
||||
}
|
||||
|
||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||
self.page_content_kind
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub use pageserver_api::keyspace;
|
||||
pub mod metrics;
|
||||
pub mod page_cache;
|
||||
pub mod page_service;
|
||||
pub mod pgdatadir_mapping;
|
||||
pub mod repository;
|
||||
@@ -31,6 +30,8 @@ use camino::Utf8Path;
|
||||
use deletion_queue::DeletionQueue;
|
||||
use tracing::info;
|
||||
|
||||
pub mod buffer_pool;
|
||||
|
||||
/// Current storage format version
|
||||
///
|
||||
/// This is embedded in the header of all the layer files.
|
||||
|
||||
@@ -187,216 +187,6 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
|
||||
}
|
||||
});
|
||||
|
||||
pub(crate) struct PageCacheMetricsForTaskKind {
|
||||
pub read_accesses_materialized_page: IntCounter,
|
||||
pub read_accesses_immutable: IntCounter,
|
||||
|
||||
pub read_hits_immutable: IntCounter,
|
||||
pub read_hits_materialized_page_exact: IntCounter,
|
||||
pub read_hits_materialized_page_older_lsn: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) struct PageCacheMetrics {
|
||||
map: EnumMap<TaskKind, EnumMap<PageContentKind, PageCacheMetricsForTaskKind>>,
|
||||
}
|
||||
|
||||
static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_read_hits_total",
|
||||
"Number of read accesses to the page cache that hit",
|
||||
&["task_kind", "key_kind", "content_kind", "hit_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_read_accesses_total",
|
||||
"Number of read accesses to the page cache",
|
||||
&["task_kind", "key_kind", "content_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
|
||||
map: EnumMap::from_array(std::array::from_fn(|task_kind| {
|
||||
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind);
|
||||
let task_kind: &'static str = task_kind.into();
|
||||
EnumMap::from_array(std::array::from_fn(|content_kind| {
|
||||
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
|
||||
let content_kind: &'static str = content_kind.into();
|
||||
PageCacheMetricsForTaskKind {
|
||||
read_accesses_materialized_page: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_accesses_immutable: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&[task_kind, "immutable", content_kind])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_immutable: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_exact: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
"exact",
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_older_lsn: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
"older_lsn",
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
}
|
||||
}))
|
||||
})),
|
||||
});
|
||||
|
||||
impl PageCacheMetrics {
|
||||
pub(crate) fn for_ctx(&self, ctx: &RequestContext) -> &PageCacheMetricsForTaskKind {
|
||||
&self.map[ctx.task_kind()][ctx.page_content_kind()]
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PageCacheSizeMetrics {
|
||||
pub max_bytes: UIntGauge,
|
||||
|
||||
pub current_bytes_immutable: UIntGauge,
|
||||
pub current_bytes_materialized_page: UIntGauge,
|
||||
}
|
||||
|
||||
static PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_page_cache_size_current_bytes",
|
||||
"Current size of the page cache in bytes, by key kind",
|
||||
&["key_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> =
|
||||
Lazy::new(|| PageCacheSizeMetrics {
|
||||
max_bytes: {
|
||||
register_uint_gauge!(
|
||||
"pageserver_page_cache_size_max_bytes",
|
||||
"Maximum size of the page cache in bytes"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
},
|
||||
current_bytes_immutable: {
|
||||
PAGE_CACHE_SIZE_CURRENT_BYTES
|
||||
.get_metric_with_label_values(&["immutable"])
|
||||
.unwrap()
|
||||
},
|
||||
current_bytes_materialized_page: {
|
||||
PAGE_CACHE_SIZE_CURRENT_BYTES
|
||||
.get_metric_with_label_values(&["materialized_page"])
|
||||
.unwrap()
|
||||
},
|
||||
});
|
||||
|
||||
pub(crate) mod page_cache_eviction_metrics {
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use metrics::{register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) enum Outcome {
|
||||
FoundSlotUnused { iters: NonZeroUsize },
|
||||
FoundSlotEvicted { iters: NonZeroUsize },
|
||||
ItersExceeded { iters: NonZeroUsize },
|
||||
}
|
||||
|
||||
static ITERS_TOTAL_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_find_victim_iters_total",
|
||||
"Counter for the number of iterations in the find_victim loop",
|
||||
&["outcome"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static CALLS_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_find_victim_calls",
|
||||
"Incremented at the end of each find_victim() call.\
|
||||
Filter by outcome to get e.g., eviction rate.",
|
||||
&["outcome"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) fn observe(outcome: Outcome) {
|
||||
macro_rules! dry {
|
||||
($label:literal, $iters:expr) => {{
|
||||
static LABEL: &'static str = $label;
|
||||
static ITERS_TOTAL: Lazy<IntCounter> =
|
||||
Lazy::new(|| ITERS_TOTAL_VEC.with_label_values(&[LABEL]));
|
||||
static CALLS: Lazy<IntCounter> =
|
||||
Lazy::new(|| CALLS_VEC.with_label_values(&[LABEL]));
|
||||
ITERS_TOTAL.inc_by(($iters.get()) as u64);
|
||||
CALLS.inc();
|
||||
}};
|
||||
}
|
||||
match outcome {
|
||||
Outcome::FoundSlotUnused { iters } => dry!("found_empty", iters),
|
||||
Outcome::FoundSlotEvicted { iters } => {
|
||||
dry!("found_evicted", iters)
|
||||
}
|
||||
Outcome::ItersExceeded { iters } => {
|
||||
dry!("err_iters_exceeded", iters);
|
||||
super::page_cache_errors_inc(super::PageCacheErrorKind::EvictIterLimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"page_cache_errors_total",
|
||||
"Number of timeouts while acquiring a pinned slot in the page cache",
|
||||
&["error_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(IntoStaticStr)]
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
||||
PAGE_CACHE_ERRORS
|
||||
.get_metric_with_label_values(&[error_kind.into()])
|
||||
.unwrap()
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wait_lsn_seconds",
|
||||
@@ -1998,7 +1788,6 @@ use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
/// Maintain a per timeline gauge in addition to the global gauge.
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -12,12 +12,13 @@
|
||||
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
use super::disk_btree::PAGE_SZ;
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::Deref;
|
||||
@@ -35,7 +35,7 @@ where
|
||||
|
||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||
pub enum BlockLease<'a> {
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
PageReadGuard(crate::buffer_pool::Buffer),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
@@ -43,8 +43,8 @@ pub enum BlockLease<'a> {
|
||||
Vec(Vec<u8>),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
|
||||
impl From<crate::buffer_pool::Buffer> for BlockLease<'static> {
|
||||
fn from(value: crate::buffer_pool::Buffer) -> BlockLease<'static> {
|
||||
BlockLease::PageReadGuard(value)
|
||||
}
|
||||
}
|
||||
@@ -162,28 +162,26 @@ impl<'a> BlockCursor<'a> {
|
||||
/// for modifying the file, nor for invalidating the cache if it is modified.
|
||||
pub struct FileBlockReader {
|
||||
pub file: VirtualFile,
|
||||
|
||||
/// Unique ID of this file, used as key in the page cache.
|
||||
file_id: page_cache::FileId,
|
||||
}
|
||||
|
||||
impl FileBlockReader {
|
||||
pub fn new(file: VirtualFile) -> Self {
|
||||
let file_id = page_cache::next_file_id();
|
||||
|
||||
FileBlockReader { file_id, file }
|
||||
FileBlockReader { file }
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(
|
||||
&self,
|
||||
buf: PageWriteGuard<'static>,
|
||||
buf: crate::buffer_pool::Buffer,
|
||||
blkno: u32,
|
||||
) -> Result<PageWriteGuard<'static>, std::io::Error> {
|
||||
) -> Result<crate::buffer_pool::Buffer, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.read_exact_at(
|
||||
crate::buffer_pool::PageWriteGuardBuf::new(buf),
|
||||
blkno as u64 * PAGE_SZ as u64,
|
||||
)
|
||||
.await
|
||||
.map(|guard| guard.assume_init())
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
@@ -193,25 +191,12 @@ impl FileBlockReader {
|
||||
pub async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
let write_guard = self.fill_buffer(write_guard, blknum).await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
}
|
||||
}
|
||||
let buf = crate::buffer_pool::get();
|
||||
// Read the page from disk into the buffer
|
||||
let write_guard = self.fill_buffer(buf, blknum).await?;
|
||||
Ok(BlockLease::PageReadGuard(write_guard))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -17,8 +17,6 @@ use tracing::*;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
pub struct EphemeralFile {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
|
||||
_tenant_shard_id: TenantShardId,
|
||||
_timeline_id: TimelineId,
|
||||
file: VirtualFile,
|
||||
@@ -55,7 +53,6 @@ impl EphemeralFile {
|
||||
.await?;
|
||||
|
||||
Ok(EphemeralFile {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
_tenant_shard_id: tenant_shard_id,
|
||||
_timeline_id: timeline_id,
|
||||
file,
|
||||
@@ -71,36 +68,22 @@ impl EphemeralFile {
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum, self.file.path, e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = self
|
||||
.file
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
}
|
||||
};
|
||||
let mut write_guard: crate::buffer_pool::Buffer = crate::buffer_pool::get();
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
let buf = self
|
||||
.file
|
||||
.read_exact_at(
|
||||
crate::buffer_pool::PageWriteGuardBuf::new(write_guard),
|
||||
blknum as u64 * PAGE_SZ as u64,
|
||||
)
|
||||
.await?
|
||||
.assume_init();
|
||||
Ok(BlockLease::PageReadGuard(buf))
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||
@@ -131,7 +114,7 @@ impl EphemeralFile {
|
||||
async fn push_bytes(
|
||||
&mut self,
|
||||
src: &[u8],
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<(), io::Error> {
|
||||
let mut src_remaining = src;
|
||||
while !src_remaining.is_empty() {
|
||||
@@ -151,33 +134,6 @@ impl EphemeralFile {
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
// Pre-warm the page cache with what we just wrote.
|
||||
// This isn't necessary for coherency/correctness, but it's how we've always done it.
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(
|
||||
self.ephemeral_file.page_cache_file_id,
|
||||
self.blknum,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(page_cache::ReadBufResult::Found(_guard)) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
|
||||
}
|
||||
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
let _ = write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
Err(e) => {
|
||||
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
|
||||
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
|
||||
}
|
||||
}
|
||||
// Zero the buffer for re-use.
|
||||
// Zeroing is critical for correcntess because the write_blob code below
|
||||
// and similarly read_blk expect zeroed pages.
|
||||
|
||||
@@ -29,10 +29,10 @@
|
||||
//!
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::Timeline;
|
||||
|
||||
@@ -25,10 +25,10 @@
|
||||
//! actual page images are stored in the "values" part.
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
|
||||
|
||||
@@ -33,7 +33,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
@@ -41,19 +40,24 @@ use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::{
|
||||
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
disk_usage_eviction_task::EvictionCandidate,
|
||||
};
|
||||
|
||||
use crate::disk_usage_eviction_task::DiskUsageEvictionInfo;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::{save_metadata, TimelineMetadata},
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{
|
||||
context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
|
||||
use crate::{
|
||||
disk_usage_eviction_task::finite_f32,
|
||||
@@ -63,9 +67,6 @@ use crate::{
|
||||
ValueReconstructState,
|
||||
},
|
||||
};
|
||||
use crate::{
|
||||
disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -90,7 +91,6 @@ use utils::{
|
||||
simple_rcu::{Rcu, RcuReadGuard},
|
||||
};
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::repository::GcResult;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::task_mgr;
|
||||
@@ -2555,24 +2555,13 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// # Cancel-safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
async fn lookup_cached_page(
|
||||
&self,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
_key: &Key,
|
||||
_lsn: Lsn,
|
||||
_ctx: &RequestContext,
|
||||
) -> Option<(Lsn, Bytes)> {
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) = cache
|
||||
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
|
||||
.await?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
None
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
@@ -3618,7 +3607,7 @@ impl Timeline {
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_range = (target_file_size / PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
@@ -3870,7 +3859,7 @@ impl Timeline {
|
||||
// Add two pages for potential overhead. This should in theory be already
|
||||
// accounted for in the target calculation, but for very small targets,
|
||||
// we still might easily hit the limit otherwise.
|
||||
let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
|
||||
let warn_limit = target_file_size * 2 + PAGE_SZ as u64 * 2;
|
||||
for layer in new_layers.iter() {
|
||||
if layer.layer_desc().file_size > warn_limit {
|
||||
warn!(
|
||||
@@ -4398,8 +4387,6 @@ impl Timeline {
|
||||
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
|
||||
};
|
||||
|
||||
let last_rec_lsn = data.records.last().unwrap().0;
|
||||
|
||||
let img = match self
|
||||
.walredo_mgr
|
||||
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
|
||||
@@ -4410,23 +4397,6 @@ impl Timeline {
|
||||
Err(e) => return Err(PageReconstructError::WalRedo(e)),
|
||||
};
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
let cache = page_cache::get();
|
||||
if let Err(e) = cache
|
||||
.memorize_materialized_page(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
)
|
||||
.await
|
||||
.context("Materialized page memoization failed")
|
||||
{
|
||||
return Err(PageReconstructError::from(e));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(img)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
//!
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
@@ -119,37 +118,6 @@ struct SlotInner {
|
||||
file: Option<OwnedFd>,
|
||||
}
|
||||
|
||||
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
|
||||
struct PageWriteGuardBuf {
|
||||
page: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
|
||||
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.page.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.page.len()
|
||||
}
|
||||
}
|
||||
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
|
||||
// hence it's safe to hand out the `stable_mut_ptr()`.
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.page.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.page.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
/// Find a slot to use, evicting an existing file descriptor if needed.
|
||||
///
|
||||
@@ -559,21 +527,6 @@ impl VirtualFile {
|
||||
res.map(|()| buf)
|
||||
}
|
||||
|
||||
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
|
||||
pub async fn read_exact_at_page(
|
||||
&self,
|
||||
page: PageWriteGuard<'static>,
|
||||
offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let buf = PageWriteGuardBuf {
|
||||
page,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let res = self.read_exact_at(buf, offset).await;
|
||||
res.map(|PageWriteGuardBuf { page, .. }| page)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
|
||||
Reference in New Issue
Block a user