Evict excessively failing sync tasks, improve processing for the rest of

the tasks
This commit is contained in:
Kirill Bulatov
2021-11-11 00:59:43 +02:00
committed by Kirill Bulatov
parent f72d4814b1
commit 670205e17a
10 changed files with 625 additions and 494 deletions

View File

@@ -5,6 +5,7 @@
use serde::{Deserialize, Serialize};
use std::{
env,
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
str::FromStr,
thread,
@@ -50,6 +51,7 @@ struct CfgFileParams {
auth_validation_public_key_path: Option<String>,
auth_type: Option<String>,
remote_storage_max_concurrent_sync: Option<String>,
remote_storage_max_sync_errors: Option<String>,
/////////////////////////////////
//// Don't put `Option<String>` and other "simple" values below.
////
@@ -115,6 +117,7 @@ impl CfgFileParams {
auth_type: get_arg("auth-type"),
remote_storage,
remote_storage_max_concurrent_sync: get_arg("remote-storage-max-concurrent-sync"),
remote_storage_max_sync_errors: get_arg("remote-storage-max-sync-errors"),
}
}
@@ -140,6 +143,9 @@ impl CfgFileParams {
remote_storage_max_concurrent_sync: self
.remote_storage_max_concurrent_sync
.or(other.remote_storage_max_concurrent_sync),
remote_storage_max_sync_errors: self
.remote_storage_max_sync_errors
.or(other.remote_storage_max_sync_errors),
}
}
@@ -225,7 +231,11 @@ impl CfgFileParams {
let max_concurrent_sync = match self.remote_storage_max_concurrent_sync.as_deref() {
Some(number_str) => number_str.parse()?,
None => DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC_LIMITS,
None => NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC).unwrap(),
};
let max_sync_errors = match self.remote_storage_max_sync_errors.as_deref() {
Some(number_str) => number_str.parse()?,
None => NonZeroU32::new(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS).unwrap(),
};
let remote_storage_config = self.remote_storage.as_ref().map(|storage_params| {
let storage = match storage_params.clone() {
@@ -246,6 +256,7 @@ impl CfgFileParams {
};
RemoteStorageConfig {
max_concurrent_sync,
max_sync_errors,
storage,
}
});
@@ -667,6 +678,9 @@ mod tests {
remote_storage_max_concurrent_sync: Some(
"remote_storage_max_concurrent_sync_VALUE".to_string(),
),
remote_storage_max_sync_errors: Some(
"remote_storage_max_sync_errors_VALUE".to_string(),
),
};
let toml_string = toml::to_string(&params).expect("Failed to serialize correct config");
@@ -686,6 +700,7 @@ pg_distrib_dir = 'pg_distrib_dir_VALUE'
auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE'
auth_type = 'auth_type_VALUE'
remote_storage_max_concurrent_sync = 'remote_storage_max_concurrent_sync_VALUE'
remote_storage_max_sync_errors = 'remote_storage_max_sync_errors_VALUE'
[remote_storage]
local_path = 'remote_storage_local_VALUE'
@@ -733,6 +748,9 @@ local_path = 'remote_storage_local_VALUE'
remote_storage_max_concurrent_sync: Some(
"remote_storage_max_concurrent_sync_VALUE".to_string(),
),
remote_storage_max_sync_errors: Some(
"remote_storage_max_sync_errors_VALUE".to_string(),
),
};
let toml_string = toml::to_string(&params).expect("Failed to serialize correct config");
@@ -752,6 +770,7 @@ pg_distrib_dir = 'pg_distrib_dir_VALUE'
auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE'
auth_type = 'auth_type_VALUE'
remote_storage_max_concurrent_sync = 'remote_storage_max_concurrent_sync_VALUE'
remote_storage_max_sync_errors = 'remote_storage_max_sync_errors_VALUE'
[remote_storage]
bucket_name = 'bucket_name_VALUE'

View File

