Compare commits

..

12 Commits

Author SHA1 Message Date
Joonas Koivunen
d87549696b chore: clippy::too_many_arguments 2023-12-13 23:07:25 +00:00
Joonas Koivunen
960a29a6fe refactor: cleanup extra cancellation waits 2023-12-13 23:05:54 +00:00
Joonas Koivunen
d6f6e9a87b fix: layer backoff 2023-12-13 23:05:54 +00:00
Joonas Koivunen
ddae6e2b0a feat: task hierarchy 2023-12-13 23:05:48 +00:00
Joonas Koivunen
e021298dec use child_token instead of cloning 2023-12-13 23:05:35 +00:00
Joonas Koivunen
9790a7c2e8 test: allow shutdown_token when #[cfg(test)] 2023-12-13 22:56:31 +00:00
Joonas Koivunen
9660282c69 chore: cleanup unused 2023-12-13 22:41:06 +00:00
Joonas Koivunen
894cd3ddf7 refactor: eviction_task: stop using plain rate_limit 2023-12-13 22:41:06 +00:00
Joonas Koivunen
735c9b3b70 fix: gc lock acquire cancel 2023-12-13 22:33:32 +00:00
Joonas Koivunen
e76b24ccc5 fix: initial logical size permit cancel 2023-12-13 22:33:32 +00:00
Joonas Koivunen
6ff2c07cc8 fix: compaction lock and permit cancellable 2023-12-13 22:33:32 +00:00
Joonas Koivunen
efd46e478a refactor: split concurrent_background_tasks_rate_limit 2023-12-13 22:33:32 +00:00
64 changed files with 625 additions and 1931 deletions

2
Cargo.lock generated
View File

@@ -3103,7 +3103,6 @@ dependencies = [
"humantime-serde",
"hyper",
"itertools",
"md5",
"metrics",
"nix 0.26.2",
"num-traits",
@@ -5765,7 +5764,6 @@ dependencies = [
"serde",
"serde_assert",
"serde_json",
"serde_path_to_error",
"serde_with",
"signal-hook",
"strum",

View File

@@ -407,7 +407,6 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
};
let request = models::TenantCreateRequest {
@@ -505,7 +504,6 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
}
};

View File

@@ -3,11 +3,8 @@
//! Otherwise, we might not see all metrics registered via
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use once_cell::sync::Lazy;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
@@ -135,137 +132,3 @@ fn get_rusage_stats() -> libc::rusage {
rusage.assume_init()
}
}
/// Create an [`IntCounterPairVec`] and registers to default registry.
#[macro_export(local_inner_macros)]
macro_rules! register_int_counter_pair_vec {
($NAME1:expr, $HELP1:expr, $NAME2:expr, $HELP2:expr, $LABELS_NAMES:expr $(,)?) => {{
match (
$crate::register_int_counter_vec!($NAME1, $HELP1, $LABELS_NAMES),
$crate::register_int_counter_vec!($NAME2, $HELP2, $LABELS_NAMES),
) {
(Ok(inc), Ok(dec)) => Ok($crate::IntCounterPairVec::new(inc, dec)),
(Err(e), _) | (_, Err(e)) => Err(e),
}
}};
}
/// Create an [`IntCounterPair`] and registers to default registry.
#[macro_export(local_inner_macros)]
macro_rules! register_int_counter_pair {
($NAME1:expr, $HELP1:expr, $NAME2:expr, $HELP2:expr $(,)?) => {{
match (
$crate::register_int_counter!($NAME1, $HELP1),
$crate::register_int_counter!($NAME2, $HELP2),
) {
(Ok(inc), Ok(dec)) => Ok($crate::IntCounterPair::new(inc, dec)),
(Err(e), _) | (_, Err(e)) => Err(e),
}
}};
}
/// A Pair of [`GenericCounterVec`]s. Like an [`GenericGaugeVec`] but will always observe changes
pub struct GenericCounterPairVec<P: Atomic> {
inc: GenericCounterVec<P>,
dec: GenericCounterVec<P>,
}
/// A Pair of [`GenericCounter`]s. Like an [`GenericGauge`] but will always observe changes
pub struct GenericCounterPair<P: Atomic> {
inc: GenericCounter<P>,
dec: GenericCounter<P>,
}
impl<P: Atomic> GenericCounterPairVec<P> {
pub fn new(inc: GenericCounterVec<P>, dec: GenericCounterVec<P>) -> Self {
Self { inc, dec }
}
/// `get_metric_with_label_values` returns the [`GenericCounterPair<P>`] for the given slice
/// of label values (same order as the VariableLabels in Desc). If that combination of
/// label values is accessed for the first time, a new [`GenericCounterPair<P>`] is created.
///
/// An error is returned if the number of label values is not the same as the
/// number of VariableLabels in Desc.
pub fn get_metric_with_label_values(&self, vals: &[&str]) -> Result<GenericCounterPair<P>> {
Ok(GenericCounterPair {
inc: self.inc.get_metric_with_label_values(vals)?,
dec: self.dec.get_metric_with_label_values(vals)?,
})
}
/// `with_label_values` works as `get_metric_with_label_values`, but panics if an error
/// occurs.
pub fn with_label_values(&self, vals: &[&str]) -> GenericCounterPair<P> {
self.get_metric_with_label_values(vals).unwrap()
}
}
impl<P: Atomic> GenericCounterPair<P> {
pub fn new(inc: GenericCounter<P>, dec: GenericCounter<P>) -> Self {
Self { inc, dec }
}
/// Increment the gauge by 1, returning a guard that decrements by 1 on drop.
pub fn guard(&self) -> GenericCounterPairGuard<P> {
self.inc.inc();
GenericCounterPairGuard(self.dec.clone())
}
/// Increment the gauge by n, returning a guard that decrements by n on drop.
pub fn guard_by(&self, n: P::T) -> GenericCounterPairGuardBy<P> {
self.inc.inc_by(n);
GenericCounterPairGuardBy(self.dec.clone(), n)
}
/// Increase the gauge by 1.
#[inline]
pub fn inc(&self) {
self.inc.inc();
}
/// Decrease the gauge by 1.
#[inline]
pub fn dec(&self) {
self.dec.inc();
}
/// Add the given value to the gauge. (The value can be
/// negative, resulting in a decrement of the gauge.)
#[inline]
pub fn inc_by(&self, v: P::T) {
self.inc.inc_by(v);
}
/// Subtract the given value from the gauge. (The value can be
/// negative, resulting in an increment of the gauge.)
#[inline]
pub fn dec_by(&self, v: P::T) {
self.dec.inc_by(v);
}
}
/// Guard returned by [`GenericCounterPair::guard`]
pub struct GenericCounterPairGuard<P: Atomic>(GenericCounter<P>);
impl<P: Atomic> Drop for GenericCounterPairGuard<P> {
fn drop(&mut self) {
self.0.inc();
}
}
/// Guard returned by [`GenericCounterPair::guard_by`]
pub struct GenericCounterPairGuardBy<P: Atomic>(GenericCounter<P>, P::T);
impl<P: Atomic> Drop for GenericCounterPairGuardBy<P> {
fn drop(&mut self) {
self.0.inc_by(self.1);
}
}
/// A Pair of [`IntCounterVec`]s. Like an [`IntGaugeVec`] but will always observe changes
pub type IntCounterPairVec = GenericCounterPairVec<AtomicU64>;
/// A Pair of [`IntCounter`]s. Like an [`IntGauge`] but will always observe changes
pub type IntCounterPair = GenericCounterPair<AtomicU64>;
/// A guard for [`IntCounterPair`] that will decrement the gauge on drop
pub type IntCounterPairGuard = GenericCounterPairGuard<AtomicU64>;

View File

@@ -237,7 +237,6 @@ pub struct TenantConfig {
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
pub heatmap_period: Option<String>,
}
/// A flattened analog of a `pagesever::tenant::LocationMode`, which

View File

@@ -50,8 +50,6 @@ const_format.workspace = true
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
serde_path_to_error.workspace = true
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true

View File

@@ -25,12 +25,8 @@ pub async fn json_request_or_empty_body<T: for<'de> Deserialize<'de>>(
if body.remaining() == 0 {
return Ok(None);
}
let mut deser = serde_json::de::Deserializer::from_reader(body.reader());
serde_path_to_error::deserialize(&mut deser)
// intentionally stringify because the debug version is not helpful in python logs
.map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}"))
serde_json::from_reader(body.reader())
.context("Failed to parse json request")
.map(Some)
.map_err(ApiError::BadRequest)
}

View File

