Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-14 08:52:56 +00:00
This is a full switch: filesystem IO operations are also tokio ones, going through the thread pool. Similar to the pageserver, we have multiple runtimes for easier `top` usage and isolation.

Notable points:
- Now that the guts of safekeeper.rs are full of `.await`s, we need to be very careful not to drop a task at a random point, leaving the timeline in an unclear state. Currently the only writer is the walreceiver and we don't have top-level cancellation there, so we are good. But to be safe we should probably add a fuse that panics if a task is dropped while an operation on a timeline is in progress.
- The timeline lock is a tokio one now, as we do disk IO under it.
- Collecting metrics got a crutch: since the prometheus Collector is synchronous, it spawns a thread with a current-thread runtime to collect the data (see the sketch below).
- Anything involving closures becomes significantly more complicated, as async fns are already kind of closures, and async closures are unstable.
- The main thread now tracks the other main tasks, which got much easier.
- The only sync place left is initial data loading; otherwise clippy complains about the timeline map lock being held across await points -- which is not actually a problem here, as it happens only in the single-threaded runtime of the main thread. But having it sync doesn't hurt either.

I'm concerned about the performance of thread-pool IO offloading, async traits, and the many await points, but we can try and see how it goes.

fixes https://github.com/neondatabase/neon/issues/3036
fixes https://github.com/neondatabase/neon/issues/3966
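A minimal sketch of that metrics crutch, assuming only the tokio dependency and using hypothetical names (`collect_sync`, `gather_timeline_metrics`); this is not the actual Collector implementation in metrics.rs, only the thread-plus-current-thread-runtime pattern described above:

// Hypothetical stand-in for the real async, lock-taking collection step.
async fn gather_timeline_metrics() -> Vec<u64> {
    vec![0]
}

// A synchronous entry point (like prometheus' Collector::collect) cannot
// .await, so it spawns a short-lived thread owning a current-thread runtime
// and blocks until the async gathering finishes.
fn collect_sync() -> Vec<u64> {
    std::thread::spawn(|| {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build current-thread runtime")
            .block_on(gather_timeline_metrics())
    })
    .join()
    .expect("metrics collection thread panicked")
}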
157 lines
4.7 KiB
Rust
use once_cell::sync::Lazy;
use remote_storage::RemoteStorageConfig;
use tokio::runtime::Runtime;

use std::path::PathBuf;
use std::time::Duration;
use storage_broker::Uri;

use utils::id::{NodeId, TenantId, TenantTimelineId};

mod auth;
pub mod broker;
pub mod control_file;
pub mod control_file_upgrade;
pub mod debug_dump;
pub mod handler;
pub mod http;
pub mod json_ctrl;
pub mod metrics;
pub mod pull_timeline;
pub mod receive_wal;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
pub mod timeline;
pub mod wal_backup;
pub mod wal_service;
pub mod wal_storage;

mod timelines_global_map;
use std::sync::Arc;
pub use timelines_global_map::GlobalTimelines;
use utils::auth::JwtAuth;

pub mod defaults {
    pub use safekeeper_api::{
        DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
        DEFAULT_PG_LISTEN_PORT,
    };

    pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
    pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
}

#[derive(Debug, Clone)]
pub struct SafeKeeperConf {
    // Repository directory, relative to current working directory.
    // Normally, the safekeeper changes the current working directory
    // to the repository, and 'workdir' is always '.'. But we don't do
    // that during unit testing, because the current directory is global
    // to the process but different unit tests work on different
    // data directories to avoid clashing with each other.
    pub workdir: PathBuf,
    pub my_id: NodeId,
    pub listen_pg_addr: String,
    pub listen_http_addr: String,
    pub availability_zone: Option<String>,
    pub no_sync: bool,
    pub broker_endpoint: Uri,
    pub broker_keepalive_interval: Duration,
    pub heartbeat_timeout: Duration,
    pub remote_storage: Option<RemoteStorageConfig>,
    pub max_offloader_lag_bytes: u64,
    pub backup_parallel_jobs: usize,
    pub wal_backup_enabled: bool,
    pub auth: Option<Arc<JwtAuth>>,
    pub current_thread_runtime: bool,
}

impl SafeKeeperConf {
    pub fn tenant_dir(&self, tenant_id: &TenantId) -> PathBuf {
        self.workdir.join(tenant_id.to_string())
    }

    pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> PathBuf {
        self.tenant_dir(&ttid.tenant_id)
            .join(ttid.timeline_id.to_string())
    }
}

impl SafeKeeperConf {
    #[cfg(test)]
    fn dummy() -> Self {
        SafeKeeperConf {
            workdir: PathBuf::from("./"),
            no_sync: false,
            listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
            listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
            availability_zone: None,
            remote_storage: None,
            my_id: NodeId(0),
            broker_endpoint: storage_broker::DEFAULT_ENDPOINT
                .parse()
                .expect("failed to parse default broker endpoint"),
            broker_keepalive_interval: Duration::from_secs(5),
            wal_backup_enabled: true,
            backup_parallel_jobs: 1,
            auth: None,
            heartbeat_timeout: Duration::new(5, 0),
            max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
            current_thread_runtime: false,
        }
    }
}

// Tokio runtimes.
pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("WAL service worker")
        .enable_all()
        .build()
        .expect("Failed to create WAL service runtime")
});

pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("HTTP worker")
        .enable_all()
        .build()
        .expect("Failed to create HTTP runtime")
});

pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("broker worker")
        .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
        .enable_all()
        .build()
        .expect("Failed to create broker runtime")
});

pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("WAL remover")
        .worker_threads(1)
        .enable_all()
        .build()
        .expect("Failed to create WAL remover runtime")
});

pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("WAL backup worker")
        .enable_all()
        .build()
        .expect("Failed to create WAL backup runtime")
});

pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .thread_name("metric shifter")
        .worker_threads(1)
        .enable_all()
        .build()
        .expect("Failed to create metrics shifter runtime")
});
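For context, a hedged sketch of how these runtimes might be driven; the real wiring lives in the safekeeper binary, and `start_background_tasks` plus the task bodies are hypothetical:

// Hypothetical wiring, for illustration only: spawn each subsystem onto its
// dedicated runtime; the named worker threads make per-subsystem CPU usage
// visible in `top -H`.
pub fn start_background_tasks() {
    WAL_SERVICE_RUNTIME.spawn(async {
        // accept and serve WAL service connections here
    });
    BROKER_RUNTIME.spawn(async {
        // push/pull timeline state via the storage broker here
    });
    WAL_BACKUP_RUNTIME.spawn(async {
        // offload finished WAL segments to remote storage here
    });
    // The main thread keeps the returned JoinHandles and awaits them,
    // restarting or shutting down tasks as needed.
}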