mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Add placeholders for RequestContext and friends.
This commit adds the 'ctx' parameter to all the functions that will need an active context. However, you can just create new contexts on the fly, there is no cross-checks that the tenant/timeline is still in active state. You can simply call Tenant::get_context or Timeline::get_context, and they always succee. In the next commit, we will change the functions for constructing contexts, so that you cannot create a new TenantRequestContext if the tenant is being stopped (and similarly for TimelineRequestContext). This commit isn't useful on its own, but splitting these fairly mechanical changes helps to make the next commit smaller, and thus easier to review. Because the contexts are merely passed through places, and not actually used for anything, this introduces a lot of "unused variable" warnings. They will go away with the next commit.
This commit is contained in:
@@ -27,7 +27,7 @@ use tracing::*;
|
||||
///
|
||||
use tokio_tar::{Builder, EntryType, Header};
|
||||
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::{Timeline, TimelineRequestContext};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
|
||||
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
|
||||
@@ -52,6 +52,7 @@ pub async fn send_basebackup_tarball<'a, W>(
|
||||
req_lsn: Option<Lsn>,
|
||||
prev_lsn: Option<Lsn>,
|
||||
full_backup: bool,
|
||||
ctx: &'a TimelineRequestContext,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
W: AsyncWrite + Send + Sync + Unpin,
|
||||
@@ -110,6 +111,7 @@ where
|
||||
lsn: backup_lsn,
|
||||
prev_record_lsn: prev_lsn,
|
||||
full_backup,
|
||||
ctx,
|
||||
};
|
||||
basebackup
|
||||
.send_tarball()
|
||||
@@ -129,6 +131,7 @@ where
|
||||
lsn: Lsn,
|
||||
prev_record_lsn: Lsn,
|
||||
full_backup: bool,
|
||||
ctx: &'a TimelineRequestContext,
|
||||
}
|
||||
|
||||
impl<'a, W> Basebackup<'a, W>
|
||||
@@ -171,23 +174,37 @@ where
|
||||
SlruKind::MultiXactOffsets,
|
||||
SlruKind::MultiXactMembers,
|
||||
] {
|
||||
for segno in self.timeline.list_slru_segments(kind, self.lsn).await? {
|
||||
for segno in self
|
||||
.timeline
|
||||
.list_slru_segments(kind, self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_slru_segment(kind, segno).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Create tablespace directories
|
||||
for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn).await? {
|
||||
for ((spcnode, dbnode), has_relmap_file) in
|
||||
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
|
||||
{
|
||||
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
|
||||
|
||||
// Gather and send relational files in each database if full backup is requested.
|
||||
if self.full_backup {
|
||||
for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn).await? {
|
||||
for rel in self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_rel(rel).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
for xid in self.timeline.list_twophase_files(self.lsn).await? {
|
||||
for xid in self
|
||||
.timeline
|
||||
.list_twophase_files(self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_twophase_file(xid).await?;
|
||||
}
|
||||
|
||||
@@ -203,7 +220,10 @@ where
|
||||
}
|
||||
|
||||
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
|
||||
let nblocks = self.timeline.get_rel_size(tag, self.lsn, false).await?;
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_rel_size(tag, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
|
||||
// If the relation is empty, create an empty file
|
||||
if nblocks == 0 {
|
||||
@@ -218,12 +238,11 @@ where
|
||||
let mut seg = 0;
|
||||
while startblk < nblocks {
|
||||
let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);
|
||||
|
||||
let mut segment_data: Vec<u8> = vec![];
|
||||
for blknum in startblk..endblk {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)
|
||||
.get_rel_page_at_lsn(tag, blknum, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
}
|
||||
@@ -245,14 +264,14 @@ where
|
||||
async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_slru_segment_size(slru, segno, self.lsn)
|
||||
.get_slru_segment_size(slru, segno, self.lsn, self.ctx)
|
||||
.await?;
|
||||
|
||||
let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
|
||||
for blknum in 0..nblocks {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn)
|
||||
.get_slru_page_at_lsn(slru, segno, blknum, self.lsn, self.ctx)
|
||||
.await?;
|
||||
|
||||
if slru == SlruKind::Clog {
|
||||
@@ -287,7 +306,7 @@ where
|
||||
let relmap_img = if has_relmap_file {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_relmap_file(spcnode, dbnode, self.lsn)
|
||||
.get_relmap_file(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?;
|
||||
ensure!(img.len() == 512);
|
||||
Some(img)
|
||||
@@ -323,7 +342,7 @@ where
|
||||
if !has_relmap_file
|
||||
&& self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn)
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?
|
||||
.is_empty()
|
||||
{
|
||||
@@ -356,7 +375,10 @@ where
|
||||
// Extract twophase state files
|
||||
//
|
||||
async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
|
||||
let img = self.timeline.get_twophase_file(xid, self.lsn).await?;
|
||||
let img = self
|
||||
.timeline
|
||||
.get_twophase_file(xid, self.lsn, self.ctx)
|
||||
.await?;
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
buf.extend_from_slice(&img[..]);
|
||||
@@ -394,12 +416,12 @@ where
|
||||
|
||||
let checkpoint_bytes = self
|
||||
.timeline
|
||||
.get_checkpoint(self.lsn)
|
||||
.get_checkpoint(self.lsn, self.ctx)
|
||||
.await
|
||||
.context("failed to get checkpoint bytes")?;
|
||||
let pg_control_bytes = self
|
||||
.timeline
|
||||
.get_control_file(self.lsn)
|
||||
.get_control_file(self.lsn, self.ctx)
|
||||
.await
|
||||
.context("failed get control bytes")?;
|
||||
|
||||
|
||||
@@ -197,6 +197,7 @@ pub async fn collect_metrics_task(
|
||||
// iterate through list of timelines in tenant
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
// collect per-timeline metrics only for active timelines
|
||||
let timeline_ctx = timeline.get_context();
|
||||
if timeline.is_active() {
|
||||
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
|
||||
|
||||
@@ -209,7 +210,8 @@ pub async fn collect_metrics_task(
|
||||
timeline_written_size,
|
||||
));
|
||||
|
||||
let (timeline_logical_size, is_exact) = timeline.get_current_logical_size()?;
|
||||
let (timeline_logical_size, is_exact) =
|
||||
timeline.get_current_logical_size(&timeline_ctx)?;
|
||||
// Only send timeline logical size when it is fully calculated.
|
||||
if is_exact {
|
||||
current_metrics.push((
|
||||
|
||||
12
pageserver/src/context.rs
Normal file
12
pageserver/src/context.rs
Normal file
@@ -0,0 +1,12 @@
|
||||
//! Most async functions throughout the pageserver take a `ctx: &RequestContext`
|
||||
//! argument. Currently, it's just a placeholder, but in upcoming commit, it
|
||||
//! will be used for cancellation, and to ensure that a Tenant or Timeline isn't
|
||||
//! removed while there are still tasks operating on it.
|
||||
|
||||
pub struct RequestContext {}
|
||||
|
||||
impl RequestContext {
|
||||
pub fn new() -> Self {
|
||||
RequestContext {}
|
||||
}
|
||||
}
|
||||
@@ -11,9 +11,10 @@ use super::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::tenant::{PageReconstructError, Timeline, TimelineRequestContext};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use utils::{
|
||||
auth::JwtAuth,
|
||||
@@ -90,8 +91,9 @@ fn apierror_from_prerror(err: PageReconstructError) -> ApiError {
|
||||
async fn build_timeline_info(
|
||||
timeline: &Arc<Timeline>,
|
||||
include_non_incremental_logical_size: bool,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
let mut info = build_timeline_info_common(timeline)?;
|
||||
let mut info = build_timeline_info_common(timeline, ctx)?;
|
||||
if include_non_incremental_logical_size {
|
||||
// XXX we should be using spawn_ondemand_logical_size_calculation here.
|
||||
// Otherwise, if someone deletes the timeline / detaches the tenant while
|
||||
@@ -101,6 +103,7 @@ async fn build_timeline_info(
|
||||
.get_current_logical_size_non_incremental(
|
||||
info.last_record_lsn,
|
||||
CancellationToken::new(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
@@ -108,7 +111,10 @@ async fn build_timeline_info(
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn build_timeline_info_common(timeline: &Arc<Timeline>) -> anyhow::Result<TimelineInfo> {
|
||||
fn build_timeline_info_common(
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
|
||||
let guard = timeline.last_received_wal.lock().unwrap();
|
||||
@@ -128,7 +134,7 @@ fn build_timeline_info_common(timeline: &Arc<Timeline>) -> anyhow::Result<Timeli
|
||||
Lsn(0) => None,
|
||||
lsn @ Lsn(_) => Some(lsn),
|
||||
};
|
||||
let current_logical_size = match timeline.get_current_logical_size() {
|
||||
let current_logical_size = match timeline.get_current_logical_size(ctx) {
|
||||
Ok((size, _)) => Some(size),
|
||||
Err(err) => {
|
||||
error!("Timeline info creation failed to get current logical size: {err:?}");
|
||||
@@ -182,17 +188,20 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
let tenant = mgr::get_tenant(tenant_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
let tenant_ctx = tenant.get_context();
|
||||
match tenant.create_timeline(
|
||||
new_timeline_id,
|
||||
request_data.ancestor_timeline_id.map(TimelineId::from),
|
||||
request_data.ancestor_start_lsn,
|
||||
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION)
|
||||
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
|
||||
&tenant_ctx,
|
||||
)
|
||||
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
|
||||
.await {
|
||||
Ok(Some(new_timeline)) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let timeline_info = build_timeline_info_common(&new_timeline)
|
||||
let timeline_ctx = new_timeline.get_context();
|
||||
let timeline_info = build_timeline_info_common(&new_timeline, &timeline_ctx)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
@@ -215,13 +224,15 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
|
||||
let mut response_data = Vec::with_capacity(timelines.len());
|
||||
for timeline in timelines {
|
||||
let timeline_info =
|
||||
build_timeline_info(&timeline, include_non_incremental_logical_size)
|
||||
.await
|
||||
.context(
|
||||
"Failed to convert tenant timeline {timeline_id} into the local one: {e:?}",
|
||||
)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let timeline_ctx = timeline.get_context();
|
||||
let timeline_info = build_timeline_info(
|
||||
&timeline,
|
||||
include_non_incremental_logical_size,
|
||||
&timeline_ctx,
|
||||
)
|
||||
.await
|
||||
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
response_data.push(timeline_info);
|
||||
}
|
||||
@@ -278,11 +289,16 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.map_err(ApiError::NotFound)?;
|
||||
let timeline_ctx = timeline.get_context();
|
||||
|
||||
let timeline_info = build_timeline_info(&timeline, include_non_incremental_logical_size)
|
||||
.await
|
||||
.context("Failed to get local timeline info: {e:#}")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let timeline_info = build_timeline_info(
|
||||
&timeline,
|
||||
include_non_incremental_logical_size,
|
||||
&timeline_ctx,
|
||||
)
|
||||
.await
|
||||
.context("Failed to get local timeline info: {e:#}")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
Ok::<_, ApiError>(timeline_info)
|
||||
}
|
||||
@@ -307,8 +323,9 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
|
||||
.await
|
||||
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
|
||||
.map_err(ApiError::NotFound)?;
|
||||
let timeline_ctx = timeline.get_context();
|
||||
let result = timeline
|
||||
.find_lsn_for_timestamp(timestamp_pg)
|
||||
.find_lsn_for_timestamp(timestamp_pg, &timeline_ctx)
|
||||
.await
|
||||
.map_err(apierror_from_prerror)?;
|
||||
|
||||
@@ -350,7 +367,9 @@ async fn timeline_delete_handler(request: Request<Body>) -> Result<Response<Body
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
mgr::delete_timeline(tenant_id, timeline_id)
|
||||
let ctx = RequestContext::new();
|
||||
|
||||
mgr::delete_timeline(tenant_id, timeline_id, &ctx)
|
||||
.instrument(info_span!("timeline_delete", tenant = %tenant_id, timeline = %timeline_id))
|
||||
.await
|
||||
// FIXME: Errors from `delete_timeline` can occur for a number of reasons, incuding both
|
||||
@@ -459,10 +478,11 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
|
||||
let tenant = mgr::get_tenant(tenant_id, true)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let ctx = tenant.get_context();
|
||||
|
||||
// this can be long operation, it currently is not backed by any request coalescing or similar
|
||||
let inputs = tenant
|
||||
.gather_size_inputs()
|
||||
.gather_size_inputs(&ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -505,6 +525,8 @@ fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn()
|
||||
async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
let ctx = RequestContext::new();
|
||||
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
|
||||
let mut tenant_conf = TenantConfOpt::default();
|
||||
@@ -593,7 +615,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
Some(tenant) => {
|
||||
// We created the tenant. Existing API semantics are that the tenant
|
||||
// is Active when this function returns.
|
||||
if let res @ Err(_) = tenant.wait_to_become_active().await {
|
||||
if let res @ Err(_) = tenant.wait_to_become_active(&ctx).await {
|
||||
// This shouldn't happen because we just created the tenant directory
|
||||
// in tenant::mgr::create_tenant, and there aren't any remote timelines
|
||||
// to load, so, nothing can really fail during load.
|
||||
@@ -617,6 +639,8 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let tenant_id = request_data.tenant_id;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new();
|
||||
|
||||
let mut tenant_conf: TenantConfOpt = Default::default();
|
||||
if let Some(gc_period) = request_data.gc_period {
|
||||
tenant_conf.gc_period = Some(
|
||||
@@ -679,7 +703,7 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
}
|
||||
|
||||
let state = get_state(&request);
|
||||
mgr::update_tenant_config(state.conf, tenant_conf, tenant_id)
|
||||
mgr::update_tenant_config(state.conf, tenant_conf, tenant_id, &ctx)
|
||||
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
|
||||
.await
|
||||
// FIXME: `update_tenant_config` can fail because of both user and internal errors.
|
||||
@@ -775,12 +799,13 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(ApiError::NotFound)?;
|
||||
let ctx = timeline.get_context();
|
||||
timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
timeline
|
||||
.compact()
|
||||
.compact(&ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -800,7 +825,8 @@ async fn timeline_download_remote_layers_handler_post(
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(ApiError::NotFound)?;
|
||||
match timeline.spawn_download_all_remote_layers().await {
|
||||
let ctx = timeline.get_context();
|
||||
match timeline.spawn_download_all_remote_layers(&ctx).await {
|
||||
Ok(st) => json_response(StatusCode::ACCEPTED, st),
|
||||
Err(st) => json_response(StatusCode::CONFLICT, st),
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use tracing::*;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::{Timeline, TimelineRequestContext};
|
||||
use crate::walingest::WalIngest;
|
||||
use crate::walrecord::DecodedWALRecord;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
@@ -47,6 +47,7 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
tline: &Timeline,
|
||||
pgdata_path: &Path,
|
||||
pgdata_lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<()> {
|
||||
let mut pg_control: Option<ControlFileData> = None;
|
||||
|
||||
@@ -69,7 +70,7 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
let mut file = tokio::fs::File::open(absolute_path).await?;
|
||||
let len = metadata.len() as usize;
|
||||
if let Some(control_file) =
|
||||
import_file(&mut modification, relative_path, &mut file, len).await?
|
||||
import_file(&mut modification, relative_path, &mut file, len, ctx).await?
|
||||
{
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
@@ -99,6 +100,7 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
tline,
|
||||
Lsn(pg_control.checkPointCopy.redo),
|
||||
pgdata_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -113,6 +115,7 @@ async fn import_rel(
|
||||
dboid: Oid,
|
||||
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
|
||||
len: usize,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Does it look like a relation file?
|
||||
trace!("importing rel file {}", path.display());
|
||||
@@ -147,7 +150,10 @@ async fn import_rel(
|
||||
// FIXME: use proper error type for this, instead of parsing the error message.
|
||||
// Or better yet, keep track of which relations we've already created
|
||||
// https://github.com/neondatabase/neon/issues/3309
|
||||
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32).await {
|
||||
if let Err(e) = modification
|
||||
.put_rel_creation(rel, nblocks as u32, ctx)
|
||||
.await
|
||||
{
|
||||
if e.to_string().contains("already exists") {
|
||||
debug!("relation {} already exists. we must be extending it", rel);
|
||||
} else {
|
||||
@@ -182,7 +188,7 @@ async fn import_rel(
|
||||
//
|
||||
// If we process rel segments out of order,
|
||||
// put_rel_extend will skip the update.
|
||||
modification.put_rel_extend(rel, blknum).await?;
|
||||
modification.put_rel_extend(rel, blknum, ctx).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -195,6 +201,7 @@ async fn import_slru(
|
||||
path: &Path,
|
||||
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
|
||||
len: usize,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("importing slru file {path:?}");
|
||||
|
||||
@@ -211,7 +218,7 @@ async fn import_slru(
|
||||
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
|
||||
|
||||
modification
|
||||
.put_slru_segment_creation(slru, segno, nblocks as u32)
|
||||
.put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
|
||||
.await?;
|
||||
|
||||
let mut rpageno = 0;
|
||||
@@ -252,6 +259,7 @@ async fn import_wal(
|
||||
tline: &Timeline,
|
||||
startpoint: Lsn,
|
||||
endpoint: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::io::Read;
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
|
||||
@@ -260,7 +268,7 @@ async fn import_wal(
|
||||
let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
|
||||
let mut last_lsn = startpoint;
|
||||
|
||||
let mut walingest = WalIngest::new(tline, startpoint).await?;
|
||||
let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
|
||||
|
||||
while last_lsn <= endpoint {
|
||||
// FIXME: assume postgresql tli 1 for now
|
||||
@@ -297,7 +305,7 @@ async fn import_wal(
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
|
||||
.await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
@@ -326,6 +334,7 @@ pub async fn import_basebackup_from_tar(
|
||||
tline: &Timeline,
|
||||
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<()> {
|
||||
info!("importing base at {base_lsn}");
|
||||
let mut modification = tline.begin_modification(base_lsn);
|
||||
@@ -344,7 +353,7 @@ pub async fn import_basebackup_from_tar(
|
||||
match header.entry_type() {
|
||||
tokio_tar::EntryType::Regular => {
|
||||
if let Some(res) =
|
||||
import_file(&mut modification, file_path.as_ref(), &mut entry, len).await?
|
||||
import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
|
||||
{
|
||||
// We found the pg_control file.
|
||||
pg_control = Some(res);
|
||||
@@ -376,13 +385,14 @@ pub async fn import_wal_from_tar(
|
||||
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<()> {
|
||||
// Set up walingest mutable state
|
||||
let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
|
||||
let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
|
||||
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
|
||||
let mut last_lsn = start_lsn;
|
||||
let mut walingest = WalIngest::new(tline, start_lsn).await?;
|
||||
let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
|
||||
|
||||
// Ingest wal until end_lsn
|
||||
info!("importing wal until {}", end_lsn);
|
||||
@@ -431,7 +441,7 @@ pub async fn import_wal_from_tar(
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
|
||||
.await?;
|
||||
last_lsn = lsn;
|
||||
|
||||
@@ -466,6 +476,7 @@ async fn import_file(
|
||||
file_path: &Path,
|
||||
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
|
||||
len: usize,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Option<ControlFileData>> {
|
||||
let file_name = match file_path.file_name() {
|
||||
Some(name) => name.to_string_lossy(),
|
||||
@@ -498,14 +509,16 @@ async fn import_file(
|
||||
}
|
||||
"pg_filenode.map" => {
|
||||
let bytes = read_all_bytes(reader).await?;
|
||||
modification.put_relmap_file(spcnode, dbnode, bytes).await?;
|
||||
modification
|
||||
.put_relmap_file(spcnode, dbnode, bytes, ctx)
|
||||
.await?;
|
||||
debug!("imported relmap file")
|
||||
}
|
||||
"PG_VERSION" => {
|
||||
debug!("ignored PG_VERSION file");
|
||||
}
|
||||
_ => {
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len).await?;
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
|
||||
debug!("imported rel creation");
|
||||
}
|
||||
}
|
||||
@@ -521,38 +534,40 @@ async fn import_file(
|
||||
match file_name.as_ref() {
|
||||
"pg_filenode.map" => {
|
||||
let bytes = read_all_bytes(reader).await?;
|
||||
modification.put_relmap_file(spcnode, dbnode, bytes).await?;
|
||||
modification
|
||||
.put_relmap_file(spcnode, dbnode, bytes, ctx)
|
||||
.await?;
|
||||
debug!("imported relmap file")
|
||||
}
|
||||
"PG_VERSION" => {
|
||||
debug!("ignored PG_VERSION file");
|
||||
}
|
||||
_ => {
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len).await?;
|
||||
import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
|
||||
debug!("imported rel creation");
|
||||
}
|
||||
}
|
||||
} else if file_path.starts_with("pg_xact") {
|
||||
let slru = SlruKind::Clog;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len).await?;
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported clog slru");
|
||||
} else if file_path.starts_with("pg_multixact/offsets") {
|
||||
let slru = SlruKind::MultiXactOffsets;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len).await?;
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported multixact offsets slru");
|
||||
} else if file_path.starts_with("pg_multixact/members") {
|
||||
let slru = SlruKind::MultiXactMembers;
|
||||
|
||||
import_slru(modification, slru, file_path, reader, len).await?;
|
||||
import_slru(modification, slru, file_path, reader, len, ctx).await?;
|
||||
debug!("imported multixact members slru");
|
||||
} else if file_path.starts_with("pg_twophase") {
|
||||
let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
|
||||
|
||||
let bytes = read_all_bytes(reader).await?;
|
||||
modification
|
||||
.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))
|
||||
.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
|
||||
.await?;
|
||||
debug!("imported twophase file");
|
||||
} else if file_path.starts_with("pg_wal") {
|
||||
|
||||
@@ -2,6 +2,7 @@ mod auth;
|
||||
pub mod basebackup;
|
||||
pub mod config;
|
||||
pub mod consumption_metrics;
|
||||
pub mod context;
|
||||
pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub mod keyspace;
|
||||
|
||||
@@ -43,18 +43,22 @@ use utils::{
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::import_datadir::import_wal_from_tar;
|
||||
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::{Tenant, Timeline};
|
||||
use crate::tenant::{Tenant, Timeline, TimelineRequestContext};
|
||||
use crate::trace::Tracer;
|
||||
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ {
|
||||
fn copyin_stream<'a>(
|
||||
pgb: &'a mut PostgresBackend,
|
||||
ctx: &'a RequestContext,
|
||||
) -> impl Stream<Item = io::Result<Bytes>> + 'a {
|
||||
async_stream::try_stream! {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
@@ -146,6 +150,8 @@ pub async fn libpq_listener_main(
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let local_auth = auth.clone();
|
||||
|
||||
let connection_ctx = RequestContext::new();
|
||||
|
||||
// PageRequestHandler tasks are not associated with any particular
|
||||
// timeline in the task manager. In practice most connections will
|
||||
// only deal with a particular timeline, but we don't know which one
|
||||
@@ -157,7 +163,7 @@ pub async fn libpq_listener_main(
|
||||
None,
|
||||
"serving compute connection task",
|
||||
false,
|
||||
page_service_conn_main(conf, local_auth, socket, auth_type),
|
||||
page_service_conn_main(conf, local_auth, socket, auth_type, connection_ctx),
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -177,6 +183,7 @@ async fn page_service_conn_main(
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
socket: tokio::net::TcpStream,
|
||||
auth_type: AuthType,
|
||||
connection_ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||
// One of the pros of `defer!` is that this will *most probably*
|
||||
@@ -191,7 +198,7 @@ async fn page_service_conn_main(
|
||||
.set_nodelay(true)
|
||||
.context("could not set TCP_NODELAY")?;
|
||||
|
||||
let mut conn_handler = PageServerHandler::new(conf, auth);
|
||||
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
|
||||
let pgbackend = PostgresBackend::new(socket, auth_type, None)?;
|
||||
|
||||
let result = pgbackend
|
||||
@@ -255,20 +262,27 @@ struct PageServerHandler {
|
||||
_conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
|
||||
connection_ctx: RequestContext,
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
pub fn new(conf: &'static PageServerConf, auth: Option<Arc<JwtAuth>>) -> Self {
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
connection_ctx: RequestContext,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
_conf: conf,
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self, pgb))]
|
||||
async fn handle_pagerequests(
|
||||
&self,
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -278,7 +292,7 @@ impl PageServerHandler {
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
// Make request tracer if needed
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id).await?;
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
let connection_id = ConnectionId::generate();
|
||||
let path = tenant
|
||||
@@ -291,6 +305,7 @@ impl PageServerHandler {
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
let ctx = timeline.get_context();
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||
@@ -332,19 +347,21 @@ impl PageServerHandler {
|
||||
let response = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let _timer = metrics.get_rel_exists.start_timer();
|
||||
self.handle_get_rel_exists_request(&timeline, &req).await
|
||||
self.handle_get_rel_exists_request(&timeline, &req, &ctx)
|
||||
.await
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
let _timer = metrics.get_rel_size.start_timer();
|
||||
self.handle_get_nblocks_request(&timeline, &req).await
|
||||
self.handle_get_nblocks_request(&timeline, &req, &ctx).await
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
let _timer = metrics.get_page_at_lsn.start_timer();
|
||||
self.handle_get_page_at_lsn_request(&timeline, &req).await
|
||||
self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
|
||||
.await
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
let _timer = metrics.get_db_size.start_timer();
|
||||
self.handle_db_size_request(&timeline, &req).await
|
||||
self.handle_db_size_request(&timeline, &req, &ctx).await
|
||||
}
|
||||
};
|
||||
|
||||
@@ -376,8 +393,10 @@ impl PageServerHandler {
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version)?;
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
|
||||
let tenant_ctx = tenant.get_context();
|
||||
let (timeline, ctx) =
|
||||
tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &tenant_ctx)?;
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
@@ -394,9 +413,9 @@ impl PageServerHandler {
|
||||
pgb.write_message(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb));
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb, &ctx));
|
||||
timeline
|
||||
.import_basebackup_from_tar(&mut copyin_stream, base_lsn)
|
||||
.import_basebackup_from_tar(&mut copyin_stream, base_lsn, &ctx)
|
||||
.await?;
|
||||
|
||||
// Drain the rest of the Copy data
|
||||
@@ -429,7 +448,9 @@ impl PageServerHandler {
|
||||
) -> Result<(), QueryError> {
|
||||
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
|
||||
|
||||
let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?;
|
||||
let timeline =
|
||||
get_active_timeline_with_timeout(tenant_id, timeline_id, &self.connection_ctx).await?;
|
||||
let ctx = timeline.get_context();
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
if last_record_lsn != start_lsn {
|
||||
return Err(QueryError::Other(
|
||||
@@ -444,9 +465,9 @@ impl PageServerHandler {
|
||||
info!("importing wal");
|
||||
pgb.write_message(&BeMessage::CopyInResponse)?;
|
||||
pgb.flush().await?;
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb));
|
||||
let mut copyin_stream = Box::pin(copyin_stream(pgb, &ctx));
|
||||
let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream);
|
||||
import_wal_from_tar(&timeline, &mut reader, start_lsn, end_lsn).await?;
|
||||
import_wal_from_tar(&timeline, &mut reader, start_lsn, end_lsn, &ctx).await?;
|
||||
info!("wal import complete");
|
||||
|
||||
// Drain the rest of the Copy data
|
||||
@@ -492,6 +513,7 @@ impl PageServerHandler {
|
||||
mut lsn: Lsn,
|
||||
latest: bool,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
if latest {
|
||||
// Latest page version was requested. If LSN is given, it is a hint
|
||||
@@ -515,7 +537,7 @@ impl PageServerHandler {
|
||||
if lsn <= last_record_lsn {
|
||||
lsn = last_record_lsn;
|
||||
} else {
|
||||
timeline.wait_lsn(lsn).await?;
|
||||
timeline.wait_lsn(lsn, ctx).await?;
|
||||
// Since we waited for 'lsn' to arrive, that is now the last
|
||||
// record LSN. (Or close enough for our purposes; the
|
||||
// last-record LSN can advance immediately after we return
|
||||
@@ -525,7 +547,7 @@ impl PageServerHandler {
|
||||
if lsn == Lsn(0) {
|
||||
anyhow::bail!("invalid LSN(0) in request");
|
||||
}
|
||||
timeline.wait_lsn(lsn).await?;
|
||||
timeline.wait_lsn(lsn, ctx).await?;
|
||||
}
|
||||
anyhow::ensure!(
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
@@ -535,52 +557,60 @@ impl PageServerHandler {
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
async fn handle_get_rel_exists_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamExistsRequest,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
|
||||
.await?;
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let exists = timeline.get_rel_exists(req.rel, lsn, req.latest).await?;
|
||||
let exists = timeline
|
||||
.get_rel_exists(req.rel, lsn, req.latest, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
exists,
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
async fn handle_get_nblocks_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamNblocksRequest,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
|
||||
.await?;
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest).await?;
|
||||
let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest, ctx).await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
n_blocks,
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req, ctx), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
|
||||
async fn handle_db_size_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamDbSizeRequest,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
|
||||
.await?;
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let total_blocks = timeline
|
||||
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)
|
||||
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest, ctx)
|
||||
.await?;
|
||||
let db_size = total_blocks as i64 * BLCKSZ as i64;
|
||||
|
||||
@@ -589,15 +619,17 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
|
||||
async fn handle_get_page_at_lsn_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetPageRequest,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<PagestreamBeMessage> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
|
||||
.await?;
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
/*
|
||||
// Add a 1s delay to some requests. The delay helps the requests to
|
||||
// hit the race condition from github issue #1047 more easily.
|
||||
@@ -608,7 +640,7 @@ impl PageServerHandler {
|
||||
*/
|
||||
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
@@ -618,7 +650,7 @@ impl PageServerHandler {
|
||||
|
||||
#[instrument(skip(self, pgb))]
|
||||
async fn handle_basebackup_request(
|
||||
&self,
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -627,12 +659,15 @@ impl PageServerHandler {
|
||||
full_backup: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
// check that the timeline exists
|
||||
let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?;
|
||||
let timeline =
|
||||
get_active_timeline_with_timeout(tenant_id, timeline_id, &self.connection_ctx).await?;
|
||||
let ctx = timeline.get_context();
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
info!("waiting for {}", lsn);
|
||||
timeline.wait_lsn(lsn).await?;
|
||||
timeline.wait_lsn(lsn, &ctx).await?;
|
||||
timeline
|
||||
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
|
||||
.context("invalid basebackup lsn")?;
|
||||
@@ -645,8 +680,15 @@ impl PageServerHandler {
|
||||
// Send a tarball of the latest layer on the timeline
|
||||
{
|
||||
let mut writer = pgb.copyout_writer();
|
||||
basebackup::send_basebackup_tarball(&mut writer, &timeline, lsn, prev_lsn, full_backup)
|
||||
.await?;
|
||||
basebackup::send_basebackup_tarball(
|
||||
&mut writer,
|
||||
&timeline,
|
||||
lsn,
|
||||
prev_lsn,
|
||||
full_backup,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
pgb.write_message(&BeMessage::CopyDone)?;
|
||||
@@ -784,7 +826,9 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?;
|
||||
let timeline =
|
||||
get_active_timeline_with_timeout(tenant_id, timeline_id, &self.connection_ctx)
|
||||
.await?;
|
||||
|
||||
let end_of_timeline = timeline.get_last_record_rlsn();
|
||||
|
||||
@@ -944,7 +988,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id).await?;
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &self.connection_ctx).await?;
|
||||
pgb.write_message(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"checkpoint_distance"),
|
||||
RowDescriptor::int8_col(b"checkpoint_timeout"),
|
||||
@@ -995,9 +1039,17 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
|
||||
/// ensures that queries don't fail immediately after pageserver startup, because
|
||||
/// all tenants are still loading.
|
||||
async fn get_active_tenant_with_timeout(tenant_id: TenantId) -> anyhow::Result<Arc<Tenant>> {
|
||||
async fn get_active_tenant_with_timeout(
|
||||
tenant_id: TenantId,
|
||||
parent_ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
let tenant = mgr::get_tenant(tenant_id, false).await?;
|
||||
match tokio::time::timeout(Duration::from_secs(30), tenant.wait_to_become_active()).await {
|
||||
match tokio::time::timeout(
|
||||
Duration::from_secs(30),
|
||||
tenant.wait_to_become_active(parent_ctx),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(wait_result) => wait_result
|
||||
// no .context(), the error message is good enough and some tests depend on it
|
||||
.map(move |()| tenant),
|
||||
@@ -1009,8 +1061,9 @@ async fn get_active_tenant_with_timeout(tenant_id: TenantId) -> anyhow::Result<A
|
||||
async fn get_active_timeline_with_timeout(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
get_active_tenant_with_timeout(tenant_id)
|
||||
get_active_tenant_with_timeout(tenant_id, ctx)
|
||||
.await
|
||||
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
|
||||
}
|
||||
|
||||
@@ -6,9 +6,9 @@
|
||||
//! walingest.rs handles a few things like implicit relation creation and extension.
|
||||
//! Clarify that)
|
||||
//!
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::tenant::{PageReconstructError, Timeline, TimelineRequestContext};
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::Context;
|
||||
use bytes::{Buf, Bytes};
|
||||
@@ -97,6 +97,7 @@ impl Timeline {
|
||||
blknum: BlockNumber,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
@@ -104,7 +105,7 @@ impl Timeline {
|
||||
)));
|
||||
}
|
||||
|
||||
let nblocks = self.get_rel_size(tag, lsn, latest).await?;
|
||||
let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?;
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
@@ -114,7 +115,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(tag, blknum);
|
||||
self.get(key, lsn).await
|
||||
self.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
@@ -124,13 +125,14 @@ impl Timeline {
|
||||
dbnode: Oid,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<usize, PageReconstructError> {
|
||||
let mut total_blocks = 0;
|
||||
|
||||
let rels = self.list_rels(spcnode, dbnode, lsn).await?;
|
||||
let rels = self.list_rels(spcnode, dbnode, lsn, ctx).await?;
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self.get_rel_size(rel, lsn, latest).await?;
|
||||
let n_blocks = self.get_rel_size(rel, lsn, latest, ctx).await?;
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
@@ -142,6 +144,7 @@ impl Timeline {
|
||||
tag: RelTag,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
@@ -154,7 +157,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
||||
&& !self.get_rel_exists(tag, lsn, latest).await?
|
||||
&& !self.get_rel_exists(tag, lsn, latest, ctx).await?
|
||||
{
|
||||
// FIXME: Postgres sometimes calls smgrcreate() to create
|
||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||
@@ -164,7 +167,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let key = rel_size_to_key(tag);
|
||||
let mut buf = self.get(key, lsn).await?;
|
||||
let mut buf = self.get(key, lsn, ctx).await?;
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
if latest {
|
||||
@@ -186,6 +189,7 @@ impl Timeline {
|
||||
tag: RelTag,
|
||||
lsn: Lsn,
|
||||
_latest: bool,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
@@ -199,7 +203,7 @@ impl Timeline {
|
||||
}
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
let buf = self.get(key, lsn).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
|
||||
match RelDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => {
|
||||
@@ -216,10 +220,11 @@ impl Timeline {
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = self.get(key, lsn).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
|
||||
match RelDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => {
|
||||
@@ -244,9 +249,10 @@ impl Timeline {
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
self.get(key, lsn).await
|
||||
self.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
/// Get size of an SLRU segment
|
||||
@@ -255,9 +261,10 @@ impl Timeline {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
let key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = self.get(key, lsn).await?;
|
||||
let mut buf = self.get(key, lsn, ctx).await?;
|
||||
Ok(buf.get_u32_le())
|
||||
}
|
||||
|
||||
@@ -267,10 +274,11 @@ impl Timeline {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
// fetch directory listing
|
||||
let key = slru_dir_to_key(kind);
|
||||
let buf = self.get(key, lsn).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
|
||||
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => {
|
||||
@@ -291,6 +299,7 @@ impl Timeline {
|
||||
pub async fn find_lsn_for_timestamp(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<LsnForTimestamp, PageReconstructError> {
|
||||
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
|
||||
let min_lsn = *gc_cutoff_lsn_guard;
|
||||
@@ -313,6 +322,7 @@ impl Timeline {
|
||||
Lsn(mid * 8),
|
||||
&mut found_smaller,
|
||||
&mut found_larger,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -362,14 +372,18 @@ impl Timeline {
|
||||
probe_lsn: Lsn,
|
||||
found_smaller: &mut bool,
|
||||
found_larger: &mut bool,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
for segno in self.list_slru_segments(SlruKind::Clog, probe_lsn).await? {
|
||||
for segno in self
|
||||
.list_slru_segments(SlruKind::Clog, probe_lsn, ctx)
|
||||
.await?
|
||||
{
|
||||
let nblocks = self
|
||||
.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn)
|
||||
.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn, ctx)
|
||||
.await?;
|
||||
for blknum in (0..nblocks).rev() {
|
||||
let clog_page = self
|
||||
.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)
|
||||
.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
if clog_page.len() == BLCKSZ as usize + 8 {
|
||||
@@ -394,11 +408,12 @@ impl Timeline {
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<HashSet<u32>, PageReconstructError> {
|
||||
// fetch directory entry
|
||||
let key = slru_dir_to_key(kind);
|
||||
|
||||
let buf = self.get(key, lsn).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
match SlruSegmentDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => Ok(dir.segments),
|
||||
Err(e) => Err(PageReconstructError::from(e)),
|
||||
@@ -410,18 +425,20 @@ impl Timeline {
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
let key = relmap_file_key(spcnode, dbnode);
|
||||
|
||||
self.get(key, lsn).await
|
||||
self.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
pub async fn list_dbdirs(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<HashMap<(Oid, Oid), bool>, PageReconstructError> {
|
||||
// fetch directory entry
|
||||
let buf = self.get(DBDIR_KEY, lsn).await?;
|
||||
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
|
||||
|
||||
match DbDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => Ok(dir.dbdirs),
|
||||
@@ -433,18 +450,20 @@ impl Timeline {
|
||||
&self,
|
||||
xid: TransactionId,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
let key = twophase_file_key(xid);
|
||||
let buf = self.get(key, lsn).await?;
|
||||
let buf = self.get(key, lsn, ctx).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub async fn list_twophase_files(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<HashSet<TransactionId>, PageReconstructError> {
|
||||
// fetch directory entry
|
||||
let buf = self.get(TWOPHASEDIR_KEY, lsn).await?;
|
||||
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
|
||||
|
||||
match TwoPhaseDirectory::des(&buf).context("deserialization failure") {
|
||||
Ok(dir) => Ok(dir.xids),
|
||||
@@ -452,12 +471,20 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_control_file(&self, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
|
||||
self.get(CONTROLFILE_KEY, lsn).await
|
||||
pub async fn get_control_file(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
self.get(CONTROLFILE_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
pub async fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
|
||||
self.get(CHECKPOINT_KEY, lsn).await
|
||||
pub async fn get_checkpoint(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
self.get(CHECKPOINT_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
/// Does the same as get_current_logical_size but counted on demand.
|
||||
@@ -469,15 +496,16 @@ impl Timeline {
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
cancel: CancellationToken,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
// Fetch list of database dirs and iterate them
|
||||
let buf = self.get(DBDIR_KEY, lsn).await.context("read dbdir")?;
|
||||
let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
|
||||
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
|
||||
|
||||
let mut total_size: u64 = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for rel in self
|
||||
.list_rels(*spcnode, *dbnode, lsn)
|
||||
.list_rels(*spcnode, *dbnode, lsn, ctx)
|
||||
.await
|
||||
.context("list rels")?
|
||||
{
|
||||
@@ -486,7 +514,7 @@ impl Timeline {
|
||||
}
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
let mut buf = self
|
||||
.get(relsize_key, lsn)
|
||||
.get(relsize_key, lsn, ctx)
|
||||
.await
|
||||
.context("read relation size of {rel:?}")?;
|
||||
let relsize = buf.get_u32_le();
|
||||
@@ -501,7 +529,11 @@ impl Timeline {
|
||||
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
|
||||
/// Anything that's not listed maybe removed from the underlying storage (from
|
||||
/// that LSN forwards).
|
||||
pub async fn collect_keyspace(&self, lsn: Lsn) -> anyhow::Result<KeySpace> {
|
||||
pub async fn collect_keyspace(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<KeySpace> {
|
||||
// Iterate through key ranges, greedily packing them into partitions
|
||||
let mut result = KeySpaceAccum::new();
|
||||
|
||||
@@ -509,7 +541,7 @@ impl Timeline {
|
||||
result.add_key(DBDIR_KEY);
|
||||
|
||||
// Fetch list of database dirs and iterate them
|
||||
let buf = self.get(DBDIR_KEY, lsn).await?;
|
||||
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
|
||||
let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
|
||||
|
||||
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
|
||||
@@ -519,14 +551,14 @@ impl Timeline {
|
||||
result.add_key(rel_dir_to_key(spcnode, dbnode));
|
||||
|
||||
let mut rels: Vec<RelTag> = self
|
||||
.list_rels(spcnode, dbnode, lsn)
|
||||
.list_rels(spcnode, dbnode, lsn, ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
rels.sort_unstable();
|
||||
for rel in rels {
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
let mut buf = self.get(relsize_key, lsn).await?;
|
||||
let mut buf = self.get(relsize_key, lsn, ctx).await?;
|
||||
let relsize = buf.get_u32_le();
|
||||
|
||||
result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
|
||||
@@ -542,13 +574,13 @@ impl Timeline {
|
||||
] {
|
||||
let slrudir_key = slru_dir_to_key(kind);
|
||||
result.add_key(slrudir_key);
|
||||
let buf = self.get(slrudir_key, lsn).await?;
|
||||
let buf = self.get(slrudir_key, lsn, ctx).await?;
|
||||
let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
|
||||
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
|
||||
segments.sort_unstable();
|
||||
for segno in segments {
|
||||
let segsize_key = slru_segment_size_to_key(kind, segno);
|
||||
let mut buf = self.get(segsize_key, lsn).await?;
|
||||
let mut buf = self.get(segsize_key, lsn, ctx).await?;
|
||||
let segsize = buf.get_u32_le();
|
||||
|
||||
result.add_range(
|
||||
@@ -560,7 +592,7 @@ impl Timeline {
|
||||
|
||||
// Then pg_twophase
|
||||
result.add_key(TWOPHASEDIR_KEY);
|
||||
let buf = self.get(TWOPHASEDIR_KEY, lsn).await?;
|
||||
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
|
||||
let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
|
||||
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
|
||||
xids.sort_unstable();
|
||||
@@ -723,9 +755,10 @@ impl<'a> DatadirModification<'a> {
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
img: Bytes,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Add it to the directory (if it doesn't exist already)
|
||||
let buf = self.get(DBDIR_KEY).await?;
|
||||
let buf = self.get(DBDIR_KEY, ctx).await?;
|
||||
let mut dbdir = DbDirectory::des(&buf)?;
|
||||
|
||||
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
|
||||
@@ -755,9 +788,10 @@ impl<'a> DatadirModification<'a> {
|
||||
&mut self,
|
||||
xid: TransactionId,
|
||||
img: Bytes,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Add it to the directory entry
|
||||
let buf = self.get(TWOPHASEDIR_KEY).await?;
|
||||
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
|
||||
let mut dir = TwoPhaseDirectory::des(&buf)?;
|
||||
if !dir.xids.insert(xid) {
|
||||
anyhow::bail!("twophase file for xid {} already exists", xid);
|
||||
@@ -781,16 +815,21 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> anyhow::Result<()> {
|
||||
pub async fn drop_dbdir(
|
||||
&mut self,
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let req_lsn = self.tline.get_last_record_lsn();
|
||||
|
||||
let total_blocks = self
|
||||
.tline
|
||||
.get_db_size(spcnode, dbnode, req_lsn, true)
|
||||
.get_db_size(spcnode, dbnode, req_lsn, true, ctx)
|
||||
.await?;
|
||||
|
||||
// Remove entry from dbdir
|
||||
let buf = self.get(DBDIR_KEY).await?;
|
||||
let buf = self.get(DBDIR_KEY, ctx).await?;
|
||||
let mut dir = DbDirectory::des(&buf)?;
|
||||
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
|
||||
let buf = DbDirectory::ser(&dir)?;
|
||||
@@ -817,11 +856,12 @@ impl<'a> DatadirModification<'a> {
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY).await?)?;
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
|
||||
// Didn't exist. Update dbdir
|
||||
@@ -833,7 +873,7 @@ impl<'a> DatadirModification<'a> {
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key).await?)?
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await?)?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
@@ -865,13 +905,14 @@ impl<'a> DatadirModification<'a> {
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
|
||||
let last_lsn = self.tline.get_last_record_lsn();
|
||||
if self.tline.get_rel_exists(rel, last_lsn, true).await? {
|
||||
if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? {
|
||||
let size_key = rel_size_to_key(rel);
|
||||
// Fetch the old size first
|
||||
let old_size = self.get(size_key).await?.get_u32_le();
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
|
||||
// Update the entry with the new size.
|
||||
let buf = nblocks.to_le_bytes();
|
||||
@@ -895,12 +936,13 @@ impl<'a> DatadirModification<'a> {
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let old_size = self.get(size_key).await?.get_u32_le();
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
|
||||
// only extend relation here. never decrease the size
|
||||
if nblocks > old_size {
|
||||
@@ -916,12 +958,16 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
/// Drop a relation.
|
||||
pub async fn put_rel_drop(&mut self, rel: RelTag) -> anyhow::Result<()> {
|
||||
pub async fn put_rel_drop(
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, "invalid relnode");
|
||||
|
||||
// Remove it from the directory entry
|
||||
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let buf = self.get(dir_key).await?;
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = RelDirectory::des(&buf)?;
|
||||
|
||||
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
|
||||
@@ -932,7 +978,7 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let old_size = self.get(size_key).await?.get_u32_le();
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as i64;
|
||||
|
||||
// Remove enty from relation size cache
|
||||
@@ -949,10 +995,11 @@ impl<'a> DatadirModification<'a> {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Add it to the directory entry
|
||||
let dir_key = slru_dir_to_key(kind);
|
||||
let buf = self.get(dir_key).await?;
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = SlruSegmentDirectory::des(&buf)?;
|
||||
|
||||
if !dir.segments.insert(segno) {
|
||||
@@ -988,10 +1035,15 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
/// This method is used for marking truncated SLRU files
|
||||
pub async fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> anyhow::Result<()> {
|
||||
pub async fn drop_slru_segment(
|
||||
&mut self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Remove it from the directory entry
|
||||
let dir_key = slru_dir_to_key(kind);
|
||||
let buf = self.get(dir_key).await?;
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = SlruSegmentDirectory::des(&buf)?;
|
||||
|
||||
if !dir.segments.remove(&segno) {
|
||||
@@ -1015,9 +1067,13 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
/// This method is used for marking truncated SLRU files
|
||||
pub async fn drop_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
|
||||
pub async fn drop_twophase_file(
|
||||
&mut self,
|
||||
xid: TransactionId,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Remove it from the directory entry
|
||||
let buf = self.get(TWOPHASEDIR_KEY).await?;
|
||||
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
|
||||
let mut dir = TwoPhaseDirectory::des(&buf)?;
|
||||
|
||||
if !dir.xids.remove(&xid) {
|
||||
@@ -1111,7 +1167,11 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
// Internal helper functions to batch the modifications
|
||||
|
||||
async fn get(&self, key: Key) -> Result<Bytes, PageReconstructError> {
|
||||
async fn get(
|
||||
&self,
|
||||
key: Key,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// Have we already updated the same key? Read the pending updated
|
||||
// version in that case.
|
||||
//
|
||||
@@ -1132,7 +1192,7 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
} else {
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
self.tline.get(key, lsn).await
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1539,17 +1599,18 @@ fn is_slru_block_key(key: Key) -> bool {
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
tenant: &std::sync::Arc<crate::tenant::Tenant>,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant
|
||||
.create_empty_timeline(timeline_id, Lsn(8), pg_version)?
|
||||
.initialize()?;
|
||||
tenant_ctx: &crate::tenant::TenantRequestContext,
|
||||
) -> anyhow::Result<(std::sync::Arc<Timeline>, TimelineRequestContext)> {
|
||||
let (tline, timeline_ctx) =
|
||||
tenant.create_empty_timeline(timeline_id, Lsn(8), pg_version, tenant_ctx)?;
|
||||
let tline = tline.initialize(&timeline_ctx)?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
Ok(tline)
|
||||
Ok((tline, timeline_ctx))
|
||||
}
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -16,6 +16,7 @@ use remote_storage::GenericRemoteStorage;
|
||||
use utils::crashsafe;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
@@ -234,6 +235,7 @@ pub async fn update_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("configuring tenant {tenant_id}");
|
||||
get_tenant(tenant_id, true)
|
||||
@@ -260,10 +262,15 @@ pub async fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Resul
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> anyhow::Result<()> {
|
||||
pub async fn delete_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
match get_tenant(tenant_id, true).await {
|
||||
Ok(tenant) => {
|
||||
tenant.delete_timeline(timeline_id).await?;
|
||||
let tenant_ctx = tenant.get_context();
|
||||
tenant.delete_timeline(timeline_id, &tenant_ctx).await?;
|
||||
}
|
||||
Err(e) => anyhow::bail!("Cannot access tenant {tenant_id} in local tenant state: {e:?}"),
|
||||
}
|
||||
@@ -473,8 +480,9 @@ pub async fn immediate_gc(
|
||||
false,
|
||||
async move {
|
||||
fail::fail_point!("immediate_gc_task_pre");
|
||||
let tenant_ctx = tenant.get_context();
|
||||
let result = tenant
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr)
|
||||
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &tenant_ctx)
|
||||
.instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
|
||||
.await;
|
||||
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
|
||||
@@ -509,6 +517,7 @@ pub async fn immediate_compact(
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(ApiError::NotFound)?;
|
||||
let timeline_ctx = timeline.get_context();
|
||||
|
||||
// Run in task_mgr to avoid race with detach operation
|
||||
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
|
||||
@@ -523,7 +532,7 @@ pub async fn immediate_compact(
|
||||
false,
|
||||
async move {
|
||||
let result = timeline
|
||||
.compact()
|
||||
.compact(&timeline_ctx)
|
||||
.instrument(
|
||||
info_span!("manual_compact", tenant = %tenant_id, timeline = %timeline_id),
|
||||
)
|
||||
|
||||
@@ -7,6 +7,7 @@ use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
use crate::tenant::{TenantRequestContext, TimelineRequestContext};
|
||||
|
||||
use super::Tenant;
|
||||
use utils::id::TimelineId;
|
||||
@@ -63,13 +64,14 @@ pub(super) async fn gather_inputs(
|
||||
tenant: &Tenant,
|
||||
limit: &Arc<Semaphore>,
|
||||
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
|
||||
tenant_ctx: &TenantRequestContext,
|
||||
) -> anyhow::Result<ModelInputs> {
|
||||
// with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
|
||||
// our advantage with `?` error handling.
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
let timelines = tenant
|
||||
.refresh_gc_info()
|
||||
.refresh_gc_info(tenant_ctx)
|
||||
.await
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
|
||||
@@ -100,6 +102,9 @@ pub(super) async fn gather_inputs(
|
||||
for timeline in timelines {
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
|
||||
let ctx = timeline.get_context();
|
||||
let ctx = Arc::new(ctx);
|
||||
|
||||
let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = {
|
||||
// there's a race between the update (holding tenant.gc_lock) and this read but it
|
||||
// might not be an issue, because it's not for Timeline::gc
|
||||
@@ -169,19 +174,23 @@ pub(super) async fn gather_inputs(
|
||||
timeline_id: timeline.timeline_id,
|
||||
});
|
||||
|
||||
for (lsn, _kind) in &interesting_lsns {
|
||||
if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) {
|
||||
for (lsn, _kind) in interesting_lsns.iter() {
|
||||
let lsn = *lsn;
|
||||
if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, lsn)) {
|
||||
updates.push(Update {
|
||||
lsn: *lsn,
|
||||
lsn,
|
||||
timeline_id: timeline.timeline_id,
|
||||
command: Command::Update(*size),
|
||||
});
|
||||
|
||||
needed_cache.insert((timeline.timeline_id, *lsn));
|
||||
needed_cache.insert((timeline.timeline_id, lsn));
|
||||
} else {
|
||||
let timeline = Arc::clone(&timeline);
|
||||
let parallel_size_calcs = Arc::clone(limit);
|
||||
joinset.spawn(calculate_logical_size(parallel_size_calcs, timeline, *lsn));
|
||||
let ctx_clone = Arc::clone(&ctx);
|
||||
joinset.spawn(async move {
|
||||
calculate_logical_size(parallel_size_calcs, timeline, lsn, &ctx_clone).await
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -365,13 +374,14 @@ async fn calculate_logical_size(
|
||||
limit: Arc<tokio::sync::Semaphore>,
|
||||
timeline: Arc<crate::tenant::Timeline>,
|
||||
lsn: utils::lsn::Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<TimelineAtLsnSizeResult, RecvError> {
|
||||
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
|
||||
.await
|
||||
.expect("global semaphore should not had been closed");
|
||||
.expect("global semaphore should not have been closed");
|
||||
|
||||
let size_res = timeline
|
||||
.spawn_ondemand_logical_size_calculation(lsn)
|
||||
.spawn_ondemand_logical_size_calculation(lsn, ctx)
|
||||
.await?;
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
}
|
||||
|
||||
@@ -65,6 +65,7 @@ async fn compaction_loop(tenant_id: TenantId) {
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
};
|
||||
let tenant_ctx = tenant.get_context();
|
||||
|
||||
let mut sleep_duration = tenant.get_compaction_period();
|
||||
if sleep_duration == Duration::ZERO {
|
||||
@@ -73,7 +74,7 @@ async fn compaction_loop(tenant_id: TenantId) {
|
||||
sleep_duration = Duration::from_secs(10);
|
||||
} else {
|
||||
// Run compaction
|
||||
if let Err(e) = tenant.compaction_iteration().await {
|
||||
if let Err(e) = tenant.compaction_iteration(&tenant_ctx).await {
|
||||
sleep_duration = wait_duration;
|
||||
error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration);
|
||||
}
|
||||
@@ -116,6 +117,7 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
};
|
||||
let tenant_ctx = tenant.get_context();
|
||||
|
||||
let gc_period = tenant.get_gc_period();
|
||||
let gc_horizon = tenant.get_gc_horizon();
|
||||
@@ -127,7 +129,9 @@ async fn gc_loop(tenant_id: TenantId) {
|
||||
} else {
|
||||
// Run gc
|
||||
if gc_horizon > 0 {
|
||||
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval()).await
|
||||
if let Err(e) = tenant
|
||||
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &tenant_ctx)
|
||||
.await
|
||||
{
|
||||
sleep_duration = wait_duration;
|
||||
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
||||
use crate::tenant::storage_layer::{
|
||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName,
|
||||
@@ -34,6 +35,7 @@ use crate::tenant::{
|
||||
metadata::{save_metadata, TimelineMetadata},
|
||||
par_fsync,
|
||||
storage_layer::{PersistentLayer, ValueReconstructResult, ValueReconstructState},
|
||||
TenantRequestContext,
|
||||
};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -410,6 +412,18 @@ impl Timeline {
|
||||
self.latest_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
/// Return a new TenantRequestContext.
|
||||
///
|
||||
/// XXX: This is a placeholder. In the next commit, this will
|
||||
/// check that the timeline is still active.
|
||||
pub fn get_context(&self) -> TimelineRequestContext {
|
||||
TimelineRequestContext {
|
||||
ctx: TenantRequestContext {
|
||||
ctx: RequestContext {},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up given page version.
|
||||
///
|
||||
/// If a remote layer file is needed, it is downloaded as part of this
|
||||
@@ -422,7 +436,12 @@ impl Timeline {
|
||||
/// an ancestor branch, for example, or waste a lot of cycles chasing the
|
||||
/// non-existing key.
|
||||
///
|
||||
pub async fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes, PageReconstructError> {
|
||||
pub async fn get(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if !lsn.is_valid() {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
|
||||
}
|
||||
@@ -450,7 +469,7 @@ impl Timeline {
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state)
|
||||
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
|
||||
self.metrics
|
||||
@@ -513,7 +532,7 @@ impl Timeline {
|
||||
/// You should call this before any of the other get_* or list_* functions. Calling
|
||||
/// those functions with an LSN that has been processed yet is an error.
|
||||
///
|
||||
pub async fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
pub async fn wait_lsn(&self, lsn: Lsn, ctx: &TimelineRequestContext) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
|
||||
|
||||
// This should never be called from the WAL receiver, because that could lead
|
||||
@@ -558,7 +577,7 @@ impl Timeline {
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
|
||||
pub async fn compact(&self) -> anyhow::Result<()> {
|
||||
pub async fn compact(&self, ctx: &TimelineRequestContext) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
@@ -616,14 +635,16 @@ impl Timeline {
|
||||
.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((partitioning, lsn)) => {
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let layer_paths_to_upload =
|
||||
self.create_image_layers(&partitioning, lsn, false).await?;
|
||||
let layer_paths_to_upload = self
|
||||
.create_image_layers(&partitioning, lsn, false, ctx)
|
||||
.await?;
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for (path, layer_metadata) in layer_paths_to_upload {
|
||||
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
|
||||
@@ -632,7 +653,7 @@ impl Timeline {
|
||||
|
||||
// 3. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
self.compact_level0(target_file_size).await?;
|
||||
self.compact_level0(target_file_size, ctx).await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
// If `create_image_layers' or `compact_level0` scheduled any
|
||||
@@ -673,7 +694,10 @@ impl Timeline {
|
||||
/// the initial size calculation has not been run (gets triggered on the first size access).
|
||||
///
|
||||
/// return size and boolean flag that shows if the size is exact
|
||||
pub fn get_current_logical_size(self: &Arc<Self>) -> anyhow::Result<(u64, bool)> {
|
||||
pub fn get_current_logical_size(
|
||||
self: &Arc<Self>,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<(u64, bool)> {
|
||||
let current_size = self.current_logical_size.current_size()?;
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
@@ -683,7 +707,7 @@ impl Timeline {
|
||||
(current_size, self.current_logical_size.initial_part_end)
|
||||
{
|
||||
is_exact = false;
|
||||
self.try_spawn_size_init_task(init_lsn);
|
||||
self.try_spawn_size_init_task(init_lsn, ctx);
|
||||
}
|
||||
|
||||
Ok((size, is_exact))
|
||||
@@ -887,7 +911,7 @@ impl Timeline {
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
|
||||
pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>, ctx: &TimelineRequestContext) {
|
||||
let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
|
||||
match *flush_loop_state {
|
||||
FlushLoopState::NotStarted => (),
|
||||
@@ -929,7 +953,7 @@ impl Timeline {
|
||||
*flush_loop_state = FlushLoopState::Running;
|
||||
}
|
||||
|
||||
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
|
||||
pub(super) fn launch_wal_receiver(self: &Arc<Self>, ctx: &TimelineRequestContext) {
|
||||
if !is_broker_client_initialized() {
|
||||
if cfg!(test) {
|
||||
info!("not launching WAL receiver because broker client hasn't been initialized");
|
||||
@@ -939,6 +963,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let background_ctx = self.get_context();
|
||||
|
||||
info!(
|
||||
"launching WAL receiver for timeline {} of tenant {}",
|
||||
self.timeline_id, self.tenant_id
|
||||
@@ -961,6 +987,7 @@ impl Timeline {
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
|
||||
background_ctx,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1220,11 +1247,12 @@ impl Timeline {
|
||||
/// # TODO
|
||||
/// May be a bit cleaner to do things based on populated remote client,
|
||||
/// and then do things based on its upload_queue.latest_files.
|
||||
#[instrument(skip(self, index_part, up_to_date_metadata))]
|
||||
#[instrument(skip(self, index_part, up_to_date_metadata, ctx))]
|
||||
pub async fn reconcile_with_remote(
|
||||
&self,
|
||||
up_to_date_metadata: &TimelineMetadata,
|
||||
index_part: Option<&IndexPart>,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("starting");
|
||||
let remote_client = self
|
||||
@@ -1280,7 +1308,7 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn) {
|
||||
fn try_spawn_size_init_task(self: &Arc<Self>, init_lsn: Lsn, ctx: &TimelineRequestContext) {
|
||||
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
|
||||
.try_acquire_owned()
|
||||
{
|
||||
@@ -1342,6 +1370,7 @@ impl Timeline {
|
||||
pub fn spawn_ondemand_logical_size_calculation(
|
||||
self: &Arc<Self>,
|
||||
lsn: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let self_clone = Arc::clone(self);
|
||||
@@ -1377,7 +1406,8 @@ impl Timeline {
|
||||
// synchronous file IO without .await inbetween
|
||||
// if there are no RemoteLayers that would require downloading.
|
||||
let h = tokio::runtime::Handle::current();
|
||||
h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel))
|
||||
let ctx = self_calculation.get_context();
|
||||
h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel, &ctx))
|
||||
})
|
||||
.await
|
||||
.context("Failed to spawn calculation result task")?
|
||||
@@ -1436,6 +1466,7 @@ impl Timeline {
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cancel: CancellationToken,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
info!(
|
||||
"Calculating logical size for timeline {} at {}",
|
||||
@@ -1478,7 +1509,7 @@ impl Timeline {
|
||||
self.metrics.logical_size_histo.start_timer()
|
||||
};
|
||||
let logical_size = self
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, cancel)
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
|
||||
.await?;
|
||||
debug!("calculated logical size: {logical_size}");
|
||||
timer.stop_and_record();
|
||||
@@ -1555,6 +1586,7 @@ impl Timeline {
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
// Start from the current timeline.
|
||||
let mut timeline_owned;
|
||||
@@ -1873,6 +1905,7 @@ impl Timeline {
|
||||
/// Layer flusher task's main loop.
|
||||
async fn flush_loop(&self, mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>) {
|
||||
info!("started flush loop");
|
||||
let ctx = self.get_context();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
@@ -1892,7 +1925,7 @@ impl Timeline {
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
if let Some(layer_to_flush) = layer_to_flush {
|
||||
if let Err(err) = self.flush_frozen_layer(layer_to_flush).await {
|
||||
if let Err(err) = self.flush_frozen_layer(layer_to_flush, &ctx).await {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break Err(err);
|
||||
}
|
||||
@@ -1957,8 +1990,12 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
#[instrument(skip(self, frozen_layer), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
|
||||
async fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
|
||||
#[instrument(skip(self, frozen_layer, ctx), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
|
||||
async fn flush_frozen_layer(
|
||||
&self,
|
||||
frozen_layer: Arc<InMemoryLayer>,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
@@ -1967,9 +2004,9 @@ impl Timeline {
|
||||
let layer_paths_to_upload =
|
||||
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
|
||||
let (partitioning, _lsn) = self
|
||||
.repartition(self.initdb_lsn, self.get_compaction_target_size())
|
||||
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
|
||||
.await?;
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true)
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
||||
.await?
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
@@ -2119,6 +2156,7 @@ impl Timeline {
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
partition_size: u64,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
|
||||
{
|
||||
let partitioning_guard = self.partitioning.lock().unwrap();
|
||||
@@ -2129,7 +2167,7 @@ impl Timeline {
|
||||
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
|
||||
}
|
||||
}
|
||||
let keyspace = self.collect_keyspace(lsn).await?;
|
||||
let keyspace = self.collect_keyspace(lsn, ctx).await?;
|
||||
let partitioning = keyspace.partition(partition_size);
|
||||
|
||||
let mut partitioning_guard = self.partitioning.lock().unwrap();
|
||||
@@ -2187,6 +2225,7 @@ impl Timeline {
|
||||
partitioning: &KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
force: bool,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<HashMap<LayerFileName, LayerFileMetadata>, PageReconstructError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
let mut image_layers: Vec<ImageLayer> = Vec::new();
|
||||
@@ -2211,7 +2250,7 @@ impl Timeline {
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
let img = match self.get(key, lsn).await {
|
||||
let img = match self.get(key, lsn, ctx).await {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
@@ -2554,7 +2593,11 @@ impl Timeline {
|
||||
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
|
||||
/// as Level 1 files.
|
||||
///
|
||||
async fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
|
||||
async fn compact_level0(
|
||||
&self,
|
||||
target_file_size: u64,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
@@ -2570,10 +2613,7 @@ impl Timeline {
|
||||
// Do it here because we don't want to hold self.layers.write() while waiting.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
debug!("waiting for upload ops to complete");
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
remote_client.wait_completion().await?;
|
||||
}
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
@@ -2662,6 +2702,7 @@ impl Timeline {
|
||||
retain_lsns: Vec<Lsn>,
|
||||
cutoff_horizon: Lsn,
|
||||
pitr: Duration,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
|
||||
//
|
||||
@@ -2674,7 +2715,7 @@ impl Timeline {
|
||||
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
|
||||
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
|
||||
|
||||
match self.find_lsn_for_timestamp(pitr_timestamp).await? {
|
||||
match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
|
||||
LsnForTimestamp::Present(lsn) => lsn,
|
||||
LsnForTimestamp::Future(lsn) => {
|
||||
// The timestamp is in the future. That sounds impossible,
|
||||
@@ -2724,7 +2765,7 @@ impl Timeline {
|
||||
/// within a layer file. We can only remove the whole file if it's fully
|
||||
/// obsolete.
|
||||
///
|
||||
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
|
||||
pub(super) async fn gc(&self, ctx: &TimelineRequestContext) -> anyhow::Result<GcResult> {
|
||||
fail_point!("before-timeline-gc");
|
||||
|
||||
let _layer_removal_cs = self.layer_removal_cs.lock().await;
|
||||
@@ -2745,7 +2786,7 @@ impl Timeline {
|
||||
|
||||
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
|
||||
|
||||
self.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
|
||||
self.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff, ctx)
|
||||
.instrument(
|
||||
info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff),
|
||||
)
|
||||
@@ -2758,6 +2799,7 @@ impl Timeline {
|
||||
pitr_cutoff: Lsn,
|
||||
retain_lsns: Vec<Lsn>,
|
||||
new_gc_cutoff: Lsn,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> anyhow::Result<GcResult> {
|
||||
let now = SystemTime::now();
|
||||
let mut result: GcResult = GcResult::default();
|
||||
@@ -2796,10 +2838,7 @@ impl Timeline {
|
||||
// Do it here because we don't want to hold self.layers.write() while waiting.
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
debug!("waiting for upload ops to complete");
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
remote_client.wait_completion().await?;
|
||||
}
|
||||
|
||||
let mut layers_to_remove = Vec::new();
|
||||
@@ -3116,6 +3155,7 @@ impl Timeline {
|
||||
|
||||
pub async fn spawn_download_all_remote_layers(
|
||||
self: Arc<Self>,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
|
||||
let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
|
||||
if let Some(st) = &*status_guard {
|
||||
@@ -3130,6 +3170,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let child_ctx = self.get_context();
|
||||
|
||||
let self_clone = Arc::clone(&self);
|
||||
let task_id = task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
@@ -3139,7 +3181,7 @@ impl Timeline {
|
||||
"download all remote layers task",
|
||||
false,
|
||||
async move {
|
||||
self_clone.download_all_remote_layers().await;
|
||||
self_clone.download_all_remote_layers(&child_ctx).await;
|
||||
let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
|
||||
match &mut *status_guard {
|
||||
None => {
|
||||
@@ -3171,7 +3213,7 @@ impl Timeline {
|
||||
Ok(initial_info)
|
||||
}
|
||||
|
||||
async fn download_all_remote_layers(self: &Arc<Self>) {
|
||||
async fn download_all_remote_layers(self: &Arc<Self>, ctx: &TimelineRequestContext) {
|
||||
let mut downloads: FuturesUnordered<_> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
layers
|
||||
@@ -3335,3 +3377,16 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
|
||||
|
||||
bail!("couldn't find an unused backup number for {:?}", path)
|
||||
}
|
||||
|
||||
/// XXX: Placeholder
|
||||
pub struct TimelineRequestContext {
|
||||
ctx: TenantRequestContext,
|
||||
}
|
||||
|
||||
impl Deref for TimelineRequestContext {
|
||||
type Target = RequestContext;
|
||||
|
||||
fn deref(&self) -> &RequestContext {
|
||||
&self.ctx.ctx
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -13,7 +13,7 @@ use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, ti
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::task_mgr::WALRECEIVER_RUNTIME;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::{Timeline, TimelineRequestContext};
|
||||
use crate::{task_mgr, walreceiver::TaskStateUpdate};
|
||||
use anyhow::Context;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
@@ -46,6 +46,7 @@ pub fn spawn_connection_manager_task(
|
||||
lagging_wal_timeout: Duration,
|
||||
max_lsn_wal_lag: NonZeroU64,
|
||||
auth_token: Option<Arc<String>>,
|
||||
ctx: TimelineRequestContext,
|
||||
) {
|
||||
let mut broker_client = get_broker_client().clone();
|
||||
|
||||
@@ -78,6 +79,7 @@ pub fn spawn_connection_manager_task(
|
||||
loop_step_result = connection_manager_loop_step(
|
||||
&mut broker_client,
|
||||
&mut walreceiver_state,
|
||||
&ctx,
|
||||
) => match loop_step_result {
|
||||
ControlFlow::Continue(()) => continue,
|
||||
ControlFlow::Break(()) => {
|
||||
@@ -101,6 +103,7 @@ pub fn spawn_connection_manager_task(
|
||||
async fn connection_manager_loop_step(
|
||||
broker_client: &mut BrokerClientChannel,
|
||||
walreceiver_state: &mut WalreceiverState,
|
||||
ctx: &TimelineRequestContext,
|
||||
) -> ControlFlow<(), ()> {
|
||||
let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
|
||||
|
||||
@@ -226,6 +229,7 @@ async fn connection_manager_loop_step(
|
||||
.change_connection(
|
||||
new_candidate.safekeeper_id,
|
||||
new_candidate.wal_source_connconf,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -389,12 +393,14 @@ impl WalreceiverState {
|
||||
&mut self,
|
||||
new_sk_id: NodeId,
|
||||
new_wal_source_connconf: PgConnectionConfig,
|
||||
ctx: &TimelineRequestContext,
|
||||
) {
|
||||
self.drop_old_connection(true).await;
|
||||
|
||||
let id = self.id;
|
||||
let connect_timeout = self.wal_connect_timeout;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
let child_ctx = timeline.get_context();
|
||||
let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
|
||||
async move {
|
||||
super::walreceiver_connection::handle_walreceiver_connection(
|
||||
@@ -403,6 +409,7 @@ impl WalreceiverState {
|
||||
events_sender,
|
||||
cancellation,
|
||||
connect_timeout,
|
||||
child_ctx,
|
||||
)
|
||||
.await
|
||||
.context("walreceiver connection handling failure")
|
||||
@@ -1233,18 +1240,18 @@ mod tests {
|
||||
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
|
||||
|
||||
async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState {
|
||||
let tenant = harness.load().await;
|
||||
let tenant_ctx = tenant.get_context();
|
||||
let (timeline, timeline_ctx) = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &tenant_ctx)
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
let timeline = timeline.initialize(&timeline_ctx).unwrap();
|
||||
WalreceiverState {
|
||||
id: TenantTimelineId {
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
},
|
||||
timeline: harness
|
||||
.load()
|
||||
.await
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION)
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager")
|
||||
.initialize()
|
||||
.unwrap(),
|
||||
timeline,
|
||||
wal_connect_timeout: Duration::from_secs(1),
|
||||
lagging_wal_timeout: Duration::from_secs(1),
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::{
|
||||
task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::WALRECEIVER_RUNTIME,
|
||||
tenant::{Timeline, WalReceiverInfo},
|
||||
tenant::{Timeline, TimelineRequestContext, WalReceiverInfo},
|
||||
walingest::WalIngest,
|
||||
walrecord::DecodedWALRecord,
|
||||
};
|
||||
@@ -62,6 +62,7 @@ pub async fn handle_walreceiver_connection(
|
||||
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
|
||||
cancellation: CancellationToken,
|
||||
connect_timeout: Duration,
|
||||
ctx: TimelineRequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Connect to the database in replication mode.
|
||||
info!("connecting to {wal_source_connconf:?}");
|
||||
@@ -180,7 +181,7 @@ pub async fn handle_walreceiver_connection(
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint).await?;
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
|
||||
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
@@ -251,7 +252,7 @@ pub async fn handle_walreceiver_connection(
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
walingest
|
||||
.ingest_record(recdata.clone(), lsn, &mut modification, &mut decoded)
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {lsn}"))?;
|
||||
|
||||
@@ -329,7 +330,7 @@ pub async fn handle_walreceiver_connection(
|
||||
// Send the replication feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
let (timeline_logical_size, _) = timeline
|
||||
.get_current_logical_size()
|
||||
.get_current_logical_size(&ctx)
|
||||
.context("Status update creation failed to get current logical size")?;
|
||||
let status_update = ReplicationFeedback {
|
||||
current_timeline_size: timeline_logical_size,
|
||||
|
||||
Reference in New Issue
Block a user