Rework the relish thread model (#689)

This commit is contained in:
Kirill Bulatov
2021-10-05 10:15:56 +03:00
committed by GitHub
parent d134a9856e
commit 5719f13cb2
8 changed files with 179 additions and 167 deletions

View File

@@ -24,8 +24,8 @@ use pageserver::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS,
},
http, page_service, tenant_mgr, PageServerConf, RelishStorageConfig, RelishStorageKind,
S3Config, LOG_FILE_NAME,
http, page_service, relish_storage, tenant_mgr, PageServerConf, RelishStorageConfig,
RelishStorageKind, S3Config, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
@@ -483,12 +483,16 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
}
}
// keep join handles for spawned threads
// don't spawn threads before daemonizing
let mut join_handles = Vec::new();
if let Some(handle) = relish_storage::run_storage_sync_thread(conf)? {
join_handles.push(handle);
}
// Initialize tenant manager.
tenant_mgr::init(conf);
// keep join handles for spawned threads
let mut join_handles = vec![];
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => None,

View File

@@ -151,6 +151,7 @@ pub fn create_repo(
conf,
wal_redo_manager,
tenantid,
false,
));
// Load data into pageserver

View File

@@ -34,7 +34,7 @@ use std::{fs, thread};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
use crate::relish_storage::storage_uploader::QueueBasedRelishUploader;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
use crate::walreceiver::IS_WAL_RECEIVER;
use crate::walredo::WalRedoManager;
@@ -119,7 +119,9 @@ pub struct LayeredRepository {
timelines: Mutex<HashMap<ZTimelineId, Arc<LayeredTimeline>>>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
relish_uploader: Option<Arc<QueueBasedRelishUploader>>,
/// Makes evey repo's timelines to backup their files to remote storage,
/// when they get frozen.
upload_relishes: bool,
}
/// Public interface
@@ -151,8 +153,8 @@ impl Repository for LayeredRepository {
timelineid,
self.tenantid,
Arc::clone(&self.walredo_mgr),
self.relish_uploader.as_ref().map(Arc::clone),
0,
false,
)?;
let timeline_rc = Arc::new(timeline);
@@ -244,8 +246,8 @@ impl LayeredRepository {
timelineid,
self.tenantid,
Arc::clone(&self.walredo_mgr),
self.relish_uploader.as_ref().map(Arc::clone),
0, // init with 0 and update after layers are loaded
0, // init with 0 and update after layers are loaded,
self.upload_relishes,
)?;
// List the layers on disk, and load them into the layer map
@@ -278,15 +280,14 @@ impl LayeredRepository {
conf: &'static PageServerConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenantid: ZTenantId,
upload_relishes: bool,
) -> LayeredRepository {
LayeredRepository {
tenantid,
conf,
timelines: Mutex::new(HashMap::new()),
walredo_mgr,
relish_uploader: conf.relish_storage_config.as_ref().map(|config| {
Arc::new(QueueBasedRelishUploader::new(config, &conf.workdir).unwrap())
}),
upload_relishes,
}
}
@@ -588,8 +589,6 @@ pub struct LayeredTimeline {
// WAL redo manager
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
relish_uploader: Option<Arc<QueueBasedRelishUploader>>,
// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
// the WAL up to the request. The SeqWait provides functions for
@@ -637,6 +636,9 @@ pub struct LayeredTimeline {
// TODO: it is possible to combine these two fields into single one using custom metric which uses SeqCst
// ordering for its operations, but involves private modules, and macro trickery
current_logical_size_gauge: IntGauge,
/// If `true`, will backup its timeline files to remote storage after freezing.
upload_relishes: bool,
}
/// Public interface functions
@@ -1020,8 +1022,8 @@ impl LayeredTimeline {
timelineid: ZTimelineId,
tenantid: ZTenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
relish_uploader: Option<Arc<QueueBasedRelishUploader>>,
current_logical_size: usize,
upload_relishes: bool,
) -> Result<LayeredTimeline> {
let current_logical_size_gauge = LOGICAL_TIMELINE_SIZE
.get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()])
@@ -1033,7 +1035,6 @@ impl LayeredTimeline {
layers: Mutex::new(LayerMap::default()),
walredo_mgr,
relish_uploader,
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
@@ -1046,6 +1047,7 @@ impl LayeredTimeline {
ancestor_lsn: metadata.ancestor_lsn,
current_logical_size: AtomicUsize::new(current_logical_size),
current_logical_size_gauge,
upload_relishes,
};
Ok(timeline)
}
@@ -1448,12 +1450,6 @@ impl LayeredTimeline {
created_historics = true;
}
if let Some(relish_uploader) = &self.relish_uploader {
for label_path in new_historics.iter().filter_map(|layer| layer.path()) {
relish_uploader.schedule_upload(self.timelineid, label_path);
}
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.clone());
@@ -1515,15 +1511,23 @@ impl LayeredTimeline {
ancestor_timeline: ancestor_timelineid,
ancestor_lsn: self.ancestor_lsn,
};
let metadata_path = LayeredRepository::save_metadata(
let _metadata_path = LayeredRepository::save_metadata(
self.conf,
self.timelineid,
self.tenantid,
&metadata,
false,
)?;
if let Some(relish_uploader) = &self.relish_uploader {
relish_uploader.schedule_upload(self.timelineid, metadata_path);
if self.upload_relishes {
schedule_timeline_upload(())
// schedule_timeline_upload(LocalTimeline {
// tenant_id: self.tenantid,
// timeline_id: self.timelineid,
// metadata_path,
// image_layers: image_layer_uploads,
// delta_layers: delta_layer_uploads,
// disk_consistent_lsn,
// });
}
// Also update the in-memory copy

View File

@@ -8,14 +8,43 @@
mod local_fs;
mod rust_s3;
/// A queue and the background machinery behind it to upload
/// local page server layer files to external storage.
pub mod storage_uploader;
/// A queue-based storage with the background machinery behind it to synchronize
/// local page server layer files with external storage.
mod synced_storage;
use std::path::Path;
use std::thread;
use anyhow::Context;
use self::local_fs::LocalFs;
pub use self::synced_storage::schedule_timeline_upload;
use crate::relish_storage::rust_s3::RustS3;
use crate::{PageServerConf, RelishStorageKind};
pub fn run_storage_sync_thread(
config: &'static PageServerConf,
) -> anyhow::Result<Option<thread::JoinHandle<anyhow::Result<()>>>> {
match &config.relish_storage_config {
Some(relish_storage_config) => {
let max_concurrent_sync = relish_storage_config.max_concurrent_sync;
match &relish_storage_config.storage {
RelishStorageKind::LocalFs(root) => synced_storage::run_storage_sync_thread(
config,
LocalFs::new(root.clone())?,
max_concurrent_sync,
),
RelishStorageKind::AwsS3(s3_config) => synced_storage::run_storage_sync_thread(
config,
RustS3::new(s3_config)?,
max_concurrent_sync,
),
}
}
None => Ok(None),
}
}
/// Storage (potentially remote) API to manage its state.
#[async_trait::async_trait]
pub trait RelishStorage: Send + Sync {

View File

@@ -1,116 +0,0 @@
use std::{
collections::VecDeque,
path::{Path, PathBuf},
sync::{Arc, Mutex},
thread,
};
use zenith_utils::zid::ZTimelineId;
use crate::{relish_storage::RelishStorage, RelishStorageConfig, RelishStorageKind};
use super::{local_fs::LocalFs, rust_s3::RustS3};
pub struct QueueBasedRelishUploader {
upload_queue: Arc<Mutex<VecDeque<(ZTimelineId, PathBuf)>>>,
}
impl QueueBasedRelishUploader {
pub fn new(
config: &RelishStorageConfig,
page_server_workdir: &'static Path,
) -> anyhow::Result<Self> {
let upload_queue = Arc::new(Mutex::new(VecDeque::new()));
let _handle = match &config.storage {
RelishStorageKind::LocalFs(root) => {
let relish_storage = LocalFs::new(root.clone())?;
create_upload_thread(
Arc::clone(&upload_queue),
relish_storage,
page_server_workdir,
)?
}
RelishStorageKind::AwsS3(s3_config) => {
let relish_storage = RustS3::new(s3_config)?;
create_upload_thread(
Arc::clone(&upload_queue),
relish_storage,
page_server_workdir,
)?
}
};
Ok(Self { upload_queue })
}
pub fn schedule_upload(&self, timeline_id: ZTimelineId, relish_path: PathBuf) {
self.upload_queue
.lock()
.unwrap()
.push_back((timeline_id, relish_path))
}
}
fn create_upload_thread<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
upload_queue: Arc<Mutex<VecDeque<(ZTimelineId, PathBuf)>>>,
relish_storage: S,
page_server_workdir: &'static Path,
) -> std::io::Result<thread::JoinHandle<()>> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
thread::Builder::new()
.name("Queue based relish uploader".to_string())
.spawn(move || loop {
runtime.block_on(async {
upload_loop_step(&upload_queue, &relish_storage, page_server_workdir).await;
})
})
}
async fn upload_loop_step<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
upload_queue: &Mutex<VecDeque<(ZTimelineId, PathBuf)>>,
relish_storage: &S,
page_server_workdir: &Path,
) {
let mut queue_accessor = upload_queue.lock().unwrap();
log::debug!("current upload queue length: {}", queue_accessor.len());
let next_upload = queue_accessor.pop_front();
drop(queue_accessor);
let (relish_timeline_id, relish_local_path) = match next_upload {
Some(data) => data,
None => {
// Don't spin and allow others to use the queue.
// In future, could be improved to be more clever about delays depending on relish upload stats
thread::sleep(std::time::Duration::from_secs(1));
return;
}
};
if let Err(e) = upload_relish(relish_storage, page_server_workdir, &relish_local_path).await {
log::error!(
"Failed to upload relish '{}' for timeline {}, reason: {:#}",
relish_local_path.display(),
relish_timeline_id,
e
);
upload_queue
.lock()
.unwrap()
.push_back((relish_timeline_id, relish_local_path))
} else {
log::debug!("Relish successfully uploaded");
}
}
async fn upload_relish<P, S: RelishStorage<RelishStoragePath = P>>(
relish_storage: &S,
page_server_workdir: &Path,
relish_local_path: &Path,
) -> anyhow::Result<()> {
let destination = S::derive_destination(page_server_workdir, relish_local_path)?;
relish_storage
.upload_relish(relish_local_path, &destination)
.await
}

