This commit is contained in:
Christian Schwarz
2024-02-02 15:25:39 +00:00
parent 7c1b2dc9ef
commit de7d366df3
8 changed files with 76 additions and 64 deletions

View File

@@ -23,7 +23,9 @@ enum Command {
SetSlotCount(usize),
}
#[derive(thiserror::Error, Debug)]
pub enum GetError {
#[error("shutting down")]
ShuttingDown,
}

View File

@@ -309,6 +309,9 @@ fn start_pageserver(
info!("Starting pageserver pg protocol handler on {pg_addr}");
let pageserver_listener = tcp_listener::bind(pg_addr)?;
let walredo_process_pool =
Arc::new(COMPUTE_REQUEST_RUNTIME.block_on(pageserver::walredo::ProcessPool::launch(conf)));
// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
@@ -414,6 +417,7 @@ fn start_pageserver(
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
walredo_process_pool: walredo_process_pool.clone(),
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
@@ -545,6 +549,7 @@ fn start_pageserver(
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
walredo_process_pool,
)
.context("Failed to initialize router state")?,
);

View File

@@ -55,7 +55,9 @@ use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::SpawnMode;
use crate::tenant::TenantSharedResources;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::walredo;
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
@@ -97,6 +99,7 @@ pub struct State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
walredo_process_pool: Arc<walredo::ProcessPool>,
}
impl State {
@@ -110,6 +113,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
walredo_process_pool: Arc<walredo::ProcessPool>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
@@ -125,8 +129,18 @@ impl State {
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
walredo_process_pool,
})
}
pub(crate) fn tenant_shared_resources(&self) -> TenantSharedResources {
TenantSharedResources {
walredo_process_pool: Arc::clone(&self.walredo_process_pool),
broker_client: self.broker_client.clone(),
remote_storage: self.remote_storage.clone(),
deletion_queue_client: self.deletion_queue_client.clone(),
}
}
}
#[inline(always)]
@@ -890,9 +904,7 @@ async fn tenant_load_handler(
state.conf,
tenant_id,
generation,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
state.tenant_shared_resources(),
&ctx,
)
.instrument(info_span!("load", %tenant_id))

View File

@@ -1652,7 +1652,7 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
});
#[rustfmt::skip]
pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
pub(crate) static WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_process_launch_duration",
"Histogram of the duration of successful WalRedoProcess::launch calls",
@@ -2452,7 +2452,7 @@ pub fn preinitialize_metrics() {
&WAL_REDO_TIME,
&WAL_REDO_RECORDS_HISTOGRAM,
&WAL_REDO_BYTES_HISTOGRAM,
&WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
&WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
]
.into_iter()
.for_each(|h| {

View File

@@ -187,7 +187,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub walredo_process_pool: Arc<walredo::ProcessPool>,
pub walredo_process_pool: Arc<crate::walredo::ProcessPool>,
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
pub deletion_queue_client: DeletionQueueClient,
@@ -620,12 +620,12 @@ impl Tenant {
broker_client,
remote_storage,
deletion_queue_client,
} = resources;
} = resources ;
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
resources.walredo_process_pool,
walredo_process_pool,
)));
let tenant = Arc::new(Tenant::new(

View File

@@ -1668,9 +1668,7 @@ pub(crate) async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
@@ -1689,12 +1687,6 @@ pub(crate) async fn load_tenant(
})?;
}
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
};
let mut location_conf =
Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
location_conf.attach_in_generation(generation);

View File

