mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 21:10:38 +00:00
Compare commits
7 Commits
release-pr
...
problame/p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1863a04fb0 | ||
|
|
f83a71ca6a | ||
|
|
74a634c9fa | ||
|
|
fc3f8a65b3 | ||
|
|
9f03dd24c2 | ||
|
|
dc96a7604a | ||
|
|
d7c94e67ce |
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -158,6 +158,17 @@ dependencies = [
|
|||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-channel"
|
||||||
|
version = "1.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||||
|
dependencies = [
|
||||||
|
"concurrent-queue",
|
||||||
|
"event-listener",
|
||||||
|
"futures-core",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-compression"
|
name = "async-compression"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
@@ -1031,6 +1042,15 @@ dependencies = [
|
|||||||
"zstd",
|
"zstd",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "concurrent-queue"
|
||||||
|
version = "2.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
|
||||||
|
dependencies = [
|
||||||
|
"crossbeam-utils",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "const_format"
|
name = "const_format"
|
||||||
version = "0.2.30"
|
version = "0.2.30"
|
||||||
@@ -1452,6 +1472,12 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "event-listener"
|
||||||
|
version = "2.5.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fail"
|
name = "fail"
|
||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
@@ -2674,6 +2700,7 @@ name = "pageserver"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"async-channel",
|
||||||
"async-compression",
|
"async-compression",
|
||||||
"async-stream",
|
"async-stream",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ license = "Apache-2.0"
|
|||||||
## All dependency versions, used in the project
|
## All dependency versions, used in the project
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||||
|
async-channel = "1.9.0"
|
||||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
|
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
|
||||||
flate2 = "1.0.26"
|
flate2 = "1.0.26"
|
||||||
async-stream = "0.3"
|
async-stream = "0.3"
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
|
async-channel.workspace = true
|
||||||
async-compression.workspace = true
|
async-compression.workspace = true
|
||||||
async-stream.workspace = true
|
async-stream.workspace = true
|
||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
|
|||||||
@@ -86,15 +86,18 @@
|
|||||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||||
//! only need to pass it on.
|
//! only need to pass it on.
|
||||||
|
|
||||||
|
use std::sync::{Arc, Mutex, MutexGuard};
|
||||||
|
|
||||||
use crate::task_mgr::TaskKind;
|
use crate::task_mgr::TaskKind;
|
||||||
|
|
||||||
// The main structure of this module, see module-level comment.
|
// The main structure of this module, see module-level comment.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone)]
|
||||||
pub struct RequestContext {
|
pub struct RequestContext {
|
||||||
task_kind: TaskKind,
|
task_kind: TaskKind,
|
||||||
download_behavior: DownloadBehavior,
|
download_behavior: DownloadBehavior,
|
||||||
access_stats_behavior: AccessStatsBehavior,
|
access_stats_behavior: AccessStatsBehavior,
|
||||||
page_content_kind: PageContentKind,
|
page_content_kind: PageContentKind,
|
||||||
|
page_cache_permit: Option<Arc<crate::page_cache::PinnedSlotsPermit>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The kind of access to the page cache.
|
/// The kind of access to the page cache.
|
||||||
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
|
|||||||
download_behavior: DownloadBehavior::Download,
|
download_behavior: DownloadBehavior::Download,
|
||||||
access_stats_behavior: AccessStatsBehavior::Update,
|
access_stats_behavior: AccessStatsBehavior::Update,
|
||||||
page_content_kind: PageContentKind::Unknown,
|
page_content_kind: PageContentKind::Unknown,
|
||||||
|
page_cache_permit: None,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
|
|||||||
download_behavior: original.download_behavior,
|
download_behavior: original.download_behavior,
|
||||||
access_stats_behavior: original.access_stats_behavior,
|
access_stats_behavior: original.access_stats_behavior,
|
||||||
page_content_kind: original.page_content_kind,
|
page_content_kind: original.page_content_kind,
|
||||||
|
page_cache_permit: original.page_cache_permit.clone(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -186,6 +191,11 @@ impl RequestContextBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn page_cache_permit(mut self, p: Arc<crate::page_cache::PinnedSlotsPermit>) -> Self {
|
||||||
|
self.inner.page_cache_permit = Some(p);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
pub fn build(self) -> RequestContext {
|
pub fn build(self) -> RequestContext {
|
||||||
self.inner
|
self.inner
|
||||||
}
|
}
|
||||||
@@ -286,4 +296,8 @@ impl RequestContext {
|
|||||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||||
self.page_content_kind
|
self.page_content_kind
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn permit(&self) -> Option<&crate::page_cache::PinnedSlotsPermit> {
|
||||||
|
self.page_cache_permit.as_ref().map(|p| &**p)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
|||||||
#[strum(serialize_all = "kebab_case")]
|
#[strum(serialize_all = "kebab_case")]
|
||||||
pub(crate) enum PageCacheErrorKind {
|
pub(crate) enum PageCacheErrorKind {
|
||||||
AcquirePinnedSlotTimeout,
|
AcquirePinnedSlotTimeout,
|
||||||
EvictIterLimit,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
||||||
|
|||||||
@@ -66,8 +66,7 @@
|
|||||||
//! inserted to the mapping, but you must hold the write-lock on the slot until
|
//! inserted to the mapping, but you must hold the write-lock on the slot until
|
||||||
//! the contents are valid. If you need to release the lock without initializing
|
//! the contents are valid. If you need to release the lock without initializing
|
||||||
//! the contents, you must remove the mapping first. We make that easy for the
|
//! the contents, you must remove the mapping first. We make that easy for the
|
||||||
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
|
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
|
||||||
//! page, the caller must explicitly call guard.mark_valid() after it has
|
|
||||||
//! initialized it. If the guard is dropped without calling mark_valid(), the
|
//! initialized it. If the guard is dropped without calling mark_valid(), the
|
||||||
//! mapping is automatically removed and the slot is marked free.
|
//! mapping is automatically removed and the slot is marked free.
|
||||||
//!
|
//!
|
||||||
@@ -79,6 +78,7 @@ use std::{
|
|||||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||||
Arc, Weak,
|
Arc, Weak,
|
||||||
},
|
},
|
||||||
|
task::Poll,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -215,16 +215,21 @@ impl Slot {
|
|||||||
|
|
||||||
impl SlotInner {
|
impl SlotInner {
|
||||||
/// If there is already a reader, drop our permit and share its permit, just like we share read access.
|
/// If there is already a reader, drop our permit and share its permit, just like we share read access.
|
||||||
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
|
fn coalesce_readers_permit<'c>(&self, permit: PermitKind<'c>) -> PermitKindReadGuard<'c> {
|
||||||
let mut guard = self.permit.lock().unwrap();
|
match permit {
|
||||||
if let Some(existing_permit) = guard.upgrade() {
|
PermitKind::CtxProvided(permit) => PermitKindReadGuard::CtxProvided(permit),
|
||||||
drop(guard);
|
PermitKind::Acquired(permit) => {
|
||||||
drop(permit);
|
let mut guard = self.permit.lock().unwrap();
|
||||||
existing_permit
|
if let Some(existing_permit) = guard.upgrade() {
|
||||||
} else {
|
drop(guard);
|
||||||
let permit = Arc::new(permit);
|
drop(permit);
|
||||||
*guard = Arc::downgrade(&permit);
|
existing_permit
|
||||||
permit
|
} else {
|
||||||
|
let permit = Arc::new(permit);
|
||||||
|
*guard = Arc::downgrade(&permit);
|
||||||
|
permit
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -252,21 +257,36 @@ pub struct PageCache {
|
|||||||
/// This is interpreted modulo the page cache size.
|
/// This is interpreted modulo the page cache size.
|
||||||
next_evict_slot: AtomicUsize,
|
next_evict_slot: AtomicUsize,
|
||||||
|
|
||||||
|
find_victim_sender:
|
||||||
|
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||||
|
find_victim_waiters:
|
||||||
|
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||||
|
|
||||||
size_metrics: &'static PageCacheSizeMetrics,
|
size_metrics: &'static PageCacheSizeMetrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
pub(crate) struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||||
|
|
||||||
|
enum PermitKind<'c> {
|
||||||
|
CtxProvided(&'c PinnedSlotsPermit),
|
||||||
|
Acquired(PinnedSlotsPermit),
|
||||||
|
}
|
||||||
|
|
||||||
|
enum PermitKindReadGuard<'c> {
|
||||||
|
CtxProvided(&'c PinnedSlotsPermit),
|
||||||
|
Coalesced(Arc<PinnedSlotsPermit>),
|
||||||
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||||
/// until the guard is dropped.
|
/// until the guard is dropped.
|
||||||
///
|
///
|
||||||
pub struct PageReadGuard<'i> {
|
pub struct PageReadGuard<'c, 'i> {
|
||||||
_permit: Arc<PinnedSlotsPermit>,
|
_permit: PermitKindReadGuard<'c>,
|
||||||
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::ops::Deref for PageReadGuard<'_> {
|
impl std::ops::Deref for PageReadGuard<'_, '_> {
|
||||||
type Target = [u8; PAGE_SZ];
|
type Target = [u8; PAGE_SZ];
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
@@ -274,7 +294,7 @@ impl std::ops::Deref for PageReadGuard<'_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_, '_> {
|
||||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||||
self.slot_guard.buf
|
self.slot_guard.buf
|
||||||
}
|
}
|
||||||
@@ -286,78 +306,89 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
|||||||
///
|
///
|
||||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||||
/// is expected to fill in the page contents and call mark_valid(). Similarly
|
/// is expected to fill in the page contents and call mark_valid().
|
||||||
/// lock_for_write() can return an invalid buffer that the caller is expected to
|
pub struct PageWriteGuard<'c, 'i> {
|
||||||
/// to initialize.
|
state: PageWriteGuardState<'c, 'i>,
|
||||||
///
|
|
||||||
pub struct PageWriteGuard<'i> {
|
|
||||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
|
||||||
|
|
||||||
_permit: PinnedSlotsPermit,
|
|
||||||
|
|
||||||
// Are the page contents currently valid?
|
|
||||||
// Used to mark pages as invalid that are assigned but not yet filled with data.
|
|
||||||
valid: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
enum PageWriteGuardState<'c, 'i> {
|
||||||
|
Invalid {
|
||||||
|
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||||
|
_permit: PermitKindReadGuard<'c>,
|
||||||
|
},
|
||||||
|
Downgraded,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::DerefMut for PageWriteGuard<'_, '_> {
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
self.inner.buf
|
match &mut self.state {
|
||||||
|
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||||
|
PageWriteGuardState::Downgraded => unreachable!(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::ops::Deref for PageWriteGuard<'_> {
|
impl std::ops::Deref for PageWriteGuard<'_, '_> {
|
||||||
type Target = [u8; PAGE_SZ];
|
type Target = [u8; PAGE_SZ];
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
self.inner.buf
|
match &self.state {
|
||||||
|
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||||
|
PageWriteGuardState::Downgraded => unreachable!(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_, '_> {
|
||||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||||
self.inner.buf
|
match &mut self.state {
|
||||||
|
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||||
|
PageWriteGuardState::Downgraded => todo!(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PageWriteGuard<'_> {
|
impl<'c, 'a> PageWriteGuard<'c, 'a> {
|
||||||
/// Mark that the buffer contents are now valid.
|
/// Mark that the buffer contents are now valid.
|
||||||
pub fn mark_valid(&mut self) {
|
#[must_use]
|
||||||
assert!(self.inner.key.is_some());
|
pub fn mark_valid(mut self) -> PageReadGuard<'c, 'a> {
|
||||||
assert!(
|
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
|
||||||
!self.valid,
|
match prev {
|
||||||
"mark_valid called on a buffer that was already valid"
|
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||||
);
|
assert!(inner.key.is_some());
|
||||||
self.valid = true;
|
PageReadGuard {
|
||||||
|
_permit,
|
||||||
|
slot_guard: inner.downgrade(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PageWriteGuardState::Downgraded => unreachable!(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for PageWriteGuard<'_> {
|
impl Drop for PageWriteGuard<'_, '_> {
|
||||||
///
|
///
|
||||||
/// If the buffer was allocated for a page that was not already in the
|
/// If the buffer was allocated for a page that was not already in the
|
||||||
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
||||||
/// initializing it, remove the mapping from the page cache.
|
/// initializing it, remove the mapping from the page cache.
|
||||||
///
|
///
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
assert!(self.inner.key.is_some());
|
match &mut self.state {
|
||||||
if !self.valid {
|
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||||
let self_key = self.inner.key.as_ref().unwrap();
|
assert!(inner.key.is_some());
|
||||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
let self_key = inner.key.as_ref().unwrap();
|
||||||
self.inner.key = None;
|
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||||
|
inner.key = None;
|
||||||
|
}
|
||||||
|
PageWriteGuardState::Downgraded => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// lock_for_read() return value
|
/// lock_for_read() return value
|
||||||
pub enum ReadBufResult<'a> {
|
pub enum ReadBufResult<'c, 'a> {
|
||||||
Found(PageReadGuard<'a>),
|
Found(PageReadGuard<'c, 'a>),
|
||||||
NotFound(PageWriteGuard<'a>),
|
NotFound(PageWriteGuard<'c, 'a>),
|
||||||
}
|
|
||||||
|
|
||||||
/// lock_for_write() return value
|
|
||||||
pub enum WriteBufResult<'a> {
|
|
||||||
Found(PageWriteGuard<'a>),
|
|
||||||
NotFound(PageWriteGuard<'a>),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PageCache {
|
impl PageCache {
|
||||||
@@ -379,10 +410,9 @@ impl PageCache {
|
|||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Option<(Lsn, PageReadGuard)> {
|
) -> Option<(Lsn, PageReadGuard)> {
|
||||||
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
|
let Ok(permit) = self.try_get_pinned_slot_permit(ctx).await else {
|
||||||
return None;
|
return None;
|
||||||
};
|
};
|
||||||
|
|
||||||
crate::metrics::PAGE_CACHE
|
crate::metrics::PAGE_CACHE
|
||||||
.for_ctx(ctx)
|
.for_ctx(ctx)
|
||||||
.read_accesses_materialized_page
|
.read_accesses_materialized_page
|
||||||
@@ -430,12 +460,13 @@ impl PageCache {
|
|||||||
/// Store an image of the given page in the cache.
|
/// Store an image of the given page in the cache.
|
||||||
///
|
///
|
||||||
pub async fn memorize_materialized_page(
|
pub async fn memorize_materialized_page(
|
||||||
&self,
|
&'static self,
|
||||||
tenant_id: TenantId,
|
tenant_id: TenantId,
|
||||||
timeline_id: TimelineId,
|
timeline_id: TimelineId,
|
||||||
key: Key,
|
key: Key,
|
||||||
lsn: Lsn,
|
lsn: Lsn,
|
||||||
img: &[u8],
|
img: &[u8],
|
||||||
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let cache_key = CacheKey::MaterializedPage {
|
let cache_key = CacheKey::MaterializedPage {
|
||||||
hash_key: MaterializedPageHashKey {
|
hash_key: MaterializedPageHashKey {
|
||||||
@@ -446,30 +477,87 @@ impl PageCache {
|
|||||||
lsn,
|
lsn,
|
||||||
};
|
};
|
||||||
|
|
||||||
match self.lock_for_write(&cache_key).await? {
|
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||||
WriteBufResult::Found(write_guard) => {
|
loop {
|
||||||
// We already had it in cache. Another thread must've put it there
|
// First check if the key already exists in the cache.
|
||||||
// concurrently. Check that it had the same contents that we
|
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
|
||||||
// replayed.
|
// The page was found in the mapping. Lock the slot, and re-check
|
||||||
assert!(*write_guard == img);
|
// that it's still what we expected (because we released the mapping
|
||||||
|
// lock already, another thread could have evicted the page)
|
||||||
|
let slot = &self.slots[slot_idx];
|
||||||
|
let inner = slot.inner.write().await;
|
||||||
|
if inner.key.as_ref() == Some(&cache_key) {
|
||||||
|
slot.inc_usage_count();
|
||||||
|
debug_assert!(
|
||||||
|
{
|
||||||
|
let guard = inner.permit.lock().unwrap();
|
||||||
|
guard.upgrade().is_none()
|
||||||
|
},
|
||||||
|
"we hold a write lock, so, no one else should have a permit"
|
||||||
|
);
|
||||||
|
debug_assert_eq!(inner.buf.len(), img.len());
|
||||||
|
// We already had it in cache. Another thread must've put it there
|
||||||
|
// concurrently. Check that it had the same contents that we
|
||||||
|
// replayed.
|
||||||
|
assert!(inner.buf == img);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
WriteBufResult::NotFound(mut write_guard) => {
|
debug_assert!(permit.is_some());
|
||||||
write_guard.copy_from_slice(img);
|
|
||||||
write_guard.mark_valid();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
// Not found. Find a victim buffer
|
||||||
|
let (slot_idx, mut inner) = self
|
||||||
|
.find_victim(permit.as_ref().unwrap())
|
||||||
|
.await
|
||||||
|
.context("Failed to find evict victim")?;
|
||||||
|
|
||||||
|
// Insert mapping for this. At this point, we may find that another
|
||||||
|
// thread did the same thing concurrently. In that case, we evicted
|
||||||
|
// our victim buffer unnecessarily. Put it into the free list and
|
||||||
|
// continue with the slot that the other thread chose.
|
||||||
|
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
|
||||||
|
// TODO: put to free list
|
||||||
|
|
||||||
|
// We now just loop back to start from beginning. This is not
|
||||||
|
// optimal, we'll perform the lookup in the mapping again, which
|
||||||
|
// is not really necessary because we already got
|
||||||
|
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||||
|
// to matter much.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the slot ready
|
||||||
|
let slot = &self.slots[slot_idx];
|
||||||
|
inner.key = Some(cache_key.clone());
|
||||||
|
slot.set_usage_count(1);
|
||||||
|
// Create a write guard for the slot so we go through the expected motions.
|
||||||
|
debug_assert!(
|
||||||
|
{
|
||||||
|
let guard = inner.permit.lock().unwrap();
|
||||||
|
guard.upgrade().is_none()
|
||||||
|
},
|
||||||
|
"we hold a write lock, so, no one else should have a permit"
|
||||||
|
);
|
||||||
|
let mut write_guard = PageWriteGuard {
|
||||||
|
state: PageWriteGuardState::Invalid {
|
||||||
|
_permit: permit.take().unwrap(),
|
||||||
|
inner,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
write_guard.copy_from_slice(img);
|
||||||
|
let _ = write_guard.mark_valid();
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||||
|
|
||||||
pub async fn read_immutable_buf(
|
pub async fn read_immutable_buf<'c>(
|
||||||
&self,
|
&'static self,
|
||||||
file_id: FileId,
|
file_id: FileId,
|
||||||
blkno: u32,
|
blkno: u32,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> anyhow::Result<ReadBufResult> {
|
) -> anyhow::Result<ReadBufResult<'c, 'static>> {
|
||||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||||
|
|
||||||
self.lock_for_read(&mut cache_key, ctx).await
|
self.lock_for_read(&mut cache_key, ctx).await
|
||||||
@@ -483,7 +571,22 @@ impl PageCache {
|
|||||||
// "mappings" after this section. But the routines in this section should
|
// "mappings" after this section. But the routines in this section should
|
||||||
// not require changes.
|
// not require changes.
|
||||||
|
|
||||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
pub(crate) async fn get_permit(&self) -> Arc<PinnedSlotsPermit> {
|
||||||
|
Arc::new(PinnedSlotsPermit(
|
||||||
|
Arc::clone(&self.pinned_slots)
|
||||||
|
.acquire_owned()
|
||||||
|
.await
|
||||||
|
.expect("the semaphore is never closed"),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn try_get_pinned_slot_permit<'c>(
|
||||||
|
&self,
|
||||||
|
ctx: &'c RequestContext,
|
||||||
|
) -> anyhow::Result<PermitKind<'c>> {
|
||||||
|
if let Some(permit) = ctx.permit() {
|
||||||
|
return Ok(PermitKind::CtxProvided(permit));
|
||||||
|
};
|
||||||
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||||
match tokio::time::timeout(
|
match tokio::time::timeout(
|
||||||
// Choose small timeout, neon_smgr does its own retries.
|
// Choose small timeout, neon_smgr does its own retries.
|
||||||
@@ -493,9 +596,9 @@ impl PageCache {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(res) => Ok(PinnedSlotsPermit(
|
Ok(res) => Ok(PermitKind::Acquired(PinnedSlotsPermit(
|
||||||
res.expect("this semaphore is never closed"),
|
res.expect("this semaphore is never closed"),
|
||||||
)),
|
))),
|
||||||
Err(_timeout) => {
|
Err(_timeout) => {
|
||||||
timer.stop_and_discard();
|
timer.stop_and_discard();
|
||||||
crate::metrics::page_cache_errors_inc(
|
crate::metrics::page_cache_errors_inc(
|
||||||
@@ -515,10 +618,10 @@ impl PageCache {
|
|||||||
///
|
///
|
||||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||||
///
|
///
|
||||||
async fn try_lock_for_read(
|
async fn try_lock_for_read<'c>(
|
||||||
&self,
|
&self,
|
||||||
cache_key: &mut CacheKey,
|
cache_key: &mut CacheKey,
|
||||||
permit: &mut Option<PinnedSlotsPermit>,
|
permit: &mut Option<PermitKind<'c>>,
|
||||||
) -> Option<PageReadGuard> {
|
) -> Option<PageReadGuard> {
|
||||||
let cache_key_orig = cache_key.clone();
|
let cache_key_orig = cache_key.clone();
|
||||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||||
@@ -571,11 +674,11 @@ impl PageCache {
|
|||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
async fn lock_for_read(
|
async fn lock_for_read(
|
||||||
&self,
|
&'static self,
|
||||||
cache_key: &mut CacheKey,
|
cache_key: &mut CacheKey,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<ReadBufResult> {
|
) -> anyhow::Result<ReadBufResult> {
|
||||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||||
|
|
||||||
let (read_access, hit) = match cache_key {
|
let (read_access, hit) = match cache_key {
|
||||||
CacheKey::MaterializedPage { .. } => {
|
CacheKey::MaterializedPage { .. } => {
|
||||||
@@ -638,99 +741,10 @@ impl PageCache {
|
|||||||
);
|
);
|
||||||
|
|
||||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||||
_permit: permit.take().unwrap(),
|
state: PageWriteGuardState::Invalid {
|
||||||
inner,
|
|
||||||
valid: false,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Look up a page in the cache and lock it in write mode. If it's not
|
|
||||||
/// found, returns None.
|
|
||||||
///
|
|
||||||
/// When locking a page for writing, the search criteria is always "exact".
|
|
||||||
async fn try_lock_for_write(
|
|
||||||
&self,
|
|
||||||
cache_key: &CacheKey,
|
|
||||||
permit: &mut Option<PinnedSlotsPermit>,
|
|
||||||
) -> Option<PageWriteGuard> {
|
|
||||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
|
||||||
// The page was found in the mapping. Lock the slot, and re-check
|
|
||||||
// that it's still what we expected (because we released the mapping
|
|
||||||
// lock already, another thread could have evicted the page)
|
|
||||||
let slot = &self.slots[slot_idx];
|
|
||||||
let inner = slot.inner.write().await;
|
|
||||||
if inner.key.as_ref() == Some(cache_key) {
|
|
||||||
slot.inc_usage_count();
|
|
||||||
debug_assert!(
|
|
||||||
{
|
|
||||||
let guard = inner.permit.lock().unwrap();
|
|
||||||
guard.upgrade().is_none()
|
|
||||||
},
|
|
||||||
"we hold a write lock, so, no one else should have a permit"
|
|
||||||
);
|
|
||||||
return Some(PageWriteGuard {
|
|
||||||
_permit: permit.take().unwrap(),
|
_permit: permit.take().unwrap(),
|
||||||
inner,
|
inner,
|
||||||
valid: true,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return a write-locked buffer for given block.
|
|
||||||
///
|
|
||||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
|
||||||
/// may be modified by the caller even if it's already found in the cache.
|
|
||||||
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
|
||||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
|
||||||
loop {
|
|
||||||
// First check if the key already exists in the cache.
|
|
||||||
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
|
|
||||||
debug_assert!(permit.is_none());
|
|
||||||
return Ok(WriteBufResult::Found(write_guard));
|
|
||||||
}
|
|
||||||
debug_assert!(permit.is_some());
|
|
||||||
|
|
||||||
// Not found. Find a victim buffer
|
|
||||||
let (slot_idx, mut inner) = self
|
|
||||||
.find_victim(permit.as_ref().unwrap())
|
|
||||||
.await
|
|
||||||
.context("Failed to find evict victim")?;
|
|
||||||
|
|
||||||
// Insert mapping for this. At this point, we may find that another
|
|
||||||
// thread did the same thing concurrently. In that case, we evicted
|
|
||||||
// our victim buffer unnecessarily. Put it into the free list and
|
|
||||||
// continue with the slot that the other thread chose.
|
|
||||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
|
|
||||||
// TODO: put to free list
|
|
||||||
|
|
||||||
// We now just loop back to start from beginning. This is not
|
|
||||||
// optimal, we'll perform the lookup in the mapping again, which
|
|
||||||
// is not really necessary because we already got
|
|
||||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
|
||||||
// to matter much.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make the slot ready
|
|
||||||
let slot = &self.slots[slot_idx];
|
|
||||||
inner.key = Some(cache_key.clone());
|
|
||||||
slot.set_usage_count(1);
|
|
||||||
|
|
||||||
debug_assert!(
|
|
||||||
{
|
|
||||||
let guard = inner.permit.lock().unwrap();
|
|
||||||
guard.upgrade().is_none()
|
|
||||||
},
|
},
|
||||||
"we hold a write lock, so, no one else should have a permit"
|
|
||||||
);
|
|
||||||
|
|
||||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
|
||||||
_permit: permit.take().unwrap(),
|
|
||||||
inner,
|
|
||||||
valid: false,
|
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -775,7 +789,7 @@ impl PageCache {
|
|||||||
///
|
///
|
||||||
/// Like 'search_mapping', but performs an "exact" search. Used for
|
/// Like 'search_mapping', but performs an "exact" search. Used for
|
||||||
/// allocating a new buffer.
|
/// allocating a new buffer.
|
||||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
|
||||||
match key {
|
match key {
|
||||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||||
let map = self.materialized_page_map.read().unwrap();
|
let map = self.materialized_page_map.read().unwrap();
|
||||||
@@ -882,10 +896,12 @@ impl PageCache {
|
|||||||
///
|
///
|
||||||
/// On return, the slot is empty and write-locked.
|
/// On return, the slot is empty and write-locked.
|
||||||
async fn find_victim(
|
async fn find_victim(
|
||||||
&self,
|
&'static self,
|
||||||
_permit_witness: &PinnedSlotsPermit,
|
_permit_witness: &PinnedSlotsPermit,
|
||||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||||
let iter_limit = self.slots.len() * 10;
|
// Get in line.
|
||||||
|
let receiver = self.find_victim_waiters.recv();
|
||||||
|
|
||||||
let mut iters = 0;
|
let mut iters = 0;
|
||||||
loop {
|
loop {
|
||||||
iters += 1;
|
iters += 1;
|
||||||
@@ -897,41 +913,8 @@ impl PageCache {
|
|||||||
let mut inner = match slot.inner.try_write() {
|
let mut inner = match slot.inner.try_write() {
|
||||||
Ok(inner) => inner,
|
Ok(inner) => inner,
|
||||||
Err(_err) => {
|
Err(_err) => {
|
||||||
if iters > iter_limit {
|
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
|
||||||
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
|
unreachable!("find_victim_waiters prevents starvation");
|
||||||
// any particular number of iterations: other threads might race ahead and acquire and
|
|
||||||
// release pins just as we're scanning the array.
|
|
||||||
//
|
|
||||||
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
|
|
||||||
// slots. There are two threads running concurrently, A and B. A has just
|
|
||||||
// acquired the permit from the semaphore.
|
|
||||||
//
|
|
||||||
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
|
|
||||||
// B: Acquire permit.
|
|
||||||
// B: Look at slot 2, decrement its usage_count to zero and continue the search
|
|
||||||
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
|
||||||
// B: Release pin and permit again
|
|
||||||
// B: Acquire permit.
|
|
||||||
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
|
||||||
// B: Release pin and permit again
|
|
||||||
//
|
|
||||||
// Now we're back in the starting situation that both slots have
|
|
||||||
// usage_count 1, but A has now been through one iteration of the
|
|
||||||
// find_victim() loop. This can repeat indefinitely and on each
|
|
||||||
// iteration, A's iteration count increases by one.
|
|
||||||
//
|
|
||||||
// So, even though the semaphore for the permits is fair, the victim search
|
|
||||||
// itself happens in parallel and is not fair.
|
|
||||||
// Hence even with a permit, a task can theoretically be starved.
|
|
||||||
// To avoid this, we'd need tokio to give priority to tasks that are holding
|
|
||||||
// permits for longer.
|
|
||||||
// Note that just yielding to tokio during iteration without such
|
|
||||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
|
||||||
// for B to bump usage count, further starving A.
|
|
||||||
crate::metrics::page_cache_errors_inc(
|
|
||||||
crate::metrics::PageCacheErrorKind::EvictIterLimit,
|
|
||||||
);
|
|
||||||
anyhow::bail!("exceeded evict iter limit");
|
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -942,7 +925,16 @@ impl PageCache {
|
|||||||
inner.key = None;
|
inner.key = None;
|
||||||
}
|
}
|
||||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||||
return Ok((slot_idx, inner));
|
self.find_victim_sender
|
||||||
|
.try_send((slot_idx, inner))
|
||||||
|
.expect("we always get in line first");
|
||||||
|
match futures::poll!(receiver) {
|
||||||
|
Poll::Ready(Ok(res)) => return Ok(res),
|
||||||
|
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
|
||||||
|
Poll::Pending => {
|
||||||
|
unreachable!("we just sent to the channel and got in line earlier")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -979,6 +971,7 @@ impl PageCache {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
|
||||||
Self {
|
Self {
|
||||||
materialized_page_map: Default::default(),
|
materialized_page_map: Default::default(),
|
||||||
immutable_page_map: Default::default(),
|
immutable_page_map: Default::default(),
|
||||||
@@ -986,6 +979,8 @@ impl PageCache {
|
|||||||
next_evict_slot: AtomicUsize::new(0),
|
next_evict_slot: AtomicUsize::new(0),
|
||||||
size_metrics,
|
size_metrics,
|
||||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||||
|
find_victim_sender,
|
||||||
|
find_victim_waiters,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,10 +20,10 @@ use std::io::{Error, ErrorKind};
|
|||||||
|
|
||||||
impl<'a> BlockCursor<'a> {
|
impl<'a> BlockCursor<'a> {
|
||||||
/// Read a blob into a new buffer.
|
/// Read a blob into a new buffer.
|
||||||
pub async fn read_blob(
|
pub async fn read_blob<'c>(
|
||||||
&self,
|
&self,
|
||||||
offset: u64,
|
offset: u64,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> Result<Vec<u8>, std::io::Error> {
|
) -> Result<Vec<u8>, std::io::Error> {
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||||
@@ -31,11 +31,11 @@ impl<'a> BlockCursor<'a> {
|
|||||||
}
|
}
|
||||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||||
/// are overwritten.
|
/// are overwritten.
|
||||||
pub async fn read_blob_into_buf(
|
pub async fn read_blob_into_buf<'c>(
|
||||||
&self,
|
&self,
|
||||||
offset: u64,
|
offset: u64,
|
||||||
dstbuf: &mut Vec<u8>,
|
dstbuf: &mut Vec<u8>,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> Result<(), std::io::Error> {
|
) -> Result<(), std::io::Error> {
|
||||||
let mut blknum = (offset / PAGE_SZ as u64) as u32;
|
let mut blknum = (offset / PAGE_SZ as u64) as u32;
|
||||||
let mut off = (offset % PAGE_SZ as u64) as usize;
|
let mut off = (offset % PAGE_SZ as u64) as usize;
|
||||||
|
|||||||
@@ -34,27 +34,27 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||||
pub enum BlockLease<'a> {
|
pub enum BlockLease<'c, 'a> {
|
||||||
PageReadGuard(PageReadGuard<'static>),
|
PageReadGuard(PageReadGuard<'c, 'static>),
|
||||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
impl<'c, 'a> From<PageReadGuard<'c, 'a>> for BlockLease<'c, 'a> {
|
||||||
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
|
fn from(value: PageReadGuard<'c, 'a>) -> BlockLease<'c, 'a> {
|
||||||
BlockLease::PageReadGuard(value)
|
BlockLease::PageReadGuard(value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
|
impl<'c, 'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'c, 'a> {
|
||||||
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
|
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
|
||||||
BlockLease::Arc(value)
|
BlockLease::Arc(value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Deref for BlockLease<'a> {
|
impl<'c, 'a> Deref for BlockLease<'c, 'a> {
|
||||||
type Target = [u8; PAGE_SZ];
|
type Target = [u8; PAGE_SZ];
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
@@ -83,11 +83,11 @@ pub(crate) enum BlockReaderRef<'a> {
|
|||||||
|
|
||||||
impl<'a> BlockReaderRef<'a> {
|
impl<'a> BlockReaderRef<'a> {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
async fn read_blk(
|
async fn read_blk<'c>(
|
||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> Result<BlockLease, std::io::Error> {
|
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||||
use BlockReaderRef::*;
|
use BlockReaderRef::*;
|
||||||
match self {
|
match self {
|
||||||
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
|
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
|
||||||
@@ -141,11 +141,11 @@ impl<'a> BlockCursor<'a> {
|
|||||||
/// access to the contents of the page. (For the page cache, the
|
/// access to the contents of the page. (For the page cache, the
|
||||||
/// lease object represents a lock on the buffer.)
|
/// lease object represents a lock on the buffer.)
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub async fn read_blk(
|
pub async fn read_blk<'c>(
|
||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> Result<BlockLease, std::io::Error> {
|
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||||
self.reader.read_blk(blknum, ctx).await
|
self.reader.read_blk(blknum, ctx).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -180,32 +180,27 @@ impl FileBlockReader {
|
|||||||
/// Returns a "lease" object that can be used to
|
/// Returns a "lease" object that can be used to
|
||||||
/// access to the contents of the page. (For the page cache, the
|
/// access to the contents of the page. (For the page cache, the
|
||||||
/// lease object represents a lock on the buffer.)
|
/// lease object represents a lock on the buffer.)
|
||||||
pub async fn read_blk(
|
pub async fn read_blk<'c>(
|
||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> Result<BlockLease, std::io::Error> {
|
) -> Result<BlockLease<'c, 'static>, std::io::Error> {
|
||||||
let cache = page_cache::get();
|
let cache = page_cache::get();
|
||||||
loop {
|
match cache
|
||||||
match cache
|
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
.await
|
||||||
.await
|
.map_err(|e| {
|
||||||
.map_err(|e| {
|
std::io::Error::new(
|
||||||
std::io::Error::new(
|
std::io::ErrorKind::Other,
|
||||||
std::io::ErrorKind::Other,
|
format!("Failed to read immutable buf: {e:#}"),
|
||||||
format!("Failed to read immutable buf: {e:#}"),
|
)
|
||||||
)
|
})? {
|
||||||
})? {
|
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||||
ReadBufResult::Found(guard) => break Ok(guard.into()),
|
ReadBufResult::NotFound(mut write_guard) => {
|
||||||
ReadBufResult::NotFound(mut write_guard) => {
|
// Read the page from disk into the buffer
|
||||||
// Read the page from disk into the buffer
|
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
Ok(write_guard.mark_valid().into())
|
||||||
write_guard.mark_valid();
|
}
|
||||||
|
|
||||||
// Swap for read lock
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,44 +64,40 @@ impl EphemeralFile {
|
|||||||
self.len
|
self.len
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn read_blk(
|
pub(crate) async fn read_blk<'c>(
|
||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> Result<BlockLease, io::Error> {
|
) -> Result<BlockLease<'c, '_>, io::Error> {
|
||||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||||
if flushed_blknums.contains(&(blknum as u64)) {
|
if flushed_blknums.contains(&(blknum as u64)) {
|
||||||
let cache = page_cache::get();
|
let cache = page_cache::get();
|
||||||
loop {
|
match cache
|
||||||
match cache
|
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
.await
|
||||||
.await
|
.map_err(|e| {
|
||||||
.map_err(|e| {
|
std::io::Error::new(
|
||||||
std::io::Error::new(
|
std::io::ErrorKind::Other,
|
||||||
std::io::ErrorKind::Other,
|
// order path before error because error is anyhow::Error => might have many contexts
|
||||||
// order path before error because error is anyhow::Error => might have many contexts
|
format!(
|
||||||
format!(
|
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
blknum, self.file.path, e,
|
||||||
blknum, self.file.path, e,
|
),
|
||||||
),
|
)
|
||||||
)
|
})? {
|
||||||
})? {
|
page_cache::ReadBufResult::Found(guard) => {
|
||||||
page_cache::ReadBufResult::Found(guard) => {
|
return Ok(BlockLease::PageReadGuard(guard))
|
||||||
return Ok(BlockLease::PageReadGuard(guard))
|
}
|
||||||
}
|
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
let buf: &mut [u8] = write_guard.deref_mut();
|
||||||
let buf: &mut [u8] = write_guard.deref_mut();
|
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
self.file
|
||||||
self.file
|
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
.await?;
|
||||||
.await?;
|
let read_guard = write_guard.mark_valid();
|
||||||
write_guard.mark_valid();
|
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||||
|
}
|
||||||
// Swap for read lock
|
};
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||||
@@ -171,7 +167,7 @@ impl EphemeralFile {
|
|||||||
let buf: &mut [u8] = write_guard.deref_mut();
|
let buf: &mut [u8] = write_guard.deref_mut();
|
||||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||||
write_guard.mark_valid();
|
let _ = write_guard.mark_valid();
|
||||||
// pre-warm successful
|
// pre-warm successful
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -549,7 +549,7 @@ impl DeltaLayer {
|
|||||||
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
|
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
|
||||||
///
|
///
|
||||||
/// The value can be obtained via the [`ValueRef::load`] function.
|
/// The value can be obtained via the [`ValueRef::load`] function.
|
||||||
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
|
pub(crate) async fn load_keys<'c>(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
|
||||||
let inner = self
|
let inner = self
|
||||||
.load(LayerAccessKind::KeyIter, ctx)
|
.load(LayerAccessKind::KeyIter, ctx)
|
||||||
.await
|
.await
|
||||||
@@ -1038,9 +1038,9 @@ pub struct ValueRef<'a> {
|
|||||||
reader: BlockCursor<'a>,
|
reader: BlockCursor<'a>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ValueRef<'a> {
|
impl<'c, 'a> ValueRef<'a> {
|
||||||
/// Loads the value from disk
|
/// Loads the value from disk
|
||||||
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
|
pub async fn load(&self, ctx: &'c RequestContext) -> Result<Value> {
|
||||||
// theoretically we *could* record an access time for each, but it does not really matter
|
// theoretically we *could* record an access time for each, but it does not really matter
|
||||||
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
|
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
|
||||||
let val = Value::des(&buf)?;
|
let val = Value::des(&buf)?;
|
||||||
@@ -1051,11 +1051,11 @@ impl<'a> ValueRef<'a> {
|
|||||||
pub(crate) struct Adapter<T>(T);
|
pub(crate) struct Adapter<T>(T);
|
||||||
|
|
||||||
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
|
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
|
||||||
pub(crate) async fn read_blk(
|
pub(crate) async fn read_blk<'c>(
|
||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
ctx: &RequestContext,
|
ctx: &'c RequestContext,
|
||||||
) -> Result<BlockLease, std::io::Error> {
|
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||||
self.0.as_ref().file.read_blk(blknum, ctx).await
|
self.0.as_ref().file.read_blk(blknum, ctx).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -158,7 +158,7 @@ pub struct Timeline {
|
|||||||
|
|
||||||
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
|
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
|
||||||
/// Never changes for the lifetime of this [`Timeline`] object.
|
/// Never changes for the lifetime of this [`Timeline`] object.
|
||||||
///
|
///
|
||||||
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
|
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
|
||||||
/// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
|
/// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
|
||||||
generation: Generation,
|
generation: Generation,
|
||||||
@@ -505,7 +505,7 @@ impl Timeline {
|
|||||||
timer.stop_and_record();
|
timer.stop_and_record();
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
|
let res = self.reconstruct_value(key, lsn, reconstruct_state, ctx).await;
|
||||||
let elapsed = start.elapsed();
|
let elapsed = start.elapsed();
|
||||||
crate::metrics::RECONSTRUCT_TIME
|
crate::metrics::RECONSTRUCT_TIME
|
||||||
.for_result(&res)
|
.for_result(&res)
|
||||||
@@ -4279,6 +4279,7 @@ impl Timeline {
|
|||||||
key: Key,
|
key: Key,
|
||||||
request_lsn: Lsn,
|
request_lsn: Lsn,
|
||||||
mut data: ValueReconstructState,
|
mut data: ValueReconstructState,
|
||||||
|
ctx: &RequestContext,
|
||||||
) -> Result<Bytes, PageReconstructError> {
|
) -> Result<Bytes, PageReconstructError> {
|
||||||
// Perform WAL redo if needed
|
// Perform WAL redo if needed
|
||||||
data.records.reverse();
|
data.records.reverse();
|
||||||
@@ -4342,6 +4343,7 @@ impl Timeline {
|
|||||||
key,
|
key,
|
||||||
last_rec_lsn,
|
last_rec_lsn,
|
||||||
&img,
|
&img,
|
||||||
|
ctx,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context("Materialized page memoization failed")
|
.context("Materialized page memoization failed")
|
||||||
|
|||||||
@@ -16,7 +16,7 @@
|
|||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
ops::ControlFlow,
|
ops::ControlFlow,
|
||||||
sync::Arc,
|
sync::{Arc, Mutex},
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -25,7 +25,7 @@ use tokio_util::sync::CancellationToken;
|
|||||||
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
|
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
context::{DownloadBehavior, RequestContext},
|
context::{DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||||
tenant::{
|
tenant::{
|
||||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||||
@@ -397,9 +397,14 @@ impl Timeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let permit = crate::page_cache::get().get_permit().await;
|
||||||
|
let ctx = RequestContextBuilder::extend(ctx)
|
||||||
|
.page_cache_permit(permit)
|
||||||
|
.build();
|
||||||
|
|
||||||
// imitate repartitioning on first compaction
|
// imitate repartitioning on first compaction
|
||||||
if let Err(e) = self
|
if let Err(e) = self
|
||||||
.collect_keyspace(lsn, ctx)
|
.collect_keyspace(lsn, &ctx)
|
||||||
.instrument(info_span!("collect_keyspace"))
|
.instrument(info_span!("collect_keyspace"))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -544,7 +544,7 @@ impl VirtualFile {
|
|||||||
pub(crate) async fn read_blk(
|
pub(crate) async fn read_blk(
|
||||||
&self,
|
&self,
|
||||||
blknum: u32,
|
blknum: u32,
|
||||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
) -> Result<crate::tenant::block_io::BlockLease<'_, '_>, std::io::Error> {
|
||||||
use crate::page_cache::PAGE_SZ;
|
use crate::page_cache::PAGE_SZ;
|
||||||
let mut buf = [0; PAGE_SZ];
|
let mut buf = [0; PAGE_SZ];
|
||||||
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
||||||
|
|||||||
Reference in New Issue
Block a user