Compare commits

..

36 Commits

Author SHA1 Message Date
Yuchen Liang
afa0fe0e87 Merge branch 'yuchen/direct-io-for-read' into yuchen/direct-io-for-read-test 2024-10-21 09:28:20 -04:00
Yuchen Liang
656ddbce2c Merge branch 'main' into yuchen/direct-io-for-read 2024-10-21 09:27:59 -04:00
Yuchen Liang
36850cb047 Merge branch 'yuchen/direct-io-for-read' into yuchen/direct-io-for-read-test 2024-10-18 14:22:41 -04:00
Yuchen Liang
a5a86bedb2 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 18:22:02 +00:00
Yuchen Liang
9653b64569 Merge branch 'yuchen/direct-io-for-read' into yuchen/direct-io-for-read-test 2024-10-18 14:08:26 -04:00
Yuchen Liang
e92d0fa83f Merge branch 'main' into yuchen/direct-io-for-read 2024-10-18 14:08:06 -04:00
Yuchen Liang
ad44e11a69 follow bytes::Bytes convention for AlignedBuffer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 18:06:16 +00:00
Yuchen Liang
d99a61bd75 review: remove outdated todo
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 17:54:59 +00:00
Yuchen Liang
3b88998f8e review: clarify IoBuf safety comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 17:53:49 +00:00
Yuchen Liang
4b4a3edb80 review: remove allow(unused)
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-18 17:53:11 +00:00
Yuchen Liang
0d0ba568d2 test: use direct on linux
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-14 09:10:03 -04:00
Yuchen Liang
cb51ddc949 Merge branch 'main' into yuchen/direct-io-for-read 2024-10-14 00:41:57 -04:00
Yuchen Liang
156237d553 add more comments
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-11 17:30:49 -04:00
Yuchen Liang
136006907b Merge branch 'main' into yuchen/direct-io-for-read 2024-10-09 14:00:17 -04:00
Yuchen Liang
929c8d42d3 use io mode from config
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 17:36:30 +00:00
Yuchen Liang
e377177680 use IoBuffer instead of Bytes for inmemory_layer put_bytes
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 12:39:19 -04:00
Yuchen Liang
84b0902e6a fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 12:32:52 -04:00
Yuchen Liang
8f9679ce95 refactor aligned buffer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 10:30:25 -04:00
Yuchen Liang
62722e8559 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-09 09:39:24 -04:00
Yuchen Liang
b418c343e1 Merge branch 'main' into yuchen/direct-io-for-read 2024-10-09 09:20:47 -04:00
Yuchen Liang
1c61b68c3b use aligned buffer marker trait
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
84e2242673 use aligned buffer for inmemory layer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
e9d9663fcd use aligned buffer for page cache
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
f28dd95022 use aligned buffer for image and delta layers
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
ee4600034e pageserver: implement aligned io buffer
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:25:57 -04:00
Yuchen Liang
12a4e33022 Merge branch 'main' into yuchen/virtual-file-config 2024-10-08 17:04:43 -04:00
Yuchen Liang
6d03e2810d review: use a inner and mode member for VirtualFile
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:21:58 +00:00
Yuchen Liang
4e1309475c review: clone open_options instead of taking mut
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-08 17:11:04 +00:00
Yuchen Liang
f1418cad52 Merge branch 'main' into yuchen/virtual-file-config 2024-10-07 15:15:26 -04:00
Yuchen Liang
a04cfd754b get rid of io_buffer_alignment config (always 512)
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-07 12:16:11 -04:00
Yuchen Liang
bc13310e56 Merge branch 'main' into yuchen/virtual-file-config 2024-10-07 11:49:13 -04:00
Yuchen Liang
5c76b2d474 fix put_io_mode to use the correct http endpoint
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 10:58:47 -04:00
Yuchen Liang
97f7b0b86f simplify virtual file wrapper
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 08:31:30 -04:00
Yuchen Liang
3a5b44ea53 add set_io_mode option to getpage_latest_lsn
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 08:16:18 -04:00
Yuchen Liang
95554c7377 fix clippy
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-10-01 07:59:15 -04:00
Yuchen Liang
a85bd88866 pageserver: add direct io config to virtual file
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-09-30 23:54:14 -04:00
21 changed files with 592 additions and 670 deletions

View File

