diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 33216085bb..454ff01f0e 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -205,18 +205,19 @@ pub static REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS: Lazy = Lazy::new(| register_int_gauge_vec!( "pageserver_remote_upload_queue_unfinished_tasks", "Number of tasks in the upload queue that are not finished yet.", - &["tenant_id", "timeline_id"], + &["tenant_id", "timeline_id", "file_kind", "op_kind"], ) .expect("failed to define a metric") }); +#[derive(Debug, Clone, Copy)] pub enum RemoteOpKind { Upload, Download, Delete, } impl RemoteOpKind { - fn as_str(&self) -> &str { + pub fn as_str(&self) -> &str { match self { Self::Upload => "upload", Self::Download => "download", @@ -225,12 +226,13 @@ impl RemoteOpKind { } } +#[derive(Debug, Clone, Copy)] pub enum RemoteOpFileKind { Layer, Index, } impl RemoteOpFileKind { - fn as_str(&self) -> &str { + pub fn as_str(&self) -> &str { match self { Self::Layer => "layer", Self::Index => "index", diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 3fb17f9200..d45857cbdb 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -342,7 +342,7 @@ enum UploadOp { UploadMetadata(IndexPart, Lsn), /// Delete a file. - Delete(PathBuf), + Delete(RemoteOpFileKind, PathBuf), /// Barrier. When the barrier operation is reached, Barrier(tokio::sync::watch::Sender<()>), @@ -358,7 +358,7 @@ impl std::fmt::Display for UploadOp { metadata.file_size() ), UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn), - UploadOp::Delete(path) => write!(f, "Delete({})", path.display()), + UploadOp::Delete(_, path) => write!(f, "Delete({})", path.display()), UploadOp::Barrier(_) => write!(f, "Barrier"), } } @@ -483,10 +483,9 @@ impl RemoteTimelineClient { disk_consistent_lsn, upload_queue.latest_metadata.to_bytes()?, ); - upload_queue - .queued_operations - .push_back(UploadOp::UploadMetadata(index_part, disk_consistent_lsn)); - self.upload_queue_items_metric(1); + let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn); + self.update_upload_queue_unfinished_metric(1, &op); + upload_queue.queued_operations.push_back(op); info!( "scheduled metadata upload with {} files", @@ -528,13 +527,9 @@ impl RemoteTimelineClient { .latest_files .insert(relative_path, layer_metadata.clone()); - upload_queue - .queued_operations - .push_back(UploadOp::UploadLayer( - PathBuf::from(path), - layer_metadata.clone(), - )); - self.upload_queue_items_metric(1); + let op = UploadOp::UploadLayer(PathBuf::from(path), layer_metadata.clone()); + self.update_upload_queue_unfinished_metric(1, &op); + upload_queue.queued_operations.push_back(op); info!("scheduled layer file upload {}", path.display()); @@ -572,17 +567,15 @@ impl RemoteTimelineClient { disk_consistent_lsn, upload_queue.latest_metadata.to_bytes()?, ); - upload_queue - .queued_operations - .push_back(UploadOp::UploadMetadata(index_part, disk_consistent_lsn)); - self.upload_queue_items_metric(1); + let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn); + self.update_upload_queue_unfinished_metric(1, &op); + upload_queue.queued_operations.push_back(op); // schedule the actual deletions for path in paths { - upload_queue - .queued_operations - .push_back(UploadOp::Delete(PathBuf::from(path))); - self.upload_queue_items_metric(1); + let op = UploadOp::Delete(RemoteOpFileKind::Layer, PathBuf::from(path)); + self.update_upload_queue_unfinished_metric(1, &op); + upload_queue.queued_operations.push_back(op); info!("scheduled layer file deletion {}", path.display()); } @@ -632,7 +625,7 @@ impl RemoteTimelineClient { // have finished. upload_queue.inprogress_tasks.is_empty() } - UploadOp::Delete(_) => { + UploadOp::Delete(_, _) => { // Wait for preceding uploads to finish. Concurrent deletions are OK, though. upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() } @@ -663,7 +656,7 @@ impl RemoteTimelineClient { UploadOp::UploadMetadata(_, _) => { upload_queue.num_inprogress_metadata_uploads += 1; } - UploadOp::Delete(_) => { + UploadOp::Delete(_, _) => { upload_queue.num_inprogress_deletions += 1; } UploadOp::Barrier(sender) => { @@ -696,8 +689,9 @@ impl RemoteTimelineClient { "remote upload", false, async move { + let task_clone = Arc::clone(&task); self_rc.perform_upload_task(task).await; - self_rc.upload_queue_items_metric(-1); + self_rc.update_upload_queue_unfinished_metric(-1, &task_clone.op); Ok(()) }, ); @@ -717,7 +711,7 @@ impl RemoteTimelineClient { async fn perform_upload_task(self: &Arc, task: Arc) { // Loop to retry until it completes. loop { - let upload_result: anyhow::Result<()> = match task.op { + let upload_result: anyhow::Result<()> = match &task.op { UploadOp::UploadLayer(ref path, ref layer_metadata) => { upload::upload_timeline_layer(&self.storage_impl, path, layer_metadata) .measure_remote_op( @@ -744,12 +738,12 @@ impl RemoteTimelineClient { ) .await } - UploadOp::Delete(ref path) => { + UploadOp::Delete(metric_file_kind, ref path) => { delete::delete_layer(&self.storage_impl, path) .measure_remote_op( self.tenant_id, self.timeline_id, - RemoteOpFileKind::Layer, + *metric_file_kind, RemoteOpKind::Delete, ) .await @@ -810,7 +804,7 @@ impl RemoteTimelineClient { upload_queue.num_inprogress_metadata_uploads -= 1; upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check? } - UploadOp::Delete(_) => { + UploadOp::Delete(_, _) => { upload_queue.num_inprogress_deletions -= 1; } UploadOp::Barrier(_) => unreachable!(), @@ -821,11 +815,21 @@ impl RemoteTimelineClient { } } - fn upload_queue_items_metric(&self, delta: i64) { + fn update_upload_queue_unfinished_metric(&self, delta: i64, op: &UploadOp) { + let (file_kind, op_kind) = match op { + UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload), + UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload), + UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete), + UploadOp::Barrier(_) => { + unreachable!("we execute barriers synchronously") + } + }; REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS .get_metric_with_label_values(&[ &self.tenant_id.to_string(), &self.timeline_id.to_string(), + file_kind.as_str(), + op_kind.as_str(), ]) .unwrap() .add(delta) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index c93309a5ec..90ebb887b3 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -175,6 +175,13 @@ def test_remote_storage_backup_and_restore( ) +# Exercises the upload queue retry code paths. +# - Use failpoints to cause all storage ops to fail +# - Churn on database to create layer & index uploads, and layer deletions +# - Check that these operations are queued up, using the appropriate metrics +# - Disable failpoints +# - Wait for all uploads to finish +# - Verify that remote is consistent and up-to-date (=all retries were done and succeeded) @pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS]) def test_remote_storage_upload_queue_retries( neon_env_builder: NeonEnvBuilder, @@ -250,20 +257,20 @@ def test_remote_storage_upload_queue_retries( assert gc_result["layers_removed"] > 0 # confirm all operations are queued up - def get_queued_count(): + def get_queued_count(file_kind, op_kind): metrics = client.get_metrics() matches = re.search( - f'^pageserver_remote_upload_queue_unfinished_tasks{{tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\\S+)$', + f'^pageserver_remote_upload_queue_unfinished_tasks{{file_kind="{file_kind}",op_kind="{op_kind}",tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\\S+)$', metrics, re.MULTILINE, ) assert matches return int(matches[1]) - # ensure that operations have queued up - queued_count = get_queued_count() - log.info(f"queued_count={queued_count}") - assert queued_count > 0 + # ensure that all operation types that can be in the upload queue have queued up + assert get_queued_count(file_kind="layer", op_kind="upload") > 0 + assert get_queued_count(file_kind="index", op_kind="upload") >= 2 + assert get_queued_count(file_kind="layer", op_kind="remove") > 0 # unblock all operations and wait for them to finish configure_storage_sync_failpoints("off")