@@ -34,7 +34,7 @@ use std::time::{Duration, Instant};
use self::metadata::{metadata_path, TimelineMetadata};
use crate::page_cache;
use crate::relish::*;
use crate::remote_storage::schedule_timeline_upload;
use crate::remote_storage::schedule_timeline_checkpoint_upload;
use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
use crate::tenant_mgr;
use crate::walreceiver;
@@ -259,16 +259,37 @@ impl Repository for LayeredRepository {
let timelines = self.timelines.lock().unwrap();
for (timelineid, timeline) in timelines.iter() {
walreceiver::stop_wal_receiver(*timelineid);
// Wait for syncing data to disk
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint(CheckpointConfig::Forced)?;
//TODO Wait for walredo process to shutdown too
shutdown_timeline(*timelineid, timeline.as_ref())?;
}
Ok(())
}
fn unload_timeline(&self, timeline_id: ZTimelineId) -> Result<()> {
let mut timelines = self.timelines.lock().unwrap();
let removed_timeline = match timelines.remove(&timeline_id) {
Some(timeline) => timeline,
None => {
warn!("Timeline {} not found, nothing to remove", timeline_id);
return Ok(());
}
};
drop(timelines);
shutdown_timeline(timeline_id, removed_timeline.as_ref())?;
Ok(())
}
}
fn shutdown_timeline(
timelineid: ZTimelineId,
timeline: &LayeredTimeline,
) -> Result<(), anyhow::Error> {
walreceiver::stop_wal_receiver(timelineid);
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint(CheckpointConfig::Forced)?;
//TODO Wait for walredo process to shutdown too
Ok(())
}
/// Private functions
@@ -318,7 +339,12 @@ impl LayeredRepository {
.load_layer_map(disk_consistent_lsn)
.context("failed to load layermap")?;
if self.upload_relishes {
schedule_timeline_upload(self.tenantid, timelineid, loaded_layers, metadata);
schedule_timeline_checkpoint_upload(
self.tenantid,
timelineid,
loaded_layers,
metadata,
);
}
// needs to be after load_layer_map
@@ -1332,7 +1358,12 @@ impl LayeredTimeline {
false,
)?;
if self.upload_relishes {
schedule_timeline_upload(self.tenantid, self.timelineid, layer_uploads, metadata);
schedule_timeline_checkpoint_upload(
self.tenantid,
self.timelineid,
layer_uploads,
metadata,
);
}
// Also update the in-memory copy

View File

@@ -2,6 +2,7 @@ use layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::PathBuf;
use std::time::Duration;
@@ -44,7 +45,8 @@ pub mod defaults {
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100;
pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
pub const DEFAULT_OPEN_MEM_LIMIT: usize = 128 * 1024 * 1024;
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
@@ -186,8 +188,10 @@ pub enum CheckpointConfig {
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone)]
pub struct RemoteStorageConfig {
/// Limits the number of concurrent sync operations between pageserver and the remote storage.
pub max_concurrent_sync: usize,
/// Max allowed number of concurrent sync operations between pageserver and the remote storage.
pub max_concurrent_sync: NonZeroUsize,
/// Max allowed errors before the sync task is considered failed and evicted.
pub max_sync_errors: NonZeroU32,
/// The storage connection configuration.
pub storage: RemoteStorageKind,
}

View File

@@ -9,7 +9,7 @@
//!
//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync.
//!
//! * public API via to interact with the external world: [`run_storage_sync_thread`] and [`schedule_timeline_upload`]
//! * public API via to interact with the external world: [`run_storage_sync_thread`] and [`schedule_timeline_checkpoint_upload`]
//!
//! Here's a schematic overview of all interactions backup and the rest of the pageserver perform:
//!
@@ -17,7 +17,7 @@
//! | | - - - (init async loop) - - - -> | |
//! | | | |
//! | | -------------------------------> | async |
//! | pageserver | (schedule frozen layer upload) | upload/download |
//! | pageserver | (schedule checkpoint upload) | upload/download |
//! | | | loop |
//! | | <------------------------------- | |
//! | | (register downloaded layers) | |
@@ -80,11 +80,17 @@ use std::{
use anyhow::Context;
use tokio::io;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
pub use self::storage_sync::schedule_timeline_upload;
pub use self::storage_sync::schedule_timeline_checkpoint_upload;
use self::{local_fs::LocalFs, rust_s3::S3};
use crate::{PageServerConf, RemoteStorageKind};
/// Any timeline has its own id and its own tenant it belongs to,
/// the sync processes group timelines by both for simplicity.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TimelineSyncId(ZTenantId, ZTimelineId);
/// Based on the config, initiates the remote storage connection and starts a separate thread
/// that ensures that pageserver and the remote storage are in sync with each other.
/// If no external configuraion connection given, no thread or storage initialization is done.
@@ -94,16 +100,19 @@ pub fn run_storage_sync_thread(
match &config.remote_storage_config {
Some(storage_config) => {
let max_concurrent_sync = storage_config.max_concurrent_sync;
let max_sync_errors = storage_config.max_sync_errors;
let handle = match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
config,
LocalFs::new(root.clone(), &config.workdir)?,
max_concurrent_sync,
max_sync_errors,
),
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
config,
S3::new(s3_config, &config.workdir)?,
max_concurrent_sync,
max_sync_errors,
),
};
handle.map(Some)

View File

@@ -43,13 +43,6 @@ AWS S3 returns file checksums during the `list` operation, so that can be used t
For now, due to this, we consider local workdir files as source of truth, not removing them ever and adjusting remote files instead, if image files mismatch.
* no proper retry management
Now, the storage sync attempts to redo the upload/download operation for the image files that failed.
No proper task eviction or backpressure is implemented currently: the tasks will stay in the queue forever, reattempting the downloads.
This will be fixed when more details on the file consistency model will be agreed on.
* sad rust-s3 api
rust-s3 is not very pleasant to use:

File diff suppressed because it is too large Load Diff

View File

@@ -16,6 +16,9 @@ use zenith_utils::zid::ZTimelineId;
pub trait Repository: Send + Sync {
fn shutdown(&self) -> Result<()>;
/// Stops all timeline-related process in the repository and removes the timeline data from memory.
fn unload_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;

View File

@@ -9,7 +9,7 @@ use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use anyhow::{anyhow, bail, Context, Result};
use lazy_static::lazy_static;
use log::{debug, info};
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
@@ -49,6 +49,17 @@ pub enum TenantState {
Stopping,
}
/// A remote storage timeline synchronization event, that needs another step
/// to be fully completed.
#[derive(Debug)]
pub enum PostTimelineSyncStep {
/// The timeline cannot be synchronized anymore due to some sync issues.
/// Needs to be removed from pageserver, to avoid further data diverging.
Evict,
/// A new timeline got downloaded and needs to be loaded into pageserver.
RegisterDownload,
}
impl fmt::Display for TenantState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@@ -104,39 +115,57 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
tenant.state = TenantState::Idle;
}
pub fn register_timeline_download(
pub fn perform_post_timeline_sync_steps(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
post_sync_steps: HashMap<(ZTenantId, ZTimelineId), PostTimelineSyncStep>,
) {
log::info!(
"Registering new download, tenant id {}, timeline id: {}",
tenant_id,
timeline_id
);
if post_sync_steps.is_empty() {
return;
}
info!("Performing {} post-sync steps", post_sync_steps.len());
trace!("Steps: {:?}", post_sync_steps);
{
let mut m = access_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| Tenant {
state: TenantState::Downloading,
repo: None,
});
tenant.state = TenantState::Downloading;
match &tenant.repo {
Some(repo) => {
init_timeline(repo.as_ref(), timeline_id);
tenant.state = TenantState::Idle;
return;
for &(tenant_id, timeline_id) in post_sync_steps.keys() {
let tenant = m.entry(tenant_id).or_insert_with(|| Tenant {
state: TenantState::Downloading,
repo: None,
});
tenant.state = TenantState::Downloading;
match &tenant.repo {
Some(repo) => {
init_timeline(repo.as_ref(), timeline_id);
tenant.state = TenantState::Idle;
return;
}
None => log::warn!("Initialize new repo"),
}
None => log::warn!("Initialize new repo"),
tenant.state = TenantState::Idle;
}
tenant.state = TenantState::Idle;
}
// init repo updates Tenant state
init_repo(conf, tenant_id);
let new_repo = get_repository_for_tenant(tenant_id).unwrap();
init_timeline(new_repo.as_ref(), timeline_id);
for ((tenant_id, timeline_id), post_sync_step) in post_sync_steps {
match post_sync_step {
PostTimelineSyncStep::Evict => {
if let Err(e) = get_repository_for_tenant(tenant_id)
.and_then(|repo| repo.unload_timeline(timeline_id))
{
error!(
"Failed to remove repository for tenant {}, timeline {}: {:#}",
tenant_id, timeline_id, e
)
}
}
PostTimelineSyncStep::RegisterDownload => {
// init repo updates Tenant state
init_repo(conf, tenant_id);
let new_repo = get_repository_for_tenant(tenant_id).unwrap();
init_timeline(new_repo.as_ref(), timeline_id);
}
}
}
}
fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) {