mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 17:10:38 +00:00
Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/rm-file-if-fail
This commit is contained in:
@@ -24,6 +24,8 @@ const RESIDENT_SIZE: &str = "resident_size";
|
||||
const REMOTE_STORAGE_SIZE: &str = "remote_storage_size";
|
||||
const TIMELINE_LOGICAL_SIZE: &str = "timeline_logical_size";
|
||||
|
||||
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Debug)]
|
||||
struct Ids {
|
||||
@@ -73,7 +75,10 @@ pub async fn collect_metrics(
|
||||
);
|
||||
|
||||
// define client here to reuse it for all requests
|
||||
let client = reqwest::Client::new();
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
|
||||
.build()
|
||||
.expect("Failed to create http client with timeout");
|
||||
let mut cached_metrics: HashMap<PageserverConsumptionMetricsKey, u64> = HashMap::new();
|
||||
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
|
||||
|
||||
@@ -83,7 +88,7 @@ pub async fn collect_metrics(
|
||||
info!("collect_metrics received cancellation request");
|
||||
return Ok(());
|
||||
},
|
||||
_ = ticker.tick() => {
|
||||
tick_at = ticker.tick() => {
|
||||
|
||||
// send cached metrics every cached_metric_collection_interval
|
||||
let send_cached = prev_iteration_time.elapsed() >= cached_metric_collection_interval;
|
||||
@@ -93,6 +98,12 @@ pub async fn collect_metrics(
|
||||
}
|
||||
|
||||
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
|
||||
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
tick_at.elapsed(),
|
||||
metric_collection_interval,
|
||||
"consumption_metrics_collect_metrics",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,14 +234,18 @@ pub async fn collect_metrics_iteration(
|
||||
// Note that this metric is calculated in a separate bgworker
|
||||
// Here we only use cached value, which may lag behind the real latest one
|
||||
let tenant_synthetic_size = tenant.get_cached_synthetic_size();
|
||||
current_metrics.push((
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: SYNTHETIC_STORAGE_SIZE,
|
||||
},
|
||||
tenant_synthetic_size,
|
||||
));
|
||||
|
||||
if tenant_synthetic_size != 0 {
|
||||
// only send non-zeroes because otherwise these show up as errors in logs
|
||||
current_metrics.push((
|
||||
PageserverConsumptionMetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: None,
|
||||
metric: SYNTHETIC_STORAGE_SIZE,
|
||||
},
|
||||
tenant_synthetic_size,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Filter metrics, unless we want to send all metrics, including cached ones.
|
||||
@@ -273,31 +288,42 @@ pub async fn collect_metrics_iteration(
|
||||
})
|
||||
.expect("PageserverConsumptionMetric should not fail serialization");
|
||||
|
||||
let res = client
|
||||
.post(metric_collection_endpoint.clone())
|
||||
.json(&chunk_json)
|
||||
.send()
|
||||
.await;
|
||||
const MAX_RETRIES: u32 = 3;
|
||||
|
||||
match res {
|
||||
Ok(res) => {
|
||||
if res.status().is_success() {
|
||||
// update cached metrics after they were sent successfully
|
||||
for (curr_key, curr_val) in chunk.iter() {
|
||||
cached_metrics.insert(curr_key.clone(), *curr_val);
|
||||
}
|
||||
} else {
|
||||
error!("metrics endpoint refused the sent metrics: {:?}", res);
|
||||
for metric in chunk_to_send.iter() {
|
||||
// Report if the metric value is suspiciously large
|
||||
if metric.value > (1u64 << 40) {
|
||||
for attempt in 0..MAX_RETRIES {
|
||||
let res = client
|
||||
.post(metric_collection_endpoint.clone())
|
||||
.json(&chunk_json)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(res) => {
|
||||
if res.status().is_success() {
|
||||
// update cached metrics after they were sent successfully
|
||||
for (curr_key, curr_val) in chunk.iter() {
|
||||
cached_metrics.insert(curr_key.clone(), *curr_val);
|
||||
}
|
||||
} else {
|
||||
error!("metrics endpoint refused the sent metrics: {:?}", res);
|
||||
for metric in chunk_to_send
|
||||
.iter()
|
||||
.filter(|metric| metric.value > (1u64 << 40))
|
||||
{
|
||||
// Report if the metric value is suspiciously large
|
||||
error!("potentially abnormal metric value: {:?}", metric);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(err) if err.is_timeout() => {
|
||||
error!(attempt, "timeout sending metrics, retrying immediately");
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(attempt, ?err, "failed to send metrics");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("failed to send metrics: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -317,7 +343,7 @@ pub async fn calculate_synthetic_size_worker(
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
return Ok(());
|
||||
},
|
||||
_ = ticker.tick() => {
|
||||
tick_at = ticker.tick() => {
|
||||
|
||||
let tenants = match mgr::list_tenants().await {
|
||||
Ok(tenants) => tenants,
|
||||
@@ -343,6 +369,12 @@ pub async fn calculate_synthetic_size_worker(
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
tick_at.elapsed(),
|
||||
synthetic_size_calculation_interval,
|
||||
"consumption_metrics_synthetic_size_worker",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,7 +110,6 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
|
||||
disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel)
|
||||
.await;
|
||||
info!("disk usage based eviction task finishing");
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
@@ -126,13 +125,16 @@ async fn disk_usage_eviction_task(
|
||||
tenants_dir: &Path,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
scopeguard::defer! {
|
||||
info!("disk usage based eviction task finishing");
|
||||
};
|
||||
|
||||
use crate::tenant::tasks::random_init_delay;
|
||||
{
|
||||
if random_init_delay(task_config.period, &cancel)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
info!("shutting down");
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -167,7 +169,6 @@ async fn disk_usage_eviction_task(
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep_until(sleep_until) => {},
|
||||
_ = cancel.cancelled() => {
|
||||
info!("shutting down");
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -314,7 +315,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
partition,
|
||||
candidate.layer.get_tenant_id(),
|
||||
candidate.layer.get_timeline_id(),
|
||||
candidate.layer.filename().file_name(),
|
||||
candidate.layer,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -722,6 +722,12 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"406":
|
||||
description: Permanently unsatisfiable request, don't retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"409":
|
||||
description: Timeline already exists, creation skipped
|
||||
content:
|
||||
|
||||
@@ -338,6 +338,11 @@ async fn timeline_create_handler(
|
||||
Err(tenant::CreateTimelineError::AlreadyExists) => {
|
||||
json_response(StatusCode::CONFLICT, ())
|
||||
}
|
||||
Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
|
||||
json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
|
||||
format!("{err:#}")
|
||||
))
|
||||
}
|
||||
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use metrics::metric_vec_duration::DurationResultObserver;
|
||||
use metrics::{
|
||||
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
|
||||
UIntGauge, UIntGaugeVec,
|
||||
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge,
|
||||
register_uint_gauge_vec, Counter, CounterVec, Histogram, HistogramVec, IntCounter,
|
||||
IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::models::TenantState;
|
||||
@@ -130,6 +130,122 @@ pub static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub struct PageCacheMetrics {
|
||||
pub read_accesses_materialized_page: IntCounter,
|
||||
pub read_accesses_ephemeral: IntCounter,
|
||||
pub read_accesses_immutable: IntCounter,
|
||||
|
||||
pub read_hits_ephemeral: IntCounter,
|
||||
pub read_hits_immutable: IntCounter,
|
||||
pub read_hits_materialized_page_exact: IntCounter,
|
||||
pub read_hits_materialized_page_older_lsn: IntCounter,
|
||||
}
|
||||
|
||||
static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_read_hits_total",
|
||||
"Number of read accesses to the page cache that hit",
|
||||
&["key_kind", "hit_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_page_cache_read_accesses_total",
|
||||
"Number of read accesses to the page cache",
|
||||
&["key_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
|
||||
read_accesses_materialized_page: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["materialized_page"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_accesses_ephemeral: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["ephemeral"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_accesses_immutable: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&["immutable"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_ephemeral: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["ephemeral", "-"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_immutable: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["immutable", "-"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_exact: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["materialized_page", "exact"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_older_lsn: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&["materialized_page", "older_lsn"])
|
||||
.unwrap()
|
||||
},
|
||||
});
|
||||
|
||||
pub struct PageCacheSizeMetrics {
|
||||
pub max_bytes: UIntGauge,
|
||||
|
||||
pub current_bytes_ephemeral: UIntGauge,
|
||||
pub current_bytes_immutable: UIntGauge,
|
||||
pub current_bytes_materialized_page: UIntGauge,
|
||||
}
|
||||
|
||||
static PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_page_cache_size_current_bytes",
|
||||
"Current size of the page cache in bytes, by key kind",
|
||||
&["key_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheSizeMetrics {
|
||||
max_bytes: {
|
||||
register_uint_gauge!(
|
||||
"pageserver_page_cache_size_max_bytes",
|
||||
"Maximum size of the page cache in bytes"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
},
|
||||
|
||||
current_bytes_ephemeral: {
|
||||
PAGE_CACHE_SIZE_CURRENT_BYTES
|
||||
.get_metric_with_label_values(&["ephemeral"])
|
||||
.unwrap()
|
||||
},
|
||||
current_bytes_immutable: {
|
||||
PAGE_CACHE_SIZE_CURRENT_BYTES
|
||||
.get_metric_with_label_values(&["immutable"])
|
||||
.unwrap()
|
||||
},
|
||||
current_bytes_materialized_page: {
|
||||
PAGE_CACHE_SIZE_CURRENT_BYTES
|
||||
.get_metric_with_label_values(&["materialized_page"])
|
||||
.unwrap()
|
||||
},
|
||||
});
|
||||
|
||||
static WAIT_LSN_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_wait_lsn_seconds",
|
||||
@@ -204,11 +320,11 @@ pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
|
||||
pub static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_synthetic_size",
|
||||
"Synthetic size of each tenant",
|
||||
"pageserver_tenant_synthetic_cached_size_bytes",
|
||||
"Synthetic size of each tenant in bytes",
|
||||
&["tenant_id"]
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_synthetic_size metric")
|
||||
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
|
||||
});
|
||||
|
||||
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
|
||||
@@ -968,7 +1084,6 @@ impl RemoteTimelineClientMetrics {
|
||||
op_kind: &RemoteOpKind,
|
||||
status: &'static str,
|
||||
) -> Histogram {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.remote_operation_time.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str(), status);
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
@@ -990,7 +1105,6 @@ impl RemoteTimelineClientMetrics {
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntGauge {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.calls_unfinished_gauge.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
@@ -1011,7 +1125,6 @@ impl RemoteTimelineClientMetrics {
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Histogram {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.calls_started_hist.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
@@ -1032,7 +1145,6 @@ impl RemoteTimelineClientMetrics {
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
@@ -1053,7 +1165,6 @@ impl RemoteTimelineClientMetrics {
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
|
||||
@@ -53,8 +53,8 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::writeback_ephemeral_file;
|
||||
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
|
||||
|
||||
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
|
||||
const TEST_PAGE_CACHE_SIZE: usize = 50;
|
||||
@@ -187,6 +187,8 @@ pub struct PageCache {
|
||||
/// Index of the next candidate to evict, for the Clock replacement algorithm.
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
///
|
||||
@@ -313,6 +315,10 @@ impl PageCache {
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_accesses_materialized_page
|
||||
.inc();
|
||||
|
||||
let mut cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_id,
|
||||
@@ -323,8 +329,21 @@ impl PageCache {
|
||||
};
|
||||
|
||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
|
||||
if let CacheKey::MaterializedPage { hash_key: _, lsn } = cache_key {
|
||||
Some((lsn, guard))
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
} = cache_key
|
||||
{
|
||||
if available_lsn == lsn {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_hits_materialized_page_exact
|
||||
.inc();
|
||||
} else {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.read_hits_materialized_page_older_lsn
|
||||
.inc();
|
||||
}
|
||||
Some((available_lsn, guard))
|
||||
} else {
|
||||
panic!("unexpected key type in slot");
|
||||
}
|
||||
@@ -499,11 +518,31 @@ impl PageCache {
|
||||
/// ```
|
||||
///
|
||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
}
|
||||
CacheKey::EphemeralPage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE.read_accesses_ephemeral,
|
||||
&crate::metrics::PAGE_CACHE.read_hits_ephemeral,
|
||||
),
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
|
||||
&crate::metrics::PAGE_CACHE.read_hits_immutable,
|
||||
),
|
||||
};
|
||||
read_access.inc();
|
||||
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
@@ -681,6 +720,9 @@ impl PageCache {
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
|
||||
versions.remove(version_idx);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.sub_page_sz(1);
|
||||
if versions.is_empty() {
|
||||
old_entry.remove_entry();
|
||||
}
|
||||
@@ -693,11 +735,13 @@ impl PageCache {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
self.size_metrics.current_bytes_ephemeral.sub_page_sz(1);
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
self.size_metrics.current_bytes_immutable.sub_page_sz(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -725,6 +769,9 @@ impl PageCache {
|
||||
slot_idx,
|
||||
},
|
||||
);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -735,6 +782,7 @@ impl PageCache {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(slot_idx);
|
||||
self.size_metrics.current_bytes_ephemeral.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -745,6 +793,7 @@ impl PageCache {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(slot_idx);
|
||||
self.size_metrics.current_bytes_immutable.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -844,6 +893,12 @@ impl PageCache {
|
||||
|
||||
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_ephemeral.set_page_sz(0);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
size_metrics.current_bytes_materialized_page.set_page_sz(0);
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
@@ -866,6 +921,30 @@ impl PageCache {
|
||||
immutable_page_map: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait PageSzBytesMetric {
|
||||
fn set_page_sz(&self, count: usize);
|
||||
fn add_page_sz(&self, count: usize);
|
||||
fn sub_page_sz(&self, count: usize);
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn count_times_page_sz(count: usize) -> u64 {
|
||||
u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
|
||||
}
|
||||
|
||||
impl PageSzBytesMetric for metrics::UIntGauge {
|
||||
fn set_page_sz(&self, count: usize) {
|
||||
self.set(count_times_page_sz(count));
|
||||
}
|
||||
fn add_page_sz(&self, count: usize) {
|
||||
self.add(count_times_page_sz(count));
|
||||
}
|
||||
fn sub_page_sz(&self, count: usize) {
|
||||
self.sub(count_times_page_sz(count));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
//! parent timeline, and the last LSN that has been written to disk.
|
||||
//!
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use anyhow::{bail, Context};
|
||||
use futures::FutureExt;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use remote_storage::DownloadError;
|
||||
@@ -49,6 +49,8 @@ use std::time::{Duration, Instant};
|
||||
use self::config::TenantConf;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
@@ -68,6 +70,7 @@ use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::InitializationOrder;
|
||||
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::walredo::WalRedoManager;
|
||||
@@ -87,6 +90,7 @@ pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
pub mod manifest;
|
||||
mod span;
|
||||
|
||||
pub mod metadata;
|
||||
mod par_fsync;
|
||||
@@ -102,7 +106,7 @@ mod timeline;
|
||||
|
||||
pub mod size;
|
||||
|
||||
pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
pub use timeline::{
|
||||
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
|
||||
};
|
||||
@@ -161,200 +165,6 @@ pub struct Tenant {
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
}
|
||||
|
||||
/// A timeline with some of its files on disk, being initialized.
|
||||
/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
|
||||
/// its local files are removed. In the worst case of a crash, an uninit mark file is left behind, which causes the directory
|
||||
/// to be removed on next restart.
|
||||
///
|
||||
/// The caller is responsible for proper timeline data filling before the final init.
|
||||
#[must_use]
|
||||
pub struct UninitializedTimeline<'t> {
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
|
||||
}
|
||||
|
||||
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
|
||||
/// or gets removed eventually.
|
||||
///
|
||||
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
|
||||
#[must_use]
|
||||
struct TimelineUninitMark {
|
||||
uninit_mark_deleted: bool,
|
||||
uninit_mark_path: PathBuf,
|
||||
timeline_path: PathBuf,
|
||||
}
|
||||
|
||||
impl UninitializedTimeline<'_> {
|
||||
/// Finish timeline creation: insert it into the Tenant's timelines map and remove the
|
||||
/// uninit mark file.
|
||||
///
|
||||
/// This function launches the flush loop if not already done.
|
||||
///
|
||||
/// The caller is responsible for activating the timeline (function `.activate()`).
|
||||
fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_id = self.timeline_id;
|
||||
let tenant_id = self.owning_tenant.tenant_id;
|
||||
|
||||
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
|
||||
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
// Check that the caller initialized disk_consistent_lsn
|
||||
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
|
||||
ensure!(
|
||||
new_disk_consistent_lsn.is_valid(),
|
||||
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
|
||||
);
|
||||
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
match timelines.entry(timeline_id) {
|
||||
Entry::Occupied(_) => anyhow::bail!(
|
||||
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
|
||||
),
|
||||
Entry::Vacant(v) => {
|
||||
uninit_mark.remove_uninit_mark().with_context(|| {
|
||||
format!(
|
||||
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
|
||||
)
|
||||
})?;
|
||||
v.insert(Arc::clone(&new_timeline));
|
||||
|
||||
new_timeline.maybe_spawn_flush_loop();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(new_timeline)
|
||||
}
|
||||
|
||||
/// Prepares timeline data by loading it from the basebackup archive.
|
||||
pub async fn import_basebackup_from_tar(
|
||||
self,
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let raw_timeline = self.raw_timeline()?;
|
||||
|
||||
import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
|
||||
.await
|
||||
.context("Failed to import basebackup")?;
|
||||
|
||||
// Flush the new layer files to disk, before we make the timeline as available to
|
||||
// the outside world.
|
||||
//
|
||||
// Flush loop needs to be spawned in order to be able to flush.
|
||||
raw_timeline.maybe_spawn_flush_loop();
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
bail!("failpoint before-checkpoint-new-timeline");
|
||||
});
|
||||
|
||||
raw_timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.context("Failed to flush after basebackup import")?;
|
||||
|
||||
// All the data has been imported. Insert the Timeline into the tenant's timelines
|
||||
// map and remove the uninit mark file.
|
||||
let tl = self.finish_creation()?;
|
||||
tl.activate(broker_client, None, ctx);
|
||||
Ok(tl)
|
||||
}
|
||||
|
||||
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
|
||||
Ok(&self
|
||||
.raw_timeline
|
||||
.as_ref()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"No raw timeline {}/{} found",
|
||||
self.owning_tenant.tenant_id, self.timeline_id
|
||||
)
|
||||
})?
|
||||
.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for UninitializedTimeline<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, uninit_mark)) = self.raw_timeline.take() {
|
||||
let _entered = info_span!("drop_uninitialized_timeline", tenant = %self.owning_tenant.tenant_id, timeline = %self.timeline_id).entered();
|
||||
error!("Timeline got dropped without initializing, cleaning its files");
|
||||
cleanup_timeline_directory(uninit_mark);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
|
||||
let timeline_path = &uninit_mark.timeline_path;
|
||||
match ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
|
||||
Ok(()) => {
|
||||
info!("Timeline dir {timeline_path:?} removed successfully, removing the uninit mark")
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
|
||||
}
|
||||
}
|
||||
drop(uninit_mark); // mark handles its deletion on drop, gets retained if timeline dir exists
|
||||
}
|
||||
|
||||
impl TimelineUninitMark {
|
||||
fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self {
|
||||
Self {
|
||||
uninit_mark_deleted: false,
|
||||
uninit_mark_path,
|
||||
timeline_path,
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_uninit_mark(mut self) -> anyhow::Result<()> {
|
||||
if !self.uninit_mark_deleted {
|
||||
self.delete_mark_file_if_present()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
|
||||
let uninit_mark_file = &self.uninit_mark_path;
|
||||
let uninit_mark_parent = uninit_mark_file
|
||||
.parent()
|
||||
.with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
|
||||
ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
|
||||
format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
|
||||
})?;
|
||||
crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
|
||||
self.uninit_mark_deleted = true;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineUninitMark {
|
||||
fn drop(&mut self) {
|
||||
if !self.uninit_mark_deleted {
|
||||
if self.timeline_path.exists() {
|
||||
error!(
|
||||
"Uninit mark {} is not removed, timeline {} stays uninitialized",
|
||||
self.uninit_mark_path.display(),
|
||||
self.timeline_path.display()
|
||||
)
|
||||
} else {
|
||||
// unblock later timeline creation attempts
|
||||
warn!(
|
||||
"Removing intermediate uninit mark file {}",
|
||||
self.uninit_mark_path.display()
|
||||
);
|
||||
if let Err(e) = self.delete_mark_file_if_present() {
|
||||
error!("Failed to remove the uninit mark file: {e}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We should not blindly overwrite local metadata with remote one.
|
||||
// For example, consider the following case:
|
||||
// Image layer is flushed to disk as a new delta layer, we update local metadata and start upload task but after that
|
||||
@@ -506,6 +316,8 @@ pub enum CreateTimelineError {
|
||||
#[error("a timeline with the given ID already exists")]
|
||||
AlreadyExists,
|
||||
#[error(transparent)]
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
@@ -693,7 +505,7 @@ impl Tenant {
|
||||
/// No background tasks are started as part of this routine.
|
||||
///
|
||||
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
|
||||
if !tokio::fs::try_exists(&marker_file)
|
||||
@@ -831,7 +643,7 @@ impl Tenant {
|
||||
remote_client: RemoteTimelineClient,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
info!("downloading index file for timeline {}", timeline_id);
|
||||
tokio::fs::create_dir_all(self.conf.timeline_path(&timeline_id, &self.tenant_id))
|
||||
@@ -910,7 +722,7 @@ impl Tenant {
|
||||
init_order: Option<InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
) -> Arc<Tenant> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let tenant_conf = match Self::load_tenant_config(conf, tenant_id) {
|
||||
Ok(conf) => conf,
|
||||
@@ -1096,7 +908,7 @@ impl Tenant {
|
||||
init_order: Option<&InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
debug!("loading tenant task");
|
||||
|
||||
@@ -1142,7 +954,7 @@ impl Tenant {
|
||||
init_order: Option<&InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
|
||||
RemoteTimelineClient::new(
|
||||
@@ -1432,7 +1244,7 @@ impl Tenant {
|
||||
let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
|
||||
if ancestor_ancestor_lsn > *lsn {
|
||||
// can we safely just branch from the ancestor instead?
|
||||
return Err(CreateTimelineError::Other(anyhow::anyhow!(
|
||||
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
|
||||
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
|
||||
lsn,
|
||||
ancestor_timeline_id,
|
||||
@@ -1733,7 +1545,7 @@ impl Tenant {
|
||||
timeline_id: TimelineId,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
timeline::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Transition the timeline into TimelineState::Stopping.
|
||||
// This should prevent new operations from starting.
|
||||
@@ -1897,7 +1709,7 @@ impl Tenant {
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let mut activating = false;
|
||||
self.state.send_modify(|current_state| {
|
||||
@@ -1968,7 +1780,7 @@ impl Tenant {
|
||||
///
|
||||
/// This will attempt to shutdown even if tenant is broken.
|
||||
pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
// Set tenant (and its timlines) to Stoppping state.
|
||||
//
|
||||
// Since we can only transition into Stopping state after activation is complete,
|
||||
@@ -2715,7 +2527,7 @@ impl Tenant {
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let tl = self
|
||||
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
.await?;
|
||||
@@ -2732,7 +2544,7 @@ impl Tenant {
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
.await
|
||||
}
|
||||
@@ -2743,7 +2555,7 @@ impl Tenant {
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
|
||||
@@ -2783,16 +2595,17 @@ impl Tenant {
|
||||
.context(format!(
|
||||
"invalid branch start lsn: less than latest GC cutoff {}",
|
||||
*latest_gc_cutoff_lsn,
|
||||
))?;
|
||||
))
|
||||
.map_err(CreateTimelineError::AncestorLsn)?;
|
||||
|
||||
// and then the planned GC cutoff
|
||||
{
|
||||
let gc_info = src_timeline.gc_info.read().unwrap();
|
||||
let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
|
||||
if start_lsn < cutoff {
|
||||
bail!(format!(
|
||||
return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
|
||||
"invalid branch start lsn: less than planned GC cutoff {cutoff}"
|
||||
));
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3009,11 +2822,11 @@ impl Tenant {
|
||||
|
||||
debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
|
||||
|
||||
Ok(UninitializedTimeline {
|
||||
owning_tenant: self,
|
||||
timeline_id: new_timeline_id,
|
||||
raw_timeline: Some((timeline_struct, uninit_mark)),
|
||||
})
|
||||
Ok(UninitializedTimeline::new(
|
||||
self,
|
||||
new_timeline_id,
|
||||
Some((timeline_struct, uninit_mark)),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_timeline_files(
|
||||
@@ -3825,6 +3638,9 @@ mod tests {
|
||||
{
|
||||
Ok(_) => panic!("branching should have failed"),
|
||||
Err(err) => {
|
||||
let CreateTimelineError::AncestorLsn(err) = err else {
|
||||
panic!("wrong error type")
|
||||
};
|
||||
assert!(err.to_string().contains("invalid branch start lsn"));
|
||||
assert!(err
|
||||
.source()
|
||||
@@ -3854,6 +3670,9 @@ mod tests {
|
||||
{
|
||||
Ok(_) => panic!("branching should have failed"),
|
||||
Err(err) => {
|
||||
let CreateTimelineError::AncestorLsn(err) = err else {
|
||||
panic!("wrong error type");
|
||||
};
|
||||
assert!(&err.to_string().contains("invalid branch start lsn"));
|
||||
assert!(&err
|
||||
.source()
|
||||
@@ -4562,28 +4381,3 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[inline]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_id() {}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
pub static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<
|
||||
utils::tracing_span_assert::MultiNameExtractor<2>,
|
||||
> = once_cell::sync::Lazy::new(|| {
|
||||
utils::tracing_span_assert::MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"])
|
||||
});
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[inline]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_id() {
|
||||
use utils::tracing_span_assert;
|
||||
|
||||
match tracing_span_assert::check_fields_present([&*TENANT_ID_EXTRACTOR]) {
|
||||
Ok(()) => (),
|
||||
Err(missing) => panic!(
|
||||
"missing extractors: {:?}",
|
||||
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,7 +60,6 @@ use utils::lsn::Lsn;
|
||||
use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::LayerKey;
|
||||
|
||||
use super::storage_layer::range_eq;
|
||||
use super::storage_layer::PersistentLayerDesc;
|
||||
|
||||
///
|
||||
@@ -365,7 +364,7 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
|
||||
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
|
||||
layer.get_key_range() == (Key::MIN..Key::MAX)
|
||||
}
|
||||
|
||||
/// This function determines which layers are counted in `count_deltas`:
|
||||
@@ -397,7 +396,7 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
// Case 2
|
||||
if range_eq(partition_range, &(Key::MIN..Key::MAX)) {
|
||||
if partition_range == &(Key::MIN..Key::MAX) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -608,10 +608,7 @@ impl RemoteTimelineClient {
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
|
||||
info!(
|
||||
"scheduled layer file upload {}",
|
||||
layer_file_name.file_name()
|
||||
);
|
||||
info!("scheduled layer file upload {layer_file_name}");
|
||||
|
||||
// Launch the task immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
@@ -664,7 +661,7 @@ impl RemoteTimelineClient {
|
||||
});
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
info!("scheduled layer file deletion {}", name.file_name());
|
||||
info!("scheduled layer file deletion {name}");
|
||||
}
|
||||
|
||||
// Launch the tasks immediately, if possible
|
||||
@@ -828,7 +825,7 @@ impl RemoteTimelineClient {
|
||||
.queued_operations
|
||||
.push_back(op);
|
||||
|
||||
info!("scheduled layer file deletion {}", name.file_name());
|
||||
info!("scheduled layer file deletion {name}");
|
||||
deletions_queued += 1;
|
||||
}
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ use tracing::{info, warn};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::timeline::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
|
||||
20
pageserver/src/tenant/span.rs
Normal file
20
pageserver/src/tenant/span.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
#[cfg(debug_assertions)]
|
||||
use utils::tracing_span_assert::{check_fields_present, MultiNameExtractor};
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_id() {}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
pub(crate) static TENANT_ID_EXTRACTOR: once_cell::sync::Lazy<MultiNameExtractor<2>> =
|
||||
once_cell::sync::Lazy::new(|| MultiNameExtractor::new("TenantId", ["tenant_id", "tenant"]));
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[track_caller]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_id() {
|
||||
if let Err(missing) = check_fields_present([&*TENANT_ID_EXTRACTOR]) {
|
||||
panic!(
|
||||
"missing extractors: {:?}",
|
||||
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -54,13 +54,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn range_eq<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
T: PartialEq<T>,
|
||||
{
|
||||
a.start == b.start && a.end == b.end
|
||||
}
|
||||
|
||||
/// Struct used to communicate across calls to 'get_value_reconstruct_data'.
|
||||
///
|
||||
/// Before first call, you can fill in 'page_img' if you have an older cached
|
||||
@@ -335,7 +328,7 @@ impl LayerAccessStats {
|
||||
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
|
||||
/// timeline names, because those are known in the context of which the layers
|
||||
/// are used in (timeline).
|
||||
pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync {
|
||||
/// Range of keys that this layer covers
|
||||
fn get_key_range(&self) -> Range<Key>;
|
||||
|
||||
@@ -373,9 +366,6 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult>;
|
||||
|
||||
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
|
||||
fn short_id(&self) -> String;
|
||||
|
||||
/// Dump summary of the contents of the layer to stdout
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>;
|
||||
}
|
||||
@@ -512,10 +502,12 @@ pub mod tests {
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.layer_desc().is_incremental
|
||||
}
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn short_id(&self) -> String {
|
||||
self.layer_desc().short_id()
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
impl std::fmt::Display for LayerDescriptor {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.layer_desc().short_id())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
//! The delta files are stored in timelines/<timeline_id> directory. Currently,
|
||||
//! there are no subdirectories, and each delta file is named like this:
|
||||
//!
|
||||
//! <key start>-<key end>__<start LSN>-<end LSN
|
||||
//! <key start>-<key end>__<start LSN>-<end LSN>
|
||||
//!
|
||||
//! For example:
|
||||
//!
|
||||
@@ -222,13 +222,14 @@ impl Layer for DeltaLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
self.desc.timeline_id,
|
||||
self.desc.key_range.start,
|
||||
self.desc.key_range.end,
|
||||
self.desc.lsn_range.start,
|
||||
self.desc.lsn_range.end
|
||||
self.desc.lsn_range.end,
|
||||
self.desc.file_size,
|
||||
);
|
||||
|
||||
if !verbose {
|
||||
@@ -394,10 +395,11 @@ impl Layer for DeltaLayer {
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.layer_desc().is_incremental
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn short_id(&self) -> String {
|
||||
self.layer_desc().short_id()
|
||||
}
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
impl std::fmt::Display for DeltaLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.layer_desc().short_id())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -210,9 +210,15 @@ pub enum LayerFileName {
|
||||
|
||||
impl LayerFileName {
|
||||
pub fn file_name(&self) -> String {
|
||||
self.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for LayerFileName {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Image(fname) => fname.to_string(),
|
||||
Self::Delta(fname) => fname.to_string(),
|
||||
Self::Image(fname) => write!(f, "{fname}"),
|
||||
Self::Delta(fname) => write!(f, "{fname}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,12 +153,14 @@ impl Layer for ImageLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} ----",
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
self.desc.timeline_id,
|
||||
self.desc.key_range.start,
|
||||
self.desc.key_range.end,
|
||||
self.lsn
|
||||
self.lsn,
|
||||
self.desc.is_incremental,
|
||||
self.desc.file_size
|
||||
);
|
||||
|
||||
if !verbose {
|
||||
@@ -230,10 +232,12 @@ impl Layer for ImageLayer {
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.layer_desc().is_incremental
|
||||
}
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn short_id(&self) -> String {
|
||||
self.layer_desc().short_id()
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
impl std::fmt::Display for ImageLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.layer_desc().short_id())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -131,13 +131,6 @@ impl Layer for InMemoryLayer {
|
||||
true
|
||||
}
|
||||
|
||||
fn short_id(&self) -> String {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
|
||||
format!("inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
let inner = self.inner.read().unwrap();
|
||||
@@ -240,6 +233,15 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for InMemoryLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
|
||||
write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
///
|
||||
/// Get layer size on the disk
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use core::fmt::Display;
|
||||
use std::ops::Range;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
@@ -48,8 +49,8 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn short_id(&self) -> String {
|
||||
self.filename().file_name()
|
||||
pub fn short_id(&self) -> impl Display {
|
||||
self.filename()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -173,13 +174,16 @@ impl PersistentLayerDesc {
|
||||
|
||||
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
|
||||
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
self.key_range.start,
|
||||
self.key_range.end,
|
||||
self.lsn_range.start,
|
||||
self.lsn_range.end
|
||||
self.lsn_range.end,
|
||||
self.is_delta,
|
||||
self.is_incremental,
|
||||
self.file_size,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -71,22 +71,22 @@ impl Layer for RemoteLayer {
|
||||
_reconstruct_state: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
bail!(
|
||||
"layer {} needs to be downloaded",
|
||||
self.filename().file_name()
|
||||
);
|
||||
bail!("layer {self} needs to be downloaded");
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
|
||||
"----- remote layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
self.desc.timeline_id,
|
||||
self.desc.key_range.start,
|
||||
self.desc.key_range.end,
|
||||
self.desc.lsn_range.start,
|
||||
self.desc.lsn_range.end
|
||||
self.desc.lsn_range.end,
|
||||
self.desc.is_delta,
|
||||
self.desc.is_incremental,
|
||||
self.desc.file_size,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
@@ -106,10 +106,12 @@ impl Layer for RemoteLayer {
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.layer_desc().is_incremental
|
||||
}
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn short_id(&self) -> String {
|
||||
self.layer_desc().short_id()
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
impl std::fmt::Display for RemoteLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.layer_desc().short_id())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
//!
|
||||
|
||||
mod eviction_task;
|
||||
mod logical_size;
|
||||
pub mod span;
|
||||
pub mod uninit;
|
||||
mod walreceiver;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
@@ -8,7 +11,6 @@ use bytes::Bytes;
|
||||
use fail::fail_point;
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
|
||||
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
@@ -17,7 +19,7 @@ use pageserver_api::models::{
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
|
||||
use tokio::sync::{oneshot, watch, TryAcquireError};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
@@ -28,7 +30,7 @@ use std::fs;
|
||||
use std::ops::{Deref, Range};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
@@ -38,6 +40,7 @@ use crate::tenant::storage_layer::{
|
||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
|
||||
LayerAccessStats, LayerFileName, RemoteLayer,
|
||||
};
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
ephemeral_file::is_ephemeral_file,
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -79,6 +82,7 @@ use crate::{is_temporary, task_mgr};
|
||||
|
||||
pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
@@ -128,7 +132,7 @@ impl LayerFileManager {
|
||||
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
|
||||
self.0
|
||||
.get(&desc.key())
|
||||
.with_context(|| format!("get layer from desc: {}", desc.filename().file_name()))
|
||||
.with_context(|| format!("get layer from desc: {}", desc.filename()))
|
||||
.expect("not found")
|
||||
.clone()
|
||||
}
|
||||
@@ -365,126 +369,6 @@ pub struct Timeline {
|
||||
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
|
||||
}
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
/// Calculation consists of two stages:
|
||||
///
|
||||
/// 1. Initial size calculation. That might take a long time, because it requires
|
||||
/// reading all layers containing relation sizes at `initial_part_end`.
|
||||
///
|
||||
/// 2. Collecting an incremental part and adding that to the initial size.
|
||||
/// Increments are appended on walreceiver writing new timeline data,
|
||||
/// which result in increase or decrease of the logical size.
|
||||
struct LogicalSize {
|
||||
/// Size, potentially slow to compute. Calculating this might require reading multiple
|
||||
/// layers, and even ancestor's layers.
|
||||
///
|
||||
/// NOTE: size at a given LSN is constant, but after a restart we will calculate
|
||||
/// the initial size at a different LSN.
|
||||
initial_logical_size: OnceCell<u64>,
|
||||
|
||||
/// Semaphore to track ongoing calculation of `initial_logical_size`.
|
||||
initial_size_computation: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
|
||||
initial_part_end: Option<Lsn>,
|
||||
|
||||
/// All other size changes after startup, combined together.
|
||||
///
|
||||
/// Size shouldn't ever be negative, but this is signed for two reasons:
|
||||
///
|
||||
/// 1. If we initialized the "baseline" size lazily, while we already
|
||||
/// process incoming WAL, the incoming WAL records could decrement the
|
||||
/// variable and temporarily make it negative. (This is just future-proofing;
|
||||
/// the initialization is currently not done lazily.)
|
||||
///
|
||||
/// 2. If there is a bug and we e.g. forget to increment it in some cases
|
||||
/// when size grows, but remember to decrement it when it shrinks again, the
|
||||
/// variable could go negative. In that case, it seems better to at least
|
||||
/// try to keep tracking it, rather than clamp or overflow it. Note that
|
||||
/// get_current_logical_size() will clamp the returned value to zero if it's
|
||||
/// negative, and log an error. Could set it permanently to zero or some
|
||||
/// special value to indicate "broken" instead, but this will do for now.
|
||||
///
|
||||
/// Note that we also expose a copy of this value as a prometheus metric,
|
||||
/// see `current_logical_size_gauge`. Use the `update_current_logical_size`
|
||||
/// to modify this, it will also keep the prometheus metric in sync.
|
||||
size_added_after_initial: AtomicI64,
|
||||
}
|
||||
|
||||
/// Normalized current size, that the data in pageserver occupies.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum CurrentLogicalSize {
|
||||
/// The size is not yet calculated to the end, this is an intermediate result,
|
||||
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
|
||||
/// yet total logical size cannot be below 0.
|
||||
Approximate(u64),
|
||||
// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
|
||||
// available for observation without any calculations.
|
||||
Exact(u64),
|
||||
}
|
||||
|
||||
impl CurrentLogicalSize {
|
||||
fn size(&self) -> u64 {
|
||||
*match self {
|
||||
Self::Approximate(size) => size,
|
||||
Self::Exact(size) => size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LogicalSize {
|
||||
fn empty_initial() -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::with_value(0),
|
||||
// initial_logical_size already computed, so, don't admit any calculations
|
||||
initial_size_computation: Arc::new(Semaphore::new(0)),
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn deferred_initial(compute_to: Lsn) -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::new(),
|
||||
initial_size_computation: Arc::new(Semaphore::new(1)),
|
||||
initial_part_end: Some(compute_to),
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
|
||||
let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
|
||||
// ^^^ keep this type explicit so that the casts in this function break if
|
||||
// we change the type.
|
||||
match self.initial_logical_size.get() {
|
||||
Some(initial_size) => {
|
||||
initial_size.checked_add_signed(size_increment)
|
||||
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
|
||||
.map(CurrentLogicalSize::Exact)
|
||||
}
|
||||
None => {
|
||||
let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
|
||||
Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn increment_size(&self, delta: i64) {
|
||||
self.size_added_after_initial
|
||||
.fetch_add(delta, AtomicOrdering::SeqCst);
|
||||
}
|
||||
|
||||
/// Make the value computed by initial logical size computation
|
||||
/// available for re-use. This doesn't contain the incremental part.
|
||||
fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
|
||||
match self.initial_part_end {
|
||||
Some(v) if v == lsn => self.initial_logical_size.get().copied(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
pub wal_source_connconf: PgConnectionConfig,
|
||||
pub last_received_msg_lsn: Lsn,
|
||||
@@ -1381,9 +1265,9 @@ impl Timeline {
|
||||
.read()
|
||||
.unwrap()
|
||||
.observe(delta);
|
||||
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
|
||||
info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
|
||||
} else {
|
||||
info!(layer=%local_layer.short_id(), "evicted layer after unknown residence period");
|
||||
info!(layer=%local_layer, "evicted layer after unknown residence period");
|
||||
}
|
||||
|
||||
true
|
||||
@@ -2239,7 +2123,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
@@ -2462,11 +2346,7 @@ impl TraversalLayerExt for Arc<dyn PersistentLayer> {
|
||||
format!("{}", local_path.display())
|
||||
}
|
||||
None => {
|
||||
format!(
|
||||
"remote {}/{}",
|
||||
self.get_timeline_id(),
|
||||
self.filename().file_name()
|
||||
)
|
||||
format!("remote {}/{self}", self.get_timeline_id())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2474,11 +2354,7 @@ impl TraversalLayerExt for Arc<dyn PersistentLayer> {
|
||||
|
||||
impl TraversalLayerExt for Arc<InMemoryLayer> {
|
||||
fn traversal_id(&self) -> TraversalId {
|
||||
format!(
|
||||
"timeline {} in-memory {}",
|
||||
self.get_timeline_id(),
|
||||
self.short_id()
|
||||
)
|
||||
format!("timeline {} in-memory {self}", self.get_timeline_id())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2932,14 +2808,10 @@ impl Timeline {
|
||||
layers.frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
if let Some(layer_to_flush) = layer_to_flush {
|
||||
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break Err(err);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
break Ok(());
|
||||
let Some(layer_to_flush) = layer_to_flush else { break Ok(()) };
|
||||
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break Err(err);
|
||||
}
|
||||
};
|
||||
// Notify any listeners that we're done
|
||||
@@ -2998,7 +2870,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer))]
|
||||
async fn flush_frozen_layer(
|
||||
self: &Arc<Self>,
|
||||
frozen_layer: Arc<InMemoryLayer>,
|
||||
@@ -3677,7 +3549,7 @@ impl Timeline {
|
||||
let remotes = deltas_to_compact
|
||||
.iter()
|
||||
.filter(|l| l.is_remote_layer())
|
||||
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
|
||||
.inspect(|l| info!("compact requires download of {l}"))
|
||||
.map(|l| {
|
||||
l.clone()
|
||||
.downcast_remote_layer()
|
||||
@@ -3701,7 +3573,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
info!("compact includes {}", l.filename().file_name());
|
||||
info!("compact includes {l}");
|
||||
}
|
||||
|
||||
// We don't need the original list of layers anymore. Drop it so that
|
||||
@@ -4316,8 +4188,8 @@ impl Timeline {
|
||||
if l.get_lsn_range().end > horizon_cutoff {
|
||||
debug!(
|
||||
"keeping {} because it's newer than horizon_cutoff {}",
|
||||
l.filename().file_name(),
|
||||
horizon_cutoff
|
||||
l.filename(),
|
||||
horizon_cutoff,
|
||||
);
|
||||
result.layers_needed_by_cutoff += 1;
|
||||
continue 'outer;
|
||||
@@ -4327,8 +4199,8 @@ impl Timeline {
|
||||
if l.get_lsn_range().end > pitr_cutoff {
|
||||
debug!(
|
||||
"keeping {} because it's newer than pitr_cutoff {}",
|
||||
l.filename().file_name(),
|
||||
pitr_cutoff
|
||||
l.filename(),
|
||||
pitr_cutoff,
|
||||
);
|
||||
result.layers_needed_by_pitr += 1;
|
||||
continue 'outer;
|
||||
@@ -4346,7 +4218,7 @@ impl Timeline {
|
||||
if &l.get_lsn_range().start <= retain_lsn {
|
||||
debug!(
|
||||
"keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
|
||||
l.filename().file_name(),
|
||||
l.filename(),
|
||||
retain_lsn,
|
||||
l.is_incremental(),
|
||||
);
|
||||
@@ -4377,10 +4249,7 @@ impl Timeline {
|
||||
if !layers
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
|
||||
{
|
||||
debug!(
|
||||
"keeping {} because it is the latest layer",
|
||||
l.filename().file_name()
|
||||
);
|
||||
debug!("keeping {} because it is the latest layer", l.filename());
|
||||
// Collect delta key ranges that need image layers to allow garbage
|
||||
// collecting the layers.
|
||||
// It is not so obvious whether we need to propagate information only about
|
||||
@@ -4397,7 +4266,7 @@ impl Timeline {
|
||||
// We didn't find any reason to keep this file, so remove it.
|
||||
debug!(
|
||||
"garbage collecting {} is_dropped: xx is_incremental: {}",
|
||||
l.filename().file_name(),
|
||||
l.filename(),
|
||||
l.is_incremental(),
|
||||
);
|
||||
layers_to_remove.push(Arc::clone(&l));
|
||||
@@ -4551,12 +4420,12 @@ impl Timeline {
|
||||
/// If the caller has a deadline or needs a timeout, they can simply stop polling:
|
||||
/// we're **cancellation-safe** because the download happens in a separate task_mgr task.
|
||||
/// So, the current download attempt will run to completion even if we stop polling.
|
||||
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
|
||||
#[instrument(skip_all, fields(layer=%remote_layer))]
|
||||
pub async fn download_remote_layer(
|
||||
&self,
|
||||
remote_layer: Arc<RemoteLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
|
||||
@@ -4589,7 +4458,7 @@ impl Timeline {
|
||||
TaskKind::RemoteDownloadTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
&format!("download layer {}", remote_layer.short_id()),
|
||||
&format!("download layer {}", remote_layer),
|
||||
false,
|
||||
async move {
|
||||
let remote_client = self_clone.remote_client.as_ref().unwrap();
|
||||
@@ -4865,15 +4734,12 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let last_activity_ts = l
|
||||
.access_stats()
|
||||
.latest_activity()
|
||||
.unwrap_or_else(|| {
|
||||
// We only use this fallback if there's an implementation error.
|
||||
// `latest_activity` already does rate-limited warn!() log.
|
||||
debug!(layer=%l.filename().file_name(), "last_activity returns None, using SystemTime::now");
|
||||
SystemTime::now()
|
||||
});
|
||||
let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
|
||||
// We only use this fallback if there's an implementation error.
|
||||
// `latest_activity` already does rate-limited warn!() log.
|
||||
debug!(layer=%l, "last_activity returns None, using SystemTime::now");
|
||||
SystemTime::now()
|
||||
});
|
||||
|
||||
resident_layers.push(LocalLayerInfoForDiskUsageEviction {
|
||||
layer: l,
|
||||
@@ -4993,33 +4859,6 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
|
||||
bail!("couldn't find an unused backup number for {:?}", path)
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[inline]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[inline]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
|
||||
use utils::tracing_span_assert;
|
||||
|
||||
pub static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<
|
||||
tracing_span_assert::MultiNameExtractor<2>,
|
||||
> = once_cell::sync::Lazy::new(|| {
|
||||
tracing_span_assert::MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"])
|
||||
});
|
||||
|
||||
match tracing_span_assert::check_fields_present([
|
||||
&*super::TENANT_ID_EXTRACTOR,
|
||||
&*TIMELINE_ID_EXTRACTOR,
|
||||
]) {
|
||||
Ok(()) => (),
|
||||
Err(missing) => panic!(
|
||||
"missing extractors: {:?}",
|
||||
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
|
||||
///
|
||||
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
|
||||
|
||||
@@ -70,7 +70,6 @@ impl Timeline {
|
||||
};
|
||||
|
||||
self_clone.eviction_task(cancel).await;
|
||||
info!("eviction task finishing");
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
@@ -78,6 +77,9 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
|
||||
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
|
||||
scopeguard::defer! {
|
||||
info!("eviction task finishing");
|
||||
}
|
||||
use crate::tenant::tasks::random_init_delay;
|
||||
{
|
||||
let policy = self.get_eviction_policy();
|
||||
@@ -86,7 +88,6 @@ impl Timeline {
|
||||
EvictionPolicy::NoEviction => Duration::from_secs(10),
|
||||
};
|
||||
if random_init_delay(period, &cancel).await.is_err() {
|
||||
info!("shutting down");
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -101,7 +102,6 @@ impl Timeline {
|
||||
ControlFlow::Continue(sleep_until) => {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("shutting down");
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep_until(sleep_until) => { }
|
||||
@@ -209,7 +209,7 @@ impl Timeline {
|
||||
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
|
||||
// We only use this fallback if there's an implementation error.
|
||||
// `latest_activity` already does rate-limited warn!() log.
|
||||
debug!(layer=%hist_layer.filename().file_name(), "last_activity returns None, using SystemTime::now");
|
||||
debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
|
||||
SystemTime::now()
|
||||
});
|
||||
|
||||
|
||||
128
pageserver/src/tenant/timeline/logical_size.rs
Normal file
128
pageserver/src/tenant/timeline/logical_size.rs
Normal file
@@ -0,0 +1,128 @@
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
use tokio::sync::Semaphore;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
/// Calculation consists of two stages:
|
||||
///
|
||||
/// 1. Initial size calculation. That might take a long time, because it requires
|
||||
/// reading all layers containing relation sizes at `initial_part_end`.
|
||||
///
|
||||
/// 2. Collecting an incremental part and adding that to the initial size.
|
||||
/// Increments are appended on walreceiver writing new timeline data,
|
||||
/// which result in increase or decrease of the logical size.
|
||||
pub(super) struct LogicalSize {
|
||||
/// Size, potentially slow to compute. Calculating this might require reading multiple
|
||||
/// layers, and even ancestor's layers.
|
||||
///
|
||||
/// NOTE: size at a given LSN is constant, but after a restart we will calculate
|
||||
/// the initial size at a different LSN.
|
||||
pub initial_logical_size: OnceCell<u64>,
|
||||
|
||||
/// Semaphore to track ongoing calculation of `initial_logical_size`.
|
||||
pub initial_size_computation: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
|
||||
pub initial_part_end: Option<Lsn>,
|
||||
|
||||
/// All other size changes after startup, combined together.
|
||||
///
|
||||
/// Size shouldn't ever be negative, but this is signed for two reasons:
|
||||
///
|
||||
/// 1. If we initialized the "baseline" size lazily, while we already
|
||||
/// process incoming WAL, the incoming WAL records could decrement the
|
||||
/// variable and temporarily make it negative. (This is just future-proofing;
|
||||
/// the initialization is currently not done lazily.)
|
||||
///
|
||||
/// 2. If there is a bug and we e.g. forget to increment it in some cases
|
||||
/// when size grows, but remember to decrement it when it shrinks again, the
|
||||
/// variable could go negative. In that case, it seems better to at least
|
||||
/// try to keep tracking it, rather than clamp or overflow it. Note that
|
||||
/// get_current_logical_size() will clamp the returned value to zero if it's
|
||||
/// negative, and log an error. Could set it permanently to zero or some
|
||||
/// special value to indicate "broken" instead, but this will do for now.
|
||||
///
|
||||
/// Note that we also expose a copy of this value as a prometheus metric,
|
||||
/// see `current_logical_size_gauge`. Use the `update_current_logical_size`
|
||||
/// to modify this, it will also keep the prometheus metric in sync.
|
||||
pub size_added_after_initial: AtomicI64,
|
||||
}
|
||||
|
||||
/// Normalized current size, that the data in pageserver occupies.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(super) enum CurrentLogicalSize {
|
||||
/// The size is not yet calculated to the end, this is an intermediate result,
|
||||
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
|
||||
/// yet total logical size cannot be below 0.
|
||||
Approximate(u64),
|
||||
// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
|
||||
// available for observation without any calculations.
|
||||
Exact(u64),
|
||||
}
|
||||
|
||||
impl CurrentLogicalSize {
|
||||
pub(super) fn size(&self) -> u64 {
|
||||
*match self {
|
||||
Self::Approximate(size) => size,
|
||||
Self::Exact(size) => size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LogicalSize {
|
||||
pub(super) fn empty_initial() -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::with_value(0),
|
||||
// initial_logical_size already computed, so, don't admit any calculations
|
||||
initial_size_computation: Arc::new(Semaphore::new(0)),
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn deferred_initial(compute_to: Lsn) -> Self {
|
||||
Self {
|
||||
initial_logical_size: OnceCell::new(),
|
||||
initial_size_computation: Arc::new(Semaphore::new(1)),
|
||||
initial_part_end: Some(compute_to),
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
|
||||
let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
|
||||
// ^^^ keep this type explicit so that the casts in this function break if
|
||||
// we change the type.
|
||||
match self.initial_logical_size.get() {
|
||||
Some(initial_size) => {
|
||||
initial_size.checked_add_signed(size_increment)
|
||||
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
|
||||
.map(CurrentLogicalSize::Exact)
|
||||
}
|
||||
None => {
|
||||
let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
|
||||
Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn increment_size(&self, delta: i64) {
|
||||
self.size_added_after_initial
|
||||
.fetch_add(delta, AtomicOrdering::SeqCst);
|
||||
}
|
||||
|
||||
/// Make the value computed by initial logical size computation
|
||||
/// available for re-use. This doesn't contain the incremental part.
|
||||
pub(super) fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
|
||||
match self.initial_part_end {
|
||||
Some(v) if v == lsn => self.initial_logical_size.get().copied(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
25
pageserver/src/tenant/timeline/span.rs
Normal file
25
pageserver/src/tenant/timeline/span.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
#[cfg(debug_assertions)]
|
||||
use utils::tracing_span_assert::{check_fields_present, Extractor, MultiNameExtractor};
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
#[track_caller]
|
||||
pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
|
||||
static TIMELINE_ID_EXTRACTOR: once_cell::sync::Lazy<MultiNameExtractor<2>> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
MultiNameExtractor::new("TimelineId", ["timeline_id", "timeline"])
|
||||
});
|
||||
|
||||
let fields: [&dyn Extractor; 2] = [
|
||||
&*crate::tenant::span::TENANT_ID_EXTRACTOR,
|
||||
&*TIMELINE_ID_EXTRACTOR,
|
||||
];
|
||||
if let Err(missing) = check_fields_present(fields) {
|
||||
panic!(
|
||||
"missing extractors: {:?}",
|
||||
missing.into_iter().map(|e| e.name()).collect::<Vec<_>>()
|
||||
)
|
||||
}
|
||||
}
|
||||
219
pageserver/src/tenant/timeline/uninit.rs
Normal file
219
pageserver/src/tenant/timeline/uninit.rs
Normal file
@@ -0,0 +1,219 @@
|
||||
use std::{collections::hash_map::Entry, fs, path::PathBuf, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use tracing::{error, info, info_span, warn};
|
||||
use utils::{crashsafe, id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
import_datadir,
|
||||
tenant::{ignore_absent_files, Tenant},
|
||||
};
|
||||
|
||||
use super::Timeline;
|
||||
|
||||
/// A timeline with some of its files on disk, being initialized.
|
||||
/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
|
||||
/// its local files are removed. In the worst case of a crash, an uninit mark file is left behind, which causes the directory
|
||||
/// to be removed on next restart.
|
||||
///
|
||||
/// The caller is responsible for proper timeline data filling before the final init.
|
||||
#[must_use]
|
||||
pub struct UninitializedTimeline<'t> {
|
||||
pub(crate) owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
|
||||
}
|
||||
|
||||
impl<'t> UninitializedTimeline<'t> {
|
||||
pub(crate) fn new(
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
|
||||
) -> Self {
|
||||
Self {
|
||||
owning_tenant,
|
||||
timeline_id,
|
||||
raw_timeline,
|
||||
}
|
||||
}
|
||||
|
||||
/// Finish timeline creation: insert it into the Tenant's timelines map and remove the
|
||||
/// uninit mark file.
|
||||
///
|
||||
/// This function launches the flush loop if not already done.
|
||||
///
|
||||
/// The caller is responsible for activating the timeline (function `.activate()`).
|
||||
pub(crate) fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_id = self.timeline_id;
|
||||
let tenant_id = self.owning_tenant.tenant_id;
|
||||
|
||||
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
|
||||
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
// Check that the caller initialized disk_consistent_lsn
|
||||
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
|
||||
anyhow::ensure!(
|
||||
new_disk_consistent_lsn.is_valid(),
|
||||
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
|
||||
);
|
||||
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
match timelines.entry(timeline_id) {
|
||||
Entry::Occupied(_) => anyhow::bail!(
|
||||
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
|
||||
),
|
||||
Entry::Vacant(v) => {
|
||||
uninit_mark.remove_uninit_mark().with_context(|| {
|
||||
format!(
|
||||
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
|
||||
)
|
||||
})?;
|
||||
v.insert(Arc::clone(&new_timeline));
|
||||
|
||||
new_timeline.maybe_spawn_flush_loop();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(new_timeline)
|
||||
}
|
||||
|
||||
/// Prepares timeline data by loading it from the basebackup archive.
|
||||
pub(crate) async fn import_basebackup_from_tar(
|
||||
self,
|
||||
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
|
||||
base_lsn: Lsn,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let raw_timeline = self.raw_timeline()?;
|
||||
|
||||
import_datadir::import_basebackup_from_tar(raw_timeline, copyin_read, base_lsn, ctx)
|
||||
.await
|
||||
.context("Failed to import basebackup")?;
|
||||
|
||||
// Flush the new layer files to disk, before we make the timeline as available to
|
||||
// the outside world.
|
||||
//
|
||||
// Flush loop needs to be spawned in order to be able to flush.
|
||||
raw_timeline.maybe_spawn_flush_loop();
|
||||
|
||||
fail::fail_point!("before-checkpoint-new-timeline", |_| {
|
||||
anyhow::bail!("failpoint before-checkpoint-new-timeline");
|
||||
});
|
||||
|
||||
raw_timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
.context("Failed to flush after basebackup import")?;
|
||||
|
||||
// All the data has been imported. Insert the Timeline into the tenant's timelines
|
||||
// map and remove the uninit mark file.
|
||||
let tl = self.finish_creation()?;
|
||||
tl.activate(broker_client, None, ctx);
|
||||
Ok(tl)
|
||||
}
|
||||
|
||||
pub(crate) fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
|
||||
Ok(&self
|
||||
.raw_timeline
|
||||
.as_ref()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"No raw timeline {}/{} found",
|
||||
self.owning_tenant.tenant_id, self.timeline_id
|
||||
)
|
||||
})?
|
||||
.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for UninitializedTimeline<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, uninit_mark)) = self.raw_timeline.take() {
|
||||
let _entered = info_span!("drop_uninitialized_timeline", tenant = %self.owning_tenant.tenant_id, timeline = %self.timeline_id).entered();
|
||||
error!("Timeline got dropped without initializing, cleaning its files");
|
||||
cleanup_timeline_directory(uninit_mark);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
|
||||
let timeline_path = &uninit_mark.timeline_path;
|
||||
match ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
|
||||
Ok(()) => {
|
||||
info!("Timeline dir {timeline_path:?} removed successfully, removing the uninit mark")
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
|
||||
}
|
||||
}
|
||||
drop(uninit_mark); // mark handles its deletion on drop, gets retained if timeline dir exists
|
||||
}
|
||||
|
||||
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
|
||||
/// or gets removed eventually.
|
||||
///
|
||||
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
|
||||
#[must_use]
|
||||
pub(crate) struct TimelineUninitMark {
|
||||
uninit_mark_deleted: bool,
|
||||
uninit_mark_path: PathBuf,
|
||||
pub(crate) timeline_path: PathBuf,
|
||||
}
|
||||
|
||||
impl TimelineUninitMark {
|
||||
pub(crate) fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self {
|
||||
Self {
|
||||
uninit_mark_deleted: false,
|
||||
uninit_mark_path,
|
||||
timeline_path,
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_uninit_mark(mut self) -> anyhow::Result<()> {
|
||||
if !self.uninit_mark_deleted {
|
||||
self.delete_mark_file_if_present()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
|
||||
let uninit_mark_file = &self.uninit_mark_path;
|
||||
let uninit_mark_parent = uninit_mark_file
|
||||
.parent()
|
||||
.with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
|
||||
ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
|
||||
format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
|
||||
})?;
|
||||
crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
|
||||
self.uninit_mark_deleted = true;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineUninitMark {
|
||||
fn drop(&mut self) {
|
||||
if !self.uninit_mark_deleted {
|
||||
if self.timeline_path.exists() {
|
||||
error!(
|
||||
"Uninit mark {} is not removed, timeline {} stays uninitialized",
|
||||
self.uninit_mark_path.display(),
|
||||
self.timeline_path.display()
|
||||
)
|
||||
} else {
|
||||
// unblock later timeline creation attempts
|
||||
warn!(
|
||||
"Removing intermediate uninit mark file {}",
|
||||
self.uninit_mark_path.display()
|
||||
);
|
||||
if let Err(e) = self.delete_mark_file_if_present() {
|
||||
error!("Failed to remove the uninit mark file: {e}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,6 +71,8 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
ctx: RequestContext,
|
||||
node: NodeId,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
WALRECEIVER_STARTED_CONNECTIONS.inc();
|
||||
|
||||
// Connect to the database in replication mode.
|
||||
@@ -140,6 +142,9 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// Enrich the log lines emitted by this closure with meaningful context.
|
||||
// TODO: technically, this task outlives the surrounding function, so, the
|
||||
// spans won't be properly nested.
|
||||
.instrument(tracing::info_span!("poller")),
|
||||
);
|
||||
|
||||
|
||||
@@ -302,15 +302,6 @@ impl VirtualFile {
|
||||
.observe_closure_duration(|| self.open_options.open(&self.path))?;
|
||||
|
||||
// Perform the requested operation on it
|
||||
//
|
||||
// TODO: We could downgrade the locks to read mode before calling
|
||||
// 'func', to allow a little bit more concurrency, but the standard
|
||||
// library RwLock doesn't allow downgrading without releasing the lock,
|
||||
// and that doesn't seem worth the trouble.
|
||||
//
|
||||
// XXX: `parking_lot::RwLock` can enable such downgrades, yet its implementation is fair and
|
||||
// may deadlock on subsequent read calls.
|
||||
// Simply replacing all `RwLock` in project causes deadlocks, so use it sparingly.
|
||||
let result = STORAGE_IO_TIME
|
||||
.with_label_values(&[op, &self.tenant_id, &self.timeline_id])
|
||||
.observe_closure_duration(|| func(&file));
|
||||
|
||||
@@ -175,8 +175,8 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
let mut img = base_img.map(|p| p.1);
|
||||
let mut batch_neon = can_apply_in_neon(&records[0].1);
|
||||
let mut batch_start = 0;
|
||||
for i in 1..records.len() {
|
||||
let rec_neon = can_apply_in_neon(&records[i].1);
|
||||
for (i, record) in records.iter().enumerate().skip(1) {
|
||||
let rec_neon = can_apply_in_neon(&record.1);
|
||||
|
||||
if rec_neon != batch_neon {
|
||||
let result = if batch_neon {
|
||||
|
||||
Reference in New Issue
Block a user