@@ -1,7 +1,6 @@
use std::str::FromStr;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
@@ -25,48 +24,16 @@ impl LogFormat {
}
}
struct TracingEventCountMetric {
error: IntCounter,
warn: IntCounter,
info: IntCounter,
debug: IntCounter,
trace: IntCounter,
}
static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
let vec = metrics::register_int_counter_vec!(
static TRACING_EVENT_COUNT: Lazy<metrics::IntCounterVec> = Lazy::new(|| {
metrics::register_int_counter_vec!(
"libmetrics_tracing_event_count",
"Number of tracing events, by level",
&["level"]
)
.expect("failed to define metric");
TracingEventCountMetric::new(vec)
.expect("failed to define metric")
});
impl TracingEventCountMetric {
fn new(vec: IntCounterVec) -> Self {
Self {
error: vec.with_label_values(&["error"]),
warn: vec.with_label_values(&["warn"]),
info: vec.with_label_values(&["info"]),
debug: vec.with_label_values(&["debug"]),
trace: vec.with_label_values(&["trace"]),
}
}
fn inc_for_level(&self, level: tracing::Level) {
let counter = match level {
tracing::Level::ERROR => &self.error,
tracing::Level::WARN => &self.warn,
tracing::Level::INFO => &self.info,
tracing::Level::DEBUG => &self.debug,
tracing::Level::TRACE => &self.trace,
};
counter.inc();
}
}
struct TracingEventCountLayer(&'static TracingEventCountMetric);
struct TracingEventCountLayer(&'static metrics::IntCounterVec);
impl<S> tracing_subscriber::layer::Layer<S> for TracingEventCountLayer
where
@@ -77,7 +44,15 @@ where
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
self.0.inc_for_level(*event.metadata().level());
let level = event.metadata().level();
let level = match *level {
tracing::Level::ERROR => "error",
tracing::Level::WARN => "warn",
tracing::Level::INFO => "info",
tracing::Level::DEBUG => "debug",
tracing::Level::TRACE => "trace",
};
self.0.with_label_values(&[level]).inc();
}
}
@@ -131,9 +106,7 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
);
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
@@ -284,14 +257,14 @@ impl std::fmt::Debug for SecretString {
mod tests {
use metrics::{core::Opts, IntCounterVec};
use crate::logging::{TracingEventCountLayer, TracingEventCountMetric};
use super::TracingEventCountLayer;
#[test]
fn tracing_event_count_metric() {
let counter_vec =
IntCounterVec::new(Opts::new("testmetric", "testhelp"), &["level"]).unwrap();
let metric = Box::leak(Box::new(TracingEventCountMetric::new(counter_vec.clone())));
let layer = TracingEventCountLayer(metric);
let counter_vec = Box::leak(Box::new(counter_vec)); // make it 'static
let layer = TracingEventCountLayer(counter_vec);
use tracing_subscriber::prelude::*;
tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || {

View File

@@ -436,9 +436,9 @@ mod tests {
event_mask: 0,
}),
expected_messages: vec![
// Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
// Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160000, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
vec![
103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
@@ -478,7 +478,7 @@ mod tests {
// walproposer will panic when it finishes sync_safekeepers
std::panic::catch_unwind(|| wp.start()).unwrap_err();
// validate the resulting LSN
assert_eq!(receiver.try_recv(), Ok(1337));
assert_eq!(receiver.recv()?, 1337);
Ok(())
// drop() will free up resources here
}

View File

@@ -36,7 +36,6 @@ humantime.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses
num_cpus = { version = "1.15" }

View File

@@ -14,7 +14,7 @@ use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::{secondary, TenantSharedResources};
use pageserver::tenant::TenantSharedResources;
use remote_storage::GenericRemoteStorage;
use tokio::time::Instant;
use tracing::*;
@@ -370,13 +370,18 @@ fn start_pageserver(
// Top-level cancellation token for the process
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
pageserver::PAGESERVER_SHUTDOWN_TOKEN
.set(shutdown_pageserver.clone())
.map_err(|_| ())
.expect("cannot be set already");
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
ControlPlaneClient::new(conf, &shutdown_pageserver),
ControlPlaneClient::new(conf, shutdown_pageserver.child_token()),
conf,
);
if let Some(deletion_workers) = deletion_workers {
@@ -420,12 +425,12 @@ fn start_pageserver(
deletion_queue_client,
},
order,
shutdown_pageserver.clone(),
shutdown_pageserver.child_token(),
))?;
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let shutdown_pageserver = shutdown_pageserver.clone();
let shutdown_pageserver = shutdown_pageserver.child_token();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
let guard = scopeguard::guard_on_success((), |_| {
@@ -504,17 +509,6 @@ fn start_pageserver(
}
});
let secondary_controller = if let Some(remote_storage) = &remote_storage {
secondary::spawn_tasks(
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
)
} else {
secondary::null_controller()
};
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -527,6 +521,7 @@ fn start_pageserver(
remote_storage.clone(),
disk_usage_eviction_state.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.child_token(),
)?;
}
@@ -544,17 +539,19 @@ fn start_pageserver(
broker_client.clone(),
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
)
.context("Failed to initialize router state")?,
);
let cancel = shutdown_pageserver.child_token();
let router = http::make_router(router_state, launch_ts, http_auth.clone())?
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?
.serve(service)
.with_graceful_shutdown(task_mgr::shutdown_watcher());
.with_graceful_shutdown(cancel.clone().cancelled_owned());
task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
@@ -563,6 +560,7 @@ fn start_pageserver(
None,
"http endpoint listener",
true,
cancel,
async {
server.await?;
Ok(())
@@ -588,6 +586,7 @@ fn start_pageserver(
None,
"consumption metrics collection",
true,
shutdown_pageserver.child_token(),
async move {
// first wait until background jobs are cleared to launch.
//
@@ -636,6 +635,7 @@ fn start_pageserver(
None,
"libpq endpoint listener",
true,
shutdown_pageserver.child_token(),
async move {
page_service::libpq_listener_main(
conf,
@@ -669,9 +669,8 @@ fn start_pageserver(
signal.name()
);
// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
// This cancels the `shutdown_pageserver` cancellation tree and signals cancellation to
// all tasks in the system.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();

View File

@@ -70,8 +70,6 @@ pub mod defaults {
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
///
/// Default built-in configuration file.
///
@@ -119,8 +117,6 @@ pub mod defaults {
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false
#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
[remote_storage]
"#
@@ -219,10 +215,6 @@ pub struct PageServerConf {
/// If true, pageserver will make best-effort to operate without a control plane: only
/// for use in major incidents.
pub control_plane_emergency_mode: bool,
/// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize
/// heatmap uploads vs. other remote storage operations.
pub heatmap_upload_concurrency: usize,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -301,8 +293,6 @@ struct PageServerConfigBuilder {
control_plane_api: BuilderValue<Option<Url>>,
control_plane_api_token: BuilderValue<Option<SecretString>>,
control_plane_emergency_mode: BuilderValue<bool>,
heatmap_upload_concurrency: BuilderValue<usize>,
}
impl Default for PageServerConfigBuilder {
@@ -371,8 +361,6 @@ impl Default for PageServerConfigBuilder {
control_plane_api: Set(None),
control_plane_api_token: Set(None),
control_plane_emergency_mode: Set(false),
heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
}
}
}
@@ -513,10 +501,6 @@ impl PageServerConfigBuilder {
self.control_plane_emergency_mode = BuilderValue::Set(enabled)
}
pub fn heatmap_upload_concurrency(&mut self, value: usize) {
self.heatmap_upload_concurrency = BuilderValue::Set(value)
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
@@ -611,10 +595,6 @@ impl PageServerConfigBuilder {
control_plane_emergency_mode: self
.control_plane_emergency_mode
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
heatmap_upload_concurrency: self
.heatmap_upload_concurrency
.ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
})
}
}
@@ -848,9 +828,7 @@ impl PageServerConf {
},
"control_plane_emergency_mode" => {
builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
},
"heatmap_upload_concurrency" => {
builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
},
_ => bail!("unrecognized pageserver option '{key}'"),
}
@@ -918,7 +896,6 @@ impl PageServerConf {
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
}
}
}
@@ -1143,8 +1120,7 @@ background_task_maximum_delay = '334 s'
)?,
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY
control_plane_emergency_mode: false
},
"Correct defaults should be used when no config values are provided"
);
@@ -1201,8 +1177,7 @@ background_task_maximum_delay = '334 s'
background_task_maximum_delay: Duration::from_secs(334),
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false,
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY
control_plane_emergency_mode: false
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -65,6 +65,7 @@ pub async fn collect_metrics(
None,
"synthetic size calculation",
false,
cancel.child_token(),
async move {
calculate_synthetic_size_worker(
synthetic_size_calculation_interval,

View File

@@ -40,7 +40,7 @@ pub trait ControlPlaneGenerationsApi {
impl ControlPlaneClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
pub fn new(conf: &'static PageServerConf, cancel: CancellationToken) -> Option<Self> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return None,
@@ -67,7 +67,7 @@ impl ControlPlaneClient {
http_client: client.build().expect("Failed to construct HTTP client"),
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
cancel,
})
}

View File

@@ -87,6 +87,7 @@ pub fn launch_disk_usage_global_eviction_task(
storage: GenericRemoteStorage,
state: Arc<State>,
background_jobs_barrier: completion::Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -102,6 +103,7 @@ pub fn launch_disk_usage_global_eviction_task(
None,
"disk usage based eviction",
false,
cancel,
async move {
let cancel = task_mgr::shutdown_token();

View File

@@ -992,8 +992,8 @@ paths:
type: string
post:
description: |
Create a timeline. Returns new timeline id on success.
Recreating the same timeline will succeed if the parameters match the existing timeline.
Create a timeline. Returns new timeline id on success.\
If no new timeline id is specified in parameters, it would be generated. It's an error to recreate the same timeline.
If no pg_version is specified, assume DEFAULT_PG_VERSION hardcoded in the pageserver.
requestBody:
content:
@@ -1405,8 +1405,6 @@ components:
type: integer
trace_read_requests:
type: boolean
heatmap_period:
type: integer
TenantConfigResponse:
type: object
properties:

View File

@@ -42,7 +42,6 @@ use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlotError, TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
@@ -76,11 +75,9 @@ pub struct State {
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
}
impl State {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
@@ -89,7 +86,6 @@ impl State {
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
@@ -104,7 +100,6 @@ impl State {
broker_client,
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
})
}
@@ -453,7 +448,7 @@ async fn timeline_create_handler(
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
Err(tenant::CreateTimelineError::Conflict | tenant::CreateTimelineError::AlreadyCreating) => {
Err(tenant::CreateTimelineError::AlreadyExists) => {
json_response(StatusCode::CONFLICT, ())
}
Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
@@ -1621,7 +1616,9 @@ async fn disk_usage_eviction_run(
}
}
let config = json_request::<Config>(&mut r).await?;
let config = json_request::<Config>(&mut r)
.await
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
let usage = Usage {
config,
@@ -1650,21 +1647,6 @@ async fn disk_usage_eviction_run(
json_response(StatusCode::OK, res)
}
async fn secondary_upload_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
state
.secondary_controller
.upload_tenant(tenant_shard_id)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -1924,9 +1906,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|r| api_handler(r, evict_timeline_layer_handler),
)
.post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
api_handler(r, secondary_upload_handler)
})
.put("/v1/disk_usage_eviction/run", |r| {
api_handler(r, disk_usage_eviction_run)
})

View File

@@ -49,11 +49,22 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
/// The main cancellation token for the process.
///
/// Should only ever be used to create child tokens.
pub static PAGESERVER_SHUTDOWN_TOKEN: std::sync::OnceLock<tokio_util::sync::CancellationToken> =
std::sync::OnceLock::new();
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
use std::time::Duration;
if let Some(token) = PAGESERVER_SHUTDOWN_TOKEN.get() {
token.cancel();
}
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
timed(

View File

@@ -2,10 +2,9 @@ use enum_map::EnumMap;
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPairVec,
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
register_uint_gauge, register_uint_gauge_vec, Counter, CounterVec, GaugeVec, Histogram,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
@@ -1271,28 +1270,6 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
)
.expect("failed to define a metric"),
});
pub(crate) struct SecondaryModeMetrics {
pub(crate) upload_heatmap: IntCounter,
pub(crate) upload_heatmap_errors: IntCounter,
pub(crate) upload_heatmap_duration: Histogram,
}
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| SecondaryModeMetrics {
upload_heatmap: register_int_counter!(
"pageserver_secondary_upload_heatmap",
"Number of heatmaps written to remote storage by attached tenants"
)
.expect("failed to define a metric"),
upload_heatmap_errors: register_int_counter!(
"pageserver_secondary_upload_heatmap_errors",
"Failures writing heatmap to remote storage"
)
.expect("failed to define a metric"),
upload_heatmap_duration: register_histogram!(
"pageserver_secondary_upload_heatmap_duration",
"Time to build and upload a heatmap, including any waiting inside the S3 client"
)
.expect("failed to define a metric"),
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
@@ -1344,16 +1321,25 @@ pub(crate) static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("Failed to register tenant_task_events metric")
});
pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
"pageserver_background_loop_semaphore_wait_start_count",
"Counter for background loop concurrency-limiting semaphore acquire calls started",
"pageserver_background_loop_semaphore_wait_finish_count",
"Counter for background loop concurrency-limiting semaphore acquire calls finished",
&["task"],
)
.unwrap()
});
pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT: Lazy<IntCounterVec> =
Lazy::new(|| {
register_int_counter_vec!(
"pageserver_background_loop_semaphore_wait_start_count",
"Counter for background loop concurrency-limiting semaphore acquire calls started",
&["task"],
)
.unwrap()
});
pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT: Lazy<IntCounterVec> =
Lazy::new(|| {
register_int_counter_vec!(
"pageserver_background_loop_semaphore_wait_finish_count",
"Counter for background loop concurrency-limiting semaphore acquire calls finished",
&["task"],
)
.unwrap()
});
pub(crate) static BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(

View File

@@ -166,6 +166,7 @@ pub async fn libpq_listener_main(
None,
"serving compute connection task",
false,
cancel.child_token(),
page_service_conn_main(
conf,
broker_client.clone(),

View File

@@ -258,9 +258,6 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
/// See [`crate::tenant::secondary`].
SecondaryUploads,
// Initial logical size calculation
InitialLogicalSizeCalculation,
@@ -330,6 +327,7 @@ struct PageServerTask {
/// Launch a new task
/// Note: if shutdown_process_on_error is set to true failure
/// of the task will lead to shutdown of entire process
#[allow(clippy::too_many_arguments)]
pub fn spawn<F>(
runtime: &tokio::runtime::Handle,
kind: TaskKind,
@@ -337,12 +335,13 @@ pub fn spawn<F>(
timeline_id: Option<TimelineId>,
name: &str,
shutdown_process_on_error: bool,
cancel: CancellationToken,
future: F,
) -> PageserverTaskId
where
F: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let cancel = CancellationToken::new();
// let cancel = CancellationToken::new();
let task_id = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed);
let task = Arc::new(PageServerTask {
task_id: PageserverTaskId(task_id),
@@ -564,9 +563,9 @@ pub fn shutdown_token() -> CancellationToken {
let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
if cfg!(test) {
// in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
res.unwrap_or_default()
} else {
// tests need to call the same paths which need to use get the shutdown token
res.expect("shutdown_token() called in an unexpected task or thread")
}
}

View File

@@ -48,7 +48,6 @@ use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
@@ -88,6 +87,7 @@ use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
@@ -144,7 +144,6 @@ pub mod storage_layer;
pub mod config;
pub mod delete;
pub mod mgr;
pub mod secondary;
pub mod tasks;
pub mod upload_queue;
@@ -249,12 +248,6 @@ pub struct Tenant {
generation: Generation,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
/// During timeline creation, we first insert the TimelineId to the
/// creating map, then `timelines`, then remove it from the creating map.
/// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
// `timelines` mutex during all GC iteration
@@ -413,10 +406,8 @@ impl Debug for SetStoppingError {
#[derive(thiserror::Error, Debug)]
pub enum CreateTimelineError {
#[error("creation of timeline with the given ID is in progress")]
AlreadyCreating,
#[error("timeline already exists with different parameters")]
Conflict,
#[error("a timeline with the given ID already exists")]
AlreadyExists,
#[error(transparent)]
AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
@@ -495,6 +486,7 @@ impl Tenant {
ancestor.clone(),
resources,
CreateTimelineCause::Load,
self.cancel.child_token(),
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
@@ -512,7 +504,7 @@ impl Tenant {
.remote_client
.as_ref()
.unwrap()
.init_upload_queue(index_part)?;
.init_upload_queue(index_part, timeline.cancel.child_token())?;
} else if self.remote_storage.is_some() {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
@@ -520,7 +512,7 @@ impl Tenant {
// If control plane retries timeline creation in the meantime, the mgmt API handler
// for timeline creation will coalesce on the upload we queue here.
let rtc = timeline.remote_client.as_ref().unwrap();
rtc.init_upload_queue_for_empty_remote(&metadata)?;
rtc.init_upload_queue_for_empty_remote(&metadata, timeline.cancel.child_token())?;
rtc.schedule_index_upload_for_metadata_update(&metadata)?;
}
@@ -614,6 +606,12 @@ impl Tenant {
let tenant_clone = Arc::clone(&tenant);
let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Attach,
@@ -621,6 +619,7 @@ impl Tenant {
None,
"attach tenant",
false,
cancel,
async move {
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
let make_broken =
@@ -880,8 +879,10 @@ impl Tenant {
// Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
let cancel = self.cancel.child_token();
remote_timeline_client
.init_upload_queue_stopped_to_continue_deletion(&index_part)
.init_upload_queue_stopped_to_continue_deletion(&index_part, cancel.child_token())
.context("init queue stopped")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
@@ -891,6 +892,7 @@ impl Tenant {
&index_part.metadata,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
cancel,
)
.await
.context("resume_deletion")
@@ -1224,7 +1226,7 @@ impl Tenant {
timeline_id,
self.generation,
);
let cancel_clone = cancel.clone();
let cancel_clone = cancel.child_token();
part_downloads.spawn(
async move {
debug!("starting index part download");
@@ -1385,6 +1387,7 @@ impl Tenant {
&local_metadata,
None,
self.deletion_queue_client.clone(),
self.cancel.child_token(),
)
.await
.context("resume deletion")
@@ -1466,7 +1469,7 @@ impl Tenant {
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
pub(crate) async fn create_empty_timeline(
pub async fn create_empty_timeline(
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
@@ -1478,7 +1481,10 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant"
);
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
};
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to
// make it valid, before calling finish_creation()
@@ -1555,7 +1561,7 @@ impl Tenant {
/// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_timeline(
pub async fn create_timeline(
&self,
new_timeline_id: TimelineId,
ancestor_timeline_id: Option<TimelineId>,
@@ -1576,51 +1582,26 @@ impl Tenant {
.enter()
.map_err(|_| CreateTimelineError::ShuttingDown)?;
// Get exclusive access to the timeline ID: this ensures that it does not already exist,
// and that no other creation attempts will be allowed in while we are working. The
// uninit_mark is a guard.
let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
Ok(m) => m,
Err(TimelineExclusionError::AlreadyCreating) => {
// Creation is in progress, we cannot create it again, and we cannot
// check if this request matches the existing one, so caller must try
// again later.
return Err(CreateTimelineError::AlreadyCreating);
}
Err(TimelineExclusionError::Other(e)) => {
return Err(CreateTimelineError::Other(e));
}
Err(TimelineExclusionError::AlreadyExists(existing)) => {
debug!("timeline {new_timeline_id} already exists");
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
debug!("timeline {new_timeline_id} already exists");
// Idempotency: creating the same timeline twice is not an error, unless
// the second creation has different parameters.
if existing.get_ancestor_timeline_id() != ancestor_timeline_id
|| existing.pg_version != pg_version
|| (ancestor_start_lsn.is_some()
&& ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
{
return Err(CreateTimelineError::Conflict);
}
if let Some(remote_client) = existing.remote_client.as_ref() {
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
}
return Ok(existing);
if let Some(remote_client) = existing.remote_client.as_ref() {
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
}
};
return Err(CreateTimelineError::AlreadyExists);
}
let loaded_timeline = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
@@ -1657,32 +1638,18 @@ impl Tenant {
ancestor_timeline.wait_lsn(*lsn, ctx).await?;
}
self.branch_timeline(
&ancestor_timeline,
new_timeline_id,
ancestor_start_lsn,
uninit_mark,
ctx,
)
.await?
self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
.await?
}
None => {
self.bootstrap_timeline(
new_timeline_id,
pg_version,
load_existing_initdb,
uninit_mark,
ctx,
)
.await?
self.bootstrap_timeline(new_timeline_id, pg_version, load_existing_initdb, ctx)
.await?
}
};
// At this point we have dropped our guard on [`Self::timelines_creating`], and
// the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
// not send a success to the caller until it is. The same applies to handling retries,
// see the handling of [`TimelineExclusionError::AlreadyExists`] above.
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
@@ -2159,14 +2126,6 @@ impl Tenant {
.attach_mode
.clone()
}
pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
&self.tenant_shard_id
}
pub(crate) fn get_generation(&self) -> Generation {
self.generation
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -2305,18 +2264,6 @@ impl Tenant {
.or(self.conf.default_tenant_conf.min_resident_size_override)
}
pub fn get_heatmap_period(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
let heatmap_period = tenant_conf
.heatmap_period
.unwrap_or(self.conf.default_tenant_conf.heatmap_period);
if heatmap_period.is_zero() {
None
} else {
Some(heatmap_period)
}
}
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
@@ -2355,6 +2302,7 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
cancel: CancellationToken,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
@@ -2383,7 +2331,7 @@ impl Tenant {
resources,
pg_version,
state,
self.cancel.child_token(),
cancel,
);
Ok(timeline)
@@ -2456,6 +2404,12 @@ impl Tenant {
}
});
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
Tenant {
tenant_shard_id,
shard_identity,
@@ -2466,7 +2420,6 @@ impl Tenant {
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(attached_conf)),
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
@@ -2476,7 +2429,7 @@ impl Tenant {
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
cancel,
gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
}
}
@@ -2858,9 +2811,8 @@ impl Tenant {
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
let tl = self
.branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await?;
tl.set_state(TimelineState::Active);
Ok(tl)
@@ -2874,10 +2826,9 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_uninit_mark: TimelineUninitMark<'_>,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await
}
@@ -2886,14 +2837,13 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_uninit_mark: TimelineUninitMark<'_>,
_ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
// We will validate our ancestor LSN in this function. Acquire the GC lock so that
// this check cannot race with GC, and the ancestor LSN is guaranteed to remain
// valid while we are creating the branch.
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
@@ -2903,6 +2853,13 @@ impl Tenant {
lsn
});
// Create a placeholder for the new branch. This will error
// out if the new timeline ID is already in use.
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(dst_id, &timelines)?
};
// Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
// horizon on the source timeline
//
@@ -2994,38 +2951,21 @@ impl Tenant {
Ok(new_timeline)
}
/// For unit tests, make this visible so that other modules can directly create timelines
#[cfg(test)]
pub(crate) async fn bootstrap_timeline_test(
&self,
timeline_id: TimelineId,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
self.bootstrap_timeline(
timeline_id,
pg_version,
load_existing_initdb,
uninit_mark,
ctx,
)
.await
}
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization completes, tar up the temp dir and upload it to S3.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
pub(crate) async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
timeline_uninit_mark: TimelineUninitMark<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(timeline_id, &timelines)?
};
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
@@ -3106,9 +3046,8 @@ impl Tenant {
3,
u32::MAX,
"persist_initdb_tar_zst",
backoff::Cancel::new(self.cancel.clone(), || {
anyhow::anyhow!("initdb upload cancelled")
}),
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await?;
@@ -3223,19 +3162,22 @@ impl Tenant {
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file.
async fn prepare_new_timeline<'a>(
&'a self,
async fn prepare_new_timeline(
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
uninit_mark: TimelineUninitMark<'a>,
uninit_mark: TimelineUninitMark,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<UninitializedTimeline> {
let tenant_shard_id = self.tenant_shard_id;
let resources = self.build_timeline_resources(new_timeline_id);
let cancel = self.cancel.child_token();
if let Some(remote_client) = &resources.remote_client {
remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
remote_client.init_upload_queue_for_empty_remote(new_metadata, cancel.child_token())?;
}
let timeline_struct = self
@@ -3245,6 +3187,7 @@ impl Tenant {
ancestor,
resources,
CreateTimelineCause::Load,
cancel,
)
.context("Failed to create timeline data structure")?;
@@ -3300,38 +3243,23 @@ impl Tenant {
fn create_timeline_uninit_mark(
&self,
timeline_id: TimelineId,
) -> Result<TimelineUninitMark, TimelineExclusionError> {
timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
) -> anyhow::Result<TimelineUninitMark> {
let tenant_shard_id = self.tenant_shard_id;
anyhow::ensure!(
timelines.get(&timeline_id).is_none(),
"Timeline {tenant_shard_id}/{timeline_id} already exists in pageserver's memory"
);
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
anyhow::ensure!(
!timeline_path.exists(),
"Timeline {timeline_path} already exists, cannot create its uninit mark file",
);
let uninit_mark_path = self
.conf
.timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
let uninit_mark = TimelineUninitMark::new(
self,
timeline_id,
uninit_mark_path.clone(),
timeline_path.clone(),
)?;
// At this stage, we have got exclusive access to in-memory state for this timeline ID
// for creation.
// A timeline directory should never exist on disk already:
// - a previous failed creation would have cleaned up after itself
// - a pageserver restart would clean up timeline directories that don't have valid remote state
//
// Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
// this error may indicate a bug in cleanup on failed creations.
if timeline_path.exists() {
return Err(TimelineExclusionError::Other(anyhow::anyhow!(
"Timeline directory already exists! This is a bug."
)));
}
// Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
// that during process runtime, colliding creations will be caught in-memory without getting
// as far as failing to write a file.
fs::OpenOptions::new()
.write(true)
.create_new(true)
@@ -3345,6 +3273,8 @@ impl Tenant {
format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
})?;
let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
Ok(uninit_mark)
}
@@ -3787,7 +3717,6 @@ pub(crate) mod harness {
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
gc_feedback: Some(tenant_conf.gc_feedback),
heatmap_period: Some(tenant_conf.heatmap_period),
}
}
}
@@ -4094,7 +4023,13 @@ mod tests {
.await
{
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
Err(e) => assert_eq!(
e.to_string(),
format!(
"Timeline {}/{} already exists in pageserver's memory",
tenant.tenant_shard_id, TIMELINE_ID
)
),
}
Ok(())

View File

@@ -334,11 +334,6 @@ pub struct TenantConf {
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
pub gc_feedback: bool,
/// If non-zero, the period between uploads of a heatmap from attached tenants. This
/// may be disabled if a Tenant will not have secondary locations: only secondary
/// locations will use the heatmap uploaded by attached locations.
pub heatmap_period: Duration,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -419,11 +414,6 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_feedback: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
#[serde(default)]
pub heatmap_period: Option<Duration>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -492,7 +482,6 @@ impl TenantConfOpt {
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
}
}
}
@@ -530,7 +519,6 @@ impl Default for TenantConf {
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false,
heatmap_period: Duration::ZERO,
}
}
}

View File

@@ -460,6 +460,12 @@ impl DeleteTenantFlow {
) {
let tenant_shard_id = tenant.tenant_shard_id;
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
@@ -467,6 +473,7 @@ impl DeleteTenantFlow {
None,
"tenant_delete",
false,
cancel,
async move {
if let Err(err) =
Self::background(guard, conf, remote_storage, tenants, &tenant).await

View File

@@ -283,7 +283,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
} else if let Some(client) = ControlPlaneClient::new(conf, cancel.child_token()) {
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach().await {
@@ -807,12 +807,6 @@ pub(crate) async fn set_new_tenant_config(
}
impl TenantManager {
/// Convenience function so that anyone with a TenantManager can get at the global configuration, without
/// having to pass it around everywhere as a separate object.
pub(crate) fn get_conf(&self) -> &'static PageServerConf {
self.conf
}
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query.
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
pub(crate) fn get_attached_tenant_shard(
@@ -1093,20 +1087,6 @@ impl TenantManager {
Ok(())
}
pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
let locked = self.tenants.read().unwrap();
match &*locked {
TenantsMap::Initializing => Vec::new(),
TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
.values()
.filter_map(|slot| {
slot.get_attached()
.and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
})
.collect(),
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -1372,6 +1352,11 @@ pub(crate) async fn detach_tenant(
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
let cancel = crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token();
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
@@ -1379,6 +1364,7 @@ pub(crate) async fn detach_tenant(
None,
"tenant_files_delete",
false,
cancel,
async move {
fs::remove_dir_all(tmp_path.as_path())
.await
@@ -2106,6 +2092,7 @@ pub(crate) async fn immediate_gc(
Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
false,
tenant.cancel.child_token(),
async move {
fail::fail_point!("immediate_gc_task_pre");

View File

@@ -180,7 +180,7 @@
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
pub(crate) mod download;
mod download;
pub mod index;
mod upload;
@@ -357,9 +357,13 @@ impl RemoteTimelineClient {
/// Initialize the upload queue for a remote storage that already received
/// an index file upload, i.e., it's not empty.
/// The given `index_part` must be the one on the remote.
pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
pub fn init_upload_queue(
&self,
index_part: &IndexPart,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?;
self.update_remote_physical_size_gauge(Some(index_part));
info!(
"initialized upload queue from remote index with {} layer files",
@@ -373,9 +377,10 @@ impl RemoteTimelineClient {
pub fn init_upload_queue_for_empty_remote(
&self,
local_metadata: &TimelineMetadata,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata)?;
upload_queue.initialize_empty_remote(local_metadata, cancel)?;
self.update_remote_physical_size_gauge(None);
info!("initialized upload queue as empty");
Ok(())
@@ -386,6 +391,7 @@ impl RemoteTimelineClient {
pub fn init_upload_queue_stopped_to_continue_deletion(
&self,
index_part: &IndexPart,
cancel: CancellationToken,
) -> anyhow::Result<()> {
// FIXME: consider newtype for DeletedIndexPart.
let deleted_at = index_part.deleted_at.ok_or(anyhow::anyhow!(
@@ -394,7 +400,7 @@ impl RemoteTimelineClient {
{
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
upload_queue.initialize_with_current_remote_index_part(index_part, cancel)?;
self.update_remote_physical_size_gauge(Some(index_part));
}
// also locks upload queue, without dropping the guard above it will be a deadlock
@@ -1227,6 +1233,7 @@ impl RemoteTimelineClient {
Some(self.timeline_id),
"remote upload",
false,
upload_queue.cancel.child_token(),
async move {
self_rc.perform_upload_task(task).await;
Ok(())
@@ -1561,6 +1568,13 @@ impl RemoteTimelineClient {
dangling_files: HashMap::default(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
// TODO: this is the only place where we cannot reasonably continue the
// tree
cancel: crate::PAGESERVER_SHUTDOWN_TOKEN
.get()
.cloned()
.unwrap_or_default()
.child_token(),
};
let upload_queue = std::mem::replace(
@@ -1604,23 +1618,6 @@ impl RemoteTimelineClient {
}
}
}
pub(crate) fn get_layers_metadata(
&self,
layers: Vec<LayerFileName>,
) -> anyhow::Result<Vec<Option<LayerFileMetadata>>> {
let q = self.upload_queue.lock().unwrap();
let q = match &*q {
UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", q.as_str())
}
UploadQueue::Initialized(inner) => inner,
};
let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned());
Ok(decorated.collect())
}
}
pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
@@ -1676,13 +1673,6 @@ pub fn remote_index_path(
.expect("Failed to construct path")
}
pub const HEATMAP_BASENAME: &str = "heatmap-v1.json";
pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
RemotePath::from_string(&format!("tenants/{tenant_shard_id}/{HEATMAP_BASENAME}"))
.expect("Failed to construct path")
}
/// Given the key of an index, parse out the generation part of the name
pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
let file_name = match path.get_path().file_name() {

View File

@@ -4,9 +4,8 @@ use anyhow::{bail, Context};
use camino::Utf8Path;
use fail::fail_point;
use pageserver_api::shard::TenantShardId;
use std::io::{ErrorKind, SeekFrom};
use std::io::ErrorKind;
use tokio::fs::{self, File};
use tokio::io::AsyncSeekExt;
use super::Generation;
use crate::{
@@ -120,14 +119,11 @@ pub(crate) async fn upload_initdb_dir(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
mut initdb_tar_zst: File,
initdb_tar_zst: File,
size: u64,
) -> anyhow::Result<()> {
tracing::trace!("uploading initdb dir");
// We might have read somewhat into the file already in the prior retry attempt
initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);

View File

@@ -1,104 +0,0 @@
pub mod heatmap;
mod heatmap_uploader;
use std::sync::Arc;
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use self::heatmap_uploader::heatmap_uploader_task;
use super::mgr::TenantManager;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::completion::Barrier;
enum UploadCommand {
Upload(TenantShardId),
}
struct CommandRequest<T> {
payload: T,
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}
struct CommandResponse {
result: anyhow::Result<()>,
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
}
impl SecondaryController {
async fn dispatch<T>(
&self,
queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
payload: T,
) -> anyhow::Result<()> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
queue
.send(CommandRequest {
payload,
response_tx,
})
.await
.map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
let response = response_rx
.await
.map_err(|_| anyhow::anyhow!("Request dropped"))?;
response.result
}
pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
.await
}
}
pub fn spawn_tasks(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> SecondaryController {
let (upload_req_tx, upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryUploads,
None,
None,
"heatmap uploads",
false,
async move {
heatmap_uploader_task(
tenant_manager,
remote_storage,
upload_req_rx,
background_jobs_can_start,
cancel,
)
.await
},
);
SecondaryController { upload_req_tx }
}
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
let (upload_req_tx, _upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
SecondaryController { upload_req_tx }
}

View File

@@ -1,64 +0,0 @@
use std::time::SystemTime;
use crate::tenant::{
remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr, TimestampSeconds};
use utils::{generation::Generation, id::TimelineId};
#[derive(Serialize, Deserialize)]
pub(super) struct HeatMapTenant {
/// Generation of the attached location that uploaded the heatmap: this is not required
/// for correctness, but acts as a hint to secondary locations in order to detect thrashing
/// in the unlikely event that two attached locations are both uploading conflicting heatmaps.
pub(super) generation: Generation,
pub(super) timelines: Vec<HeatMapTimeline>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(super) timeline_id: TimelineId,
pub(super) layers: Vec<HeatMapLayer>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(super) name: LayerFileName,
pub(super) metadata: IndexLayerMetadata,
#[serde_as(as = "TimestampSeconds<i64>")]
pub(super) access_time: SystemTime,
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}
impl HeatMapLayer {
pub(crate) fn new(
name: LayerFileName,
metadata: IndexLayerMetadata,
access_time: SystemTime,
) -> Self {
Self {
name,
metadata,
access_time,
}
}
}
impl HeatMapTimeline {
pub(crate) fn new(timeline_id: TimelineId, layers: Vec<HeatMapLayer>) -> Self {
Self {
timeline_id,
layers,
}
}
}

View File

@@ -1,582 +0,0 @@
use std::{
collections::HashMap,
sync::{Arc, Weak},
time::{Duration, Instant},
};
use crate::{
metrics::SECONDARY_MODE,
tenant::{
config::AttachmentMode, mgr::TenantManager, remote_timeline_client::remote_heatmap_path,
secondary::CommandResponse, span::debug_assert_current_span_has_tenant_id, Tenant,
},
};
use md5;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{backoff, completion::Barrier};
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
/// Period between heatmap uploader walking Tenants to look for work to do.
/// If any tenants have a heatmap upload period lower than this, it will be adjusted
/// downward to match.
const DEFAULT_SCHEDULING_INTERVAL: Duration = Duration::from_millis(60000);
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_millis(1000);
struct WriteInProgress {
barrier: Barrier,
}
struct UploadPending {
tenant: Arc<Tenant>,
last_digest: Option<md5::Digest>,
}
struct WriteComplete {
tenant_shard_id: TenantShardId,
completed_at: Instant,
digest: Option<md5::Digest>,
next_upload: Option<Instant>,
}
/// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
/// when we last did a write. We only populate this after doing at least one
/// write for a tenant -- this avoids holding state for tenants that have
/// uploads disabled.
struct UploaderTenantState {
// This Weak only exists to enable culling idle instances of this type
// when the Tenant has been deallocated.
tenant: Weak<Tenant>,
/// Digest of the serialized heatmap that we last successfully uploaded
///
/// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
/// which is also an md5sum.
last_digest: Option<md5::Digest>,
/// When the last upload attempt completed (may have been successful or failed)
last_upload: Option<Instant>,
/// When should we next do an upload? None means never.
next_upload: Option<Instant>,
}
/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
/// handling loop and mutates it as needed: there are no locks here, because that event loop
/// can hold &mut references to this type throughout.
struct HeatmapUploader {
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
tenants: HashMap<TenantShardId, UploaderTenantState>,
/// Tenants with work to do, for which tasks should be spawned as soon as concurrency
/// limits permit it.
tenants_pending: std::collections::VecDeque<UploadPending>,
/// Tenants for which a task in `tasks` has been spawned.
tenants_uploading: HashMap<TenantShardId, WriteInProgress>,
tasks: JoinSet<()>,
/// Channel for our child tasks to send results to: we use a channel for results rather than
/// just getting task results via JoinSet because we need the channel's recv() "sleep until something
/// is available" semantic, rather than JoinSet::join_next()'s "sleep until next thing is available _or_ I'm empty"
/// behavior.
task_result_tx: tokio::sync::mpsc::UnboundedSender<WriteComplete>,
task_result_rx: tokio::sync::mpsc::UnboundedReceiver<WriteComplete>,
concurrent_uploads: usize,
scheduling_interval: Duration,
}
/// The uploader task runs a loop that periodically wakes up and schedules tasks for
/// tenants that require an upload, or handles any commands that have been sent into
/// `command_queue`. No I/O is done in this loop: that all happens in the tasks we
/// spawn.
///
/// Scheduling iterations are somewhat infrequent. However, each one will enqueue
/// all tenants that require an upload, and in between scheduling iterations we will
/// continue to spawn new tasks for pending tenants, as our concurrency limit permits.
///
/// While we take a CancellationToken here, it is subordinate to the CancellationTokens
/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise
/// we might block waiting on a Tenant.
pub(super) async fn heatmap_uploader_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let concurrent_uploads = tenant_manager.get_conf().heatmap_upload_concurrency;
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
let mut uploader = HeatmapUploader {
tenant_manager,
remote_storage,
cancel: cancel.clone(),
tasks: JoinSet::new(),
tenants: HashMap::new(),
tenants_pending: std::collections::VecDeque::new(),
tenants_uploading: HashMap::new(),
task_result_tx: result_tx,
task_result_rx: result_rx,
concurrent_uploads,
scheduling_interval: DEFAULT_SCHEDULING_INTERVAL,
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
// Look for new work: this is relatively expensive because we have to go acquire the lock on
// the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
// require an upload.
uploader.schedule_iteration().await?;
// Between scheduling iterations, we will:
// - Drain any complete tasks and spawn pending tasks
// - Handle incoming administrative commands
// - Check our cancellation token
let next_scheduling_iteration = Instant::now()
.checked_add(uploader.scheduling_interval)
.unwrap_or_else(|| {
tracing::warn!(
"Scheduling interval invalid ({}s), running immediately!",
uploader.scheduling_interval.as_secs_f64()
);
Instant::now()
});
loop {
tokio::select! {
_ = cancel.cancelled() => {
// We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
tracing::info!("Heatmap uploader joining tasks");
while let Some(_r) = uploader.tasks.join_next().await {};
tracing::info!("Heatmap uploader terminating");
break;
},
_ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
tracing::debug!("heatmap_uploader_task: woke for scheduling interval");
break;},
cmd = command_queue.recv() => {
tracing::debug!("heatmap_uploader_task: woke for command queue");
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap uploader terminating");
cancel.cancel();
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
uploader.handle_command(payload, response_tx);
},
_ = uploader.process_next_completion() => {
if !cancel.is_cancelled() {
uploader.spawn_pending();
}
}
}
}
}
Ok(())
}
impl HeatmapUploader {
/// Periodic execution phase: inspect all attached tenants and schedule any work they require.
async fn schedule_iteration(&mut self) -> anyhow::Result<()> {
// Cull any entries in self.tenants whose Arc<Tenant> is gone
self.tenants
.retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());
// The priority order of previously scheduled work may be invalidated by current state: drop
// all pending work (it will be re-scheduled if still needed)
self.tenants_pending.clear();
// Used a fixed 'now' through the following loop, for efficiency and fairness.
let now = Instant::now();
// While iterating over the potentially-long list of tenants, we will periodically yield
// to avoid blocking executor.
const YIELD_ITERATIONS: usize = 1000;
// Iterate over tenants looking for work to do.
let tenants = self.tenant_manager.get_attached_active_tenant_shards();
for (i, tenant) in tenants.into_iter().enumerate() {
// Process is shutting down, drop out
if self.cancel.is_cancelled() {
return Ok(());
}
// Skip tenants that already have a write in flight
if self
.tenants_uploading
.contains_key(tenant.get_tenant_shard_id())
{
continue;
}
self.maybe_schedule_upload(&now, tenant);
if i + 1 % YIELD_ITERATIONS == 0 {
tokio::task::yield_now().await;
}
}
// Spawn tasks for as many of our pending tenants as we can.
self.spawn_pending();
Ok(())
}
///
/// Cancellation: this method is cancel-safe.
async fn process_next_completion(&mut self) {
match self.task_result_rx.recv().await {
Some(r) => {
self.on_completion(r);
}
None => {
unreachable!("Result sender is stored on Self");
}
}
}
/// The 'maybe' refers to the tenant's state: whether it is configured
/// for heatmap uploads at all, and whether sufficient time has passed
/// since the last upload.
fn maybe_schedule_upload(&mut self, now: &Instant, tenant: Arc<Tenant>) {
match tenant.get_heatmap_period() {
None => {
// Heatmaps are disabled for this tenant
return;
}
Some(period) => {
// If any tenant has asked for uploads more frequent than our scheduling interval,
// reduce it to match so that we can keep up. This is mainly useful in testing, where
// we may set rather short intervals.
if period < self.scheduling_interval {
self.scheduling_interval = std::cmp::max(period, MIN_SCHEDULING_INTERVAL);
}
}
}
// Stale attachments do not upload anything: if we are in this state, there is probably some
// other attachment in mode Single or Multi running on another pageserver, and we don't
// want to thrash and overwrite their heatmap uploads.
if tenant.get_attach_mode() == AttachmentMode::Stale {
return;
}
// Create an entry in self.tenants if one doesn't already exist: this will later be updated
// with the completion time in on_completion.
let state = self
.tenants
.entry(*tenant.get_tenant_shard_id())
.or_insert_with(|| UploaderTenantState {
tenant: Arc::downgrade(&tenant),
last_upload: None,
next_upload: Some(Instant::now()),
last_digest: None,
});
// Decline to do the upload if insufficient time has passed
if state.next_upload.map(|nu| &nu > now).unwrap_or(false) {
return;
}
let last_digest = state.last_digest;
self.tenants_pending.push_back(UploadPending {
tenant,
last_digest,
})
}
fn spawn_pending(&mut self) {
while !self.tenants_pending.is_empty()
&& self.tenants_uploading.len() < self.concurrent_uploads
{
// unwrap: loop condition includes !is_empty()
let pending = self.tenants_pending.pop_front().unwrap();
self.spawn_upload(pending.tenant, pending.last_digest);
}
}
fn spawn_upload(&mut self, tenant: Arc<Tenant>, last_digest: Option<md5::Digest>) {
let remote_storage = self.remote_storage.clone();
let tenant_shard_id = *tenant.get_tenant_shard_id();
let (completion, barrier) = utils::completion::channel();
let result_tx = self.task_result_tx.clone();
self.tasks.spawn(async move {
// Guard for the barrier in [`WriteInProgress`]
let _completion = completion;
let started_at = Instant::now();
let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await {
Ok(UploadHeatmapOutcome::Uploaded(digest)) => {
let duration = Instant::now().duration_since(started_at);
SECONDARY_MODE
.upload_heatmap_duration
.observe(duration.as_secs_f64());
SECONDARY_MODE.upload_heatmap.inc();
Some(digest)
}
Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest,
Err(UploadHeatmapError::Upload(e)) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
tenant.get_tenant_shard_id(),
);
let duration = Instant::now().duration_since(started_at);
SECONDARY_MODE
.upload_heatmap_duration
.observe(duration.as_secs_f64());
SECONDARY_MODE.upload_heatmap_errors.inc();
last_digest
}
Err(UploadHeatmapError::Cancelled) => {
tracing::info!("Cancelled heatmap upload, shutting down");
last_digest
}
};
let now = Instant::now();
let next_upload = tenant
.get_heatmap_period()
.and_then(|period| now.checked_add(period));
result_tx
.send(WriteComplete {
tenant_shard_id: *tenant.get_tenant_shard_id(),
completed_at: now,
digest,
next_upload,
})
.ok();
});
self.tenants_uploading
.insert(tenant_shard_id, WriteInProgress { barrier });
}
#[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
fn on_completion(&mut self, completion: WriteComplete) {
tracing::debug!("Heatmap upload completed");
let WriteComplete {
tenant_shard_id,
completed_at,
digest,
next_upload,
} = completion;
self.tenants_uploading.remove(&tenant_shard_id);
use std::collections::hash_map::Entry;
match self.tenants.entry(tenant_shard_id) {
Entry::Vacant(_) => {
// Tenant state was dropped, nothing to update.
}
Entry::Occupied(mut entry) => {
entry.get_mut().last_upload = Some(completed_at);
entry.get_mut().last_digest = digest;
entry.get_mut().next_upload = next_upload
}
}
}
fn handle_command(
&mut self,
command: UploadCommand,
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
) {
match command {
UploadCommand::Upload(tenant_shard_id) => {
// If an upload was ongoing for this tenant, let it finish first.
let barrier = if let Some(writing_state) =
self.tenants_uploading.get(&tenant_shard_id)
{
tracing::info!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Waiting for heatmap write to complete");
writing_state.barrier.clone()
} else {
// Spawn the upload then immediately wait for it. This will block processing of other commands and
// starting of other background work.
tracing::info!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Starting heatmap write on command");
let tenant = match self
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id, true)
{
Ok(t) => t,
Err(e) => {
// Drop result of send: we don't care if caller dropped their receiver
drop(response_tx.send(CommandResponse {
result: Err(e.into()),
}));
return;
}
};
self.spawn_upload(tenant, None);
let writing_state = self
.tenants_uploading
.get(&tenant_shard_id)
.expect("We just inserted this");
tracing::info!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Waiting for heatmap upload to complete");
writing_state.barrier.clone()
};
// This task does no I/O: it only listens for a barrier's completion and then
// sends to the command response channel. It is therefore safe to spawn this without
// any gates/task_mgr hooks.
tokio::task::spawn(async move {
barrier.wait().await;
tracing::info!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Heatmap upload complete");
// Drop result of send: we don't care if caller dropped their receiver
drop(response_tx.send(CommandResponse { result: Ok(()) }))
});
}
}
}
}
enum UploadHeatmapOutcome {
/// We successfully wrote to remote storage, with this digest.
Uploaded(md5::Digest),
/// We did not upload because the heatmap digest was unchanged since the last upload
NoChange,
/// We skipped the upload for some reason, such as tenant/timeline not ready
Skipped,
}
#[derive(thiserror::Error, Debug)]
enum UploadHeatmapError {
#[error("Cancelled")]
Cancelled,
#[error(transparent)]
Upload(#[from] anyhow::Error),
}
/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
/// of the object we would have uploaded.
#[instrument(skip_all, fields(tenant_id = %tenant.get_tenant_shard_id().tenant_id, shard_id = %tenant.get_tenant_shard_id().shard_slug()))]
async fn upload_tenant_heatmap(
remote_storage: GenericRemoteStorage,
tenant: &Arc<Tenant>,
last_digest: Option<md5::Digest>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
debug_assert_current_span_has_tenant_id();
let generation = tenant.get_generation();
if generation.is_none() {
// We do not expect this: generations were implemented before heatmap uploads. However,
// handle it so that we don't have to make the generation in the heatmap an Option<>
// (Generation::none is not serializable)
tracing::warn!("Skipping heatmap upload for tenant with generation==None");
return Ok(UploadHeatmapOutcome::Skipped);
}
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
generation,
};
let timelines = tenant.timelines.lock().unwrap().clone();
let tenant_cancel = tenant.cancel.clone();
// Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
// when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
// in remote storage.
let _guard = match tenant.gate.enter() {
Ok(g) => g,
Err(_) => {
tracing::info!("Skipping heatmap upload for tenant which is shutting down");
return Err(UploadHeatmapError::Cancelled);
}
};
for (timeline_id, timeline) in timelines {
let heatmap_timeline = timeline.generate_heatmap().await;
match heatmap_timeline {
None => {
tracing::debug!(
"Skipping heatmap upload because timeline {timeline_id} is not ready"
);
return Ok(UploadHeatmapOutcome::Skipped);
}
Some(heatmap_timeline) => {
heatmap.timelines.push(heatmap_timeline);
}
}
}
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
let size = bytes.len();
// Drop out early if nothing changed since our last upload
let digest = md5::compute(&bytes);
if Some(digest) == last_digest {
return Ok(UploadHeatmapOutcome::NoChange);
}
let path = remote_heatmap_path(tenant.get_tenant_shard_id());
// Write the heatmap.
tracing::debug!("Uploading {size} byte heatmap to {path}");
if let Err(e) = backoff::retry(
|| async {
let bytes = futures::stream::once(futures::future::ready(Ok(bytes::Bytes::from(
bytes.clone(),
))));
remote_storage
.upload_storage_object(bytes, size, &path)
.await
},
|_| false,
3,
u32::MAX,
"Uploading heatmap",
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
{
if tenant_cancel.is_cancelled() {
return Err(UploadHeatmapError::Cancelled);
} else {
return Err(e.into());
}
}
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
Ok(UploadHeatmapOutcome::Uploaded(digest))
}

View File

@@ -457,8 +457,6 @@ struct LayerInner {
/// For loaded layers, this may be some other value if the tenant has undergone
/// a shard split since the layer was originally written.
shard: ShardIndex,
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
}
impl std::fmt::Display for LayerInner {
@@ -589,7 +587,6 @@ impl LayerInner {
consecutive_failures: AtomicUsize::new(0),
generation,
shard,
last_evicted_at: std::sync::Mutex::default(),
}
}
@@ -725,14 +722,6 @@ impl LayerInner {
permit
};
let since_last_eviction =
self.last_evicted_at.lock().unwrap().map(|ts| ts.elapsed());
if let Some(since_last_eviction) = since_last_eviction {
// FIXME: this will not always be recorded correctly until #6028 (the no
// download needed branch above)
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
}
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
kind: tokio::sync::OnceCell::default(),
@@ -852,6 +841,7 @@ impl LayerInner {
Some(self.desc.timeline_id),
&task_name,
false,
timeline.cancel.child_token(),
async move {
let client = timeline
@@ -871,6 +861,21 @@ impl LayerInner {
Ok(())
}
Err(e) => {
let consecutive_failures =
this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::select! {
_ = tokio::time::sleep(backoff) => {},
_ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
};
Err(e)
}
};
@@ -918,24 +923,7 @@ impl LayerInner {
Ok(permit)
}
Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive
//
// while we should not need this, this backoff has turned out to be useful with
// a bug of unexpectedly deleted remote layer file (#5787).
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
);
let backoff = std::time::Duration::from_secs_f64(backoff);
tokio::time::sleep(backoff).await;
Err(DownloadError::DownloadFailed)
}
Ok((Err(_), _permit)) => Err(DownloadError::DownloadFailed),
Err(_gone) => Err(DownloadError::DownloadCancelled),
}
}
@@ -1128,8 +1116,6 @@ impl LayerInner {
// we are still holding the permit, so no new spawn_download_and_wait can happen
drop(self.status.send(Status::Evicted));
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
res
}
@@ -1434,7 +1420,6 @@ pub(crate) struct LayerImplMetrics {
rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
redownload_after: metrics::Histogram,
}
impl Default for LayerImplMetrics {
@@ -1510,26 +1495,6 @@ impl Default for LayerImplMetrics {
)
.unwrap();
let redownload_after = {
let minute = 60.0;
let hour = 60.0 * minute;
metrics::register_histogram!(
"pageserver_layer_redownloaded_after",
"Time between evicting and re-downloading.",
vec![
10.0,
30.0,
minute,
5.0 * minute,
15.0 * minute,
30.0 * minute,
hour,
12.0 * hour,
]
)
.unwrap()
};
Self {
started_evictions,
completed_evictions,
@@ -1541,7 +1506,6 @@ impl Default for LayerImplMetrics {
rare_counters,
inits_cancelled,
redownload_after,
}
}
}
@@ -1609,10 +1573,6 @@ impl LayerImplMetrics {
fn inc_init_cancelled(&self) {
self.inits_cancelled.inc()
}
fn record_redownloaded_after(&self, duration: std::time::Duration) {
self.redownload_after.observe(duration.as_secs_f64())
}
}
#[derive(enum_map::Enum)]

View File

@@ -54,14 +54,17 @@ impl BackgroundLoopKind {
}
}
/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
loop_kind: BackgroundLoopKind,
_ctx: &RequestContext,
) -> impl Drop {
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT
.with_label_values(&[loop_kind.as_static_str()])
.guard();
.inc();
scopeguard::defer!(
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc();
);
match CONCURRENT_BACKGROUND_TASKS.acquire().await {
Ok(permit) => permit,
@@ -82,6 +85,7 @@ pub fn start_background_loops(
None,
&format!("compactor for tenant {tenant_shard_id}"),
false,
tenant.cancel.child_token(),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
@@ -105,6 +109,7 @@ pub fn start_background_loops(
None,
&format!("garbage collector for tenant {tenant_shard_id}"),
false,
tenant.cancel.child_token(),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();

View File

@@ -98,9 +98,8 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -446,12 +445,6 @@ pub(crate) enum CompactFlags {
ForceRepartition,
}
impl std::fmt::Debug for Timeline {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Timeline<{}>", self.timeline_id)
}
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -1404,6 +1397,7 @@ impl Timeline {
Some(self.timeline_id),
"layer flush task",
false,
self.cancel.child_token(),
async move {
let _guard = guard;
let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
@@ -1755,6 +1749,7 @@ impl Timeline {
Some(self.timeline_id),
"initial size calculation",
false,
self.cancel.child_token(),
// NB: don't log errors here, task_mgr will do that.
async move {
let cancel = task_mgr::shutdown_token();
@@ -1800,9 +1795,6 @@ impl Timeline {
permit = wait_for_permit => {
(Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
}
_ = self_ref.cancel.cancelled() => {
return Err(BackgroundCalculationError::Cancelled);
}
_ = cancel.cancelled() => {
return Err(BackgroundCalculationError::Cancelled);
},
@@ -1928,6 +1920,7 @@ impl Timeline {
Some(self.timeline_id),
"ondemand logical size calculation",
false,
self.cancel.child_token(),
async move {
let res = self_clone
.logical_size_calculation_task(lsn, cause, &ctx)
@@ -2070,55 +2063,6 @@ impl Timeline {
None
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
/// indicating which layers are currently on-disk on the primary.
///
/// None is returned if the Timeline is in a state where uploading a heatmap
/// doesn't make sense, such as shutting down or initializing. The caller
/// should treat this as a cue to simply skip doing any heatmap uploading
/// for this timeline.
pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
let remote_client = match &self.remote_client {
Some(c) => c,
None => return None,
};
let layer_file_names = eviction_info
.resident_layers
.iter()
.map(|l| l.layer.layer_desc().filename())
.collect::<Vec<_>>();
let decorated = match remote_client.get_layers_metadata(layer_file_names) {
Ok(d) => d,
Err(_) => {
// Getting metadata only fails on Timeline in bad state.
return None;
}
};
let heatmap_layers = std::iter::zip(
eviction_info.resident_layers.into_iter(),
decorated.into_iter(),
)
.filter_map(|(layer, remote_info)| {
remote_info.map(|remote_info| {
HeatMapLayer::new(
layer.layer.layer_desc().filename(),
IndexLayerMetadata::from(remote_info),
layer.last_activity_ts,
)
})
});
Some(HeatMapTimeline::new(
self.timeline_id,
heatmap_layers.collect(),
))
}
}
type TraversalId = String;
@@ -4209,6 +4153,7 @@ impl Timeline {
Some(self.timeline_id),
"download all remote layers task",
false,
self.cancel.child_token(),
async move {
self_clone.download_all_remote_layers(request).await;
let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();

View File

@@ -6,6 +6,7 @@ use std::{
use anyhow::Context;
use pageserver_api::{models::TimelineState, shard::TenantShardId};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument, Span};
use utils::{crashsafe, fs_ext, id::TimelineId};
@@ -406,6 +407,7 @@ impl DeleteTimelineFlow {
local_metadata: &TimelineMetadata,
remote_client: Option<RemoteTimelineClient>,
deletion_queue_client: DeletionQueueClient,
cancel: CancellationToken,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
// RemoteTimelineClient is the only functioning part.
@@ -421,6 +423,7 @@ impl DeleteTimelineFlow {
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,
cancel,
)
.context("create_timeline_struct")?;
@@ -532,6 +535,7 @@ impl DeleteTimelineFlow {
Some(timeline_id),
"timeline_delete",
false,
tenant.cancel.child_token(),
async move {
if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await {
error!("Error: {err:#}");

View File

@@ -67,6 +67,7 @@ impl Timeline {
self.tenant_shard_id, self.timeline_id
),
false,
self.cancel.child_token(),
async move {
let cancel = task_mgr::shutdown_token();
tokio::select! {
@@ -166,7 +167,6 @@ impl Timeline {
let _permit = tokio::select! {
permit = acquire_permit => permit,
_ = cancel.cancelled() => return ControlFlow::Break(()),
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
};
// If we evict layers but keep cached values derived from those layers, then

View File

@@ -19,14 +19,14 @@ use super::Timeline;
pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
}
impl<'t> UninitializedTimeline<'t> {
pub(crate) fn new(
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
) -> Self {
Self {
owning_tenant,
@@ -169,55 +169,18 @@ pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
///
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
#[must_use]
pub(crate) struct TimelineUninitMark<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
pub(crate) struct TimelineUninitMark {
uninit_mark_deleted: bool,
uninit_mark_path: Utf8PathBuf,
pub(crate) timeline_path: Utf8PathBuf,
}
/// Errors when acquiring exclusive access to a timeline ID for creation
#[derive(thiserror::Error, Debug)]
pub(crate) enum TimelineExclusionError {
#[error("Already exists")]
AlreadyExists(Arc<Timeline>),
#[error("Already creating")]
AlreadyCreating,
// e.g. I/O errors, or some failure deep in postgres initdb
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl<'t> TimelineUninitMark<'t> {
pub(crate) fn new(
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
uninit_mark_path: Utf8PathBuf,
timeline_path: Utf8PathBuf,
) -> Result<Self, TimelineExclusionError> {
// Lock order: this is the only place we take both locks. During drop() we only
// lock creating_timelines
let timelines = owning_tenant.timelines.lock().unwrap();
let mut creating_timelines: std::sync::MutexGuard<
'_,
std::collections::HashSet<TimelineId>,
> = owning_tenant.timelines_creating.lock().unwrap();
if let Some(existing) = timelines.get(&timeline_id) {
Err(TimelineExclusionError::AlreadyExists(existing.clone()))
} else if creating_timelines.contains(&timeline_id) {
Err(TimelineExclusionError::AlreadyCreating)
} else {
creating_timelines.insert(timeline_id);
Ok(Self {
owning_tenant,
timeline_id,
uninit_mark_deleted: false,
uninit_mark_path,
timeline_path,
})
impl TimelineUninitMark {
pub(crate) fn new(uninit_mark_path: Utf8PathBuf, timeline_path: Utf8PathBuf) -> Self {
Self {
uninit_mark_deleted: false,
uninit_mark_path,
timeline_path,
}
}
@@ -244,7 +207,7 @@ impl<'t> TimelineUninitMark<'t> {
}
}
impl Drop for TimelineUninitMark<'_> {
impl Drop for TimelineUninitMark {
fn drop(&mut self) {
if !self.uninit_mark_deleted {
if self.timeline_path.exists() {
@@ -263,11 +226,5 @@ impl Drop for TimelineUninitMark<'_> {
}
}
}
self.owning_tenant
.timelines_creating
.lock()
.unwrap()
.remove(&self.timeline_id);
}
}

View File

@@ -87,6 +87,7 @@ impl WalReceiver {
Some(timeline_id),
&format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"),
false,
timeline.cancel.child_token(),
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("WAL receiver manager started, connecting to broker");

View File

@@ -167,6 +167,7 @@ pub(super) async fn handle_walreceiver_connection(
Some(timeline.timeline_id),
"walreceiver connection",
false,
cancellation.clone(),
async move {
debug_assert_current_span_has_tenant_and_timeline_id();

View File

@@ -8,6 +8,7 @@ use std::fmt::Debug;
use chrono::NaiveDateTime;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::AtomicLsn;
@@ -98,6 +99,8 @@ pub(crate) struct UploadQueueInitialized {
/// wait on until one of them stops the queue. The semaphore is closed when
/// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
pub(crate) cancel: CancellationToken,
}
impl UploadQueueInitialized {
@@ -130,6 +133,7 @@ impl UploadQueue {
pub(crate) fn initialize_empty_remote(
&mut self,
metadata: &TimelineMetadata,
cancel: CancellationToken,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
@@ -158,6 +162,7 @@ impl UploadQueue {
dangling_files: HashMap::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
cancel,
};
*self = UploadQueue::Initialized(state);
@@ -167,6 +172,7 @@ impl UploadQueue {
pub(crate) fn initialize_with_current_remote_index_part(
&mut self,
index_part: &IndexPart,
cancel: CancellationToken,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
@@ -207,6 +213,7 @@ impl UploadQueue {
dangling_files: HashMap::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
cancel,
};
*self = UploadQueue::Initialized(state);

View File

@@ -2191,7 +2191,7 @@ mod tests {
.load()
.await;
let tline = tenant
.bootstrap_timeline_test(TIMELINE_ID, pg_version, None, &ctx)
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
.await
.unwrap();

View File

@@ -7,8 +7,6 @@ use proxy::console;
use proxy::console::provider::AllowedIpsCache;
use proxy::console::provider::NodeInfoCache;
use proxy::http;
use proxy::rate_limiter::EndpointRateLimiter;
use proxy::rate_limiter::RateBucketInfo;
use proxy::rate_limiter::RateLimiterConfig;
use proxy::usage_metrics;
@@ -16,7 +14,6 @@ use anyhow::bail;
use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use std::pin::pin;
use std::sync::Arc;
use std::{borrow::Cow, net::SocketAddr};
use tokio::net::TcpListener;
use tokio::task::JoinSet;
@@ -116,11 +113,8 @@ struct ProxyCliArgs {
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
rate_limiter_timeout: tokio::time::Duration,
/// Endpoint rate limiter max number of requests per second.
///
/// Provided in the form '<Requests Per Second>@<Bucket Duration Size>'.
/// Can be given multiple times for different bucket sizes.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
endpoint_rps_limit: Vec<RateBucketInfo>,
#[clap(long, default_value_t = 300)]
endpoint_rps_limit: u32,
/// Initial limit for dynamic rate limiter. Makes sense only if `rate_limit_algorithm` is *not* `None`.
#[clap(long, default_value_t = 100)]
initial_limit: usize,
@@ -163,8 +157,6 @@ async fn main() -> anyhow::Result<()> {
let proxy_listener = TcpListener::bind(proxy_address).await?;
let cancellation_token = CancellationToken::new();
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
let mut client_tasks = JoinSet::new();
@@ -172,7 +164,6 @@ async fn main() -> anyhow::Result<()> {
config,
proxy_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
));
// TODO: rename the argument to something like serverless.
@@ -186,7 +177,6 @@ async fn main() -> anyhow::Result<()> {
config,
serverless_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
));
}
@@ -321,10 +311,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
@@ -334,35 +320,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
authentication_config,
require_client_ip: args.require_client_ip,
disable_ip_check_for_http: args.disable_ip_check_for_http,
endpoint_rps_limit,
endpoint_rps_limit: args.endpoint_rps_limit,
}));
Ok(config)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use clap::Parser;
use proxy::rate_limiter::RateBucketInfo;
#[test]
fn parse_endpoint_rps_limit() {
let config = super::ProxyCliArgs::parse_from([
"proxy",
"--endpoint-rps-limit",
"100@1s",
"--endpoint-rps-limit",
"20@30s",
]);
assert_eq!(
config.endpoint_rps_limit,
vec![
RateBucketInfo::new(100, Duration::from_secs(1)),
RateBucketInfo::new(20, Duration::from_secs(30)),
]
);
}
}

View File

@@ -1,13 +1,9 @@
use crate::{
auth::parse_endpoint_param,
cancellation::CancelClosure,
console::errors::WakeComputeError,
error::UserFacingError,
proxy::{neon_option, NUM_DB_CONNECTIONS_GAUGE},
auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError,
error::UserFacingError, proxy::neon_option,
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use metrics::IntCounterPairGuard;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr, time::Duration};
use thiserror::Error;
@@ -227,8 +223,6 @@ pub struct PostgresConnection {
pub params: std::collections::HashMap<String, String>,
/// Query cancellation token.
pub cancel_closure: CancelClosure,
_guage: IntCounterPairGuard,
}
impl ConnCfg {
@@ -237,7 +231,6 @@ impl ConnCfg {
&self,
allow_self_signed_compute: bool,
timeout: Duration,
proto: &'static str,
) -> Result<PostgresConnection, ConnectionError> {
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
@@ -271,7 +264,6 @@ impl ConnCfg {
stream,
params,
cancel_closure,
_guage: NUM_DB_CONNECTIONS_GAUGE.with_label_values(&[proto]).guard(),
};
Ok(connection)

View File

@@ -1,4 +1,4 @@
use crate::{auth, rate_limiter::RateBucketInfo};
use crate::auth;
use anyhow::{bail, ensure, Context, Ok};
use rustls::{sign, Certificate, PrivateKey};
use sha2::{Digest, Sha256};
@@ -20,7 +20,7 @@ pub struct ProxyConfig {
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,
pub disable_ip_check_for_http: bool,
pub endpoint_rps_limit: Vec<RateBucketInfo>,
pub endpoint_rps_limit: u32,
}
#[derive(Debug)]

View File

@@ -4,12 +4,14 @@
pub mod health_server;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use futures::FutureExt;
pub use reqwest::{Request, Response, StatusCode};
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::time::Instant;
use tracing::trace;
use crate::{proxy::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl};
use reqwest_middleware::RequestBuilder;
@@ -19,7 +21,7 @@ use reqwest_middleware::RequestBuilder;
/// We deliberately don't want to replace this with a public static.
pub fn new_client(rate_limiter_config: rate_limiter::RateLimiterConfig) -> ClientWithMiddleware {
let client = reqwest::ClientBuilder::new()
.http2_prior_knowledge()
.dns_resolver(Arc::new(GaiResolver::default()))
.connection_verbose(true)
.build()
.expect("Failed to create http client");
@@ -32,6 +34,7 @@ pub fn new_client(rate_limiter_config: rate_limiter::RateLimiterConfig) -> Clien
pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
let timeout_client = reqwest::ClientBuilder::new()
.dns_resolver(Arc::new(GaiResolver::default()))
.connection_verbose(true)
.timeout(default_timout)
.build()
@@ -97,6 +100,37 @@ impl Endpoint {
}
}
/// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
use hyper::{
client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
service::Service,
};
use reqwest::dns::{Addrs, Resolve, Resolving};
#[derive(Debug)]
pub struct GaiResolver(HyperGaiResolver);
impl Default for GaiResolver {
fn default() -> Self {
Self(HyperGaiResolver::new())
}
}
impl Resolve for GaiResolver {
fn resolve(&self, name: Name) -> Resolving {
let this = &mut self.0.clone();
let start = Instant::now();
Box::pin(
Service::<Name>::call(this, name.clone()).map(move |result| {
let resolve_duration = start.elapsed();
trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
result
.map(|addrs| -> Addrs { Box::new(addrs) })
.map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
}),
)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -9,7 +9,7 @@ use crate::{
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
protocol2::WithClientIp,
rate_limiter::EndpointRateLimiter,
rate_limiter::{EndpointRateLimiter, RateBucketInfo},
stream::{PqStream, Stream},
usage_metrics::{Ids, USAGE_METRICS},
};
@@ -17,10 +17,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::TryFutureExt;
use itertools::Itertools;
use metrics::{
exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec,
IntCounterPairVec, IntCounterVec,
};
use metrics::{exponential_buckets, register_int_counter_vec, IntCounterVec};
use once_cell::sync::{Lazy, OnceCell};
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use prometheus::{
@@ -47,10 +44,17 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
pub static NUM_DB_CONNECTIONS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_db_connections_total",
"Number of opened connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_db_connections_total",
"Number of closed connections to a database.",
&["protocol"],
@@ -58,10 +62,17 @@ pub static NUM_DB_CONNECTIONS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_client_connections_total",
"Number of opened connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_client_connections_total",
"Number of closed connections from a client.",
&["protocol"],
@@ -69,10 +80,17 @@ pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
.unwrap()
});
pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of client connections accepted.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_connections_total",
"Number of client connections closed.",
&["protocol"],
@@ -279,7 +297,6 @@ pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -291,6 +308,10 @@ pub async fn task_main(
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let cancel_map = Arc::new(CancelMap::default());
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new([RateBucketInfo::new(
config.endpoint_rps_limit,
time::Duration::from_secs(1),
)]));
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
@@ -410,12 +431,16 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
);
let proto = mode.protocol_label();
let _client_gauge = NUM_CLIENT_CONNECTION_GAUGE
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&[proto])
.guard();
let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
.inc();
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&[proto])
.guard();
.inc();
scopeguard::defer! {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
let tls = config.tls_config.as_ref();
@@ -562,13 +587,12 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
proto: &'static str,
) -> Result<PostgresConnection, compute::ConnectionError> {
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(allow_self_signed_compute, timeout, proto)
.connect(allow_self_signed_compute, timeout)
.await
}
@@ -589,7 +613,6 @@ pub trait ConnectMechanism {
pub struct TcpMechanism<'a> {
/// KV-dictionary with PostgreSQL connection params.
pub params: &'a StartupMessageParams,
pub proto: &'static str,
}
#[async_trait]
@@ -603,7 +626,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
connect_to_compute_once(node_info, timeout, self.proto).await
connect_to_compute_once(node_info, timeout).await
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {
@@ -1008,7 +1031,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
let aux = node_info.aux.clone();
let mut node = connect_to_compute(
&TcpMechanism { params, proto },
&TcpMechanism { params },
node_info,
&extra,
&creds,
@@ -1017,6 +1040,13 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.or_else(|e| stream.throw_error(e))
.await?;
NUM_DB_CONNECTIONS_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
prepare_client_connection(&node, session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm

View File

@@ -33,6 +33,39 @@ impl Aimd {
min_utilisation_threshold: config.aimd_min_utilisation_threshold,
}
}
pub fn decrease_factor(self, factor: f32) -> Self {
assert!((0.5..1.0).contains(&factor));
Self {
decrease_factor: factor,
..self
}
}
pub fn increase_by(self, increase: usize) -> Self {
assert!(increase > 0);
Self {
increase_by: increase,
..self
}
}
pub fn with_max_limit(self, max: usize) -> Self {
assert!(max > 0);
Self {
max_limit: max,
..self
}
}
/// A threshold below which the limit won't be increased. 0.5 = 50%.
pub fn with_min_utilisation_threshold(self, min_util: f32) -> Self {
assert!(min_util > 0. && min_util < 1.);
Self {
min_utilisation_threshold: min_util,
..self
}
}
}
#[async_trait]

View File

@@ -1,16 +1,10 @@
use std::{
collections::hash_map::RandomState,
hash::BuildHasher,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use anyhow::bail;
use dashmap::DashMap;
use itertools::Itertools;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::{thread_rng, Rng};
use smol_str::SmolStr;
use tokio::sync::{Mutex as AsyncMutex, Semaphore, SemaphorePermit};
use tokio::time::{timeout, Duration, Instant};
@@ -32,11 +26,14 @@ use super::{
// saw SNI, before doing TLS handshake. User-side error messages in that case
// does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
// I went with a more expensive way that yields user-friendlier error messages.
pub struct EndpointRateLimiter<Rand = StdRng, Hasher = RandomState> {
map: DashMap<SmolStr, Vec<RateBucket>, Hasher>,
info: &'static [RateBucketInfo],
//
// TODO: add a better bucketing here, e.g. not more than 300 requests per second,
// and not more than 1000 requests per 10 seconds, etc. Short bursts of reconnects
// are normal during redeployments, so we should not block them.
pub struct EndpointRateLimiter {
map: DashMap<SmolStr, Vec<RateBucket>>,
info: Vec<RateBucketInfo>,
access_count: AtomicUsize,
rand: Mutex<Rand>,
}
#[derive(Clone, Copy)]
@@ -63,85 +60,27 @@ impl RateBucket {
}
}
#[derive(Clone, Copy, PartialEq)]
pub struct RateBucketInfo {
pub interval: Duration,
interval: Duration,
// requests per interval
pub max_rpi: u32,
}
impl std::fmt::Display for RateBucketInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let rps = self.max_rpi * 1000 / self.interval.as_millis() as u32;
write!(f, "{rps}@{}", humantime::format_duration(self.interval))
}
}
impl std::fmt::Debug for RateBucketInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self}")
}
}
impl std::str::FromStr for RateBucketInfo {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let Some((max_rps, interval)) = s.split_once('@') else {
bail!("invalid rate info")
};
let max_rps = max_rps.parse()?;
let interval = humantime::parse_duration(interval)?;
Ok(Self::new(max_rps, interval))
}
max_rpi: u32,
}
impl RateBucketInfo {
pub const DEFAULT_SET: [Self; 3] = [
Self::new(300, Duration::from_secs(1)),
Self::new(200, Duration::from_secs(60)),
Self::new(100, Duration::from_secs(600)),
];
pub fn validate(info: &mut [Self]) -> anyhow::Result<()> {
info.sort_unstable_by_key(|info| info.interval);
let invalid = info
.iter()
.tuple_windows()
.find(|(a, b)| a.max_rpi > b.max_rpi);
if let Some((a, b)) = invalid {
bail!(
"invalid endpoint RPS limits. {b} allows fewer requests per bucket than {a} ({} vs {})",
b.max_rpi,
a.max_rpi,
);
}
Ok(())
}
pub const fn new(max_rps: u32, interval: Duration) -> Self {
pub fn new(max_rps: u32, interval: Duration) -> Self {
Self {
interval,
max_rpi: max_rps * interval.as_millis() as u32 / 1000,
max_rpi: max_rps * 1000 / interval.as_millis() as u32,
}
}
}
impl EndpointRateLimiter {
pub fn new(info: &'static [RateBucketInfo]) -> Self {
Self::new_with_rand_and_hasher(info, StdRng::from_entropy(), RandomState::new())
}
}
impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
fn new_with_rand_and_hasher(info: &'static [RateBucketInfo], rand: R, hasher: S) -> Self {
info!(buckets = ?info, "endpoint rate limiter");
pub fn new(info: impl IntoIterator<Item = RateBucketInfo>) -> Self {
Self {
info,
map: DashMap::with_hasher_and_shard_amount(hasher, 64),
info: info.into_iter().collect(),
map: DashMap::with_shard_amount(64),
access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request
rand: Mutex::new(rand),
}
}
@@ -168,7 +107,7 @@ impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
let should_allow_request = entry
.iter_mut()
.zip(self.info)
.zip(&self.info)
.all(|(bucket, info)| bucket.should_allow_request(info, now));
if should_allow_request {
@@ -188,9 +127,7 @@ impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
self.map.len()
);
let n = self.map.shards().len();
// this lock is ok as the periodic cycle of do_gc makes this very unlikely to collide
// (impossible, infact, unless we have 2048 threads)
let shard = self.rand.lock().unwrap().gen_range(0..n);
let shard = thread_rng().gen_range(0..n);
self.map.shards()[shard].write().clear();
}
}
@@ -233,6 +170,7 @@ pub struct Token<'t> {
#[derive(Debug, Clone, Copy)]
pub struct LimiterState {
limit: usize,
available: usize,
in_flight: usize,
}
@@ -410,7 +348,11 @@ impl Limiter {
pub fn state(&self) -> LimiterState {
let limit = self.limits.load(Ordering::Relaxed);
let in_flight = self.in_flight.load(Ordering::Relaxed);
LimiterState { limit, in_flight }
LimiterState {
limit,
available: limit.saturating_sub(in_flight),
in_flight,
}
}
}
@@ -423,6 +365,13 @@ impl<'t> Token<'t> {
}
}
#[cfg(test)]
pub fn set_latency(&mut self, latency: Duration) {
use std::ops::Sub;
self.start = Instant::now().sub(latency);
}
pub fn forget(&mut self) {
if let Some(permit) = self.permit.take() {
permit.forget();
@@ -441,6 +390,10 @@ impl LimiterState {
pub fn limit(&self) -> usize {
self.limit
}
/// The amount of concurrency available to use.
pub fn available(&self) -> usize {
self.available
}
/// The number of jobs in flight.
pub fn in_flight(&self) -> usize {
self.in_flight
@@ -488,16 +441,12 @@ impl reqwest_middleware::Middleware for Limiter {
#[cfg(test)]
mod tests {
use std::{hash::BuildHasherDefault, pin::pin, task::Context, time::Duration};
use std::{pin::pin, task::Context, time::Duration};
use futures::{task::noop_waker_ref, Future};
use rand::SeedableRng;
use rustc_hash::FxHasher;
use smol_str::SmolStr;
use tokio::time;
use super::{EndpointRateLimiter, Limiter, Outcome};
use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm};
use super::{Limiter, Outcome};
use crate::rate_limiter::RateLimitAlgorithm;
#[tokio::test]
async fn it_works() {
@@ -606,105 +555,4 @@ mod tests {
limiter.release(token1, None).await;
limiter.release(token2, None).await;
}
#[test]
fn rate_bucket_rpi() {
let rate_bucket = RateBucketInfo::new(50, Duration::from_secs(5));
assert_eq!(rate_bucket.max_rpi, 50 * 5);
let rate_bucket = RateBucketInfo::new(50, Duration::from_millis(500));
assert_eq!(rate_bucket.max_rpi, 50 / 2);
}
#[test]
fn rate_bucket_parse() {
let rate_bucket: RateBucketInfo = "100@10s".parse().unwrap();
assert_eq!(rate_bucket.interval, Duration::from_secs(10));
assert_eq!(rate_bucket.max_rpi, 100 * 10);
assert_eq!(rate_bucket.to_string(), "100@10s");
let rate_bucket: RateBucketInfo = "100@1m".parse().unwrap();
assert_eq!(rate_bucket.interval, Duration::from_secs(60));
assert_eq!(rate_bucket.max_rpi, 100 * 60);
assert_eq!(rate_bucket.to_string(), "100@1m");
}
#[test]
fn default_rate_buckets() {
let mut defaults = RateBucketInfo::DEFAULT_SET;
RateBucketInfo::validate(&mut defaults[..]).unwrap();
}
#[test]
#[should_panic = "invalid endpoint RPS limits. 10@10s allows fewer requests per bucket than 300@1s (100 vs 300)"]
fn rate_buckets_validate() {
let mut rates: Vec<RateBucketInfo> = ["300@1s", "10@10s"]
.into_iter()
.map(|s| s.parse().unwrap())
.collect();
RateBucketInfo::validate(&mut rates).unwrap();
}
#[tokio::test]
async fn test_rate_limits() {
let mut rates: Vec<RateBucketInfo> = ["100@1s", "20@30s"]
.into_iter()
.map(|s| s.parse().unwrap())
.collect();
RateBucketInfo::validate(&mut rates).unwrap();
let limiter = EndpointRateLimiter::new(Vec::leak(rates));
let endpoint = SmolStr::from("ep-my-endpoint-1234");
time::pause();
for _ in 0..100 {
assert!(limiter.check(endpoint.clone()));
}
// more connections fail
assert!(!limiter.check(endpoint.clone()));
// fail even after 500ms as it's in the same bucket
time::advance(time::Duration::from_millis(500)).await;
assert!(!limiter.check(endpoint.clone()));
// after a full 1s, 100 requests are allowed again
time::advance(time::Duration::from_millis(500)).await;
for _ in 1..6 {
for _ in 0..100 {
assert!(limiter.check(endpoint.clone()));
}
time::advance(time::Duration::from_millis(1000)).await;
}
// more connections after 600 will exceed the 20rps@30s limit
assert!(!limiter.check(endpoint.clone()));
// will still fail before the 30 second limit
time::advance(time::Duration::from_millis(30_000 - 6_000 - 1)).await;
assert!(!limiter.check(endpoint.clone()));
// after the full 30 seconds, 100 requests are allowed again
time::advance(time::Duration::from_millis(1)).await;
for _ in 0..100 {
assert!(limiter.check(endpoint.clone()));
}
}
#[tokio::test]
async fn test_rate_limits_gc() {
// fixed seeded random/hasher to ensure that the test is not flaky
let rand = rand::rngs::StdRng::from_seed([1; 32]);
let hasher = BuildHasherDefault::<FxHasher>::default();
let limiter = EndpointRateLimiter::new_with_rand_and_hasher(
&RateBucketInfo::DEFAULT_SET,
rand,
hasher,
);
for i in 0..1_000_000 {
limiter.check(format!("{i}").into());
}
assert!(limiter.map.len() < 150_000);
}
}

View File

@@ -8,14 +8,14 @@ mod websocket;
use anyhow::bail;
use hyper::StatusCode;
use metrics::IntCounterPairGuard;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::time;
use tokio_util::task::TaskTracker;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
use crate::proxy::NUM_CLIENT_CONNECTION_GAUGE;
use crate::rate_limiter::EndpointRateLimiter;
use crate::proxy::{NUM_CLIENT_CONNECTION_CLOSED_COUNTER, NUM_CLIENT_CONNECTION_OPENED_COUNTER};
use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo};
use crate::{cancellation::CancelMap, config::ProxyConfig};
use futures::StreamExt;
use hyper::{
@@ -39,13 +39,16 @@ pub async fn task_main(
config: &'static ProxyConfig,
ws_listener: TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("websocket server has shut down");
}
let conn_pool = conn_pool::GlobalConnPool::new(config);
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new([RateBucketInfo::new(
config.endpoint_rps_limit,
time::Duration::from_secs(1),
)]));
// shutdown the connection pool
tokio::spawn({
@@ -150,17 +153,22 @@ pub async fn task_main(
struct MetricService<S> {
inner: S,
_gauge: IntCounterPairGuard,
}
impl<S> MetricService<S> {
fn new(inner: S) -> MetricService<S> {
MetricService {
inner,
_gauge: NUM_CLIENT_CONNECTION_GAUGE
.with_label_values(&["http"])
.guard(),
}
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&["http"])
.inc();
MetricService { inner }
}
}
impl<S> Drop for MetricService<S> {
fn drop(&mut self) {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
.with_label_values(&["http"])
.inc();
}
}
@@ -244,7 +252,7 @@ async fn request_handler(
.header("Access-Control-Allow-Origin", "*")
.header(
"Access-Control-Allow-Headers",
"Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In, Neon-Batch-Read-Only, Neon-Batch-Isolation-Level",
"Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In",
)
.header("Access-Control-Max-Age", "86400" /* 24 hours */)
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code

View File

@@ -24,7 +24,10 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};
use crate::{
auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list},
console,
proxy::{neon_options, LatencyTimer, NUM_DB_CONNECTIONS_GAUGE},
proxy::{
neon_options, LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER,
NUM_DB_CONNECTIONS_OPENED_COUNTER,
},
usage_metrics::{Ids, MetricCounter, USAGE_METRICS},
};
use crate::{compute, config};
@@ -474,11 +477,6 @@ async fn connect_to_compute_once(
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;
let conn_gauge = NUM_DB_CONNECTIONS_GAUGE
.with_label_values(&["http"])
.guard();
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let (tx, mut rx) = tokio::sync::watch::channel(session);
@@ -494,7 +492,10 @@ async fn connect_to_compute_once(
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();

View File

@@ -29,7 +29,7 @@ use utils::http::error::ApiError;
use utils::http::json::json_response;
use crate::config::HttpConfig;
use crate::proxy::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -303,9 +303,12 @@ async fn handle_inner(
session_id: uuid::Uuid,
peer_addr: IpAddr,
) -> anyhow::Result<Response<Body>> {
let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&["http"])
.guard();
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
//
// Determine the destination and connection params

View File

@@ -27,15 +27,15 @@ use sync_wrapper::SyncWrapper;
pin_project! {
/// This is a wrapper around a [`WebSocketStream`] that
/// implements [`AsyncRead`] and [`AsyncWrite`].
pub struct WebSocketRw<S = Upgraded> {
pub struct WebSocketRw {
#[pin]
stream: SyncWrapper<WebSocketStream<S>>,
stream: SyncWrapper<WebSocketStream<Upgraded>>,
bytes: Bytes,
}
}
impl<S> WebSocketRw<S> {
pub fn new(stream: WebSocketStream<S>) -> Self {
impl WebSocketRw {
pub fn new(stream: WebSocketStream<Upgraded>) -> Self {
Self {
stream: stream.into(),
bytes: Bytes::new(),
@@ -43,7 +43,7 @@ impl<S> WebSocketRw<S> {
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
impl AsyncWrite for WebSocketRw {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -69,7 +69,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for WebSocketRw<S> {
impl AsyncRead for WebSocketRw {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -86,7 +86,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for WebSocketRw<S> {
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
impl AsyncBufRead for WebSocketRw {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
// Please refer to poll_fill_buf's documentation.
const EOF: Poll<io::Result<&[u8]>> = Poll::Ready(Ok(&[]));
@@ -151,60 +151,3 @@ pub async fn serve_websocket(
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use futures::{SinkExt, StreamExt};
use hyper_tungstenite::{
tungstenite::{protocol::Role, Message},
WebSocketStream,
};
use tokio::{
io::{duplex, AsyncReadExt, AsyncWriteExt},
task::JoinSet,
};
use super::WebSocketRw;
#[tokio::test]
async fn websocket_stream_wrapper_happy_path() {
let (stream1, stream2) = duplex(1024);
let mut js = JoinSet::new();
js.spawn(async move {
let mut client = WebSocketStream::from_raw_socket(stream1, Role::Client, None).await;
client
.send(Message::Binary(b"hello world".to_vec()))
.await
.unwrap();
let message = client.next().await.unwrap().unwrap();
assert_eq!(message, Message::Binary(b"websockets are cool".to_vec()));
client.close(None).await.unwrap();
});
js.spawn(async move {
let mut rw = pin!(WebSocketRw::new(
WebSocketStream::from_raw_socket(stream2, Role::Server, None).await
));
let mut buf = vec![0; 1024];
let n = rw.read(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"hello world");
rw.write_all(b"websockets are cool").await.unwrap();
rw.flush().await.unwrap();
let n = rw.read_to_end(&mut buf).await.unwrap();
assert_eq!(n, 0);
});
js.join_next().await.unwrap().unwrap();
js.join_next().await.unwrap().unwrap();
}
}

View File

@@ -11,7 +11,7 @@ use tracing::{debug, info, info_span, Instrument};
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE};
use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
use crate::safekeeper::Term;
use crate::timeline::TimelineError;
use crate::wal_service::ConnectionId;
@@ -210,7 +210,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc();
scopeguard::defer! {
PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc();
}
info!("got query {:?}", query_string);

View File

@@ -11,8 +11,7 @@ use futures::Future;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
proto::MetricFamily,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;
@@ -90,10 +89,16 @@ pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_broker_pulled_updates_total counter")
});
pub static PG_QUERIES_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
pub static PG_QUERIES_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_pg_queries_received_total",
"Number of queries received through pg protocol",
&["query"]
)
.expect("Failed to register safekeeper_pg_queries_received_total counter")
});
pub static PG_QUERIES_FINISHED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_pg_queries_finished_total",
"Number of queries finished through pg protocol",
&["query"]

View File

@@ -2945,7 +2945,7 @@ class Safekeeper:
tli_dir = self.timeline_dir(tenant_id, timeline_id)
segments = []
for _, _, filenames in os.walk(tli_dir):
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
segments.extend([f for f in filenames if f != "safekeeper.control"])
segments.sort()
return segments

View File

@@ -322,10 +322,6 @@ class PageserverHttpClient(requests.Session):
self.verbose_error(res)
return TenantConfig.from_json(res.json())
def tenant_heatmap_upload(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload")
self.verbose_error(res)
def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]):
assert "tenant_id" not in config.keys()
res = self.put(

View File

@@ -16,7 +16,6 @@ from fixtures.log_helper import log
from fixtures.types import TenantId, TimelineId
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
TENANT_HEATMAP_FILE_NAME = "heatmap-v1.json"
@enum.unique
@@ -134,13 +133,6 @@ class LocalFsStorage:
with self.index_path(tenant_id, timeline_id).open("r") as f:
return json.load(f)
def heatmap_path(self, tenant_id: TenantId) -> Path:
return self.tenant_path(tenant_id) / TENANT_HEATMAP_FILE_NAME
def heatmap_content(self, tenant_id):
with self.heatmap_path(tenant_id).open("r") as f:
return json.load(f)
def to_toml_inline_table(self) -> str:
rv = {
"local_path": str(self.root),

View File

@@ -163,7 +163,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
"gc_feedback": True,
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"heatmap_period": "10m",
"image_creation_threshold": 7,
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",

View File

@@ -1,7 +1,8 @@
import random
import threading
import time
from typing import List
from queue import SimpleQueue
from typing import Any, Dict, List, Union
import pytest
from fixtures.log_helper import log
@@ -238,6 +239,92 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
t.join()
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
"""
If the activate only after upload is used, then retries could become competing.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.extend(
[
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory",
]
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
env.pageserver.tenant_create(env.initial_tenant)
def start_creating_timeline():
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
create_root = threading.Thread(target=start_creating_timeline)
branch_id = TimelineId.generate()
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
barrier = threading.Barrier(3)
def try_branch():
barrier.wait()
barrier.wait()
try:
ret = ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
branch_id,
ancestor_timeline_id=env.initial_timeline,
timeout=5,
)
queue.put(ret)
except Exception as e:
queue.put(e)
threads = [threading.Thread(target=try_branch) for _ in range(2)]
try:
create_root.start()
for t in threads:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
barrier.wait()
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
barrier.wait()
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
first = queue.get()
second = queue.get()
log.info(first)
log.info(second)
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
assert isinstance(failed, Exception)
assert isinstance(succeeded, Dict)
# there's multiple valid status codes:
# - Timeline x/y already exists
# - whatever 409 response says, but that is a subclass of PageserverApiException
assert isinstance(failed, PageserverApiException)
assert succeeded["state"] == "Active"
finally:
# we might still have the failpoint active
env.pageserver.stop(immediate=True)
for t in threads:
t.join()
create_root.join()
def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder):
"""
Check that a timeline is deleted locally on subsequent restart if it never successfully uploaded during creation.

View File

@@ -273,24 +273,9 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
timeline_id = env.initial_timeline
pg_version = env.pg_version
# Delete all files from local_fs_remote_storage except initdb.tar.zst,
# the file is required for `timeline_create` with `existing_initdb_timeline_id`.
#
# TODO: switch to Path.walk() in Python 3.12
# for dirpath, _dirnames, filenames in (repo_dir / "local_fs_remote_storage").walk():
for dirpath, _dirnames, filenames in os.walk(repo_dir / "local_fs_remote_storage"):
for filename in filenames:
if filename != "initdb.tar.zst":
(Path(dirpath) / filename).unlink()
shutil.rmtree(repo_dir / "local_fs_remote_storage")
timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
pageserver_http.timeline_create(
pg_version=pg_version,
tenant_id=tenant_id,
new_timeline_id=timeline_id,
existing_initdb_timeline_id=timeline_id,
)
pageserver_http.timeline_create(pg_version, tenant_id, timeline_id)
pg_bin.run_capture(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
)

View File

@@ -4,7 +4,7 @@ from typing import Any, Dict, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
@@ -330,46 +330,3 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
workload.churn_rows(64, pageserver_b.id)
workload.validate(pageserver_b.id)
def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
"""
Test the sequence of location states that are used in a live migration.
"""
env = neon_env_builder.init_start() # initial_tenant_conf=TENANT_CONF)
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Write some data so that we have some layers
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
# Write some layers and upload a heatmap
workload.write_rows(256, env.pageservers[0].id)
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
def validate_heatmap(heatmap):
assert len(heatmap["timelines"]) == 1
assert heatmap["timelines"][0]["timeline_id"] == str(timeline_id)
assert len(heatmap["timelines"][0]["layers"]) > 0
layers = heatmap["timelines"][0]["layers"]
# Each layer appears at most once
assert len(set(layer["name"] for layer in layers)) == len(layers)
# Download and inspect the heatmap that the pageserver uploaded
heatmap_first = env.pageserver_remote_storage.heatmap_content(tenant_id)
log.info(f"Read back heatmap: {heatmap_first}")
validate_heatmap(heatmap_first)
# Do some more I/O to generate more layers
workload.churn_rows(64, env.pageservers[0].id)
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
# Ensure that another heatmap upload includes the new layers
heatmap_second = env.pageserver_remote_storage.heatmap_content(tenant_id)
log.info(f"Read back heatmap: {heatmap_second}")
assert heatmap_second != heatmap_first
validate_heatmap(heatmap_second)

View File

@@ -1,6 +1,7 @@
import sys
import tarfile
import tempfile
import time
from pathlib import Path
import pytest
@@ -11,7 +12,6 @@ from fixtures.neon_fixtures import (
PgBin,
VanillaPostgres,
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import LocalFsStorage
from fixtures.types import Lsn, TenantId, TimelineId
@@ -128,7 +128,10 @@ def test_wal_restore_initdb(
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
def test_wal_restore_http(neon_env_builder: NeonEnvBuilder):
def test_wal_restore_http(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("create table t as select generate_series(1,300000)")
@@ -142,7 +145,15 @@ def test_wal_restore_http(neon_env_builder: NeonEnvBuilder):
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
timeline_delete_wait_completed(ps_client, tenant_id, timeline_id)
test_output_dir / "initdb.tar.zst"
(env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / "initdb.tar.zst")
ps_client.timeline_delete(tenant_id, timeline_id)
time.sleep(2)
# verify that it is indeed deleted
# TODO
# issue the restoration command
ps_client.timeline_create(

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "863b71572bc441581efb3bbee2ad18af037be1bb",
"postgres-v15": "24333abb81a9ecae4541019478f0bf7d0b289df7",
"postgres-v14": "0bb356aa0cd1582112926fbcf0b5370222c2db6d"
"postgres-v16": "e3a22b72922055f9212eca12700190f118578362",
"postgres-v15": "bc88f539312fcc4bb292ce94ae9db09ab6656e8a",
"postgres-v14": "dd067cf656f6810a25aca6025633d32d02c5085a"
}