mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-01 17:50:38 +00:00
Compare commits
7 Commits
heikki/tes
...
problame/p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1863a04fb0 | ||
|
|
f83a71ca6a | ||
|
|
74a634c9fa | ||
|
|
fc3f8a65b3 | ||
|
|
9f03dd24c2 | ||
|
|
dc96a7604a | ||
|
|
d7c94e67ce |
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -158,6 +158,17 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.0"
|
||||
@@ -1031,6 +1042,15 @@ dependencies = [
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const_format"
|
||||
version = "0.2.30"
|
||||
@@ -1452,6 +1472,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "fail"
|
||||
version = "0.5.1"
|
||||
@@ -2674,6 +2700,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
|
||||
@@ -35,6 +35,7 @@ license = "Apache-2.0"
|
||||
## All dependency versions, used in the project
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
async-channel = "1.9.0"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
|
||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-channel.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -86,15 +86,18 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub struct RequestContext {
|
||||
task_kind: TaskKind,
|
||||
download_behavior: DownloadBehavior,
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
page_cache_permit: Option<Arc<crate::page_cache::PinnedSlotsPermit>>,
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: DownloadBehavior::Download,
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
page_cache_permit: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
page_cache_permit: original.page_cache_permit.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -186,6 +191,11 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_permit(mut self, p: Arc<crate::page_cache::PinnedSlotsPermit>) -> Self {
|
||||
self.inner.page_cache_permit = Some(p);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
@@ -286,4 +296,8 @@ impl RequestContext {
|
||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||
self.page_content_kind
|
||||
}
|
||||
|
||||
pub(crate) fn permit(&self) -> Option<&crate::page_cache::PinnedSlotsPermit> {
|
||||
self.page_cache_permit.as_ref().map(|p| &**p)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
||||
|
||||
@@ -66,8 +66,7 @@
|
||||
//! inserted to the mapping, but you must hold the write-lock on the slot until
|
||||
//! the contents are valid. If you need to release the lock without initializing
|
||||
//! the contents, you must remove the mapping first. We make that easy for the
|
||||
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
|
||||
//! page, the caller must explicitly call guard.mark_valid() after it has
|
||||
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
|
||||
//! initialized it. If the guard is dropped without calling mark_valid(), the
|
||||
//! mapping is automatically removed and the slot is marked free.
|
||||
//!
|
||||
@@ -79,6 +78,7 @@ use std::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -215,16 +215,21 @@ impl Slot {
|
||||
|
||||
impl SlotInner {
|
||||
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
|
||||
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
fn coalesce_readers_permit<'c>(&self, permit: PermitKind<'c>) -> PermitKindReadGuard<'c> {
|
||||
match permit {
|
||||
PermitKind::CtxProvided(permit) => PermitKindReadGuard::CtxProvided(permit),
|
||||
PermitKind::Acquired(permit) => {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -252,21 +257,36 @@ pub struct PageCache {
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
find_victim_sender:
|
||||
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
find_victim_waiters:
|
||||
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
pub(crate) struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
|
||||
enum PermitKind<'c> {
|
||||
CtxProvided(&'c PinnedSlotsPermit),
|
||||
Acquired(PinnedSlotsPermit),
|
||||
}
|
||||
|
||||
enum PermitKindReadGuard<'c> {
|
||||
CtxProvided(&'c PinnedSlotsPermit),
|
||||
Coalesced(Arc<PinnedSlotsPermit>),
|
||||
}
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i> {
|
||||
_permit: Arc<PinnedSlotsPermit>,
|
||||
pub struct PageReadGuard<'c, 'i> {
|
||||
_permit: PermitKindReadGuard<'c>,
|
||||
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageReadGuard<'_> {
|
||||
impl std::ops::Deref for PageReadGuard<'_, '_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -274,7 +294,7 @@ impl std::ops::Deref for PageReadGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_, '_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.slot_guard.buf
|
||||
}
|
||||
@@ -286,78 +306,89 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
///
|
||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||
/// is expected to fill in the page contents and call mark_valid(). Similarly
|
||||
/// lock_for_write() can return an invalid buffer that the caller is expected to
|
||||
/// to initialize.
|
||||
///
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
|
||||
_permit: PinnedSlotsPermit,
|
||||
|
||||
// Are the page contents currently valid?
|
||||
// Used to mark pages as invalid that are assigned but not yet filled with data.
|
||||
valid: bool,
|
||||
/// is expected to fill in the page contents and call mark_valid().
|
||||
pub struct PageWriteGuard<'c, 'i> {
|
||||
state: PageWriteGuardState<'c, 'i>,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
enum PageWriteGuardState<'c, 'i> {
|
||||
Invalid {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
_permit: PermitKindReadGuard<'c>,
|
||||
},
|
||||
Downgraded,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_, '_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.inner.buf
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
impl std::ops::Deref for PageWriteGuard<'_, '_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.buf
|
||||
match &self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_, '_> {
|
||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||
self.inner.buf
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Downgraded => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageWriteGuard<'_> {
|
||||
impl<'c, 'a> PageWriteGuard<'c, 'a> {
|
||||
/// Mark that the buffer contents are now valid.
|
||||
pub fn mark_valid(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
assert!(
|
||||
!self.valid,
|
||||
"mark_valid called on a buffer that was already valid"
|
||||
);
|
||||
self.valid = true;
|
||||
#[must_use]
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'c, 'a> {
|
||||
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
|
||||
match prev {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
assert!(inner.key.is_some());
|
||||
PageReadGuard {
|
||||
_permit,
|
||||
slot_guard: inner.downgrade(),
|
||||
}
|
||||
}
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageWriteGuard<'_> {
|
||||
impl Drop for PageWriteGuard<'_, '_> {
|
||||
///
|
||||
/// If the buffer was allocated for a page that was not already in the
|
||||
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
||||
/// initializing it, remove the mapping from the page cache.
|
||||
///
|
||||
fn drop(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
if !self.valid {
|
||||
let self_key = self.inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
self.inner.key = None;
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
assert!(inner.key.is_some());
|
||||
let self_key = inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
inner.key = None;
|
||||
}
|
||||
PageWriteGuardState::Downgraded => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// lock_for_read() return value
|
||||
pub enum ReadBufResult<'a> {
|
||||
Found(PageReadGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
/// lock_for_write() return value
|
||||
pub enum WriteBufResult<'a> {
|
||||
Found(PageWriteGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
pub enum ReadBufResult<'c, 'a> {
|
||||
Found(PageReadGuard<'c, 'a>),
|
||||
NotFound(PageWriteGuard<'c, 'a>),
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
@@ -379,10 +410,9 @@ impl PageCache {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit(ctx).await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_accesses_materialized_page
|
||||
@@ -430,12 +460,13 @@ impl PageCache {
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
&'static self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
@@ -446,30 +477,87 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key).await? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(*write_guard == img);
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(&cache_key) {
|
||||
slot.inc_usage_count();
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
debug_assert_eq!(inner.buf.len(), img.len());
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(inner.buf == img);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
WriteBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(img);
|
||||
write_guard.mark_valid();
|
||||
}
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
|
||||
Ok(())
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
// Create a write guard for the slot so we go through the expected motions.
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
let mut write_guard = PageWriteGuard {
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
},
|
||||
};
|
||||
write_guard.copy_from_slice(img);
|
||||
let _ = write_guard.mark_valid();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
pub async fn read_immutable_buf<'c>(
|
||||
&'static self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult<'c, 'static>> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key, ctx).await
|
||||
@@ -483,7 +571,22 @@ impl PageCache {
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
pub(crate) async fn get_permit(&self) -> Arc<PinnedSlotsPermit> {
|
||||
Arc::new(PinnedSlotsPermit(
|
||||
Arc::clone(&self.pinned_slots)
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("the semaphore is never closed"),
|
||||
))
|
||||
}
|
||||
|
||||
async fn try_get_pinned_slot_permit<'c>(
|
||||
&self,
|
||||
ctx: &'c RequestContext,
|
||||
) -> anyhow::Result<PermitKind<'c>> {
|
||||
if let Some(permit) = ctx.permit() {
|
||||
return Ok(PermitKind::CtxProvided(permit));
|
||||
};
|
||||
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
@@ -493,9 +596,9 @@ impl PageCache {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
Ok(res) => Ok(PermitKind::Acquired(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
))),
|
||||
Err(_timeout) => {
|
||||
timer.stop_and_discard();
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
@@ -515,10 +618,10 @@ impl PageCache {
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(
|
||||
async fn try_lock_for_read<'c>(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
permit: &mut Option<PermitKind<'c>>,
|
||||
) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
@@ -571,11 +674,11 @@ impl PageCache {
|
||||
/// ```
|
||||
///
|
||||
async fn lock_for_read(
|
||||
&self,
|
||||
&'static self,
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
@@ -638,99 +741,10 @@ impl PageCache {
|
||||
);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache and lock it in write mode. If it's not
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
async fn try_lock_for_write(
|
||||
&self,
|
||||
cache_key: &CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageWriteGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
return Some(PageWriteGuard {
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return a write-locked buffer for given block.
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
|
||||
debug_assert!(permit.is_none());
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -775,7 +789,7 @@ impl PageCache {
|
||||
///
|
||||
/// Like 'search_mapping, but performs an "exact" search. Used for
|
||||
/// allocating a new buffer.
|
||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
||||
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
@@ -882,10 +896,12 @@ impl PageCache {
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
async fn find_victim(
|
||||
&self,
|
||||
&'static self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
// Get in line.
|
||||
let receiver = self.find_victim_waiters.recv();
|
||||
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
iters += 1;
|
||||
@@ -897,41 +913,8 @@ impl PageCache {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(_err) => {
|
||||
if iters > iter_limit {
|
||||
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
|
||||
// any particular number of iterations: other threads might race ahead and acquire and
|
||||
// release pins just as we're scanning the array.
|
||||
//
|
||||
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
|
||||
// slots. There are two threads running concurrently, A and B. A has just
|
||||
// acquired the permit from the semaphore.
|
||||
//
|
||||
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2, decrement its usage_count to zero and continue the search
|
||||
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
//
|
||||
// Now we're back in the starting situation that both slots have
|
||||
// usage_count 1, but A has now been through one iteration of the
|
||||
// find_victim() loop. This can repeat indefinitely and on each
|
||||
// iteration, A's iteration count increases by one.
|
||||
//
|
||||
// So, even though the semaphore for the permits is fair, the victim search
|
||||
// itself happens in parallel and is not fair.
|
||||
// Hence even with a permit, a task can theoretically be starved.
|
||||
// To avoid this, we'd need tokio to give priority to tasks that are holding
|
||||
// permits for longer.
|
||||
// Note that just yielding to tokio during iteration without such
|
||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
||||
// for B to bump usage count, further starving A.
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::EvictIterLimit,
|
||||
);
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
|
||||
unreachable!("find_victim_waiters prevents starvation");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -942,7 +925,16 @@ impl PageCache {
|
||||
inner.key = None;
|
||||
}
|
||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||
return Ok((slot_idx, inner));
|
||||
self.find_victim_sender
|
||||
.try_send((slot_idx, inner))
|
||||
.expect("we always get in line first");
|
||||
match futures::poll!(receiver) {
|
||||
Poll::Ready(Ok(res)) => return Ok(res),
|
||||
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
|
||||
Poll::Pending => {
|
||||
unreachable!("we just sent to the channel and got in line earlier")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -979,6 +971,7 @@ impl PageCache {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
@@ -986,6 +979,8 @@ impl PageCache {
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
find_victim_sender,
|
||||
find_victim_waiters,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,10 +20,10 @@ use std::io::{Error, ErrorKind};
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(
|
||||
pub async fn read_blob<'c>(
|
||||
&self,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
@@ -31,11 +31,11 @@ impl<'a> BlockCursor<'a> {
|
||||
}
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_blob_into_buf(
|
||||
pub async fn read_blob_into_buf<'c>(
|
||||
&self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
ctx: &RequestContext,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<(), std::io::Error> {
|
||||
let mut blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let mut off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
@@ -34,27 +34,27 @@ where
|
||||
}
|
||||
|
||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||
pub enum BlockLease<'a> {
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
pub enum BlockLease<'c, 'a> {
|
||||
PageReadGuard(PageReadGuard<'c, 'static>),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
|
||||
impl<'c, 'a> From<PageReadGuard<'c, 'a>> for BlockLease<'c, 'a> {
|
||||
fn from(value: PageReadGuard<'c, 'a>) -> BlockLease<'c, 'a> {
|
||||
BlockLease::PageReadGuard(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
|
||||
impl<'c, 'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'c, 'a> {
|
||||
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
|
||||
BlockLease::Arc(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Deref for BlockLease<'a> {
|
||||
impl<'c, 'a> Deref for BlockLease<'c, 'a> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -83,11 +83,11 @@ pub(crate) enum BlockReaderRef<'a> {
|
||||
|
||||
impl<'a> BlockReaderRef<'a> {
|
||||
#[inline(always)]
|
||||
async fn read_blk(
|
||||
async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||
use BlockReaderRef::*;
|
||||
match self {
|
||||
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
|
||||
@@ -141,11 +141,11 @@ impl<'a> BlockCursor<'a> {
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
#[inline(always)]
|
||||
pub async fn read_blk(
|
||||
pub async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||
self.reader.read_blk(blknum, ctx).await
|
||||
}
|
||||
}
|
||||
@@ -180,32 +180,27 @@ impl FileBlockReader {
|
||||
/// Returns a "lease" object that can be used to
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
pub async fn read_blk(
|
||||
pub async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, 'static>, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => break Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,44 +64,40 @@ impl EphemeralFile {
|
||||
self.len
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
pub(crate) async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, io::Error> {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum, self.file.path, e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
// order path before error because error is anyhow::Error => might have many contexts
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum, self.file.path, e,
|
||||
),
|
||||
)
|
||||
})? {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
}
|
||||
};
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||
@@ -171,7 +167,7 @@ impl EphemeralFile {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
write_guard.mark_valid();
|
||||
let _ = write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -549,7 +549,7 @@ impl DeltaLayer {
|
||||
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
|
||||
///
|
||||
/// The value can be obtained via the [`ValueRef::load`] function.
|
||||
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
|
||||
pub(crate) async fn load_keys<'c>(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
|
||||
let inner = self
|
||||
.load(LayerAccessKind::KeyIter, ctx)
|
||||
.await
|
||||
@@ -1038,9 +1038,9 @@ pub struct ValueRef<'a> {
|
||||
reader: BlockCursor<'a>,
|
||||
}
|
||||
|
||||
impl<'a> ValueRef<'a> {
|
||||
impl<'c, 'a> ValueRef<'a> {
|
||||
/// Loads the value from disk
|
||||
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
|
||||
pub async fn load(&self, ctx: &'c RequestContext) -> Result<Value> {
|
||||
// theoretically we *could* record an access time for each, but it does not really matter
|
||||
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
|
||||
let val = Value::des(&buf)?;
|
||||
@@ -1051,11 +1051,11 @@ impl<'a> ValueRef<'a> {
|
||||
pub(crate) struct Adapter<T>(T);
|
||||
|
||||
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
|
||||
pub(crate) async fn read_blk(
|
||||
pub(crate) async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||
self.0.as_ref().file.read_blk(blknum, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,7 +158,7 @@ pub struct Timeline {
|
||||
|
||||
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
|
||||
/// Never changes for the lifetime of this [`Timeline`] object.
|
||||
///
|
||||
///
|
||||
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
|
||||
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
|
||||
generation: Generation,
|
||||
@@ -505,7 +505,7 @@ impl Timeline {
|
||||
timer.stop_and_record();
|
||||
|
||||
let start = Instant::now();
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state, ctx).await;
|
||||
let elapsed = start.elapsed();
|
||||
crate::metrics::RECONSTRUCT_TIME
|
||||
.for_result(&res)
|
||||
@@ -4279,6 +4279,7 @@ impl Timeline {
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
mut data: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// Perform WAL redo if needed
|
||||
data.records.reverse();
|
||||
@@ -4342,6 +4343,7 @@ impl Timeline {
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("Materialized page memoization failed")
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ops::ControlFlow,
|
||||
sync::Arc,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
@@ -25,7 +25,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
|
||||
|
||||
use crate::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
context::{DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
@@ -397,9 +397,14 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let permit = crate::page_cache::get().get_permit().await;
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_cache_permit(permit)
|
||||
.build();
|
||||
|
||||
// imitiate repartiting on first compactation
|
||||
if let Err(e) = self
|
||||
.collect_keyspace(lsn, ctx)
|
||||
.collect_keyspace(lsn, &ctx)
|
||||
.instrument(info_span!("collect_keyspace"))
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -544,7 +544,7 @@ impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_, '_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let mut buf = [0; PAGE_SZ];
|
||||
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
|
||||
Reference in New Issue
Block a user