Add upload thread

This commit is contained in:
Konstantin Knizhnik
2021-11-24 18:04:48 +03:00
parent 92562145c0
commit 5ad82418a9
8 changed files with 164 additions and 10 deletions

2
Cargo.lock generated
View File

@@ -2572,8 +2572,6 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yakv"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae1ec7c67193f20a10b113492c57f097c34958d5bdc3c974a65361559aa21d9"
dependencies = [
"anyhow",
"fs2",

View File

@@ -38,7 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
#yakv = { path = "../../yakv" }
yakv = "0.2.1"
yakv = "0.2.2"
lz4_flex = "0.9.0"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -42,6 +42,8 @@ struct CfgFileParams {
listen_http_addr: Option<String>,
checkpoint_distance: Option<String>,
checkpoint_period: Option<String>,
upload_distance: Option<String>,
upload_period: Option<String>,
reconstruct_threshold: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
@@ -104,6 +106,8 @@ impl CfgFileParams {
listen_http_addr: get_arg("listen-http"),
checkpoint_distance: get_arg("checkpoint_distance"),
checkpoint_period: get_arg("checkpoint_period"),
upload_distance: get_arg("upload_distance"),
upload_period: get_arg("upload_period"),
reconstruct_threshold: get_arg("reconstruct_threshold"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
@@ -123,6 +127,8 @@ impl CfgFileParams {
listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
checkpoint_period: self.checkpoint_period.or(other.checkpoint_period),
upload_distance: self.upload_distance.or(other.upload_distance),
upload_period: self.upload_period.or(other.upload_period),
reconstruct_threshold: self.reconstruct_threshold.or(other.reconstruct_threshold),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
@@ -161,6 +167,15 @@ impl CfgFileParams {
None => DEFAULT_CHECKPOINT_PERIOD,
};
let upload_distance: u64 = match self.upload_distance.as_ref() {
Some(upload_distance_str) => upload_distance_str.parse()?,
None => DEFAULT_UPLOAD_DISTANCE,
};
let upload_period = match self.upload_period.as_ref() {
Some(upload_period_str) => humantime::parse_duration(upload_period_str)?,
None => DEFAULT_UPLOAD_PERIOD,
};
let reconstruct_threshold: u64 = match self.reconstruct_threshold.as_ref() {
Some(reconstruct_threshold_str) => reconstruct_threshold_str.parse()?,
None => DEFAULT_RECONSTRUCT_THRESHOLD,
@@ -244,6 +259,8 @@ impl CfgFileParams {
listen_http_addr,
checkpoint_distance,
checkpoint_period,
upload_distance,
upload_period,
reconstruct_threshold,
gc_horizon,
gc_period,
@@ -305,6 +322,18 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between checkpoint iterations"),
)
.arg(
Arg::with_name("checkpoint_distance")
.long("checkpoint_distance")
.takes_value(true)
.help("Distance from current LSN to perform checkpoint of in-memory layers"),
)
.arg(
Arg::with_name("upload_period")
.long("upload_period")
.takes_value(true)
.help("Interval between upload iterations"),
)
.arg(
Arg::with_name("reconstruct_threshold")
.long("reconstruct_threshold")
@@ -615,6 +644,8 @@ mod tests {
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
upload_distance: Some("upload_distance_VALUE".to_string()),
upload_period: Some("upload_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
@@ -639,6 +670,8 @@ mod tests {
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
@@ -674,6 +707,8 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
upload_distance: Some("upload_distance_VALUE".to_string()),
upload_period: Some("upload_period_VALUE".to_string()),
reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
@@ -701,6 +736,8 @@ local_path = 'relish_storage_local_VALUE'
listen_http_addr = 'listen_http_addr_VALUE'
checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
upload_distance = 'upload_distance_VALUE'
upload_period = 'upload_period_VALUE'
reconstruct_threshold = 'reconstruct_threshold_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'

View File

@@ -427,6 +427,53 @@ impl BufferedRepository {
Ok(())
}
///
/// Launch the S3 upload thread in given repository.
///
/// Spawns a named OS thread that runs `upload_loop` for this repository
/// until tenant shutdown is requested, and returns the `JoinHandle` so the
/// caller (tenant_mgr) can join it during shutdown.
pub fn launch_upload_thread(
    conf: &'static PageServerConf,
    rc: Arc<BufferedRepository>,
) -> JoinHandle<()> {
    std::thread::Builder::new()
        .name("Upload thread".into())
        .spawn(move || {
            // FIXME: relaunch it? Panic is not good.
            // Message fixed: this is the upload thread, not the checkpointer
            // (the original expect() text was copy-pasted).
            rc.upload_loop(conf).expect("Upload thread died");
        })
        .unwrap()
}
///
/// Upload thread's main loop
///
/// Wakes up every `conf.upload_period`, snapshots the tenant's timeline map,
/// and uploads each timeline in turn, until shutdown is requested.
fn upload_loop(&self, conf: &'static PageServerConf) -> Result<()> {
    while !tenant_mgr::shutdown_requested() {
        // NOTE(review): a full `upload_period` sleep happens before the
        // shutdown flag is re-checked, so shutdown can be delayed by up to
        // one period — confirm this matches the checkpointer's behavior or
        // use an interruptible wait.
        std::thread::sleep(conf.upload_period);
        info!("upload thread for tenant {} waking up", self.tenantid);
        {
            // Copy (id, Arc<timeline>) pairs out of the map so the
            // `timelines` mutex is not held while uploading.
            let timelines: Vec<(ZTimelineId, Arc<BufferedTimeline>)> = self
                .timelines
                .lock()
                .unwrap()
                .iter()
                .map(|pair| (*pair.0, pair.1.clone()))
                .collect();
            for (timelineid, timeline) in timelines.iter() {
                let _entered =
                    info_span!("upload", timeline = %timelineid, tenant = %self.tenantid)
                        .entered();
                // Record upload duration in the STORAGE_TIME metric.
                // NOTE(review): the trailing `?` propagates the first failing
                // timeline's error and terminates the whole upload thread
                // (which then panics via expect() in the spawner) — confirm
                // that is intended rather than logging and continuing.
                STORAGE_TIME
                    .with_label_values(&["upload_timed"])
                    .observe_closure_duration(|| timeline.upload_internal())?
            }
        }
    }
    trace!("Upload thread shut down");
    Ok(())
}
///
/// Launch the GC thread in given repository.
///
@@ -1001,9 +1048,11 @@ impl Timeline for BufferedTimeline {
blknum: 0,
lsn: Lsn(0),
});
let nosync = true;
// currently proceed block number
let mut from_blknum = 0;
let mut last_lsn = Lsn(0);
let mut page_versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
'pages: loop {
let iter = store.data.range(&from.ser()?..);
@@ -1011,7 +1060,7 @@ impl Timeline for BufferedTimeline {
let pair = entry?;
if let StoreKey::Data(dk) = StoreKey::des(&pair.0)? {
let same_seg = from_rel == dk.rel
&& dk.blknum / RELISH_SEG_SIZE < from_blknum / RELISH_SEG_SIZE;
&& dk.blknum / RELISH_SEG_SIZE == from_blknum / RELISH_SEG_SIZE;
if !same_seg && from_rel != zero_rel {
let is_dropped = dropped.contains(&from_rel);
let segtag = SegmentTag::from_blknum(from_rel, from_blknum);
@@ -1026,8 +1075,10 @@ impl Timeline for BufferedTimeline {
is_dropped,
page_versions.iter().map(|t| (t.0, t.1, &t.2)),
relsizes[&from_rel].clone(),
nosync,
)?;
page_versions.clear();
last_lsn = Lsn(0);
}
if !is_dropped {
let mut images: Vec<Bytes> =
@@ -1048,6 +1099,7 @@ impl Timeline for BufferedTimeline {
segtag,
end_lsn,
images,
nosync,
)?;
}
}
@@ -1060,7 +1112,7 @@ impl Timeline for BufferedTimeline {
blknum: from_blknum,
lsn: start_lsn,
});
} else if dk.lsn >= start_lsn {
} else if dk.lsn >= end_lsn {
from_blknum += 1;
from = StoreKey::Data(DataKey {
rel: from_rel,
@@ -1068,7 +1120,10 @@ impl Timeline for BufferedTimeline {
lsn: start_lsn,
});
} else {
page_versions.push((dk.blknum, dk.lsn, PageVersion::des(&pair.1)?));
if dk.lsn != last_lsn {
last_lsn = dk.lsn;
page_versions.push((dk.blknum, dk.lsn, PageVersion::des(&pair.1)?));
}
continue;
}
continue 'pages;
@@ -1092,6 +1147,7 @@ impl Timeline for BufferedTimeline {
is_dropped,
page_versions.iter().map(|t| (t.0, t.1, &t.2)),
relsizes[&from_rel].clone(),
nosync,
)?;
}
if !is_dropped {
@@ -1112,6 +1168,7 @@ impl Timeline for BufferedTimeline {
segtag,
end_lsn,
images,
nosync,
)?;
}
}
@@ -1369,6 +1426,52 @@ impl BufferedTimeline {
Ok(result)
}
///
/// Upload materialized page versions to S3
///
/// Currently just materializes a full snapshot of every relation; the
/// commented-out code below sketches the eventual incremental export
/// gated on `upload_distance`, kept here as a TODO.
fn upload_internal(&self) -> Result<()> {
    /*
    // TODO: remember LSN of previous backup
    let start_lsn = Lsn(0);
    let end_lsn = self.get_last_record_lsn();
    if start_lsn + self.conf.upload_distance < end_lsn {
        self.export_timeline(start_lsn, end_lsn)?;
    }
    Ok(())
    */
    self.make_snapshot()
}
/// Materialize an image layer for every segment of every relation at the
/// current last-record LSN, reading page images via `get_page_at_lsn`.
///
/// The final `true` argument to `ImageLayer::create` is `nosync`, i.e. the
/// layer file is not fsync'ed here — presumably durability is deferred to
/// the S3 upload itself; TODO confirm.
fn make_snapshot(&self) -> Result<()> {
    let store = self.store.read().unwrap();
    let now = Instant::now();
    if let Some(meta_hash) = &store.meta {
        let lsn = self.get_last_record_lsn();
        for (rel, snap) in meta_hash.iter() {
            let rel_size = snap.size;
            // Ceiling division: number of segments needed to cover
            // `rel_size` blocks.
            for segno in 0..(rel_size + RELISH_SEG_SIZE - 1) / RELISH_SEG_SIZE {
                let first_blknum = segno * RELISH_SEG_SIZE;
                // The last segment may be partial, so clamp to rel_size.
                let last_blknum = u32::min(first_blknum + RELISH_SEG_SIZE, rel_size);
                // Collect the segment's page images; any page-read error
                // aborts the whole snapshot via `images?` below.
                let images: Result<Vec<Bytes>> = (first_blknum..last_blknum)
                    .map(|blknum| self.get_page_at_lsn(*rel, blknum, lsn))
                    .collect();
                let segtag = SegmentTag::from_blknum(*rel, first_blknum);
                ImageLayer::create(
                    self.conf,
                    self.timelineid,
                    self.tenantid,
                    segtag,
                    lsn,
                    images?,
                    true,
                )?;
            }
        }
    }
    info!("Make snapshot in {:?}", now.elapsed());
    Ok(())
}
///
/// Materialize last page versions
///

View File

@@ -379,6 +379,7 @@ impl DeltaLayer {
dropped: bool,
page_versions: impl Iterator<Item = (u32, Lsn, &'a PageVersion)>,
relsizes: VecMap<Lsn, u32>,
nosync: bool,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
@@ -451,8 +452,9 @@ impl DeltaLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
if !nosync {
writer.get_ref().sync_all()?;
}
trace!("saved {}", &path.display());
drop(inner);

View File

@@ -258,6 +258,7 @@ impl ImageLayer {
seg: SegmentTag,
lsn: Lsn,
base_images: Vec<Bytes>,
nosync: bool,
) -> Result<ImageLayer> {
let image_type = if seg.rel.is_blocky() {
let num_blocks: u32 = base_images.len().try_into()?;
@@ -317,8 +318,9 @@ impl ImageLayer {
// This flushes the underlying 'buf_writer'.
let writer = book.close()?;
writer.get_ref().sync_all()?;
if !nosync {
writer.get_ref().sync_all()?;
}
trace!("saved {}", path.display());
drop(inner);

View File

@@ -36,6 +36,9 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(250);
pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;
pub const DEFAULT_GC_HORIZON: u64 = 1024;
@@ -66,6 +69,8 @@ pub struct PageServerConf {
// page server crashes.
pub checkpoint_distance: u64,
pub checkpoint_period: Duration,
pub upload_period: Duration,
pub upload_distance: u64,
pub reconstruct_threshold: u64,
pub gc_horizon: u64,
@@ -152,6 +157,8 @@ impl PageServerConf {
daemonize: false,
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: Duration::from_secs(10),
upload_distance: defaults::DEFAULT_UPLOAD_DISTANCE,
upload_period: defaults::DEFAULT_UPLOAD_PERIOD,
reconstruct_threshold: defaults::DEFAULT_RECONSTRUCT_THRESHOLD,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),

View File

@@ -62,6 +62,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
struct TenantHandleEntry {
checkpointer_handle: Option<JoinHandle<()>>,
uploader_handle: Option<JoinHandle<()>>,
gc_handle: Option<JoinHandle<()>>,
}
@@ -107,11 +108,13 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
let checkpointer_handle = BufferedRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = BufferedRepository::launch_gc_thread(conf, repo.clone());
let uploader_handle = BufferedRepository::launch_upload_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
uploader_handle: Some(uploader_handle),
};
handles.insert(tenant_id, h);
@@ -170,6 +173,8 @@ pub fn stop_tenant_threads(tenantid: ZTenantId) {
if let Some(h) = handles.get_mut(&tenantid) {
h.checkpointer_handle.take().map(JoinHandle::join);
debug!("checkpointer for tenant {} has stopped", tenantid);
h.uploader_handle.take().map(JoinHandle::join);
debug!("uploader for tenant {} has stopped", tenantid);
h.gc_handle.take().map(JoinHandle::join);
debug!("gc for tenant {} has stopped", tenantid);
}