Download from s3 right away

This commit is contained in:
Arthur Petukhovsky
2024-05-03 11:12:50 +02:00
parent 0f7629cc42
commit a33e271cd7

View File

@@ -5,6 +5,7 @@ use std::{
};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use camino::Utf8Path;
use futures::stream::{StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, PG_TLI};
@@ -12,7 +13,7 @@ use reqwest::Url;
use safekeeper::patch_control_file;
use serde_json::json;
use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}};
use tokio::{fs::File, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::Command};
use tokio_postgres::types::PgLsn;
use tracing::{error, info, info_span, Instrument};
use utils::{
@@ -21,8 +22,7 @@ use utils::{
};
use crate::{
init_remote, metadata_stream::stream_listing,
BucketConfig, NodeKind, RootTarget, SafekeeperApiConfig,
download_object_to_file, init_remote, metadata_stream::stream_listing, BucketConfig, NodeKind, RootTarget, SafekeeperApiConfig
};
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -188,28 +188,58 @@ async fn validate_timeline(
}
let another_segment = another_segment.unwrap();
shared.append_script(&[
String::new(),
format!(
"aws s3 cp s3://{}/{} {}",
root.bucket_name(),
segment_of_interest.key,
"our_segment"
),
format!(
"aws s3 cp s3://{}/{} {}",
root.bucket_name(),
another_segment.key,
"their_segment"
),
// if equal
"if cmp -s our_segment their_segment; then".to_string(),
format!(" echo '{}' >> valid.log", tli.ttid),
"else".to_string(),
format!(" echo '{}' >> invalid.log", tli.ttid),
"fi".to_string(),
format!("rm {} {}", "our_segment", "their_segment"),
]);
let ours_str = format!("./tmp/ours");
let ours = Utf8Path::new(&ours_str);
download_object_to_file(
s3_client,
root.bucket_name(),
&segment_of_interest.key,
None,
&ours,
).await?;
let their_str = format!("./tmp/their");
let their = Utf8Path::new(&their_str);
download_object_to_file(
s3_client,
root.bucket_name(),
&another_segment.key,
None,
&their,
).await?;
let cmd = "cmp -l ./tmp/ours ./tmp/their";
// Execute the command using `bash -c`
let mut process = Command::new("bash")
.arg("-c")
.arg(&cmd)
.spawn()?;
// Await the process to finish and check the exit status
let status = process.wait().await?;
// let's delete the file
tokio::fs::remove_file(ours).await?;
tokio::fs::remove_file(their).await?;
if status.success() {
info!("Command executed successfully.");
shared.append_script(&[
format!("echo '{}' >> valid.log", tli.ttid),
]);
} else {
shared.append_script(&[
format!("echo '{}' >> invalid.log", tli.ttid),
]);
info!("Command failed with status: {}", status);
if let Some(code) = status.code() {
info!("Exit code: {}", code);
} else {
info!("Process terminated by signal");
}
}
return Ok(());
}
@@ -224,26 +254,81 @@ async fn validate_timeline(
let segment_start_lsn = local_start_lsn.segment_lsn(WAL_SEGSIZE);
shared.append_script(&[
String::new(),
format!("aws s3 cp s3://{}/{} {}", root.bucket_name(), key, segfile),
format!(
"/usr/local/v{}/bin/pg_waldump{} {} > /dev/null",
status.pg_info.pg_version / 10000,
if segment_start_lsn < timeline_start_lsn {
format!(" -s {}", timeline_start_lsn)
} else {
"".to_string()
},
segfile,
),
"if [ $? -ne 0 ]; then".to_string(),
format!(" echo '{}' >> invalid.log", tli.ttid),
"else".to_string(),
format!(" echo '{}' >> valid.log", tli.ttid),
"fi".to_string(),
format!("rm {}", segfile),
]);
let path_str = format!("./tmp/{}", segfile);
let path = Utf8Path::new(&path_str);
download_object_to_file(
s3_client,
root.bucket_name(),
&key,
None,
&path,
).await?;
info!("downloaded file to {}", path);
let waldump_cmd = format!(
"./pg_install/v{}/bin/pg_waldump -i{} ./tmp/{} > /dev/null",
status.pg_info.pg_version / 10000,
if segment_start_lsn < timeline_start_lsn {
format!(" -s {}", timeline_start_lsn)
} else {
"".to_string()
},
segfile,
);
info!("running {}", waldump_cmd);
// Execute the command using `bash -c`
let mut process = Command::new("bash")
.arg("-c")
.arg(&waldump_cmd)
.spawn()?;
// Await the process to finish and check the exit status
let status = process.wait().await?;
// let's delete the file
tokio::fs::remove_file(path).await?;
if status.success() {
info!("Command executed successfully.");
shared.append_script(&[
format!("echo '{}' >> valid.log", tli.ttid),
]);
} else {
shared.append_script(&[
format!("echo '{}' >> invalid.log", tli.ttid),
]);
info!("Command failed with status: {}", status);
if let Some(code) = status.code() {
info!("Exit code: {}", code);
} else {
info!("Process terminated by signal");
}
}
// shared.append_script(&[
// String::new(),
// format!("aws s3 cp s3://{}/{} {}", root.bucket_name(), key, segfile),
// format!(
// "/usr/local/v{}/bin/pg_waldump{} {} > /dev/null",
// status.pg_info.pg_version / 10000,
// if segment_start_lsn < timeline_start_lsn {
// format!(" -s {}", timeline_start_lsn)
// } else {
// "".to_string()
// },
// segfile,
// ),
// "if [ $? -ne 0 ]; then".to_string(),
// format!(" echo '{}' >> invalid.log", tli.ttid),
// "else".to_string(),
// format!(" echo '{}' >> valid.log", tli.ttid),
// "fi".to_string(),
// format!("rm {}", segfile),
// ]);
Ok(())
}
@@ -307,6 +392,7 @@ pub async fn validate_timelines(
where
timeline_start_lsn != local_start_lsn
AND sk_id = 51
AND local_start_lsn > backup_lsn
AND timeline_id != '13a865b39537d5538e0ea74c926d9c6f'
AND timeline_id != 'c5e944b5c13628ba8fe128b01e7e663d'
AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';",
@@ -344,7 +430,7 @@ AND timeline_id != 'e7cfa4a2bd15c88ff011d69feef4b076';",
});
// Run tasks concurrently.
const CONCURRENCY: usize = 10;
const CONCURRENCY: usize = 1;
let mut timelines = checks.try_buffered(CONCURRENCY);
while let Some(r) = timelines.next().await {