@@ -860,24 +860,14 @@ ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
# Use new version only for v17
# TODO test and update pgrx for older versions too
RUN case "${PG_VERSION}" in \
"v17") \
export PGRX_VERSION=0.12.6 \
;; \
"v14" | "v15" | "v16") \
export PGRX_VERSION=0.11.3 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
RUN case "${PG_VERSION}" in "v17") \
echo "v17 is not supported yet by pgrx. Quit" && exit 0;; \
esac && \
curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
cargo install --locked --version ${PGRX_VERSION} cargo-pgrx && \
cargo install --locked --version 0.11.3 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
@@ -892,41 +882,18 @@ USER root
FROM rust-extensions-build AS pg-jsonschema-pg-build
ARG PG_VERSION
# version 0.3.3 supports v17
# last release v0.3.3 - Oct 16, 2024
#
# there were no breaking changes
# so we can use the same version for all versions
# when we're ready to bump pgrx for everyone
RUN case "${PG_VERSION}" in \
"v17") \
export PG_JSONSCHEMA_VERSION=0.3.3 \
export PG_JSONSCHEMA_CHECKSUM=40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac \
;; \
"v14" | "v15" | "v16") \
export PG_JSONSCHEMA_VERSION=0.3.1 \
export PG_JSONSCHEMA_CHECKSUM=61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
RUN case "${PG_VERSION}" in "v17") \
echo "pg_jsonschema does not yet have a release that supports pg17" && exit 0;; \
esac && \
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v${PG_JSONSCHEMA_VERSION}.tar.gz -O pg_jsonschema.tar.gz && \
echo "${PG_JSONSCHEMA_CHECKSUM} pg_jsonschema.tar.gz" | sha256sum --check && \
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.gz -O pg_jsonschema.tar.gz && \
echo "61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 pg_jsonschema.tar.gz" | sha256sum --check && \
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
# see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
# `unsafe-postgres` feature allows to build pgx extensions
# against postgres forks that decided to change their ABI name (like us).
# With that we can build extensions without forking them and using stock
# pgx. As this feature is new few manual version bumps were required.
case "${PG_VERSION}" in \
"v17") \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
;; \
"v14" | "v15" | "v16") \
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
;; \
esac && \
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
@@ -940,36 +907,13 @@ RUN case "${PG_VERSION}" in \
FROM rust-extensions-build AS pg-graphql-pg-build
ARG PG_VERSION
# version 1.5.9 supports v17
# last release v1.5.9 - Oct 16, 2024
#
# looks like there were no breaking changes
# so we can use the same version for all versions
# when we're ready to bump pgrx for everyone
RUN case "${PG_VERSION}" in \
"v17") \
export PG_GRAPHQL_VERSION=1.5.9 \
export PG_GRAPHQL_CHECKSUM=cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497 \
;; \
"v14" | "v15" | "v16") \
export PG_GRAPHQL_VERSION=1.5.7 \
export PG_GRAPHQL_CHECKSUM=2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
RUN case "${PG_VERSION}" in "v17") \
echo "pg_graphql does not yet have a release that supports pg17 as of now" && exit 0;; \
esac && \
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v${PG_GRAPHQL_VERSION}.tar.gz -O pg_graphql.tar.gz && \
echo "${PG_GRAPHQL_CHECKSUM} pg_graphql.tar.gz" | sha256sum --check && \
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.7.tar.gz -O pg_graphql.tar.gz && \
echo "2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 pg_graphql.tar.gz" | sha256sum --check && \
mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
case "${PG_VERSION}" in \
"v17") \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
;; \
"v14" | "v15" | "v16") \
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
;; \
esac && \
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
# it's needed to enable extension because it uses untrusted C language
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
@@ -986,10 +930,6 @@ FROM rust-extensions-build AS pg-tiktoken-pg-build
ARG PG_VERSION
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
# doesn't support pg17 yet
# it depends on pgrx which doesn't support pg17
#
# Also, we need to start using releases instead of commits
RUN case "${PG_VERSION}" in "v17") \
echo "pg_tiktoken does not have versions, nor support for pg17" && exit 0;; \
esac && \
@@ -1012,9 +952,6 @@ RUN case "${PG_VERSION}" in "v17") \
FROM rust-extensions-build AS pg-pgx-ulid-build
ARG PG_VERSION
# doesn't support pg17 yet
# it depends on pgrx which doesn't support pg17
# https://github.com/pksunkara/pgx_ulid/issues/51
RUN case "${PG_VERSION}" in "v17") \
echo "pgx_ulid does not support pg17 as of the latest version (0.1.5)" && exit 0;; \
esac && \
@@ -1035,31 +972,13 @@ RUN case "${PG_VERSION}" in "v17") \
FROM rust-extensions-build AS pg-session-jwt-build
ARG PG_VERSION
# Both pgrx 0.12.6 and 0.11.3 are maintained in parallel, with identical extension versions and features.
RUN case "${PG_VERSION}" in \
"v17") \
export PG_SESSION_JWT_VERSION=0.1.2-v17 \
esport PG_SESSION_JWT_CHECKSUM=c8ecbed9cb8c6441bce5134a176002b043018adf9d05a08e457dda233090a86e \
;; \
"v14" | "v15" | "v16") \
export PG_SESSION_JWT_VERSION=0.1.2 \
esport PG_SESSION_JWT_CHECKSUM=837932a077888d5545fd54b0abcc79e5f8e37017c2769a930afc2f5c94df6f4e \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
RUN case "${PG_VERSION}" in "v17") \
echo "pg_session_jwt does not yet have a release that supports pg17" && exit 0;; \
esac && \
wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v${PG_SESSION_JWT_VERSION}.tar.gz -O pg_session_jwt.tar.gz && \
echo "${PG_SESSION_JWT_CHECKSUM} pg_session_jwt.tar.gz" | sha256sum --check && \
wget https://github.com/neondatabase/pg_session_jwt/archive/e642528f429dd3f5403845a50191b78d434b84a6.tar.gz -O pg_session_jwt.tar.gz && \
echo "1a69210703cc91224785e59a0a67562dd9eed9a0914ac84b11447582ca0d5b93 pg_session_jwt.tar.gz" | sha256sum --check && \
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
case "${PG_VERSION}" in \
"v17") \
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
;; \
"v14" | "v15" | "v16") \
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
;; \
esac && \
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release
#########################################################################################

View File

