mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
2 Commits
release-pr
...
problame/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2c240e445 | ||
|
|
bc534e8bbb |
294
pageserver/examples/sketch_decoupled_pageservice.rs
Normal file
294
pageserver/examples/sketch_decoupled_pageservice.rs
Normal file
@@ -0,0 +1,294 @@
|
||||
//! # Memory Management
|
||||
//!
|
||||
//! Before Timeline shutdown we have a reference cycle
|
||||
//! ```
|
||||
//! Timeline::handlers =strong=> HandlerTimeline =strong=> Timeline
|
||||
//! ```
|
||||
//!
|
||||
//! We do this so the hot path need only do one, uncontended, atomic increment, i.e.,
|
||||
//! upgrading the cached Weak<HandlerTimeline>.
|
||||
//!
|
||||
//! Timeline::shutdown breaks the reference cycle.
|
||||
//!
|
||||
//! Conn shutdown removes the Arc<HandlerTimeline> from Timeline::handlers.
|
||||
|
||||
use tracing::{info, Instrument};
|
||||
|
||||
struct Timeline {
|
||||
tenant_id: TenantId,
|
||||
shard_id: ShardId,
|
||||
timeline_id: TimelineId,
|
||||
|
||||
gate: utils::sync::gate::Gate,
|
||||
// None = shutting down
|
||||
handlers: Mutex<Option<Vec<Arc<HandlerTimeline>>>>,
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
async fn shutdown(&self) {
|
||||
info!("Shutting down Timeline");
|
||||
scopeguard::defer!(info!("Timeline shut down"));
|
||||
// prevent new handlers from getting created
|
||||
let handlers = self.handlers.lock().expect("mutex poisoned").take();
|
||||
if let Some(handlers) = handlers.as_ref() {
|
||||
info!("Dropping {} Arc<HandlerTimeline>s", handlers.len());
|
||||
}
|
||||
drop(handlers);
|
||||
self.gate.close().await;
|
||||
}
|
||||
fn getpage(&self, key: usize) {
|
||||
info!("get {key}");
|
||||
}
|
||||
fn rel_size(&self) {
|
||||
if self.shard_id != 0 {
|
||||
info!("rel_size not supported on shard {}", self.shard_id);
|
||||
return;
|
||||
}
|
||||
info!("rel_size");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Timeline {
|
||||
fn drop(&mut self) {
|
||||
info!("Dropping Timeline");
|
||||
}
|
||||
}
|
||||
|
||||
/// Simplified tenant identifier used by this sketch.
type TenantId = u64;
/// Simplified shard identifier; requests are routed to a shard by number.
type ShardId = usize;
/// Simplified timeline identifier used by this sketch.
type TimelineId = String;
|
||||
|
||||
struct PageServerHandler {
|
||||
cache: HashMap<(TenantId, ShardId, TimelineId), Weak<HandlerTimeline>>,
|
||||
}
|
||||
|
||||
impl Drop for PageServerHandler {
|
||||
fn drop(&mut self) {
|
||||
info!("Dropping PageServerHandler, unregistering its handlers from cached Timeline");
|
||||
for (_, cached) in self.cache.drain() {
|
||||
if let Some(cached) = cached.upgrade() {
|
||||
let mut lock_guard = cached.timeline.handlers.lock().expect("mutex poisoned");
|
||||
if let Some(handlers) = &mut *lock_guard {
|
||||
let pre = handlers.len();
|
||||
handlers.retain(|handler| !Arc::ptr_eq(handler, &cached));
|
||||
let post = handlers.len();
|
||||
info!("Removed {} handlers", pre - post);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct HandlerTimeline {
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
timeline: Arc<Timeline>,
|
||||
}
|
||||
|
||||
impl Drop for HandlerTimeline {
|
||||
fn drop(&mut self) {
|
||||
info!("Dropping HandlerTimeline");
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for HandlerTimeline {
|
||||
type Target = Timeline;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.timeline
|
||||
}
|
||||
}
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex, Weak},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
utils::logging::init(
|
||||
utils::logging::LogFormat::Plain,
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stderr,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let timeline = Arc::new(Timeline {
|
||||
tenant_id: 23,
|
||||
shard_id: 1,
|
||||
timeline_id: "a".to_owned(),
|
||||
gate: utils::sync::gate::Gate::default(),
|
||||
handlers: Mutex::new(Some(vec![])),
|
||||
});
|
||||
// startup
|
||||
let mgr: Arc<Mutex<HashMap<(TenantId, ShardId, TimelineId), Arc<Timeline>>>> =
|
||||
Arc::new(Mutex::new(HashMap::from_iter([(
|
||||
(
|
||||
timeline.tenant_id,
|
||||
timeline.shard_id,
|
||||
timeline.timeline_id.clone(),
|
||||
),
|
||||
timeline,
|
||||
)])));
|
||||
|
||||
// page_service
|
||||
let page_service = tokio::spawn({
|
||||
let mgr = Arc::clone(&mgr);
|
||||
async move {
|
||||
let conn_loop = async move {
|
||||
loop {
|
||||
struct Conn {}
|
||||
impl Drop for Conn {
|
||||
fn drop(&mut self) {
|
||||
info!("Dropping Conn");
|
||||
}
|
||||
}
|
||||
enum PagestreamRequest {
|
||||
RelSize,
|
||||
GetPage { key: usize },
|
||||
}
|
||||
impl Conn {
|
||||
async fn read_request(&self) -> PagestreamRequest {
|
||||
PagestreamRequest::GetPage { key: 43 }
|
||||
}
|
||||
async fn write_response(&self, _res: ()) {
|
||||
tokio::time::sleep(Duration::from_millis(3000)).await;
|
||||
info!("response written");
|
||||
}
|
||||
}
|
||||
let conn = Conn {};
|
||||
let mut handler = PageServerHandler {
|
||||
cache: HashMap::new(),
|
||||
};
|
||||
|
||||
// immediately enter pagestream sub-protocol, in reality there are other queries
|
||||
let (tenant_id, timeline_id) = (23, "a".to_owned());
|
||||
|
||||
// process requests
|
||||
loop {
|
||||
// read request from the wire
|
||||
let req = conn.read_request().await;
|
||||
|
||||
// shard routing based on request contents
|
||||
let shard = match req {
|
||||
PagestreamRequest::RelSize => {
|
||||
// shard 0
|
||||
let shard = 0;
|
||||
shard
|
||||
}
|
||||
PagestreamRequest::GetPage { key } => {
|
||||
// shard = key % num_shards
|
||||
let shard = key % 2;
|
||||
shard
|
||||
}
|
||||
};
|
||||
|
||||
// lookup the right shard
|
||||
let cached_timeline: Arc<HandlerTimeline> = loop {
|
||||
let key = (tenant_id, shard, timeline_id.clone());
|
||||
let cached = match handler.cache.get(&key) {
|
||||
None => {
|
||||
info!("handle cache miss");
|
||||
None
|
||||
}
|
||||
Some(weak) => match weak.upgrade() {
|
||||
None => {
|
||||
info!("handle cache stale");
|
||||
// clean up cache
|
||||
handler.cache.remove(&key).unwrap();
|
||||
None
|
||||
}
|
||||
Some(timeline) => {
|
||||
info!("handle cache hit");
|
||||
Some(timeline)
|
||||
}
|
||||
},
|
||||
};
|
||||
match cached {
|
||||
Some(timeline) => break timeline,
|
||||
None => {
|
||||
// do the expensive mgr lookup
|
||||
match mgr.lock().expect("poisoned").get(&key) {
|
||||
Some(timeline) => {
|
||||
let gate_guard = match timeline.gate.enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
// gate is closed
|
||||
return Err::<(), &'static str>(
|
||||
"timeline: gate enter: gate is closed",
|
||||
);
|
||||
}
|
||||
};
|
||||
let handler_timeline = Arc::new(HandlerTimeline {
|
||||
_gate_guard: gate_guard,
|
||||
timeline: Arc::clone(timeline),
|
||||
});
|
||||
{
|
||||
let mut lock_guard = timeline
|
||||
.handlers
|
||||
.lock()
|
||||
.expect("mutex poisoned");
|
||||
match &mut *lock_guard {
|
||||
Some(timeline_handlers_list) => {
|
||||
for handler in timeline_handlers_list.iter() {
|
||||
assert!(!Arc::ptr_eq(
|
||||
handler,
|
||||
&handler_timeline
|
||||
));
|
||||
}
|
||||
timeline_handlers_list
|
||||
.push(Arc::clone(&handler_timeline));
|
||||
handler.cache.insert(
|
||||
key,
|
||||
Arc::downgrade(&handler_timeline),
|
||||
);
|
||||
|
||||
}
|
||||
None => {
|
||||
return Err("mgr: timeline shutdown started but mgr doesn't know yet");
|
||||
}
|
||||
}
|
||||
}
|
||||
break handler_timeline;
|
||||
}
|
||||
None => return Err("mgr: timeline not found"),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// execute the request against the right shard
|
||||
let res = match req {
|
||||
PagestreamRequest::RelSize => {
|
||||
cached_timeline.rel_size();
|
||||
}
|
||||
PagestreamRequest::GetPage { key } => {
|
||||
cached_timeline.getpage(key);
|
||||
}
|
||||
};
|
||||
|
||||
// drop the cached timeline before we speak protocol again, because we don't want to prevent timeline
|
||||
// shutdown while we're speaking protocol
|
||||
drop(cached_timeline);
|
||||
|
||||
// write response
|
||||
conn.write_response(res).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
let res: Result<_, &str> = conn_loop.await;
|
||||
info!("conn handler exited {:?}", res);
|
||||
}.instrument(tracing::info_span!("page_service"))
|
||||
});
|
||||
|
||||
// give it some uptime
|
||||
tokio::time::sleep(Duration::from_secs(7)).await;
|
||||
// ctlrc comes in, mgr shutdown
|
||||
for (_, tl) in mgr.lock().expect("poisoned").drain() {
|
||||
tl.shutdown().await;
|
||||
}
|
||||
drop(mgr);
|
||||
|
||||
info!("The conn handler terminates independently, but, we have already dropped the Timeline, having shown that their lifecycles are only coupled while we're processing a request");
|
||||
page_service.await.unwrap();
|
||||
info!("conn handler exited");
|
||||
}
|
||||
@@ -28,6 +28,7 @@ use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::ops::Deref;
|
||||
use std::pin::pin;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
@@ -41,7 +42,6 @@ use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
use utils::sync::gate::GateGuard;
|
||||
use utils::{
|
||||
auth::{Claims, Scope, SwappableJwtAuth},
|
||||
id::{TenantId, TimelineId},
|
||||
@@ -291,11 +291,77 @@ async fn page_service_conn_main(
|
||||
}
|
||||
}
|
||||
|
||||
/// While a handler holds a reference to a Timeline, it also holds the
|
||||
/// timeline's Gate open.
|
||||
struct HandlerTimeline {
|
||||
timeline: Arc<Timeline>,
|
||||
_guard: GateGuard,
|
||||
/// Wrapper around a [`std::sync::Weak<Timeline>`] to enforce separation of the
|
||||
/// lifecycles of page_service connection and timeline.
|
||||
///
|
||||
/// Use [`HandlerTimeline::upgrade`] to get an [`handler_timeline::Entered`] instance.
|
||||
/// This will fail if the Timeline has been cancelled via cancellation
|
||||
/// token or has already been deallocated.
|
||||
/// On success, the returned [`handler_timeline::Entered`] holds the Timeline's gate open.
|
||||
mod handler_timeline {
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::sync::gate::{GateError, GateGuard};
|
||||
|
||||
use crate::tenant::Timeline;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
pub(super) struct HandlerTimeline {
|
||||
timeline: Weak<Timeline>,
|
||||
/// Child token of [`Timeline::cancel`].
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
pub(super) enum Error {
|
||||
Cancelled,
|
||||
}
|
||||
pub(super) struct Entered {
|
||||
timeline: Arc<Timeline>,
|
||||
_gate_guard: GateGuard,
|
||||
}
|
||||
impl std::ops::Deref for Entered {
|
||||
type Target = Timeline;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.timeline
|
||||
}
|
||||
}
|
||||
impl HandlerTimeline {
|
||||
pub(super) fn from(timeline: Arc<Timeline>) -> Result<Self, Error> {
|
||||
Ok(HandlerTimeline {
|
||||
timeline: Arc::downgrade(&timeline),
|
||||
cancel: timeline.cancel.child_token(),
|
||||
})
|
||||
}
|
||||
pub(super) fn upgrade(&self) -> Result<Entered, Error> {
|
||||
// checking our child token instead of the parent token in the timeline
|
||||
// avoids cache contention on `Timeline::cancel`
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(Error::Cancelled);
|
||||
}
|
||||
let timeline = Weak::upgrade(&self.timeline).ok_or(Error::Cancelled)?;
|
||||
let _gate_guard = match timeline.gate.enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(GateError::GateClosed) => return Err(Error::Cancelled),
|
||||
};
|
||||
|
||||
Ok(Entered {
|
||||
timeline,
|
||||
_gate_guard,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn cancelled(&self) {
|
||||
self.cancel.cancelled().await;
|
||||
}
|
||||
|
||||
pub(super) fn is_cancelled(&self) -> bool {
|
||||
self.cancel.is_cancelled()
|
||||
}
|
||||
}
|
||||
}
|
||||
use handler_timeline::HandlerTimeline;
|
||||
|
||||
enum GetCachedTimelineForPage {
|
||||
Cancelled,
|
||||
AskLoadTimelineForPage(Key),
|
||||
}
|
||||
|
||||
struct PageServerHandler {
|
||||
@@ -437,7 +503,7 @@ impl PageServerHandler {
|
||||
cancellation_sources.extend(
|
||||
self.shard_timelines
|
||||
.values()
|
||||
.map(|ht| Either::Right(ht.timeline.cancel.cancelled())),
|
||||
.map(|ht| Either::Right(ht.cancelled())),
|
||||
);
|
||||
FuturesUnordered::from_iter(cancellation_sources)
|
||||
.next()
|
||||
@@ -447,10 +513,16 @@ impl PageServerHandler {
|
||||
/// Checking variant of [`Self::await_connection_cancelled`].
|
||||
fn is_connection_cancelled(&self) -> bool {
|
||||
task_mgr::is_shutdown_requested()
|
||||
|| self
|
||||
.shard_timelines
|
||||
.values()
|
||||
.any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
|
||||
|| self.shard_timelines.values().any(|ht| {
|
||||
if ht.is_cancelled() {
|
||||
return true;
|
||||
}
|
||||
let ht = match ht.upgrade() {
|
||||
Ok(ht) => ht,
|
||||
Err(handler_timeline::Error::Cancelled) => return true,
|
||||
};
|
||||
ht.is_stopping()
|
||||
})
|
||||
}
|
||||
|
||||
/// This function always respects cancellation of any timeline in `[Self::shard_timelines]`. Pass in
|
||||
@@ -967,7 +1039,7 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
&timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
@@ -1000,7 +1072,7 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
&timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
@@ -1033,7 +1105,7 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
&timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
@@ -1056,33 +1128,53 @@ impl PageServerHandler {
|
||||
fn get_cached_timeline_for_page(
|
||||
&mut self,
|
||||
req: &PagestreamGetPageRequest,
|
||||
) -> Result<&Arc<Timeline>, Key> {
|
||||
) -> Result<handler_timeline::Entered, GetCachedTimelineForPage> {
|
||||
let key = if let Some((first_idx, first_timeline)) = self.shard_timelines.iter().next() {
|
||||
// Fastest path: single sharded case
|
||||
if first_idx.shard_count.count() == 1 {
|
||||
return Ok(&first_timeline.timeline);
|
||||
match first_timeline.upgrade() {
|
||||
Ok(tl) => return Ok(tl),
|
||||
Err(handler_timeline::Error::Cancelled) => {
|
||||
let first_idx = *first_idx;
|
||||
self.shard_timelines.remove(&first_idx);
|
||||
return Err(GetCachedTimelineForPage::Cancelled);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let shard_num = first_timeline
|
||||
.timeline
|
||||
.get_shard_identity()
|
||||
.get_shard_number(&key);
|
||||
|
||||
let first_timeline = match first_timeline.upgrade() {
|
||||
Ok(tl) => tl,
|
||||
Err(handler_timeline::Error::Cancelled) => {
|
||||
let first_idx = *first_idx;
|
||||
self.shard_timelines.remove(&first_idx);
|
||||
return Err(GetCachedTimelineForPage::Cancelled);
|
||||
}
|
||||
};
|
||||
|
||||
let shard_num = first_timeline.get_shard_identity().get_shard_number(&key);
|
||||
|
||||
// Fast path: matched the first timeline in our local handler map. This case is common if
|
||||
// only one shard per tenant is attached to this pageserver.
|
||||
if first_timeline.timeline.get_shard_identity().number == shard_num {
|
||||
return Ok(&first_timeline.timeline);
|
||||
if first_timeline.get_shard_identity().number == shard_num {
|
||||
return Ok(first_timeline);
|
||||
}
|
||||
|
||||
let shard_index = ShardIndex {
|
||||
shard_number: shard_num,
|
||||
shard_count: first_timeline.timeline.get_shard_identity().count,
|
||||
shard_count: first_timeline.get_shard_identity().count,
|
||||
};
|
||||
|
||||
// Fast-ish path: timeline is in the connection handler's local cache
|
||||
if let Some(found) = self.shard_timelines.get(&shard_index) {
|
||||
return Ok(&found.timeline);
|
||||
match found.upgrade() {
|
||||
Ok(tl) => return Ok(tl),
|
||||
Err(handler_timeline::Error::Cancelled) => {
|
||||
self.shard_timelines.remove(&shard_index);
|
||||
return Err(GetCachedTimelineForPage::Cancelled);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
key
|
||||
@@ -1090,7 +1182,7 @@ impl PageServerHandler {
|
||||
rel_block_to_key(req.rel, req.blkno)
|
||||
};
|
||||
|
||||
Err(key)
|
||||
Err(GetCachedTimelineForPage::AskLoadTimelineForPage(key))
|
||||
}
|
||||
|
||||
/// Having looked up the [`Timeline`] instance for a particular shard, cache it to enable
|
||||
@@ -1107,22 +1199,14 @@ impl PageServerHandler {
|
||||
fn cache_timeline(
|
||||
&mut self,
|
||||
timeline: Arc<Timeline>,
|
||||
) -> Result<&Arc<Timeline>, GetActiveTimelineError> {
|
||||
let gate_guard = timeline
|
||||
.gate
|
||||
.enter()
|
||||
.map_err(|_| GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled))?;
|
||||
|
||||
) -> Result<handler_timeline::Entered, GetActiveTimelineError> {
|
||||
let shard_index = timeline.tenant_shard_id.to_index();
|
||||
let entry = self
|
||||
.shard_timelines
|
||||
.entry(shard_index)
|
||||
.or_insert(HandlerTimeline {
|
||||
timeline,
|
||||
_guard: gate_guard,
|
||||
});
|
||||
.or_insert(HandlerTimeline::from(timeline)?);
|
||||
|
||||
Ok(&entry.timeline)
|
||||
Ok(entry.upgrade()?)
|
||||
}
|
||||
|
||||
/// If [`Self::get_cached_timeline_for_page`] missed, then this function is used to populate the cache with
|
||||
@@ -1133,7 +1217,7 @@ impl PageServerHandler {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
|
||||
) -> Result<handler_timeline::Entered, GetActiveTimelineError> {
|
||||
// Slow path: we must call out to the TenantManager to find the timeline for this Key
|
||||
let timeline = self
|
||||
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Page(key))
|
||||
@@ -1146,25 +1230,39 @@ impl PageServerHandler {
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<&Arc<Timeline>, GetActiveTimelineError> {
|
||||
// This is a borrow-checker workaround: we can't return from inside of the `if let Some` because
|
||||
// that would be an immutable-borrow-self return, whereas later in the function we will use a mutable
|
||||
// ref to self. So instead, we first build a bool, and then return while not borrowing self.
|
||||
let have_cached = if let Some((idx, _tl)) = self.shard_timelines.iter().next() {
|
||||
idx.shard_number == ShardNumber(0)
|
||||
} else {
|
||||
false
|
||||
) -> Result<impl Deref<Target = Timeline>, GetActiveTimelineError> {
|
||||
let mut cancelled = vec![];
|
||||
let timeline = {
|
||||
let mut iter = self.shard_timelines.iter();
|
||||
loop {
|
||||
let Some((idx, tl)) = iter.next() else {
|
||||
break None;
|
||||
};
|
||||
if idx.shard_number != ShardNumber(0) {
|
||||
continue;
|
||||
}
|
||||
match tl.upgrade() {
|
||||
Ok(tl) => break Some(tl),
|
||||
Err(handler_timeline::Error::Cancelled) => {
|
||||
cancelled.push(*idx);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if have_cached {
|
||||
let entry = self.shard_timelines.iter().next().unwrap();
|
||||
Ok(&entry.1.timeline)
|
||||
} else {
|
||||
let timeline = self
|
||||
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
Ok(self.cache_timeline(timeline)?)
|
||||
for idx in cancelled {
|
||||
self.shard_timelines.remove(&idx);
|
||||
}
|
||||
let timeline = match timeline {
|
||||
Some(timeline) => timeline,
|
||||
None => {
|
||||
let timeline = self
|
||||
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
self.cache_timeline(timeline)?
|
||||
}
|
||||
};
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
@@ -1177,10 +1275,13 @@ impl PageServerHandler {
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = match self.get_cached_timeline_for_page(req) {
|
||||
Ok(tl) => {
|
||||
set_tracing_field_shard_id(tl);
|
||||
set_tracing_field_shard_id(&tl);
|
||||
tl
|
||||
}
|
||||
Err(key) => {
|
||||
Err(GetCachedTimelineForPage::Cancelled) => {
|
||||
return Err(PageStreamError::Shutdown);
|
||||
}
|
||||
Err(GetCachedTimelineForPage::AskLoadTimelineForPage(key)) => {
|
||||
match self
|
||||
.load_timeline_for_page(tenant_id, timeline_id, key)
|
||||
.await
|
||||
@@ -1210,7 +1311,7 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
&timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
@@ -1243,7 +1344,7 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
&timeline,
|
||||
req.request_lsn,
|
||||
req.not_modified_since,
|
||||
&latest_gc_cutoff_lsn,
|
||||
@@ -1289,10 +1390,8 @@ impl PageServerHandler {
|
||||
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// check that the timeline exists
|
||||
let timeline = self
|
||||
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
@@ -1962,6 +2061,17 @@ impl From<GetActiveTimelineError> for QueryError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<handler_timeline::Error> for GetActiveTimelineError {
|
||||
fn from(e: handler_timeline::Error) -> Self {
|
||||
match e {
|
||||
handler_timeline::Error::Cancelled => {
|
||||
// XXX GetActiveTimelineError should have a Cancelled variant
|
||||
GetActiveTimelineError::Tenant(GetActiveTenantError::Cancelled)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn set_tracing_field_shard_id(timeline: &Timeline) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
tracing::Span::current().record(
|
||||
|
||||
Reference in New Issue
Block a user