Implement offloading of old WAL files to S3 in walkeeper

This commit is contained in:
Konstantin Knizhnik
2021-04-26 16:22:50 +03:00
parent f617115467
commit 3b09a74f58
9 changed files with 154 additions and 15 deletions

3
Cargo.lock generated
View File

@@ -2552,10 +2552,12 @@ dependencies = [
"lazy_static",
"log",
"pageserver",
"parse_duration",
"postgres",
"postgres-protocol",
"postgres_ffi",
"regex",
"rust-s3",
"slog",
"slog-async",
"slog-scope",
@@ -2564,6 +2566,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-stream",
"walkdir",
]
[[package]]

View File

@@ -492,7 +492,7 @@ impl PostgresNode {
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
regress_check
regress_check
}
pub fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus {
@@ -523,7 +523,7 @@ impl PostgresNode {
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench run");
pg_bench_run
pg_bench_run
}
}

View File

@@ -63,7 +63,7 @@ fn test_regress() {
node.start().unwrap();
let status = node.pg_regress();
assert!(status.success());
assert!(status.success());
}
// Runs pg_bench on a compute node
@@ -81,7 +81,7 @@ fn pgbench() {
node.start().unwrap();
let status = node.pg_bench(10, 100);
assert!(status.success());
assert!(status.success());
}
// Run two postgres instances on one pageserver, on different timelines

View File

@@ -20,6 +20,7 @@ slog = "2.7.0"
log = "0.4.14"
clap = "2.33.0"
daemonize = "0.4.1"
rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", rev="7f15a24ec7daa0a5d9516da706212745f9042818", features = ["no-verify-ssl"] }
tokio = { version = "1.3.0", features = ["full"] }
tokio-stream = { version = "0.1.4" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
@@ -27,6 +28,8 @@ postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
anyhow = "1.0"
crc32c = "0.6.0"
parse_duration = "*"
walkdir = "2"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }

View File

@@ -3,10 +3,11 @@
//
use daemonize::Daemonize;
use log::*;
use parse_duration::parse;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
use std::{fs::File, fs::OpenOptions};
use anyhow::Result;
@@ -14,6 +15,7 @@ use clap::{App, Arg};
use slog::Drain;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
@@ -42,11 +44,17 @@ fn main() -> Result<()> {
.help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"),
)
.arg(
Arg::with_name("pageserver")
.short("p")
.long("pageserver")
Arg::with_name("listen")
.short("l")
.long("listen")
.takes_value(true)
.help("address ip:port of pageserver with which wal_acceptor should establish connection"),
.help("listen for incoming connections on ip:port (default: 127.0.0.1:5454)"),
)
.arg(
Arg::with_name("ttl")
.long("ttl")
.takes_value(true)
.help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"),
)
.arg(
Arg::with_name("daemonize")
@@ -74,6 +82,7 @@ fn main() -> Result<()> {
no_sync: false,
pageserver_addr: None,
listen_addr: "127.0.0.1:5454".parse()?,
ttl: None,
};
if let Some(dir) = arg_matches.value_of("datadir") {
@@ -94,9 +103,8 @@ fn main() -> Result<()> {
if let Some(addr) = arg_matches.value_of("listen") {
conf.listen_addr = addr.parse().unwrap();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
conf.pageserver_addr = Some(addr.parse().unwrap());
if let Some(ttl) = arg_matches.value_of("ttl") {
conf.ttl = Some::<Duration>(parse(ttl)?);
}
start_wal_acceptor(conf)
@@ -138,6 +146,19 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
}
let mut threads = Vec::new();
if conf.ttl.is_some() {
let s3_conf = conf.clone();
let s3_offload_thread = thread::Builder::new()
.name("S3 offload thread".into())
.spawn(|| {
// thread code
s3_offload::thread_main(s3_conf);
})
.unwrap();
threads.push(s3_offload_thread);
}
let wal_acceptor_thread = thread::Builder::new()
.name("WAL acceptor thread".into())
.spawn(|| {

View File

@@ -1,8 +1,10 @@
//
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
mod pq_protocol;
pub mod s3_offload;
pub mod wal_service;
use crate::pq_protocol::SystemId;
@@ -15,4 +17,5 @@ pub struct WalAcceptorConf {
pub no_sync: bool,
pub listen_addr: SocketAddr,
pub pageserver_addr: Option<SocketAddr>,
pub ttl: Option<Duration>,
}

106
walkeeper/src/s3_offload.rs Normal file
View File

@@ -0,0 +1,106 @@
//
// Offload old WAL segments to S3 and remove them locally
//
use anyhow::{Context, Result};
use log::*;
use postgres_ffi::xlog_utils::*;
use s3::bucket::Bucket;
use s3::creds::Credentials;
use s3::region::Region;
use std::collections::HashSet;
use std::env;
use std::fs::{self, File};
use std::io::prelude::*;
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use tokio::runtime;
use tokio::time::sleep;
use walkdir::WalkDir;

use crate::WalAcceptorConf;
pub fn thread_main(conf: WalAcceptorConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
// and we're not concerned with performance yet.
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
info!("Starting S3 offload task");
runtime.block_on(async {
main_loop(&conf).await.unwrap();
});
}
async fn offload_files(
bucket: &Bucket,
listing: &HashSet<String>,
dir_path: &PathBuf,
conf: &WalAcceptorConf,
) -> Result<u64> {
let horizon = SystemTime::now() - conf.ttl.unwrap();
let mut n: u64 = 0;
for entry in WalkDir::new(dir_path) {
let entry = entry?;
let path = entry.path();
if path.is_file()
&& IsXLogFileName(entry.file_name().to_str().unwrap())
&& entry.metadata().unwrap().created().unwrap() <= horizon
{
let relpath = path.strip_prefix(&conf.data_dir).unwrap();
let s3path = String::from("walarchive/") + relpath.to_str().unwrap();
if !listing.contains(&s3path) {
let mut file = File::open(&path)?;
let mut content = Vec::new();
file.read_to_end(&mut content)?;
bucket.put_object(s3path, &content).await?;
fs::remove_file(&path)?;
n += 1;
}
}
}
Ok(n)
}
async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
let region = Region::Custom {
region: env::var("S3_REGION").unwrap(),
endpoint: env::var("S3_ENDPOINT").unwrap(),
};
let credentials = Credentials::new(
Some(&env::var("S3_ACCESSKEY").unwrap()),
Some(&env::var("S3_SECRET").unwrap()),
None,
None,
None,
)
.unwrap();
// Create Bucket in REGION for BUCKET
let bucket = Bucket::new_with_path_style("zenith-testbucket", region, credentials)?;
loop {
// List out contents of directory
let results = bucket
.list("walarchive/".to_string(), Some("".to_string()))
.await?;
let listing = HashSet::from_iter(
results
.iter()
.flat_map(|b| b.contents.iter().map(|o| o.key.clone())),
);
let n = offload_files(&bucket, &listing, &conf.data_dir, conf).await?;
info!("Offload {} files to S3", n);
sleep(conf.ttl.unwrap()).await;
}
}

View File

@@ -192,7 +192,10 @@ mod tests {
assert_eq!(format!("{}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555");
assert_eq!(format!("{}", Lsn(0x000000010000000A)), "1/A");
assert_eq!(Lsn::from_hex("12345678AAAA5555"), Ok(Lsn(0x12345678AAAA5555)));
assert_eq!(
Lsn::from_hex("12345678AAAA5555"),
Ok(Lsn(0x12345678AAAA5555))
);
assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0)));
assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError));
}