mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
This is a backwards-incompatible change. The new pageserver cannot read repositories created with an old pageserver binary, or vice versa.

Simplify Repository to a value-store
------------------------------------

Move the responsibility of tracking relation metadata, like which relations exist and what their sizes are, from Repository to a new module, pgdatadir_mapping.rs. The interface to Repository now consists of simple key-value PUT/GET operations.

It's still not any old key-value store though. A Repository is still responsible for handling branching, and every GET operation comes with an LSN.

Mapping from Postgres data directory to keys/values
---------------------------------------------------

All the data is now stored in the key-value store. The 'pgdatadir_mapping.rs' module handles mapping from PostgreSQL objects like relation pages and SLRUs, to key-value pairs.

The key to the Repository key-value store is a Key struct, which consists of a few integer fields. It's wide enough to store a full RelFileNode, fork and block number, and to distinguish those from metadata keys.

'pgdatadir_mapping.rs' is also responsible for maintaining a "partitioning" of the keyspace. Partitioning means splitting the keyspace so that each partition holds a roughly equal number of keys. The partitioning is used when new image layer files are created, so that each image layer file is roughly the same size.

The partitioning is also responsible for reclaiming space used by deleted keys. The Repository implementation doesn't have any explicit support for deleting keys. Instead, the deleted keys are simply omitted from the partitioning, and when a new image layer is created, the omitted keys are not copied over to the new image layer. We might want to implement tombstone keys in the future, to reclaim space faster, but this will work for now.

Changes to low-level layer file code
------------------------------------

The concept of a "segment" is gone. Each layer file can now store an arbitrary range of Keys.

Checkpointing, compaction
-------------------------

The background tasks are somewhat different now. Whenever checkpoint_distance is reached, the WAL receiver thread "freezes" the current in-memory layer, and creates a new one. This is a quick operation and doesn't perform any I/O yet. It then launches a background "layer flushing thread" to write the frozen layer to disk, as a new L0 delta layer. This mechanism takes care of durability. It replaces the checkpointing thread.

Compaction is a new background operation that takes a bunch of L0 delta layers, and reshuffles the data in them. It runs in a separate compaction thread.

Deployment
----------

This also contains changes to the ansible scripts that enable having multiple different pageservers running at the same time in the staging environment. We will use that to keep an old version of the pageserver running, for clusters created with the old version, alongside a new pageserver running the new binary.

Author: Heikki Linnakangas
Author: Konstantin Knizhnik <knizhnik@zenith.tech>
Author: Andrey Taranik <andrey@zenith.tech>
Reviewed-by: Matthias Van De Meent <matthias@zenith.tech>
Reviewed-by: Bojan Serafimov <bojan@zenith.tech>
Reviewed-by: Konstantin Knizhnik <knizhnik@zenith.tech>
Reviewed-by: Anton Shyrabokau <antons@zenith.tech>
Reviewed-by: Dhammika Pathirana <dham@zenith.tech>
Reviewed-by: Kirill Bulatov <kirill@zenith.tech>
Reviewed-by: Anastasia Lubennikova <anastasia@zenith.tech>
Reviewed-by: Alexey Kondratov <alexey@zenith.tech>
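The keyspace partitioning described in the commit message above can be illustrated with a minimal sketch. It assumes a hypothetical, simplified `Key` (a `kind` tag plus relation, fork and block fields) and a hypothetical `partition_keyspace()` helper that simply chunks the sorted list of live keys; the actual Key layout and partitioning code in pgdatadir_mapping.rs may well differ.

```rust
// Hypothetical, simplified key: NOT the pageserver's actual Key layout.
// Derived Ord compares fields in declaration order, which defines key order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Key {
    kind: u8,    // assumption: 0 = relation block, 1 = metadata
    rel: u32,    // stand-in for a full RelFileNode
    fork: u8,    // fork number
    blknum: u32, // block number within the fork
}

fn rel_block(rel: u32, blknum: u32) -> Key {
    Key { kind: 0, rel, fork: 0, blknum }
}

/// Split sorted live keys into contiguous partitions of at most `target_keys`
/// keys each, returned as inclusive (first, last) ranges. Only the last
/// partition can be smaller, so partitions are roughly equal in size.
fn partition_keyspace(sorted_keys: &[Key], target_keys: usize) -> Vec<(Key, Key)> {
    assert!(target_keys > 0, "target_keys must be positive");
    sorted_keys
        .chunks(target_keys)
        .map(|chunk| (chunk[0], chunk[chunk.len() - 1]))
        .collect()
}

fn main() {
    let mut live_keys: Vec<Key> = (0..10).map(|blk| rel_block(1, blk)).collect();
    // Blocks 3 and 4 were "deleted": they are simply left out of the key list,
    // so they will not be copied into any new image layer.
    live_keys.retain(|k| k.blknum != 3 && k.blknum != 4);
    live_keys.push(Key { kind: 1, rel: 0, fork: 0, blknum: 0 }); // a metadata key
    live_keys.sort();

    for (i, (first, last)) in partition_keyspace(&live_keys, 3).iter().enumerate() {
        println!("image layer {}: {:?} ..= {:?}", i, first, last);
    }
}
```

In this sketch only the keys listed in each chunk would be written to the corresponding image layer, so a deleted key disappears the next time image layers are created, without any tombstone.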
322 lines
10 KiB
Rust
//!
//! This module provides centralized handling of threads in the Page Server.
//!
//! We provide a few basic facilities:
//! - A global registry of threads that lists what kind of threads they are, and
//!   which tenant or timeline they are working on
//!
//! - The ability to request a thread to shut down.
//!
//!
//! # How it works
//!
//! There is a global hashmap of all the threads (`THREADS`). Whenever a new
//! thread is spawned, a PageServerThread entry is added there, and when a
//! thread dies, it removes itself from the hashmap. If you want to kill a
//! thread, you can scan the hashmap to find it.
//!
//! # Thread shutdown
//!
//! To kill a thread, we rely on co-operation from the victim. Each thread is
//! expected to periodically call the `is_shutdown_requested()` function, and
//! if it returns true, exit gracefully. In addition to that, when waiting for
//! the network or another long-running operation, you can use the
//! `shutdown_watcher()` function to get a Future that will become ready if
//! the current thread has been requested to shut down. You can use that with
//! Tokio `select!`, but note that it relies on thread-local storage, so it
//! will only work with the "current-thread" Tokio runtime!
//!
//!
//! TODO: This would be a good place to also handle panics in a somewhat sane way.
//! Depending on what thread panics, we might want to kill the whole server, or
//! only a single tenant or timeline.
//!

use std::cell::RefCell;
use std::collections::HashMap;
use std::panic;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

use tokio::sync::watch;

use tracing::{debug, error, info, warn};

use lazy_static::lazy_static;

use zenith_utils::zid::{ZTenantId, ZTimelineId};

use crate::shutdown_pageserver;

lazy_static! {
    /// Each thread that we track is associated with a "thread ID". It's just
    /// an increasing number that we assign, not related to any system thread
    /// id.
    static ref NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);

    /// Global registry of threads, keyed by thread ID.
    static ref THREADS: Mutex<HashMap<u64, Arc<PageServerThread>>> = Mutex::new(HashMap::new());
}

// There is a Tokio watch channel for each thread, which can be used to signal the
// thread that it needs to shut down. This thread local variable holds the receiving
// end of the channel. The sender is kept in the global registry, so that anyone
// can send the signal to request thread shutdown.
thread_local!(static SHUTDOWN_RX: RefCell<Option<watch::Receiver<()>>> = RefCell::new(None));

// Each thread holds a reference to its own PageServerThread here.
thread_local!(static CURRENT_THREAD: RefCell<Option<Arc<PageServerThread>>> = RefCell::new(None));

///
/// There are many kinds of threads in the system. Some are associated with a particular
/// tenant or timeline, while others are global.
///
/// Note that we don't try to limit how many threads of a certain kind can be running
/// at the same time.
///
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ThreadKind {
    // libpq listener thread. It just accepts connections and spawns a
    // PageRequestHandler thread for each connection.
    LibpqEndpointListener,

    // HTTP endpoint listener.
    HttpEndpointListener,

    // Thread that handles a single connection. A PageRequestHandler thread
    // starts detached from any particular tenant or timeline, but it can be
    // associated with one later, after receiving a command from the client.
    PageRequestHandler,

    // Thread that connects to a safekeeper to fetch WAL for one timeline.
    WalReceiver,

    // Thread that handles compaction of all timelines for a tenant.
    Compactor,

    // Thread that handles GC of a tenant
    GarbageCollector,

    // Thread that flushes frozen in-memory layers to disk
    LayerFlushThread,

    // Thread for synchronizing pageserver layer files with the remote storage.
    // Shared by all tenants.
    StorageSync,
}

struct PageServerThread {
    _thread_id: u64,

    kind: ThreadKind,

    /// Tenant and timeline that this thread is associated with.
    tenant_id: Option<ZTenantId>,
    timeline_id: Option<ZTimelineId>,

    name: String,

    // To request thread shutdown, set the flag, and send a dummy message to the
    // channel to notify it.
    shutdown_requested: AtomicBool,
    shutdown_tx: watch::Sender<()>,

    /// Handle for waiting for the thread to exit. It can be None if the
    /// thread has already exited.
    join_handle: Mutex<Option<JoinHandle<()>>>,
}

/// Launch a new thread and register it in the global registry.
///
/// If `fail_on_error` is true, the whole page server is shut down when `f`
/// returns an error. A panic in `f` always shuts down the page server.
pub fn spawn<F>(
    kind: ThreadKind,
    tenant_id: Option<ZTenantId>,
    timeline_id: Option<ZTimelineId>,
    name: &str,
    fail_on_error: bool,
    f: F,
) -> std::io::Result<()>
where
    F: FnOnce() -> anyhow::Result<()> + Send + 'static,
{
    let (shutdown_tx, shutdown_rx) = watch::channel(());
    let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
    let thread = PageServerThread {
        _thread_id: thread_id,
        kind,
        tenant_id,
        timeline_id,
        name: name.to_string(),

        shutdown_requested: AtomicBool::new(false),
        shutdown_tx,

        join_handle: Mutex::new(None),
    };

    let thread_rc = Arc::new(thread);

    // While this lock is held, a concurrent shutdown_threads() call blocks on
    // it until the join handle has been stored below, instead of seeing None.
    let mut jh_guard = thread_rc.join_handle.lock().unwrap();

    THREADS
        .lock()
        .unwrap()
        .insert(thread_id, Arc::clone(&thread_rc));

    let thread_rc2 = Arc::clone(&thread_rc);
    let thread_name = name.to_string();
    let join_handle = match thread::Builder::new()
        .name(name.to_string())
        .spawn(move || {
            thread_wrapper(
                thread_name,
                thread_id,
                thread_rc2,
                shutdown_rx,
                fail_on_error,
                f,
            )
        }) {
        Ok(handle) => handle,
        Err(err) => {
            error!("Failed to spawn thread '{}': {}", name, err);
            // Could not spawn the thread. Remove the entry
            THREADS.lock().unwrap().remove(&thread_id);
            return Err(err);
        }
    };
    *jh_guard = Some(join_handle);
    drop(jh_guard);

    // The thread is now running. Nothing more to do here
    Ok(())
}

/// This wrapper function runs in a newly-spawned thread. It initializes the
/// thread-local variables and calls the payload function
fn thread_wrapper<F>(
    thread_name: String,
    thread_id: u64,
    thread: Arc<PageServerThread>,
    shutdown_rx: watch::Receiver<()>,
    fail_on_error: bool,
    f: F,
) where
    F: FnOnce() -> anyhow::Result<()> + Send + 'static,
{
    SHUTDOWN_RX.with(|rx| {
        *rx.borrow_mut() = Some(shutdown_rx);
    });
    CURRENT_THREAD.with(|ct| {
        *ct.borrow_mut() = Some(thread);
    });

    debug!("Starting thread '{}'", thread_name);

    // We use AssertUnwindSafe here so that the payload function
    // doesn't need to be UnwindSafe. We don't do anything after the
    // unwinding that would expose us to unwind-unsafe behavior.
    let result = panic::catch_unwind(AssertUnwindSafe(f));

    // Remove our entry from the global hashmap.
    THREADS.lock().unwrap().remove(&thread_id);

    match result {
        Ok(Ok(())) => debug!("Thread '{}' exited normally", thread_name),
        Ok(Err(err)) => {
            if fail_on_error {
                error!(
                    "Shutting down: thread '{}' exited with error: {:?}",
                    thread_name, err
                );
                shutdown_pageserver();
            } else {
                error!("Thread '{}' exited with error: {:?}", thread_name, err);
            }
        }
        Err(err) => {
            error!(
                "Shutting down: thread '{}' panicked: {:?}",
                thread_name, err
            );
            shutdown_pageserver();
        }
    }
}

/// Signal and wait for threads to shut down.
///
/// The arguments are used to select the threads to kill. A None argument
/// matches any value. For example, to shut down all WalReceiver threads:
///
/// shutdown_threads(Some(ThreadKind::WalReceiver), None, None)
///
/// Or to shut down all threads for a given timeline:
///
/// shutdown_threads(None, None, Some(timeline_id))
///
pub fn shutdown_threads(
    kind: Option<ThreadKind>,
    tenant_id: Option<ZTenantId>,
    timeline_id: Option<ZTimelineId>,
) {
    let mut victim_threads = Vec::new();

    let threads = THREADS.lock().unwrap();
    for thread in threads.values() {
        if (kind.is_none() || Some(thread.kind) == kind)
            && (tenant_id.is_none() || thread.tenant_id == tenant_id)
            && (timeline_id.is_none() || thread.timeline_id == timeline_id)
        {
            thread.shutdown_requested.store(true, Ordering::Relaxed);
            // FIXME: handle error?
            let _ = thread.shutdown_tx.send(());
            victim_threads.push(Arc::clone(thread));
        }
    }
    drop(threads);

    for thread in victim_threads {
        info!("waiting for {} to shut down", thread.name);
        if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {
            let _ = join_handle.join();
        } else {
            // The thread had not even fully started yet. Or it was shut down
            // concurrently and already exited
        }
    }
}

/// A Future that becomes ready when the current thread has been requested to
/// shut down. It must be polled in a thread launched with `spawn()`; otherwise
/// it panics.
pub async fn shutdown_watcher() {
    let _ = SHUTDOWN_RX
        .with(|rx| {
            rx.borrow()
                .as_ref()
                .expect("shutdown_watcher() called in an unexpected thread")
                .clone()
        })
        .changed()
        .await;
}

/// Has the current thread been requested to shut down?
pub fn is_shutdown_requested() -> bool {
    CURRENT_THREAD.with(|ct| {
        if let Some(ct) = ct.borrow().as_ref() {
            ct.shutdown_requested.load(Ordering::Relaxed)
        } else {
            if !cfg!(test) {
                warn!("is_shutdown_requested() called in an unexpected thread");
            }
            false
        }
    })
}
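The registry and cooperative-shutdown scheme in this module can be reduced to a std-only sketch for experimentation. It is not the module's code: it replaces the Tokio watch channel and thread-local lookup with an `Arc<AtomicBool>` handed directly to each worker, and `Registry`, `Kind`, `TimelineId` and `worker` are hypothetical stand-ins. Its `shutdown()` mirrors the filter in `shutdown_threads()`, where a `None` argument matches anything.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical stand-ins for ThreadKind and ZTimelineId.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Kind {
    WalReceiver,
    Compactor,
}
type TimelineId = u32;

struct Entry {
    kind: Kind,
    timeline: Option<TimelineId>,
    stop: Arc<AtomicBool>,
    join_handle: Mutex<Option<JoinHandle<()>>>,
}

struct Registry {
    next_id: AtomicU64,
    threads: Mutex<HashMap<u64, Arc<Entry>>>,
}

impl Registry {
    fn new() -> Self {
        Registry { next_id: AtomicU64::new(1), threads: Mutex::new(HashMap::new()) }
    }

    /// Spawn a worker that is handed its own stop flag, and register it.
    fn spawn<F>(&self, kind: Kind, timeline: Option<TimelineId>, f: F)
    where
        F: FnOnce(Arc<AtomicBool>) + Send + 'static,
    {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let stop = Arc::new(AtomicBool::new(false));
        let worker_stop = Arc::clone(&stop);
        let handle = thread::spawn(move || f(worker_stop));
        let entry = Entry { kind, timeline, stop, join_handle: Mutex::new(Some(handle)) };
        self.threads.lock().unwrap().insert(id, Arc::new(entry));
    }

    /// Stop every matching worker and wait for it. A `None` filter matches
    /// anything. Returns how many workers were stopped.
    fn shutdown(&self, kind: Option<Kind>, timeline: Option<TimelineId>) -> usize {
        let mut threads = self.threads.lock().unwrap();
        let ids: Vec<u64> = threads
            .iter()
            .filter(|(_, e)| {
                (kind.is_none() || Some(e.kind) == kind)
                    && (timeline.is_none() || e.timeline == timeline)
            })
            .map(|(id, _)| *id)
            .collect();
        // Simplification: entries are removed here, rather than by each
        // thread on exit as in the real module.
        let victims: Vec<Arc<Entry>> = ids.iter().filter_map(|id| threads.remove(id)).collect();
        drop(threads); // release the registry lock before joining

        for v in &victims {
            v.stop.store(true, Ordering::Relaxed);
            if let Some(handle) = v.join_handle.lock().unwrap().take() {
                let _ = handle.join();
            }
        }
        victims.len()
    }

    fn count(&self) -> usize {
        self.threads.lock().unwrap().len()
    }
}

/// A cooperative worker: it polls its flag, like is_shutdown_requested().
fn worker(stop: Arc<AtomicBool>) {
    while !stop.load(Ordering::Relaxed) {
        thread::sleep(Duration::from_millis(5));
    }
}

fn main() {
    let reg = Registry::new();
    reg.spawn(Kind::WalReceiver, Some(1), worker);
    reg.spawn(Kind::WalReceiver, Some(2), worker);
    reg.spawn(Kind::Compactor, None, worker);

    // Roughly shutdown_threads(Some(WalReceiver), None, Some(timeline 1)).
    let stopped = reg.shutdown(Some(Kind::WalReceiver), Some(1));
    println!("stopped {}, still registered {}", stopped, reg.count());

    // Roughly shutdown_threads(None, None, None): matches everything left.
    let stopped = reg.shutdown(None, None);
    println!("stopped {}, still registered {}", stopped, reg.count());
}
```

As in `shutdown_threads()`, the sketch releases the registry lock before joining, so a worker that needs the registry while exiting cannot deadlock against the thread waiting for it.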
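The exit handling in `thread_wrapper()` distinguishes three outcomes: normal exit, an error returned by the payload, and a panic. The sketch below isolates that decision. It assumes a `String` error type instead of `anyhow::Error` and returns a "shut down the server" flag instead of calling `shutdown_pageserver()`; `run_payload` and `Outcome` are hypothetical names.

```rust
use std::panic::{self, AssertUnwindSafe};

/// How a payload function ended (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Exited,         // returned Ok(())
    Failed(String), // returned Err(..)
    Panicked,       // unwound with a panic
}

/// Run `f`, catching any panic, and decide whether the whole server should
/// shut down: always after a panic, and after an error only if `fail_on_error`.
fn run_payload<F>(fail_on_error: bool, f: F) -> (Outcome, bool)
where
    F: FnOnce() -> Result<(), String>,
{
    // AssertUnwindSafe lets `f` be passed even if it is not UnwindSafe.
    match panic::catch_unwind(AssertUnwindSafe(f)) {
        Ok(Ok(())) => (Outcome::Exited, false),
        Ok(Err(err)) => (Outcome::Failed(err), fail_on_error),
        Err(_panic_payload) => (Outcome::Panicked, true),
    }
}

fn main() {
    println!("{:?}", run_payload(false, || Ok(())));
    println!("{:?}", run_payload(false, || Err("connection lost".to_string())));
    println!("{:?}", run_payload(true, || Err("connection lost".to_string())));
    // The default panic hook still prints the panic message to stderr.
    println!("{:?}", run_payload(false, || -> Result<(), String> { panic!("bug") }));
}
```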