mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
hand-roll it instead
This commit is contained in:
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -158,17 +158,6 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.0"
|
||||
@@ -1042,15 +1031,6 @@ dependencies = [
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const_format"
|
||||
version = "0.2.30"
|
||||
@@ -1472,12 +1452,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "fail"
|
||||
version = "0.5.1"
|
||||
@@ -2700,7 +2674,6 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
|
||||
@@ -35,7 +35,6 @@ license = "Apache-2.0"
|
||||
## All dependency versions, used in the project
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
async-channel = "1.9.0"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
|
||||
@@ -12,7 +12,6 @@ testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-channel.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -72,13 +72,14 @@
|
||||
//!
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
collections::{hash_map::Entry, HashMap, VecDeque},
|
||||
convert::TryInto,
|
||||
future::poll_fn,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
Arc, Mutex, Weak,
|
||||
},
|
||||
task::Poll,
|
||||
task::{Poll, Waker},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -252,10 +253,16 @@ pub struct PageCache {
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
find_victim_sender:
|
||||
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
find_victim_waiters:
|
||||
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
find_victim_waiters: Mutex<(
|
||||
VecDeque<usize>,
|
||||
VecDeque<usize>,
|
||||
Vec<
|
||||
Option<(
|
||||
Option<Waker>,
|
||||
Option<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
)>,
|
||||
>,
|
||||
)>,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
@@ -870,7 +877,17 @@ impl PageCache {
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
// Get in line.
|
||||
let receiver = self.find_victim_waiters.recv();
|
||||
let my_idx = {
|
||||
let mut guard = self.find_victim_waiters.lock().unwrap();
|
||||
let (waiters, free, slots) = &mut *guard;
|
||||
let my_idx = free
|
||||
.pop_front()
|
||||
.expect("can't happen, len(slots) = len(waiters");
|
||||
waiters.push_back(my_idx);
|
||||
let prev = slots[my_idx].replace((None, None));
|
||||
debug_assert!(prev.is_none());
|
||||
my_idx
|
||||
};
|
||||
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
@@ -895,16 +912,42 @@ impl PageCache {
|
||||
inner.key = None;
|
||||
}
|
||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||
self.find_victim_sender
|
||||
.try_send((slot_idx, inner))
|
||||
.expect("we always get in line first");
|
||||
match futures::poll!(receiver) {
|
||||
Poll::Ready(Ok(res)) => return Ok(res),
|
||||
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
|
||||
Poll::Pending => {
|
||||
unreachable!("we just sent to the channel and got in line earlier")
|
||||
|
||||
// put in the victim we found
|
||||
{
|
||||
let mut slots_guard = self.find_victim_waiters.lock().unwrap();
|
||||
let (waiters, _, slots) = &mut *slots_guard;
|
||||
let winner_idx = waiters.pop_front().expect("we put ourselves in");
|
||||
let winner_slot = slots[winner_idx].as_mut().unwrap();
|
||||
let prev = winner_slot.1.replace((slot_idx, inner));
|
||||
debug_assert!(
|
||||
prev.is_none(),
|
||||
"ensure we didn't mess up this simple ring buffer structure"
|
||||
);
|
||||
if let Some(waker) = winner_slot.0.take() {
|
||||
waker.wake()
|
||||
}
|
||||
}
|
||||
|
||||
// take the victim that was found by someone else
|
||||
return Ok(poll_fn(move |cx| {
|
||||
let mut guard = self.find_victim_waiters.lock().unwrap();
|
||||
let (_, free, waiters) = &mut *guard;
|
||||
|
||||
let my_slot = waiters[my_idx].as_mut().unwrap();
|
||||
if let Some(res) = my_slot.1.take() {
|
||||
// .take() resets the waiters slot to None
|
||||
free.push_back(my_idx);
|
||||
debug_assert!(my_slot.0.is_none());
|
||||
debug_assert!(my_slot.1.is_none());
|
||||
waiters[my_idx] = None;
|
||||
return Poll::Ready(res);
|
||||
}
|
||||
let prev = my_slot.0.replace(cx.waker().clone());
|
||||
debug_assert!(prev.is_none());
|
||||
Poll::Pending
|
||||
})
|
||||
.await);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -941,7 +984,6 @@ impl PageCache {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
@@ -949,8 +991,11 @@ impl PageCache {
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
find_victim_sender,
|
||||
find_victim_waiters,
|
||||
find_victim_waiters: Mutex::new((VecDeque::new(), (0..num_pages).collect(), {
|
||||
let mut v = Vec::with_capacity(num_pages);
|
||||
v.resize_with(num_pages, || None);
|
||||
v
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user