From a33e271cd7cd06d3beed4b84b6da736f6d0c82f2 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Fri, 3 May 2024 11:12:50 +0200 Subject: [PATCH] Download from s3 right away --- .../src/validate_safekeeper_timeline.rs | 178 +++++++++++++----- 1 file changed, 132 insertions(+), 46 deletions(-) diff --git a/s3_scrubber/src/validate_safekeeper_timeline.rs b/s3_scrubber/src/validate_safekeeper_timeline.rs index 5d57f04849..d1b099b8ae 100644 --- a/s3_scrubber/src/validate_safekeeper_timeline.rs +++ b/s3_scrubber/src/validate_safekeeper_timeline.rs @@ -5,6 +5,7 @@ use std::{ }; use aws_sdk_s3::{types::ObjectIdentifier, Client}; +use camino::Utf8Path; use futures::stream::{StreamExt, TryStreamExt}; use postgres_ffi::{XLogFileName, PG_TLI}; @@ -12,7 +13,7 @@ use reqwest::Url; use safekeeper::patch_control_file; use serde_json::json; -use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}}; +use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::Command}; use tokio_postgres::types::PgLsn; use tracing::{error, info, info_span, Instrument}; use utils::{ @@ -21,8 +22,7 @@ use utils::{ }; use crate::{ - init_remote, metadata_stream::stream_listing, - BucketConfig, NodeKind, RootTarget, SafekeeperApiConfig, + download_object_to_file, init_remote, metadata_stream::stream_listing, BucketConfig, NodeKind, RootTarget, SafekeeperApiConfig }; /// Generally we should ask safekeepers, but so far we use everywhere default 16MB. @@ -188,28 +188,58 @@ async fn validate_timeline( } let another_segment = another_segment.unwrap(); - shared.append_script(&[ - String::new(), - format!( - "aws s3 cp s3://{}/{} {}", - root.bucket_name(), - segment_of_interest.key, - "our_segment" - ), - format!( - "aws s3 cp s3://{}/{} {}", - root.bucket_name(), - another_segment.key, - "their_segment" - ), - // if equal - "if cmp -s our_segment their_segment; then".to_string(), - format!(" echo '{}' >> valid.log", tli.ttid), - "else".to_string(), - format!(" echo '{}' >> invalid.log", tli.ttid), - "fi".to_string(), - format!("rm {} {}", "our_segment", "their_segment"), - ]); + let ours_str = format!("./tmp/ours"); + let ours = Utf8Path::new(&ours_str); + download_object_to_file( + s3_client, + root.bucket_name(), + &segment_of_interest.key, + None, + &ours, + ).await?; + + let their_str = format!("./tmp/their"); + let their = Utf8Path::new(&their_str); + download_object_to_file( + s3_client, + root.bucket_name(), + &another_segment.key, + None, + &their, + ).await?; + + let cmd = "cmp -l ./tmp/ours ./tmp/their"; + + // Execute the command using `bash -c` + let mut process = Command::new("bash") + .arg("-c") + .arg(&cmd) + .spawn()?; + + // Await the process to finish and check the exit status + let status = process.wait().await?; + + // let's delete the file + tokio::fs::remove_file(ours).await?; + tokio::fs::remove_file(their).await?; + + if status.success() { + info!("Command executed successfully."); + shared.append_script(&[ + format!("echo '{}' >> valid.log", tli.ttid), + ]); + } else { + shared.append_script(&[ + format!("echo '{}' >> invalid.log", tli.ttid), + ]); + + info!("Command failed with status: {}", status); + if let Some(code) = status.code() { + info!("Exit code: {}", code); + } else { + info!("Process terminated by signal"); + } + } return Ok(()); } @@ -224,26 +254,81 @@ async fn validate_timeline( let segment_start_lsn = local_start_lsn.segment_lsn(WAL_SEGSIZE); - shared.append_script(&[ - String::new(), - format!("aws s3 cp s3://{}/{} {}", root.bucket_name(), key, segfile), - format!( - "/usr/local/v{}/bin/pg_waldump{} {} > /dev/null", - status.pg_info.pg_version / 10000, - if segment_start_lsn < timeline_start_lsn { - format!(" -s {}", timeline_start_lsn) - } else { - "".to_string() - }, - segfile, - ), - "if [ $? -ne 0 ]; then".to_string(), - format!(" echo '{}' >> invalid.log", tli.ttid), - "else".to_string(), - format!(" echo '{}' >> valid.log", tli.ttid), - "fi".to_string(), - format!("rm {}", segfile), - ]); + let path_str = format!("./tmp/{}", segfile); + let path = Utf8Path::new(&path_str); + download_object_to_file( + s3_client, + root.bucket_name(), + &key, + None, + &path, + ).await?; + + info!("downloaded file to {}", path); + + let waldump_cmd = format!( + "./pg_install/v{}/bin/pg_waldump -i{} ./tmp/{} > /dev/null", + status.pg_info.pg_version / 10000, + if segment_start_lsn < timeline_start_lsn { + format!(" -s {}", timeline_start_lsn) + } else { + "".to_string() + }, + segfile, + ); + + info!("running {}", waldump_cmd); + + // Execute the command using `bash -c` + let mut process = Command::new("bash") + .arg("-c") + .arg(&waldump_cmd) + .spawn()?; + + // Await the process to finish and check the exit status + let status = process.wait().await?; + + // let's delete the file + tokio::fs::remove_file(path).await?; + + if status.success() { + info!("Command executed successfully."); + shared.append_script(&[ + format!("echo '{}' >> valid.log", tli.ttid), + ]); + } else { + shared.append_script(&[ + format!("echo '{}' >> invalid.log", tli.ttid), + ]); + + info!("Command failed with status: {}", status); + if let Some(code) = status.code() { + info!("Exit code: {}", code); + } else { + info!("Process terminated by signal"); + } + } + + // shared.append_script(&[ + // String::new(), + // format!("aws s3 cp s3://{}/{} {}", root.bucket_name(), key, segfile), + // format!( + // "/usr/local/v{}/bin/pg_waldump{} {} > /dev/null", + // status.pg_info.pg_version / 10000, + // if segment_start_lsn < timeline_start_lsn { + // format!(" -s {}", timeline_start_lsn) + // } else { + // "".to_string() + // }, + // segfile, + // ), + // "if [ $? -ne 0 ]; then".to_string(), + // format!(" echo '{}' >> invalid.log", tli.ttid), + // "else".to_string(), + // format!(" echo '{}' >> valid.log", tli.ttid), + // "fi".to_string(), + // format!("rm {}", segfile), + // ]); Ok(()) } @@ -307,6 +392,7 @@ pub async fn validate_timelines( where timeline_start_lsn != local_start_lsn AND sk_id = 51 +AND local_start_lsn > backup_lsn AND timeline_id != '13a865b39537d5538e0ea74c926d9c6f' AND timeline_id != 'c5e944b5c13628ba8fe128b01e7e663d' AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';", @@ -344,7 +430,7 @@ AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';", }); // Run tasks concurrently. - const CONCURRENCY: usize = 10; + const CONCURRENCY: usize = 1; let mut timelines = checks.try_buffered(CONCURRENCY); while let Some(r) = timelines.next().await {