metric pageserver_remote_upload_queue_unfinished_tasks: add labels for file & op kind, and check for them in the test

The test fails right now because, as it turns out, the current test code
does not actually generate layer removal tasks. The next commit will fix that.
This commit is contained in:
Christian Schwarz
2022-11-22 11:40:22 -05:00
parent 5a55dce282
commit c61731a31f
3 changed files with 51 additions and 38 deletions

View File

@@ -205,18 +205,19 @@ pub static REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS: Lazy<IntGaugeVec> = Lazy::new(|
register_int_gauge_vec!(
"pageserver_remote_upload_queue_unfinished_tasks",
"Number of tasks in the upload queue that are not finished yet.",
&["tenant_id", "timeline_id"],
&["tenant_id", "timeline_id", "file_kind", "op_kind"],
)
.expect("failed to define a metric")
});
/// Kind of operation performed against remote storage.
///
/// The string form returned by `as_str` is used as the `op_kind` label value
/// of the `pageserver_remote_upload_queue_unfinished_tasks` metric.
///
/// `Copy` is derived so that a kind can be passed around by value cheaply.
#[derive(Debug, Clone, Copy)]
pub enum RemoteOpKind {
/// Upload of a file (reported as `"upload"`); used for both layer uploads
/// and index/metadata uploads in the upload queue.
Upload,
/// Download of a file (reported as `"download"`).
Download,
/// Deletion of a file; used for `UploadOp::Delete` tasks in the upload queue.
Delete,
}
impl RemoteOpKind {
fn as_str(&self) -> &str {
pub fn as_str(&self) -> &str {
match self {
Self::Upload => "upload",
Self::Download => "download",
@@ -225,12 +226,13 @@ impl RemoteOpKind {
}
}
/// Kind of file that a remote storage operation acts on.
///
/// The string form returned by `as_str` is used as the `file_kind` label value
/// of the `pageserver_remote_upload_queue_unfinished_tasks` metric.
///
/// `Copy` is derived so that the kind stored inside `UploadOp::Delete` can be
/// read out of a borrowed op by dereferencing (`*file_kind`).
#[derive(Debug, Clone, Copy)]
pub enum RemoteOpFileKind {
/// A layer file (reported as `"layer"`); used for layer uploads and for
/// scheduled layer deletions.
Layer,
/// The index part / metadata (reported as `"index"`); used for
/// `UploadOp::UploadMetadata` tasks.
Index,
}
impl RemoteOpFileKind {
fn as_str(&self) -> &str {
pub fn as_str(&self) -> &str {
match self {
Self::Layer => "layer",
Self::Index => "index",

View File

@@ -342,7 +342,7 @@ enum UploadOp {
UploadMetadata(IndexPart, Lsn),
/// Delete a file.
Delete(PathBuf),
Delete(RemoteOpFileKind, PathBuf),
/// Barrier. When the barrier operation is reached,
Barrier(tokio::sync::watch::Sender<()>),
@@ -358,7 +358,7 @@ impl std::fmt::Display for UploadOp {
metadata.file_size()
),
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
UploadOp::Delete(path) => write!(f, "Delete({})", path.display()),
UploadOp::Delete(_, path) => write!(f, "Delete({})", path.display()),
UploadOp::Barrier(_) => write!(f, "Barrier"),
}
}
@@ -483,10 +483,9 @@ impl RemoteTimelineClient {
disk_consistent_lsn,
upload_queue.latest_metadata.to_bytes()?,
);
upload_queue
.queued_operations
.push_back(UploadOp::UploadMetadata(index_part, disk_consistent_lsn));
self.upload_queue_items_metric(1);
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
self.update_upload_queue_unfinished_metric(1, &op);
upload_queue.queued_operations.push_back(op);
info!(
"scheduled metadata upload with {} files",
@@ -528,13 +527,9 @@ impl RemoteTimelineClient {
.latest_files
.insert(relative_path, layer_metadata.clone());
upload_queue
.queued_operations
.push_back(UploadOp::UploadLayer(
PathBuf::from(path),
layer_metadata.clone(),
));
self.upload_queue_items_metric(1);
let op = UploadOp::UploadLayer(PathBuf::from(path), layer_metadata.clone());
self.update_upload_queue_unfinished_metric(1, &op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file upload {}", path.display());
@@ -572,17 +567,15 @@ impl RemoteTimelineClient {
disk_consistent_lsn,
upload_queue.latest_metadata.to_bytes()?,
);
upload_queue
.queued_operations
.push_back(UploadOp::UploadMetadata(index_part, disk_consistent_lsn));
self.upload_queue_items_metric(1);
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
self.update_upload_queue_unfinished_metric(1, &op);
upload_queue.queued_operations.push_back(op);
// schedule the actual deletions
for path in paths {
upload_queue
.queued_operations
.push_back(UploadOp::Delete(PathBuf::from(path)));
self.upload_queue_items_metric(1);
let op = UploadOp::Delete(RemoteOpFileKind::Layer, PathBuf::from(path));
self.update_upload_queue_unfinished_metric(1, &op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file deletion {}", path.display());
}
@@ -632,7 +625,7 @@ impl RemoteTimelineClient {
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(_) => {
UploadOp::Delete(_, _) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
@@ -663,7 +656,7 @@ impl RemoteTimelineClient {
UploadOp::UploadMetadata(_, _) => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::Delete(_) => {
UploadOp::Delete(_, _) => {
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
@@ -696,8 +689,9 @@ impl RemoteTimelineClient {
"remote upload",
false,
async move {
let task_clone = Arc::clone(&task);
self_rc.perform_upload_task(task).await;
self_rc.upload_queue_items_metric(-1);
self_rc.update_upload_queue_unfinished_metric(-1, &task_clone.op);
Ok(())
},
);
@@ -717,7 +711,7 @@ impl RemoteTimelineClient {
async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
// Loop to retry until it completes.
loop {
let upload_result: anyhow::Result<()> = match task.op {
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref path, ref layer_metadata) => {
upload::upload_timeline_layer(&self.storage_impl, path, layer_metadata)
.measure_remote_op(
@@ -744,12 +738,12 @@ impl RemoteTimelineClient {
)
.await
}
UploadOp::Delete(ref path) => {
UploadOp::Delete(metric_file_kind, ref path) => {
delete::delete_layer(&self.storage_impl, path)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
RemoteOpFileKind::Layer,
*metric_file_kind,
RemoteOpKind::Delete,
)
.await
@@ -810,7 +804,7 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_metadata_uploads -= 1;
upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check?
}
UploadOp::Delete(_) => {
UploadOp::Delete(_, _) => {
upload_queue.num_inprogress_deletions -= 1;
}
UploadOp::Barrier(_) => unreachable!(),
@@ -821,11 +815,21 @@ impl RemoteTimelineClient {
}
}
fn upload_queue_items_metric(&self, delta: i64) {
fn update_upload_queue_unfinished_metric(&self, delta: i64, op: &UploadOp) {
let (file_kind, op_kind) = match op {
UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload),
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
UploadOp::Barrier(_) => {
unreachable!("we execute barriers synchronously")
}
};
REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS
.get_metric_with_label_values(&[
&self.tenant_id.to_string(),
&self.timeline_id.to_string(),
file_kind.as_str(),
op_kind.as_str(),
])
.unwrap()
.add(delta)

View File

@@ -175,6 +175,13 @@ def test_remote_storage_backup_and_restore(
)
# Exercises the upload queue retry code paths.
# - Use failpoints to cause all storage ops to fail
# - Churn on database to create layer & index uploads, and layer deletions
# - Check that these operations are queued up, using the appropriate metrics
# - Disable failpoints
# - Wait for all uploads to finish
# - Verify that remote is consistent and up-to-date (=all retries were done and succeeded)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_remote_storage_upload_queue_retries(
neon_env_builder: NeonEnvBuilder,
@@ -250,20 +257,20 @@ def test_remote_storage_upload_queue_retries(
assert gc_result["layers_removed"] > 0
# confirm all operations are queued up
def get_queued_count():
def get_queued_count(file_kind, op_kind):
metrics = client.get_metrics()
matches = re.search(
f'^pageserver_remote_upload_queue_unfinished_tasks{{tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\\S+)$',
f'^pageserver_remote_upload_queue_unfinished_tasks{{file_kind="{file_kind}",op_kind="{op_kind}",tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\\S+)$',
metrics,
re.MULTILINE,
)
assert matches
return int(matches[1])
# ensure that operations have queued up
queued_count = get_queued_count()
log.info(f"queued_count={queued_count}")
assert queued_count > 0
# ensure that all operation types that can be in the upload queue have queued up
assert get_queued_count(file_kind="layer", op_kind="upload") > 0
assert get_queued_count(file_kind="index", op_kind="upload") >= 2
assert get_queued_count(file_kind="layer", op_kind="remove") > 0
# unblock all operations and wait for them to finish
configure_storage_sync_failpoints("off")