View File

@@ -0,0 +1,52 @@
use std::time::Duration;
use std::{collections::BinaryHeap, sync::Mutex, thread};
use crate::{relish_storage::RelishStorage, PageServerConf};
lazy_static::lazy_static! {
static ref UPLOAD_QUEUE: Mutex<BinaryHeap<SyncTask>> = Mutex::new(BinaryHeap::new());
}
pub fn schedule_timeline_upload(_local_timeline: ()) {
// UPLOAD_QUEUE
// .lock()
// .unwrap()
// .push(SyncTask::Upload(local_timeline))
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum SyncTask {}
pub fn run_storage_sync_thread<
P: std::fmt::Debug,
S: 'static + RelishStorage<RelishStoragePath = P>,
>(
config: &'static PageServerConf,
relish_storage: S,
max_concurrent_sync: usize,
) -> anyhow::Result<Option<thread::JoinHandle<anyhow::Result<()>>>> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let handle = thread::Builder::new()
.name("Queue based relish storage sync".to_string())
.spawn(move || loop {
let mut queue_accessor = UPLOAD_QUEUE.lock().unwrap();
log::debug!("Upload queue length: {}", queue_accessor.len());
let next_task = queue_accessor.pop();
drop(queue_accessor);
match next_task {
Some(task) => runtime.block_on(async {
// suppress warnings
let _ = (config, task, &relish_storage, max_concurrent_sync);
todo!("omitted for brevity")
}),
None => {
thread::sleep(Duration::from_secs(1));
continue;
}
}
})?;
Ok(Some(handle))
}

