mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
Merge branch 'main' into arpad/less_async_trait
This commit is contained in:
8
Cargo.lock
generated
8
Cargo.lock
generated
@@ -2235,9 +2235,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.24"
|
||||
version = "0.3.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9"
|
||||
checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fnv",
|
||||
@@ -3436,9 +3436,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ordered-multimap"
|
||||
version = "0.7.1"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4d6a8c22fc714f0c2373e6091bf6f5e9b37b1bc0b1184874b7e0a4e303d318f"
|
||||
checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79"
|
||||
dependencies = [
|
||||
"dlv-list",
|
||||
"hashbrown 0.14.0",
|
||||
|
||||
@@ -565,6 +565,16 @@ impl GenericRemoteStorage {
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct StorageMetadata(HashMap<String, String>);
|
||||
|
||||
impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
|
||||
fn from(arr: [(&str, &str); N]) -> Self {
|
||||
let map: HashMap<String, String> = arr
|
||||
.iter()
|
||||
.map(|(k, v)| (k.to_string(), v.to_string()))
|
||||
.collect();
|
||||
Self(map)
|
||||
}
|
||||
}
|
||||
|
||||
/// External backup storage configuration, enough for creating a client for that storage.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemoteStorageConfig {
|
||||
|
||||
@@ -993,11 +993,26 @@ async fn tenant_status(
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
let state = get_state(&request);
|
||||
|
||||
// In tests, sometimes we want to query the state of a tenant without auto-activating it if it's currently waiting.
|
||||
let activate = true;
|
||||
#[cfg(feature = "testing")]
|
||||
let activate = parse_query_param(&request, "activate")?.unwrap_or(activate);
|
||||
|
||||
let tenant_info = async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
|
||||
if activate {
|
||||
// This is advisory: we prefer to let the tenant activate on-demand when this function is
|
||||
// called, but it is still valid to return 200 and describe the current state of the tenant
|
||||
// if it doesn't make it into an active state.
|
||||
tenant
|
||||
.wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
//!
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use arc_swap::ArcSwap;
|
||||
use camino::Utf8Path;
|
||||
use camino::Utf8PathBuf;
|
||||
use enumset::EnumSet;
|
||||
@@ -98,7 +99,7 @@ use std::ops::Bound::Included;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::span;
|
||||
@@ -260,7 +261,7 @@ pub struct Tenant {
|
||||
// We keep TenantConfOpt sturct here to preserve the information
|
||||
// about parameters that are not set.
|
||||
// This is necessary to allow global config updates.
|
||||
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -1606,7 +1607,7 @@ impl Tenant {
|
||||
);
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.read().unwrap();
|
||||
let conf = self.tenant_conf.load();
|
||||
|
||||
if !conf.location.may_delete_layers_hint() {
|
||||
info!("Skipping GC in location state {:?}", conf.location);
|
||||
@@ -1633,7 +1634,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.read().unwrap();
|
||||
let conf = self.tenant_conf.load();
|
||||
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
|
||||
info!("Skipping compaction in location state {:?}", conf.location);
|
||||
return Ok(());
|
||||
@@ -1782,7 +1783,7 @@ impl Tenant {
|
||||
async fn shutdown(
|
||||
&self,
|
||||
shutdown_progress: completion::Barrier,
|
||||
freeze_and_flush: bool,
|
||||
shutdown_mode: timeline::ShutdownMode,
|
||||
) -> Result<(), completion::Barrier> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
@@ -1829,16 +1830,8 @@ impl Tenant {
|
||||
timelines.values().for_each(|timeline| {
|
||||
let timeline = Arc::clone(timeline);
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
let span =
|
||||
tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
|
||||
js.spawn(async move {
|
||||
if freeze_and_flush {
|
||||
timeline.flush_and_shutdown().instrument(span).await
|
||||
} else {
|
||||
timeline.shutdown().instrument(span).await
|
||||
}
|
||||
});
|
||||
let span = tracing::info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode);
|
||||
js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await });
|
||||
})
|
||||
};
|
||||
// test_long_timeline_create_then_tenant_delete is leaning on this message
|
||||
@@ -2082,14 +2075,14 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
|
||||
self.tenant_conf.read().unwrap().location.attach_mode
|
||||
self.tenant_conf.load().location.attach_mode
|
||||
}
|
||||
|
||||
/// For API access: generate a LocationConfig equivalent to the one that would be used to
|
||||
/// create a Tenant in the same state. Do not use this in hot paths: it's for relatively
|
||||
/// rare external API calls, like a reconciliation at startup.
|
||||
pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
|
||||
let conf = self.tenant_conf.read().unwrap();
|
||||
let conf = self.tenant_conf.load();
|
||||
|
||||
let location_config_mode = match conf.location.attach_mode {
|
||||
AttachmentMode::Single => models::LocationConfigMode::AttachedSingle,
|
||||
@@ -2236,7 +2229,7 @@ where
|
||||
|
||||
impl Tenant {
|
||||
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
|
||||
self.tenant_conf.read().unwrap().tenant_conf.clone()
|
||||
self.tenant_conf.load().tenant_conf.clone()
|
||||
}
|
||||
|
||||
pub fn effective_config(&self) -> TenantConf {
|
||||
@@ -2245,84 +2238,84 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_distance
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
pub fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
pub fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_target_size
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
|
||||
}
|
||||
|
||||
pub fn get_compaction_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
|
||||
}
|
||||
|
||||
pub fn get_compaction_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.compaction_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
pub fn get_gc_horizon(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_horizon
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
|
||||
}
|
||||
|
||||
pub fn get_gc_period(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.gc_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_period)
|
||||
}
|
||||
|
||||
pub fn get_image_creation_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.image_creation_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
pub fn get_pitr_interval(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.pitr_interval
|
||||
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
|
||||
}
|
||||
|
||||
pub fn get_trace_read_requests(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.trace_read_requests
|
||||
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
|
||||
}
|
||||
|
||||
pub fn get_min_resident_size_override(&self) -> Option<u64> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.min_resident_size_override
|
||||
.or(self.conf.default_tenant_conf.min_resident_size_override)
|
||||
}
|
||||
|
||||
pub fn get_heatmap_period(&self) -> Option<Duration> {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
let heatmap_period = tenant_conf
|
||||
.heatmap_period
|
||||
.unwrap_or(self.conf.default_tenant_conf.heatmap_period);
|
||||
@@ -2334,26 +2327,40 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
|
||||
self.tenant_conf_updated();
|
||||
// Use read-copy-update in order to avoid overwriting the location config
|
||||
// state if this races with [`Tenant::set_new_location_config`]. Note that
|
||||
// this race is not possible if both request types come from the storage
|
||||
// controller (as they should!) because an exclusive op lock is required
|
||||
// on the storage controller side.
|
||||
self.tenant_conf.rcu(|inner| {
|
||||
Arc::new(AttachedTenantConf {
|
||||
tenant_conf: new_tenant_conf.clone(),
|
||||
location: inner.location,
|
||||
})
|
||||
});
|
||||
|
||||
self.tenant_conf_updated(&new_tenant_conf);
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
timeline.tenant_conf_updated(&new_tenant_conf);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
|
||||
*self.tenant_conf.write().unwrap() = new_conf;
|
||||
self.tenant_conf_updated();
|
||||
let new_tenant_conf = new_conf.tenant_conf.clone();
|
||||
|
||||
self.tenant_conf.store(Arc::new(new_conf));
|
||||
|
||||
self.tenant_conf_updated(&new_tenant_conf);
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
timeline.tenant_conf_updated(&new_tenant_conf);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2367,11 +2374,8 @@ impl Tenant {
|
||||
.unwrap_or(psconf.default_tenant_conf.timeline_get_throttle.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_conf_updated(&self) {
|
||||
let conf = {
|
||||
let guard = self.tenant_conf.read().unwrap();
|
||||
Self::get_timeline_get_throttle_config(self.conf, &guard.tenant_conf)
|
||||
};
|
||||
pub(crate) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
|
||||
let conf = Self::get_timeline_get_throttle_config(self.conf, new_conf);
|
||||
self.timeline_get_throttle.reconfigure(conf)
|
||||
}
|
||||
|
||||
@@ -2519,7 +2523,7 @@ impl Tenant {
|
||||
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
|
||||
&crate::metrics::tenant_throttling::TIMELINE_GET,
|
||||
)),
|
||||
tenant_conf: Arc::new(RwLock::new(attached_conf)),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3505,7 +3509,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
|
||||
self.tenant_conf.read().unwrap().tenant_conf.clone()
|
||||
self.tenant_conf.load().tenant_conf.clone()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3854,6 +3858,7 @@ mod tests {
|
||||
use hex_literal::hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tests::timeline::ShutdownMode;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
|
||||
@@ -4299,7 +4304,7 @@ mod tests {
|
||||
make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
|
||||
// so that all uploads finish & we can call harness.load() below again
|
||||
tenant
|
||||
.shutdown(Default::default(), true)
|
||||
.shutdown(Default::default(), ShutdownMode::FreezeAndFlush)
|
||||
.instrument(harness.span())
|
||||
.await
|
||||
.ok()
|
||||
@@ -4340,7 +4345,7 @@ mod tests {
|
||||
|
||||
// so that all uploads finish & we can call harness.load() below again
|
||||
tenant
|
||||
.shutdown(Default::default(), true)
|
||||
.shutdown(Default::default(), ShutdownMode::FreezeAndFlush)
|
||||
.instrument(harness.span())
|
||||
.await
|
||||
.ok()
|
||||
@@ -5121,7 +5126,7 @@ mod tests {
|
||||
// Leave the timeline ID in [`Tenant::timelines_creating`] to exclude attempting to create it again
|
||||
let raw_tline = tline.raw_timeline().unwrap();
|
||||
raw_tline
|
||||
.shutdown()
|
||||
.shutdown(super::timeline::ShutdownMode::Hard)
|
||||
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, shard_id=%raw_tline.tenant_shard_id.shard_slug(), timeline_id=%TIMELINE_ID))
|
||||
.await;
|
||||
std::mem::forget(tline);
|
||||
|
||||
@@ -14,7 +14,10 @@ use crate::{
|
||||
config::PageServerConf,
|
||||
context::RequestContext,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::mgr::{TenantSlot, TenantsMapRemoveResult},
|
||||
tenant::{
|
||||
mgr::{TenantSlot, TenantsMapRemoveResult},
|
||||
timeline::ShutdownMode,
|
||||
},
|
||||
};
|
||||
|
||||
use super::{
|
||||
@@ -463,7 +466,7 @@ impl DeleteTenantFlow {
|
||||
// tenant.shutdown
|
||||
// Its also bad that we're holding tenants.read here.
|
||||
// TODO relax set_stopping to be idempotent?
|
||||
if tenant.shutdown(progress, false).await.is_err() {
|
||||
if tenant.shutdown(progress, ShutdownMode::Hard).await.is_err() {
|
||||
return Err(DeleteTenantError::Other(anyhow::anyhow!(
|
||||
"tenant shutdown is already in progress"
|
||||
)));
|
||||
|
||||
@@ -44,6 +44,7 @@ use crate::tenant::config::{
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
|
||||
use crate::tenant::storage_layer::inmemory_layer;
|
||||
use crate::tenant::timeline::ShutdownMode;
|
||||
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
|
||||
|
||||
@@ -783,11 +784,9 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
|
||||
join_set.spawn(
|
||||
async move {
|
||||
let freeze_and_flush = true;
|
||||
|
||||
let res = {
|
||||
let (_guard, shutdown_progress) = completion::channel();
|
||||
t.shutdown(shutdown_progress, freeze_and_flush).await
|
||||
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
|
||||
};
|
||||
|
||||
if let Err(other_progress) = res {
|
||||
@@ -1107,7 +1106,7 @@ impl TenantManager {
|
||||
};
|
||||
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {}
|
||||
Err(barrier) => {
|
||||
info!("Shutdown already in progress, waiting for it to complete");
|
||||
@@ -1223,7 +1222,7 @@ impl TenantManager {
|
||||
TenantSlot::Attached(tenant) => {
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
info!("Shutting down just-spawned tenant, because tenant manager is shut down");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {
|
||||
info!("Finished shutting down just-spawned tenant");
|
||||
}
|
||||
@@ -1273,7 +1272,7 @@ impl TenantManager {
|
||||
};
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, false).await {
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value()?;
|
||||
}
|
||||
@@ -1677,7 +1676,7 @@ impl TenantManager {
|
||||
|
||||
// Phase 5: Shut down the parent shard, and erase it from disk
|
||||
let (_guard, progress) = completion::channel();
|
||||
match parent.shutdown(progress, false).await {
|
||||
match parent.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {}
|
||||
Err(other) => {
|
||||
other.wait().await;
|
||||
@@ -2664,11 +2663,11 @@ where
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
|
||||
let freeze_and_flush = false;
|
||||
let shutdown_mode = ShutdownMode::Hard;
|
||||
|
||||
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
|
||||
// that we can continue safely to cleanup.
|
||||
match tenant.shutdown(progress, freeze_and_flush).await {
|
||||
match tenant.shutdown(progress, shutdown_mode).await {
|
||||
Ok(()) => {}
|
||||
Err(_other) => {
|
||||
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
|
||||
|
||||
@@ -1569,7 +1569,7 @@ impl RemoteTimelineClient {
|
||||
/// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
|
||||
///
|
||||
/// In-progress operations will still be running after this function returns.
|
||||
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
|
||||
/// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
|
||||
/// to wait for them to complete, after calling this function.
|
||||
pub(crate) fn stop(&self) {
|
||||
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
|
||||
|
||||
@@ -9,6 +9,7 @@ pub mod uninit;
|
||||
mod walreceiver;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use arc_swap::ArcSwap;
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use enumset::EnumSet;
|
||||
@@ -183,7 +184,7 @@ pub(crate) struct AuxFilesState {
|
||||
|
||||
pub struct Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
|
||||
myself: Weak<Self>,
|
||||
|
||||
@@ -618,6 +619,19 @@ pub(crate) enum WaitLsnWaiter<'a> {
|
||||
PageService,
|
||||
}
|
||||
|
||||
/// Argument to [`Timeline::shutdown`].
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) enum ShutdownMode {
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
|
||||
///
|
||||
/// While we are flushing, we continue to accept read I/O for LSNs ingested before
|
||||
/// the call to [`Timeline::shutdown`].
|
||||
FreezeAndFlush,
|
||||
/// Shut down immediately, without waiting for any open layers to flush.
|
||||
Hard,
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
@@ -1305,86 +1319,119 @@ impl Timeline {
|
||||
self.launch_eviction_task(parent, background_jobs_can_start);
|
||||
}
|
||||
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
|
||||
/// After this function returns, there are no timeline-scoped tasks are left running.
|
||||
///
|
||||
/// While we are flushing, we continue to accept read I/O.
|
||||
pub(crate) async fn flush_and_shutdown(&self) {
|
||||
/// The preferred pattern for is:
|
||||
/// - in any spawned tasks, keep Timeline::guard open + Timeline::cancel / child token
|
||||
/// - if early shutdown (not just cancellation) of a sub-tree of tasks is required,
|
||||
/// go the extra mile and keep track of JoinHandles
|
||||
/// - Keep track of JoinHandles using a passed-down `Arc<Mutex<Option<JoinSet>>>` or similar,
|
||||
/// instead of spawning directly on a runtime. It is a more composable / testable pattern.
|
||||
///
|
||||
/// For legacy reasons, we still have multiple tasks spawned using
|
||||
/// `task_mgr::spawn(X, Some(tenant_id), Some(timeline_id))`.
|
||||
/// We refer to these as "timeline-scoped task_mgr tasks".
|
||||
/// Some of these tasks are already sensitive to Timeline::cancel while others are
|
||||
/// not sensitive to Timeline::cancel and instead respect [`task_mgr::shutdown_token`]
|
||||
/// or [`task_mgr::shutdown_watcher`].
|
||||
/// We want to gradually convert the code base away from these.
|
||||
///
|
||||
/// Here is an inventory of timeline-scoped task_mgr tasks that are still sensitive to
|
||||
/// `task_mgr::shutdown_{token,watcher}` (there are also tenant-scoped and global-scoped
|
||||
/// ones that aren't mentioned here):
|
||||
/// - [`TaskKind::TimelineDeletionWorker`]
|
||||
/// - NB: also used for tenant deletion
|
||||
/// - [`TaskKind::RemoteUploadTask`]`
|
||||
/// - [`TaskKind::InitialLogicalSizeCalculation`]
|
||||
/// - [`TaskKind::DownloadAllRemoteLayers`] (can we get rid of it?)
|
||||
// Inventory of timeline-scoped task_mgr tasks that use spawn but aren't sensitive:
|
||||
/// - [`TaskKind::Eviction`]
|
||||
/// - [`TaskKind::LayerFlushTask`]
|
||||
/// - [`TaskKind::OndemandLogicalSizeCalculation`]
|
||||
/// - [`TaskKind::GarbageCollector`] (immediate_gc is timeline-scoped)
|
||||
pub(crate) async fn shutdown(&self, mode: ShutdownMode) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Stop ingesting data. Walreceiver only provides cancellation but no
|
||||
// "wait until gone", because it uses the Timeline::gate. So, only
|
||||
// after the self.gate.close() in self.shutdown() below will we know for
|
||||
// sure that no walreceiver tasks are left.
|
||||
// This means that we might still be ingesting data during the call to
|
||||
// `self.freeze_and_flush()` below. That's not ideal, but, we don't have
|
||||
// the concept of a ChildGuard, which is what we'd need to properly model
|
||||
// early shutdown of the walreceiver task sub-tree before the other
|
||||
// Timeline task sub-trees.
|
||||
if let Some(walreceiver) = self.walreceiver.lock().unwrap().take() {
|
||||
let try_freeze_and_flush = match mode {
|
||||
ShutdownMode::FreezeAndFlush => true,
|
||||
ShutdownMode::Hard => false,
|
||||
};
|
||||
|
||||
// Regardless of whether we're going to try_freeze_and_flush
|
||||
// or not, stop ingesting any more data. Walreceiver only provides
|
||||
// cancellation but no "wait until gone", because it uses the Timeline::gate.
|
||||
// So, only after the self.gate.close() below will we know for sure that
|
||||
// no walreceiver tasks are left.
|
||||
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
|
||||
// data during the call to `self.freeze_and_flush()` below.
|
||||
// That's not ideal, but, we don't have the concept of a ChildGuard,
|
||||
// which is what we'd need to properly model early shutdown of the walreceiver
|
||||
// task sub-tree before the other Timeline task sub-trees.
|
||||
let walreceiver = self.walreceiver.lock().unwrap().take();
|
||||
tracing::debug!(
|
||||
is_some = walreceiver.is_some(),
|
||||
"Waiting for WalReceiverManager..."
|
||||
);
|
||||
if let Some(walreceiver) = walreceiver {
|
||||
walreceiver.cancel();
|
||||
}
|
||||
|
||||
// Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// now all writers to InMemory layer are gone, do the final flush if requested
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(_) => {
|
||||
// drain the upload queue
|
||||
if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
client.shutdown().await;
|
||||
if try_freeze_and_flush {
|
||||
// we shut down walreceiver above, so, we won't add anything more
|
||||
// to the InMemoryLayer; freeze it and wait for all frozen layers
|
||||
// to reach the disk & upload queue, then shut the upload queue and
|
||||
// wait for it to drain.
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(_) => {
|
||||
// drain the upload queue
|
||||
if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
client.shutdown().await;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
self.shutdown().await;
|
||||
}
|
||||
|
||||
/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
|
||||
/// the graceful [`Timeline::flush_and_shutdown`] function.
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
|
||||
// while doing so.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// Shut down the layer flush task before the remote client, as one depends on the other
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::LayerFlushTask),
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
|
||||
// case our caller wants to use that for a deletion
|
||||
// Transition the remote_client into a state where it's only useful for timeline deletion.
|
||||
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
|
||||
if let Some(remote_client) = self.remote_client.as_ref() {
|
||||
remote_client.stop();
|
||||
// As documented in remote_client.stop()'s doc comment, it's our responsibility
|
||||
// to shut down the upload queue tasks.
|
||||
// TODO: fix that, task management should be encapsulated inside remote_client.
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::RemoteUploadTask),
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// TODO: work toward making this a no-op. See this funciton's doc comment for more context.
|
||||
tracing::debug!("Waiting for tasks...");
|
||||
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
|
||||
|
||||
// Finally wait until any gate-holders are complete
|
||||
// Finally wait until any gate-holders are complete.
|
||||
//
|
||||
// TODO: once above shutdown_tasks is a no-op, we can close the gate before calling shutdown_tasks
|
||||
// and use a TBD variant of shutdown_tasks that asserts that there were no tasks left.
|
||||
self.gate.close().await;
|
||||
|
||||
self.metrics.shutdown();
|
||||
@@ -1588,57 +1635,65 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
|
||||
// Private functions
|
||||
impl Timeline {
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.lazy_slru_download
|
||||
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
|
||||
}
|
||||
|
||||
fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.checkpoint_distance
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
|
||||
}
|
||||
|
||||
fn get_checkpoint_timeout(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.checkpoint_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.compaction_target_size
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
|
||||
}
|
||||
|
||||
fn get_compaction_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.compaction_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
|
||||
}
|
||||
|
||||
fn get_image_creation_threshold(&self) -> usize {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.image_creation_threshold
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
fn get_compaction_algorithm(&self) -> CompactionAlgorithm {
|
||||
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
|
||||
let tenant_conf = &self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.compaction_algorithm
|
||||
.unwrap_or(self.conf.default_tenant_conf.compaction_algorithm)
|
||||
}
|
||||
|
||||
fn get_eviction_policy(&self) -> EvictionPolicy {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.eviction_policy
|
||||
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
|
||||
}
|
||||
@@ -1653,22 +1708,25 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fn get_image_layer_creation_check_threshold(&self) -> u8 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf.image_layer_creation_check_threshold.unwrap_or(
|
||||
self.conf
|
||||
.default_tenant_conf
|
||||
.image_layer_creation_check_threshold,
|
||||
)
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.image_layer_creation_check_threshold
|
||||
.unwrap_or(
|
||||
self.conf
|
||||
.default_tenant_conf
|
||||
.image_layer_creation_check_threshold,
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
|
||||
// The threshold is embedded in the metric. So, we need to update it.
|
||||
{
|
||||
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&self.tenant_conf.read().unwrap().tenant_conf,
|
||||
new_conf,
|
||||
&self.conf.default_tenant_conf,
|
||||
);
|
||||
|
||||
@@ -1695,7 +1753,7 @@ impl Timeline {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
metadata: &TimelineMetadata,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
timeline_id: TimelineId,
|
||||
@@ -1714,14 +1772,13 @@ impl Timeline {
|
||||
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
|
||||
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
|
||||
|
||||
let tenant_conf_guard = tenant_conf.read().unwrap();
|
||||
|
||||
let evictions_low_residence_duration_metric_threshold =
|
||||
let evictions_low_residence_duration_metric_threshold = {
|
||||
let loaded_tenant_conf = tenant_conf.load();
|
||||
Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&tenant_conf_guard.tenant_conf,
|
||||
&loaded_tenant_conf.tenant_conf,
|
||||
&conf.default_tenant_conf,
|
||||
);
|
||||
drop(tenant_conf_guard);
|
||||
)
|
||||
};
|
||||
|
||||
Arc::new_cyclic(|myself| {
|
||||
let mut result = Timeline {
|
||||
@@ -1904,20 +1961,19 @@ impl Timeline {
|
||||
self.timeline_id, self.tenant_shard_id
|
||||
);
|
||||
|
||||
let tenant_conf_guard = self.tenant_conf.read().unwrap();
|
||||
let wal_connect_timeout = tenant_conf_guard
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
let wal_connect_timeout = tenant_conf
|
||||
.tenant_conf
|
||||
.walreceiver_connect_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
|
||||
let lagging_wal_timeout = tenant_conf_guard
|
||||
let lagging_wal_timeout = tenant_conf
|
||||
.tenant_conf
|
||||
.lagging_wal_timeout
|
||||
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
|
||||
let max_lsn_wal_lag = tenant_conf_guard
|
||||
let max_lsn_wal_lag = tenant_conf
|
||||
.tenant_conf
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
|
||||
drop(tenant_conf_guard);
|
||||
|
||||
let mut guard = self.walreceiver.lock().unwrap();
|
||||
assert!(
|
||||
@@ -2465,10 +2521,6 @@ impl Timeline {
|
||||
debug!("cancelling logical size calculation for timeline shutdown");
|
||||
calculation.await
|
||||
}
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
debug!("cancelling logical size calculation for task shutdown");
|
||||
calculation.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3152,16 +3204,11 @@ impl Timeline {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
info!("shutting down layer flush task");
|
||||
break;
|
||||
},
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
info!("shutting down layer flush task");
|
||||
info!("shutting down layer flush task due to Timeline::cancel");
|
||||
break;
|
||||
},
|
||||
_ = layer_flush_start_rx.changed() => {}
|
||||
}
|
||||
|
||||
trace!("waking up");
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
|
||||
@@ -14,7 +14,6 @@ use crate::{
|
||||
deletion_queue::DeletionQueueClient,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, DeleteTimelineError, Tenant,
|
||||
@@ -23,42 +22,6 @@ use crate::{
|
||||
|
||||
use super::{Timeline, TimelineResources};
|
||||
|
||||
/// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
// Notify any timeline work to drop out of loops/requests
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
timeline.cancel.cancel();
|
||||
|
||||
// Prevent new uploads from starting.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
remote_client.stop();
|
||||
}
|
||||
|
||||
// Stop & wait for the remaining timeline tasks, including upload tasks.
|
||||
// NB: This and other delete_timeline calls do not run as a task_mgr task,
|
||||
// so, they are not affected by this shutdown_tasks() call.
|
||||
info!("waiting for timeline tasks to shutdown");
|
||||
task_mgr::shutdown_tasks(
|
||||
None,
|
||||
Some(timeline.tenant_shard_id),
|
||||
Some(timeline.timeline_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-before-index-deleted-at"
|
||||
))?
|
||||
});
|
||||
|
||||
tracing::debug!("Waiting for gate...");
|
||||
timeline.gate.close().await;
|
||||
tracing::debug!("Shutdown complete");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark timeline as deleted in S3 so we won't pick it up next time
|
||||
/// during attach or pageserver restart.
|
||||
/// See comment in persist_index_part_with_deleted_flag.
|
||||
@@ -252,7 +215,14 @@ impl DeleteTimelineFlow {
|
||||
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
stop_tasks(&timeline).await?;
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-before-index-deleted-at"
|
||||
))?
|
||||
});
|
||||
|
||||
set_deleted_in_remote_index(&timeline).await?;
|
||||
|
||||
|
||||
@@ -67,20 +67,19 @@ impl Timeline {
|
||||
),
|
||||
false,
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()); }
|
||||
_ = self_clone.cancel.cancelled() => { return Ok(()); }
|
||||
_ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
|
||||
};
|
||||
|
||||
self_clone.eviction_task(parent, cancel).await;
|
||||
self_clone.eviction_task(parent).await;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
|
||||
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
|
||||
use crate::tenant::tasks::random_init_delay;
|
||||
|
||||
// acquire the gate guard only once within a useful span
|
||||
@@ -95,7 +94,7 @@ impl Timeline {
|
||||
EvictionPolicy::OnlyImitiate(lat) => lat.period,
|
||||
EvictionPolicy::NoEviction => Duration::from_secs(10),
|
||||
};
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
if random_init_delay(period, &self.cancel).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -104,13 +103,13 @@ impl Timeline {
|
||||
loop {
|
||||
let policy = self.get_eviction_policy();
|
||||
let cf = self
|
||||
.eviction_iteration(&tenant, &policy, &cancel, &guard, &ctx)
|
||||
.eviction_iteration(&tenant, &policy, &self.cancel, &guard, &ctx)
|
||||
.await;
|
||||
|
||||
match cf {
|
||||
ControlFlow::Break(()) => break,
|
||||
ControlFlow::Continue(sleep_until) => {
|
||||
if tokio::time::timeout_at(sleep_until, cancel.cancelled())
|
||||
if tokio::time::timeout_at(sleep_until, self.cancel.cancelled())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
|
||||
@@ -33,6 +33,7 @@ once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
|
||||
@@ -28,7 +28,7 @@ use utils::pid_file;
|
||||
use metrics::set_build_info_metric;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
|
||||
};
|
||||
use safekeeper::wal_service;
|
||||
use safekeeper::GlobalTimelines;
|
||||
@@ -170,6 +170,13 @@ struct Args {
|
||||
/// still needed for existing replication connection.
|
||||
#[arg(long)]
|
||||
walsenders_keep_horizon: bool,
|
||||
/// Enable partial backup. If disabled, safekeeper will not upload partial
|
||||
/// segments to remote storage.
|
||||
#[arg(long)]
|
||||
partial_backup_enabled: bool,
|
||||
/// Controls how long backup will wait until uploading the partial segment.
|
||||
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
|
||||
partial_backup_timeout: Duration,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -300,6 +307,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
http_auth,
|
||||
current_thread_runtime: args.current_thread_runtime,
|
||||
walsenders_keep_horizon: args.walsenders_keep_horizon,
|
||||
partial_backup_enabled: args.partial_backup_enabled,
|
||||
partial_backup_timeout: args.partial_backup_timeout,
|
||||
};
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
@@ -365,6 +374,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
|
||||
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
||||
|
||||
wal_backup::init_remote_storage(&conf);
|
||||
|
||||
// Keep handles to main tasks to die if any of them disappears.
|
||||
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
|
||||
FuturesUnordered::new();
|
||||
|
||||
@@ -20,7 +20,7 @@ use utils::{bin_ser::LeSer, id::TenantTimelineId};
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 7;
|
||||
pub const SK_FORMAT_VERSION: u32 = 8;
|
||||
|
||||
// contains persistent metadata for safekeeper
|
||||
const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
use crate::{
|
||||
safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn},
|
||||
state::{PersistedPeers, TimelinePersistentState},
|
||||
wal_backup_partial,
|
||||
};
|
||||
use anyhow::{bail, Result};
|
||||
use pq_proto::SystemId;
|
||||
@@ -138,6 +139,50 @@ pub struct SafeKeeperStateV4 {
|
||||
pub peers: PersistedPeers,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct SafeKeeperStateV7 {
|
||||
#[serde(with = "hex")]
|
||||
pub tenant_id: TenantId,
|
||||
#[serde(with = "hex")]
|
||||
pub timeline_id: TimelineId,
|
||||
/// persistent acceptor state
|
||||
pub acceptor_state: AcceptorState,
|
||||
/// information about server
|
||||
pub server: ServerInfo,
|
||||
/// Unique id of the last *elected* proposer we dealt with. Not needed
|
||||
/// for correctness, exists for monitoring purposes.
|
||||
#[serde(with = "hex")]
|
||||
pub proposer_uuid: PgUuid,
|
||||
/// Since which LSN this timeline generally starts. Safekeeper might have
|
||||
/// joined later.
|
||||
pub timeline_start_lsn: Lsn,
|
||||
/// Since which LSN safekeeper has (had) WAL for this timeline.
|
||||
/// All WAL segments next to one containing local_start_lsn are
|
||||
/// filled with data from the beginning.
|
||||
pub local_start_lsn: Lsn,
|
||||
/// Part of WAL acknowledged by quorum *and available locally*. Always points
|
||||
/// to record boundary.
|
||||
pub commit_lsn: Lsn,
|
||||
/// LSN that points to the end of the last backed up segment. Useful to
|
||||
/// persist to avoid finding out offloading progress on boot.
|
||||
pub backup_lsn: Lsn,
|
||||
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
|
||||
/// of last record streamed to everyone). Persisting it helps skipping
|
||||
/// recovery in walproposer, generally we compute it from peers. In
|
||||
/// walproposer proto called 'truncate_lsn'. Updates are currently drived
|
||||
/// only by walproposer.
|
||||
pub peer_horizon_lsn: Lsn,
|
||||
/// LSN of the oldest known checkpoint made by pageserver and successfully
|
||||
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
|
||||
/// informational purposes, we receive it from pageserver (or broker).
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
// Peers and their state as we remember it. Knowing peers themselves is
|
||||
// fundamental; but state is saved here only for informational purposes and
|
||||
// obviously can be stale. (Currently not saved at all, but let's provision
|
||||
// place to have less file version upgrades).
|
||||
pub peers: PersistedPeers,
|
||||
}
|
||||
|
||||
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
|
||||
// migrate to storing full term history
|
||||
if version == 1 {
|
||||
@@ -167,6 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
|
||||
peer_horizon_lsn: oldstate.truncate_lsn,
|
||||
remote_consistent_lsn: Lsn(0),
|
||||
peers: PersistedPeers(vec![]),
|
||||
partial_backup: wal_backup_partial::State::default(),
|
||||
});
|
||||
// migrate to hexing some ids
|
||||
} else if version == 2 {
|
||||
@@ -190,6 +236,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
|
||||
peer_horizon_lsn: oldstate.truncate_lsn,
|
||||
remote_consistent_lsn: Lsn(0),
|
||||
peers: PersistedPeers(vec![]),
|
||||
partial_backup: wal_backup_partial::State::default(),
|
||||
});
|
||||
// migrate to moving tenant_id/timeline_id to the top and adding some lsns
|
||||
} else if version == 3 {
|
||||
@@ -213,6 +260,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
|
||||
peer_horizon_lsn: oldstate.truncate_lsn,
|
||||
remote_consistent_lsn: Lsn(0),
|
||||
peers: PersistedPeers(vec![]),
|
||||
partial_backup: wal_backup_partial::State::default(),
|
||||
});
|
||||
// migrate to having timeline_start_lsn
|
||||
} else if version == 4 {
|
||||
@@ -236,6 +284,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
|
||||
peer_horizon_lsn: oldstate.peer_horizon_lsn,
|
||||
remote_consistent_lsn: Lsn(0),
|
||||
peers: PersistedPeers(vec![]),
|
||||
partial_backup: wal_backup_partial::State::default(),
|
||||
});
|
||||
} else if version == 5 {
|
||||
info!("reading safekeeper control file version {}", version);
|
||||
@@ -262,7 +311,30 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
|
||||
oldstate.server.pg_version = 140005;
|
||||
|
||||
return Ok(oldstate);
|
||||
} else if version == 7 {
|
||||
info!("reading safekeeper control file version {}", version);
|
||||
let oldstate = SafeKeeperStateV7::des(&buf[..buf.len()])?;
|
||||
|
||||
return Ok(TimelinePersistentState {
|
||||
tenant_id: oldstate.tenant_id,
|
||||
timeline_id: oldstate.timeline_id,
|
||||
acceptor_state: oldstate.acceptor_state,
|
||||
server: oldstate.server,
|
||||
proposer_uuid: oldstate.proposer_uuid,
|
||||
timeline_start_lsn: oldstate.timeline_start_lsn,
|
||||
local_start_lsn: oldstate.local_start_lsn,
|
||||
commit_lsn: oldstate.commit_lsn,
|
||||
backup_lsn: oldstate.backup_lsn,
|
||||
peer_horizon_lsn: oldstate.peer_horizon_lsn,
|
||||
remote_consistent_lsn: oldstate.remote_consistent_lsn,
|
||||
peers: oldstate.peers,
|
||||
partial_backup: wal_backup_partial::State::default(),
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: persist the file back to the disk after upgrade
|
||||
// TODO: think about backward compatibility and rollbacks
|
||||
|
||||
bail!("unsupported safekeeper control file version {}", version)
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ pub mod send_wal;
|
||||
pub mod state;
|
||||
pub mod timeline;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
@@ -48,6 +49,7 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
|
||||
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
|
||||
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -79,6 +81,8 @@ pub struct SafeKeeperConf {
|
||||
pub http_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pub current_thread_runtime: bool,
|
||||
pub walsenders_keep_horizon: bool,
|
||||
pub partial_backup_enabled: bool,
|
||||
pub partial_backup_timeout: Duration,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -123,6 +127,8 @@ impl SafeKeeperConf {
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
partial_backup_enabled: false,
|
||||
partial_backup_timeout: Duration::from_secs(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,6 +147,21 @@ pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
|
||||
});
|
||||
pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"safekeeper_partial_backup_uploads_total",
|
||||
"Number of partial backup uploads to the S3",
|
||||
&["result"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_partial_backup_uploads_total counter")
|
||||
});
|
||||
pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_partial_backup_uploaded_bytes_total",
|
||||
"Number of bytes uploaded to the S3 during partial backup"
|
||||
)
|
||||
.expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
|
||||
@@ -1221,6 +1221,7 @@ mod tests {
|
||||
commit_lsn: Lsn(1234567600),
|
||||
},
|
||||
)]),
|
||||
partial_backup: crate::wal_backup_partial::State::default(),
|
||||
};
|
||||
|
||||
let ser = state.ser().unwrap();
|
||||
@@ -1266,6 +1267,8 @@ mod tests {
|
||||
0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
|
||||
0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
|
||||
// partial_backup
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
];
|
||||
|
||||
assert_eq!(Hex(&ser), Hex(&expected));
|
||||
|
||||
@@ -13,6 +13,7 @@ use utils::{
|
||||
use crate::{
|
||||
control_file,
|
||||
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
|
||||
wal_backup_partial::{self},
|
||||
};
|
||||
|
||||
/// Persistent information stored on safekeeper node about timeline.
|
||||
@@ -54,11 +55,14 @@ pub struct TimelinePersistentState {
|
||||
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
|
||||
/// informational purposes, we receive it from pageserver (or broker).
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
// Peers and their state as we remember it. Knowing peers themselves is
|
||||
// fundamental; but state is saved here only for informational purposes and
|
||||
// obviously can be stale. (Currently not saved at all, but let's provision
|
||||
// place to have less file version upgrades).
|
||||
/// Peers and their state as we remember it. Knowing peers themselves is
|
||||
/// fundamental; but state is saved here only for informational purposes and
|
||||
/// obviously can be stale. (Currently not saved at all, but let's provision
|
||||
/// place to have less file version upgrades).
|
||||
pub peers: PersistedPeers,
|
||||
/// Holds names of partial segments uploaded to remote storage. Used to
|
||||
/// clean up old objects without leaving garbage in remote storage.
|
||||
pub partial_backup: wal_backup_partial::State,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
@@ -93,6 +97,7 @@ impl TimelinePersistentState {
|
||||
.map(|p| (*p, PersistedPeerInfo::new()))
|
||||
.collect(),
|
||||
),
|
||||
partial_backup: wal_backup_partial::State::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
use crate::metrics::FullTimelineInfo;
|
||||
use crate::wal_storage::Storage as wal_storage_iface;
|
||||
use crate::{debug_dump, wal_storage};
|
||||
use crate::{debug_dump, wal_backup_partial, wal_storage};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
/// Things safekeeper should know about timeline state on peers.
|
||||
@@ -503,6 +503,9 @@ impl Timeline {
|
||||
if conf.peer_recovery_enabled {
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
}
|
||||
if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
|
||||
tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory.
|
||||
@@ -667,8 +670,8 @@ impl Timeline {
|
||||
term_flush_lsn =
|
||||
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
|
||||
}
|
||||
self.commit_lsn_watch_tx.send(commit_lsn)?;
|
||||
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
|
||||
self.commit_lsn_watch_tx.send(commit_lsn)?;
|
||||
Ok(rmsg)
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::time::Duration;
|
||||
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::{XLogSegNo, PG_TLI};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, StorageMetadata};
|
||||
use tokio::fs::File;
|
||||
|
||||
use tokio::select;
|
||||
@@ -180,6 +180,16 @@ fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn init_remote_storage(conf: &SafeKeeperConf) {
|
||||
// TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
|
||||
// dependencies to all tasks instead.
|
||||
REMOTE_STORAGE.get_or_init(|| {
|
||||
conf.remote_storage
|
||||
.as_ref()
|
||||
.map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
|
||||
});
|
||||
}
|
||||
|
||||
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
|
||||
@@ -194,14 +204,6 @@ pub async fn wal_backup_launcher_task_main(
|
||||
conf.remote_storage
|
||||
);
|
||||
|
||||
let conf_ = conf.clone();
|
||||
REMOTE_STORAGE.get_or_init(|| {
|
||||
conf_
|
||||
.remote_storage
|
||||
.as_ref()
|
||||
.map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
|
||||
});
|
||||
|
||||
// Presence in this map means launcher is aware s3 offloading is needed for
|
||||
// the timeline, but task is started only if it makes sense for to offload
|
||||
// from this safekeeper.
|
||||
@@ -518,6 +520,35 @@ async fn backup_object(
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn backup_partial_segment(
|
||||
source_file: &Utf8Path,
|
||||
target_file: &RemotePath,
|
||||
size: usize,
|
||||
) -> Result<()> {
|
||||
let storage = get_configured_remote_storage();
|
||||
|
||||
let file = File::open(&source_file)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
|
||||
|
||||
// limiting the file to read only the first `size` bytes
|
||||
let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
|
||||
|
||||
let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
storage
|
||||
.upload(
|
||||
file,
|
||||
size,
|
||||
target_file,
|
||||
Some(StorageMetadata::from([("sk_type", "partial_segment")])),
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn read_object(
|
||||
file_path: &RemotePath,
|
||||
offset: u64,
|
||||
@@ -604,6 +635,13 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Used by wal_backup_partial.
|
||||
pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
|
||||
let cancel = CancellationToken::new(); // not really used
|
||||
let storage = get_configured_remote_storage();
|
||||
storage.delete_objects(paths, &cancel).await
|
||||
}
|
||||
|
||||
/// Copy segments from one timeline to another. Used in copy_timeline.
|
||||
pub async fn copy_s3_segments(
|
||||
wal_seg_size: usize,
|
||||
|
||||
396
safekeeper/src/wal_backup_partial.rs
Normal file
396
safekeeper/src/wal_backup_partial.rs
Normal file
@@ -0,0 +1,396 @@
|
||||
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
|
||||
//! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
|
||||
//! was changed), the segment will be uploaded to S3 in about 15 minutes.
|
||||
//!
|
||||
//! The filename format for partial segments is
|
||||
//! `Segment_Term_Flush_Commit_skNN.partial`, where:
|
||||
//! - `Segment` – the segment name, like `000000010000000000000001`
|
||||
//! - `Term` – current term
|
||||
//! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
|
||||
//! - `Commit` – commit_lsn in the same hex format
|
||||
//! - `NN` – safekeeper_id, like `1`
|
||||
//!
|
||||
//! The full object name example:
|
||||
//! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
|
||||
//!
|
||||
//! Each safekeeper will keep info about remote partial segments in its control
|
||||
//! file. Code updates state in the control file before doing any S3 operations.
|
||||
//! This way control file stores information about all potentially existing
|
||||
//! remote partial segments and can clean them up after uploading a newer version.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use rand::Rng;
|
||||
use remote_storage::RemotePath;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use tracing::{debug, error, info, instrument};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
|
||||
safekeeper::Term,
|
||||
timeline::Timeline,
|
||||
wal_backup, SafeKeeperConf,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum UploadStatus {
|
||||
/// Upload is in progress
|
||||
InProgress,
|
||||
/// Upload is finished
|
||||
Uploaded,
|
||||
/// Deletion is in progress
|
||||
Deleting,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct PartialRemoteSegment {
|
||||
pub status: UploadStatus,
|
||||
pub name: String,
|
||||
pub commit_lsn: Lsn,
|
||||
pub flush_lsn: Lsn,
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
impl PartialRemoteSegment {
|
||||
fn eq_without_status(&self, other: &Self) -> bool {
|
||||
self.name == other.name
|
||||
&& self.commit_lsn == other.commit_lsn
|
||||
&& self.flush_lsn == other.flush_lsn
|
||||
&& self.term == other.term
|
||||
}
|
||||
}
|
||||
|
||||
// NB: these structures are a part of a control_file, you can't change them without
|
||||
// changing the control file format version.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
|
||||
pub struct State {
|
||||
pub segments: Vec<PartialRemoteSegment>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// Find an Uploaded segment. There should be only one Uploaded segment at a time.
|
||||
fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
|
||||
self.segments
|
||||
.iter()
|
||||
.find(|seg| seg.status == UploadStatus::Uploaded)
|
||||
.cloned()
|
||||
}
|
||||
}
|
||||
|
||||
struct PartialBackup {
|
||||
wal_seg_size: usize,
|
||||
tli: Arc<Timeline>,
|
||||
conf: SafeKeeperConf,
|
||||
local_prefix: Utf8PathBuf,
|
||||
remote_prefix: Utf8PathBuf,
|
||||
|
||||
state: State,
|
||||
}
|
||||
|
||||
// Read-only methods for getting segment names
|
||||
impl PartialBackup {
|
||||
fn segno(&self, lsn: Lsn) -> XLogSegNo {
|
||||
lsn.segment_number(self.wal_seg_size)
|
||||
}
|
||||
|
||||
fn segment_name(&self, segno: u64) -> String {
|
||||
XLogFileName(PG_TLI, segno, self.wal_seg_size)
|
||||
}
|
||||
|
||||
fn remote_segment_name(
|
||||
&self,
|
||||
segno: u64,
|
||||
term: u64,
|
||||
commit_lsn: Lsn,
|
||||
flush_lsn: Lsn,
|
||||
) -> String {
|
||||
format!(
|
||||
"{}_{}_{:016X}_{:016X}_sk{}.partial",
|
||||
self.segment_name(segno),
|
||||
term,
|
||||
flush_lsn.0,
|
||||
commit_lsn.0,
|
||||
self.conf.my_id.0,
|
||||
)
|
||||
}
|
||||
|
||||
fn local_segment_name(&self, segno: u64) -> String {
|
||||
format!("{}.partial", self.segment_name(segno))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialBackup {
|
||||
/// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
|
||||
async fn prepare_upload(&self) -> PartialRemoteSegment {
|
||||
// this operation takes a lock to get the actual state
|
||||
let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
|
||||
let flush_lsn = Lsn(sk_info.flush_lsn);
|
||||
let commit_lsn = Lsn(sk_info.commit_lsn);
|
||||
let term = sk_info.term;
|
||||
let segno = self.segno(flush_lsn);
|
||||
|
||||
let name = self.remote_segment_name(segno, term, commit_lsn, flush_lsn);
|
||||
|
||||
PartialRemoteSegment {
|
||||
status: UploadStatus::InProgress,
|
||||
name,
|
||||
commit_lsn,
|
||||
flush_lsn,
|
||||
term,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads segment from disk and uploads it to the remote storage.
|
||||
async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
|
||||
let flush_lsn = prepared.flush_lsn;
|
||||
let segno = self.segno(flush_lsn);
|
||||
|
||||
// We're going to backup bytes from the start of the segment up to flush_lsn.
|
||||
let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
|
||||
|
||||
let local_path = self.local_prefix.join(self.local_segment_name(segno));
|
||||
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
|
||||
|
||||
// Upload first `backup_bytes` bytes of the segment to the remote storage.
|
||||
wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
|
||||
PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
|
||||
|
||||
// We uploaded the segment, now let's verify that the data is still actual.
|
||||
// If the term changed, we cannot guarantee the validity of the uploaded data.
|
||||
// If the term is the same, we know the data is not corrupted.
|
||||
let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
|
||||
if sk_info.term != prepared.term {
|
||||
anyhow::bail!("term changed during upload");
|
||||
}
|
||||
assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
|
||||
assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
|
||||
async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
|
||||
self.tli
|
||||
.map_control_file(|cf| {
|
||||
if cf.partial_backup != self.state {
|
||||
let memory = self.state.clone();
|
||||
self.state = cf.partial_backup.clone();
|
||||
anyhow::bail!(
|
||||
"partial backup state diverged, memory={:?}, disk={:?}",
|
||||
memory,
|
||||
cf.partial_backup
|
||||
);
|
||||
}
|
||||
|
||||
cf.partial_backup = new_state.clone();
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
// update in-memory state
|
||||
self.state = new_state;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Upload the latest version of the partial segment and garbage collect older versions.
|
||||
#[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
|
||||
async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
|
||||
info!("starting upload {:?}", prepared);
|
||||
|
||||
let state_0 = self.state.clone();
|
||||
let state_1 = {
|
||||
let mut state = state_0.clone();
|
||||
state.segments.push(prepared.clone());
|
||||
state
|
||||
};
|
||||
|
||||
// we're going to upload a new segment, let's write it to disk to make GC later
|
||||
self.commit_state(state_1).await?;
|
||||
|
||||
self.upload_segment(prepared.clone()).await?;
|
||||
|
||||
let state_2 = {
|
||||
let mut state = state_0.clone();
|
||||
for seg in state.segments.iter_mut() {
|
||||
seg.status = UploadStatus::Deleting;
|
||||
}
|
||||
let mut actual_remote_segment = prepared.clone();
|
||||
actual_remote_segment.status = UploadStatus::Uploaded;
|
||||
state.segments.push(actual_remote_segment);
|
||||
state
|
||||
};
|
||||
|
||||
// we've uploaded new segment, it's actual, all other segments should be GCed
|
||||
self.commit_state(state_2).await?;
|
||||
self.gc().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete all non-Uploaded segments from the remote storage. There should be only one
|
||||
/// Uploaded segment at a time.
|
||||
#[instrument(name = "gc", skip_all)]
|
||||
async fn gc(&mut self) -> anyhow::Result<()> {
|
||||
let mut segments_to_delete = vec![];
|
||||
|
||||
let new_segments: Vec<PartialRemoteSegment> = self
|
||||
.state
|
||||
.segments
|
||||
.iter()
|
||||
.filter_map(|seg| {
|
||||
if seg.status == UploadStatus::Uploaded {
|
||||
Some(seg.clone())
|
||||
} else {
|
||||
segments_to_delete.push(seg.name.clone());
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
info!("deleting objects: {:?}", segments_to_delete);
|
||||
let mut objects_to_delete = vec![];
|
||||
for seg in segments_to_delete.iter() {
|
||||
let remote_path = RemotePath::new(self.remote_prefix.join(seg).as_ref())?;
|
||||
objects_to_delete.push(remote_path);
|
||||
}
|
||||
|
||||
// removing segments from remote storage
|
||||
wal_backup::delete_objects(&objects_to_delete).await?;
|
||||
|
||||
// now we can update the state on disk
|
||||
let new_state = {
|
||||
let mut state = self.state.clone();
|
||||
state.segments = new_segments;
|
||||
state
|
||||
};
|
||||
self.commit_state(new_state).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
debug!("started");
|
||||
let await_duration = conf.partial_backup_timeout;
|
||||
|
||||
let mut cancellation_rx = match tli.get_cancellation_rx() {
|
||||
Ok(rx) => rx,
|
||||
Err(_) => {
|
||||
info!("timeline canceled during task start");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// sleep for random time to avoid thundering herd
|
||||
{
|
||||
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
|
||||
let sleep_duration = await_duration.mul_f64(randf64);
|
||||
tokio::time::sleep(sleep_duration).await;
|
||||
}
|
||||
|
||||
let (_, persistent_state) = tli.get_state().await;
|
||||
let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
|
||||
let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
|
||||
let wal_seg_size = tli.get_wal_seg_size().await;
|
||||
|
||||
let local_prefix = tli.timeline_dir.clone();
|
||||
let remote_prefix = match tli.timeline_dir.strip_prefix(&conf.workdir) {
|
||||
Ok(path) => path.to_owned(),
|
||||
Err(e) => {
|
||||
error!("failed to strip workspace dir prefix: {:?}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut backup = PartialBackup {
|
||||
wal_seg_size,
|
||||
tli,
|
||||
state: persistent_state.partial_backup,
|
||||
conf,
|
||||
local_prefix,
|
||||
remote_prefix,
|
||||
};
|
||||
|
||||
debug!("state: {:?}", backup.state);
|
||||
|
||||
'outer: loop {
|
||||
// wait until we have something to upload
|
||||
let uploaded_segment = backup.state.uploaded_segment();
|
||||
if let Some(seg) = &uploaded_segment {
|
||||
// if we already uploaded something, wait until we have something new
|
||||
while flush_lsn_rx.borrow().lsn == seg.flush_lsn
|
||||
&& *commit_lsn_rx.borrow() == seg.commit_lsn
|
||||
&& flush_lsn_rx.borrow().term == seg.term
|
||||
{
|
||||
tokio::select! {
|
||||
_ = cancellation_rx.changed() => {
|
||||
info!("timeline canceled");
|
||||
return;
|
||||
}
|
||||
_ = commit_lsn_rx.changed() => {}
|
||||
_ = flush_lsn_rx.changed() => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fixing the segno and waiting some time to prevent reuploading the same segment too often
|
||||
let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
|
||||
let timeout = tokio::time::sleep(await_duration);
|
||||
tokio::pin!(timeout);
|
||||
let mut timeout_expired = false;
|
||||
|
||||
// waiting until timeout expires OR segno changes
|
||||
'inner: loop {
|
||||
tokio::select! {
|
||||
_ = cancellation_rx.changed() => {
|
||||
info!("timeline canceled");
|
||||
return;
|
||||
}
|
||||
_ = commit_lsn_rx.changed() => {}
|
||||
_ = flush_lsn_rx.changed() => {
|
||||
let segno = backup.segno(flush_lsn_rx.borrow().lsn);
|
||||
if segno != pending_segno {
|
||||
// previous segment is no longer partial, aborting the wait
|
||||
break 'inner;
|
||||
}
|
||||
}
|
||||
_ = &mut timeout => {
|
||||
// timeout expired, now we are ready for upload
|
||||
timeout_expired = true;
|
||||
break 'inner;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !timeout_expired {
|
||||
// likely segno has changed, let's try again in the next iteration
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
let prepared = backup.prepare_upload().await;
|
||||
if let Some(seg) = &uploaded_segment {
|
||||
if seg.eq_without_status(&prepared) {
|
||||
// we already uploaded this segment, nothing to do
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
match backup.do_upload(&prepared).await {
|
||||
Ok(()) => {
|
||||
debug!(
|
||||
"uploaded {} up to flush_lsn {}",
|
||||
prepared.name, prepared.flush_lsn
|
||||
);
|
||||
PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
|
||||
}
|
||||
Err(e) => {
|
||||
info!("failed to upload {}: {:#}", prepared.name, e);
|
||||
PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -176,6 +176,8 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
http_auth: None,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
partial_backup_enabled: false,
|
||||
partial_backup_timeout: Duration::from_secs(0),
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -341,8 +341,21 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_status(self, tenant_id: Union[TenantId, TenantShardId]) -> Dict[Any, Any]:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
def tenant_status(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], activate: bool = False
|
||||
) -> Dict[Any, Any]:
|
||||
"""
|
||||
:activate: hint the server not to accelerate activation of this tenant in response
|
||||
to this query. False by default for tests, because they generally want to observed the
|
||||
system rather than interfering with it. This is true by default on the server side,
|
||||
because in the field if the control plane is GET'ing a tenant it's a sign that it wants
|
||||
to do something with it.
|
||||
"""
|
||||
params = {}
|
||||
if not activate:
|
||||
params["activate"] = "false"
|
||||
|
||||
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}", params=params)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
|
||||
@@ -192,6 +192,9 @@ def test_backward_compatibility(
|
||||
assert not breaking_changes_allowed, "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
|
||||
|
||||
|
||||
# Forward compatibility is broken due to https://github.com/neondatabase/neon/pull/6530
|
||||
# The test is disabled until the next release deployment
|
||||
@pytest.mark.xfail
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(after="test_create_snapshot")
|
||||
|
||||
@@ -20,9 +20,10 @@ from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
env = neon_simple_env
|
||||
def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
(tenant_id, _) = env.neon_cli.create_tenant()
|
||||
http_client = env.pageserver.http_client()
|
||||
initial_size = http_client.tenant_size(tenant_id)
|
||||
@@ -35,66 +36,25 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
|
||||
assert branch_name == main_branch_name
|
||||
|
||||
with env.endpoints.create_start(
|
||||
endpoint = env.endpoints.create_start(
|
||||
main_branch_name,
|
||||
tenant_id=tenant_id,
|
||||
config_lines=["autovacuum=off", "checkpoint_timeout=10min"],
|
||||
) as endpoint:
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SELECT 1")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 1
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
# we've disabled the autovacuum and checkpoint
|
||||
# so background processes should not change the size.
|
||||
# If this test will flake we should probably loosen the check
|
||||
assert (
|
||||
size == initial_size
|
||||
), f"starting idle compute should not change the tenant size (Currently {size}, expected {initial_size})"
|
||||
)
|
||||
|
||||
# the size should be the same, until we increase the size over the
|
||||
# gc_horizon
|
||||
size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
assert (
|
||||
size == initial_size
|
||||
), f"tenant_size should not be affected by shutdown of compute (Currently {size}, expected {initial_size})"
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SELECT 1")
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
assert row[0] == 1
|
||||
|
||||
expected_inputs = {
|
||||
"segments": [
|
||||
{
|
||||
"segment": {"parent": None, "lsn": 23694408, "size": 25362432, "needed": True},
|
||||
"timeline_id": f"{main_timeline_id}",
|
||||
"kind": "BranchStart",
|
||||
},
|
||||
{
|
||||
"segment": {"parent": 0, "lsn": 23694528, "size": None, "needed": True},
|
||||
"timeline_id": f"{main_timeline_id}",
|
||||
"kind": "BranchEnd",
|
||||
},
|
||||
],
|
||||
"timeline_inputs": [
|
||||
{
|
||||
"timeline_id": f"{main_timeline_id}",
|
||||
"ancestor_id": None,
|
||||
"ancestor_lsn": "0/0",
|
||||
"last_record": "0/1698CC0",
|
||||
"latest_gc_cutoff": "0/1698C48",
|
||||
"horizon_cutoff": "0/0",
|
||||
"pitr_cutoff": "0/0",
|
||||
"next_gc_cutoff": "0/0",
|
||||
"retention_param_cutoff": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
expected_inputs = mask_model_inputs(expected_inputs)
|
||||
actual_inputs = mask_model_inputs(inputs)
|
||||
# The transaction above will make the compute generate a checkpoint.
|
||||
# In turn, the pageserver persists the checkpoint. This should only be
|
||||
# one key with a size of a couple hundred bytes.
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, main_timeline_id)
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
|
||||
assert expected_inputs == actual_inputs
|
||||
|
||||
size_debug_file = open(test_output_dir / "size_debug.html", "w")
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
size_debug_file.write(size_debug)
|
||||
assert size >= initial_size and size - initial_size < 1024
|
||||
|
||||
|
||||
def test_branched_empty_timeline_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
@@ -190,7 +150,6 @@ def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv, test_ou
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
|
||||
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
|
||||
def test_branch_point_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
"""
|
||||
gc_horizon = 15
|
||||
@@ -233,7 +192,6 @@ def test_branch_point_within_horizon(neon_simple_env: NeonEnv, test_output_dir:
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
|
||||
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
|
||||
def test_parent_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
"""
|
||||
gc_horizon = 5
|
||||
@@ -282,7 +240,6 @@ def test_parent_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
|
||||
@pytest.mark.skip("This should work, but is left out because assumed covered by other tests")
|
||||
def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
"""
|
||||
gc_horizon = small
|
||||
|
||||
@@ -10,6 +10,7 @@ import pytest
|
||||
import toml
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
log = getLogger("root.safekeeper_async")
|
||||
@@ -199,7 +200,9 @@ async def run_restarts_under_load(
|
||||
# assert that at least one transaction has completed in every worker
|
||||
stats.check_progress()
|
||||
|
||||
victim.start()
|
||||
# testing #6530, temporary here
|
||||
# TODO: remove afer partial backup is enabled by default
|
||||
victim.start(extra_opts=["--partial-backup-enabled", "--partial-backup-timeout=2s"])
|
||||
|
||||
log.info("Iterations are finished, exiting coroutines...")
|
||||
stats.running = False
|
||||
@@ -213,6 +216,7 @@ async def run_restarts_under_load(
|
||||
# Restart acceptors one by one, while executing and validating bank transactions
|
||||
def test_restarts_under_load(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_safekeepers_restarts_under_load")
|
||||
|
||||
Reference in New Issue
Block a user