mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 23:50:39 +00:00
half-borked attempt at propagating to more parts of the implementation; page_service's shard swapping turns out to be painful / requires Handle: Clone, don't want that
This commit is contained in:
@@ -92,6 +92,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{
|
||||
task_mgr::TaskKind,
|
||||
@@ -120,6 +121,10 @@ pub(crate) enum Scope {
|
||||
Timeline {
|
||||
timeline: Arc<Timeline>,
|
||||
},
|
||||
TimelineHandle {
|
||||
timeline_handle:
|
||||
crate::tenant::timeline::handle::Handle<crate::page_service::TenantManagerTypes>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Scope {
|
||||
@@ -140,17 +145,22 @@ impl Scope {
|
||||
timeline: Arc::clone(timeline),
|
||||
}
|
||||
}
|
||||
pub(crate) fn new_timeline_handle(
|
||||
timeline_handle: crate::tenant::timeline::handle::Handle<
|
||||
crate::page_service::TenantManagerTypes,
|
||||
>,
|
||||
) -> Self {
|
||||
Scope::TimelineHandle { timeline_handle }
|
||||
}
|
||||
|
||||
pub(crate) fn io_size_metrics(&self) -> &crate::metrics::StorageIoSizeMetrics {
|
||||
match self {
|
||||
Scope::Global { io_size_metrics } => io_size_metrics,
|
||||
Scope::Tenant { tenant } => &tenant.virtual_file_io_metrics,
|
||||
Scope::Timeline { timeline } => &timeline.metrics.storage_io_size,
|
||||
Scope::TimelineHandle { timeline_handle } => &timeline_handle.metrics.storage_io_size,
|
||||
}
|
||||
}
|
||||
pub(crate) fn is_timeline(&self) -> bool {
|
||||
matches!(self, Scope::Timeline { .. })
|
||||
}
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -366,4 +376,32 @@ impl RequestContext {
|
||||
pub(crate) fn scope(&self) -> &Scope {
|
||||
&self.scope
|
||||
}
|
||||
|
||||
pub(crate) fn scope_mut(&mut self) -> &mut Scope {
|
||||
&mut self.scope
|
||||
}
|
||||
|
||||
pub(crate) fn assert_is_timeline_scoped(&self, what: &str) {
|
||||
if let Scope::Timeline { .. } = self.scope() {
|
||||
return;
|
||||
}
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
panic!("RequestContext must be timeline-scoped what={what}");
|
||||
} else {
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use utils::rate_limit::RateLimit;
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
what,
|
||||
"RequestContext must be timeline-scoped",
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3286,7 +3286,7 @@ async fn put_tenant_timeline_import_basebackup(
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
let timeline = tenant
|
||||
let (timeline, timeline_ctx) = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.map_err(ApiError::InternalServerError)
|
||||
.await?;
|
||||
@@ -3305,7 +3305,7 @@ async fn put_tenant_timeline_import_basebackup(
|
||||
info!("importing basebackup");
|
||||
|
||||
timeline
|
||||
.import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &ctx)
|
||||
.import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &timeline_ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ use utils::{
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{self, DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::metrics::{self, SmgrOpTimer};
|
||||
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
@@ -1086,6 +1086,16 @@ impl PageServerHandler {
|
||||
batch
|
||||
};
|
||||
|
||||
macro_rules! upgrade_handle_and_set_context {
|
||||
($shard:expr) => {{
|
||||
let shard = $shard.upgrade()?;
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.scope(context::Scope::new_timeline_handle(shard.clone()))
|
||||
.build();
|
||||
(shard, ctx)
|
||||
}};
|
||||
}
|
||||
|
||||
// invoke handler function
|
||||
let (mut handler_results, span): (
|
||||
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
|
||||
@@ -1098,9 +1108,10 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::exists");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_get_rel_exists_request(&*shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1115,9 +1126,10 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_get_nblocks_request(&*shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1132,17 +1144,18 @@ impl PageServerHandler {
|
||||
pages,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::getpage");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
{
|
||||
let npages = pages.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_get_page_at_lsn_request_batched(
|
||||
&*shard.upgrade()?,
|
||||
&*shard,
|
||||
effective_request_lsn,
|
||||
pages,
|
||||
io_concurrency,
|
||||
ctx,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
@@ -1159,9 +1172,10 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_db_size_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_db_size_request(&*shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1176,9 +1190,10 @@ impl PageServerHandler {
|
||||
req,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![self
|
||||
.handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx)
|
||||
.handle_get_slru_segment_request(&*shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer))
|
||||
@@ -1193,12 +1208,13 @@ impl PageServerHandler {
|
||||
requests,
|
||||
} => {
|
||||
fail::fail_point!("ps::handle-pagerequest-message::test");
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
{
|
||||
let npages = requests.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_test_request_batch(&*shard.upgrade()?, requests, ctx)
|
||||
.handle_test_request_batch(&*shard, requests, ctx)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
assert_eq!(res.len(), npages);
|
||||
@@ -1631,7 +1647,7 @@ impl PageServerHandler {
|
||||
//
|
||||
|
||||
let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
|
||||
let ctx = ctx.attached_child();
|
||||
let mut ctx = ctx.attached_child();
|
||||
async move {
|
||||
let _cancel_batcher = cancel_batcher.drop_guard();
|
||||
loop {
|
||||
@@ -1658,7 +1674,7 @@ impl PageServerHandler {
|
||||
io_concurrency.clone(),
|
||||
&cancel,
|
||||
protocol_version,
|
||||
&ctx,
|
||||
&mut ctx,
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1164,7 +1164,7 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
let timeline = self.create_timeline_struct(
|
||||
let (timeline, timeline_ctx) = self.create_timeline_struct(
|
||||
timeline_id,
|
||||
&metadata,
|
||||
previous_heatmap,
|
||||
@@ -1172,6 +1172,7 @@ impl Tenant {
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
idempotency.clone(),
|
||||
ctx,
|
||||
)?;
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
anyhow::ensure!(
|
||||
@@ -1793,6 +1794,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
&index_part.metadata,
|
||||
remote_timeline_client,
|
||||
ctx,
|
||||
)
|
||||
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
|
||||
.await
|
||||
@@ -2426,8 +2428,8 @@ impl Tenant {
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<UninitializedTimeline> {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(UninitializedTimeline, RequestContext)> {
|
||||
anyhow::ensure!(
|
||||
self.is_active(),
|
||||
"Cannot create empty timelines on inactive tenant"
|
||||
@@ -2461,6 +2463,7 @@ impl Tenant {
|
||||
create_guard,
|
||||
initdb_lsn,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -2478,7 +2481,7 @@ impl Tenant {
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let uninit_tl = self
|
||||
let (uninit_tl, ctx) = self
|
||||
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
|
||||
.await?;
|
||||
let tline = uninit_tl.raw_timeline().expect("we just created it");
|
||||
@@ -2490,7 +2493,7 @@ impl Tenant {
|
||||
.init_empty_test_timeline()
|
||||
.context("init_empty_test_timeline")?;
|
||||
modification
|
||||
.commit(ctx)
|
||||
.commit(&ctx)
|
||||
.await
|
||||
.context("commit init_empty_test_timeline modification")?;
|
||||
|
||||
@@ -2762,10 +2765,9 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
let mut uninit_timeline = {
|
||||
let (mut uninit_timeline, timeline_ctx) = {
|
||||
let this = &self;
|
||||
let initdb_lsn = Lsn(0);
|
||||
let _ctx = ctx;
|
||||
async move {
|
||||
let new_metadata = TimelineMetadata::new(
|
||||
// Initialize disk_consistent LSN to 0, The caller must import some data to
|
||||
@@ -2784,6 +2786,7 @@ impl Tenant {
|
||||
timeline_create_guard,
|
||||
initdb_lsn,
|
||||
None,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -4130,7 +4133,8 @@ impl Tenant {
|
||||
resources: TimelineResources,
|
||||
cause: CreateTimelineCause,
|
||||
create_idempotency: CreateTimelineIdempotency,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(Arc<Timeline>, RequestContext)> {
|
||||
let state = match cause {
|
||||
CreateTimelineCause::Load => {
|
||||
let ancestor_id = new_metadata.ancestor_timeline();
|
||||
@@ -4164,7 +4168,11 @@ impl Tenant {
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
|
||||
Ok(timeline)
|
||||
let timeline_ctx = RequestContextBuilder::extend(ctx)
|
||||
.scope(context::Scope::new_timeline(&timeline))
|
||||
.build();
|
||||
|
||||
Ok((timeline, timeline_ctx))
|
||||
}
|
||||
|
||||
/// [`Tenant::shutdown`] must be called before dropping the returned [`Tenant`] object
|
||||
@@ -4283,7 +4291,9 @@ impl Tenant {
|
||||
pagestream_throttle_metrics: Arc::new(
|
||||
crate::metrics::tenant_throttling::Pagestream::new(&tenant_shard_id),
|
||||
),
|
||||
virtual_file_io_metrics: crate::metrics::StorageIoSizeMetrics::new_tenant(&tenant_shard_id),
|
||||
virtual_file_io_metrics: crate::metrics::StorageIoSizeMetrics::new_tenant(
|
||||
&tenant_shard_id,
|
||||
),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
gc_block: Default::default(),
|
||||
@@ -4754,7 +4764,7 @@ impl Tenant {
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
_ctx: &RequestContext,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CreateTimelineResult, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
@@ -4854,13 +4864,14 @@ impl Tenant {
|
||||
src_timeline.pg_version,
|
||||
);
|
||||
|
||||
let uninitialized_timeline = self
|
||||
let (uninitialized_timeline, _timeline_ctx) = self
|
||||
.prepare_new_timeline(
|
||||
dst_id,
|
||||
&metadata,
|
||||
timeline_create_guard,
|
||||
start_lsn + 1,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -5125,13 +5136,14 @@ impl Tenant {
|
||||
pgdata_lsn,
|
||||
pg_version,
|
||||
);
|
||||
let mut raw_timeline = self
|
||||
let (mut raw_timeline, timeline_ctx) = self
|
||||
.prepare_new_timeline(
|
||||
timeline_id,
|
||||
&new_metadata,
|
||||
timeline_create_guard,
|
||||
pgdata_lsn,
|
||||
None,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -5142,7 +5154,7 @@ impl Tenant {
|
||||
&unfinished_timeline,
|
||||
&pgdata_path,
|
||||
pgdata_lsn,
|
||||
ctx,
|
||||
&timeline_ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
@@ -5210,7 +5222,8 @@ impl Tenant {
|
||||
create_guard: TimelineCreateGuard,
|
||||
start_lsn: Lsn,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
) -> anyhow::Result<UninitializedTimeline<'a>> {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(UninitializedTimeline<'a>, RequestContext)> {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
|
||||
let resources = self.build_timeline_resources(new_timeline_id);
|
||||
@@ -5218,7 +5231,7 @@ impl Tenant {
|
||||
.remote_client
|
||||
.init_upload_queue_for_empty_remote(new_metadata)?;
|
||||
|
||||
let timeline_struct = self
|
||||
let (timeline_struct, timeline_ctx) = self
|
||||
.create_timeline_struct(
|
||||
new_timeline_id,
|
||||
new_metadata,
|
||||
@@ -5227,6 +5240,7 @@ impl Tenant {
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
create_guard.idempotency.clone(),
|
||||
ctx,
|
||||
)
|
||||
.context("Failed to create timeline data structure")?;
|
||||
|
||||
@@ -5245,10 +5259,13 @@ impl Tenant {
|
||||
"Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
|
||||
);
|
||||
|
||||
Ok(UninitializedTimeline::new(
|
||||
self,
|
||||
new_timeline_id,
|
||||
Some((timeline_struct, create_guard)),
|
||||
Ok((
|
||||
UninitializedTimeline::new(
|
||||
self,
|
||||
new_timeline_id,
|
||||
Some((timeline_struct, create_guard)),
|
||||
),
|
||||
timeline_ctx,
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -132,6 +132,7 @@ impl EphemeralFile {
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<u64> {
|
||||
ctx.assert_is_timeline_scoped("EphemeralFile::write_raw");
|
||||
let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
|
||||
if let Some(control) = control {
|
||||
control.release().await;
|
||||
|
||||
@@ -1131,21 +1131,6 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if !ctx.scope().is_timeline() {
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
panic!("get() called with RequestContext in non-timeline scope");
|
||||
} else {
|
||||
static LIMIT: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
|
||||
let mut guard = LIMIT.lock().unwrap();
|
||||
guard.call2(|rate_limit_stats| {
|
||||
warn!(
|
||||
%rate_limit_stats,
|
||||
"get() called with RequestContext in non-timeline scope",
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
if !lsn.is_valid() {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
|
||||
}
|
||||
@@ -1314,6 +1299,8 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
ctx.assert_is_timeline_scoped("Timeline::get_vectored_impl");
|
||||
|
||||
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
|
||||
Some(ReadPath::new(keyspace.clone(), lsn))
|
||||
} else {
|
||||
|
||||
@@ -6,20 +6,18 @@ use std::{
|
||||
use anyhow::Context;
|
||||
use pageserver_api::{models::TimelineState, shard::TenantShardId};
|
||||
use remote_storage::DownloadError;
|
||||
use reqwest::Request;
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tracing::{error, info, info_span, instrument, Instrument};
|
||||
use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
config::PageServerConf, context::RequestContext, task_mgr::{self, TaskKind}, tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant,
|
||||
TenantManifestError, Timeline, TimelineOrOffloaded,
|
||||
},
|
||||
virtual_file::MaybeFatalIo,
|
||||
}, virtual_file::MaybeFatalIo
|
||||
};
|
||||
|
||||
/// Mark timeline as deleted in S3 so we won't pick it up next time
|
||||
@@ -286,10 +284,11 @@ impl DeleteTimelineFlow {
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: &TimelineMetadata,
|
||||
remote_client: RemoteTimelineClient,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
|
||||
// RemoteTimelineClient is the only functioning part.
|
||||
let timeline = tenant
|
||||
let (timeline, timeline_ctx) = tenant
|
||||
.create_timeline_struct(
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
@@ -300,6 +299,7 @@ impl DeleteTimelineFlow {
|
||||
// Thus we need to skip the validation here.
|
||||
CreateTimelineCause::Delete,
|
||||
crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here
|
||||
ctx,
|
||||
)
|
||||
.context("create_timeline_struct")?;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user