mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
add gauge for in-flight layer uploads (#3951)
For the "worst-case /storage usage panel", we need to compute ``` remote size + local-only size ``` We currently don't have a metric for local-only layers. The number of in-flight layers in the upload queue is just that, so, let Prometheus scrape it. The metric is two counters (started and finished). The delta is the amount of in-flight uploads in the queue. The metrics are incremented in the respective `call_unfinished_metric_*` functions. These track ongoing operations by file_kind and op_kind. We only need this metric for layer uploads, so, there's the new RemoteTimelineClientMetricsCallTrackSize type that forces all call sites to decide whether they want the size tracked or not. If we find that other file_kinds or op_kinds are interesting (metadata uploads, layer downloads, layer deletes) are interesting, we can just enable them, and they'll be just another label combination within the metrics that this PR adds. fixes https://github.com/neondatabase/neon/issues/3922
This commit is contained in:
committed by
GitHub
parent
4911d7ce6f
commit
fa20e37574
@@ -385,6 +385,26 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_started",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation is scheduled.",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_finished",
|
||||
"Incremented by the number of bytes associated with a remote timeline client operation. \
|
||||
The increment happens when the operation finishes (regardless of success/failure/shutdown).",
|
||||
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum RemoteOpKind {
|
||||
Upload,
|
||||
@@ -739,6 +759,8 @@ pub struct RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex<HashMap<(&'static str, &'static str, &'static str), Histogram>>,
|
||||
calls_unfinished_gauge: Mutex<HashMap<(&'static str, &'static str), IntGauge>>,
|
||||
calls_started_hist: Mutex<HashMap<(&'static str, &'static str), Histogram>>,
|
||||
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
@@ -749,6 +771,8 @@ impl RemoteTimelineClientMetrics {
|
||||
remote_operation_time: Mutex::new(HashMap::default()),
|
||||
calls_unfinished_gauge: Mutex::new(HashMap::default()),
|
||||
calls_started_hist: Mutex::new(HashMap::default()),
|
||||
bytes_started_counter: Mutex::new(HashMap::default()),
|
||||
bytes_finished_counter: Mutex::new(HashMap::default()),
|
||||
remote_physical_size_gauge: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
@@ -787,6 +811,7 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn calls_unfinished_gauge(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
@@ -828,32 +853,125 @@ impl RemoteTimelineClientMetrics {
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_started_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
|
||||
fn bytes_finished_counter(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> IntCounter {
|
||||
// XXX would be nice to have an upgradable RwLock
|
||||
let mut guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
let metric = guard.entry(key).or_insert_with(move || {
|
||||
REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER
|
||||
.get_metric_with_label_values(&[
|
||||
&self.tenant_id.to_string(),
|
||||
&self.timeline_id.to_string(),
|
||||
key.0,
|
||||
key.1,
|
||||
])
|
||||
.unwrap()
|
||||
});
|
||||
metric.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl RemoteTimelineClientMetrics {
|
||||
pub fn get_bytes_started_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_started_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
|
||||
pub fn get_bytes_finished_counter_value(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
) -> Option<u64> {
|
||||
let guard = self.bytes_finished_counter.lock().unwrap();
|
||||
let key = (file_kind.as_str(), op_kind.as_str());
|
||||
guard.get(&key).map(|counter| counter.get())
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteTimelineClientMetrics::call_begin`].
|
||||
#[must_use]
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard(Option<IntGauge>);
|
||||
pub(crate) struct RemoteTimelineClientCallMetricGuard {
|
||||
/// Decremented on drop.
|
||||
calls_unfinished_metric: Option<IntGauge>,
|
||||
/// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop.
|
||||
bytes_finished: Option<(IntCounter, u64)>,
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientCallMetricGuard {
|
||||
/// Consume this guard object without decrementing the metric.
|
||||
/// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out.
|
||||
/// Consume this guard object without performing the metric updates it would do on `drop()`.
|
||||
/// The caller vouches to do the metric updates manually.
|
||||
pub fn will_decrement_manually(mut self) {
|
||||
self.0 = None; // prevent drop() from decrementing
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = &mut self;
|
||||
calls_unfinished_metric.take();
|
||||
bytes_finished.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RemoteTimelineClientCallMetricGuard {
|
||||
fn drop(&mut self) {
|
||||
if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self {
|
||||
let RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric,
|
||||
bytes_finished,
|
||||
} = self;
|
||||
if let Some(guard) = calls_unfinished_metric.take() {
|
||||
guard.dec();
|
||||
}
|
||||
if let Some((bytes_finished_metric, value)) = bytes_finished {
|
||||
bytes_finished_metric.inc_by(*value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to
|
||||
/// track the byte size of this call in applicable metric(s).
|
||||
pub(crate) enum RemoteTimelineClientMetricsCallTrackSize {
|
||||
/// Do not account for this call's byte size in any metrics.
|
||||
/// The `reason` field is there to make the call sites self-documenting
|
||||
/// about why they don't need the metric.
|
||||
DontTrackSize { reason: &'static str },
|
||||
/// Track the byte size of the call in applicable metric(s).
|
||||
Bytes(u64),
|
||||
}
|
||||
|
||||
impl RemoteTimelineClientMetrics {
|
||||
/// Increment the metrics that track ongoing calls to the remote timeline client instance.
|
||||
/// Update the metrics that change when a call to the remote timeline client instance starts.
|
||||
///
|
||||
/// Drop the returned guard object once the operation is finished to decrement the values.
|
||||
/// Drop the returned guard object once the operation is finished to updates corresponding metrics that track completions.
|
||||
/// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that
|
||||
/// is more suitable.
|
||||
/// Never do both.
|
||||
@@ -861,24 +979,51 @@ impl RemoteTimelineClientMetrics {
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) -> RemoteTimelineClientCallMetricGuard {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
self.calls_started_hist(file_kind, op_kind)
|
||||
.observe(unfinished_metric.get() as f64);
|
||||
unfinished_metric.inc();
|
||||
RemoteTimelineClientCallMetricGuard(Some(unfinished_metric))
|
||||
.observe(calls_unfinished_metric.get() as f64);
|
||||
calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric
|
||||
|
||||
let bytes_finished = match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {
|
||||
// nothing to do
|
||||
None
|
||||
}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_started_counter(file_kind, op_kind).inc_by(size);
|
||||
let finished_counter = self.bytes_finished_counter(file_kind, op_kind);
|
||||
Some((finished_counter, size))
|
||||
}
|
||||
};
|
||||
RemoteTimelineClientCallMetricGuard {
|
||||
calls_unfinished_metric: Some(calls_unfinished_metric),
|
||||
bytes_finished,
|
||||
}
|
||||
}
|
||||
|
||||
/// Manually decrement the metric instead of using the guard object.
|
||||
/// Manually udpate the metrics that track completions, instead of using the guard object.
|
||||
/// Using the guard object is generally preferable.
|
||||
/// See [`call_begin`] for more context.
|
||||
pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) {
|
||||
let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
pub(crate) fn call_end(
|
||||
&self,
|
||||
file_kind: &RemoteOpFileKind,
|
||||
op_kind: &RemoteOpKind,
|
||||
size: RemoteTimelineClientMetricsCallTrackSize,
|
||||
) {
|
||||
let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind);
|
||||
debug_assert!(
|
||||
unfinished_metric.get() > 0,
|
||||
calls_unfinished_metric.get() > 0,
|
||||
"begin and end should cancel out"
|
||||
);
|
||||
unfinished_metric.dec();
|
||||
calls_unfinished_metric.dec();
|
||||
match size {
|
||||
RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {}
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => {
|
||||
self.bytes_finished_counter(file_kind, op_kind).inc_by(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -891,6 +1036,8 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
remote_operation_time,
|
||||
calls_unfinished_gauge,
|
||||
calls_started_hist,
|
||||
bytes_started_counter,
|
||||
bytes_finished_counter,
|
||||
} = self;
|
||||
for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]);
|
||||
@@ -911,6 +1058,22 @@ impl Drop for RemoteTimelineClientMetrics {
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() {
|
||||
let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
a,
|
||||
b,
|
||||
]);
|
||||
}
|
||||
{
|
||||
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
|
||||
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
@@ -219,7 +219,8 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::metrics::{
|
||||
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
|
||||
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
|
||||
};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::{
|
||||
@@ -367,9 +368,13 @@ impl RemoteTimelineClient {
|
||||
|
||||
/// Download index file
|
||||
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Index,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
|
||||
download::download_index_part(
|
||||
self.conf,
|
||||
@@ -398,9 +403,13 @@ impl RemoteTimelineClient {
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
) -> anyhow::Result<u64> {
|
||||
let downloaded_size = {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download);
|
||||
let _unfinished_gauge_guard = self.metrics.call_begin(
|
||||
&RemoteOpFileKind::Layer,
|
||||
&RemoteOpKind::Download,
|
||||
crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
@@ -886,11 +895,32 @@ impl RemoteTimelineClient {
|
||||
fn calls_unfinished_metric_impl(
|
||||
&self,
|
||||
op: &UploadOp,
|
||||
) -> Option<(RemoteOpFileKind, RemoteOpKind)> {
|
||||
) -> Option<(
|
||||
RemoteOpFileKind,
|
||||
RemoteOpKind,
|
||||
RemoteTimelineClientMetricsCallTrackSize,
|
||||
)> {
|
||||
use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize;
|
||||
let res = match op {
|
||||
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
|
||||
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
|
||||
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
|
||||
UploadOp::UploadLayer(_, m) => (
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Upload,
|
||||
RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()),
|
||||
),
|
||||
UploadOp::UploadMetadata(_, _) => (
|
||||
RemoteOpFileKind::Index,
|
||||
RemoteOpKind::Upload,
|
||||
DontTrackSize {
|
||||
reason: "metadata uploads are tiny",
|
||||
},
|
||||
),
|
||||
UploadOp::Delete(file_kind, _) => (
|
||||
*file_kind,
|
||||
RemoteOpKind::Delete,
|
||||
DontTrackSize {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(_) => {
|
||||
// we do not account these
|
||||
return None;
|
||||
@@ -900,20 +930,20 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_begin(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind);
|
||||
let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes);
|
||||
guard.will_decrement_manually(); // in unfinished_ops_metric_end()
|
||||
}
|
||||
|
||||
fn calls_unfinished_metric_end(&self, op: &UploadOp) {
|
||||
let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) {
|
||||
let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) {
|
||||
Some(x) => x,
|
||||
None => return,
|
||||
};
|
||||
self.metrics.call_end(&file_kind, &op_kind);
|
||||
self.metrics.call_end(&file_kind, &op_kind, track_bytes);
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
@@ -981,11 +1011,19 @@ impl RemoteTimelineClient {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
tenant::harness::{TenantHarness, TIMELINE_ID},
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
Tenant,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
|
||||
use std::{collections::HashSet, path::Path};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio::runtime::EnterGuard;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
|
||||
@@ -1034,39 +1072,80 @@ mod tests {
|
||||
assert_eq!(found, expected);
|
||||
}
|
||||
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness<'static>,
|
||||
tenant: Arc<Tenant>,
|
||||
tenant_ctx: RequestContext,
|
||||
remote_fs_dir: PathBuf,
|
||||
client: Arc<RemoteTimelineClient>,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
fn new(test_name: &str) -> anyhow::Result<Self> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = timeline.initialize(&ctx).unwrap();
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl: storage,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
runtime,
|
||||
entered_runtime,
|
||||
harness,
|
||||
tenant,
|
||||
tenant_ctx: ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test scheduling
|
||||
#[test]
|
||||
fn upload_scheduling() -> anyhow::Result<()> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let _entered = runtime.enter();
|
||||
|
||||
let harness = TenantHarness::create("upload_scheduling")?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
let _timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
// Test outline:
|
||||
//
|
||||
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
|
||||
@@ -1081,21 +1160,19 @@ mod tests {
|
||||
// Schedule another deletion. Check that it's launched immediately.
|
||||
// Schedule index upload. Check that it's queued
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
let TestSetup {
|
||||
runtime,
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
});
|
||||
entered_runtime: _entered_runtime,
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
tenant_ctx: _tenant_ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
} = TestSetup::new("upload_scheduling").unwrap();
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let remote_timeline_dir =
|
||||
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
|
||||
@@ -1216,4 +1293,90 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
|
||||
// Setup
|
||||
|
||||
let TestSetup {
|
||||
runtime,
|
||||
harness,
|
||||
client,
|
||||
..
|
||||
} = TestSetup::new("metrics")?;
|
||||
|
||||
let metadata = dummy_metadata(Lsn(0x10));
|
||||
client.init_upload_queue_for_empty_remote(&metadata)?;
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let content_1 = dummy_contents("foo");
|
||||
std::fs::write(
|
||||
timeline_path.join(layer_file_name_1.file_name()),
|
||||
&content_1,
|
||||
)?;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct BytesStartedFinished {
|
||||
started: Option<usize>,
|
||||
finished: Option<usize>,
|
||||
}
|
||||
let get_bytes_started_stopped = || {
|
||||
let started = client
|
||||
.metrics
|
||||
.get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
let stopped = client
|
||||
.metrics
|
||||
.get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload)
|
||||
.map(|v| v.try_into().unwrap());
|
||||
BytesStartedFinished {
|
||||
started,
|
||||
finished: stopped,
|
||||
}
|
||||
};
|
||||
|
||||
// Test
|
||||
|
||||
let init = get_bytes_started_stopped();
|
||||
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)?;
|
||||
|
||||
let pre = get_bytes_started_stopped();
|
||||
|
||||
runtime.block_on(client.wait_completion())?;
|
||||
|
||||
let post = get_bytes_started_stopped();
|
||||
|
||||
// Validate
|
||||
|
||||
assert_eq!(
|
||||
init,
|
||||
BytesStartedFinished {
|
||||
started: None,
|
||||
finished: None
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
pre,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
// assert that the _finished metric is created eagerly so that subtractions work on first sample
|
||||
finished: Some(0),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
post,
|
||||
BytesStartedFinished {
|
||||
started: Some(content_1.len()),
|
||||
finished: Some(content_1.len())
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,8 @@ PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = (
|
||||
*[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]],
|
||||
*[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
"pageserver_remote_physical_size",
|
||||
"pageserver_remote_timeline_client_bytes_started_total",
|
||||
"pageserver_remote_timeline_client_bytes_finished_total",
|
||||
)
|
||||
|
||||
PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
|
||||
Reference in New Issue
Block a user