mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
Merge remote-tracking branch 'origin/main' into problame/standby-horizon-removal-poc-rip-out
This commit is contained in:
@@ -145,7 +145,7 @@ pub struct PageServerConf {
|
||||
pub metric_collection_bucket: Option<RemoteStorageConfig>,
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
|
||||
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
|
||||
pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
|
||||
|
||||
pub test_remote_failures: u64,
|
||||
|
||||
@@ -706,9 +706,12 @@ impl ConfigurableSemaphore {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, EvictionOrder};
|
||||
use rstest::rstest;
|
||||
use utils::id::NodeId;
|
||||
use utils::{id::NodeId, serde_percent::Percent};
|
||||
|
||||
use super::PageServerConf;
|
||||
|
||||
@@ -807,4 +810,70 @@ mod tests {
|
||||
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
|
||||
.expect("parse_and_validate");
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[
|
||||
case::omit_the_whole_config(
|
||||
DiskUsageEvictionTaskConfig {
|
||||
max_usage_pct: Percent::new(80).unwrap(),
|
||||
min_avail_bytes: 2_000_000_000,
|
||||
period: Duration::from_secs(60),
|
||||
eviction_order: Default::default(),
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
enabled: true,
|
||||
},
|
||||
r#"
|
||||
control_plane_api = "http://localhost:6666"
|
||||
"#,
|
||||
)]
|
||||
#[
|
||||
case::omit_enabled_field(
|
||||
DiskUsageEvictionTaskConfig {
|
||||
max_usage_pct: Percent::new(80).unwrap(),
|
||||
min_avail_bytes: 1_000_000_000,
|
||||
period: Duration::from_secs(60),
|
||||
eviction_order: EvictionOrder::RelativeAccessed {
|
||||
highest_layer_count_loses_first: true,
|
||||
},
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
enabled: true,
|
||||
},
|
||||
r#"
|
||||
control_plane_api = "http://localhost:6666"
|
||||
disk_usage_based_eviction = { max_usage_pct = 80, min_avail_bytes = 1000000000, period = "60s" }
|
||||
"#,
|
||||
)]
|
||||
#[case::disabled(
|
||||
DiskUsageEvictionTaskConfig {
|
||||
max_usage_pct: Percent::new(80).unwrap(),
|
||||
min_avail_bytes: 2_000_000_000,
|
||||
period: Duration::from_secs(60),
|
||||
eviction_order: EvictionOrder::RelativeAccessed {
|
||||
highest_layer_count_loses_first: true,
|
||||
},
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
enabled: false,
|
||||
},
|
||||
r#"
|
||||
control_plane_api = "http://localhost:6666"
|
||||
disk_usage_based_eviction = { enabled = false }
|
||||
"#
|
||||
)]
|
||||
fn test_config_disk_usage_based_eviction_is_valid(
|
||||
#[case] expected_disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
|
||||
#[case] input: &str,
|
||||
) {
|
||||
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
|
||||
.expect("disk_usage_based_eviction is valid");
|
||||
let workdir = Utf8PathBuf::from("/nonexistent");
|
||||
let config = PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir).unwrap();
|
||||
let disk_usage_based_eviction = config.disk_usage_based_eviction;
|
||||
assert_eq!(
|
||||
expected_disk_usage_based_eviction,
|
||||
disk_usage_based_eviction
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ pub(super) async fn upload_metrics_bucket(
|
||||
|
||||
// Compose object path
|
||||
let datetime: DateTime<Utc> = SystemTime::now().into();
|
||||
let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
|
||||
let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/hour=%H/%H:%M:%SZ");
|
||||
let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;
|
||||
|
||||
// Set up a gzip writer into a buffer
|
||||
@@ -109,7 +109,7 @@ pub(super) async fn upload_metrics_bucket(
|
||||
|
||||
// Serialize and write into compressed buffer
|
||||
let started_at = std::time::Instant::now();
|
||||
for res in serialize_in_chunks(CHUNK_SIZE, metrics, idempotency_keys) {
|
||||
for res in serialize_in_chunks_ndjson(CHUNK_SIZE, metrics, idempotency_keys) {
|
||||
let (_chunk, body) = res?;
|
||||
gzip_writer.write_all(&body).await?;
|
||||
}
|
||||
@@ -216,6 +216,86 @@ fn serialize_in_chunks<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
/// Serializes the input metrics as NDJSON in chunks of chunk_size. Each event
|
||||
/// is serialized as a separate JSON object on its own line. The provided
|
||||
/// idempotency keys are injected into the corresponding metric events (reused
|
||||
/// across different metrics sinks), and must have the same length as input.
|
||||
fn serialize_in_chunks_ndjson<'a>(
|
||||
chunk_size: usize,
|
||||
input: &'a [NewRawMetric],
|
||||
idempotency_keys: &'a [IdempotencyKey<'a>],
|
||||
) -> impl ExactSizeIterator<Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>> + 'a
|
||||
{
|
||||
use bytes::BufMut;
|
||||
|
||||
assert_eq!(input.len(), idempotency_keys.len());
|
||||
|
||||
struct Iter<'a> {
|
||||
inner: std::slice::Chunks<'a, NewRawMetric>,
|
||||
idempotency_keys: std::slice::Iter<'a, IdempotencyKey<'a>>,
|
||||
chunk_size: usize,
|
||||
|
||||
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
|
||||
buffer: bytes::BytesMut,
|
||||
// chunk amount of events are reused to produce the serialized document
|
||||
scratch: Vec<Event<Ids, Name>>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for Iter<'a> {
|
||||
type Item = Result<(&'a [NewRawMetric], bytes::Bytes), serde_json::Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let chunk = self.inner.next()?;
|
||||
|
||||
if self.scratch.is_empty() {
|
||||
// first round: create events with N strings
|
||||
self.scratch.extend(
|
||||
chunk
|
||||
.iter()
|
||||
.zip(&mut self.idempotency_keys)
|
||||
.map(|(raw_metric, key)| raw_metric.as_event(key)),
|
||||
);
|
||||
} else {
|
||||
// next rounds: update_in_place to reuse allocations
|
||||
assert_eq!(self.scratch.len(), self.chunk_size);
|
||||
itertools::izip!(self.scratch.iter_mut(), chunk, &mut self.idempotency_keys)
|
||||
.for_each(|(slot, raw_metric, key)| raw_metric.update_in_place(slot, key));
|
||||
}
|
||||
|
||||
// Serialize each event as NDJSON (one JSON object per line)
|
||||
for event in self.scratch[..chunk.len()].iter() {
|
||||
let res = serde_json::to_writer((&mut self.buffer).writer(), event);
|
||||
if let Err(e) = res {
|
||||
return Some(Err(e));
|
||||
}
|
||||
// Add newline after each event to follow NDJSON format
|
||||
self.buffer.put_u8(b'\n');
|
||||
}
|
||||
|
||||
Some(Ok((chunk, self.buffer.split().freeze())))
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.inner.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
impl ExactSizeIterator for Iter<'_> {}
|
||||
|
||||
let buffer = bytes::BytesMut::new();
|
||||
let inner = input.chunks(chunk_size);
|
||||
let idempotency_keys = idempotency_keys.iter();
|
||||
let scratch = Vec::new();
|
||||
|
||||
Iter {
|
||||
inner,
|
||||
idempotency_keys,
|
||||
chunk_size,
|
||||
buffer,
|
||||
scratch,
|
||||
}
|
||||
}
|
||||
|
||||
trait RawMetricExt {
|
||||
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
|
||||
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
|
||||
@@ -479,6 +559,43 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn chunked_serialization_ndjson() {
|
||||
let examples = metric_samples();
|
||||
assert!(examples.len() > 1);
|
||||
|
||||
let now = Utc::now();
|
||||
let idempotency_keys = (0..examples.len())
|
||||
.map(|i| FixedGen::new(now, "1", i as u16).generate())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Parse NDJSON format - each line is a separate JSON object
|
||||
let parse_ndjson = |body: &[u8]| -> Vec<Event<Ids, Name>> {
|
||||
let body_str = std::str::from_utf8(body).unwrap();
|
||||
body_str
|
||||
.trim_end_matches('\n')
|
||||
.lines()
|
||||
.filter(|line| !line.is_empty())
|
||||
.map(|line| serde_json::from_str::<Event<Ids, Name>>(line).unwrap())
|
||||
.collect()
|
||||
};
|
||||
|
||||
let correct = serialize_in_chunks_ndjson(examples.len(), &examples, &idempotency_keys)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| parse_ndjson(&body))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for chunk_size in 1..examples.len() {
|
||||
let actual = serialize_in_chunks_ndjson(chunk_size, &examples, &idempotency_keys)
|
||||
.map(|res| res.unwrap().1)
|
||||
.flat_map(|body| parse_ndjson(&body))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// if these are equal, it means that multi-chunking version works as well
|
||||
assert_eq!(correct, actual);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
|
||||
|
||||
|
||||
@@ -171,7 +171,8 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
background_jobs_barrier: completion::Barrier,
|
||||
) -> Option<DiskUsageEvictionTask> {
|
||||
let Some(task_config) = &conf.disk_usage_based_eviction else {
|
||||
let task_config = &conf.disk_usage_based_eviction;
|
||||
if !task_config.enabled {
|
||||
info!("disk usage based eviction task not configured");
|
||||
return None;
|
||||
};
|
||||
@@ -458,6 +459,9 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
match next {
|
||||
Ok(Ok(file_size)) => {
|
||||
METRICS.layers_evicted.inc();
|
||||
/*BEGIN_HADRON */
|
||||
METRICS.bytes_evicted.inc_by(file_size);
|
||||
/*END_HADRON */
|
||||
usage_assumed.add_available_bytes(file_size);
|
||||
}
|
||||
Ok(Err((
|
||||
@@ -1265,6 +1269,7 @@ mod filesystem_level_usage {
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
eviction_order: pageserver_api::config::EvictionOrder::default(),
|
||||
enabled: true,
|
||||
},
|
||||
total_bytes: 100_000,
|
||||
avail_bytes: 0,
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, atomic::AtomicBool},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use pageserver_api::config::NodeMetadata;
|
||||
@@ -6,12 +10,13 @@ use posthog_client_lite::{
|
||||
CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
|
||||
PostHogFlagFilterPropertyValue,
|
||||
};
|
||||
use rand::Rng;
|
||||
use remote_storage::RemoteStorageKind;
|
||||
use serde_json::json;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
|
||||
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
|
||||
|
||||
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
|
||||
|
||||
@@ -138,6 +143,7 @@ impl FeatureResolver {
|
||||
}
|
||||
Arc::new(properties)
|
||||
};
|
||||
|
||||
let fake_tenants = {
|
||||
let mut tenants = Vec::new();
|
||||
for i in 0..10 {
|
||||
@@ -147,9 +153,16 @@ impl FeatureResolver {
|
||||
conf.id,
|
||||
i
|
||||
);
|
||||
|
||||
let tenant_properties = PerTenantProperties {
|
||||
remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
|
||||
}
|
||||
.into_posthog_properties();
|
||||
|
||||
let properties = Self::collect_properties_inner(
|
||||
distinct_id.clone(),
|
||||
Some(&internal_properties),
|
||||
&tenant_properties,
|
||||
);
|
||||
tenants.push(CaptureEvent {
|
||||
event: "initial_tenant_report".to_string(),
|
||||
@@ -183,6 +196,7 @@ impl FeatureResolver {
|
||||
fn collect_properties_inner(
|
||||
tenant_id: String,
|
||||
internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
let mut properties = HashMap::new();
|
||||
if let Some(internal_properties) = internal_properties {
|
||||
@@ -194,6 +208,9 @@ impl FeatureResolver {
|
||||
"tenant_id".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String(tenant_id),
|
||||
);
|
||||
for (key, value) in tenant_properties.iter() {
|
||||
properties.insert(key.clone(), value.clone());
|
||||
}
|
||||
properties
|
||||
}
|
||||
|
||||
@@ -201,8 +218,13 @@ impl FeatureResolver {
|
||||
pub(crate) fn collect_properties(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref())
|
||||
Self::collect_properties_inner(
|
||||
tenant_id.to_string(),
|
||||
self.internal_properties.as_deref(),
|
||||
tenant_properties,
|
||||
)
|
||||
}
|
||||
|
||||
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
|
||||
@@ -214,6 +236,7 @@ impl FeatureResolver {
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<String, PostHogEvaluationError> {
|
||||
let force_overrides = self.force_overrides_for_testing.load();
|
||||
if let Some(value) = force_overrides.get(flag_key) {
|
||||
@@ -224,7 +247,7 @@ impl FeatureResolver {
|
||||
let res = inner.feature_store().evaluate_multivariate(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&self.collect_properties(tenant_id),
|
||||
&self.collect_properties(tenant_id, tenant_properties),
|
||||
);
|
||||
match &res {
|
||||
Ok(value) => {
|
||||
@@ -257,6 +280,7 @@ impl FeatureResolver {
|
||||
&self,
|
||||
flag_key: &str,
|
||||
tenant_id: TenantId,
|
||||
tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
|
||||
) -> Result<(), PostHogEvaluationError> {
|
||||
let force_overrides = self.force_overrides_for_testing.load();
|
||||
if let Some(value) = force_overrides.get(flag_key) {
|
||||
@@ -271,7 +295,7 @@ impl FeatureResolver {
|
||||
let res = inner.feature_store().evaluate_boolean(
|
||||
flag_key,
|
||||
&tenant_id.to_string(),
|
||||
&self.collect_properties(tenant_id),
|
||||
&self.collect_properties(tenant_id, tenant_properties),
|
||||
);
|
||||
match &res {
|
||||
Ok(()) => {
|
||||
@@ -317,3 +341,93 @@ impl FeatureResolver {
|
||||
.store(Arc::new(force_overrides));
|
||||
}
|
||||
}
|
||||
|
||||
struct PerTenantProperties {
|
||||
pub remote_size_mb: Option<f64>,
|
||||
}
|
||||
|
||||
impl PerTenantProperties {
|
||||
pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
let mut properties = HashMap::new();
|
||||
if let Some(remote_size_mb) = self.remote_size_mb {
|
||||
properties.insert(
|
||||
"tenant_remote_size_mb".to_string(),
|
||||
PostHogFlagFilterPropertyValue::Number(remote_size_mb),
|
||||
);
|
||||
}
|
||||
properties
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TenantFeatureResolver {
|
||||
inner: FeatureResolver,
|
||||
tenant_id: TenantId,
|
||||
cached_tenant_properties: ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>,
|
||||
|
||||
// Add feature flag on the critical path below.
|
||||
//
|
||||
// If a feature flag will be used on the critical path, we will update it in the tenant housekeeping loop insetad of
|
||||
// resolving directly by calling `evaluate_multivariate` or `evaluate_boolean`. Remember to update the flag in the
|
||||
// housekeeping loop. The user should directly read this atomic flag instead of using the set of evaluate functions.
|
||||
pub feature_test_remote_size_flag: AtomicBool,
|
||||
}
|
||||
|
||||
impl TenantFeatureResolver {
|
||||
pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
tenant_id,
|
||||
cached_tenant_properties: ArcSwap::new(Arc::new(HashMap::new())),
|
||||
feature_test_remote_size_flag: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
|
||||
self.inner.evaluate_multivariate(
|
||||
flag_key,
|
||||
self.tenant_id,
|
||||
&self.cached_tenant_properties.load(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
|
||||
self.inner.evaluate_boolean(
|
||||
flag_key,
|
||||
self.tenant_id,
|
||||
&self.cached_tenant_properties.load(),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
|
||||
self.inner
|
||||
.collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
|
||||
}
|
||||
|
||||
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
|
||||
self.inner.is_feature_flag_boolean(flag_key)
|
||||
}
|
||||
|
||||
/// Refresh the cached properties and flags on the critical path.
|
||||
pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
|
||||
let mut remote_size_mb = None;
|
||||
for timeline in tenant_shard.list_timelines() {
|
||||
let size = timeline.metrics.resident_physical_size_get();
|
||||
if size == 0 {
|
||||
remote_size_mb = None;
|
||||
}
|
||||
if let Some(ref mut remote_size_mb) = remote_size_mb {
|
||||
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
|
||||
}
|
||||
}
|
||||
self.cached_tenant_properties.store(Arc::new(
|
||||
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
|
||||
));
|
||||
|
||||
// BEGIN: Update the feature flag on the critical path.
|
||||
self.feature_test_remote_size_flag.store(
|
||||
self.evaluate_boolean("test-remote-size-flag").is_ok(),
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
// END: Update the feature flag on the critical path.
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,6 +61,7 @@ use crate::context;
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::metrics::LOCAL_DATA_LOSS_SUSPECTED;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::LocationConf;
|
||||
@@ -2438,6 +2439,7 @@ async fn timeline_offload_handler(
|
||||
.map_err(|e| {
|
||||
match e {
|
||||
OffloadError::Cancelled => ApiError::ResourceUnavailable("Timeline shutting down".into()),
|
||||
OffloadError::AlreadyInProgress => ApiError::Conflict("Timeline already being offloaded or deleted".into()),
|
||||
_ => ApiError::InternalServerError(anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
@@ -2500,10 +2502,7 @@ async fn timeline_checkpoint_handler(
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e),
|
||||
CompactionError::AlreadyRunning(_) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
}
|
||||
)?;
|
||||
}
|
||||
@@ -3629,6 +3628,17 @@ async fn activate_post_import_handler(
|
||||
.await
|
||||
}
|
||||
|
||||
// [Hadron] Reset gauge metrics that are used to raised alerts. We need this API as a stop-gap measure to reset alerts
|
||||
// after we manually rectify situations such as local SSD data loss. We will eventually automate this.
|
||||
async fn hadron_reset_alert_gauges(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
LOCAL_DATA_LOSS_SUSPECTED.set(0);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Read the end of a tar archive.
|
||||
///
|
||||
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
|
||||
@@ -3697,23 +3707,25 @@ async fn tenant_evaluate_feature_flag(
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
|
||||
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
|
||||
// and we don't need to worry about it for now.
|
||||
let properties = tenant.feature_resolver.collect_properties();
|
||||
if as_type.as_deref() == Some("boolean") {
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag);
|
||||
let result = result.map(|_| true).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else if as_type.as_deref() == Some("multivariate") {
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else {
|
||||
// Auto infer the type of the feature flag.
|
||||
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
|
||||
if is_boolean {
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag);
|
||||
let result = result.map(|_| true).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
} else {
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
|
||||
}
|
||||
}
|
||||
@@ -4153,5 +4165,8 @@ pub fn make_router(
|
||||
.post("/v1/feature_flag_spec", |r| {
|
||||
api_handler(r, update_feature_flag_spec)
|
||||
})
|
||||
.post("/hadron-internal/reset_alert_gauges", |r| {
|
||||
api_handler(r, hadron_reset_alert_gauges)
|
||||
})
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::cell::Cell;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::RawFd;
|
||||
@@ -102,7 +103,18 @@ pub(crate) static STORAGE_TIME_COUNT_PER_TIMELINE: Lazy<IntCounterVec> = Lazy::n
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Buckets for background operation duration in seconds, like compaction, GC, size calculation.
|
||||
/* BEGIN_HADRON */
|
||||
pub(crate) static STORAGE_ACTIVE_COUNT_PER_TIMELINE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_active_storage_operations_count",
|
||||
"Count of active storage operations with operation, tenant and timeline dimensions",
|
||||
&["operation", "tenant_id", "shard_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
/*END_HADRON */
|
||||
|
||||
// Buckets for background operations like compaction, GC, size calculation
|
||||
const STORAGE_OP_BUCKETS: &[f64] = &[0.010, 0.100, 1.0, 10.0, 100.0, 1000.0];
|
||||
|
||||
pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
@@ -2801,6 +2813,31 @@ pub(crate) static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
|
||||
pub(crate) static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
|
||||
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
|
||||
|
||||
pub(crate) static LOCAL_DATA_LOSS_SUSPECTED: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"pageserver_local_data_loss_suspected",
|
||||
"Non-zero value indicates that pageserver local data loss is suspected (and highly likely)."
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Counter keeping track of misrouted PageStream requests. Spelling out PageStream requests here to distinguish
|
||||
// it from other types of reqeusts (SK wal replication, http requests, etc.). PageStream requests are used by
|
||||
// Postgres compute to fetch data from pageservers.
|
||||
// A misrouted PageStream request is registered if the pageserver cannot find the tenant identified in the
|
||||
// request, or if the pageserver is not the "primary" serving the tenant shard. These error almost always identify
|
||||
// issues with compute configuration, caused by either the compute node itself being stuck in the wrong
|
||||
// configuration or Storage Controller reconciliation bugs. Misrouted requests are expected during tenant migration
|
||||
// and/or during recovery following a pageserver failure, but persistently high rates of misrouted requests
|
||||
// are indicative of bugs (and unavailability).
|
||||
pub(crate) static MISROUTED_PAGESTREAM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_misrouted_pagestream_requests_total",
|
||||
"Number of pageserver pagestream requests that were routed to the wrong pageserver"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Metrics collected on WAL redo operations
|
||||
//
|
||||
// We collect the time spent in actual WAL redo ('redo'), and time waiting
|
||||
@@ -3039,13 +3076,19 @@ pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
|
||||
pub(crate) struct StorageTimeMetricsTimer {
|
||||
metrics: StorageTimeMetrics,
|
||||
start: Instant,
|
||||
stopped: Cell<bool>,
|
||||
}
|
||||
|
||||
impl StorageTimeMetricsTimer {
|
||||
fn new(metrics: StorageTimeMetrics) -> Self {
|
||||
/*BEGIN_HADRON */
|
||||
// record the active operation as the timer starts
|
||||
metrics.timeline_active_count.inc();
|
||||
/*END_HADRON */
|
||||
Self {
|
||||
metrics,
|
||||
start: Instant::now(),
|
||||
stopped: Cell::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3061,6 +3104,10 @@ impl StorageTimeMetricsTimer {
|
||||
self.metrics.timeline_sum.inc_by(seconds);
|
||||
self.metrics.timeline_count.inc();
|
||||
self.metrics.global_histogram.observe(seconds);
|
||||
/* BEGIN_HADRON*/
|
||||
self.stopped.set(true);
|
||||
self.metrics.timeline_active_count.dec();
|
||||
/*END_HADRON */
|
||||
duration
|
||||
}
|
||||
|
||||
@@ -3071,6 +3118,16 @@ impl StorageTimeMetricsTimer {
|
||||
}
|
||||
}
|
||||
|
||||
/*BEGIN_HADRON */
|
||||
impl Drop for StorageTimeMetricsTimer {
|
||||
fn drop(&mut self) {
|
||||
if !self.stopped.get() {
|
||||
self.metrics.timeline_active_count.dec();
|
||||
}
|
||||
}
|
||||
}
|
||||
/*END_HADRON */
|
||||
|
||||
pub(crate) struct AlwaysRecordingStorageTimeMetricsTimer(Option<StorageTimeMetricsTimer>);
|
||||
|
||||
impl Drop for AlwaysRecordingStorageTimeMetricsTimer {
|
||||
@@ -3096,6 +3153,10 @@ pub(crate) struct StorageTimeMetrics {
|
||||
timeline_sum: Counter,
|
||||
/// Number of oeprations, per operation, tenant_id and timeline_id
|
||||
timeline_count: IntCounter,
|
||||
/*BEGIN_HADRON */
|
||||
/// Number of active operations per operation, tenant_id, and timeline_id
|
||||
timeline_active_count: IntGauge,
|
||||
/*END_HADRON */
|
||||
/// Global histogram having only the "operation" label.
|
||||
global_histogram: Histogram,
|
||||
}
|
||||
@@ -3115,6 +3176,11 @@ impl StorageTimeMetrics {
|
||||
let timeline_count = STORAGE_TIME_COUNT_PER_TIMELINE
|
||||
.get_metric_with_label_values(&[operation, tenant_id, shard_id, timeline_id])
|
||||
.unwrap();
|
||||
/*BEGIN_HADRON */
|
||||
let timeline_active_count = STORAGE_ACTIVE_COUNT_PER_TIMELINE
|
||||
.get_metric_with_label_values(&[operation, tenant_id, shard_id, timeline_id])
|
||||
.unwrap();
|
||||
/*END_HADRON */
|
||||
let global_histogram = STORAGE_TIME_GLOBAL
|
||||
.get_metric_with_label_values(&[operation])
|
||||
.unwrap();
|
||||
@@ -3122,6 +3188,7 @@ impl StorageTimeMetrics {
|
||||
StorageTimeMetrics {
|
||||
timeline_sum,
|
||||
timeline_count,
|
||||
timeline_active_count,
|
||||
global_histogram,
|
||||
}
|
||||
}
|
||||
@@ -3529,6 +3596,14 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
/* BEGIN_HADRON */
|
||||
let _ = STORAGE_ACTIVE_COUNT_PER_TIMELINE.remove_label_values(&[
|
||||
op,
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
/*END_HADRON */
|
||||
}
|
||||
|
||||
for op in StorageIoSizeOperation::VARIANTS {
|
||||
@@ -4321,6 +4396,9 @@ pub(crate) mod disk_usage_based_eviction {
|
||||
pub(crate) layers_collected: IntCounter,
|
||||
pub(crate) layers_selected: IntCounter,
|
||||
pub(crate) layers_evicted: IntCounter,
|
||||
/*BEGIN_HADRON */
|
||||
pub(crate) bytes_evicted: IntCounter,
|
||||
/*END_HADRON */
|
||||
}
|
||||
|
||||
impl Default for Metrics {
|
||||
@@ -4357,12 +4435,21 @@ pub(crate) mod disk_usage_based_eviction {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
/*BEGIN_HADRON */
|
||||
let bytes_evicted = register_int_counter!(
|
||||
"pageserver_disk_usage_based_eviction_evicted_bytes_total",
|
||||
"Amount of bytes successfully evicted"
|
||||
)
|
||||
.unwrap();
|
||||
/*END_HADRON */
|
||||
|
||||
Self {
|
||||
tenant_collection_time,
|
||||
tenant_layer_count,
|
||||
layers_collected,
|
||||
layers_selected,
|
||||
layers_evicted,
|
||||
bytes_evicted,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4482,6 +4569,7 @@ pub fn preinitialize_metrics(
|
||||
&CIRCUIT_BREAKERS_UNBROKEN,
|
||||
&PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL,
|
||||
&WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS,
|
||||
&MISROUTED_PAGESTREAM_REQUESTS,
|
||||
]
|
||||
.into_iter()
|
||||
.for_each(|c| {
|
||||
@@ -4519,6 +4607,7 @@ pub fn preinitialize_metrics(
|
||||
|
||||
// gauges
|
||||
WALRECEIVER_ACTIVE_MANAGERS.get();
|
||||
LOCAL_DATA_LOSS_SUSPECTED.get();
|
||||
|
||||
// histograms
|
||||
[
|
||||
|
||||
@@ -50,6 +50,7 @@ use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, Bu
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::service::Interceptor as _;
|
||||
use tonic::transport::server::TcpConnectInfo;
|
||||
use tracing::*;
|
||||
use utils::auth::{Claims, Scope, SwappableJwtAuth};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
@@ -69,7 +70,7 @@ use crate::context::{
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
|
||||
SmgrOpTimer, TimelineMetrics,
|
||||
MISROUTED_PAGESTREAM_REQUESTS, SmgrOpTimer, TimelineMetrics,
|
||||
};
|
||||
use crate::pgdatadir_mapping::{LsnRange, Version};
|
||||
use crate::span::{
|
||||
@@ -90,7 +91,8 @@ use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
|
||||
/// is not yet in state [`TenantState::Active`].
|
||||
///
|
||||
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
/// HADRON: reduced timeout and we will retry in Cache::get().
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
|
||||
/// Threshold at which to log slow GetPage requests.
|
||||
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
|
||||
@@ -1127,6 +1129,7 @@ impl PageServerHandler {
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
MISROUTED_PAGESTREAM_REQUESTS.inc();
|
||||
return respond_error!(
|
||||
span,
|
||||
PageStreamError::Reconnect(
|
||||
@@ -3685,8 +3688,15 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
yield match result {
|
||||
Ok(resp) => resp,
|
||||
// Convert per-request errors to GetPageResponses as appropriate, or terminate
|
||||
// the stream with a tonic::Status.
|
||||
Err(err) => page_api::GetPageResponse::try_from_status(err, req_id)?.into(),
|
||||
// the stream with a tonic::Status. Log the error regardless, since
|
||||
// ObservabilityLayer can't automatically log stream errors.
|
||||
Err(status) => {
|
||||
// TODO: it would be nice if we could propagate the get_page() fields here.
|
||||
span.in_scope(|| {
|
||||
warn!("request failed with {:?}: {}", status.code(), status.message());
|
||||
});
|
||||
page_api::GetPageResponse::try_from_status(status, req_id)?.into()
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -3824,40 +3834,85 @@ impl<S: tonic::server::NamedService> tonic::server::NamedService for Observabili
|
||||
const NAME: &'static str = S::NAME; // propagate inner service name
|
||||
}
|
||||
|
||||
impl<S, B> tower::Service<http::Request<B>> for ObservabilityLayerService<S>
|
||||
impl<S, Req, Resp> tower::Service<http::Request<Req>> for ObservabilityLayerService<S>
|
||||
where
|
||||
S: tower::Service<http::Request<B>>,
|
||||
S: tower::Service<http::Request<Req>, Response = http::Response<Resp>> + Send,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
|
||||
fn call(&mut self, mut req: http::Request<Req>) -> Self::Future {
|
||||
// Record the request start time as a request extension.
|
||||
//
|
||||
// TODO: we should start a timer here instead, but it currently requires a timeline handle
|
||||
// and SmgrQueryType, which we don't have yet. Refactor it to provide it later.
|
||||
req.extensions_mut().insert(ReceivedAt(Instant::now()));
|
||||
|
||||
// Create a basic tracing span. Enter the span for the current thread (to use it for inner
|
||||
// sync code like interceptors), and instrument the future (to use it for inner async code
|
||||
// like the page service itself).
|
||||
// Extract the peer address and gRPC method.
|
||||
let peer = req
|
||||
.extensions()
|
||||
.get::<TcpConnectInfo>()
|
||||
.and_then(|info| info.remote_addr())
|
||||
.map(|addr| addr.to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
let method = req
|
||||
.uri()
|
||||
.path()
|
||||
.split('/')
|
||||
.nth(2)
|
||||
.unwrap_or(req.uri().path())
|
||||
.to_string();
|
||||
|
||||
// Create a basic tracing span.
|
||||
//
|
||||
// The instrument() call below is not sufficient. It only affects the returned future, and
|
||||
// only takes effect when the caller polls it. Any sync code executed when we call
|
||||
// self.inner.call() below (such as interceptors) runs outside of the returned future, and
|
||||
// is not affected by it. We therefore have to enter the span on the current thread too.
|
||||
// Enter the span for the current thread and instrument the future. It is not sufficient to
|
||||
// only instrument the future, since it only takes effect after the future is returned and
|
||||
// polled, not when the inner service is called below (e.g. during interceptor execution).
|
||||
let span = info_span!(
|
||||
"grpc:pageservice",
|
||||
// Set by TenantMetadataInterceptor.
|
||||
// These will be populated by TenantMetadataInterceptor.
|
||||
tenant_id = field::Empty,
|
||||
timeline_id = field::Empty,
|
||||
shard_id = field::Empty,
|
||||
// NB: empty fields must be listed first above. Otherwise, the field names will be
|
||||
// clobbered when the empty fields are populated. They will be output last regardless.
|
||||
%peer,
|
||||
%method,
|
||||
);
|
||||
let _guard = span.enter();
|
||||
|
||||
Box::pin(self.inner.call(req).instrument(span.clone()))
|
||||
// Construct a future for calling the inner service, but don't await it. This avoids having
|
||||
// to clone the inner service into the future below.
|
||||
let call = self.inner.call(req);
|
||||
|
||||
async move {
|
||||
// Await the inner service call.
|
||||
let result = call.await;
|
||||
|
||||
// Log gRPC error statuses. This won't include request info from handler spans, but it
|
||||
// will catch all errors (even those emitted before handler spans are constructed). Only
|
||||
// unary request errors are logged here, not streaming response errors.
|
||||
if let Ok(ref resp) = result
|
||||
&& let Some(status) = tonic::Status::from_header_map(resp.headers())
|
||||
&& status.code() != tonic::Code::Ok
|
||||
{
|
||||
// TODO: it would be nice if we could propagate the handler span's request fields
|
||||
// here. This could e.g. be done by attaching the request fields to
|
||||
// tonic::Status::metadata via a proc macro.
|
||||
warn!(
|
||||
"request failed with {:?}: {}",
|
||||
status.code(),
|
||||
status.message()
|
||||
);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
.instrument(span.clone())
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
|
||||
@@ -141,6 +141,23 @@ pub(crate) enum CollectKeySpaceError {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl CollectKeySpaceError {
|
||||
pub(crate) fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
CollectKeySpaceError::Decode(_) => false,
|
||||
CollectKeySpaceError::PageRead(e) => e.is_cancel(),
|
||||
CollectKeySpaceError::Cancelled => true,
|
||||
}
|
||||
}
|
||||
pub(crate) fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
|
||||
CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
|
||||
CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CollectKeySpaceError {
|
||||
fn from(err: PageReconstructError) -> Self {
|
||||
match err {
|
||||
|
||||
@@ -86,7 +86,7 @@ use crate::context;
|
||||
use crate::context::RequestContextBuilder;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::feature_resolver::{FeatureResolver, TenantFeatureResolver};
|
||||
use crate::l0_flush::L0FlushGlobalState;
|
||||
use crate::metrics::{
|
||||
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
|
||||
@@ -142,6 +142,9 @@ mod gc_block;
|
||||
mod gc_result;
|
||||
pub(crate) mod throttle;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod debug;
|
||||
|
||||
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
|
||||
|
||||
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
@@ -388,7 +391,7 @@ pub struct TenantShard {
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
pub(crate) feature_resolver: FeatureResolver,
|
||||
pub(crate) feature_resolver: Arc<TenantFeatureResolver>,
|
||||
}
|
||||
impl std::fmt::Debug for TenantShard {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
@@ -3265,7 +3268,7 @@ impl TenantShard {
|
||||
};
|
||||
let gc_compaction_strategy = self
|
||||
.feature_resolver
|
||||
.evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id)
|
||||
.evaluate_multivariate("gc-comapction-strategy")
|
||||
.ok();
|
||||
let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy {
|
||||
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy)
|
||||
@@ -3287,7 +3290,10 @@ impl TenantShard {
|
||||
.or_else(|err| match err {
|
||||
// Ignore this, we likely raced with unarchival.
|
||||
OffloadError::NotArchived => Ok(()),
|
||||
err => Err(err),
|
||||
OffloadError::AlreadyInProgress => Ok(()),
|
||||
OffloadError::Cancelled => Err(CompactionError::ShuttingDown),
|
||||
// don't break the anyhow chain
|
||||
OffloadError::Other(err) => Err(CompactionError::Other(err)),
|
||||
})?;
|
||||
}
|
||||
|
||||
@@ -3318,23 +3324,12 @@ impl TenantShard {
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::Offload(_) => {}
|
||||
CompactionError::CollectKeySpaceError(err) => {
|
||||
// CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch.
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
CompactionError::Other(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
CompactionError::AlreadyRunning(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3410,6 +3405,9 @@ impl TenantShard {
|
||||
if let Some(ref walredo_mgr) = self.walredo_mgr {
|
||||
walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT);
|
||||
}
|
||||
|
||||
// Update the feature resolver with the latest tenant-spcific data.
|
||||
self.feature_resolver.refresh_properties_and_flags(self);
|
||||
}
|
||||
|
||||
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
|
||||
@@ -4498,7 +4496,10 @@ impl TenantShard {
|
||||
gc_block: Default::default(),
|
||||
l0_flush_global_state,
|
||||
basebackup_cache,
|
||||
feature_resolver,
|
||||
feature_resolver: Arc::new(TenantFeatureResolver::new(
|
||||
feature_resolver,
|
||||
tenant_shard_id.tenant_id,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6010,12 +6011,11 @@ pub(crate) mod harness {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) async fn do_try_load(
|
||||
pub(crate) async fn do_try_load_with_redo(
|
||||
&self,
|
||||
walredo_mgr: Arc<WalRedoManager>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<TenantShard>> {
|
||||
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
|
||||
|
||||
let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
|
||||
|
||||
let tenant = Arc::new(TenantShard::new(
|
||||
@@ -6053,6 +6053,14 @@ pub(crate) mod harness {
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
pub(crate) async fn do_try_load(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<TenantShard>> {
|
||||
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
|
||||
self.do_try_load_with_redo(walredo_mgr, ctx).await
|
||||
}
|
||||
|
||||
pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
|
||||
self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
|
||||
}
|
||||
@@ -6129,7 +6137,7 @@ mod tests {
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::keyspace::KeySpaceRandomAccum;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings, LsnLease};
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::SeedableRng;
|
||||
@@ -9391,6 +9399,14 @@ mod tests {
|
||||
.unwrap()
|
||||
.load()
|
||||
.await;
|
||||
// set a non-zero lease length to test the feature
|
||||
tenant
|
||||
.update_tenant_config(|mut conf| {
|
||||
conf.lsn_lease_length = Some(LsnLease::DEFAULT_LENGTH);
|
||||
Ok(conf)
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
|
||||
|
||||
let end_lsn = Lsn(0x100);
|
||||
|
||||
366
pageserver/src/tenant/debug.rs
Normal file
366
pageserver/src/tenant/debug.rs
Normal file
@@ -0,0 +1,366 @@
|
||||
use std::{ops::Range, str::FromStr, sync::Arc};
|
||||
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use base64::{Engine as _, engine::general_purpose::STANDARD};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::Utf8PathBuf;
|
||||
use clap::Parser;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::{
|
||||
key::Key,
|
||||
keyspace::KeySpace,
|
||||
shard::{ShardIdentity, ShardStripeSize},
|
||||
};
|
||||
use postgres_ffi::PgMajorVersion;
|
||||
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn};
|
||||
use tracing::Instrument;
|
||||
use utils::{
|
||||
generation::Generation,
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
shard::{ShardCount, ShardIndex, ShardNumber},
|
||||
};
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
use crate::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
task_mgr::TaskKind,
|
||||
tenant::storage_layer::ValueReconstructState,
|
||||
walredo::harness::RedoHarness,
|
||||
};
|
||||
|
||||
use super::{
|
||||
WalRedoManager, WalredoManagerId,
|
||||
harness::TenantHarness,
|
||||
remote_timeline_client::LayerFileMetadata,
|
||||
storage_layer::{AsLayerDesc, IoConcurrency, Layer, LayerName, ValuesReconstructState},
|
||||
};
|
||||
|
||||
fn process_page_image(next_record_lsn: Lsn, is_fpw: bool, img_bytes: Bytes) -> Bytes {
|
||||
// To match the logic in libs/wal_decoder/src/serialized_batch.rs
|
||||
let mut new_image: BytesMut = img_bytes.into();
|
||||
if is_fpw && !page_is_new(&new_image) {
|
||||
page_set_lsn(&mut new_image, next_record_lsn);
|
||||
}
|
||||
assert_eq!(new_image.len(), BLCKSZ as usize);
|
||||
new_image.freeze()
|
||||
}
|
||||
|
||||
async fn redo_wals(input: &str, key: Key) -> anyhow::Result<()> {
|
||||
let tenant_id = TenantId::generate();
|
||||
let timeline_id = TimelineId::generate();
|
||||
let redo_harness = RedoHarness::new()?;
|
||||
let span = redo_harness.span();
|
||||
let tenant_conf = pageserver_api::models::TenantConfig {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
let tenant = TenantHarness::create_custom(
|
||||
"search_key",
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
ShardIdentity::unsharded(),
|
||||
Generation::new(1),
|
||||
)
|
||||
.await?
|
||||
.do_try_load_with_redo(
|
||||
Arc::new(WalRedoManager::Prod(
|
||||
WalredoManagerId::next(),
|
||||
redo_harness.manager,
|
||||
)),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let timeline = tenant
|
||||
.create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
|
||||
.await?;
|
||||
let contents = tokio::fs::read_to_string(input)
|
||||
.await
|
||||
.map_err(|e| anyhow::Error::msg(format!("Failed to read input file {input}: {e}")))
|
||||
.unwrap();
|
||||
let lines = contents.lines();
|
||||
let mut last_wal_lsn: Option<Lsn> = None;
|
||||
let state = {
|
||||
let mut state = ValueReconstructState::default();
|
||||
let mut is_fpw = false;
|
||||
let mut is_first_line = true;
|
||||
for line in lines {
|
||||
if is_first_line {
|
||||
is_first_line = false;
|
||||
if line.trim() == "FPW" {
|
||||
is_fpw = true;
|
||||
}
|
||||
continue; // Skip the first line.
|
||||
}
|
||||
// Each input line is in the "<next_record_lsn>,<base64>" format.
|
||||
let (lsn_str, payload_b64) = line
|
||||
.split_once(',')
|
||||
.expect("Invalid input format: expected '<lsn>,<base64>'");
|
||||
|
||||
// Parse the LSN and decode the payload.
|
||||
let lsn = Lsn::from_str(lsn_str.trim()).expect("Invalid LSN format");
|
||||
let bytes = Bytes::from(
|
||||
STANDARD
|
||||
.decode(payload_b64.trim())
|
||||
.expect("Invalid base64 payload"),
|
||||
);
|
||||
|
||||
// The first line is considered the base image, the rest are WAL records.
|
||||
if state.img.is_none() {
|
||||
state.img = Some((lsn, process_page_image(lsn, is_fpw, bytes)));
|
||||
} else {
|
||||
let wal_record = NeonWalRecord::Postgres {
|
||||
will_init: false,
|
||||
rec: bytes,
|
||||
};
|
||||
state.records.push((lsn, wal_record));
|
||||
last_wal_lsn.replace(lsn);
|
||||
}
|
||||
}
|
||||
state
|
||||
};
|
||||
|
||||
assert!(state.img.is_some(), "No base image found");
|
||||
assert!(!state.records.is_empty(), "No WAL records found");
|
||||
let result = timeline
|
||||
.reconstruct_value(key, last_wal_lsn.unwrap(), state, RedoAttemptType::ReadPage)
|
||||
.instrument(span.clone())
|
||||
.await?;
|
||||
|
||||
eprintln!("final image: {:?}", STANDARD.encode(result));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn search_key(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
dir: String,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let shard_index = ShardIndex {
|
||||
shard_number: ShardNumber(0),
|
||||
shard_count: ShardCount(4),
|
||||
};
|
||||
|
||||
let redo_harness = RedoHarness::new()?;
|
||||
let span = redo_harness.span();
|
||||
let tenant_conf = pageserver_api::models::TenantConfig {
|
||||
..Default::default()
|
||||
};
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
let tenant = TenantHarness::create_custom(
|
||||
"search_key",
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
ShardIdentity::new(
|
||||
shard_index.shard_number,
|
||||
shard_index.shard_count,
|
||||
ShardStripeSize(32768),
|
||||
)
|
||||
.unwrap(),
|
||||
Generation::new(1),
|
||||
)
|
||||
.await?
|
||||
.do_try_load_with_redo(
|
||||
Arc::new(WalRedoManager::Prod(
|
||||
WalredoManagerId::next(),
|
||||
redo_harness.manager,
|
||||
)),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
|
||||
.await?;
|
||||
|
||||
let mut delta_layers: Vec<Layer> = Vec::new();
|
||||
let mut img_layer: Option<Layer> = Option::None;
|
||||
let mut dir = tokio::fs::read_dir(dir).await?;
|
||||
loop {
|
||||
let entry = dir.next_entry().await?;
|
||||
if entry.is_none() || !entry.as_ref().unwrap().file_type().await?.is_file() {
|
||||
break;
|
||||
}
|
||||
let path = Utf8PathBuf::from_path_buf(entry.unwrap().path()).unwrap();
|
||||
let layer_name = match LayerName::from_str(path.file_name().unwrap()) {
|
||||
Ok(name) => name,
|
||||
Err(_) => {
|
||||
eprintln!("Skipped invalid layer: {path}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let layer = Layer::for_resident(
|
||||
tenant.conf,
|
||||
&timeline,
|
||||
path.clone(),
|
||||
layer_name,
|
||||
LayerFileMetadata::new(
|
||||
tokio::fs::metadata(path.clone()).await?.len(),
|
||||
Generation::new(1),
|
||||
shard_index,
|
||||
),
|
||||
);
|
||||
if layer.layer_desc().is_delta() {
|
||||
delta_layers.push(layer.into());
|
||||
} else if img_layer.is_none() {
|
||||
img_layer = Some(layer.into());
|
||||
} else {
|
||||
anyhow::bail!("Found multiple image layers");
|
||||
}
|
||||
}
|
||||
// sort delta layers based on the descending order of LSN
|
||||
delta_layers.sort_by(|a, b| {
|
||||
b.layer_desc()
|
||||
.get_lsn_range()
|
||||
.start
|
||||
.cmp(&a.layer_desc().get_lsn_range().start)
|
||||
});
|
||||
|
||||
let mut state = ValuesReconstructState::new(IoConcurrency::Sequential);
|
||||
|
||||
let key_space = KeySpace::single(Range {
|
||||
start: key,
|
||||
end: key.next(),
|
||||
});
|
||||
let lsn_range = Range {
|
||||
start: img_layer
|
||||
.as_ref()
|
||||
.map_or(Lsn(0x00), |img| img.layer_desc().image_layer_lsn()),
|
||||
end: lsn,
|
||||
};
|
||||
for delta_layer in delta_layers.iter() {
|
||||
delta_layer
|
||||
.get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
img_layer
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
|
||||
.await?;
|
||||
|
||||
for (_key, result) in std::mem::take(&mut state.keys) {
|
||||
let state = result.collect_pending_ios().await?;
|
||||
if state.img.is_some() {
|
||||
eprintln!(
|
||||
"image: {}: {:x?}",
|
||||
state.img.as_ref().unwrap().0,
|
||||
STANDARD.encode(state.img.as_ref().unwrap().1.clone())
|
||||
);
|
||||
}
|
||||
for delta in state.records.iter() {
|
||||
match &delta.1 {
|
||||
NeonWalRecord::Postgres { will_init, rec } => {
|
||||
eprintln!(
|
||||
"delta: {}: will_init: {}, {:x?}",
|
||||
delta.0,
|
||||
will_init,
|
||||
STANDARD.encode(rec)
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
eprintln!("delta: {}: {:x?}", delta.0, delta.1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let result = timeline
|
||||
.reconstruct_value(key, lsn_range.end, state, RedoAttemptType::ReadPage)
|
||||
.instrument(span.clone())
|
||||
.await?;
|
||||
eprintln!("final image: {lsn} : {result:?}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Redo all WALs against the base image in the input file. Return the base64 encoded final image.
|
||||
/// Each line in the input file must be in the form "<lsn>,<base64>" where:
|
||||
/// * `<lsn>` is a PostgreSQL LSN in hexadecimal notation, e.g. `0/16ABCDE`.
|
||||
/// * `<base64>` is the base64‐encoded page image (first line) or WAL record (subsequent lines).
|
||||
///
|
||||
/// The first line provides the base image of a page. The LSN is the LSN of "next record" following
|
||||
/// the record containing the FPI. For example, if the FPI was extracted from a WAL record occuping
|
||||
/// [0/1, 0/200) in the WAL stream, the LSN appearing along side the page image here should be 0/200.
|
||||
///
|
||||
/// The subsequent lines are WAL records, ordered from the oldest to the newest. The LSN is the
|
||||
/// record LSN of the WAL record, not the "next record" LSN. For example, if the WAL record here
|
||||
/// occupies [0/1, 0/200) in the WAL stream, the LSN appearing along side the WAL record here should
|
||||
/// be 0/1.
|
||||
#[derive(Parser)]
|
||||
struct RedoWalsCmd {
|
||||
#[clap(long)]
|
||||
input: String,
|
||||
#[clap(long)]
|
||||
key: String,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_redo_wals() -> anyhow::Result<()> {
|
||||
let args = std::env::args().collect_vec();
|
||||
let pos = args
|
||||
.iter()
|
||||
.position(|arg| arg == "--")
|
||||
.unwrap_or(args.len());
|
||||
let slice = &args[pos..args.len()];
|
||||
let cmd = match RedoWalsCmd::try_parse_from(slice) {
|
||||
Ok(cmd) => cmd,
|
||||
Err(err) => {
|
||||
eprintln!("{err}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let key = Key::from_hex(&cmd.key).unwrap();
|
||||
redo_wals(&cmd.input, key).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Search for a page at the given LSN in all layers of the data_dir.
|
||||
/// Return the base64-encoded image and all WAL records, as well as the final reconstructed image.
|
||||
#[derive(Parser)]
|
||||
struct SearchKeyCmd {
|
||||
#[clap(long)]
|
||||
tenant_id: String,
|
||||
#[clap(long)]
|
||||
timeline_id: String,
|
||||
#[clap(long)]
|
||||
data_dir: String,
|
||||
#[clap(long)]
|
||||
key: String,
|
||||
#[clap(long)]
|
||||
lsn: String,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_search_key() -> anyhow::Result<()> {
|
||||
let args = std::env::args().collect_vec();
|
||||
let pos = args
|
||||
.iter()
|
||||
.position(|arg| arg == "--")
|
||||
.unwrap_or(args.len());
|
||||
let slice = &args[pos..args.len()];
|
||||
let cmd = match SearchKeyCmd::try_parse_from(slice) {
|
||||
Ok(cmd) => cmd,
|
||||
Err(err) => {
|
||||
eprintln!("{err}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let tenant_id = TenantId::from_str(&cmd.tenant_id).unwrap();
|
||||
let timeline_id = TimelineId::from_str(&cmd.timeline_id).unwrap();
|
||||
let key = Key::from_hex(&cmd.key).unwrap();
|
||||
let lsn = Lsn::from_str(&cmd.lsn).unwrap();
|
||||
search_key(tenant_id, timeline_id, cmd.data_dir, key, lsn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -43,7 +43,7 @@ use crate::controller_upcall_client::{
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
|
||||
use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
|
||||
use crate::metrics::{LOCAL_DATA_LOSS_SUSPECTED, TENANT, TENANT_MANAGER as METRICS};
|
||||
use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind};
|
||||
use crate::tenant::config::{
|
||||
AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
|
||||
@@ -538,6 +538,21 @@ pub async fn init_tenant_mgr(
|
||||
// Determine which tenants are to be secondary or attached, and in which generation
|
||||
let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
|
||||
|
||||
// Hadron local SSD check: Raise an alert if our local filesystem does not contain any tenants but the re-attach request returned tenants.
|
||||
// This can happen if the PS suffered a Kubernetes node failure resulting in loss of all local data, but recovered quickly on another node
|
||||
// so the Storage Controller has not had the time to move tenants out.
|
||||
let data_loss_suspected = if let Some(tenant_modes) = &tenant_modes {
|
||||
tenant_configs.is_empty() && !tenant_modes.is_empty()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if data_loss_suspected {
|
||||
tracing::error!(
|
||||
"Local data loss suspected: no tenants found on local filesystem, but re-attach request returned tenants"
|
||||
);
|
||||
}
|
||||
LOCAL_DATA_LOSS_SUSPECTED.set(if data_loss_suspected { 1 } else { 0 });
|
||||
|
||||
tracing::info!(
|
||||
"Attaching {} tenants at startup, warming up {} at a time",
|
||||
tenant_configs.len(),
|
||||
|
||||
@@ -141,11 +141,29 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
|
||||
let fs_size = usize::try_from(fs_size)
|
||||
.with_context(|| format!("convert {local_path:?} size {fs_size} usize"))?;
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
let mut metadata = None;
|
||||
match storage {
|
||||
// Pass the file path as a storage metadata to minimize changes to neon.
|
||||
// Otherwise, we need to change the upload interface.
|
||||
GenericRemoteStorage::AzureBlob(s) => {
|
||||
let block_size_mb = s.put_block_size_mb.unwrap_or(0);
|
||||
if block_size_mb > 0 && fs_size > block_size_mb * 1024 * 1024 {
|
||||
metadata = Some(remote_storage::StorageMetadata::from([(
|
||||
"databricks_azure_put_block",
|
||||
local_path.as_str(),
|
||||
)]));
|
||||
}
|
||||
}
|
||||
GenericRemoteStorage::LocalFs(_) => {}
|
||||
GenericRemoteStorage::AwsS3(_) => {}
|
||||
GenericRemoteStorage::Unreliable(_) => {}
|
||||
};
|
||||
/* END_HADRON */
|
||||
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
|
||||
|
||||
storage
|
||||
.upload(reader, fs_size, remote_path, None, cancel)
|
||||
.upload(reader, fs_size, remote_path, metadata, cancel)
|
||||
.await
|
||||
.with_context(|| format!("upload layer from local path '{local_path}'"))
|
||||
}
|
||||
|
||||
@@ -34,6 +34,21 @@ use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
|
||||
static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
|
||||
let total_threads = TOKIO_WORKER_THREADS.get();
|
||||
|
||||
/*BEGIN_HADRON*/
|
||||
// ideally we should run at least one compaction task per tenant in order to (1) maximize
|
||||
// compaction throughput (2) avoid head-of-line blocking of large compactions. However doing
|
||||
// that may create too many compaction tasks with lots of memory overheads. So we limit the
|
||||
// number of compaction tasks based on the available CPU core count.
|
||||
// Need to revisit.
|
||||
// let tasks_per_thread = std::env::var("BG_TASKS_PER_THREAD")
|
||||
// .ok()
|
||||
// .and_then(|s| s.parse().ok())
|
||||
// .unwrap_or(4);
|
||||
// let permits = usize::max(1, total_threads * tasks_per_thread);
|
||||
// // assert!(permits < total_threads, "need threads for other work");
|
||||
/*END_HADRON*/
|
||||
|
||||
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(permits < total_threads, "need threads for other work");
|
||||
@@ -303,9 +318,6 @@ pub(crate) fn log_compaction_error(
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
AlreadyRunning(_) => Level::ERROR,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
@@ -315,7 +327,7 @@ pub(crate) fn log_compaction_error(
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
|
||||
@@ -40,7 +40,6 @@ use layer_manager::{
|
||||
Shutdown,
|
||||
};
|
||||
|
||||
use offload::OffloadError;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
|
||||
use pageserver_api::key::{
|
||||
@@ -78,7 +77,7 @@ use utils::rate_limit::RateLimit;
|
||||
use utils::seqwait::SeqWait;
|
||||
use utils::simple_rcu::{Rcu, RcuReadGuard};
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::{completion, critical, fs_ext, pausable_failpoint};
|
||||
use utils::{completion, critical_timeline, fs_ext, pausable_failpoint};
|
||||
#[cfg(test)]
|
||||
use wal_decoder::models::value::Value;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
@@ -106,7 +105,7 @@ use crate::context::{
|
||||
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
|
||||
use crate::feature_resolver::FeatureResolver;
|
||||
use crate::feature_resolver::TenantFeatureResolver;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::l0_flush::{self, L0FlushGlobalState};
|
||||
use crate::metrics::{
|
||||
@@ -119,7 +118,6 @@ use crate::pgdatadir_mapping::{
|
||||
MAX_AUX_FILE_V2_DELTAS, MetricsUpdate,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::AttachmentMode;
|
||||
use crate::tenant::gc_result::GcResult;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
@@ -202,7 +200,7 @@ pub struct TimelineResources {
|
||||
pub l0_compaction_trigger: Arc<Notify>,
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
pub basebackup_cache: Arc<BasebackupCache>,
|
||||
pub feature_resolver: FeatureResolver,
|
||||
pub feature_resolver: Arc<TenantFeatureResolver>,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
@@ -448,7 +446,7 @@ pub struct Timeline {
|
||||
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
|
||||
feature_resolver: FeatureResolver,
|
||||
feature_resolver: Arc<TenantFeatureResolver>,
|
||||
}
|
||||
|
||||
pub(crate) enum PreviousHeatmap {
|
||||
@@ -585,6 +583,28 @@ pub(crate) enum PageReconstructError {
|
||||
MissingKey(Box<MissingKeyError>),
|
||||
}
|
||||
|
||||
impl PageReconstructError {
|
||||
pub(crate) fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
PageReconstructError::Other(_) => false,
|
||||
PageReconstructError::AncestorLsnTimeout(e) => e.is_cancel(),
|
||||
PageReconstructError::Cancelled => true,
|
||||
PageReconstructError::WalRedo(_) => false,
|
||||
PageReconstructError::MissingKey(_) => false,
|
||||
}
|
||||
}
|
||||
#[allow(dead_code)] // we use the is_cancel + into_anyhow pattern in quite a few places, this one will follow soon enough
|
||||
pub(crate) fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
PageReconstructError::Other(e) => e,
|
||||
PageReconstructError::AncestorLsnTimeout(e) => e.into_anyhow(),
|
||||
PageReconstructError::Cancelled => anyhow::Error::new(self),
|
||||
PageReconstructError::WalRedo(e) => e,
|
||||
PageReconstructError::MissingKey(_) => anyhow::Error::new(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<anyhow::Error> for PageReconstructError {
|
||||
fn from(value: anyhow::Error) -> Self {
|
||||
// with walingest.rs many PageReconstructError are wrapped in as anyhow::Error
|
||||
@@ -738,17 +758,6 @@ impl std::fmt::Display for MissingKeyError {
|
||||
}
|
||||
}
|
||||
|
||||
impl PageReconstructError {
|
||||
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use PageReconstructError::*;
|
||||
match self {
|
||||
Cancelled => true,
|
||||
Other(_) | AncestorLsnTimeout(_) | WalRedo(_) | MissingKey(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum CreateImageLayersError {
|
||||
#[error("timeline shutting down")]
|
||||
@@ -951,13 +960,35 @@ pub enum WaitLsnError {
|
||||
Timeout(String),
|
||||
}
|
||||
|
||||
impl WaitLsnError {
|
||||
pub(crate) fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
WaitLsnError::Shutdown => true,
|
||||
WaitLsnError::BadState(timeline_state) => match timeline_state {
|
||||
TimelineState::Loading => false,
|
||||
TimelineState::Active => false,
|
||||
TimelineState::Stopping => true,
|
||||
TimelineState::Broken { .. } => false,
|
||||
},
|
||||
WaitLsnError::Timeout(_) => false,
|
||||
}
|
||||
}
|
||||
pub(crate) fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
WaitLsnError::Shutdown => anyhow::Error::new(self),
|
||||
WaitLsnError::BadState(_) => anyhow::Error::new(self),
|
||||
WaitLsnError::Timeout(_) => anyhow::Error::new(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WaitLsnError> for tonic::Status {
|
||||
fn from(err: WaitLsnError) -> Self {
|
||||
use tonic::Code;
|
||||
let code = match &err {
|
||||
WaitLsnError::Timeout(_) => Code::Internal,
|
||||
WaitLsnError::BadState(_) => Code::Internal,
|
||||
WaitLsnError::Shutdown => Code::Unavailable,
|
||||
let code = if err.is_cancel() {
|
||||
Code::Unavailable
|
||||
} else {
|
||||
Code::Internal
|
||||
};
|
||||
tonic::Status::new(code, err.to_string())
|
||||
}
|
||||
@@ -1084,6 +1115,26 @@ enum ImageLayerCreationOutcome {
|
||||
Skip,
|
||||
}
|
||||
|
||||
enum RepartitionError {
|
||||
Other(anyhow::Error),
|
||||
CollectKeyspace(CollectKeySpaceError),
|
||||
}
|
||||
|
||||
impl RepartitionError {
|
||||
fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
RepartitionError::Other(_) => false,
|
||||
RepartitionError::CollectKeyspace(e) => e.is_cancel(),
|
||||
}
|
||||
}
|
||||
fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
RepartitionError::Other(e) => e,
|
||||
RepartitionError::CollectKeyspace(e) => e.into_anyhow(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
@@ -1770,30 +1821,31 @@ impl Timeline {
|
||||
existing_lease.clone()
|
||||
}
|
||||
Entry::Vacant(vacant) => {
|
||||
// Reject already GC-ed LSN if we are in AttachedSingle and
|
||||
// not blocked by the lsn lease deadline.
|
||||
// Never allow a lease to be requested for an LSN below the applied GC cutoff. The data could have been deleted.
|
||||
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
|
||||
if lsn < *latest_gc_cutoff_lsn {
|
||||
bail!(
|
||||
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
|
||||
lsn,
|
||||
*latest_gc_cutoff_lsn
|
||||
);
|
||||
}
|
||||
|
||||
// We allow create lease for those below the planned gc cutoff if we are still within the grace period
|
||||
// of GC blocking.
|
||||
let validate = {
|
||||
let conf = self.tenant_conf.load();
|
||||
conf.location.attach_mode == AttachmentMode::Single
|
||||
&& !conf.is_gc_blocked_by_lsn_lease_deadline()
|
||||
!conf.is_gc_blocked_by_lsn_lease_deadline()
|
||||
};
|
||||
|
||||
if init || validate {
|
||||
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
|
||||
if lsn < *latest_gc_cutoff_lsn {
|
||||
bail!(
|
||||
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
|
||||
lsn,
|
||||
*latest_gc_cutoff_lsn
|
||||
);
|
||||
}
|
||||
if lsn < planned_cutoff {
|
||||
bail!(
|
||||
"tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}",
|
||||
lsn,
|
||||
planned_cutoff
|
||||
);
|
||||
}
|
||||
// Do not allow initial lease creation to be below the planned gc cutoff. The client (compute_ctl) determines
|
||||
// whether it is a initial lease creation or a renewal.
|
||||
if (init || validate) && lsn < planned_cutoff {
|
||||
bail!(
|
||||
"tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}",
|
||||
lsn,
|
||||
planned_cutoff
|
||||
);
|
||||
}
|
||||
|
||||
let dt: DateTime<Utc> = valid_until.into();
|
||||
@@ -2066,19 +2118,9 @@ impl Timeline {
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::AlreadyRunning(_)) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::Other(_)) => {
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
Err(CompactionError::CollectKeySpaceError(_)) => {
|
||||
// Cancelled errors are covered by the `Err(e) if e.is_cancel()` branch.
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
// Don't change the current value on offload failure or shutdown. We don't want to
|
||||
// abruptly stall nor resume L0 flushes in these cases.
|
||||
Err(CompactionError::Offload(_)) => {}
|
||||
};
|
||||
|
||||
result
|
||||
@@ -2142,14 +2184,31 @@ impl Timeline {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Regardless of whether we're going to try_freeze_and_flush
|
||||
// or not, stop ingesting any more data.
|
||||
// cancel walreceiver to stop ingesting more data asap.
|
||||
//
|
||||
// Note that we're accepting a race condition here where we may
|
||||
// do the final flush below, before walreceiver observes the
|
||||
// cancellation and exits.
|
||||
// This means we may open a new InMemoryLayer after the final flush below.
|
||||
// Flush loop is also still running for a short while, so, in theory, it
|
||||
// could also make its way into the upload queue.
|
||||
//
|
||||
// If we wait for the shutdown of the walreceiver before moving on to the
|
||||
// flush, then that would be avoided. But we don't do it because the
|
||||
// walreceiver entertains reads internally, which means that it possibly
|
||||
// depends on the download of layers. Layer download is only sensitive to
|
||||
// the cancellation of the entire timeline, so cancelling the walreceiver
|
||||
// will have no effect on the individual get requests.
|
||||
// This would cause problems when there is a lot of ongoing downloads or
|
||||
// there is S3 unavailabilities, i.e. detach, deletion, etc would hang,
|
||||
// and we can't deallocate resources of the timeline, etc.
|
||||
let walreceiver = self.walreceiver.lock().unwrap().take();
|
||||
tracing::debug!(
|
||||
is_some = walreceiver.is_some(),
|
||||
"Waiting for WalReceiverManager..."
|
||||
);
|
||||
if let Some(walreceiver) = walreceiver {
|
||||
walreceiver.shutdown().await;
|
||||
walreceiver.cancel().await;
|
||||
}
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
@@ -3108,7 +3167,7 @@ impl Timeline {
|
||||
|
||||
basebackup_cache: resources.basebackup_cache,
|
||||
|
||||
feature_resolver: resources.feature_resolver,
|
||||
feature_resolver: resources.feature_resolver.clone(),
|
||||
};
|
||||
|
||||
result.repartition_threshold =
|
||||
@@ -4725,7 +4784,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Fetch the next layer to flush, if any.
|
||||
let (layer, l0_count, frozen_count, frozen_size) = {
|
||||
let (layer, l0_count, frozen_count, frozen_size, open_layer_size) = {
|
||||
let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
|
||||
let Ok(lm) = layers.layer_map() else {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
@@ -4738,8 +4797,13 @@ impl Timeline {
|
||||
.iter()
|
||||
.map(|l| l.estimated_in_mem_size())
|
||||
.sum();
|
||||
let open_layer_size: u64 = lm
|
||||
.open_layer
|
||||
.as_ref()
|
||||
.map(|l| l.estimated_in_mem_size())
|
||||
.unwrap_or(0);
|
||||
let layer = lm.frozen_layers.front().cloned();
|
||||
(layer, l0_count, frozen_count, frozen_size)
|
||||
(layer, l0_count, frozen_count, frozen_size, open_layer_size)
|
||||
// drop 'layers' lock
|
||||
};
|
||||
let Some(layer) = layer else {
|
||||
@@ -4752,7 +4816,7 @@ impl Timeline {
|
||||
if l0_count >= stall_threshold {
|
||||
warn!(
|
||||
"stalling layer flushes for compaction backpressure at {l0_count} \
|
||||
L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
|
||||
L0 layers ({frozen_count} frozen layers with {frozen_size} bytes, {open_layer_size} bytes in open layer)"
|
||||
);
|
||||
let stall_timer = self
|
||||
.metrics
|
||||
@@ -4805,7 +4869,7 @@ impl Timeline {
|
||||
let delay = flush_duration.as_secs_f64();
|
||||
info!(
|
||||
"delaying layer flush by {delay:.3}s for compaction backpressure at \
|
||||
{l0_count} L0 layers ({frozen_count} frozen layers with {frozen_size} bytes)"
|
||||
{l0_count} L0 layers ({frozen_count} frozen layers with {frozen_size} bytes, {open_layer_size} bytes in open layer)"
|
||||
);
|
||||
let _delay_timer = self
|
||||
.metrics
|
||||
@@ -4944,7 +5008,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e.into_anyhow()))?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
@@ -5194,18 +5258,18 @@ impl Timeline {
|
||||
partition_size: u64,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
|
||||
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), RepartitionError> {
|
||||
let Ok(mut guard) = self.partitioning.try_write_guard() else {
|
||||
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
|
||||
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
||||
// and hence before the compaction task starts.
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
return Err(RepartitionError::Other(anyhow!(
|
||||
"repartition() called concurrently"
|
||||
)));
|
||||
};
|
||||
let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
|
||||
if lsn < *partition_lsn {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
return Err(RepartitionError::Other(anyhow!(
|
||||
"repartition() called with LSN going backwards, this should not happen"
|
||||
)));
|
||||
}
|
||||
@@ -5226,7 +5290,10 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
|
||||
let (dense_ks, sparse_ks) = self
|
||||
.collect_keyspace(lsn, ctx)
|
||||
.await
|
||||
.map_err(RepartitionError::CollectKeyspace)?;
|
||||
let dense_partitioning = dense_ks.partition(
|
||||
&self.shard_identity,
|
||||
partition_size,
|
||||
@@ -5304,6 +5371,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
img_range: Range<Key>,
|
||||
io_concurrency: IoConcurrency,
|
||||
progress: Option<(usize, usize)>,
|
||||
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
|
||||
let mut wrote_keys = false;
|
||||
|
||||
@@ -5380,11 +5448,15 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let progress_report = progress
|
||||
.map(|(idx, total)| format!("({idx}/{total}) "))
|
||||
.unwrap_or_default();
|
||||
if wrote_keys {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
info!(
|
||||
"produced image layer for rel {}",
|
||||
"{} produced image layer for rel {}",
|
||||
progress_report,
|
||||
ImageLayerName {
|
||||
key_range: img_range.clone(),
|
||||
lsn
|
||||
@@ -5394,7 +5466,12 @@ impl Timeline {
|
||||
unfinished_image_layer: image_layer_writer,
|
||||
})
|
||||
} else {
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
tracing::debug!(
|
||||
"{} no data in range {}-{}",
|
||||
progress_report,
|
||||
img_range.start,
|
||||
img_range.end
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome::Empty)
|
||||
}
|
||||
}
|
||||
@@ -5629,7 +5706,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
for partition in partition_parts.iter() {
|
||||
let total = partition_parts.len();
|
||||
for (idx, partition) in partition_parts.iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
@@ -5714,6 +5792,7 @@ impl Timeline {
|
||||
ctx,
|
||||
img_range.clone(),
|
||||
io_concurrency,
|
||||
Some((idx, total)),
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
@@ -5979,52 +6058,21 @@ impl Drop for Timeline {
|
||||
pub(crate) enum CompactionError {
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
ShuttingDown,
|
||||
/// Compaction tried to offload a timeline and failed
|
||||
#[error("Failed to offload timeline: {0}")]
|
||||
Offload(OffloadError),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error("Failed to collect keyspace: {0}")]
|
||||
CollectKeySpaceError(#[from] CollectKeySpaceError),
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
#[error("Compaction already running: {0}")]
|
||||
AlreadyRunning(&'static str),
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
matches!(self, Self::ShuttingDown)
|
||||
}
|
||||
|
||||
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
|
||||
if err.is_cancel() {
|
||||
Self::ShuttingDown
|
||||
| Self::AlreadyRunning(_)
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::Cancelled
|
||||
))
|
||||
| Self::Offload(OffloadError::Cancelled)
|
||||
)
|
||||
}
|
||||
|
||||
/// Critical errors that indicate data corruption.
|
||||
pub fn is_critical(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::CollectKeySpaceError(
|
||||
CollectKeySpaceError::Decode(_)
|
||||
| CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OffloadError> for CompactionError {
|
||||
fn from(e: OffloadError) -> Self {
|
||||
match e {
|
||||
OffloadError::Cancelled => Self::ShuttingDown,
|
||||
_ => Self::Offload(e),
|
||||
} else {
|
||||
Self::Other(err.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6702,7 +6750,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Reconstruct a value, using the given base image and WAL records in 'data'.
|
||||
async fn reconstruct_value(
|
||||
pub(crate) async fn reconstruct_value(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
@@ -6778,7 +6826,11 @@ impl Timeline {
|
||||
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
|
||||
Err(walredo::Error::Other(err)) => {
|
||||
if fire_critical_error {
|
||||
critical!("walredo failure during page reconstruction: {err:?}");
|
||||
critical_timeline!(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"walredo failure during page reconstruction: {err:?}"
|
||||
);
|
||||
}
|
||||
return Err(PageReconstructError::WalRedo(
|
||||
err.context("reconstruct a page image"),
|
||||
|
||||
@@ -9,14 +9,15 @@ use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
|
||||
use super::layer_manager::LayerManagerLockHolder;
|
||||
use super::{
|
||||
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
|
||||
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::DeltaEntry;
|
||||
use crate::pgdatadir_mapping::CollectKeySpaceError;
|
||||
use crate::tenant::timeline::{DeltaEntry, RepartitionError};
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use anyhow::{Context, anyhow};
|
||||
use bytes::Bytes;
|
||||
@@ -36,7 +37,7 @@ use serde::Serialize;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, debug, error, info, info_span, trace, warn};
|
||||
use utils::critical;
|
||||
use utils::critical_timeline;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
@@ -64,7 +65,7 @@ use crate::tenant::timeline::{
|
||||
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
|
||||
ResidentLayer, drop_layer_manager_rlock,
|
||||
};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded, PageReconstructError};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
@@ -101,7 +102,11 @@ pub enum GcCompactionQueueItem {
|
||||
/// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
|
||||
auto: bool,
|
||||
},
|
||||
SubCompactionJob(CompactOptions),
|
||||
SubCompactionJob {
|
||||
i: usize,
|
||||
total: usize,
|
||||
options: CompactOptions,
|
||||
},
|
||||
Notify(GcCompactionJobId, Option<Lsn>),
|
||||
}
|
||||
|
||||
@@ -163,7 +168,7 @@ impl GcCompactionQueueItem {
|
||||
running,
|
||||
job_id: id.0,
|
||||
}),
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
|
||||
GcCompactionQueueItem::SubCompactionJob { options, .. } => Some(CompactInfoResponse {
|
||||
compact_key_range: options.compact_key_range,
|
||||
compact_lsn_range: options.compact_lsn_range,
|
||||
sub_compaction: options.sub_compaction,
|
||||
@@ -489,7 +494,7 @@ impl GcCompactionQueue {
|
||||
.map(|job| job.compact_lsn_range.end)
|
||||
.max()
|
||||
.unwrap();
|
||||
for job in jobs {
|
||||
for (i, job) in jobs.into_iter().enumerate() {
|
||||
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
|
||||
// until we do further refactors to allow directly call `compact_with_gc`.
|
||||
let mut flags: EnumSet<CompactFlags> = EnumSet::default();
|
||||
@@ -507,7 +512,11 @@ impl GcCompactionQueue {
|
||||
compact_lsn_range: Some(job.compact_lsn_range.into()),
|
||||
sub_compaction_max_job_size_mb: None,
|
||||
};
|
||||
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
|
||||
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob {
|
||||
options,
|
||||
i,
|
||||
total: jobs_len,
|
||||
});
|
||||
}
|
||||
|
||||
if !auto {
|
||||
@@ -564,7 +573,7 @@ impl GcCompactionQueue {
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(_) => {
|
||||
Err(CompactionError::Other(_)) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
@@ -583,9 +592,9 @@ impl GcCompactionQueue {
|
||||
timeline: &Arc<Timeline>,
|
||||
) -> Result<CompactionOutcome, CompactionError> {
|
||||
let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
|
||||
return Err(CompactionError::AlreadyRunning(
|
||||
"cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
|
||||
));
|
||||
return Err(CompactionError::Other(anyhow::anyhow!(
|
||||
"cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue."
|
||||
)));
|
||||
};
|
||||
let has_pending_tasks;
|
||||
let mut yield_for_l0 = false;
|
||||
@@ -651,7 +660,7 @@ impl GcCompactionQueue {
|
||||
}
|
||||
}
|
||||
}
|
||||
GcCompactionQueueItem::SubCompactionJob(options) => {
|
||||
GcCompactionQueueItem::SubCompactionJob { options, i, total } => {
|
||||
// TODO: error handling, clear the queue if any task fails?
|
||||
let _gc_guard = match gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
@@ -663,6 +672,7 @@ impl GcCompactionQueue {
|
||||
)));
|
||||
}
|
||||
};
|
||||
info!("running gc-compaction subcompaction job {}/{}", i, total);
|
||||
let res = timeline.compact_with_options(cancel, options, ctx).await;
|
||||
let compaction_result = match res {
|
||||
Ok(res) => res,
|
||||
@@ -1310,7 +1320,7 @@ impl Timeline {
|
||||
|| cfg!(feature = "testing")
|
||||
|| self
|
||||
.feature_resolver
|
||||
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
|
||||
.evaluate_boolean("image-compaction-boundary")
|
||||
.is_ok()
|
||||
{
|
||||
let last_repartition_lsn = self.partitioning.read().1;
|
||||
@@ -1381,7 +1391,11 @@ impl Timeline {
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
critical_timeline!(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"missing key during compaction: {err:?}"
|
||||
);
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -1404,18 +1418,33 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
Err(err) if err.is_cancel() => {}
|
||||
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
Err(err) if err.is_critical() => {
|
||||
critical!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
|
||||
// Log other errors. No partitioning? This is normal, if the timeline was just created
|
||||
//
|
||||
// Log other errors but continue. Failure to repartition is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
|
||||
// key-value store, ignoring the datadir layout. Log the error but continue.
|
||||
Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
|
||||
//
|
||||
// TODO:
|
||||
// 1. shouldn't we return early here if we observe cancellation
|
||||
// 2. Experiment: can we stop checking self.cancel here?
|
||||
Err(_) if self.cancel.is_cancelled() => {} // TODO: try how we fare removing this branch
|
||||
Err(err) if err.is_cancel() => {}
|
||||
Err(RepartitionError::CollectKeyspace(
|
||||
e @ CollectKeySpaceError::Decode(_)
|
||||
| e @ CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
|
||||
),
|
||||
)) => {
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
critical_timeline!(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"could not compact, repartitioning keyspace failed: {e:?}"
|
||||
);
|
||||
}
|
||||
Err(e) => error!(
|
||||
"could not compact, repartitioning keyspace failed: {:?}",
|
||||
e.into_anyhow()
|
||||
),
|
||||
};
|
||||
|
||||
let partition_count = self.partitioning.read().0.0.parts.len();
|
||||
@@ -1591,13 +1620,15 @@ impl Timeline {
|
||||
let started = Instant::now();
|
||||
|
||||
let mut replace_image_layers = Vec::new();
|
||||
let total = layers_to_rewrite.len();
|
||||
|
||||
for layer in layers_to_rewrite {
|
||||
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
info!(layer=%layer, "rewriting layer after shard split");
|
||||
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
|
||||
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -1779,20 +1810,14 @@ impl Timeline {
|
||||
} = {
|
||||
let phase1_span = info_span!("compact_level0_phase1");
|
||||
let ctx = ctx.attached_child();
|
||||
let mut stats = CompactLevel0Phase1StatsBuilder {
|
||||
let stats = CompactLevel0Phase1StatsBuilder {
|
||||
version: Some(2),
|
||||
tenant_id: Some(self.tenant_shard_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let begin = tokio::time::Instant::now();
|
||||
let phase1_layers_locked = self.layers.read(LayerManagerLockHolder::Compaction).await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
self.compact_level0_phase1(
|
||||
phase1_layers_locked,
|
||||
stats,
|
||||
target_file_size,
|
||||
force_compaction_ignore_threshold,
|
||||
@@ -1813,16 +1838,19 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
|
||||
async fn compact_level0_phase1<'a>(
|
||||
self: &'a Arc<Self>,
|
||||
guard: LayerManagerReadGuard<'a>,
|
||||
async fn compact_level0_phase1(
|
||||
self: &Arc<Self>,
|
||||
mut stats: CompactLevel0Phase1StatsBuilder,
|
||||
target_file_size: u64,
|
||||
force_compaction_ignore_threshold: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
stats.read_lock_held_spawn_blocking_startup_micros =
|
||||
stats.read_lock_acquisition_micros.till_now(); // set by caller
|
||||
let begin = tokio::time::Instant::now();
|
||||
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
|
||||
let layers = guard.layer_map()?;
|
||||
let level0_deltas = layers.level0_deltas();
|
||||
stats.level0_deltas_count = Some(level0_deltas.len());
|
||||
@@ -1857,6 +1885,12 @@ impl Timeline {
|
||||
.map(|x| guard.get_from_desc(x))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
drop_layer_manager_rlock(guard);
|
||||
|
||||
// The is the last LSN that we have seen for L0 compaction in the timeline. This LSN might be updated
|
||||
// by the time we finish the compaction. So we need to get it here.
|
||||
let l0_last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Gather the files to compact in this iteration.
|
||||
//
|
||||
// Start with the oldest Level 0 delta file, and collect any other
|
||||
@@ -1944,9 +1978,7 @@ impl Timeline {
|
||||
// we don't accidentally use it later in the function.
|
||||
drop(level0_deltas);
|
||||
|
||||
stats.read_lock_held_prerequisites_micros = stats
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.till_now();
|
||||
stats.compaction_prerequisites_micros = stats.read_lock_acquisition_micros.till_now();
|
||||
|
||||
// TODO: replace with streaming k-merge
|
||||
let all_keys = {
|
||||
@@ -1968,7 +2000,7 @@ impl Timeline {
|
||||
all_keys
|
||||
};
|
||||
|
||||
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
|
||||
stats.read_lock_held_key_sort_micros = stats.compaction_prerequisites_micros.till_now();
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
|
||||
//
|
||||
@@ -2002,7 +2034,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
@@ -2021,8 +2052,12 @@ impl Timeline {
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size =
|
||||
layers.image_coverage(&key_range, last_record_lsn).len();
|
||||
let coverage_size = {
|
||||
// TODO: optimize this with copy-on-write layer map.
|
||||
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
|
||||
let layers = guard.layer_map()?;
|
||||
layers.image_coverage(&key_range, l0_last_record_lsn).len()
|
||||
};
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
@@ -2041,7 +2076,6 @@ impl Timeline {
|
||||
holes
|
||||
};
|
||||
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
|
||||
drop_layer_manager_rlock(guard);
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
@@ -2382,9 +2416,8 @@ struct CompactLevel0Phase1StatsBuilder {
|
||||
tenant_id: Option<TenantShardId>,
|
||||
timeline_id: Option<TimelineId>,
|
||||
read_lock_acquisition_micros: DurationRecorder,
|
||||
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
|
||||
read_lock_held_key_sort_micros: DurationRecorder,
|
||||
read_lock_held_prerequisites_micros: DurationRecorder,
|
||||
compaction_prerequisites_micros: DurationRecorder,
|
||||
read_lock_held_compute_holes_micros: DurationRecorder,
|
||||
read_lock_drop_micros: DurationRecorder,
|
||||
write_layer_files_micros: DurationRecorder,
|
||||
@@ -2399,9 +2432,8 @@ struct CompactLevel0Phase1Stats {
|
||||
tenant_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
read_lock_acquisition_micros: RecordedDuration,
|
||||
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
|
||||
read_lock_held_key_sort_micros: RecordedDuration,
|
||||
read_lock_held_prerequisites_micros: RecordedDuration,
|
||||
compaction_prerequisites_micros: RecordedDuration,
|
||||
read_lock_held_compute_holes_micros: RecordedDuration,
|
||||
read_lock_drop_micros: RecordedDuration,
|
||||
write_layer_files_micros: RecordedDuration,
|
||||
@@ -2426,16 +2458,12 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
.read_lock_acquisition_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
|
||||
read_lock_held_spawn_blocking_startup_micros: value
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
|
||||
read_lock_held_key_sort_micros: value
|
||||
.read_lock_held_key_sort_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
|
||||
read_lock_held_prerequisites_micros: value
|
||||
.read_lock_held_prerequisites_micros
|
||||
compaction_prerequisites_micros: value
|
||||
.compaction_prerequisites_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
|
||||
read_lock_held_compute_holes_micros: value
|
||||
@@ -2502,7 +2530,10 @@ impl Timeline {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
|
||||
let (dense_ks, _sparse_ks) = self
|
||||
.collect_keyspace(end_lsn, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::from_collect_keyspace)?;
|
||||
// TODO(chi): ignore sparse_keyspace for now, compact it in the future.
|
||||
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
|
||||
|
||||
@@ -4343,6 +4374,7 @@ impl TimelineAdaptor {
|
||||
ctx,
|
||||
key_range.clone(),
|
||||
IoConcurrency::sequential(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -182,6 +182,7 @@ pub(crate) async fn generate_tombstone_image_layer(
|
||||
detached: &Arc<Timeline>,
|
||||
ancestor: &Arc<Timeline>,
|
||||
ancestor_lsn: Lsn,
|
||||
historic_layers_to_copy: &Vec<Layer>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<ResidentLayer>, Error> {
|
||||
tracing::info!(
|
||||
@@ -199,6 +200,20 @@ pub(crate) async fn generate_tombstone_image_layer(
|
||||
let image_lsn = ancestor_lsn;
|
||||
|
||||
{
|
||||
for layer in historic_layers_to_copy {
|
||||
let desc = layer.layer_desc();
|
||||
if !desc.is_delta
|
||||
&& desc.lsn_range.start == image_lsn
|
||||
&& overlaps_with(&key_range, &desc.key_range)
|
||||
{
|
||||
tracing::info!(
|
||||
layer=%layer, "will copy tombstone from ancestor instead of creating a new one"
|
||||
);
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
let layers = detached
|
||||
.layers
|
||||
.read(LayerManagerLockHolder::DetachAncestor)
|
||||
@@ -450,7 +465,8 @@ pub(super) async fn prepare(
|
||||
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len() + 1);
|
||||
|
||||
if let Some(tombstone_layer) =
|
||||
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, ctx).await?
|
||||
generate_tombstone_image_layer(detached, &ancestor, ancestor_lsn, &rest_of_historic, ctx)
|
||||
.await?
|
||||
{
|
||||
new_layers.push(tombstone_layer.into());
|
||||
}
|
||||
|
||||
@@ -212,8 +212,12 @@
|
||||
//! to the parent shard during a shard split. Eventually, the shard split task will
|
||||
//! shut down the parent => case (1).
|
||||
|
||||
use std::collections::{HashMap, hash_map};
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::Weak;
|
||||
use std::time::Duration;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use tracing::{instrument, trace};
|
||||
@@ -333,6 +337,44 @@ enum RoutingResult<T: Types> {
|
||||
}
|
||||
|
||||
impl<T: Types> Cache<T> {
|
||||
/* BEGIN_HADRON */
|
||||
/// A wrapper of do_get to resolve the tenant shard for a get page request.
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn get(
|
||||
&mut self,
|
||||
timeline_id: TimelineId,
|
||||
shard_selector: ShardSelector,
|
||||
tenant_manager: &T::TenantManager,
|
||||
) -> Result<Handle<T>, GetError<T>> {
|
||||
const GET_MAX_RETRIES: usize = 10;
|
||||
const RETRY_BACKOFF: Duration = Duration::from_millis(100);
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
attempt += 1;
|
||||
match self
|
||||
.do_get(timeline_id, shard_selector, tenant_manager)
|
||||
.await
|
||||
{
|
||||
Ok(handle) => return Ok(handle),
|
||||
Err(e) => {
|
||||
// Retry on tenant manager error to handle tenant split more gracefully
|
||||
if attempt < GET_MAX_RETRIES {
|
||||
tracing::warn!(
|
||||
"Fail to resolve tenant shard in attempt {}: {:?}. Retrying...",
|
||||
attempt,
|
||||
e
|
||||
);
|
||||
tokio::time::sleep(RETRY_BACKOFF).await;
|
||||
continue;
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/// See module-level comment for details.
|
||||
///
|
||||
/// Does NOT check for the shutdown state of [`Types::Timeline`].
|
||||
@@ -341,7 +383,7 @@ impl<T: Types> Cache<T> {
|
||||
/// and if so, return an error that causes the page service to
|
||||
/// close the connection.
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn get(
|
||||
async fn do_get(
|
||||
&mut self,
|
||||
timeline_id: TimelineId,
|
||||
shard_selector: ShardSelector,
|
||||
@@ -879,6 +921,7 @@ mod tests {
|
||||
.await
|
||||
.err()
|
||||
.expect("documented behavior: can't get new handle after shutdown");
|
||||
|
||||
assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
|
||||
|
||||
cache
|
||||
|
||||
@@ -17,8 +17,8 @@ pub(crate) enum OffloadError {
|
||||
Cancelled,
|
||||
#[error("Timeline is not archived")]
|
||||
NotArchived,
|
||||
#[error(transparent)]
|
||||
RemoteStorage(anyhow::Error),
|
||||
#[error("Offload or deletion already in progress")]
|
||||
AlreadyInProgress,
|
||||
#[error("Unexpected offload error: {0}")]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -27,7 +27,7 @@ impl From<TenantManifestError> for OffloadError {
|
||||
fn from(e: TenantManifestError) -> Self {
|
||||
match e {
|
||||
TenantManifestError::Cancelled => Self::Cancelled,
|
||||
TenantManifestError::RemoteStorage(e) => Self::RemoteStorage(e),
|
||||
TenantManifestError::RemoteStorage(e) => Self::Other(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -44,20 +44,26 @@ pub(crate) async fn offload_timeline(
|
||||
timeline.timeline_id,
|
||||
TimelineDeleteGuardKind::Offload,
|
||||
);
|
||||
if let Err(DeleteTimelineError::HasChildren(children)) = delete_guard_res {
|
||||
let is_archived = timeline.is_archived();
|
||||
if is_archived == Some(true) {
|
||||
tracing::error!("timeline is archived but has non-archived children: {children:?}");
|
||||
let (timeline, guard) = match delete_guard_res {
|
||||
Ok(timeline_and_guard) => timeline_and_guard,
|
||||
Err(DeleteTimelineError::HasChildren(children)) => {
|
||||
let is_archived = timeline.is_archived();
|
||||
if is_archived == Some(true) {
|
||||
tracing::error!("timeline is archived but has non-archived children: {children:?}");
|
||||
return Err(OffloadError::NotArchived);
|
||||
}
|
||||
tracing::info!(
|
||||
?is_archived,
|
||||
"timeline is not archived and has unarchived children"
|
||||
);
|
||||
return Err(OffloadError::NotArchived);
|
||||
}
|
||||
tracing::info!(
|
||||
?is_archived,
|
||||
"timeline is not archived and has unarchived children"
|
||||
);
|
||||
return Err(OffloadError::NotArchived);
|
||||
Err(DeleteTimelineError::AlreadyInProgress(_)) => {
|
||||
tracing::info!("timeline offload or deletion already in progress");
|
||||
return Err(OffloadError::AlreadyInProgress);
|
||||
}
|
||||
Err(e) => return Err(OffloadError::Other(anyhow::anyhow!(e))),
|
||||
};
|
||||
let (timeline, guard) =
|
||||
delete_guard_res.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
|
||||
tracing::error!("timeline already offloaded, but given timeline object");
|
||||
|
||||
@@ -63,7 +63,6 @@ pub struct WalReceiver {
|
||||
/// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token.
|
||||
/// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
|
||||
cancel: CancellationToken,
|
||||
task: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl WalReceiver {
|
||||
@@ -80,7 +79,7 @@ impl WalReceiver {
|
||||
let loop_status = Arc::new(std::sync::RwLock::new(None));
|
||||
let manager_status = Arc::clone(&loop_status);
|
||||
let cancel = timeline.cancel.child_token();
|
||||
let task = WALRECEIVER_RUNTIME.spawn({
|
||||
let _task = WALRECEIVER_RUNTIME.spawn({
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -121,25 +120,14 @@ impl WalReceiver {
|
||||
Self {
|
||||
manager_status,
|
||||
cancel,
|
||||
task,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = tracing::Level::DEBUG)]
|
||||
pub async fn shutdown(self) {
|
||||
pub async fn cancel(self) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
debug!("cancelling walreceiver tasks");
|
||||
self.cancel.cancel();
|
||||
match self.task.await {
|
||||
Ok(()) => debug!("Shutdown success"),
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
// already logged by panic hook
|
||||
}
|
||||
Err(je) => {
|
||||
error!("shutdown walreceiver task join error: {je}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
|
||||
|
||||
@@ -100,6 +100,7 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// with other streams on this client (other connection managers). When
|
||||
// object goes out of scope, stream finishes in drop() automatically.
|
||||
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
|
||||
let mut broker_reset_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
|
||||
debug!("Subscribed for broker timeline updates");
|
||||
|
||||
loop {
|
||||
@@ -156,7 +157,10 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// Got a new update from the broker
|
||||
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
|
||||
match broker_update {
|
||||
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
|
||||
Ok(Some(broker_update)) => {
|
||||
broker_reset_interval.reset();
|
||||
connection_manager_state.register_timeline_update(broker_update);
|
||||
},
|
||||
Err(status) => {
|
||||
match status.code() {
|
||||
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
|
||||
@@ -178,6 +182,21 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
},
|
||||
|
||||
// If we've not received any updates from the broker from a while, are waiting for WAL
|
||||
// and have no safekeeper connection or connection candidates, then it might be that
|
||||
// the broker subscription is wedged. Drop the currrent subscription and re-subscribe
|
||||
// with the goal of unblocking it.
|
||||
_ = broker_reset_interval.tick() => {
|
||||
let awaiting_lsn = wait_lsn_status.borrow().is_some();
|
||||
let no_candidates = connection_manager_state.wal_stream_candidates.is_empty();
|
||||
let no_connection = connection_manager_state.wal_connection.is_none();
|
||||
|
||||
if awaiting_lsn && no_candidates && no_connection {
|
||||
tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...");
|
||||
broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
|
||||
}
|
||||
},
|
||||
|
||||
new_event = async {
|
||||
// Reminder: this match arm needs to be cancellation-safe.
|
||||
loop {
|
||||
|
||||
@@ -25,7 +25,7 @@ use tokio_postgres::replication::ReplicationStream;
|
||||
use tokio_postgres::{Client, SimpleQueryMessage, SimpleQueryRow};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, debug, error, info, trace, warn};
|
||||
use utils::critical;
|
||||
use utils::critical_timeline;
|
||||
use utils::id::NodeId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
@@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let copy_stream = replication_client.copy_both_simple(&query).await?;
|
||||
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
|
||||
|
||||
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
|
||||
let walingest_res = select! {
|
||||
walingest_res = walingest_future => walingest_res,
|
||||
_ = cancellation.cancelled() => {
|
||||
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
|
||||
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
|
||||
debug!("Connection cancelled");
|
||||
return Err(WalReceiverError::Cancelled);
|
||||
},
|
||||
};
|
||||
let mut walingest = walingest_res.map_err(|e| match e.kind {
|
||||
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
|
||||
.await
|
||||
.map_err(|e| match e.kind {
|
||||
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
|
||||
let (format, compression) = match protocol {
|
||||
PostgresClientProtocol::Interpreted {
|
||||
@@ -368,9 +360,13 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
match raw_wal_start_lsn.cmp(&expected_wal_start) {
|
||||
std::cmp::Ordering::Greater => {
|
||||
let msg = format!(
|
||||
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
|
||||
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn}"
|
||||
);
|
||||
critical_timeline!(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
"{msg}"
|
||||
);
|
||||
critical!("{msg}");
|
||||
return Err(WalReceiverError::Other(anyhow!(msg)));
|
||||
}
|
||||
std::cmp::Ordering::Less => {
|
||||
@@ -383,7 +379,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
"Received record with next_record_lsn multiple times ({} < {})",
|
||||
first_rec.next_record_lsn, expected_wal_start
|
||||
);
|
||||
critical!("{msg}");
|
||||
critical_timeline!(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
"{msg}"
|
||||
);
|
||||
return Err(WalReceiverError::Other(anyhow!(msg)));
|
||||
}
|
||||
}
|
||||
@@ -452,7 +452,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// TODO: we can't differentiate cancellation errors with
|
||||
// anyhow::Error, so just ignore it if we're cancelled.
|
||||
if !cancellation.is_cancelled() && !timeline.is_stopping() {
|
||||
critical!("{err:?}")
|
||||
critical_timeline!(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
"{err:?}"
|
||||
);
|
||||
}
|
||||
})?;
|
||||
|
||||
|
||||
@@ -45,9 +45,10 @@ pub(crate) fn regenerate(
|
||||
let (disk_wanted_bytes, shard_count) = tenant_manager.calculate_utilization()?;
|
||||
|
||||
// Fetch the fraction of disk space which may be used
|
||||
let disk_usable_pct = match conf.disk_usage_based_eviction.clone() {
|
||||
Some(e) => e.max_usage_pct,
|
||||
None => Percent::new(100).unwrap(),
|
||||
let disk_usable_pct = if conf.disk_usage_based_eviction.enabled {
|
||||
conf.disk_usage_based_eviction.max_usage_pct
|
||||
} else {
|
||||
Percent::new(100).unwrap()
|
||||
};
|
||||
|
||||
// Express a static value for how many shards we may schedule on one node
|
||||
|
||||
@@ -40,7 +40,7 @@ use tracing::*;
|
||||
use utils::bin_ser::{DeserializeError, SerializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::{critical, failpoint_support};
|
||||
use utils::{critical_timeline, failpoint_support};
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
use wal_decoder::models::*;
|
||||
|
||||
@@ -418,18 +418,30 @@ impl WalIngest {
|
||||
// as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
|
||||
// https://github.com/neondatabase/neon/pull/10634.
|
||||
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
|
||||
critical!("clear_vm_bits for unknown VM relation {vm_rel}");
|
||||
critical_timeline!(
|
||||
modification.tline.tenant_shard_id,
|
||||
modification.tline.timeline_id,
|
||||
"clear_vm_bits for unknown VM relation {vm_rel}"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
critical_timeline!(
|
||||
modification.tline.tenant_shard_id,
|
||||
modification.tline.timeline_id,
|
||||
"new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
|
||||
);
|
||||
new_vm_blk = None;
|
||||
}
|
||||
}
|
||||
if let Some(blknum) = old_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
critical_timeline!(
|
||||
modification.tline.tenant_shard_id,
|
||||
modification.tline.timeline_id,
|
||||
"old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
|
||||
);
|
||||
old_vm_blk = None;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -566,22 +566,55 @@ impl PostgresRedoManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod harness {
|
||||
use super::PostgresRedoManager;
|
||||
use crate::config::PageServerConf;
|
||||
use utils::{id::TenantId, shard::TenantShardId};
|
||||
|
||||
pub struct RedoHarness {
|
||||
// underscored because unused, except for removal at drop
|
||||
_repo_dir: camino_tempfile::Utf8TempDir,
|
||||
pub manager: PostgresRedoManager,
|
||||
tenant_shard_id: TenantShardId,
|
||||
}
|
||||
|
||||
impl RedoHarness {
|
||||
pub fn new() -> anyhow::Result<Self> {
|
||||
crate::tenant::harness::setup_logging();
|
||||
|
||||
let repo_dir = camino_tempfile::tempdir()?;
|
||||
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
|
||||
let conf = Box::leak(Box::new(conf));
|
||||
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
|
||||
|
||||
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
|
||||
|
||||
Ok(RedoHarness {
|
||||
_repo_dir: repo_dir,
|
||||
manager,
|
||||
tenant_shard_id,
|
||||
})
|
||||
}
|
||||
pub fn span(&self) -> tracing::Span {
|
||||
tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::PgMajorVersion;
|
||||
use tracing::Instrument;
|
||||
use utils::id::TenantId;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
use super::PostgresRedoManager;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use crate::walredo::harness::RedoHarness;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ping() {
|
||||
@@ -692,33 +725,4 @@ mod tests {
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
struct RedoHarness {
|
||||
// underscored because unused, except for removal at drop
|
||||
_repo_dir: camino_tempfile::Utf8TempDir,
|
||||
manager: PostgresRedoManager,
|
||||
tenant_shard_id: TenantShardId,
|
||||
}
|
||||
|
||||
impl RedoHarness {
|
||||
fn new() -> anyhow::Result<Self> {
|
||||
crate::tenant::harness::setup_logging();
|
||||
|
||||
let repo_dir = camino_tempfile::tempdir()?;
|
||||
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
|
||||
let conf = Box::leak(Box::new(conf));
|
||||
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
|
||||
|
||||
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
|
||||
|
||||
Ok(RedoHarness {
|
||||
_repo_dir: repo_dir,
|
||||
manager,
|
||||
tenant_shard_id,
|
||||
})
|
||||
}
|
||||
fn span(&self) -> tracing::Span {
|
||||
tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user