diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 91ffa95c21..9b7424a818 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -389,6 +389,25 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+ let ttid = TenantTimelineId::new(
+ parse_request_param(&request, "tenant_id")?,
+ parse_request_param(&request, "timeline_id")?,
+ );
+ check_permission(&request, Some(ttid.tenant_id))?;
+
+ let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+
+ let response = tli
+ .backup_partial_reset()
+ .await
+ .map_err(ApiError::InternalServerError)?;
+ json_response(StatusCode::OK, response)
+}
+
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -607,6 +626,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
request_span(r, timeline_digest_handler)
})
+ .post(
+ "/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
+ |r| request_span(r, timeline_backup_partial_reset),
+ )
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index f7c96d4f02..95ee925e1a 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -908,6 +908,10 @@ impl Timeline {
Ok(WalResidentTimeline::new(self.clone(), guard))
}
+
+ pub async fn backup_partial_reset(self: &Arc<Self>) -> anyhow::Result<Vec<String>> {
+ self.manager_ctl.backup_partial_reset().await
+ }
}
/// This is a guard that allows to read/write disk timeline state.
diff --git a/safekeeper/src/timeline_eviction.rs b/safekeeper/src/timeline_eviction.rs
index 2ccb058720..5d0567575c 100644
--- a/safekeeper/src/timeline_eviction.rs
+++ b/safekeeper/src/timeline_eviction.rs
@@ -28,28 +28,38 @@ impl Manager {
/// - control file is flushed (no next event scheduled)
/// - no WAL residence guards
/// - no pushes to the broker
- /// - partial WAL backup is uploaded
+ /// - last partial WAL segment is uploaded
+ /// - all local segments before the uploaded partial are committed and uploaded
pub(crate) fn ready_for_eviction(
&self,
next_event: &Option<Instant>,
state: &StateSnapshot,
) -> bool {
- self.backup_task.is_none()
+ let ready = self.backup_task.is_none()
&& self.recovery_task.is_none()
&& self.wal_removal_task.is_none()
&& self.partial_backup_task.is_none()
- && self.partial_backup_uploaded.is_some()
&& next_event.is_none()
&& self.access_service.is_empty()
&& !self.tli_broker_active.get()
+ // Partial segment of current flush_lsn is uploaded up to this flush_lsn.
&& !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
+ // And it is the segment right after the last removed one. Given that
+ // local WAL is removed only after it has been uploaded to s3 (and the
+ // pageserver has advanced remote_consistent_lsn), which in turn happens
+ // only after the WAL is committed, a true result means all of this is done.
+ //
+ // This also works for the first segment despite last_removed_segno
+ // being 0 on init because this 0 triggers run of wal_removal_task
+ // on success of which manager updates the horizon.
&& self
.partial_backup_uploaded
.as_ref()
.unwrap()
.flush_lsn
.segment_number(self.wal_seg_size)
- == self.last_removed_segno + 1
+ == self.last_removed_segno + 1;
+ ready
}
/// Evict the timeline to remote storage.
@@ -83,7 +93,8 @@ impl Manager {
info!("successfully evicted timeline");
}
- /// Restore evicted timeline from remote storage.
+ /// Attempt to restore evicted timeline from remote storage; it must be
+ /// offloaded.
#[instrument(name = "unevict_timeline", skip_all)]
pub(crate) async fn unevict_timeline(&mut self) {
assert!(self.is_offloaded);
diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs
index 482614fac7..f997f48454 100644
--- a/safekeeper/src/timeline_manager.rs
+++ b/safekeeper/src/timeline_manager.rs
@@ -11,12 +11,14 @@ use std::{
time::Duration,
};
+use futures::channel::oneshot;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use tokio::{
task::{JoinError, JoinHandle},
time::Instant,
};
+use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, instrument, warn, Instrument};
use utils::lsn::Lsn;
@@ -33,7 +35,7 @@ use crate::{
timeline_guard::{AccessService, GuardId, ResidenceGuard},
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},
- wal_backup_partial::{self, PartialRemoteSegment},
+ wal_backup_partial::{self, PartialBackup, PartialRemoteSegment},
SafeKeeperConf,
};
@@ -96,6 +98,8 @@ pub enum ManagerCtlMessage {
GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
/// Request to drop the guard.
GuardDrop(GuardId),
+ /// Request to reset uploaded partial backup state.
+ BackupPartialReset(oneshot::Sender<anyhow::Result<Vec<String>>>),
}
impl std::fmt::Debug for ManagerCtlMessage {
@@ -103,6 +107,7 @@ impl std::fmt::Debug for ManagerCtlMessage {
match self {
ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
+ ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
}
}
}
@@ -143,6 +148,19 @@ impl ManagerCtl {
.and_then(std::convert::identity)
}
+ /// Request timeline manager to reset uploaded partial segment state and
+ /// wait for the result.
+ pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {
+ let (tx, rx) = oneshot::channel();
+ self.manager_tx
+ .send(ManagerCtlMessage::BackupPartialReset(tx))
+ .expect("manager task is not running");
+ match rx.await {
+ Ok(res) => res,
+ Err(_) => anyhow::bail!("timeline manager is gone"),
+ }
+ }
+
/// Must be called exactly once to bootstrap the manager.
pub fn bootstrap_manager(
&self,
@@ -181,7 +199,8 @@ pub(crate) struct Manager {
pub(crate) wal_removal_task: Option>>,
// partial backup
- pub(crate) partial_backup_task: Option>>,
+ pub(crate) partial_backup_task:
+ Option<(JoinHandle