mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
105 lines
2.9 KiB
Rust
105 lines
2.9 KiB
Rust
//
|
|
// Offload old WAL segments to S3 and remove them locally
|
|
//
|
|
|
|
use anyhow::Result;
|
|
use log::*;
|
|
use postgres_ffi::xlog_utils::*;
|
|
use s3::bucket::Bucket;
|
|
use s3::creds::Credentials;
|
|
use s3::region::Region;
|
|
use std::collections::HashSet;
|
|
use std::env;
|
|
use std::fs::{self, File};
|
|
use std::io::prelude::*;
|
|
use std::path::Path;
|
|
use std::time::SystemTime;
|
|
use tokio::runtime;
|
|
use tokio::time::sleep;
|
|
use walkdir::WalkDir;
|
|
|
|
use crate::WalAcceptorConf;
|
|
|
|
pub fn thread_main(conf: WalAcceptorConf) {
|
|
// Create a new thread pool
|
|
//
|
|
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
|
|
// and we're not concerned with performance yet.
|
|
//let runtime = runtime::Runtime::new().unwrap();
|
|
let runtime = runtime::Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.unwrap();
|
|
|
|
info!("Starting S3 offload task");
|
|
|
|
runtime.block_on(async {
|
|
main_loop(&conf).await.unwrap();
|
|
});
|
|
}
|
|
|
|
async fn offload_files(
|
|
bucket: &Bucket,
|
|
listing: &HashSet<String>,
|
|
dir_path: &Path,
|
|
conf: &WalAcceptorConf,
|
|
) -> Result<u64> {
|
|
let horizon = SystemTime::now() - conf.ttl.unwrap();
|
|
let mut n: u64 = 0;
|
|
for entry in WalkDir::new(dir_path) {
|
|
let entry = entry?;
|
|
let path = entry.path();
|
|
|
|
if path.is_file()
|
|
&& IsXLogFileName(entry.file_name().to_str().unwrap())
|
|
&& entry.metadata().unwrap().created().unwrap() <= horizon
|
|
{
|
|
let relpath = path.strip_prefix(&conf.data_dir).unwrap();
|
|
let s3path = String::from("walarchive/") + relpath.to_str().unwrap();
|
|
if !listing.contains(&s3path) {
|
|
let mut file = File::open(&path)?;
|
|
let mut content = Vec::new();
|
|
file.read_to_end(&mut content)?;
|
|
bucket.put_object(s3path, &content).await?;
|
|
|
|
fs::remove_file(&path)?;
|
|
n += 1;
|
|
}
|
|
}
|
|
}
|
|
Ok(n)
|
|
}
|
|
|
|
async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
|
|
let region = Region::Custom {
|
|
region: env::var("S3_REGION").unwrap(),
|
|
endpoint: env::var("S3_ENDPOINT").unwrap(),
|
|
};
|
|
let credentials = Credentials::new(
|
|
Some(&env::var("S3_ACCESSKEY").unwrap()),
|
|
Some(&env::var("S3_SECRET").unwrap()),
|
|
None,
|
|
None,
|
|
None,
|
|
)
|
|
.unwrap();
|
|
|
|
// Create Bucket in REGION for BUCKET
|
|
let bucket = Bucket::new_with_path_style("zenith-testbucket", region, credentials)?;
|
|
|
|
loop {
|
|
// List out contents of directory
|
|
let results = bucket
|
|
.list("walarchive/".to_string(), Some("".to_string()))
|
|
.await?;
|
|
let listing = results
|
|
.iter()
|
|
.flat_map(|b| b.contents.iter().map(|o| o.key.clone()))
|
|
.collect();
|
|
|
|
let n = offload_files(&bucket, &listing, &conf.data_dir, conf).await?;
|
|
info!("Offload {} files to S3", n);
|
|
sleep(conf.ttl.unwrap()).await;
|
|
}
|
|
}
|