diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 0746dde4ef..179724efc3 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -18,7 +18,7 @@ use camino::Utf8PathBuf; use pageserver_api::models::{self, TenantInfo, TimelineInfo}; use postgres_backend::AuthType; use postgres_connection::{parse_host_port, PgConnectionConfig}; -use reqwest::blocking::{Client, RequestBuilder, Response}; +use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use utils::auth::{Claims, Scope}; @@ -93,7 +93,7 @@ impl PageServerNode { pg_connection_config: PgConnectionConfig::new_host_port(host, port), conf: conf.clone(), env: env.clone(), - http_client: Client::new(), + http_client: ClientBuilder::new().timeout(None).build().unwrap(), http_base_url: format!("http://{}/v1", conf.listen_http_addr), } } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 24e328c357..3b114a7fd4 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -72,7 +72,7 @@ //! use std::{ - collections::{hash_map::Entry, HashMap, VecDeque}, + collections::{hash_map::Entry, HashMap, VecDeque, HashSet}, convert::TryInto, future::poll_fn, sync::{ @@ -85,6 +85,7 @@ use std::{ use anyhow::Context; use once_cell::sync::OnceCell; +use tracing::{Instrument, instrument}; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, @@ -452,6 +453,7 @@ impl PageCache { /// /// Store an image of the given page in the cache. /// + #[instrument(skip_all, level = "trace", fields(%key, %lsn))] pub async fn memorize_materialized_page( &'static self, tenant_id: TenantId, @@ -544,6 +546,7 @@ impl PageCache { // Section 1.2: Public interface functions for working with immutable file pages. + #[instrument(skip_all, level = "trace", fields(?file_id, ?blkno))] pub async fn read_immutable_buf( &'static self, file_id: FileId, @@ -887,6 +890,8 @@ impl PageCache { >, )>| { let (waiters, free, slots) = &**guard; + let waiters = waiters.iter().copied().collect::>(); + let free = free.iter().copied().collect::>(); for (slot_idx, slot) in slots.iter().enumerate() { match slot { None => { @@ -915,6 +920,7 @@ impl PageCache { // Get in line. let my_waitslot_idx = { + tracing::trace!("get in line locking waiters"); let mut guard = self.find_victim_waiters.lock().unwrap(); integrity_check(&guard); let (waiters, free, waitslot) = &mut *guard; @@ -928,6 +934,9 @@ impl PageCache { my_waitslot_idx }; + let span = tracing::info_span!("find_victim", my_waitslot_idx); + let _enter = span.enter(); + let mut iters = 0; loop { iters += 1; @@ -954,10 +963,12 @@ impl PageCache { // put in the victim we found { + tracing::trace!("found victim locking waiters"); let mut guard = self.find_victim_waiters.lock().unwrap(); integrity_check(&guard); let (waiters, _, waitslots) = &mut *guard; let winner_idx = waiters.pop_front().expect("we put ourselves in"); + tracing::trace!(winner_idx, "putting victim into next waiters slot"); let winner_slot = waitslots[winner_idx].as_mut().unwrap(); let prev = winner_slot.1.replace((slot_idx, inner)); debug_assert!( @@ -965,6 +976,7 @@ impl PageCache { "ensure we didn't mess up this simple ring buffer structure" ); if let Some(waker) = winner_slot.0.take() { + tracing::trace!(winner_idx, "waking up winner"); waker.wake() } integrity_check(&guard); @@ -976,18 +988,20 @@ impl PageCache { })); // take the victim that was found by someone else - let mut fut = poll_fn(move |cx| { + return Ok(poll_fn(move |cx| { + poll_num += 1; + tracing::trace!(poll_num, "poll_fn locking waiters"); let mut guard = self.find_victim_waiters.lock().unwrap(); integrity_check(&guard); let (_, free, waitslots) = &mut *guard; let my_waitslot = waitslots[my_waitslot_idx].as_mut().unwrap(); - poll_num += 1; - assert!( - poll_num <= 2, - "once we place the waker in the slot, next wakeup should have a result: {}", - my_waitslot.1.is_some() - ); + // assert!( + // poll_num <= 2, + // "once we place the waker in the slot, next wakeup should have a result: {}", + // my_waitslot.1.is_some() + // ); if let Some(res) = my_waitslot.1.take() { + tracing::trace!(poll_num, "have cache slot"); // above .take() resets the waiters slot to None free.push_back(my_waitslot_idx); debug_assert!(my_waitslot.0.is_none()); @@ -997,23 +1011,17 @@ impl PageCache { integrity_check(&guard); return Poll::Ready(res); } - assert_eq!(poll_num, 1); - let prev = my_waitslot.0.replace(cx.waker().clone()); - debug_assert!(prev.is_none()); - integrity_check(&guard); - Poll::Pending - }); - loop { - match tokio::time::timeout(Duration::from_secs(1), &mut fut).await { - Ok(res) => return Ok(res), - Err(_) => { - tracing::warn!( - "find_victim: timeout waiting for victim\n{}", - std::backtrace::Backtrace::force_capture() - ); - } + // assert_eq!(poll_num, 1); + if !my_waitslot.0.as_ref().map(|existing| cx.waker().will_wake(existing)).unwrap_or(false) { + let prev = my_waitslot.0.replace(cx.waker().clone()); + tracing::trace!(poll_num, prev_is_some=prev.is_some(), "updating waker"); } - } + integrity_check(&guard); + tracing::trace!(poll_num, "waiting to be woken up"); + Poll::Pending + }) + .instrument(span.clone()) + .await); } } }