@@ -22,14 +22,15 @@
mod process;
mod process_pool;
pub(crate) use process_pool::Pool as ProcessPool;
pub use process_pool::Pool as ProcessPool;
use utils::sync::heavier_once_cell;
/// Code to apply [`NeonWalRecord`]s.
mod apply_neon;
use crate::config::PageServerConf;
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO,
WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
};
use crate::repository::Key;
@@ -58,7 +59,8 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<TaintedProcess>>>,
redo_process: heavier_once_cell::OnceCell<Arc<TaintedProcess>>,
pool: Arc<ProcessPool>,
}
///
@@ -106,6 +108,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -126,6 +129,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
@@ -149,6 +153,16 @@ struct TaintedProcess {
process: Option<Box<WalRedoProcess>>,
}
impl std::ops::Deref for TaintedProcess {
type Target = WalRedoProcess;
fn deref(&self) -> &Self::Target {
self.process
.as_ref()
.expect("only Self::drop sets it to None")
}
}
impl Drop for TaintedProcess {
fn drop(&mut self) {
// ensure tenant_id and span_id are in span
@@ -166,7 +180,7 @@ impl PostgresRedoManager {
pub fn new(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pool: process_pool::Pool,
pool: Arc<process_pool::Pool>,
) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
@@ -196,7 +210,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -213,37 +227,24 @@ impl PostgresRedoManager {
let mut n_attempts = 0u32;
loop {
// launch the WAL redo process on first use
let proc: Arc<process::WalRedoProcess> = {
let proc_guard = self.redo_process.read().unwrap();
match &*proc_guard {
None => {
// "upgrade" to write lock to launch the process
drop(proc_guard);
let mut proc_guard = self.redo_process.write().unwrap();
match &*proc_guard {
None => {
let start = Instant::now();
let pool: Arc<process_pool::Pool> = todo!();
let proc = Arc::new(TaintedProcess {
tenant_shard_id: self.tenant_shard_id,
process: Some(pool.get(pg_version))?,
});
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"launched walredo process"
);
*proc_guard = Some(Arc::clone(&proc));
proc
}
Some(proc) => Arc::clone(proc),
}
}
Some(proc) => Arc::clone(proc),
}
let proc = {
let proc = self
.redo_process
.get_or_init(async move {
let start = Instant::now();
let proc = Arc::new(TaintedProcess {
tenant_shard_id: self.tenant_shard_id,
process: Some(self.pool.get(pg_version).await?),
});
let duration = start.elapsed();
WAL_REDO_PROCESS_ACQUISITION_LATENCY_HISTO.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"acquired pre-spawned walredo process"
);
})
.await?;
};
let started_at = std::time::Instant::now();
@@ -397,7 +398,7 @@ mod tests {
async fn short_v14_redo() {
let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
let h = RedoHarness::new().unwrap();
let h = RedoHarness::new().await.unwrap();
let page = h
.manager
@@ -423,7 +424,7 @@ mod tests {
#[tokio::test]
async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
let h = RedoHarness::new().unwrap();
let h = RedoHarness::new().await.unwrap();
let page = h
.manager
@@ -452,7 +453,7 @@ mod tests {
#[tokio::test]
async fn test_stderr() {
let h = RedoHarness::new().unwrap();
let h = RedoHarness::new().await.unwrap();
h
.manager
.request_redo(
@@ -493,7 +494,7 @@ mod tests {
}
impl RedoHarness {
fn new() -> anyhow::Result<Self> {
async fn new() -> anyhow::Result<Self> {
crate::tenant::harness::setup_logging();
let repo_dir = camino_tempfile::tempdir()?;
@@ -501,7 +502,9 @@ mod tests {
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
let pool = crate::walredo::process_pool::Pool::launch(conf).await;
let manager = PostgresRedoManager::new(conf, tenant_shard_id, pool);
Ok(RedoHarness {
_repo_dir: repo_dir,

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use anyhow::Context;
use utils::pre_spawned_pool;
use crate::config::PageServerConf;
@@ -19,7 +20,7 @@ struct Launcher {
impl utils::pre_spawned_pool::Launcher<Box<WalRedoProcess>> for Launcher {
fn create(&self) -> anyhow::Result<Box<WalRedoProcess>> {
Ok(Arc::new(WalRedoProcess::launch(
Ok(Box::new(WalRedoProcess::launch(
self.conf,
self.pg_version,
)?))
@@ -46,16 +47,13 @@ impl Pool {
.await,
}
}
pub fn get(
&self,
pg_version: usize,
) -> Result<Box<WalRedoProcess>, pre_spawned_pool::GetError> {
pub async fn get(&self, pg_version: u32) -> anyhow::Result<Box<WalRedoProcess>> {
let pool = match pg_version {
14 => &self.v14,
15 => &self.v15,
16 => &self.v16,
x => anyhow::bail!("unknown pg version: {x}"),
};
pool.get()
pool.get().await.context("get pre-spawned walredo process")
}
}