mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 07:00:38 +00:00
Compare commits
18 Commits
fix-XLogWa
...
jcsp/sk-co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
025a183091 | ||
|
|
baedfc90b5 | ||
|
|
9519f06d48 | ||
|
|
6e17c359bd | ||
|
|
78459aee43 | ||
|
|
81dbfc33c2 | ||
|
|
7771275cc6 | ||
|
|
86453b422d | ||
|
|
35d6599278 | ||
|
|
e8d0956cf7 | ||
|
|
053ada80b2 | ||
|
|
75801f0451 | ||
|
|
de8dfee4bd | ||
|
|
e3f51abadf | ||
|
|
a7b84cca5a | ||
|
|
291fcb9e4f | ||
|
|
a5ecca976e | ||
|
|
5caee4ca54 |
@@ -17,6 +17,7 @@
|
||||
!libs/
|
||||
!neon_local/
|
||||
!pageserver/
|
||||
!patches/
|
||||
!pgxn/
|
||||
!proxy/
|
||||
!s3_scrubber/
|
||||
|
||||
29
Cargo.lock
generated
29
Cargo.lock
generated
@@ -1072,9 +1072,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.31"
|
||||
version = "0.4.38"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
|
||||
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
|
||||
dependencies = [
|
||||
"android-tzdata",
|
||||
"iana-time-zone",
|
||||
@@ -1082,7 +1082,7 @@ dependencies = [
|
||||
"num-traits",
|
||||
"serde",
|
||||
"wasm-bindgen",
|
||||
"windows-targets 0.48.0",
|
||||
"windows-targets 0.52.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1109,7 +1109,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b"
|
||||
dependencies = [
|
||||
"ciborium-io",
|
||||
"half",
|
||||
"half 1.8.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2278,6 +2278,17 @@ version = "1.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
|
||||
|
||||
[[package]]
|
||||
name = "half"
|
||||
version = "2.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crunchy",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hash32"
|
||||
version = "0.3.1"
|
||||
@@ -3902,12 +3913,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parquet"
|
||||
version = "49.0.0"
|
||||
source = "git+https://github.com/neondatabase/arrow-rs?branch=neon-fix-bugs#8a0bc58aa67b98aabbd8eee7c6ca4281967ff9e9"
|
||||
version = "51.0.0"
|
||||
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"half 2.4.1",
|
||||
"hashbrown 0.14.5",
|
||||
"num",
|
||||
"num-bigint",
|
||||
@@ -3916,12 +3928,13 @@ dependencies = [
|
||||
"thrift",
|
||||
"twox-hash",
|
||||
"zstd",
|
||||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parquet_derive"
|
||||
version = "49.0.0"
|
||||
source = "git+https://github.com/neondatabase/arrow-rs?branch=neon-fix-bugs#8a0bc58aa67b98aabbd8eee7c6ca4281967ff9e9"
|
||||
version = "51.0.0"
|
||||
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
|
||||
dependencies = [
|
||||
"parquet",
|
||||
"proc-macro2",
|
||||
|
||||
@@ -122,8 +122,8 @@ opentelemetry = "0.20.0"
|
||||
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.12.0"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "49.0.0", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "49.0.0"
|
||||
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "51.0.0"
|
||||
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
|
||||
pin-project-lite = "0.2"
|
||||
procfs = "0.14"
|
||||
@@ -244,8 +244,8 @@ tonic-build = "0.9"
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
# bug fixes for UUID
|
||||
parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs" }
|
||||
parquet_derive = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs" }
|
||||
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" }
|
||||
parquet_derive = { git = "https://github.com/apache/arrow-rs", branch = "master" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
@@ -241,9 +241,12 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
|
||||
FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
|
||||
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
|
||||
COPY patches/pgvector.patch /pgvector.patch
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.0.tar.gz -O pgvector.tar.gz && \
|
||||
echo "1b5503a35c265408b6eb282621c5e1e75f7801afc04eecb950796cfee2e3d1d8 pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
patch -p1 < /pgvector.patch && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/vector.control
|
||||
|
||||
@@ -820,10 +820,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
Ok(ProcessMsgResult::Continue)
|
||||
}
|
||||
|
||||
/// Log as info/error result of handling COPY stream and send back
|
||||
/// ErrorResponse if that makes sense. Shutdown the stream if we got
|
||||
/// Terminate. TODO: transition into waiting for Sync msg if we initiate the
|
||||
/// close.
|
||||
/// - Log as info/error result of handling COPY stream and send back
|
||||
/// ErrorResponse if that makes sense.
|
||||
/// - Shutdown the stream if we got Terminate.
|
||||
/// - Then close the connection because we don't handle exiting from COPY
|
||||
/// stream normally.
|
||||
pub async fn handle_copy_stream_end(&mut self, end: CopyStreamHandlerEnd) {
|
||||
use CopyStreamHandlerEnd::*;
|
||||
|
||||
@@ -849,10 +850,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Terminate = &end {
|
||||
self.state = ProtoState::Closed;
|
||||
}
|
||||
|
||||
let err_to_send_and_errcode = match &end {
|
||||
ServerInitiated(_) => Some((end.to_string(), SQLSTATE_SUCCESSFUL_COMPLETION)),
|
||||
Other(_) => Some((format!("{end:#}"), SQLSTATE_INTERNAL_ERROR)),
|
||||
@@ -882,6 +879,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
error!("failed to send ErrorResponse: {}", ee);
|
||||
}
|
||||
}
|
||||
|
||||
// Proper COPY stream finishing to continue using the connection is not
|
||||
// implemented at the server side (we don't need it so far). To prevent
|
||||
// further usages of the connection, close it.
|
||||
self.framed.shutdown().await.ok();
|
||||
self.state = ProtoState::Closed;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -359,7 +359,7 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
// Is there enough space on the page for another logical message and an
|
||||
// XLOG_SWITCH? If not, start over.
|
||||
let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
|
||||
if page_remain < base_size - XLOG_SIZE_OF_XLOG_RECORD as u64 {
|
||||
if page_remain < base_size + XLOG_SIZE_OF_XLOG_RECORD as u64 {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -62,14 +62,10 @@ use super::{
|
||||
CommandRequest, DownloadCommand,
|
||||
};
|
||||
|
||||
/// For each tenant, how long must have passed since the last download_tenant call before
|
||||
/// calling it again. This is approximately the time by which local data is allowed
|
||||
/// to fall behind remote data.
|
||||
///
|
||||
/// TODO: this should just be a default, and the actual period should be controlled
|
||||
/// via the heatmap itself
|
||||
/// `<ttps://github.com/neondatabase/neon/issues/6200>`
|
||||
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
/// For each tenant, default period for how long must have passed since the last download_tenant call before
|
||||
/// calling it again. This default is replaced with the value of [`HeatMapTenant::upload_period_ms`] after first
|
||||
/// download, if the uploader populated it.
|
||||
const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
|
||||
/// Range of concurrency we may use when downloading layers within a timeline. This is independent
|
||||
/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in
|
||||
@@ -152,14 +148,22 @@ pub(super) struct SecondaryDetailTimeline {
|
||||
pub(super) evicted_at: HashMap<LayerName, SystemTime>,
|
||||
}
|
||||
|
||||
// Aspects of a heatmap that we remember after downloading it
|
||||
#[derive(Clone, Debug)]
|
||||
struct DownloadSummary {
|
||||
etag: Etag,
|
||||
#[allow(unused)]
|
||||
mtime: SystemTime,
|
||||
upload_period: Duration,
|
||||
}
|
||||
|
||||
/// This state is written by the secondary downloader, it is opaque
|
||||
/// to TenantManager
|
||||
#[derive(Debug)]
|
||||
pub(super) struct SecondaryDetail {
|
||||
pub(super) config: SecondaryLocationConfig,
|
||||
|
||||
last_download: Option<Instant>,
|
||||
last_etag: Option<Etag>,
|
||||
last_download: Option<DownloadSummary>,
|
||||
next_download: Option<Instant>,
|
||||
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
|
||||
}
|
||||
@@ -189,7 +193,6 @@ impl SecondaryDetail {
|
||||
Self {
|
||||
config,
|
||||
last_download: None,
|
||||
last_etag: None,
|
||||
next_download: None,
|
||||
timelines: HashMap::new(),
|
||||
}
|
||||
@@ -243,9 +246,8 @@ impl SecondaryDetail {
|
||||
|
||||
struct PendingDownload {
|
||||
secondary_state: Arc<SecondaryTenant>,
|
||||
last_download: Option<Instant>,
|
||||
last_download: Option<DownloadSummary>,
|
||||
target_time: Option<Instant>,
|
||||
period: Option<Duration>,
|
||||
}
|
||||
|
||||
impl scheduler::PendingJob for PendingDownload {
|
||||
@@ -295,10 +297,17 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
|
||||
tracing::debug!("Secondary tenant download completed");
|
||||
|
||||
// Update freshened_at even if there was an error: we don't want errored tenants to implicitly
|
||||
// take priority to run again.
|
||||
let mut detail = secondary_state.detail.lock().unwrap();
|
||||
detail.next_download = Some(Instant::now() + period_jitter(DOWNLOAD_FRESHEN_INTERVAL, 5));
|
||||
|
||||
let period = detail
|
||||
.last_download
|
||||
.as_ref()
|
||||
.map(|d| d.upload_period)
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
|
||||
|
||||
// We advance next_download irrespective of errors: we don't want error cases to result in
|
||||
// expensive busy-polling.
|
||||
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
|
||||
}
|
||||
|
||||
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
|
||||
@@ -331,11 +340,11 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
if detail.next_download.is_none() {
|
||||
// Initialize randomly in the range from 0 to our interval: this uniformly spreads the start times. Subsequent
|
||||
// rounds will use a smaller jitter to avoid accidentally synchronizing later.
|
||||
detail.next_download = Some(now.checked_add(period_warmup(DOWNLOAD_FRESHEN_INTERVAL)).expect(
|
||||
detail.next_download = Some(now.checked_add(period_warmup(DEFAULT_DOWNLOAD_INTERVAL)).expect(
|
||||
"Using our constant, which is known to be small compared with clock range",
|
||||
));
|
||||
}
|
||||
(detail.last_download, detail.next_download.unwrap())
|
||||
(detail.last_download.clone(), detail.next_download.unwrap())
|
||||
};
|
||||
|
||||
if now > next_download {
|
||||
@@ -343,7 +352,6 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
secondary_state: secondary_tenant,
|
||||
last_download,
|
||||
target_time: Some(next_download),
|
||||
period: Some(DOWNLOAD_FRESHEN_INTERVAL),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
@@ -369,7 +377,6 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
|
||||
Ok(PendingDownload {
|
||||
target_time: None,
|
||||
period: None,
|
||||
last_download: None,
|
||||
secondary_state: tenant,
|
||||
})
|
||||
@@ -386,7 +393,6 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
secondary_state,
|
||||
last_download,
|
||||
target_time,
|
||||
period,
|
||||
} = job;
|
||||
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
@@ -423,20 +429,15 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
|
||||
// If the job had a target execution time, we may check our final execution
|
||||
// time against that for observability purposes.
|
||||
if let (Some(target_time), Some(period)) = (target_time, period) {
|
||||
// Only track execution lag if this isn't our first download: otherwise, it is expected
|
||||
// that execution will have taken longer than our configured interval, for example
|
||||
// when starting up a pageserver and
|
||||
if last_download.is_some() {
|
||||
// Elapsed time includes any scheduling lag as well as the execution of the job
|
||||
let elapsed = Instant::now().duration_since(target_time);
|
||||
if let (Some(target_time), Some(last_download)) = (target_time, last_download) {
|
||||
// Elapsed time includes any scheduling lag as well as the execution of the job
|
||||
let elapsed = Instant::now().duration_since(target_time);
|
||||
|
||||
warn_when_period_overrun(
|
||||
elapsed,
|
||||
period,
|
||||
BackgroundLoopKind::SecondaryDownload,
|
||||
);
|
||||
}
|
||||
warn_when_period_overrun(
|
||||
elapsed,
|
||||
last_download.upload_period,
|
||||
BackgroundLoopKind::SecondaryDownload,
|
||||
);
|
||||
}
|
||||
|
||||
CompleteDownload {
|
||||
@@ -525,12 +526,12 @@ impl<'a> TenantDownloader<'a> {
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
|
||||
// We will use the etag from last successful download to make the download conditional on changes
|
||||
let last_etag = self
|
||||
let last_download = self
|
||||
.secondary_state
|
||||
.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.last_etag
|
||||
.last_download
|
||||
.clone();
|
||||
|
||||
// Download the tenant's heatmap
|
||||
@@ -539,7 +540,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
etag: heatmap_etag,
|
||||
bytes: heatmap_bytes,
|
||||
} = match tokio::select!(
|
||||
bytes = self.download_heatmap(last_etag.as_ref()) => {bytes?},
|
||||
bytes = self.download_heatmap(last_download.as_ref().map(|d| &d.etag)) => {bytes?},
|
||||
_ = self.secondary_state.cancel.cancelled() => return Ok(())
|
||||
) {
|
||||
HeatMapDownload::Unmodified => {
|
||||
@@ -599,7 +600,14 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
// Only update last_etag after a full successful download: this way will not skip
|
||||
// the next download, even if the heatmap's actual etag is unchanged.
|
||||
self.secondary_state.detail.lock().unwrap().last_etag = Some(heatmap_etag);
|
||||
self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
|
||||
etag: heatmap_etag,
|
||||
mtime: heatmap_mtime,
|
||||
upload_period: heatmap
|
||||
.upload_period_ms
|
||||
.map(|ms| Duration::from_millis(ms as u64))
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL),
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
78
patches/pgvector.patch
Normal file
78
patches/pgvector.patch
Normal file
@@ -0,0 +1,78 @@
|
||||
From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001
|
||||
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
|
||||
Date: Fri, 2 Feb 2024 22:26:45 +0200
|
||||
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
|
||||
|
||||
Now that the WAL-logging happens as a separate step at the end of the
|
||||
build, we need a few neon-specific hints to make it work.
|
||||
---
|
||||
src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++
|
||||
1 file changed, 36 insertions(+)
|
||||
|
||||
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
|
||||
index 680789b..ec54dea 100644
|
||||
--- a/src/hnswbuild.c
|
||||
+++ b/src/hnswbuild.c
|
||||
@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
|
||||
|
||||
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_start_unlogged_build(RelationGetSmgr(indexRel));
|
||||
+#endif
|
||||
+
|
||||
/* Perform inserts */
|
||||
HnswParallelScanAndInsert(heapRel, indexRel, hnswshared, hnswarea, false);
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(indexRel));
|
||||
+#endif
|
||||
+
|
||||
/* Close relations within worker */
|
||||
index_close(indexRel, indexLockmode);
|
||||
table_close(heapRel, heapLockmode);
|
||||
@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
||||
SeedRandom(42);
|
||||
#endif
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_start_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
InitBuildState(buildstate, heap, index, indexInfo, forkNum);
|
||||
|
||||
BuildGraph(buildstate, forkNum);
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
if (RelationNeedsWAL(index))
|
||||
+ {
|
||||
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ {
|
||||
+#if PG_VERSION_NUM >= 160000
|
||||
+ RelFileLocator rlocator = RelationGetSmgr(index)->smgr_rlocator.locator;
|
||||
+#else
|
||||
+ RelFileNode rlocator = RelationGetSmgr(index)->smgr_rnode.node;
|
||||
+#endif
|
||||
+
|
||||
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
|
||||
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
|
||||
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
|
||||
+ }
|
||||
+#endif
|
||||
+ }
|
||||
+
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_end_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
FreeBuildState(buildstate);
|
||||
}
|
||||
|
||||
--
|
||||
2.39.2
|
||||
|
||||
@@ -45,6 +45,7 @@
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/parallel.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogdefs.h"
|
||||
@@ -2822,10 +2823,14 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
reln->smgr_relpersistence = RELPERSISTENCE_UNLOGGED;
|
||||
|
||||
/*
|
||||
* Create the local file. In a parallel build, the leader is expected to
|
||||
* call this first and do it.
|
||||
*
|
||||
* FIXME: should we pass isRedo true to create the tablespace dir if it
|
||||
* doesn't exist? Is it needed?
|
||||
*/
|
||||
mdcreate(reln, MAIN_FORKNUM, false);
|
||||
if (!IsParallelWorker())
|
||||
mdcreate(reln, MAIN_FORKNUM, false);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2849,7 +2854,17 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
|
||||
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_1);
|
||||
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
|
||||
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_2;
|
||||
/*
|
||||
* In a parallel build, (only) the leader process performs the 2nd
|
||||
* phase.
|
||||
*/
|
||||
if (IsParallelWorker())
|
||||
{
|
||||
unlogged_build_rel = NULL;
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
}
|
||||
else
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_2;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -307,7 +307,7 @@ where
|
||||
}
|
||||
|
||||
async fn upload_parquet(
|
||||
w: SerializedFileWriter<Writer<BytesMut>>,
|
||||
mut w: SerializedFileWriter<Writer<BytesMut>>,
|
||||
len: i64,
|
||||
storage: &GenericRemoteStorage,
|
||||
) -> anyhow::Result<Writer<BytesMut>> {
|
||||
@@ -319,11 +319,15 @@ async fn upload_parquet(
|
||||
|
||||
// I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
|
||||
// finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
|
||||
let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish())
|
||||
let (mut buffer, metadata) =
|
||||
tokio::task::spawn_blocking(move || -> parquet::errors::Result<_> {
|
||||
let metadata = w.finish()?;
|
||||
let buffer = std::mem::take(w.inner_mut().get_mut());
|
||||
Ok((buffer, metadata))
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
let mut buffer = writer.into_inner();
|
||||
let data = buffer.split().freeze();
|
||||
|
||||
let compression = len as f64 / len_uncompressed as f64;
|
||||
@@ -474,10 +478,11 @@ mod tests {
|
||||
RequestData {
|
||||
session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
|
||||
peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
|
||||
timestamp: chrono::NaiveDateTime::from_timestamp_millis(
|
||||
timestamp: chrono::DateTime::from_timestamp_millis(
|
||||
rng.gen_range(1703862754..1803862754),
|
||||
)
|
||||
.unwrap(),
|
||||
.unwrap()
|
||||
.naive_utc(),
|
||||
application_name: Some("test".to_owned()),
|
||||
username: Some(hex::encode(rng.gen::<[u8; 4]>())),
|
||||
endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
@@ -560,15 +565,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315008, 3, 6000),
|
||||
(1315001, 3, 6000),
|
||||
(1315061, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(1315148, 3, 6000),
|
||||
(1314990, 3, 6000),
|
||||
(1314782, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(438575, 1, 2000)
|
||||
(1315314, 3, 6000),
|
||||
(1315307, 3, 6000),
|
||||
(1315367, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(1315454, 3, 6000),
|
||||
(1315296, 3, 6000),
|
||||
(1315088, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(438713, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -598,11 +603,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1221738, 5, 10000),
|
||||
(1227888, 5, 10000),
|
||||
(1229682, 5, 10000),
|
||||
(1229044, 5, 10000),
|
||||
(1220322, 5, 10000)
|
||||
(1222212, 5, 10000),
|
||||
(1228362, 5, 10000),
|
||||
(1230156, 5, 10000),
|
||||
(1229518, 5, 10000),
|
||||
(1220796, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -634,11 +639,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1207385, 5, 10000),
|
||||
(1207116, 5, 10000),
|
||||
(1207409, 5, 10000),
|
||||
(1207397, 5, 10000),
|
||||
(1207652, 5, 10000)
|
||||
(1207859, 5, 10000),
|
||||
(1207590, 5, 10000),
|
||||
(1207883, 5, 10000),
|
||||
(1207871, 5, 10000),
|
||||
(1208126, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -663,15 +668,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315008, 3, 6000),
|
||||
(1315001, 3, 6000),
|
||||
(1315061, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(1315148, 3, 6000),
|
||||
(1314990, 3, 6000),
|
||||
(1314782, 3, 6000),
|
||||
(1315018, 3, 6000),
|
||||
(438575, 1, 2000)
|
||||
(1315314, 3, 6000),
|
||||
(1315307, 3, 6000),
|
||||
(1315367, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(1315454, 3, 6000),
|
||||
(1315296, 3, 6000),
|
||||
(1315088, 3, 6000),
|
||||
(1315324, 3, 6000),
|
||||
(438713, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -708,7 +713,7 @@ mod tests {
|
||||
// files are smaller than the size threshold, but they took too long to fill so were flushed early
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[(659240, 2, 3001), (658954, 2, 3000), (658750, 2, 2999)]
|
||||
[(659462, 2, 3001), (659176, 2, 3000), (658972, 2, 2999)]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
|
||||
@@ -20,7 +20,6 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use storage_broker::Uri;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use tracing::*;
|
||||
use utils::pid_file;
|
||||
@@ -30,13 +29,13 @@ use safekeeper::defaults::{
|
||||
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
|
||||
};
|
||||
use safekeeper::remove_wal;
|
||||
use safekeeper::wal_service;
|
||||
use safekeeper::GlobalTimelines;
|
||||
use safekeeper::SafeKeeperConf;
|
||||
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
|
||||
use safekeeper::{control_file, BROKER_RUNTIME};
|
||||
use safekeeper::{http, WAL_REMOVER_RUNTIME};
|
||||
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
|
||||
use safekeeper::{wal_backup, HTTP_RUNTIME};
|
||||
use storage_broker::DEFAULT_ENDPOINT;
|
||||
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
|
||||
@@ -377,8 +376,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
|
||||
metrics::register_internal(Box::new(timeline_collector))?;
|
||||
|
||||
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
||||
|
||||
wal_backup::init_remote_storage(&conf);
|
||||
|
||||
// Keep handles to main tasks to die if any of them disappears.
|
||||
@@ -391,19 +388,9 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
let current_thread_rt = conf
|
||||
.current_thread_runtime
|
||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||
let conf_ = conf.clone();
|
||||
let wal_backup_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
|
||||
.spawn(wal_backup::wal_backup_launcher_task_main(
|
||||
conf_,
|
||||
wal_backup_launcher_rx,
|
||||
))
|
||||
.map(|res| ("WAL backup launcher".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_backup_handle));
|
||||
|
||||
// Load all timelines from disk to memory.
|
||||
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
|
||||
GlobalTimelines::init(conf.clone()).await?;
|
||||
|
||||
let conf_ = conf.clone();
|
||||
// Run everything in current thread rt, if asked.
|
||||
|
||||
@@ -46,6 +46,8 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let active_timelines_set = GlobalTimelines::get_global_broker_active_set();
|
||||
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
@@ -57,15 +59,9 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
// sensitive and there is no risk of deadlock as we don't await while
|
||||
// lock is held.
|
||||
let now = Instant::now();
|
||||
let all_tlis = GlobalTimelines::get_all();
|
||||
let all_tlis = active_timelines_set.get_all();
|
||||
let mut n_pushed_tlis = 0;
|
||||
for tli in &all_tlis {
|
||||
// filtering alternative futures::stream::iter(all_tlis)
|
||||
// .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
|
||||
// doesn't look better, and I'm not sure how to do that without collect.
|
||||
if !tli.is_active().await {
|
||||
continue;
|
||||
}
|
||||
let sk_info = tli.get_safekeeper_info(&conf).await;
|
||||
yield sk_info;
|
||||
BROKER_PUSHED_UPDATES.inc();
|
||||
@@ -90,6 +86,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
/// Subscribe and fetch all the interesting data from the broker.
|
||||
#[instrument(name = "broker pull", skip_all)]
|
||||
async fn pull_loop(conf: SafeKeeperConf, stats: Arc<BrokerStats>) -> Result<()> {
|
||||
let mut client = storage_broker::connect(conf.broker_endpoint, conf.broker_keepalive_interval)?;
|
||||
|
||||
@@ -319,7 +316,7 @@ async fn task_stats(stats: Arc<BrokerStats>) {
|
||||
|
||||
let now = BrokerStats::now_millis();
|
||||
if now > last_pulled && now - last_pulled > warn_duration.as_millis() as u64 {
|
||||
let ts = chrono::NaiveDateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
|
||||
let ts = chrono::DateTime::from_timestamp_millis(last_pulled as i64).expect("invalid timestamp");
|
||||
info!("no broker updates for some time, last update: {:?}", ts);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ pub mod safekeeper;
|
||||
pub mod send_wal;
|
||||
pub mod state;
|
||||
pub mod timeline;
|
||||
pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_service;
|
||||
|
||||
@@ -11,8 +11,9 @@ use futures::Future;
|
||||
use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
|
||||
proto::MetricFamily,
|
||||
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge,
|
||||
IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec,
|
||||
register_int_counter, register_int_counter_pair, register_int_counter_pair_vec,
|
||||
register_int_counter_vec, Gauge, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
|
||||
IntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -162,6 +163,29 @@ pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
|
||||
});
|
||||
pub static MANAGER_ITERATIONS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_manager_iterations_total",
|
||||
"Number of iterations of the timeline manager task"
|
||||
)
|
||||
.expect("Failed to register safekeeper_manager_iterations_total counter")
|
||||
});
|
||||
pub static MANAGER_ACTIVE_CHANGES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_manager_active_changes_total",
|
||||
"Number of timeline active status changes in the timeline manager task"
|
||||
)
|
||||
.expect("Failed to register safekeeper_manager_active_changes_total counter")
|
||||
});
|
||||
pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
|
||||
register_int_counter_pair!(
|
||||
"safekeeper_wal_backup_tasks_started_total",
|
||||
"Number of active WAL backup tasks",
|
||||
"safekeeper_wal_backup_tasks_finished_total",
|
||||
"Number of finished WAL backup tasks",
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
@@ -614,8 +638,7 @@ impl Collector for TimelineCollector {
|
||||
self.written_wal_seconds.reset();
|
||||
self.flushed_wal_seconds.reset();
|
||||
|
||||
let timelines = GlobalTimelines::get_all();
|
||||
let timelines_count = timelines.len();
|
||||
let timelines_count = GlobalTimelines::get_all().len();
|
||||
let mut active_timelines_count = 0;
|
||||
|
||||
// Prometheus Collector is sync, and data is stored under async lock. To
|
||||
@@ -746,9 +769,9 @@ impl Collector for TimelineCollector {
|
||||
|
||||
async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
|
||||
let mut res = vec![];
|
||||
let timelines = GlobalTimelines::get_all();
|
||||
let active_timelines = GlobalTimelines::get_global_broker_active_set().get_all();
|
||||
|
||||
for tli in timelines {
|
||||
for tli in active_timelines {
|
||||
if let Some(info) = tli.info_for_metrics().await {
|
||||
res.push(info);
|
||||
}
|
||||
|
||||
@@ -45,6 +45,9 @@ const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
|
||||
pub struct WalReceivers {
|
||||
mutex: Mutex<WalReceiversShared>,
|
||||
pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
|
||||
|
||||
num_computes_tx: tokio::sync::watch::Sender<usize>,
|
||||
num_computes_rx: tokio::sync::watch::Receiver<usize>,
|
||||
}
|
||||
|
||||
/// Id under which walreceiver is registered in shmem.
|
||||
@@ -55,16 +58,21 @@ impl WalReceivers {
|
||||
let (pageserver_feedback_tx, _) =
|
||||
tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
|
||||
|
||||
let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
|
||||
|
||||
Arc::new(WalReceivers {
|
||||
mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
|
||||
pageserver_feedback_tx,
|
||||
num_computes_tx,
|
||||
num_computes_rx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Register new walreceiver. Returned guard provides access to the slot and
|
||||
/// automatically deregisters in Drop.
|
||||
pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
|
||||
let slots = &mut self.mutex.lock().slots;
|
||||
let mut shared = self.mutex.lock();
|
||||
let slots = &mut shared.slots;
|
||||
let walreceiver = WalReceiverState {
|
||||
conn_id,
|
||||
status: WalReceiverStatus::Voting,
|
||||
@@ -78,6 +86,9 @@ impl WalReceivers {
|
||||
slots.push(Some(walreceiver));
|
||||
pos
|
||||
};
|
||||
|
||||
self.update_num(&shared);
|
||||
|
||||
WalReceiverGuard {
|
||||
id: pos,
|
||||
walreceivers: self.clone(),
|
||||
@@ -99,7 +110,18 @@ impl WalReceivers {
|
||||
|
||||
/// Get number of walreceivers (compute connections).
|
||||
pub fn get_num(self: &Arc<WalReceivers>) -> usize {
|
||||
self.mutex.lock().slots.iter().flatten().count()
|
||||
self.mutex.lock().get_num()
|
||||
}
|
||||
|
||||
/// Get channel for number of walreceivers.
|
||||
pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
|
||||
self.num_computes_rx.clone()
|
||||
}
|
||||
|
||||
/// Should get called after every update of slots.
|
||||
fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
|
||||
let num = shared.get_num();
|
||||
self.num_computes_tx.send_replace(num);
|
||||
}
|
||||
|
||||
/// Get state of all walreceivers.
|
||||
@@ -123,6 +145,7 @@ impl WalReceivers {
|
||||
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
|
||||
let mut shared = self.mutex.lock();
|
||||
shared.slots[id] = None;
|
||||
self.update_num(&shared);
|
||||
}
|
||||
|
||||
/// Broadcast pageserver feedback to connected walproposers.
|
||||
@@ -137,6 +160,13 @@ struct WalReceiversShared {
|
||||
slots: Vec<Option<WalReceiverState>>,
|
||||
}
|
||||
|
||||
impl WalReceiversShared {
|
||||
/// Get number of walreceivers (compute connections).
|
||||
fn get_num(&self) -> usize {
|
||||
self.slots.iter().flatten().count()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WalReceiverState {
|
||||
/// None means it is recovery initiated by us (this safekeeper).
|
||||
@@ -183,9 +213,19 @@ impl SafekeeperPostgresHandler {
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
) -> Result<(), QueryError> {
|
||||
if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
|
||||
let mut tli: Option<Arc<Timeline>> = None;
|
||||
if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
|
||||
// Log the result and probably send it to the client, closing the stream.
|
||||
pgb.handle_copy_stream_end(end).await;
|
||||
let handle_end_fut = pgb.handle_copy_stream_end(end);
|
||||
// If we managed to create the timeline, augment logging with current LSNs etc.
|
||||
if let Some(tli) = tli {
|
||||
let info = tli.get_safekeeper_info(&self.conf).await;
|
||||
handle_end_fut
|
||||
.instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
|
||||
.await;
|
||||
} else {
|
||||
handle_end_fut.await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -193,6 +233,7 @@ impl SafekeeperPostgresHandler {
|
||||
pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
tli: &mut Option<Arc<Timeline>>,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
// Notify the libpq client that it's allowed to send `CopyData` messages
|
||||
pgb.write_message(&BeMessage::CopyBothResponse).await?;
|
||||
@@ -222,13 +263,17 @@ impl SafekeeperPostgresHandler {
|
||||
// Read first message and create timeline if needed.
|
||||
let res = network_reader.read_first_message().await;
|
||||
|
||||
let res = if let Ok((tli, next_msg)) = res {
|
||||
let network_res = if let Ok((timeline, next_msg)) = res {
|
||||
let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
|
||||
tli.get_walreceivers().pageserver_feedback_tx.subscribe();
|
||||
timeline
|
||||
.get_walreceivers()
|
||||
.pageserver_feedback_tx
|
||||
.subscribe();
|
||||
*tli = Some(timeline.clone());
|
||||
|
||||
tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = network_reader.run(msg_tx, msg_rx, reply_tx, tli.clone(), next_msg) => r,
|
||||
r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline.clone(), next_msg) => r,
|
||||
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
|
||||
}
|
||||
} else {
|
||||
@@ -244,13 +289,13 @@ impl SafekeeperPostgresHandler {
|
||||
match acceptor_handle {
|
||||
None => {
|
||||
// failed even before spawning; read_network should have error
|
||||
Err(res.expect_err("no error with WalAcceptor not spawn"))
|
||||
Err(network_res.expect_err("no error with WalAcceptor not spawn"))
|
||||
}
|
||||
Some(handle) => {
|
||||
let wal_acceptor_res = handle.await;
|
||||
|
||||
// If there was any network error, return it.
|
||||
res?;
|
||||
network_res?;
|
||||
|
||||
// Otherwise, WalAcceptor thread must have errored.
|
||||
match wal_acceptor_res {
|
||||
@@ -441,14 +486,7 @@ impl WalAcceptor {
|
||||
/// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
|
||||
/// it must mean that network thread terminated.
|
||||
async fn run(&mut self) -> anyhow::Result<()> {
|
||||
// Register the connection and defer unregister.
|
||||
// Order of the next two lines is important: we want first to remove our entry and then
|
||||
// update status which depends on registered connections.
|
||||
let _compute_conn_guard = ComputeConnectionGuard {
|
||||
timeline: Arc::clone(&self.tli),
|
||||
};
|
||||
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
|
||||
self.tli.update_status_notify().await?;
|
||||
|
||||
// After this timestamp we will stop processing AppendRequests and send a response
|
||||
// to the walproposer. walproposer sends at least one AppendRequest per second,
|
||||
@@ -514,19 +552,3 @@ impl WalAcceptor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls update_status_notify in drop to update timeline status.
|
||||
struct ComputeConnectionGuard {
|
||||
timeline: Arc<Timeline>,
|
||||
}
|
||||
|
||||
impl Drop for ComputeConnectionGuard {
|
||||
fn drop(&mut self) {
|
||||
let tli = self.timeline.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = tli.update_status_notify().await {
|
||||
error!("failed to update timeline status: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,17 +37,11 @@ use crate::{
|
||||
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
info!("started");
|
||||
let mut cancellation_rx = match tli.get_cancellation_rx() {
|
||||
Ok(rx) => rx,
|
||||
Err(_) => {
|
||||
info!("timeline canceled during task start");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let cancel = tli.cancel.clone();
|
||||
select! {
|
||||
_ = recovery_main_loop(tli, conf) => { unreachable!() }
|
||||
_ = cancellation_rx.changed() => {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("stopped");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,29 +7,18 @@ use tracing::*;
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
const ALLOW_INACTIVE_TIMELINES: bool = true;
|
||||
|
||||
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
let wal_removal_interval = Duration::from_millis(5000);
|
||||
loop {
|
||||
let now = tokio::time::Instant::now();
|
||||
let mut active_timelines = 0;
|
||||
|
||||
let tlis = GlobalTimelines::get_all();
|
||||
for tli in &tlis {
|
||||
let is_active = tli.is_active().await;
|
||||
if is_active {
|
||||
active_timelines += 1;
|
||||
}
|
||||
if !ALLOW_INACTIVE_TIMELINES && !is_active {
|
||||
continue;
|
||||
}
|
||||
let ttid = tli.ttid;
|
||||
async {
|
||||
if let Err(e) = tli.maybe_persist_control_file().await {
|
||||
warn!("failed to persist control file: {e}");
|
||||
}
|
||||
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
|
||||
if let Err(e) = tli.remove_old_wal().await {
|
||||
error!("failed to remove WAL: {}", e);
|
||||
}
|
||||
}
|
||||
@@ -42,8 +31,8 @@ pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
|
||||
if elapsed > wal_removal_interval {
|
||||
info!(
|
||||
"WAL removal is too long, processed {} active timelines ({} total) in {:?}",
|
||||
active_timelines, total_timelines, elapsed
|
||||
"WAL removal is too long, processed {} timelines in {:?}",
|
||||
total_timelines, elapsed
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -340,12 +340,16 @@ impl SafekeeperPostgresHandler {
|
||||
start_pos: Lsn,
|
||||
term: Option<Term>,
|
||||
) -> Result<(), QueryError> {
|
||||
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
|
||||
if let Err(end) = self
|
||||
.handle_start_replication_guts(pgb, start_pos, term)
|
||||
.handle_start_replication_guts(pgb, start_pos, term, tli.clone())
|
||||
.await
|
||||
{
|
||||
let info = tli.get_safekeeper_info(&self.conf).await;
|
||||
// Log the result and probably send it to the client, closing the stream.
|
||||
pgb.handle_copy_stream_end(end).await;
|
||||
pgb.handle_copy_stream_end(end)
|
||||
.instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.flush_lsn)))
|
||||
.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -355,10 +359,9 @@ impl SafekeeperPostgresHandler {
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
start_pos: Lsn,
|
||||
term: Option<Term>,
|
||||
tli: Arc<Timeline>,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let appname = self.appname.clone();
|
||||
let tli =
|
||||
GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
|
||||
|
||||
// Use a guard object to remove our entry from the timeline when we are done.
|
||||
let ws_guard = Arc::new(tli.get_walsenders().register(
|
||||
|
||||
@@ -6,15 +6,16 @@ use camino::Utf8PathBuf;
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::fs;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use std::cmp::max;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use tokio::{
|
||||
sync::{mpsc::Sender, watch},
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::{sync::watch, time::Instant};
|
||||
use tracing::*;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::{
|
||||
@@ -33,12 +34,13 @@ use crate::safekeeper::{
|
||||
};
|
||||
use crate::send_wal::WalSenders;
|
||||
use crate::state::{TimelineMemState, TimelinePersistentState};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::{self};
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
use crate::metrics::FullTimelineInfo;
|
||||
use crate::wal_storage::Storage as wal_storage_iface;
|
||||
use crate::{debug_dump, wal_backup_partial, wal_storage};
|
||||
use crate::{debug_dump, timeline_manager, wal_backup_partial, wal_storage};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
/// Things safekeeper should know about timeline state on peers.
|
||||
@@ -51,8 +53,7 @@ pub struct PeerInfo {
|
||||
/// LSN of the last record.
|
||||
pub flush_lsn: Lsn,
|
||||
pub commit_lsn: Lsn,
|
||||
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
|
||||
/// sk since backup_lsn.
|
||||
/// Since which LSN safekeeper has WAL.
|
||||
pub local_start_lsn: Lsn,
|
||||
/// When info was received. Serde annotations are not very useful but make
|
||||
/// the code compile -- we don't rely on this field externally.
|
||||
@@ -97,25 +98,72 @@ impl PeersInfo {
|
||||
}
|
||||
}
|
||||
|
||||
pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
|
||||
|
||||
/// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
|
||||
/// automatically updates `watch::Sender` channels with state on drop.
|
||||
pub struct WriteGuardSharedState<'a> {
|
||||
tli: Arc<Timeline>,
|
||||
guard: RwLockWriteGuard<'a, SharedState>,
|
||||
}
|
||||
|
||||
impl<'a> WriteGuardSharedState<'a> {
|
||||
fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
|
||||
WriteGuardSharedState { tli, guard }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Deref for WriteGuardSharedState<'a> {
|
||||
type Target = SharedState;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.guard
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DerefMut for WriteGuardSharedState<'a> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.guard
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for WriteGuardSharedState<'a> {
|
||||
fn drop(&mut self) {
|
||||
let term_flush_lsn = TermLsn::from((self.guard.sk.get_term(), self.guard.sk.flush_lsn()));
|
||||
let commit_lsn = self.guard.sk.state.inmem.commit_lsn;
|
||||
|
||||
let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
|
||||
if *old != term_flush_lsn {
|
||||
*old = term_flush_lsn;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
|
||||
if *old != commit_lsn {
|
||||
*old = commit_lsn;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
// send notification about shared state update
|
||||
self.tli.shared_state_version_tx.send_modify(|old| {
|
||||
*old += 1;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared state associated with database instance
|
||||
pub struct SharedState {
|
||||
/// Safekeeper object
|
||||
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
|
||||
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
|
||||
/// In memory list containing state of peers sent in latest messages from them.
|
||||
peers_info: PeersInfo,
|
||||
/// True when WAL backup launcher oversees the timeline, making sure WAL is
|
||||
/// offloaded, allows to bother launcher less.
|
||||
wal_backup_active: bool,
|
||||
/// True whenever there is at least some pending activity on timeline: live
|
||||
/// compute connection, pageserver is not caughtup (it must have latest WAL
|
||||
/// for new compute start) or WAL backuping is not finished. Practically it
|
||||
/// means safekeepers broadcast info to peers about the timeline, old WAL is
|
||||
/// trimmed.
|
||||
///
|
||||
/// TODO: it might be better to remove tli completely from GlobalTimelines
|
||||
/// when tli is inactive instead of having this flag.
|
||||
active: bool,
|
||||
last_removed_segno: XLogSegNo,
|
||||
pub(crate) peers_info: PeersInfo,
|
||||
pub(crate) last_removed_segno: XLogSegNo,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
@@ -152,8 +200,6 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
last_removed_segno: 0,
|
||||
})
|
||||
}
|
||||
@@ -171,75 +217,10 @@ impl SharedState {
|
||||
Ok(Self {
|
||||
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
|
||||
peers_info: PeersInfo(vec![]),
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
last_removed_segno: 0,
|
||||
})
|
||||
}
|
||||
|
||||
fn is_active(&self, num_computes: usize) -> bool {
|
||||
self.is_wal_backup_required(num_computes)
|
||||
// FIXME: add tracking of relevant pageservers and check them here individually,
|
||||
// otherwise migration won't work (we suspend too early).
|
||||
|| self.sk.state.inmem.remote_consistent_lsn < self.sk.state.inmem.commit_lsn
|
||||
}
|
||||
|
||||
/// Mark timeline active/inactive and return whether s3 offloading requires
|
||||
/// start/stop action. If timeline is deactivated, control file is persisted
|
||||
/// as maintenance task does that only for active timelines.
|
||||
async fn update_status(&mut self, num_computes: usize, ttid: TenantTimelineId) -> bool {
|
||||
let is_active = self.is_active(num_computes);
|
||||
if self.active != is_active {
|
||||
info!(
|
||||
"timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
|
||||
ttid,
|
||||
is_active,
|
||||
self.sk.state.inmem.remote_consistent_lsn,
|
||||
self.sk.state.inmem.commit_lsn
|
||||
);
|
||||
if !is_active {
|
||||
if let Err(e) = self.sk.state.flush().await {
|
||||
warn!("control file save in update_status failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.active = is_active;
|
||||
self.is_wal_backup_action_pending(num_computes)
|
||||
}
|
||||
|
||||
/// Should we run s3 offloading in current state?
|
||||
fn is_wal_backup_required(&self, num_computes: usize) -> bool {
|
||||
let seg_size = self.get_wal_seg_size();
|
||||
num_computes > 0 ||
|
||||
// Currently only the whole segment is offloaded, so compare segment numbers.
|
||||
(self.sk.state.inmem.commit_lsn.segment_number(seg_size) >
|
||||
self.sk.state.inmem.backup_lsn.segment_number(seg_size))
|
||||
}
|
||||
|
||||
/// Is current state of s3 offloading is not what it ought to be?
|
||||
fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
|
||||
let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
|
||||
if res {
|
||||
let action_pending = if self.is_wal_backup_required(num_computes) {
|
||||
"start"
|
||||
} else {
|
||||
"stop"
|
||||
};
|
||||
trace!(
|
||||
"timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
|
||||
self.sk.state.timeline_id, action_pending, num_computes, self.sk.state.inmem.commit_lsn, self.sk.state.inmem.backup_lsn
|
||||
);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
/// Returns whether s3 offloading is required and sets current status as
|
||||
/// matching.
|
||||
fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
|
||||
self.wal_backup_active = self.is_wal_backup_required(num_computes);
|
||||
self.wal_backup_active
|
||||
}
|
||||
|
||||
fn get_wal_seg_size(&self) -> usize {
|
||||
self.sk.state.server.wal_seg_size as usize
|
||||
}
|
||||
@@ -276,7 +257,7 @@ impl SharedState {
|
||||
/// Get our latest view of alive peers status on the timeline.
|
||||
/// We pass our own info through the broker as well, so when we don't have connection
|
||||
/// to the broker returned vec is empty.
|
||||
fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
|
||||
pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
|
||||
let now = Instant::now();
|
||||
self.peers_info
|
||||
.0
|
||||
@@ -292,18 +273,13 @@ impl SharedState {
|
||||
/// offloading.
|
||||
/// While it is safe to use inmem values for determining horizon,
|
||||
/// we use persistent to make possible normal states less surprising.
|
||||
fn get_horizon_segno(
|
||||
&self,
|
||||
wal_backup_enabled: bool,
|
||||
extra_horizon_lsn: Option<Lsn>,
|
||||
) -> XLogSegNo {
|
||||
fn get_horizon_segno(&self, extra_horizon_lsn: Option<Lsn>) -> XLogSegNo {
|
||||
let state = &self.sk.state;
|
||||
|
||||
use std::cmp::min;
|
||||
let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
|
||||
if wal_backup_enabled {
|
||||
horizon_lsn = min(horizon_lsn, state.backup_lsn);
|
||||
}
|
||||
// we don't want to remove WAL that is not yet offloaded to s3
|
||||
horizon_lsn = min(horizon_lsn, state.backup_lsn);
|
||||
if let Some(extra_horizon_lsn) = extra_horizon_lsn {
|
||||
horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
|
||||
}
|
||||
@@ -344,11 +320,6 @@ impl From<TimelineError> for ApiError {
|
||||
pub struct Timeline {
|
||||
pub ttid: TenantTimelineId,
|
||||
|
||||
/// Sending here asks for wal backup launcher attention (start/stop
|
||||
/// offloading). Sending ttid instead of concrete command allows to do
|
||||
/// sending without timeline lock.
|
||||
pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
|
||||
/// Used to broadcast commit_lsn updates to all background jobs.
|
||||
commit_lsn_watch_tx: watch::Sender<Lsn>,
|
||||
commit_lsn_watch_rx: watch::Receiver<Lsn>,
|
||||
@@ -360,19 +331,22 @@ pub struct Timeline {
|
||||
term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
|
||||
term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
|
||||
|
||||
/// Broadcasts shared state updates.
|
||||
shared_state_version_tx: watch::Sender<usize>,
|
||||
shared_state_version_rx: watch::Receiver<usize>,
|
||||
|
||||
/// Safekeeper and other state, that should remain consistent and
|
||||
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
|
||||
/// while holding it, ensuring that consensus checks are in order.
|
||||
mutex: Mutex<SharedState>,
|
||||
mutex: RwLock<SharedState>,
|
||||
walsenders: Arc<WalSenders>,
|
||||
walreceivers: Arc<WalReceivers>,
|
||||
|
||||
/// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
|
||||
cancellation_tx: watch::Sender<bool>,
|
||||
/// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
|
||||
pub(crate) cancel: CancellationToken,
|
||||
|
||||
/// Timeline should not be used after cancellation. Background tasks should
|
||||
/// monitor this channel and stop eventually after receiving `true` from this channel.
|
||||
cancellation_rx: watch::Receiver<bool>,
|
||||
/// Gate to be held by background tasks, blocks timeline deletion
|
||||
pub(crate) gate: Gate,
|
||||
|
||||
/// Directory where timeline state is stored.
|
||||
pub timeline_dir: Utf8PathBuf,
|
||||
@@ -382,15 +356,15 @@ pub struct Timeline {
|
||||
/// with different speed.
|
||||
// TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
|
||||
walsenders_keep_horizon: bool,
|
||||
|
||||
// timeline_manager controlled state
|
||||
pub(crate) broker_active: AtomicBool,
|
||||
pub(crate) wal_backup_active: AtomicBool,
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Load existing timeline from disk.
|
||||
pub fn load_timeline(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
) -> Result<Timeline> {
|
||||
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
let shared_state = SharedState::restore(conf, &ttid)?;
|
||||
@@ -400,23 +374,26 @@ impl Timeline {
|
||||
shared_state.sk.get_term(),
|
||||
shared_state.sk.flush_lsn(),
|
||||
)));
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(shared_state),
|
||||
shared_state_version_tx,
|
||||
shared_state_version_rx,
|
||||
mutex: RwLock::new(shared_state),
|
||||
walsenders: WalSenders::new(walreceivers.clone()),
|
||||
walreceivers,
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::default(),
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
walsenders_keep_horizon: conf.walsenders_keep_horizon,
|
||||
broker_active: AtomicBool::new(false),
|
||||
wal_backup_active: AtomicBool::new(false),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -424,7 +401,6 @@ impl Timeline {
|
||||
pub fn create_empty(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
server_info: ServerInfo,
|
||||
commit_lsn: Lsn,
|
||||
local_start_lsn: Lsn,
|
||||
@@ -432,25 +408,29 @@ impl Timeline {
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
|
||||
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
|
||||
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
|
||||
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
|
||||
shared_state_version_tx,
|
||||
shared_state_version_rx,
|
||||
mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
|
||||
walsenders: WalSenders::new(walreceivers.clone()),
|
||||
walreceivers,
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::default(),
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
walsenders_keep_horizon: conf.walsenders_keep_horizon,
|
||||
broker_active: AtomicBool::new(false),
|
||||
wal_backup_active: AtomicBool::new(false),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -461,8 +441,9 @@ impl Timeline {
|
||||
/// and state on disk should remain unchanged.
|
||||
pub async fn init_new(
|
||||
self: &Arc<Timeline>,
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
shared_state: &mut WriteGuardSharedState<'_>,
|
||||
conf: &SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
) -> Result<()> {
|
||||
match fs::metadata(&self.timeline_dir).await {
|
||||
Ok(_) => {
|
||||
@@ -493,16 +474,35 @@ impl Timeline {
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
self.bootstrap(conf);
|
||||
self.bootstrap(conf, broker_active_set);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bootstrap new or existing timeline starting background stasks.
|
||||
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
|
||||
/// Bootstrap new or existing timeline starting background tasks.
|
||||
pub fn bootstrap(
|
||||
self: &Arc<Timeline>,
|
||||
conf: &SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
) {
|
||||
// Start manager task which will monitor timeline state and update
|
||||
// background tasks.
|
||||
let Ok(gate_guard) = self.gate.enter() else {
|
||||
// We were already shut down
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(timeline_manager::main_task(
|
||||
self.clone(),
|
||||
conf.clone(),
|
||||
broker_active_set,
|
||||
gate_guard,
|
||||
));
|
||||
|
||||
// Start recovery task which always runs on the timeline.
|
||||
if conf.peer_recovery_enabled {
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
}
|
||||
// TODO: migrate to timeline_manager
|
||||
if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
|
||||
tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
|
||||
}
|
||||
@@ -515,12 +515,14 @@ impl Timeline {
|
||||
/// deletion API endpoint is retriable.
|
||||
pub async fn delete(
|
||||
&self,
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
shared_state: &mut WriteGuardSharedState<'_>,
|
||||
only_local: bool,
|
||||
) -> Result<(bool, bool)> {
|
||||
let was_active = shared_state.active;
|
||||
) -> Result<bool> {
|
||||
self.cancel(shared_state);
|
||||
|
||||
// Make sure any background tasks are gone before we start deleting things from storage
|
||||
self.gate.close().await;
|
||||
|
||||
// TODO: It's better to wait for s3 offloader termination before
|
||||
// removing data from s3. Though since s3 doesn't have transactions it
|
||||
// still wouldn't guarantee absense of data after removal.
|
||||
@@ -532,20 +534,14 @@ impl Timeline {
|
||||
wal_backup::delete_timeline(&self.ttid).await?;
|
||||
}
|
||||
let dir_existed = delete_dir(&self.timeline_dir).await?;
|
||||
Ok((dir_existed, was_active))
|
||||
Ok(dir_existed)
|
||||
}
|
||||
|
||||
/// Cancel timeline to prevent further usage. Background tasks will stop
|
||||
/// eventually after receiving cancellation signal.
|
||||
///
|
||||
/// Note that we can't notify backup launcher here while holding
|
||||
/// shared_state lock, as this is a potential deadlock: caller is
|
||||
/// responsible for that. Generally we should probably make WAL backup tasks
|
||||
/// to shut down on their own, checking once in a while whether it is the
|
||||
/// time.
|
||||
fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
|
||||
fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
|
||||
info!("timeline {} is cancelled", self.ttid);
|
||||
let _ = self.cancellation_tx.send(true);
|
||||
self.cancel.cancel();
|
||||
// Close associated FDs. Nobody will be able to touch timeline data once
|
||||
// it is cancelled, so WAL storage won't be opened again.
|
||||
shared_state.sk.wal_store.close();
|
||||
@@ -553,44 +549,16 @@ impl Timeline {
|
||||
|
||||
/// Returns if timeline is cancelled.
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
*self.cancellation_rx.borrow()
|
||||
}
|
||||
|
||||
/// Returns watch channel which gets value when timeline is cancelled. It is
|
||||
/// guaranteed to have not cancelled value observed (errors otherwise).
|
||||
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
|
||||
let rx = self.cancellation_rx.clone();
|
||||
if *rx.borrow() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
Ok(rx)
|
||||
self.cancel.is_cancelled()
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
|
||||
self.mutex.lock().await
|
||||
pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
|
||||
WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
|
||||
}
|
||||
|
||||
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
|
||||
shared_state
|
||||
.update_status(self.walreceivers.get_num(), self.ttid)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
|
||||
pub async fn update_status_notify(&self) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
let is_wal_backup_action_pending: bool = {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
self.update_status(&mut shared_state).await
|
||||
};
|
||||
if is_wal_backup_action_pending {
|
||||
// Can fail only if channel to a static thread got closed, which is not normal at all.
|
||||
self.wal_backup_launcher_tx.send(self.ttid).await?;
|
||||
}
|
||||
Ok(())
|
||||
pub async fn read_shared_state(&self) -> ReadGuardSharedState {
|
||||
self.mutex.read().await
|
||||
}
|
||||
|
||||
/// Returns true if walsender should stop sending WAL to pageserver. We
|
||||
@@ -602,7 +570,7 @@ impl Timeline {
|
||||
if self.is_cancelled() {
|
||||
return true;
|
||||
}
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let shared_state = self.read_shared_state().await;
|
||||
if self.walreceivers.get_num() == 0 {
|
||||
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
|
||||
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
|
||||
@@ -610,9 +578,9 @@ impl Timeline {
|
||||
false
|
||||
}
|
||||
|
||||
/// Ensure taht current term is t, erroring otherwise, and lock the state.
|
||||
pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
|
||||
let ss = self.write_shared_state().await;
|
||||
/// Ensure that current term is t, erroring otherwise, and lock the state.
|
||||
pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
|
||||
let ss = self.read_shared_state().await;
|
||||
if ss.sk.state.acceptor_state.term != t {
|
||||
bail!(
|
||||
"failed to acquire term {}, current term {}",
|
||||
@@ -623,18 +591,6 @@ impl Timeline {
|
||||
Ok(ss)
|
||||
}
|
||||
|
||||
/// Returns whether s3 offloading is required and sets current status as
|
||||
/// matching it.
|
||||
pub async fn wal_backup_attend(&self) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.write_shared_state()
|
||||
.await
|
||||
.wal_backup_attend(self.walreceivers.get_num())
|
||||
}
|
||||
|
||||
/// Returns commit_lsn watch channel.
|
||||
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
|
||||
self.commit_lsn_watch_rx.clone()
|
||||
@@ -645,9 +601,14 @@ impl Timeline {
|
||||
self.term_flush_lsn_watch_rx.clone()
|
||||
}
|
||||
|
||||
/// Returns watch channel for SharedState update version.
|
||||
pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
|
||||
self.shared_state_version_rx.clone()
|
||||
}
|
||||
|
||||
/// Pass arrived message to the safekeeper.
|
||||
pub async fn process_msg(
|
||||
&self,
|
||||
self: &Arc<Self>,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
if self.is_cancelled() {
|
||||
@@ -655,8 +616,6 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let mut rmsg: Option<AcceptorProposerMessage>;
|
||||
let commit_lsn: Lsn;
|
||||
let term_flush_lsn: TermLsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
rmsg = shared_state.sk.process_msg(msg).await?;
|
||||
@@ -665,43 +624,28 @@ impl Timeline {
|
||||
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
|
||||
resp.hs_feedback = self.walsenders.get_hotstandby();
|
||||
}
|
||||
|
||||
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
|
||||
term_flush_lsn =
|
||||
TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
|
||||
}
|
||||
self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
|
||||
self.commit_lsn_watch_tx.send(commit_lsn)?;
|
||||
Ok(rmsg)
|
||||
}
|
||||
|
||||
/// Returns wal_seg_size.
|
||||
pub async fn get_wal_seg_size(&self) -> usize {
|
||||
self.write_shared_state().await.get_wal_seg_size()
|
||||
}
|
||||
|
||||
/// Returns true only if the timeline is loaded and active.
|
||||
pub async fn is_active(&self) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.write_shared_state().await.active
|
||||
self.read_shared_state().await.get_wal_seg_size()
|
||||
}
|
||||
|
||||
/// Returns state of the timeline.
|
||||
pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
|
||||
let state = self.write_shared_state().await;
|
||||
let state = self.read_shared_state().await;
|
||||
(state.sk.state.inmem.clone(), state.sk.state.clone())
|
||||
}
|
||||
|
||||
/// Returns latest backup_lsn.
|
||||
pub async fn get_wal_backup_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().await.sk.state.inmem.backup_lsn
|
||||
self.read_shared_state().await.sk.state.inmem.backup_lsn
|
||||
}
|
||||
|
||||
/// Sets backup_lsn to the given value.
|
||||
pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
|
||||
pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
@@ -715,39 +659,33 @@ impl Timeline {
|
||||
|
||||
/// Get safekeeper info for broadcasting to broker and other peers.
|
||||
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let shared_state = self.read_shared_state().await;
|
||||
shared_state.get_safekeeper_info(&self.ttid, conf)
|
||||
}
|
||||
|
||||
/// Update timeline state with peer safekeeper data.
|
||||
pub async fn record_safekeeper_info(&self, sk_info: SafekeeperTimelineInfo) -> Result<()> {
|
||||
let is_wal_backup_action_pending: bool;
|
||||
let commit_lsn: Lsn;
|
||||
pub async fn record_safekeeper_info(
|
||||
self: &Arc<Self>,
|
||||
sk_info: SafekeeperTimelineInfo,
|
||||
) -> Result<()> {
|
||||
{
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.sk.record_safekeeper_info(&sk_info).await?;
|
||||
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
|
||||
shared_state.peers_info.upsert(&peer_info);
|
||||
is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
|
||||
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
|
||||
}
|
||||
self.commit_lsn_watch_tx.send(commit_lsn)?;
|
||||
// Wake up wal backup launcher, if it is time to stop the offloading.
|
||||
if is_wal_backup_action_pending {
|
||||
self.wal_backup_launcher_tx.send(self.ttid).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update in memory remote consistent lsn.
|
||||
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
|
||||
pub async fn update_remote_consistent_lsn(self: &Arc<Self>, candidate: Lsn) {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.sk.state.inmem.remote_consistent_lsn =
|
||||
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
|
||||
}
|
||||
|
||||
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let shared_state = self.read_shared_state().await;
|
||||
shared_state.get_peers(conf.heartbeat_timeout)
|
||||
}
|
||||
|
||||
@@ -769,7 +707,7 @@ impl Timeline {
|
||||
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
|
||||
/// Thus we don't try to predict it here.
|
||||
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
|
||||
let ss = self.write_shared_state().await;
|
||||
let ss = self.read_shared_state().await;
|
||||
let term = ss.sk.state.acceptor_state.term;
|
||||
let last_log_term = ss.sk.get_epoch();
|
||||
let flush_lsn = ss.sk.flush_lsn();
|
||||
@@ -840,12 +778,12 @@ impl Timeline {
|
||||
|
||||
/// Returns flush_lsn.
|
||||
pub async fn get_flush_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().await.sk.wal_store.flush_lsn()
|
||||
self.read_shared_state().await.sk.wal_store.flush_lsn()
|
||||
}
|
||||
|
||||
/// Delete WAL segments from disk that are no longer needed. This is determined
|
||||
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
|
||||
pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
|
||||
pub async fn remove_old_wal(self: &Arc<Self>) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
@@ -861,9 +799,8 @@ impl Timeline {
|
||||
|
||||
let horizon_segno: XLogSegNo;
|
||||
let remover = {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
horizon_segno =
|
||||
shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
|
||||
let shared_state = self.read_shared_state().await;
|
||||
horizon_segno = shared_state.get_horizon_segno(replication_horizon_lsn);
|
||||
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
|
||||
return Ok(()); // nothing to do
|
||||
}
|
||||
@@ -885,7 +822,7 @@ impl Timeline {
|
||||
/// passed after the last save. This helps to keep remote_consistent_lsn up
|
||||
/// to date so that storage nodes restart doesn't cause many pageserver ->
|
||||
/// safekeeper reconnections.
|
||||
pub async fn maybe_persist_control_file(&self) -> Result<()> {
|
||||
pub async fn maybe_persist_control_file(self: &Arc<Self>) -> Result<()> {
|
||||
self.write_shared_state()
|
||||
.await
|
||||
.sk
|
||||
@@ -893,38 +830,33 @@ impl Timeline {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Gather timeline data for metrics. If the timeline is not active, returns
|
||||
/// None, we do not collect these.
|
||||
/// Gather timeline data for metrics.
|
||||
pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
|
||||
if self.is_cancelled() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
|
||||
let state = self.write_shared_state().await;
|
||||
if state.active {
|
||||
Some(FullTimelineInfo {
|
||||
ttid: self.ttid,
|
||||
ps_feedback_count,
|
||||
last_ps_feedback,
|
||||
wal_backup_active: state.wal_backup_active,
|
||||
timeline_is_active: state.active,
|
||||
num_computes: self.walreceivers.get_num() as u32,
|
||||
last_removed_segno: state.last_removed_segno,
|
||||
epoch_start_lsn: state.sk.epoch_start_lsn,
|
||||
mem_state: state.sk.state.inmem.clone(),
|
||||
persisted_state: state.sk.state.clone(),
|
||||
flush_lsn: state.sk.wal_store.flush_lsn(),
|
||||
wal_storage: state.sk.wal_store.get_metrics(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
let state = self.read_shared_state().await;
|
||||
Some(FullTimelineInfo {
|
||||
ttid: self.ttid,
|
||||
ps_feedback_count,
|
||||
last_ps_feedback,
|
||||
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
|
||||
timeline_is_active: self.broker_active.load(Ordering::Relaxed),
|
||||
num_computes: self.walreceivers.get_num() as u32,
|
||||
last_removed_segno: state.last_removed_segno,
|
||||
epoch_start_lsn: state.sk.epoch_start_lsn,
|
||||
mem_state: state.sk.state.inmem.clone(),
|
||||
persisted_state: state.sk.state.clone(),
|
||||
flush_lsn: state.sk.wal_store.flush_lsn(),
|
||||
wal_storage: state.sk.wal_store.get_metrics(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns in-memory timeline state to build a full debug dump.
|
||||
pub async fn memory_dump(&self) -> debug_dump::Memory {
|
||||
let state = self.write_shared_state().await;
|
||||
let state = self.read_shared_state().await;
|
||||
|
||||
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
|
||||
state.sk.wal_store.internal_state();
|
||||
@@ -933,8 +865,8 @@ impl Timeline {
|
||||
is_cancelled: self.is_cancelled(),
|
||||
peers_info_len: state.peers_info.0.len(),
|
||||
walsenders: self.walsenders.get_all(),
|
||||
wal_backup_active: state.wal_backup_active,
|
||||
active: state.active,
|
||||
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
|
||||
active: self.broker_active.load(Ordering::Relaxed),
|
||||
num_computes: self.walreceivers.get_num() as u32,
|
||||
last_removed_segno: state.last_removed_segno,
|
||||
epoch_start_lsn: state.sk.epoch_start_lsn,
|
||||
@@ -948,7 +880,7 @@ impl Timeline {
|
||||
|
||||
/// Apply a function to the control file state and persist it.
|
||||
pub async fn map_control_file<T>(
|
||||
&self,
|
||||
self: &Arc<Self>,
|
||||
f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
|
||||
) -> Result<T> {
|
||||
let mut state = self.write_shared_state().await;
|
||||
|
||||
140
safekeeper/src/timeline_manager.rs
Normal file
140
safekeeper/src/timeline_manager.rs
Normal file
@@ -0,0 +1,140 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use tracing::{info, instrument, warn};
|
||||
use utils::{lsn::Lsn, sync::gate::GateGuard};
|
||||
|
||||
use crate::{
|
||||
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
|
||||
timeline::{PeerInfo, ReadGuardSharedState, Timeline},
|
||||
timelines_set::TimelinesSet,
|
||||
wal_backup::{self, WalBackupTaskHandle},
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
pub struct StateSnapshot {
|
||||
pub commit_lsn: Lsn,
|
||||
pub backup_lsn: Lsn,
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub peers: Vec<PeerInfo>,
|
||||
}
|
||||
|
||||
impl StateSnapshot {
|
||||
/// Create a new snapshot of the timeline state.
|
||||
fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
|
||||
Self {
|
||||
commit_lsn: read_guard.sk.state.inmem.commit_lsn,
|
||||
backup_lsn: read_guard.sk.state.inmem.backup_lsn,
|
||||
remote_consistent_lsn: read_guard.sk.state.inmem.remote_consistent_lsn,
|
||||
peers: read_guard.get_peers(heartbeat_timeout),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Control how often the manager task should wake up to check updates.
|
||||
/// There is no need to check for updates more often than this.
|
||||
const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
|
||||
|
||||
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
|
||||
/// background tasks.
|
||||
#[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
|
||||
pub async fn main_task(
|
||||
tli: Arc<Timeline>,
|
||||
conf: SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
_gate_guard: GateGuard,
|
||||
) {
|
||||
scopeguard::defer! {
|
||||
if tli.is_cancelled() {
|
||||
info!("manager task finished");
|
||||
} else {
|
||||
warn!("manager task finished prematurely");
|
||||
}
|
||||
};
|
||||
|
||||
// sets whether timeline is active for broker pushes or not
|
||||
let mut tli_broker_active = broker_active_set.guard(tli.clone());
|
||||
|
||||
let ttid = tli.ttid;
|
||||
let wal_seg_size = tli.get_wal_seg_size().await;
|
||||
let heartbeat_timeout = conf.heartbeat_timeout;
|
||||
|
||||
let mut state_version_rx = tli.get_state_version_rx();
|
||||
|
||||
let walreceivers = tli.get_walreceivers();
|
||||
let mut num_computes_rx = walreceivers.get_num_rx();
|
||||
|
||||
// list of background tasks
|
||||
let mut backup_task: Option<WalBackupTaskHandle> = None;
|
||||
|
||||
let last_state = 'outer: loop {
|
||||
MANAGER_ITERATIONS_TOTAL.inc();
|
||||
|
||||
let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout);
|
||||
let num_computes = *num_computes_rx.borrow();
|
||||
|
||||
let is_wal_backup_required =
|
||||
wal_backup::is_wal_backup_required(wal_seg_size, num_computes, &state_snapshot);
|
||||
|
||||
if conf.is_wal_backup_enabled() {
|
||||
wal_backup::update_task(
|
||||
&conf,
|
||||
ttid,
|
||||
is_wal_backup_required,
|
||||
&state_snapshot,
|
||||
&mut backup_task,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let is_active = is_wal_backup_required
|
||||
|| state_snapshot.remote_consistent_lsn < state_snapshot.commit_lsn;
|
||||
|
||||
// update the broker timeline set
|
||||
if tli_broker_active.set(is_active) {
|
||||
// write log if state has changed
|
||||
info!(
|
||||
"timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
|
||||
is_active, state_snapshot.remote_consistent_lsn, state_snapshot.commit_lsn,
|
||||
);
|
||||
|
||||
MANAGER_ACTIVE_CHANGES.inc();
|
||||
|
||||
if !is_active {
|
||||
// TODO: maybe use tokio::spawn?
|
||||
if let Err(e) = tli.maybe_persist_control_file().await {
|
||||
warn!("control file save in update_status failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update the state in Arc<Timeline>
|
||||
tli.wal_backup_active
|
||||
.store(is_wal_backup_required, std::sync::atomic::Ordering::SeqCst);
|
||||
tli.broker_active
|
||||
.store(is_active, std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
// wait until something changes. tx channels are stored under Arc, so they will not be
|
||||
// dropped until the manager task is finished.
|
||||
tokio::select! {
|
||||
_ = tli.cancel.cancelled() => {
|
||||
// timeline was deleted
|
||||
break 'outer state_snapshot;
|
||||
}
|
||||
_ = async {
|
||||
// don't wake up on every state change, but at most every REFRESH_INTERVAL
|
||||
tokio::time::sleep(REFRESH_INTERVAL).await;
|
||||
let _ = state_version_rx.changed().await;
|
||||
} => {
|
||||
// state was updated
|
||||
}
|
||||
_ = num_computes_rx.changed() => {
|
||||
// number of connected computes was updated
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// shutdown background tasks
|
||||
if conf.is_wal_backup_enabled() {
|
||||
wal_backup::update_task(&conf, ttid, false, &last_state, &mut backup_task).await;
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::timeline::{Timeline, TimelineError};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
@@ -11,16 +12,16 @@ use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
struct GlobalTimelinesState {
|
||||
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
|
||||
wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
|
||||
conf: Option<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
|
||||
}
|
||||
|
||||
@@ -36,11 +37,8 @@ impl GlobalTimelinesState {
|
||||
}
|
||||
|
||||
/// Get dependencies for a timeline constructor.
|
||||
fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
|
||||
(
|
||||
self.get_conf().clone(),
|
||||
self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||
)
|
||||
fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
|
||||
(self.get_conf().clone(), self.broker_active_set.clone())
|
||||
}
|
||||
|
||||
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
|
||||
@@ -65,8 +63,8 @@ impl GlobalTimelinesState {
|
||||
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
|
||||
Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
wal_backup_launcher_tx: None,
|
||||
conf: None,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
|
||||
})
|
||||
});
|
||||
@@ -76,16 +74,11 @@ pub struct GlobalTimelines;
|
||||
|
||||
impl GlobalTimelines {
|
||||
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
|
||||
pub async fn init(
|
||||
conf: SafeKeeperConf,
|
||||
wal_backup_launcher_tx: Sender<TenantTimelineId>,
|
||||
) -> Result<()> {
|
||||
pub async fn init(conf: SafeKeeperConf) -> Result<()> {
|
||||
// clippy isn't smart enough to understand that drop(state) releases the
|
||||
// lock, so use explicit block
|
||||
let tenants_dir = {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(state.wal_backup_launcher_tx.is_none());
|
||||
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
||||
state.conf = Some(conf);
|
||||
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
@@ -129,12 +122,9 @@ impl GlobalTimelines {
|
||||
/// this function is called during init when nothing else is running, so
|
||||
/// this is fine.
|
||||
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, wal_backup_launcher_tx) = {
|
||||
let (conf, broker_active_set) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
(
|
||||
state.get_conf().clone(),
|
||||
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
|
||||
)
|
||||
state.get_dependencies()
|
||||
};
|
||||
|
||||
let timelines_dir = conf.tenant_dir(&tenant_id);
|
||||
@@ -147,7 +137,7 @@ impl GlobalTimelines {
|
||||
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
||||
{
|
||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
TIMELINES_STATE
|
||||
@@ -155,8 +145,7 @@ impl GlobalTimelines {
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
tli.bootstrap(&conf);
|
||||
tli.update_status_notify().await.unwrap();
|
||||
tli.bootstrap(&conf, broker_active_set.clone());
|
||||
}
|
||||
// If we can't load a timeline, it's most likely because of a corrupted
|
||||
// directory. We will log an error and won't allow to delete/recreate
|
||||
@@ -189,9 +178,9 @@ impl GlobalTimelines {
|
||||
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
|
||||
ttid: TenantTimelineId,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
|
||||
match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
|
||||
@@ -202,7 +191,7 @@ impl GlobalTimelines {
|
||||
.timelines
|
||||
.insert(ttid, tli.clone());
|
||||
|
||||
tli.bootstrap(&conf);
|
||||
tli.bootstrap(&conf, broker_active_set);
|
||||
|
||||
Ok(tli)
|
||||
}
|
||||
@@ -221,6 +210,10 @@ impl GlobalTimelines {
|
||||
TIMELINES_STATE.lock().unwrap().get_conf().clone()
|
||||
}
|
||||
|
||||
pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
|
||||
TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
|
||||
}
|
||||
|
||||
/// Create a new timeline with the given id. If the timeline already exists, returns
|
||||
/// an existing timeline.
|
||||
pub async fn create(
|
||||
@@ -229,7 +222,7 @@ impl GlobalTimelines {
|
||||
commit_lsn: Lsn,
|
||||
local_start_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, wal_backup_launcher_tx) = {
|
||||
let (conf, broker_active_set) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
// Timeline already exists, return it.
|
||||
@@ -243,7 +236,6 @@ impl GlobalTimelines {
|
||||
let timeline = Arc::new(Timeline::create_empty(
|
||||
&conf,
|
||||
ttid,
|
||||
wal_backup_launcher_tx,
|
||||
server_info,
|
||||
commit_lsn,
|
||||
local_start_lsn,
|
||||
@@ -264,7 +256,10 @@ impl GlobalTimelines {
|
||||
// Write the new timeline to the disk and start background workers.
|
||||
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
|
||||
// and the state on disk should remain unchanged.
|
||||
if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
|
||||
if let Err(e) = timeline
|
||||
.init_new(&mut shared_state, &conf, broker_active_set)
|
||||
.await
|
||||
{
|
||||
// Note: the most likely reason for init failure is that the timeline
|
||||
// directory already exists on disk. This happens when timeline is corrupted
|
||||
// and wasn't loaded from disk on startup because of that. We want to preserve
|
||||
@@ -281,8 +276,6 @@ impl GlobalTimelines {
|
||||
// We are done with bootstrap, release the lock, return the timeline.
|
||||
// {} block forces release before .await
|
||||
}
|
||||
timeline.update_status_notify().await?;
|
||||
timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -335,12 +328,13 @@ impl GlobalTimelines {
|
||||
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
|
||||
match tli_res {
|
||||
Ok(timeline) => {
|
||||
let was_active = timeline.broker_active.load(Ordering::Relaxed);
|
||||
|
||||
// Take a lock and finish the deletion holding this mutex.
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
|
||||
info!("deleting timeline {}, only_local={}", ttid, only_local);
|
||||
let (dir_existed, was_active) =
|
||||
timeline.delete(&mut shared_state, only_local).await?;
|
||||
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
|
||||
|
||||
// Remove timeline from the map.
|
||||
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
|
||||
@@ -349,7 +343,7 @@ impl GlobalTimelines {
|
||||
|
||||
Ok(TimelineDeleteForceResult {
|
||||
dir_existed,
|
||||
was_active,
|
||||
was_active, // TODO: we probably should remove this field
|
||||
})
|
||||
}
|
||||
Err(_) => {
|
||||
|
||||
90
safekeeper/src/timelines_set.rs
Normal file
90
safekeeper/src/timelines_set.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use utils::id::TenantTimelineId;
|
||||
|
||||
use crate::timeline::Timeline;
|
||||
|
||||
/// Set of timelines, supports operations:
|
||||
/// - add timeline
|
||||
/// - remove timeline
|
||||
/// - clone the set
|
||||
///
|
||||
/// Usually used for keeping subset of timelines. For example active timelines that require broker push.
|
||||
pub struct TimelinesSet {
|
||||
timelines: std::sync::Mutex<HashMap<TenantTimelineId, Arc<Timeline>>>,
|
||||
}
|
||||
|
||||
impl Default for TimelinesSet {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
timelines: std::sync::Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TimelinesSet {
|
||||
pub fn insert(&self, tli: Arc<Timeline>) {
|
||||
self.timelines.lock().unwrap().insert(tli.ttid, tli);
|
||||
}
|
||||
|
||||
pub fn delete(&self, ttid: &TenantTimelineId) {
|
||||
self.timelines.lock().unwrap().remove(ttid);
|
||||
}
|
||||
|
||||
/// If present is true, adds timeline to the set, otherwise removes it.
|
||||
pub fn set_present(&self, tli: Arc<Timeline>, present: bool) {
|
||||
if present {
|
||||
self.insert(tli);
|
||||
} else {
|
||||
self.delete(&tli.ttid);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_present(&self, ttid: &TenantTimelineId) -> bool {
|
||||
self.timelines.lock().unwrap().contains_key(ttid)
|
||||
}
|
||||
|
||||
/// Returns all timelines in the set.
|
||||
pub fn get_all(&self) -> Vec<Arc<Timeline>> {
|
||||
self.timelines.lock().unwrap().values().cloned().collect()
|
||||
}
|
||||
|
||||
/// Returns a timeline guard for easy presence control.
|
||||
pub fn guard(self: &Arc<Self>, tli: Arc<Timeline>) -> TimelineSetGuard {
|
||||
let is_present = self.is_present(&tli.ttid);
|
||||
TimelineSetGuard {
|
||||
timelines_set: self.clone(),
|
||||
tli,
|
||||
is_present,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard is used to add or remove timeline from the set.
|
||||
/// If the timeline present in set, it will be removed from it on drop.
|
||||
/// Note: do not use more than one guard for the same timeline, it caches the presence state.
|
||||
/// It is designed to be used in the manager task only.
|
||||
pub struct TimelineSetGuard {
|
||||
timelines_set: Arc<TimelinesSet>,
|
||||
tli: Arc<Timeline>,
|
||||
is_present: bool,
|
||||
}
|
||||
|
||||
impl TimelineSetGuard {
|
||||
/// Returns true if the state was changed.
|
||||
pub fn set(&mut self, present: bool) -> bool {
|
||||
if present == self.is_present {
|
||||
return false;
|
||||
}
|
||||
self.is_present = present;
|
||||
self.timelines_set.set_present(self.tli.clone(), present);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineSetGuard {
|
||||
fn drop(&mut self) {
|
||||
// remove timeline from the map on drop
|
||||
self.timelines_set.delete(&self.tli.ttid);
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,7 @@ use utils::backoff;
|
||||
use utils::id::NodeId;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashSet;
|
||||
use std::num::NonZeroU32;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
@@ -29,9 +29,10 @@ use tracing::*;
|
||||
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
|
||||
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
|
||||
use crate::timeline::{PeerInfo, Timeline};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use crate::timeline_manager::StateSnapshot;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf, WAL_BACKUP_RUNTIME};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
@@ -41,35 +42,84 @@ const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
|
||||
/// Default buffer size when interfacing with [`tokio::fs::File`].
|
||||
const BUFFER_SIZE: usize = 32 * 1024;
|
||||
|
||||
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
|
||||
/// aware of current status and return the timeline.
|
||||
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
|
||||
match GlobalTimelines::get(ttid).ok() {
|
||||
Some(tli) => {
|
||||
tli.wal_backup_attend().await;
|
||||
Some(tli)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
struct WalBackupTaskHandle {
|
||||
pub struct WalBackupTaskHandle {
|
||||
shutdown_tx: Sender<()>,
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
struct WalBackupTimelineEntry {
|
||||
timeline: Arc<Timeline>,
|
||||
handle: Option<WalBackupTaskHandle>,
|
||||
/// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
|
||||
pub fn is_wal_backup_required(
|
||||
wal_seg_size: usize,
|
||||
num_computes: usize,
|
||||
state: &StateSnapshot,
|
||||
) -> bool {
|
||||
num_computes > 0 ||
|
||||
// Currently only the whole segment is offloaded, so compare segment numbers.
|
||||
(state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
|
||||
}
|
||||
|
||||
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
|
||||
if let Some(wb_handle) = entry.handle.take() {
|
||||
/// Based on peer information determine which safekeeper should offload; if it
|
||||
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
|
||||
/// is running, kill it.
|
||||
pub async fn update_task(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
need_backup: bool,
|
||||
state: &StateSnapshot,
|
||||
entry: &mut Option<WalBackupTaskHandle>,
|
||||
) {
|
||||
let (offloader, election_dbg_str) =
|
||||
determine_offloader(&state.peers, state.backup_lsn, ttid, conf);
|
||||
let elected_me = Some(conf.my_id) == offloader;
|
||||
|
||||
let should_task_run = need_backup && elected_me;
|
||||
|
||||
// start or stop the task
|
||||
if should_task_run != (entry.is_some()) {
|
||||
if should_task_run {
|
||||
info!("elected for backup: {}", election_dbg_str);
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||
let timeline_dir = conf.timeline_dir(&ttid);
|
||||
|
||||
let async_task = backup_task_main(
|
||||
ttid,
|
||||
timeline_dir,
|
||||
conf.workdir.clone(),
|
||||
conf.backup_parallel_jobs,
|
||||
shutdown_rx,
|
||||
);
|
||||
|
||||
let handle = if conf.current_thread_runtime {
|
||||
tokio::spawn(async_task)
|
||||
} else {
|
||||
WAL_BACKUP_RUNTIME.spawn(async_task)
|
||||
};
|
||||
|
||||
*entry = Some(WalBackupTaskHandle {
|
||||
shutdown_tx,
|
||||
handle,
|
||||
});
|
||||
} else {
|
||||
if !need_backup {
|
||||
// don't need backup at all
|
||||
info!("stepping down from backup, need_backup={}", need_backup);
|
||||
} else {
|
||||
// someone else has been elected
|
||||
info!("stepping down from backup: {}", election_dbg_str);
|
||||
}
|
||||
shut_down_task(entry).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
|
||||
if let Some(wb_handle) = entry.take() {
|
||||
// Tell the task to shutdown. Error means task exited earlier, that's ok.
|
||||
let _ = wb_handle.shutdown_tx.send(()).await;
|
||||
// Await the task itself. TODO: restart panicked tasks earlier.
|
||||
if let Err(e) = wb_handle.handle.await {
|
||||
warn!("WAL backup task for {} panicked: {}", ttid, e);
|
||||
warn!("WAL backup task panicked: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -126,49 +176,6 @@ fn determine_offloader(
|
||||
}
|
||||
}
|
||||
|
||||
/// Based on peer information determine which safekeeper should offload; if it
|
||||
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
|
||||
/// is running, kill it.
|
||||
async fn update_task(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
entry: &mut WalBackupTimelineEntry,
|
||||
) {
|
||||
let alive_peers = entry.timeline.get_peers(conf).await;
|
||||
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
|
||||
let (offloader, election_dbg_str) =
|
||||
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
|
||||
let elected_me = Some(conf.my_id) == offloader;
|
||||
|
||||
if elected_me != (entry.handle.is_some()) {
|
||||
if elected_me {
|
||||
info!("elected for backup: {}", election_dbg_str);
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||
let timeline_dir = conf.timeline_dir(&ttid);
|
||||
|
||||
let handle = tokio::spawn(
|
||||
backup_task_main(
|
||||
ttid,
|
||||
timeline_dir,
|
||||
conf.workdir.clone(),
|
||||
conf.backup_parallel_jobs,
|
||||
shutdown_rx,
|
||||
)
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
entry.handle = Some(WalBackupTaskHandle {
|
||||
shutdown_tx,
|
||||
handle,
|
||||
});
|
||||
} else {
|
||||
info!("stepping down from backup: {}", election_dbg_str);
|
||||
shut_down_task(ttid, entry).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
|
||||
|
||||
// Storage must be configured and initialized when this is called.
|
||||
@@ -190,67 +197,6 @@ pub fn init_remote_storage(conf: &SafeKeeperConf) {
|
||||
});
|
||||
}
|
||||
|
||||
const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
|
||||
/// tasks. Having this in separate task simplifies locking, allows to reap
|
||||
/// panics and separate elections from offloading itself.
|
||||
pub async fn wal_backup_launcher_task_main(
|
||||
conf: SafeKeeperConf,
|
||||
mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
|
||||
) -> anyhow::Result<()> {
|
||||
info!(
|
||||
"WAL backup launcher started, remote config {:?}",
|
||||
conf.remote_storage
|
||||
);
|
||||
|
||||
// Presence in this map means launcher is aware s3 offloading is needed for
|
||||
// the timeline, but task is started only if it makes sense for to offload
|
||||
// from this safekeeper.
|
||||
let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
|
||||
|
||||
let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
|
||||
loop {
|
||||
tokio::select! {
|
||||
ttid = wal_backup_launcher_rx.recv() => {
|
||||
// channel is never expected to get closed
|
||||
let ttid = ttid.unwrap();
|
||||
if !conf.is_wal_backup_enabled() {
|
||||
continue; /* just drain the channel and do nothing */
|
||||
}
|
||||
async {
|
||||
let timeline = is_wal_backup_required(ttid).await;
|
||||
// do we need to do anything at all?
|
||||
if timeline.is_some() != tasks.contains_key(&ttid) {
|
||||
if let Some(timeline) = timeline {
|
||||
// need to start the task
|
||||
let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
|
||||
timeline,
|
||||
handle: None,
|
||||
});
|
||||
update_task(&conf, ttid, entry).await;
|
||||
} else {
|
||||
// need to stop the task
|
||||
info!("stopping WAL backup task");
|
||||
let mut entry = tasks.remove(&ttid).unwrap();
|
||||
shut_down_task(ttid, &mut entry).await;
|
||||
}
|
||||
}
|
||||
}.instrument(info_span!("WAL backup", ttid = %ttid)).await;
|
||||
}
|
||||
// For each timeline needing offloading, check if this safekeeper
|
||||
// should do the job and start/stop the task accordingly.
|
||||
_ = ticker.tick() => {
|
||||
for (ttid, entry) in tasks.iter_mut() {
|
||||
update_task(&conf, *ttid, entry)
|
||||
.instrument(info_span!("WAL backup", ttid = %ttid))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct WalBackupTask {
|
||||
timeline: Arc<Timeline>,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
@@ -261,6 +207,7 @@ struct WalBackupTask {
|
||||
}
|
||||
|
||||
/// Offload single timeline.
|
||||
#[instrument(name = "WAL backup", skip_all, fields(ttid = %ttid))]
|
||||
async fn backup_task_main(
|
||||
ttid: TenantTimelineId,
|
||||
timeline_dir: Utf8PathBuf,
|
||||
@@ -268,6 +215,8 @@ async fn backup_task_main(
|
||||
parallel_jobs: usize,
|
||||
mut shutdown_rx: Receiver<()>,
|
||||
) {
|
||||
let _guard = WAL_BACKUP_TASKS.guard();
|
||||
|
||||
info!("started");
|
||||
let res = GlobalTimelines::get(ttid);
|
||||
if let Err(e) = res {
|
||||
|
||||
@@ -277,14 +277,6 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
debug!("started");
|
||||
let await_duration = conf.partial_backup_timeout;
|
||||
|
||||
let mut cancellation_rx = match tli.get_cancellation_rx() {
|
||||
Ok(rx) => rx,
|
||||
Err(_) => {
|
||||
info!("timeline canceled during task start");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// sleep for random time to avoid thundering herd
|
||||
{
|
||||
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
|
||||
@@ -327,7 +319,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
&& flush_lsn_rx.borrow().term == seg.term
|
||||
{
|
||||
tokio::select! {
|
||||
_ = cancellation_rx.changed() => {
|
||||
_ = backup.tli.cancel.cancelled() => {
|
||||
info!("timeline canceled");
|
||||
return;
|
||||
}
|
||||
@@ -340,7 +332,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
// if we don't have any data and zero LSNs, wait for something
|
||||
while flush_lsn_rx.borrow().lsn == Lsn(0) {
|
||||
tokio::select! {
|
||||
_ = cancellation_rx.changed() => {
|
||||
_ = backup.tli.cancel.cancelled() => {
|
||||
info!("timeline canceled");
|
||||
return;
|
||||
}
|
||||
@@ -357,7 +349,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
|
||||
// waiting until timeout expires OR segno changes
|
||||
'inner: loop {
|
||||
tokio::select! {
|
||||
_ = cancellation_rx.changed() => {
|
||||
_ = backup.tli.cancel.cancelled() => {
|
||||
info!("timeline canceled");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -575,7 +575,10 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_timelines = {}
|
||||
|
||||
# This mirrors a constant in `downloader.rs`
|
||||
freshen_interval_secs = 60
|
||||
default_download_period_secs = 60
|
||||
|
||||
# The upload period, which will also be the download once the secondary has seen its first heatmap
|
||||
upload_period_secs = 20
|
||||
|
||||
for _i in range(0, tenant_count):
|
||||
tenant_id = TenantId.generate()
|
||||
@@ -587,7 +590,7 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
placement_policy='{"Attached":1}',
|
||||
# Run with a low heatmap period so that we can avoid having to do synthetic API calls
|
||||
# to trigger the upload promptly.
|
||||
conf={"heatmap_period": "1s"},
|
||||
conf={"heatmap_period": f"{upload_period_secs}s"},
|
||||
)
|
||||
env.neon_cli.create_timeline("main2", tenant_id, timeline_b)
|
||||
|
||||
@@ -597,7 +600,7 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Wait long enough that the background downloads should happen; we expect all the inital layers
|
||||
# of all the initial timelines to show up on the secondary location of each tenant.
|
||||
time.sleep(freshen_interval_secs * 1.5)
|
||||
time.sleep(default_download_period_secs * 1.5)
|
||||
|
||||
for tenant_id, timelines in tenant_timelines.items():
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
@@ -613,8 +616,8 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
# Delete the second timeline: this should be reflected later on the secondary
|
||||
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timelines[1])
|
||||
|
||||
# Wait long enough for the secondary locations to see the deletion
|
||||
time.sleep(freshen_interval_secs * 1.5)
|
||||
# Wait long enough for the secondary locations to see the deletion: 2x period plus a grace factor
|
||||
time.sleep(upload_period_secs * 2.5)
|
||||
|
||||
for tenant_id, timelines in tenant_timelines.items():
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
@@ -626,6 +629,9 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
assert ps_secondary.list_layers(tenant_id, timelines[0])
|
||||
|
||||
# This one was deleted
|
||||
log.info(
|
||||
f"Checking for secondary timeline deletion {tenant_id}/{timeline_id} on node {ps_secondary.id}"
|
||||
)
|
||||
assert not ps_secondary.list_layers(tenant_id, timelines[1])
|
||||
|
||||
t_end = time.time()
|
||||
@@ -640,7 +646,7 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
download_rate = (total_heatmap_downloads / tenant_count) / (t_end - t_start)
|
||||
|
||||
expect_download_rate = 1.0 / freshen_interval_secs
|
||||
expect_download_rate = 1.0 / upload_period_secs
|
||||
log.info(f"Download rate: {download_rate * 60}/min vs expected {expect_download_rate * 60}/min")
|
||||
|
||||
assert download_rate < expect_download_rate * 2
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: a6dc3f010d...d6f7e2c604
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 6fd679f515...f0d6b0ef75
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 1f63dd206a...8ef3c33aa0
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.2", "1f63dd206a8aaa4727baad334c548219c52878e1"],
|
||||
"v15": ["15.6", "6fd679f5154d12f4892ddd450cc6be28a8ac31b0"],
|
||||
"v14": ["14.11", "a6dc3f010da31472a7ae9ab0ddfbf6e49131d93c"]
|
||||
"v16": ["16.2", "8ef3c33aa01631e17cb24a122776349fcc777b46"],
|
||||
"v15": ["15.6", "f0d6b0ef7581bd78011832e23d8420a7d2c8a83a"],
|
||||
"v14": ["14.11", "d6f7e2c604bfc7cbc4c46bcea0a8e800f4bc778a"]
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] }
|
||||
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
|
||||
prost = { version = "0.11" }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -102,7 +102,7 @@ num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { git = "https://github.com/neondatabase/arrow-rs", branch = "neon-fix-bugs", default-features = false, features = ["zstd"] }
|
||||
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
|
||||
prost = { version = "0.11" }
|
||||
regex = { version = "1" }
|
||||
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
|
||||
|
||||
Reference in New Issue
Block a user