@@ -684,23 +684,6 @@ pub struct TimelineArchivalConfigRequest {
pub state: TimelineArchivalState,
}
/// Combined timeline listing for a tenant shard: a [`TimelineInfo`] per timeline plus an
/// [`OffloadedTimelineInfo`] per offloaded timeline.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelinesInfoAndOffloaded {
/// Info for each timeline.
pub timelines: Vec<TimelineInfo>,
/// Info for each offloaded timeline.
pub offloaded: Vec<OffloadedTimelineInfo>,
}
/// Analog of [`TimelineInfo`] for offloaded timelines.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OffloadedTimelineInfo {
/// Tenant shard the timeline belongs to. Despite the field name, this holds a full
/// [`TenantShardId`].
pub tenant_id: TenantShardId,
/// ID of the offloaded timeline.
pub timeline_id: TimelineId,
/// The parent timeline this one has been branched off from, if it has one.
pub ancestor_timeline_id: Option<TimelineId>,
/// The branch lsn to retain at the ancestor, if it is to be retained.
pub ancestor_retain_lsn: Option<Lsn>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
@@ -1030,6 +1013,12 @@ pub mod virtual_file {
}
impl IoMode {
#[cfg(target_os = "linux")]
pub const fn preferred() -> Self {
Self::Direct
}
#[cfg(target_os = "macos")]
pub const fn preferred() -> Self {
Self::Buffered
}

View File

@@ -26,7 +26,6 @@ use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::LocationConfigMode;
use pageserver_api::models::LsnLease;
use pageserver_api::models::LsnLeaseRequest;
use pageserver_api::models::OffloadedTimelineInfo;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigRequest;
@@ -38,7 +37,6 @@ use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TimelineArchivalConfigRequest;
use pageserver_api::models::TimelinesInfoAndOffloaded;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::TopTenantShardsRequest;
use pageserver_api::models::TopTenantShardsResponse;
@@ -83,7 +81,6 @@ use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::OffloadedTimeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
@@ -480,22 +477,6 @@ async fn build_timeline_info_common(
Ok(info)
}
/// Builds the API-facing [`OffloadedTimelineInfo`] for an offloaded timeline.
///
/// The identifying fields are copied over one-to-one; the only rename is that the timeline's
/// `tenant_shard_id` is exposed as `tenant_id`.
fn build_timeline_offloaded_info(offloaded: &Arc<OffloadedTimeline>) -> OffloadedTimelineInfo {
    let source: &OffloadedTimeline = offloaded.as_ref();
    OffloadedTimelineInfo {
        tenant_id: source.tenant_shard_id,
        timeline_id: source.timeline_id,
        ancestor_retain_lsn: source.ancestor_retain_lsn,
        ancestor_timeline_id: source.ancestor_timeline_id,
    }
}
// healthcheck handler
async fn status_handler(
request: Request<Body>,
@@ -662,7 +643,7 @@ async fn timeline_list_handler(
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await
.context("Failed to build timeline info")
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
response_data.push(timeline_info);
@@ -677,62 +658,6 @@ async fn timeline_list_handler(
json_response(StatusCode::OK, response_data)
}
/// Handler for `GET /v1/tenant/:tenant_shard_id/timeline_and_offloaded`.
///
/// Responds with a JSON-encoded [`TimelinesInfoAndOffloaded`] holding a `TimelineInfo` for every
/// timeline of the tenant shard and an `OffloadedTimelineInfo` for every offloaded one.
///
/// Optional boolean query parameters, both treated as `false` when absent:
/// - `include-non-incremental-logical-size`
/// - `force-await-initial-logical-size`
///
/// Fails if the request parameters cannot be parsed, the permission check fails, the attached
/// tenant shard cannot be obtained or does not become active within `ACTIVE_TENANT_TIMEOUT`,
/// or building the info for any timeline fails (reported as an internal server error).
async fn timeline_and_offloaded_list_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
// Permission is checked against the tenant id, not the individual shard.
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
// The work runs inside an async block so that all of it is covered by the
// `timeline_and_offloaded_list` span attached below.
let response_data = async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let (timelines, offloadeds) = tenant.list_timelines_and_offloaded();
let mut timeline_infos = Vec::with_capacity(timelines.len());
// Timeline infos are built one at a time; the first failure aborts the whole request.
for timeline in timelines {
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await
.context("Failed to build timeline info")
.map_err(ApiError::InternalServerError)?;
timeline_infos.push(timeline_info);
}
// Unlike the loop above, converting offloaded timelines is synchronous and infallible.
let offloaded_infos = offloadeds
.into_iter()
.map(|offloaded| build_timeline_offloaded_info(&offloaded))
.collect::<Vec<_>>();
let res = TimelinesInfoAndOffloaded {
timelines: timeline_infos,
offloaded: offloaded_infos,
};
// The turbofish pins the async block's error type so the `?` operators above can infer it.
Ok::<TimelinesInfoAndOffloaded, ApiError>(res)
}
.instrument(info_span!("timeline_and_offloaded_list",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()))
.await?;
json_response(StatusCode::OK, response_data)
}
async fn timeline_preserve_initdb_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3068,9 +2993,6 @@ pub fn make_router(
.get("/v1/tenant/:tenant_shard_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
.get("/v1/tenant/:tenant_shard_id/timeline_and_offloaded", |r| {
api_handler(r, timeline_and_offloaded_list_handler)
})
.post("/v1/tenant/:tenant_shard_id/timeline", |r| {
api_handler(r, timeline_create_handler)
})

View File

@@ -1755,7 +1755,7 @@ impl Tenant {
}
/// Lists timelines the tenant contains.
/// It's up to callers to omit certain timelines that are not considered ready for use.
/// Up to tenant's implementation to omit certain timelines that are not considered ready for use.
pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
self.timelines
.lock()
@@ -1765,29 +1765,6 @@ impl Tenant {
.collect()
}
/// Returns all timelines the tenant manages as a pair: the entries of `self.timelines` and the
/// entries of `self.timelines_offloaded`.
///
/// No filtering is applied here; callers must omit timelines that are not considered ready
/// for use.
pub fn list_timelines_and_offloaded(
    &self,
) -> (Vec<Arc<Timeline>>, Vec<Arc<OffloadedTimeline>>) {
    // Each lock guard is a temporary released at the end of its own statement, so the two
    // mutexes are taken one after the other and never held together.
    let current: Vec<Arc<Timeline>> = self.timelines.lock().unwrap().values().cloned().collect();
    let offloaded: Vec<Arc<OffloadedTimeline>> = self
        .timelines_offloaded
        .lock()
        .unwrap()
        .values()
        .cloned()
        .collect();
    (current, offloaded)
}
pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
self.timelines.lock().unwrap().keys().cloned().collect()
}

View File

@@ -515,8 +515,8 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if result.is_err() {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}

View File

@@ -827,25 +827,6 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(ctx, end_key).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
///
/// Finish writing the image layer.
///
async fn finish0(
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;

View File

@@ -42,7 +42,7 @@ impl SplitWriterResult {
pub struct SplitImageLayerWriter {
inner: ImageLayerWriter,
target_layer_size: u64,
generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
generated_layers: Vec<SplitWriterResult>,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -71,7 +71,7 @@ impl SplitImageLayerWriter {
ctx,
)
.await?,
generated_layer_writers: Vec::new(),
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
@@ -80,12 +80,18 @@ impl SplitImageLayerWriter {
})
}
pub async fn put_image(
pub async fn put_image_with_discard_fn<D, F>(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
discard: D,
) -> anyhow::Result<()>
where
D: FnOnce(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -102,83 +108,72 @@ impl SplitImageLayerWriter {
ctx,
)
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
let layer_key = PersistentLayerKey {
key_range: self.start_key..key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.start_key = key;
self.generated_layer_writers
.push((prev_image_writer, layer_key));
if discard(&layer_key).await {
drop(prev_image_writer);
self.generated_layers
.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = prev_image_writer.finish_with_end_key(key, ctx).await?;
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers
.push(SplitWriterResult::Produced(layer));
}
}
self.inner.put_image(key, img, ctx).await
}
/// Test-only convenience wrapper around `put_image_with_discard_fn`.
///
/// Forwards `key`, `img`, `tline` and `ctx` unchanged and supplies a discard callback that
/// always resolves to `false`.
#[cfg(test)]
pub async fn put_image(
&mut self,
key: Key,
img: Bytes,
tline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
.await
}
pub(crate) async fn finish_with_discard_fn<D, F>(
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard_fn: D,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
where
D: Fn(&PersistentLayerKey) -> F,
D: FnOnce(&PersistentLayerKey) -> F,
F: Future<Output = bool>,
{
let Self {
mut generated_layer_writers,
mut generated_layers,
inner,
..
} = self;
if inner.num_keys() != 0 {
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
generated_layer_writers.push((inner, layer_key));
if inner.num_keys() == 0 {
return Ok(generated_layers);
}
let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
for produced_layer in generated_layers {
if let SplitWriterResult::Produced(image_layer) = produced_layer {
let layer: Layer = image_layer.into();
layer.delete_on_drop();
}
}
let layer_key = PersistentLayerKey {
key_range: self.start_key..end_key,
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
is_delta: false,
};
// BEGIN: catch every error and do the recovery in the below section
let mut generated_layers = Vec::new();
for (inner, layer_key) in generated_layer_writers {
if discard_fn(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let layer = match inner
.finish_with_end_key(layer_key.key_range.end, ctx)
.await
{
Ok((desc, path)) => {
match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
clean_up_layers(generated_layers);
return Err(e);
}
}
}
Err(e) => {
// ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong,
// so we don't need to remove it by ourselves.
clean_up_layers(generated_layers);
return Err(e);
}
};
generated_layers.push(SplitWriterResult::Produced(layer));
}
if discard(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
let (desc, path) = inner.finish_with_end_key(end_key, ctx).await?;
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(SplitWriterResult::Produced(layer));
}
// END: catch every error and do the recovery in the above section
Ok(generated_layers)
}
@@ -192,6 +187,11 @@ impl SplitImageLayerWriter {
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
@@ -296,16 +296,8 @@ impl SplitDeltaLayerWriter {
self.generated_layers
.push(SplitWriterResult::Discarded(layer_key));
} else {
// `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
// files for `finish_creating`.
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
return Err(e);
}
};
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
@@ -365,16 +357,8 @@ impl SplitDeltaLayerWriter {
if discard(&layer_key).await {
generated_layers.push(SplitWriterResult::Discarded(layer_key));
} else {
// `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
// files for `finish_creating`.
let (desc, path) = inner.finish(end_key, ctx).await?;
let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
Ok(layer) => layer,
Err(e) => {
tokio::fs::remove_file(&path).await.ok();
return Err(e);
}
};
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(SplitWriterResult::Produced(delta_layer));
}
Ok(generated_layers)
@@ -463,7 +447,7 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &ctx)
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
let layers = image_writer
@@ -502,18 +486,14 @@ mod tests {
#[tokio::test]
async fn write_split() {
// Test the split writer with retaining all the layers we have produced (discard=false)
write_split_helper("split_writer_write_split", false).await;
}
#[tokio::test]
async fn write_split_discard() {
// Test the split writer with discarding all the layers we have produced (discard=true)
write_split_helper("split_writer_write_split_discard", true).await;
write_split_helper("split_writer_write_split_discard", false).await;
}
/// Test the image+delta writer by writing a large number of images and deltas. If discard is
/// set to true, all layers will be discarded.
async fn write_split_helper(harness_name: &'static str, discard: bool) {
let harness = TenantHarness::create(harness_name).await.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -547,7 +527,9 @@ mod tests {
for i in 0..N {
let i = i as u32;
image_writer
.put_image(get_key(i), get_large_img(), &ctx)
.put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
discard
})
.await
.unwrap();
delta_writer
@@ -563,54 +545,51 @@ mod tests {
.unwrap();
}
let image_layers = image_writer
.finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer
.finish_with_discard_fn(&tline, &ctx, |_| async { discard })
.await
.unwrap();
let image_layers = image_layers
.into_iter()
.map(|x| {
if discard {
x.into_discarded_layer()
} else {
x.into_resident_layer().layer_desc().key()
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
}
for layer in delta_layers {
layer.into_discarded_layer();
}
} else {
let image_layers = image_layers
.into_iter()
.map(|x| x.into_resident_layer())
.collect_vec();
let delta_layers = delta_layers
.into_iter()
.map(|x| x.into_resident_layer())
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
image_layers[idx - 1].layer_desc().key_range.end,
image_layers[idx].layer_desc().key_range.start
);
assert_eq!(
delta_layers[idx - 1].layer_desc().key_range.end,
delta_layers[idx].layer_desc().key_range.start
);
}
})
.collect_vec();
let delta_layers = delta_layers
.into_iter()
.map(|x| {
if discard {
x.into_discarded_layer()
} else {
x.into_resident_layer().layer_desc().key()
}
})
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
assert_eq!(
delta_layers.last().unwrap().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].key_range.start, Key::MIN);
assert_ne!(image_layers[idx].key_range.end, Key::MAX);
assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
if idx > 0 {
assert_eq!(
image_layers[idx - 1].key_range.end,
image_layers[idx].key_range.start
);
assert_eq!(
delta_layers[idx - 1].key_range.end,
delta_layers[idx].key_range.start
);
}
}
}
@@ -650,11 +629,11 @@ mod tests {
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &ctx)
.put_image(get_key(0), get_img(0), &tline, &ctx)
.await
.unwrap();
image_writer
.put_image(get_key(1), get_large_img(), &ctx)
.put_image(get_key(1), get_large_img(), &tline, &ctx)
.await
.unwrap();
let layers = image_writer

View File

@@ -141,7 +141,9 @@ impl KeyHistoryRetention {
};
stat.produce_image_key(img);
if let Some(image_writer) = image_writer.as_mut() {
image_writer.put_image(key, img.clone(), ctx).await?;
image_writer
.put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
.await?;
} else {
delta_writer
.put_value_with_discard_fn(
@@ -2039,7 +2041,8 @@ impl Timeline {
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.await?
} else {
drop(writer);
let (layers, _) = writer.take()?;
assert!(layers.is_empty(), "image layers produced in dry run mode?");
Vec::new()
}
} else {

View File

@@ -161,9 +161,6 @@ pub(crate) enum Reason {
/// LockAlreadyTaken indicates that we attempted to take a lock that was already taken.
#[serde(rename = "LOCK_ALREADY_TAKEN")]
LockAlreadyTaken,
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
ActiveEndpointsLimitExceeded,
#[default]
#[serde(other)]
Unknown,
@@ -197,8 +194,7 @@ impl Reason {
| Reason::ComputeTimeQuotaExceeded
| Reason::WrittenDataQuotaExceeded
| Reason::DataTransferQuotaExceeded
| Reason::LogicalSizeQuotaExceeded
| Reason::ActiveEndpointsLimitExceeded => false,
| Reason::LogicalSizeQuotaExceeded => false,
// transitive error. control plane is currently busy
// but might be ready soon
Reason::RunningOperations

View File

@@ -87,8 +87,36 @@ pub(crate) mod errors {
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
Reason::RunningOperations => ErrorKind::ControlPlane,
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane,
Reason::Unknown => ErrorKind::ControlPlane,
Reason::Unknown => match &**e {
ControlPlaneError {
http_status_code:
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
..
} => crate::error::ErrorKind::User,
ControlPlaneError {
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
error,
..
} if error
.contains("compute time quota of non-primary branches is exceeded") =>
{
crate::error::ErrorKind::Quota
}
ControlPlaneError {
http_status_code: http::StatusCode::LOCKED,
error,
..
} if error.contains("quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
crate::error::ErrorKind::Quota
}
ControlPlaneError {
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
..
} => crate::error::ErrorKind::ServiceRateLimit,
ControlPlaneError { .. } => crate::error::ErrorKind::ControlPlane,
},
},
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
}

View File

@@ -14,7 +14,6 @@ use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
use tokio::time::{self, Instant};
use crate::control_plane::messages::ColdStartInfo;
use crate::error::ErrorKind;
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
@@ -326,10 +325,23 @@ pub enum ConnectionFailureKind {
ComputeUncached,
}
/// Classification of a failed compute wake-up attempt, reported as the `kind`
/// label of the connection failures breakdown metric.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum WakeupFailureKind {
/// The wake-up returned a bad compute address.
BadComputeAddress,
/// The control plane API call failed at the transport level.
ApiTransportError,
/// A quota (compute time, written data, data transfer, logical size, ...) was exceeded.
QuotaExceeded,
/// The control plane is locked, rate limited or busy with other operations.
ApiConsoleLocked,
/// The control plane rejected the request (e.g. resource not found, bad request).
ApiConsoleBadRequest,
/// The control plane answered with a server-error HTTP status and no known reason.
ApiConsoleOtherServerError,
/// Any other control plane error that does not fit the kinds above.
ApiConsoleOtherError,
/// Too many connection attempts were made.
TimeoutError,
}
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
pub kind: ErrorKind,
pub kind: WakeupFailureKind,
pub retry: Bool,
}

View File

@@ -1,13 +1,15 @@
use hyper::StatusCode;
use tracing::{error, info, warn};
use super::connect_compute::ComputeConnectBackend;
use crate::config::RetryConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::messages::{ControlPlaneError, Reason};
use crate::control_plane::provider::CachedNodeInfo;
use crate::error::ReportableError;
use crate::metrics::{
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
WakeupFailureKind,
};
use crate::proxy::retry::{retry_after, should_retry};
@@ -58,8 +60,62 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
}
fn report_error(e: &WakeComputeError, retry: bool) {
let kind = e.get_error_kind();
use crate::control_plane::errors::ApiError;
let kind = match e {
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
WakeComputeError::ApiError(ApiError::ControlPlane(e)) => match e.get_reason() {
Reason::RoleProtected => WakeupFailureKind::ApiConsoleBadRequest,
Reason::ResourceNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::ProjectNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::EndpointNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::BranchNotFound => WakeupFailureKind::ApiConsoleBadRequest,
Reason::RateLimitExceeded => WakeupFailureKind::ApiConsoleLocked,
Reason::NonDefaultBranchComputeTimeExceeded => WakeupFailureKind::QuotaExceeded,
Reason::ActiveTimeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::ComputeTimeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::WrittenDataQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::DataTransferQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::LogicalSizeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
Reason::ConcurrencyLimitReached => WakeupFailureKind::ApiConsoleLocked,
Reason::LockAlreadyTaken => WakeupFailureKind::ApiConsoleLocked,
Reason::RunningOperations => WakeupFailureKind::ApiConsoleLocked,
Reason::Unknown => match **e {
ControlPlaneError {
http_status_code: StatusCode::LOCKED,
ref error,
..
} if error.contains("written data quota exceeded")
|| error.contains("the limit for current plan reached") =>
{
WakeupFailureKind::QuotaExceeded
}
ControlPlaneError {
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
ref error,
..
} if error.contains("compute time quota of non-primary branches is exceeded") => {
WakeupFailureKind::QuotaExceeded
}
ControlPlaneError {
http_status_code: StatusCode::LOCKED,
..
} => WakeupFailureKind::ApiConsoleLocked,
ControlPlaneError {
http_status_code: StatusCode::BAD_REQUEST,
..
} => WakeupFailureKind::ApiConsoleBadRequest,
ControlPlaneError {
http_status_code, ..
} if http_status_code.is_server_error() => {
WakeupFailureKind::ApiConsoleOtherServerError
}
ControlPlaneError { .. } => WakeupFailureKind::ApiConsoleOtherError,
},
},
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
};
Metrics::get()
.proxy
.connection_failures_breakdown

View File

@@ -66,25 +66,22 @@ impl FileStorage {
})
}
/// Create and reliably persist new control file at given location.
///
/// Note: we normally call this in temp directory for atomic init, so
/// interested in FileStorage as a result only in tests.
pub async fn create_new(
dir: Utf8PathBuf,
/// Create file storage for a new timeline, but don't persist it yet.
pub fn create_new(
timeline_dir: Utf8PathBuf,
conf: &SafeKeeperConf,
state: TimelinePersistentState,
) -> Result<FileStorage> {
// we don't support creating new timelines in offloaded state
assert!(matches!(state.eviction_state, EvictionState::Present));
let mut store = FileStorage {
timeline_dir: dir,
let store = FileStorage {
timeline_dir,
no_sync: conf.no_sync,
state: state.clone(),
state,
last_persist_at: Instant::now(),
};
store.persist(&state).await?;
Ok(store)
}
@@ -193,6 +190,8 @@ impl TimelinePersistentState {
impl Storage for FileStorage {
/// Persists state durably to the underlying storage.
///
/// For a description, see <https://lwn.net/Articles/457667/>.
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
@@ -270,7 +269,7 @@ mod test {
.await
.expect("failed to create timeline dir");
let state = TimelinePersistentState::empty();
let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
Ok((storage, state))
}

View File

@@ -12,10 +12,10 @@ use tracing::{info, warn};
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
control_file::FileStorage,
control_file::{FileStorage, Storage},
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
state::TimelinePersistentState,
timeline::{Timeline, TimelineError, WalResidentTimeline},
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
GlobalTimelines,
@@ -149,16 +149,17 @@ pub async fn handle_request(request: Request) -> Result<()> {
vec![],
request.until_lsn,
start_lsn,
)?;
);
new_state.timeline_start_lsn = start_lsn;
new_state.peer_horizon_lsn = request.until_lsn;
new_state.backup_lsn = new_backup_lsn;
FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?;
let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
file_storage.persist(&new_state).await?;
// now we have a ready timeline in a temp directory
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
Ok(())
}

View File

@@ -1,6 +1,7 @@
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
@@ -8,6 +9,7 @@ use serde::{Deserialize, Serialize};
use std::{
cmp::min,
io::{self, ErrorKind},
sync::Arc,
};
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
use tokio_tar::{Archive, Builder, Header};
@@ -18,7 +20,7 @@ use tokio_util::{
use tracing::{error, info, instrument};
use crate::{
control_file::CONTROL_FILE_NAME,
control_file::{self, CONTROL_FILE_NAME},
debug_dump,
http::{
client::{self, Client},
@@ -26,14 +28,13 @@ use crate::{
},
safekeeper::Term,
state::TimelinePersistentState,
timeline::WalResidentTimeline,
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
wal_backup,
wal_storage::open_wal_file,
GlobalTimelines,
wal_storage::{self, open_wal_file, Storage},
GlobalTimelines, SafeKeeperConf,
};
use utils::{
crashsafe::fsync_async_opt,
crashsafe::{durable_rename, fsync_async_opt},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
logging::SecretString,
lsn::Lsn,
@@ -427,9 +428,100 @@ async fn pull_timeline(
assert!(status.commit_lsn <= status.flush_lsn);
// Finally, load the timeline.
let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;
let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
Ok(Response {
safekeeper_host: host,
})
}
/// Create temp directory for a new timeline. It needs to be located on the same
/// filesystem as the rest of the timelines. It will be automatically deleted when
/// Utf8TempDir goes out of scope.
///
/// Returns the temp dir guard together with a copy of its path.
///
/// # Errors
///
/// Fails if `conf.workdir` has no parent directory, if the `tmp` base directory
/// cannot be created, or if the temp directory itself cannot be created.
pub async fn create_temp_timeline_dir(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
    // conf.workdir is usually /storage/safekeeper/data
    // will try to transform it into /storage/safekeeper/tmp
    let temp_base = conf
        .workdir
        .parent()
        // Build the error lazily so the success path doesn't pay for constructing it.
        .ok_or_else(|| anyhow::anyhow!("workdir has no parent"))?
        .join("tmp");

    tokio::fs::create_dir_all(&temp_base).await?;

    // Directory name looks like `<tenant_id>_<timeline_id>_<random>_temptli`.
    let tli_dir = camino_tempfile::Builder::new()
        .suffix("_temptli")
        .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
        .tempdir_in(temp_base)?;

    let tli_dir_path = tli_dir.path().to_path_buf();

    Ok((tli_dir, tli_dir_path))
}
/// Do basic validation of a temp timeline, before moving it to the global map.
///
/// Loads the control file from `path`, checks that the WAL segment size is set
/// and opens the WAL storage. Returns `(commit_lsn, flush_lsn)` on success.
pub async fn validate_temp_timeline(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    path: &Utf8PathBuf,
) -> Result<(Lsn, Lsn)> {
    let control_store =
        control_file::FileStorage::load_control_file(path.join("safekeeper.control"))?;

    // A zero segment size means the timeline was never properly initialized.
    if control_store.server.wal_seg_size == 0 {
        bail!("wal_seg_size is not set");
    }

    let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;

    Ok((control_store.commit_lsn, wal_store.flush_lsn()))
}
/// Move timeline from a temp directory to the main storage, and load it to the global map.
///
/// This operation is done under a lock to prevent bugs if several concurrent requests are
/// trying to load the same timeline. Note that it doesn't guard against creating the
/// timeline with the same ttid, but no one should be doing this anyway.
///
/// Fails if the timeline is already known to the global map, or if any of the
/// filesystem steps or the final load fails.
pub async fn load_temp_timeline(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    tmp_path: &Utf8PathBuf,
) -> Result<Arc<Timeline>> {
    // Serialize with other loaders for the whole duration of the move + load.
    let loading_mutex = GlobalTimelines::loading_lock().await;
    let loading_guard = loading_mutex.lock().await;

    // Anything other than "not found" means we must not touch the existing timeline.
    match GlobalTimelines::get(ttid) {
        Err(TimelineError::NotFound(_)) => {}
        _ => bail!("timeline already exists, cannot overwrite it"),
    }

    // Move timeline dir to the correct location
    let timeline_path = get_timeline_dir(conf, &ttid);
    info!(
        "moving timeline {} from {} to {}",
        ttid, tmp_path, timeline_path
    );

    let tenant_dir = get_tenant_dir(conf, &ttid.tenant_id);
    tokio::fs::create_dir_all(tenant_dir).await?;
    // fsync tenant dir creation
    fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
    durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;

    let loaded = GlobalTimelines::load_timeline(&loading_guard, ttid)
        .await
        .context("Failed to load timeline after copy")?;
    info!(
        "loaded timeline {}, flush_lsn={}",
        ttid,
        loaded.get_flush_lsn().await
    );

    Ok(loaded)
}

View File

@@ -339,8 +339,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
};
let tli =
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
.await
.context("create timeline")?;
.await?;
tli.wal_residence_guard().await?
}
_ => {

View File

@@ -3,7 +3,7 @@
use std::{cmp::max, ops::Deref};
use anyhow::{bail, Result};
use anyhow::Result;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
@@ -13,11 +13,7 @@ use utils::{
use crate::{
control_file,
safekeeper::{
AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory,
UNKNOWN_SERVER_VERSION,
},
timeline::TimelineError,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
wal_backup_partial::{self},
};
@@ -95,24 +91,8 @@ impl TimelinePersistentState {
peers: Vec<NodeId>,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> anyhow::Result<TimelinePersistentState> {
if server_info.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
if server_info.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
}
if commit_lsn < local_start_lsn {
bail!(
"commit_lsn {} is smaller than local_start_lsn {}",
commit_lsn,
local_start_lsn
);
}
Ok(TimelinePersistentState {
) -> TimelinePersistentState {
TimelinePersistentState {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
acceptor_state: AcceptorState {
@@ -135,23 +115,24 @@ impl TimelinePersistentState {
),
partial_backup: wal_backup_partial::State::default(),
eviction_state: EvictionState::Present,
})
}
}
#[cfg(test)]
pub fn empty() -> Self {
use crate::safekeeper::UNKNOWN_SERVER_VERSION;
TimelinePersistentState::new(
&TenantTimelineId::empty(),
ServerInfo {
pg_version: 17, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 16 * 1024 * 1024,
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 0,
},
vec![],
Lsn::INVALID,
Lsn::INVALID,
)
.unwrap()
}
}

View File

@@ -27,11 +27,11 @@ use utils::{
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::control_file;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, Term, TermLsn,
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
INVALID_TERM,
};
use crate::send_wal::WalSenders;
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
@@ -40,6 +40,7 @@ use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
@@ -325,6 +326,44 @@ pub struct SharedState {
}
impl SharedState {
/// Initialize fresh timeline state without persisting anything to disk.
///
/// # Errors
///
/// Fails if `state` has an unset WAL segment size, an unknown Postgres
/// version, or a `commit_lsn` below `local_start_lsn`; also propagates errors
/// from constructing the control file storage, the WAL storage or the
/// safekeeper itself.
fn create_new(
    conf: &SafeKeeperConf,
    ttid: &TenantTimelineId,
    state: TimelinePersistentState,
) -> Result<Self> {
    if state.server.wal_seg_size == 0 {
        bail!(TimelineError::UninitializedWalSegSize(*ttid));
    }

    if state.server.pg_version == UNKNOWN_SERVER_VERSION {
        bail!(TimelineError::UninitialinzedPgVersion(*ttid));
    }

    // The message must describe the failing condition: commit_lsn is *below*
    // local_start_lsn here.
    if state.commit_lsn < state.local_start_lsn {
        bail!(
            "commit_lsn {} is smaller than local_start_lsn {}",
            state.commit_lsn,
            state.local_start_lsn
        );
    }

    // We don't want to write anything to disk, because we may have existing timeline there.
    // These functions should not change anything on disk.
    let timeline_dir = get_timeline_dir(conf, ttid);
    let control_store =
        control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
    let wal_store =
        wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
    let sk = SafeKeeper::new(TimelineState::new(control_store), wal_store, conf.my_id)?;

    Ok(Self {
        sk: StateSK::Loaded(sk),
        peers_info: PeersInfo(vec![]),
        wal_removal_on_hold: false,
    })
}
/// Restore SharedState from control file. If file doesn't exist, bails out.
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
let timeline_dir = get_timeline_dir(conf, ttid);
@@ -411,8 +450,6 @@ pub enum TimelineError {
Cancelled(TenantTimelineId),
#[error("Timeline {0} was not found in global map")]
NotFound(TenantTimelineId),
#[error("Timeline {0} creation is in progress")]
CreationInProgress(TenantTimelineId),
#[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
Invalid(TenantTimelineId),
#[error("Timeline {0} is already exists")]
@@ -477,7 +514,7 @@ pub struct Timeline {
impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(conf, &ttid)?;
@@ -491,7 +528,7 @@ impl Timeline {
let walreceivers = WalReceivers::new();
let remote_path = remote_timeline_path(&ttid)?;
Ok(Arc::new(Timeline {
Ok(Timeline {
ttid,
remote_path,
commit_lsn_watch_tx,
@@ -510,7 +547,47 @@ impl Timeline {
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
mgr_status: AtomicStatus::new(),
}))
})
}
/// Create a new timeline, which is not yet persisted to disk.
///
/// Builds the in-memory `Timeline` around a freshly constructed persistent
/// state. The watch channels start from invalid LSN/term values and all
/// activity flags start cleared. Fails if the remote path cannot be derived or
/// if the shared state rejects the initial persistent state.
pub fn create_empty(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    server_info: ServerInfo,
    commit_lsn: Lsn,
    local_start_lsn: Lsn,
) -> Result<Timeline> {
    let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
    let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
        watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
    let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);

    let persistent_state =
        TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);

    let walreceivers = WalReceivers::new();
    let remote_path = remote_timeline_path(&ttid)?;

    // Validate and wrap the persistent state before assembling the timeline.
    let shared_state = SharedState::create_new(conf, &ttid, persistent_state)?;
    let walsenders = WalSenders::new(walreceivers.clone());

    Ok(Timeline {
        ttid,
        remote_path,
        commit_lsn_watch_tx,
        commit_lsn_watch_rx,
        term_flush_lsn_watch_tx,
        term_flush_lsn_watch_rx,
        shared_state_version_tx,
        shared_state_version_rx,
        mutex: RwLock::new(shared_state),
        walsenders,
        walreceivers,
        cancel: CancellationToken::default(),
        timeline_dir: get_timeline_dir(conf, &ttid),
        manager_ctl: ManagerCtl::new(),
        broker_active: AtomicBool::new(false),
        wal_backup_active: AtomicBool::new(false),
        last_removed_segno: AtomicU64::new(0),
        mgr_status: AtomicStatus::new(),
    })
}
/// Initialize fresh timeline on disk and start background tasks. If init

View File

@@ -5,14 +5,11 @@
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::ServerInfo;
use crate::state::TimelinePersistentState;
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::wal_storage::Storage;
use crate::{control_file, wal_storage, SafeKeeperConf};
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
@@ -20,22 +17,12 @@ use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::fs;
use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
// Timeline entry in the global map: either a ready timeline, or mark that it is
// being created.
#[derive(Clone)]
enum GlobalMapTimeline {
CreationInProgress,
Timeline(Arc<Timeline>),
}
struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
// A tombstone indicates this timeline used to exist but has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
@@ -44,9 +31,13 @@ struct GlobalTimelinesState {
conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
global_rate_limiter: RateLimiter,
}
/// Marker type protected by the global `load_lock` mutex. Holding a guard for
/// it is used to prevent concurrent timeline loading.
pub struct TimelineLoadLock;
impl GlobalTimelinesState {
/// Get configuration, which must be set once during init.
fn get_conf(&self) -> &SafeKeeperConf {
@@ -64,16 +55,22 @@ impl GlobalTimelinesState {
)
}
/// Get timeline from the map. Returns error if timeline doesn't exist or
/// creation is in progress.
fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
match self.timelines.get(ttid).cloned() {
Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
Some(GlobalMapTimeline::CreationInProgress) => {
Err(TimelineError::CreationInProgress(*ttid))
}
None => Err(TimelineError::NotFound(*ttid)),
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
///
/// On error the map is left untouched and the passed `timeline` is dropped.
fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
    use std::collections::hash_map::Entry;

    let ttid = timeline.ttid;
    // Single lookup via the entry API instead of `contains_key` + `insert`.
    match self.timelines.entry(ttid) {
        Entry::Occupied(_) => bail!(TimelineError::AlreadyExists(ttid)),
        Entry::Vacant(slot) => {
            slot.insert(timeline);
            Ok(())
        }
    }
}
/// Look up a timeline in the map, returning a new handle to it.
/// Yields `TimelineError::NotFound` when no timeline is registered under `ttid`.
fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
    match self.timelines.get(ttid) {
        Some(tli) => Ok(Arc::clone(tli)),
        None => Err(TimelineError::NotFound(*ttid)),
    }
}
fn delete(&mut self, ttid: TenantTimelineId) {
@@ -88,6 +85,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
tombstones: HashMap::new(),
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
global_rate_limiter: RateLimiter::new(1, 1),
})
});
@@ -143,10 +141,11 @@ impl GlobalTimelines {
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
/// errors if any.
///
/// It is async, but TIMELINES_STATE lock is sync and there is no important
/// reason to make it async (it is always held for a short while), so we
/// just lock and unlock it for each timeline -- this function is called
/// during init when nothing else is running, so this is fine.
/// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
/// sync and there is no important reason to make it async (it is always
/// held for a short while) we just lock and unlock it for each timeline --
/// this function is called during init when nothing else is running, so
/// this is fine.
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let state = TIMELINES_STATE.lock().unwrap();
@@ -164,13 +163,14 @@ impl GlobalTimelines {
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(&conf, ttid) {
Ok(tli) => {
Ok(timeline) => {
let tli = Arc::new(timeline);
let mut shared_state = tli.write_shared_state().await;
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
.insert(ttid, tli.clone());
tli.bootstrap(
&mut shared_state,
&conf,
@@ -199,6 +199,51 @@ impl GlobalTimelines {
Ok(())
}
/// Get a handle to the global timeline loading mutex.
///
/// Note that this does not lock the returned mutex: it only clones the shared
/// `Arc` out of the global state. Callers are expected to lock it themselves
/// for as long as they need to exclude concurrent timeline loading.
pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
TIMELINES_STATE.lock().unwrap().load_lock.clone()
}
/// Load timeline from disk to the memory.
///
/// `_guard` is only used as evidence that the caller holds the timeline
/// loading lock. On success the timeline is registered in the global map
/// (removing its tombstone, if any) and bootstrapped before being returned.
pub async fn load_timeline<'a>(
    _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
    ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
    let (conf, broker_active_set, partial_backup_rate_limiter) =
        TIMELINES_STATE.lock().unwrap().get_dependencies();

    let tli = match Timeline::load_timeline(&conf, ttid) {
        Ok(timeline) => Arc::new(timeline),
        // If we can't load a timeline, it's bad. Caller will figure it out.
        Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
    };

    // Hold the timeline's own state lock until bootstrap is finished.
    let mut shared_state = tli.write_shared_state().await;

    // TODO: prevent concurrent timeline creation/loading
    {
        let mut global = TIMELINES_STATE.lock().unwrap();

        // We may have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
        // that the human doing this manual intervention knows what they are doing, and remove its tombstone.
        if global.tombstones.remove(&ttid).is_some() {
            warn!("Un-deleted timeline {ttid}");
        }

        global.timelines.insert(ttid, tli.clone());
    }

    tli.bootstrap(
        &mut shared_state,
        &conf,
        broker_active_set,
        partial_backup_rate_limiter,
    );
    drop(shared_state);

    Ok(tli)
}
/// Get the number of timelines in the map.
pub fn timelines_count() -> usize {
TIMELINES_STATE.lock().unwrap().timelines.len()
@@ -221,7 +266,7 @@ impl GlobalTimelines {
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, _, _) = {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -237,146 +282,55 @@ impl GlobalTimelines {
info!("creating new timeline {}", ttid);
// Do on disk initialization in tmp dir.
let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
let timeline = Arc::new(Timeline::create_empty(
&conf,
ttid,
server_info,
commit_lsn,
local_start_lsn,
)?);
// TODO: currently we create only cfile. It would be reasonable to
// immediately initialize first WAL segment as well.
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?;
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
Ok(timeline)
}
// Take a lock and finish the initialization holding this mutex. No other threads
// can interfere with creation after we will insert timeline into the map.
{
let mut shared_state = timeline.write_shared_state().await;
/// Move timeline from a temp directory to the main storage, and load it to
/// the global map. Creating timeline in this way ensures atomicity: rename
/// is atomic, so either move of the whole datadir succeeds or it doesn't,
/// but corrupted data dir shouldn't be possible.
///
/// We'd like to avoid holding map lock while doing IO, so it's a 3 step
/// process:
/// 1) check the global map that timeline doesn't exist and mark that we're
/// creating it;
/// 2) move the directory and load the timeline
/// 3) take lock again and insert the timeline into the global map.
pub async fn load_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
check_tombstone: bool,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let mut state = TIMELINES_STATE.lock().unwrap();
match state.timelines.get(&ttid) {
Some(GlobalMapTimeline::CreationInProgress) => {
bail!(TimelineError::CreationInProgress(ttid));
}
Some(GlobalMapTimeline::Timeline(_)) => {
bail!(TimelineError::AlreadyExists(ttid));
}
_ => {}
}
if check_tombstone {
if state.tombstones.contains_key(&ttid) {
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
}
} else {
// We may have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
if state.tombstones.remove(&ttid).is_some() {
warn!("un-deleted timeline {ttid}");
}
}
state
.timelines
.insert(ttid, GlobalMapTimeline::CreationInProgress);
state.get_dependencies()
};
// We can get a race condition here in case of concurrent create calls, but only
// in theory. create() will return valid timeline on the next try.
TIMELINES_STATE
.lock()
.unwrap()
.try_insert(timeline.clone())?;
// Do the actual move and reflect the result in the map.
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
Ok(timeline) => {
let mut timeline_shared_state = timeline.write_shared_state().await;
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(matches!(
state.timelines.get(&ttid),
Some(GlobalMapTimeline::CreationInProgress)
));
state
.timelines
.insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
drop(state);
timeline.bootstrap(
&mut timeline_shared_state,
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
if let Err(e) = timeline
.init_new(
&mut shared_state,
&conf,
broker_active_set,
partial_backup_rate_limiter,
);
drop(timeline_shared_state);
Ok(timeline)
}
Err(e) => {
// Init failed, remove the marker from the map
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(matches!(
state.timelines.get(&ttid),
Some(GlobalMapTimeline::CreationInProgress)
));
state.timelines.remove(&ttid);
Err(e)
)
.await
{
// Note: the most likely reason for init failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
// the timeline directory in this case, for further inspection.
// TODO: this is an unusual error, perhaps we should send it to sentry
// TODO: compute will try to create timeline every second, we should add backoff
error!("failed to init new timeline {}: {}", ttid, e);
// Timeline failed to init, it cannot be used. Remove it from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
return Err(e);
}
// We are done with bootstrap, release the lock, return the timeline.
// {} block forces release before .await
}
}
/// Main part of load_temp_timeline: do the move and load.
async fn install_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
conf: &SafeKeeperConf,
) -> Result<Arc<Timeline>> {
let tenant_path = get_tenant_dir(conf, &ttid.tenant_id);
let timeline_path = get_timeline_dir(conf, &ttid);
// We must have already checked that timeline doesn't exist in the map,
// but there might be existing datadir: if timeline is corrupted it is
// not loaded. We don't want to overwrite such a dir, so check for its
// existence.
match fs::metadata(&timeline_path).await {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
// and return error.
bail!(TimelineError::Invalid(ttid));
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
return Err(e.into());
}
}
info!(
"moving timeline {} from {} to {}",
ttid, tmp_path, timeline_path
);
// Now it is safe to move the timeline directory to the correct
// location. First, create tenant directory. Ignore error if it already
// exists.
if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
if e.kind() != std::io::ErrorKind::AlreadyExists {
return Err(e.into());
}
}
// fsync it
fsync_async_opt(&tenant_path, !conf.no_sync).await?;
// and its creation
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
// Do the move.
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
Timeline::load_timeline(conf, ttid)
Ok(timeline)
}
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
@@ -404,16 +358,8 @@ impl GlobalTimelines {
global_lock
.timelines
.values()
.filter_map(|t| match t {
GlobalMapTimeline::Timeline(t) => {
if t.is_cancelled() {
None
} else {
Some(t.clone())
}
}
_ => None,
})
.filter(|t| !t.is_cancelled())
.cloned()
.collect()
}
@@ -424,11 +370,8 @@ impl GlobalTimelines {
global_lock
.timelines
.values()
.filter_map(|t| match t {
GlobalMapTimeline::Timeline(t) => Some(t.clone()),
_ => None,
})
.filter(|t| t.ttid.tenant_id == tenant_id)
.cloned()
.collect()
}
@@ -561,45 +504,3 @@ fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
Err(e) => Err(e.into()),
}
}
/// Allocate a scratch directory in which a new timeline can be assembled.
///
/// The directory is created under `<workdir>/tmp` (which is created first if
/// it is missing) so that it lives on the same filesystem as the rest of the
/// timelines. Its name starts with `<tenant_id>_<timeline_id>_` and ends with
/// `_temptli`.
///
/// Returns the `Utf8TempDir` guard together with an owned copy of its path.
/// The directory is deleted automatically when the guard goes out of scope,
/// so the caller must keep the guard alive for as long as the path is in use.
///
/// # Errors
///
/// Fails if the `tmp` base directory or the temp directory itself cannot be
/// created.
pub async fn create_temp_timeline_dir(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
    let base_dir = conf.workdir.join("tmp");
    tokio::fs::create_dir_all(&base_dir).await?;

    // Encode the tenant and timeline ids in the name so leftovers are easy to attribute.
    let name_prefix = format!("{}_{}_", ttid.tenant_id, ttid.timeline_id);
    let guard = camino_tempfile::Builder::new()
        .suffix("_temptli")
        .prefix(&name_prefix)
        .tempdir_in(base_dir)?;

    let guard_path = guard.path().to_path_buf();
    Ok((guard, guard_path))
}
/// Run basic sanity checks on a temp timeline before it is moved to the
/// global map.
///
/// Loads `safekeeper.control` from `path`, rejects a control file whose
/// `wal_seg_size` is zero, and then opens the WAL storage located in `path`.
///
/// Returns `(commit_lsn, flush_lsn)`: `commit_lsn` is taken from the control
/// file and `flush_lsn` is the value reported by the WAL storage.
///
/// # Errors
///
/// Fails if the control file cannot be loaded, if `wal_seg_size` is not set,
/// or if the WAL storage cannot be initialized.
pub async fn validate_temp_timeline(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    path: &Utf8PathBuf,
) -> Result<(Lsn, Lsn)> {
    let persisted =
        control_file::FileStorage::load_control_file(path.join("safekeeper.control"))?;

    // A zero segment size means the control file was never properly initialized.
    if persisted.server.wal_seg_size == 0 {
        bail!("wal_seg_size is not set");
    }

    let wal = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &persisted)?;

    Ok((persisted.commit_lsn, wal.flush_lsn()))
}

View File

@@ -186,14 +186,8 @@ impl PhysicalStorage {
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn {
bail!("timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn {} from control file", ttid.timeline_id, flush_lsn, state.commit_lsn);
}
if flush_lsn < state.peer_horizon_lsn {
warn!(
"timeline {}: flush_lsn {} is less than cfile peer_horizon_lsn {}",
ttid.timeline_id, flush_lsn, state.peer_horizon_lsn
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
}
Ok(PhysicalStorage {

View File

@@ -59,7 +59,7 @@ impl GlobalMap {
if state.commit_lsn < state.local_start_lsn {
bail!(
"commit_lsn {} is smaller than local_start_lsn {}",
"commit_lsn {} is higher than local_start_lsn {}",
state.commit_lsn,
state.local_start_lsn
);
@@ -96,7 +96,23 @@ impl GlobalMap {
let local_start_lsn = Lsn::INVALID;
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
if state.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(ttid));
}
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitialinzedPgVersion(ttid));
}
if state.commit_lsn < state.local_start_lsn {
bail!(
"commit_lsn {} is higher than local_start_lsn {}",
state.commit_lsn,
state.local_start_lsn
);
}
let disk_timeline = self.disk.put_state(&ttid, state);
let control_store = DiskStateStorage::new(disk_timeline.clone());