mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
wire deletion queue into timeline
This commit is contained in:
@@ -33,6 +33,7 @@ struct FlushOp {
|
||||
tx: tokio::sync::oneshot::Sender<()>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::Sender<QueueMessage>,
|
||||
}
|
||||
|
||||
@@ -397,6 +397,7 @@ impl Tenant {
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
remote_startup_data: Option<RemoteStartupData>,
|
||||
local_metadata: Option<TimelineMetadata>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
@@ -418,6 +419,7 @@ impl Tenant {
|
||||
up_to_date_metadata,
|
||||
ancestor.clone(),
|
||||
remote_client,
|
||||
deletion_queue_client,
|
||||
init_order,
|
||||
CreateTimelineCause::Load,
|
||||
)?;
|
||||
@@ -589,6 +591,11 @@ impl Tenant {
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
|
||||
|
||||
let deletion_queue_client = self
|
||||
.deletion_queue_client
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without deletion queue enabled"))?;
|
||||
|
||||
let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
|
||||
remote_storage,
|
||||
self.conf,
|
||||
@@ -659,14 +666,21 @@ impl Tenant {
|
||||
.expect("just put it in above");
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
remote_client,
|
||||
deletion_queue_client.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
std::fs::remove_file(&marker_file)
|
||||
@@ -703,6 +717,7 @@ impl Tenant {
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
remote_client: RemoteTimelineClient,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
@@ -733,6 +748,7 @@ impl Tenant {
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
Some(remote_client),
|
||||
Some(deletion_queue_client),
|
||||
Some(RemoteStartupData {
|
||||
index_part,
|
||||
remote_metadata,
|
||||
@@ -1219,6 +1235,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
&local_metadata,
|
||||
Some(remote_client),
|
||||
self.deletion_queue_client.clone(),
|
||||
init_order,
|
||||
)
|
||||
.await
|
||||
@@ -1271,6 +1288,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
&local_metadata,
|
||||
None,
|
||||
None,
|
||||
init_order,
|
||||
)
|
||||
.await
|
||||
@@ -1295,6 +1313,7 @@ impl Tenant {
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
remote_client,
|
||||
self.deletion_queue_client.clone(),
|
||||
remote_startup_data,
|
||||
Some(local_metadata),
|
||||
ancestor,
|
||||
@@ -2156,6 +2175,7 @@ impl Tenant {
|
||||
new_metadata: &TimelineMetadata,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
deletion_queue: Option<DeletionQueueClient>,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
cause: CreateTimelineCause,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
@@ -2185,6 +2205,7 @@ impl Tenant {
|
||||
self.tenant_id,
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
remote_client,
|
||||
deletion_queue,
|
||||
pg_version,
|
||||
initial_logical_size_can_start.cloned(),
|
||||
initial_logical_size_attempt.cloned().flatten(),
|
||||
@@ -2875,6 +2896,7 @@ impl Tenant {
|
||||
new_metadata,
|
||||
ancestor,
|
||||
remote_client,
|
||||
self.deletion_queue_client.clone(),
|
||||
None,
|
||||
CreateTimelineCause::Load,
|
||||
)
|
||||
|
||||
@@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -192,6 +193,9 @@ pub struct Timeline {
|
||||
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
|
||||
pub remote_client: Option<Arc<RemoteTimelineClient>>,
|
||||
|
||||
/// Deletion queue: a global queue, separate to the remote storage queue's
|
||||
deletion_queue_client: Option<Arc<DeletionQueueClient>>,
|
||||
|
||||
// What page versions do we hold in the repository? If we get a
|
||||
// request > last_record_lsn, we need to wait until we receive all
|
||||
// the WAL up to the request. The SeqWait provides functions for
|
||||
@@ -1375,6 +1379,7 @@ impl Timeline {
|
||||
tenant_id: TenantId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
pg_version: u32,
|
||||
initial_logical_size_can_start: Option<completion::Barrier>,
|
||||
initial_logical_size_attempt: Option<completion::Completion>,
|
||||
@@ -1410,6 +1415,7 @@ impl Timeline {
|
||||
walreceiver: Mutex::new(None),
|
||||
|
||||
remote_client: remote_client.map(Arc::new),
|
||||
deletion_queue_client: deletion_queue_client.map(Arc::new),
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
|
||||
@@ -14,6 +14,7 @@ use utils::{
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
deletion_queue::DeletionQueueClient,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
@@ -407,6 +408,7 @@ impl DeleteTimelineFlow {
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: &TimelineMetadata,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
|
||||
@@ -417,6 +419,7 @@ impl DeleteTimelineFlow {
|
||||
local_metadata,
|
||||
None, // Ancestor is not needed for deletion.
|
||||
remote_client,
|
||||
deletion_queue_client,
|
||||
init_order,
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
|
||||
Reference in New Issue
Block a user