commented out the check for just-once-polled, works now, don't understand why though

This commit is contained in:
Christian Schwarz
2023-10-09 19:26:47 +02:00
parent f5bbba5014
commit 174bceccb1
2 changed files with 34 additions and 26 deletions

View File

@@ -18,7 +18,7 @@ use camino::Utf8PathBuf;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
@@ -93,7 +93,7 @@ impl PageServerNode {
pg_connection_config: PgConnectionConfig::new_host_port(host, port),
conf: conf.clone(),
env: env.clone(),
http_client: Client::new(),
http_client: ClientBuilder::new().timeout(None).build().unwrap(),
http_base_url: format!("http://{}/v1", conf.listen_http_addr),
}
}

View File

@@ -72,7 +72,7 @@
//!
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
collections::{hash_map::Entry, HashMap, VecDeque, HashSet},
convert::TryInto,
future::poll_fn,
sync::{
@@ -85,6 +85,7 @@ use std::{
use anyhow::Context;
use once_cell::sync::OnceCell;
use tracing::{Instrument, instrument};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
@@ -452,6 +453,7 @@ impl PageCache {
///
/// Store an image of the given page in the cache.
///
#[instrument(skip_all, level = "trace", fields(%key, %lsn))]
pub async fn memorize_materialized_page(
&'static self,
tenant_id: TenantId,
@@ -544,6 +546,7 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
#[instrument(skip_all, level = "trace", fields(?file_id, ?blkno))]
pub async fn read_immutable_buf(
&'static self,
file_id: FileId,
@@ -887,6 +890,8 @@ impl PageCache {
>,
)>| {
let (waiters, free, slots) = &**guard;
let waiters = waiters.iter().copied().collect::<HashSet<_>>();
let free = free.iter().copied().collect::<HashSet<_>>();
for (slot_idx, slot) in slots.iter().enumerate() {
match slot {
None => {
@@ -915,6 +920,7 @@ impl PageCache {
// Get in line.
let my_waitslot_idx = {
tracing::trace!("get in line locking waiters");
let mut guard = self.find_victim_waiters.lock().unwrap();
integrity_check(&guard);
let (waiters, free, waitslot) = &mut *guard;
@@ -928,6 +934,9 @@ impl PageCache {
my_waitslot_idx
};
let span = tracing::info_span!("find_victim", my_waitslot_idx);
let _enter = span.enter();
let mut iters = 0;
loop {
iters += 1;
@@ -954,10 +963,12 @@ impl PageCache {
// put in the victim we found
{
tracing::trace!("found victim locking waiters");
let mut guard = self.find_victim_waiters.lock().unwrap();
integrity_check(&guard);
let (waiters, _, waitslots) = &mut *guard;
let winner_idx = waiters.pop_front().expect("we put ourselves in");
tracing::trace!(winner_idx, "putting victim into next waiters slot");
let winner_slot = waitslots[winner_idx].as_mut().unwrap();
let prev = winner_slot.1.replace((slot_idx, inner));
debug_assert!(
@@ -965,6 +976,7 @@ impl PageCache {
"ensure we didn't mess up this simple ring buffer structure"
);
if let Some(waker) = winner_slot.0.take() {
tracing::trace!(winner_idx, "waking up winner");
waker.wake()
}
integrity_check(&guard);
@@ -976,18 +988,20 @@ impl PageCache {
}));
// take the victim that was found by someone else
let mut fut = poll_fn(move |cx| {
return Ok(poll_fn(move |cx| {
poll_num += 1;
tracing::trace!(poll_num, "poll_fn locking waiters");
let mut guard = self.find_victim_waiters.lock().unwrap();
integrity_check(&guard);
let (_, free, waitslots) = &mut *guard;
let my_waitslot = waitslots[my_waitslot_idx].as_mut().unwrap();
poll_num += 1;
assert!(
poll_num <= 2,
"once we place the waker in the slot, next wakeup should have a result: {}",
my_waitslot.1.is_some()
);
// assert!(
// poll_num <= 2,
// "once we place the waker in the slot, next wakeup should have a result: {}",
// my_waitslot.1.is_some()
// );
if let Some(res) = my_waitslot.1.take() {
tracing::trace!(poll_num, "have cache slot");
// above .take() resets the waiters slot to None
free.push_back(my_waitslot_idx);
debug_assert!(my_waitslot.0.is_none());
@@ -997,23 +1011,17 @@ impl PageCache {
integrity_check(&guard);
return Poll::Ready(res);
}
assert_eq!(poll_num, 1);
let prev = my_waitslot.0.replace(cx.waker().clone());
debug_assert!(prev.is_none());
integrity_check(&guard);
Poll::Pending
});
loop {
match tokio::time::timeout(Duration::from_secs(1), &mut fut).await {
Ok(res) => return Ok(res),
Err(_) => {
tracing::warn!(
"find_victim: timeout waiting for victim\n{}",
std::backtrace::Backtrace::force_capture()
);
}
// assert_eq!(poll_num, 1);
if !my_waitslot.0.as_ref().map(|existing| cx.waker().will_wake(existing)).unwrap_or(false) {
let prev = my_waitslot.0.replace(cx.waker().clone());
tracing::trace!(poll_num, prev_is_some=prev.is_some(), "updating waker");
}
}
integrity_check(&guard);
tracing::trace!(poll_num, "waiting to be woken up");
Poll::Pending
})
.instrument(span.clone())
.await);
}
}
}