View File

@@ -289,6 +289,7 @@ mod tests {
self.conf,
walredo_mgr,
self.tenant_id,
false,
))
}

View File

@@ -9,46 +9,84 @@ use crate::PageServerConf;
use anyhow::{anyhow, bail, Context, Result};
use lazy_static::lazy_static;
use log::info;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
pub static ref REPOSITORY: Mutex<HashMap<ZTenantId, Arc<dyn Repository>>> =
static ref REPOSITORY: Mutex<HashMap<ZTenantId, Arc<dyn Repository>>> =
Mutex::new(HashMap::new());
}
pub fn init(conf: &'static PageServerConf) {
let mut m = REPOSITORY.lock().unwrap();
fn access_repository() -> MutexGuard<'static, HashMap<ZTenantId, Arc<dyn Repository>>> {
REPOSITORY.lock().unwrap()
}
pub fn init(conf: &'static PageServerConf) {
let mut m = access_repository();
for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() {
let tenantid =
ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap();
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenantid);
// Set up an object repository, for actual data storage.
let repo = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenantid,
));
LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
LayeredRepository::launch_gc_thread(conf, repo.clone());
let repo = init_repo(conf, tenantid);
info!("initialized storage for tenant: {}", &tenantid);
m.insert(tenantid, repo);
}
}
fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc<LayeredRepository> {
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
true,
));
LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
LayeredRepository::launch_gc_thread(conf, repo.clone());
repo
}
// TODO kb Currently unused function, will later be used when the relish storage downloads a new layer.
// Relevant PR: https://github.com/zenithdb/zenith/pull/686
pub fn register_relish_download(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) {
log::info!(
"Registering new download, tenant id {}, timeline id: {}",
tenant_id,
timeline_id
);
match access_repository().entry(tenant_id) {
Entry::Occupied(o) => init_timeline(o.get().as_ref(), timeline_id),
Entry::Vacant(v) => {
log::info!("New repo initialized");
let new_repo = init_repo(conf, tenant_id);
init_timeline(new_repo.as_ref(), timeline_id);
v.insert(new_repo);
}
}
}
fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) {
match repo.get_timeline(timeline_id) {
Ok(_timeline) => log::info!("Successfully initialized timeline {}", timeline_id),
Err(e) => log::error!("Failed to init timeline {}, reason: {:#}", timeline_id, e),
}
}
pub fn create_repository_for_tenant(
conf: &'static PageServerConf,
tenantid: ZTenantId,
) -> Result<()> {
let mut m = REPOSITORY.lock().unwrap();
let mut m = access_repository();
// First check that the tenant doesn't exist already
if m.get(&tenantid).is_some() {
@@ -63,13 +101,12 @@ pub fn create_repository_for_tenant(
}
pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc<dyn Repository>) {
let o = &mut REPOSITORY.lock().unwrap();
o.insert(tenantid, repo);
access_repository().insert(tenantid, repo);
}
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
let o = &REPOSITORY.lock().unwrap();
o.get(&tenantid)
access_repository()
.get(&tenantid)
.map(Arc::clone)
.ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid))
}