mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 08:00:37 +00:00
Compare commits
36 Commits
enable_v17
...
yuchen/dir
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afa0fe0e87 | ||
|
|
656ddbce2c | ||
|
|
36850cb047 | ||
|
|
a5a86bedb2 | ||
|
|
9653b64569 | ||
|
|
e92d0fa83f | ||
|
|
ad44e11a69 | ||
|
|
d99a61bd75 | ||
|
|
3b88998f8e | ||
|
|
4b4a3edb80 | ||
|
|
0d0ba568d2 | ||
|
|
cb51ddc949 | ||
|
|
156237d553 | ||
|
|
136006907b | ||
|
|
929c8d42d3 | ||
|
|
e377177680 | ||
|
|
84b0902e6a | ||
|
|
8f9679ce95 | ||
|
|
62722e8559 | ||
|
|
b418c343e1 | ||
|
|
1c61b68c3b | ||
|
|
84e2242673 | ||
|
|
e9d9663fcd | ||
|
|
f28dd95022 | ||
|
|
ee4600034e | ||
|
|
12a4e33022 | ||
|
|
6d03e2810d | ||
|
|
4e1309475c | ||
|
|
f1418cad52 | ||
|
|
a04cfd754b | ||
|
|
bc13310e56 | ||
|
|
5c76b2d474 | ||
|
|
97f7b0b86f | ||
|
|
3a5b44ea53 | ||
|
|
95554c7377 | ||
|
|
a85bd88866 |
@@ -860,24 +860,14 @@ ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
|
||||
USER nonroot
|
||||
WORKDIR /home/nonroot
|
||||
|
||||
# Use new version only for v17
|
||||
# TODO test and update pgrx for older versions too
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export PGRX_VERSION=0.12.6 \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export PGRX_VERSION=0.11.3 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 is not supported yet by pgrx. Quit" && exit 0;; \
|
||||
esac && \
|
||||
curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
|
||||
chmod +x rustup-init && \
|
||||
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
|
||||
rm rustup-init && \
|
||||
cargo install --locked --version ${PGRX_VERSION} cargo-pgrx && \
|
||||
cargo install --locked --version 0.11.3 cargo-pgrx && \
|
||||
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
|
||||
|
||||
USER root
|
||||
@@ -892,41 +882,18 @@ USER root
|
||||
FROM rust-extensions-build AS pg-jsonschema-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# version 0.3.3 supports v17
|
||||
# last release v0.3.3 - Oct 16, 2024
|
||||
#
|
||||
# there were no breaking changes
|
||||
# so we can use the same version for all versions
|
||||
# when we're ready to bump pgrx for everyone
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export PG_JSONSCHEMA_VERSION=0.3.3 \
|
||||
export PG_JSONSCHEMA_CHECKSUM=40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export PG_JSONSCHEMA_VERSION=0.3.1 \
|
||||
export PG_JSONSCHEMA_CHECKSUM=61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_jsonschema does not yet have a release that supports pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v${PG_JSONSCHEMA_VERSION}.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "${PG_JSONSCHEMA_CHECKSUM} pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.1.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "61df3db1ed83cf24f6aa39c826f8818bfa4f0bd33b587fd6b2b1747985642297 pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
|
||||
# see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
|
||||
# `unsafe-postgres` feature allows to build pgx extensions
|
||||
# against postgres forks that decided to change their ABI name (like us).
|
||||
# With that we can build extensions without forking them and using stock
|
||||
# pgx. As this feature is new few manual version bumps were required.
|
||||
case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
|
||||
;; \
|
||||
esac && \
|
||||
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
|
||||
|
||||
@@ -940,36 +907,13 @@ RUN case "${PG_VERSION}" in \
|
||||
FROM rust-extensions-build AS pg-graphql-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# version 1.5.9 supports v17
|
||||
# last release v1.5.9 - Oct 16, 2024
|
||||
#
|
||||
# looks like there were no breaking changes
|
||||
# so we can use the same version for all versions
|
||||
# when we're ready to bump pgrx for everyone
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export PG_GRAPHQL_VERSION=1.5.9 \
|
||||
export PG_GRAPHQL_CHECKSUM=cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497 \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export PG_GRAPHQL_VERSION=1.5.7 \
|
||||
export PG_GRAPHQL_CHECKSUM=2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_graphql does not yet have a release that supports pg17 as of now" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v${PG_GRAPHQL_VERSION}.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "${PG_GRAPHQL_CHECKSUM} pg_graphql.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.7.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "2b3e567a5b31019cb97ae0e33263c1bcc28580be5a444ac4c8ece5c4be2aea41 pg_graphql.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
|
||||
case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
|
||||
;; \
|
||||
esac && \
|
||||
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
|
||||
@@ -986,10 +930,6 @@ FROM rust-extensions-build AS pg-tiktoken-pg-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
|
||||
# doesn't support pg17 yet
|
||||
# it depends on pgrx which doesn't support pg17
|
||||
#
|
||||
# Also, we need to start using releases instead of commits
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_tiktoken does not have versions, nor support for pg17" && exit 0;; \
|
||||
esac && \
|
||||
@@ -1012,9 +952,6 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
FROM rust-extensions-build AS pg-pgx-ulid-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# doesn't support pg17 yet
|
||||
# it depends on pgrx which doesn't support pg17
|
||||
# https://github.com/pksunkara/pgx_ulid/issues/51
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pgx_ulid does not support pg17 as of the latest version (0.1.5)" && exit 0;; \
|
||||
esac && \
|
||||
@@ -1035,31 +972,13 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
FROM rust-extensions-build AS pg-session-jwt-build
|
||||
ARG PG_VERSION
|
||||
|
||||
# Both pgrx 0.12.6 and 0.11.3 are maintained in parallel, with identical extension versions and features.
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
export PG_SESSION_JWT_VERSION=0.1.2-v17 \
|
||||
esport PG_SESSION_JWT_CHECKSUM=c8ecbed9cb8c6441bce5134a176002b043018adf9d05a08e457dda233090a86e \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
export PG_SESSION_JWT_VERSION=0.1.2 \
|
||||
esport PG_SESSION_JWT_CHECKSUM=837932a077888d5545fd54b0abcc79e5f8e37017c2769a930afc2f5c94df6f4e \
|
||||
;; \
|
||||
*) \
|
||||
echo "unexpected PostgreSQL version" && exit 1 \
|
||||
;; \
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "pg_session_jwt does not yet have a release that supports pg17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v${PG_SESSION_JWT_VERSION}.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "${PG_SESSION_JWT_CHECKSUM} pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
wget https://github.com/neondatabase/pg_session_jwt/archive/e642528f429dd3f5403845a50191b78d434b84a6.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "1a69210703cc91224785e59a0a67562dd9eed9a0914ac84b11447582ca0d5b93 pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
case "${PG_VERSION}" in \
|
||||
"v17") \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
|
||||
;; \
|
||||
"v14" | "v15" | "v16") \
|
||||
sed -i 's/pgrx = "0.11.3"/pgrx = { version = "0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml \
|
||||
;; \
|
||||
esac && \
|
||||
sed -i 's/pgrx = "=0.11.3"/pgrx = { version = "=0.11.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release
|
||||
|
||||
#########################################################################################
|
||||
|
||||
@@ -684,23 +684,6 @@ pub struct TimelineArchivalConfigRequest {
|
||||
pub state: TimelineArchivalState,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TimelinesInfoAndOffloaded {
|
||||
pub timelines: Vec<TimelineInfo>,
|
||||
pub offloaded: Vec<OffloadedTimelineInfo>,
|
||||
}
|
||||
|
||||
/// Analog of [`TimelineInfo`] for offloaded timelines.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct OffloadedTimelineInfo {
|
||||
pub tenant_id: TenantShardId,
|
||||
pub timeline_id: TimelineId,
|
||||
/// Whether the timeline has a parent it has been branched off from or not
|
||||
pub ancestor_timeline_id: Option<TimelineId>,
|
||||
/// Whether to retain the branch lsn at the ancestor or not
|
||||
pub ancestor_retain_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineInfo {
|
||||
@@ -1030,6 +1013,12 @@ pub mod virtual_file {
|
||||
}
|
||||
|
||||
impl IoMode {
|
||||
#[cfg(target_os = "linux")]
|
||||
pub const fn preferred() -> Self {
|
||||
Self::Direct
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
pub const fn preferred() -> Self {
|
||||
Self::Buffered
|
||||
}
|
||||
|
||||
@@ -26,7 +26,6 @@ use pageserver_api::models::LocationConfigListResponse;
|
||||
use pageserver_api::models::LocationConfigMode;
|
||||
use pageserver_api::models::LsnLease;
|
||||
use pageserver_api::models::LsnLeaseRequest;
|
||||
use pageserver_api::models::OffloadedTimelineInfo;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::TenantLocationConfigRequest;
|
||||
@@ -38,7 +37,6 @@ use pageserver_api::models::TenantShardSplitRequest;
|
||||
use pageserver_api::models::TenantShardSplitResponse;
|
||||
use pageserver_api::models::TenantSorting;
|
||||
use pageserver_api::models::TimelineArchivalConfigRequest;
|
||||
use pageserver_api::models::TimelinesInfoAndOffloaded;
|
||||
use pageserver_api::models::TopTenantShardItem;
|
||||
use pageserver_api::models::TopTenantShardsRequest;
|
||||
use pageserver_api::models::TopTenantShardsResponse;
|
||||
@@ -83,7 +81,6 @@ use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::OffloadedTimeline;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
@@ -480,22 +477,6 @@ async fn build_timeline_info_common(
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn build_timeline_offloaded_info(offloaded: &Arc<OffloadedTimeline>) -> OffloadedTimelineInfo {
|
||||
let &OffloadedTimeline {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
ancestor_retain_lsn,
|
||||
ancestor_timeline_id,
|
||||
..
|
||||
} = offloaded.as_ref();
|
||||
OffloadedTimelineInfo {
|
||||
tenant_id: tenant_shard_id,
|
||||
timeline_id,
|
||||
ancestor_retain_lsn,
|
||||
ancestor_timeline_id,
|
||||
}
|
||||
}
|
||||
|
||||
// healthcheck handler
|
||||
async fn status_handler(
|
||||
request: Request<Body>,
|
||||
@@ -662,7 +643,7 @@ async fn timeline_list_handler(
|
||||
)
|
||||
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
|
||||
.await
|
||||
.context("Failed to build timeline info")
|
||||
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
response_data.push(timeline_info);
|
||||
@@ -677,62 +658,6 @@ async fn timeline_list_handler(
|
||||
json_response(StatusCode::OK, response_data)
|
||||
}
|
||||
|
||||
async fn timeline_and_offloaded_list_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let include_non_incremental_logical_size: Option<bool> =
|
||||
parse_query_param(&request, "include-non-incremental-logical-size")?;
|
||||
let force_await_initial_logical_size: Option<bool> =
|
||||
parse_query_param(&request, "force-await-initial-logical-size")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
|
||||
let response_data = async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
let (timelines, offloadeds) = tenant.list_timelines_and_offloaded();
|
||||
|
||||
let mut timeline_infos = Vec::with_capacity(timelines.len());
|
||||
for timeline in timelines {
|
||||
let timeline_info = build_timeline_info(
|
||||
&timeline,
|
||||
include_non_incremental_logical_size.unwrap_or(false),
|
||||
force_await_initial_logical_size.unwrap_or(false),
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
|
||||
.await
|
||||
.context("Failed to build timeline info")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
timeline_infos.push(timeline_info);
|
||||
}
|
||||
let offloaded_infos = offloadeds
|
||||
.into_iter()
|
||||
.map(|offloaded| build_timeline_offloaded_info(&offloaded))
|
||||
.collect::<Vec<_>>();
|
||||
let res = TimelinesInfoAndOffloaded {
|
||||
timelines: timeline_infos,
|
||||
offloaded: offloaded_infos,
|
||||
};
|
||||
Ok::<TimelinesInfoAndOffloaded, ApiError>(res)
|
||||
}
|
||||
.instrument(info_span!("timeline_and_offloaded_list",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug()))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, response_data)
|
||||
}
|
||||
|
||||
async fn timeline_preserve_initdb_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -3068,9 +2993,6 @@ pub fn make_router(
|
||||
.get("/v1/tenant/:tenant_shard_id/timeline", |r| {
|
||||
api_handler(r, timeline_list_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_shard_id/timeline_and_offloaded", |r| {
|
||||
api_handler(r, timeline_and_offloaded_list_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_shard_id/timeline", |r| {
|
||||
api_handler(r, timeline_create_handler)
|
||||
})
|
||||
|
||||
@@ -1755,7 +1755,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Lists timelines the tenant contains.
|
||||
/// It's up to callers to omit certain timelines that are not considered ready for use.
|
||||
/// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
|
||||
pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
|
||||
self.timelines
|
||||
.lock()
|
||||
@@ -1765,29 +1765,6 @@ impl Tenant {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Lists timelines the tenant manages, including offloaded ones.
|
||||
///
|
||||
/// It's up to callers to omit certain timelines that are not considered ready for use.
|
||||
pub fn list_timelines_and_offloaded(
|
||||
&self,
|
||||
) -> (Vec<Arc<Timeline>>, Vec<Arc<OffloadedTimeline>>) {
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.map(Arc::clone)
|
||||
.collect();
|
||||
let offloaded = self
|
||||
.timelines_offloaded
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.map(Arc::clone)
|
||||
.collect();
|
||||
(timelines, offloaded)
|
||||
}
|
||||
|
||||
pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
|
||||
self.timelines.lock().unwrap().keys().cloned().collect()
|
||||
}
|
||||
|
||||
@@ -515,8 +515,8 @@ impl DeltaLayerWriterInner {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let temp_path = self.path.clone();
|
||||
let result = self.finish0(key_end, ctx).await;
|
||||
if let Err(ref e) = result {
|
||||
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
|
||||
if result.is_err() {
|
||||
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
|
||||
if let Err(e) = std::fs::remove_file(&temp_path) {
|
||||
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
|
||||
}
|
||||
|
||||
@@ -827,25 +827,6 @@ impl ImageLayerWriterInner {
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let temp_path = self.path.clone();
|
||||
let result = self.finish0(ctx, end_key).await;
|
||||
if let Err(ref e) = result {
|
||||
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
|
||||
if let Err(e) = std::fs::remove_file(&temp_path) {
|
||||
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
async fn finish0(
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ impl SplitWriterResult {
|
||||
pub struct SplitImageLayerWriter {
|
||||
inner: ImageLayerWriter,
|
||||
target_layer_size: u64,
|
||||
generated_layer_writers: Vec<(ImageLayerWriter, PersistentLayerKey)>,
|
||||
generated_layers: Vec<SplitWriterResult>,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -71,7 +71,7 @@ impl SplitImageLayerWriter {
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
generated_layer_writers: Vec::new(),
|
||||
generated_layers: Vec::new(),
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
@@ -80,12 +80,18 @@ impl SplitImageLayerWriter {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn put_image(
|
||||
pub async fn put_image_with_discard_fn<D, F>(
|
||||
&mut self,
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
discard: D,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
D: FnOnce(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
{
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
@@ -102,83 +108,72 @@ impl SplitImageLayerWriter {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..key,
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
|
||||
is_delta: false,
|
||||
};
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.start_key = key;
|
||||
|
||||
self.generated_layer_writers
|
||||
.push((prev_image_writer, layer_key));
|
||||
if discard(&layer_key).await {
|
||||
drop(prev_image_writer);
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let (desc, path) = prev_image_writer.finish_with_end_key(key, ctx).await?;
|
||||
|
||||
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Produced(layer));
|
||||
}
|
||||
}
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub async fn put_image(
|
||||
&mut self,
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
discard_fn: D,
|
||||
discard: D,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
D: FnOnce(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
{
|
||||
let Self {
|
||||
mut generated_layer_writers,
|
||||
mut generated_layers,
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
if inner.num_keys() != 0 {
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..end_key,
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
|
||||
is_delta: false,
|
||||
};
|
||||
generated_layer_writers.push((inner, layer_key));
|
||||
if inner.num_keys() == 0 {
|
||||
return Ok(generated_layers);
|
||||
}
|
||||
let clean_up_layers = |generated_layers: Vec<SplitWriterResult>| {
|
||||
for produced_layer in generated_layers {
|
||||
if let SplitWriterResult::Produced(image_layer) = produced_layer {
|
||||
let layer: Layer = image_layer.into();
|
||||
layer.delete_on_drop();
|
||||
}
|
||||
}
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..end_key,
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
|
||||
is_delta: false,
|
||||
};
|
||||
// BEGIN: catch every error and do the recovery in the below section
|
||||
let mut generated_layers = Vec::new();
|
||||
for (inner, layer_key) in generated_layer_writers {
|
||||
if discard_fn(&layer_key).await {
|
||||
generated_layers.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let layer = match inner
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
{
|
||||
Ok((desc, path)) => {
|
||||
match Layer::finish_creating(self.conf, tline, desc, &path) {
|
||||
Ok(layer) => layer,
|
||||
Err(e) => {
|
||||
tokio::fs::remove_file(&path).await.ok();
|
||||
clean_up_layers(generated_layers);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// ImageLayerWriter::finish will clean up the temporary layer if anything goes wrong,
|
||||
// so we don't need to remove it by ourselves.
|
||||
clean_up_layers(generated_layers);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
generated_layers.push(SplitWriterResult::Produced(layer));
|
||||
}
|
||||
if discard(&layer_key).await {
|
||||
generated_layers.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let (desc, path) = inner.finish_with_end_key(end_key, ctx).await?;
|
||||
let layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
generated_layers.push(SplitWriterResult::Produced(layer));
|
||||
}
|
||||
// END: catch every error and do the recovery in the above section
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
@@ -192,6 +187,11 @@ impl SplitImageLayerWriter {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
/// This function will be deprecated with #8841.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
}
|
||||
|
||||
/// A delta writer that takes key-lsn-values and produces multiple delta layers.
|
||||
@@ -296,16 +296,8 @@ impl SplitDeltaLayerWriter {
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
// `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
|
||||
// files for `finish_creating`.
|
||||
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
|
||||
let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
|
||||
Ok(layer) => layer,
|
||||
Err(e) => {
|
||||
tokio::fs::remove_file(&path).await.ok();
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Produced(delta_layer));
|
||||
}
|
||||
@@ -365,16 +357,8 @@ impl SplitDeltaLayerWriter {
|
||||
if discard(&layer_key).await {
|
||||
generated_layers.push(SplitWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
// `finish` will remove the file if anything goes wrong, while we need to handle deleting temporary
|
||||
// files for `finish_creating`.
|
||||
let (desc, path) = inner.finish(end_key, ctx).await?;
|
||||
let delta_layer = match Layer::finish_creating(self.conf, tline, desc, &path) {
|
||||
Ok(layer) => layer,
|
||||
Err(e) => {
|
||||
tokio::fs::remove_file(&path).await.ok();
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
|
||||
generated_layers.push(SplitWriterResult::Produced(delta_layer));
|
||||
}
|
||||
Ok(generated_layers)
|
||||
@@ -463,7 +447,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &ctx)
|
||||
.put_image(get_key(0), get_img(0), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = image_writer
|
||||
@@ -502,18 +486,14 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_split() {
|
||||
// Test the split writer with retaining all the layers we have produced (discard=false)
|
||||
write_split_helper("split_writer_write_split", false).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_split_discard() {
|
||||
// Test the split writer with discarding all the layers we have produced (discard=true)
|
||||
write_split_helper("split_writer_write_split_discard", true).await;
|
||||
write_split_helper("split_writer_write_split_discard", false).await;
|
||||
}
|
||||
|
||||
/// Test the image+delta writer by writing a large number of images and deltas. If discard is
|
||||
/// set to true, all layers will be discarded.
|
||||
async fn write_split_helper(harness_name: &'static str, discard: bool) {
|
||||
let harness = TenantHarness::create(harness_name).await.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
@@ -547,7 +527,9 @@ mod tests {
|
||||
for i in 0..N {
|
||||
let i = i as u32;
|
||||
image_writer
|
||||
.put_image(get_key(i), get_large_img(), &ctx)
|
||||
.put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
|
||||
discard
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
delta_writer
|
||||
@@ -563,54 +545,51 @@ mod tests {
|
||||
.unwrap();
|
||||
}
|
||||
let image_layers = image_writer
|
||||
.finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer
|
||||
.finish_with_discard_fn(&tline, &ctx, |_| async { discard })
|
||||
.await
|
||||
.unwrap();
|
||||
let image_layers = image_layers
|
||||
.into_iter()
|
||||
.map(|x| {
|
||||
if discard {
|
||||
x.into_discarded_layer()
|
||||
} else {
|
||||
x.into_resident_layer().layer_desc().key()
|
||||
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
if discard {
|
||||
for layer in image_layers {
|
||||
layer.into_discarded_layer();
|
||||
}
|
||||
for layer in delta_layers {
|
||||
layer.into_discarded_layer();
|
||||
}
|
||||
} else {
|
||||
let image_layers = image_layers
|
||||
.into_iter()
|
||||
.map(|x| x.into_resident_layer())
|
||||
.collect_vec();
|
||||
let delta_layers = delta_layers
|
||||
.into_iter()
|
||||
.map(|x| x.into_resident_layer())
|
||||
.collect_vec();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
assert_eq!(
|
||||
delta_layers.first().unwrap().layer_desc().key_range.start,
|
||||
get_key(0)
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers.last().unwrap().layer_desc().key_range.end,
|
||||
get_key(N as u32)
|
||||
);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
if idx > 0 {
|
||||
assert_eq!(
|
||||
image_layers[idx - 1].layer_desc().key_range.end,
|
||||
image_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers[idx - 1].layer_desc().key_range.end,
|
||||
delta_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
let delta_layers = delta_layers
|
||||
.into_iter()
|
||||
.map(|x| {
|
||||
if discard {
|
||||
x.into_discarded_layer()
|
||||
} else {
|
||||
x.into_resident_layer().layer_desc().key()
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
|
||||
assert_eq!(
|
||||
delta_layers.last().unwrap().key_range.end,
|
||||
get_key(N as u32)
|
||||
);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].key_range.end, Key::MAX);
|
||||
assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
|
||||
assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
|
||||
if idx > 0 {
|
||||
assert_eq!(
|
||||
image_layers[idx - 1].key_range.end,
|
||||
image_layers[idx].key_range.start
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers[idx - 1].key_range.end,
|
||||
delta_layers[idx].key_range.start
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -650,11 +629,11 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &ctx)
|
||||
.put_image(get_key(0), get_img(0), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
image_writer
|
||||
.put_image(get_key(1), get_large_img(), &ctx)
|
||||
.put_image(get_key(1), get_large_img(), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = image_writer
|
||||
|
||||
@@ -141,7 +141,9 @@ impl KeyHistoryRetention {
|
||||
};
|
||||
stat.produce_image_key(img);
|
||||
if let Some(image_writer) = image_writer.as_mut() {
|
||||
image_writer.put_image(key, img.clone(), ctx).await?;
|
||||
image_writer
|
||||
.put_image_with_discard_fn(key, img.clone(), tline, ctx, discard)
|
||||
.await?;
|
||||
} else {
|
||||
delta_writer
|
||||
.put_value_with_discard_fn(
|
||||
@@ -2039,7 +2041,8 @@ impl Timeline {
|
||||
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
|
||||
.await?
|
||||
} else {
|
||||
drop(writer);
|
||||
let (layers, _) = writer.take()?;
|
||||
assert!(layers.is_empty(), "image layers produced in dry run mode?");
|
||||
Vec::new()
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -161,9 +161,6 @@ pub(crate) enum Reason {
|
||||
/// LockAlreadyTaken indicates that the we attempted to take a lock that was already taken.
|
||||
#[serde(rename = "LOCK_ALREADY_TAKEN")]
|
||||
LockAlreadyTaken,
|
||||
/// ActiveEndpointsLimitExceeded indicates that the limit of concurrently active endpoints was exceeded.
|
||||
#[serde(rename = "ACTIVE_ENDPOINTS_LIMIT_EXCEEDED")]
|
||||
ActiveEndpointsLimitExceeded,
|
||||
#[default]
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
@@ -197,8 +194,7 @@ impl Reason {
|
||||
| Reason::ComputeTimeQuotaExceeded
|
||||
| Reason::WrittenDataQuotaExceeded
|
||||
| Reason::DataTransferQuotaExceeded
|
||||
| Reason::LogicalSizeQuotaExceeded
|
||||
| Reason::ActiveEndpointsLimitExceeded => false,
|
||||
| Reason::LogicalSizeQuotaExceeded => false,
|
||||
// transitive error. control plane is currently busy
|
||||
// but might be ready soon
|
||||
Reason::RunningOperations
|
||||
|
||||
@@ -87,8 +87,36 @@ pub(crate) mod errors {
|
||||
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
|
||||
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
|
||||
Reason::RunningOperations => ErrorKind::ControlPlane,
|
||||
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane,
|
||||
Reason::Unknown => ErrorKind::ControlPlane,
|
||||
Reason::Unknown => match &**e {
|
||||
ControlPlaneError {
|
||||
http_status_code:
|
||||
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ControlPlaneError {
|
||||
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
error,
|
||||
..
|
||||
} if error
|
||||
.contains("compute time quota of non-primary branches is exceeded") =>
|
||||
{
|
||||
crate::error::ErrorKind::Quota
|
||||
}
|
||||
ControlPlaneError {
|
||||
http_status_code: http::StatusCode::LOCKED,
|
||||
error,
|
||||
..
|
||||
} if error.contains("quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
crate::error::ErrorKind::Quota
|
||||
}
|
||||
ControlPlaneError {
|
||||
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
|
||||
..
|
||||
} => crate::error::ErrorKind::ServiceRateLimit,
|
||||
ControlPlaneError { .. } => crate::error::ErrorKind::ControlPlane,
|
||||
},
|
||||
},
|
||||
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
|
||||
@@ -14,7 +14,6 @@ use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
|
||||
use tokio::time::{self, Instant};
|
||||
|
||||
use crate::control_plane::messages::ColdStartInfo;
|
||||
use crate::error::ErrorKind;
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
|
||||
@@ -326,10 +325,23 @@ pub enum ConnectionFailureKind {
|
||||
ComputeUncached,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Copy, Clone)]
|
||||
#[label(singleton = "kind")]
|
||||
pub enum WakeupFailureKind {
|
||||
BadComputeAddress,
|
||||
ApiTransportError,
|
||||
QuotaExceeded,
|
||||
ApiConsoleLocked,
|
||||
ApiConsoleBadRequest,
|
||||
ApiConsoleOtherServerError,
|
||||
ApiConsoleOtherError,
|
||||
TimeoutError,
|
||||
}
|
||||
|
||||
#[derive(LabelGroup)]
|
||||
#[label(set = ConnectionFailuresBreakdownSet)]
|
||||
pub struct ConnectionFailuresBreakdownGroup {
|
||||
pub kind: ErrorKind,
|
||||
pub kind: WakeupFailureKind,
|
||||
pub retry: Bool,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
use hyper::StatusCode;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use super::connect_compute::ComputeConnectBackend;
|
||||
use crate::config::RetryConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::messages::{ControlPlaneError, Reason};
|
||||
use crate::control_plane::provider::CachedNodeInfo;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{
|
||||
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
|
||||
WakeupFailureKind,
|
||||
};
|
||||
use crate::proxy::retry::{retry_after, should_retry};
|
||||
|
||||
@@ -58,8 +60,62 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
}
|
||||
|
||||
fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
let kind = e.get_error_kind();
|
||||
|
||||
use crate::control_plane::errors::ApiError;
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
|
||||
WakeComputeError::ApiError(ApiError::ControlPlane(e)) => match e.get_reason() {
|
||||
Reason::RoleProtected => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
Reason::ResourceNotFound => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
Reason::ProjectNotFound => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
Reason::EndpointNotFound => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
Reason::BranchNotFound => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
Reason::RateLimitExceeded => WakeupFailureKind::ApiConsoleLocked,
|
||||
Reason::NonDefaultBranchComputeTimeExceeded => WakeupFailureKind::QuotaExceeded,
|
||||
Reason::ActiveTimeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
|
||||
Reason::ComputeTimeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
|
||||
Reason::WrittenDataQuotaExceeded => WakeupFailureKind::QuotaExceeded,
|
||||
Reason::DataTransferQuotaExceeded => WakeupFailureKind::QuotaExceeded,
|
||||
Reason::LogicalSizeQuotaExceeded => WakeupFailureKind::QuotaExceeded,
|
||||
Reason::ConcurrencyLimitReached => WakeupFailureKind::ApiConsoleLocked,
|
||||
Reason::LockAlreadyTaken => WakeupFailureKind::ApiConsoleLocked,
|
||||
Reason::RunningOperations => WakeupFailureKind::ApiConsoleLocked,
|
||||
Reason::Unknown => match **e {
|
||||
ControlPlaneError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("written data quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ControlPlaneError {
|
||||
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ControlPlaneError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleLocked,
|
||||
ControlPlaneError {
|
||||
http_status_code: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
ControlPlaneError {
|
||||
http_status_code, ..
|
||||
} if http_status_code.is_server_error() => {
|
||||
WakeupFailureKind::ApiConsoleOtherServerError
|
||||
}
|
||||
ControlPlaneError { .. } => WakeupFailureKind::ApiConsoleOtherError,
|
||||
},
|
||||
},
|
||||
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
|
||||
};
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.connection_failures_breakdown
|
||||
|
||||
@@ -66,25 +66,22 @@ impl FileStorage {
|
||||
})
|
||||
}
|
||||
|
||||
/// Create and reliably persist new control file at given location.
|
||||
///
|
||||
/// Note: we normally call this in temp directory for atomic init, so
|
||||
/// interested in FileStorage as a result only in tests.
|
||||
pub async fn create_new(
|
||||
dir: Utf8PathBuf,
|
||||
/// Create file storage for a new timeline, but don't persist it yet.
|
||||
pub fn create_new(
|
||||
timeline_dir: Utf8PathBuf,
|
||||
conf: &SafeKeeperConf,
|
||||
state: TimelinePersistentState,
|
||||
) -> Result<FileStorage> {
|
||||
// we don't support creating new timelines in offloaded state
|
||||
assert!(matches!(state.eviction_state, EvictionState::Present));
|
||||
|
||||
let mut store = FileStorage {
|
||||
timeline_dir: dir,
|
||||
let store = FileStorage {
|
||||
timeline_dir,
|
||||
no_sync: conf.no_sync,
|
||||
state: state.clone(),
|
||||
state,
|
||||
last_persist_at: Instant::now(),
|
||||
};
|
||||
store.persist(&state).await?;
|
||||
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
@@ -193,6 +190,8 @@ impl TimelinePersistentState {
|
||||
|
||||
impl Storage for FileStorage {
|
||||
/// Persists state durably to the underlying storage.
|
||||
///
|
||||
/// For a description, see <https://lwn.net/Articles/457667/>.
|
||||
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
|
||||
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
|
||||
|
||||
@@ -270,7 +269,7 @@ mod test {
|
||||
.await
|
||||
.expect("failed to create timeline dir");
|
||||
let state = TimelinePersistentState::empty();
|
||||
let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
|
||||
let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
|
||||
Ok((storage, state))
|
||||
}
|
||||
|
||||
|
||||
@@ -12,10 +12,10 @@ use tracing::{info, warn};
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
control_file::FileStorage,
|
||||
control_file::{FileStorage, Storage},
|
||||
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
|
||||
state::TimelinePersistentState,
|
||||
timeline::{Timeline, TimelineError, WalResidentTimeline},
|
||||
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
|
||||
wal_backup::copy_s3_segments,
|
||||
wal_storage::{wal_file_paths, WalReader},
|
||||
GlobalTimelines,
|
||||
@@ -149,16 +149,17 @@ pub async fn handle_request(request: Request) -> Result<()> {
|
||||
vec![],
|
||||
request.until_lsn,
|
||||
start_lsn,
|
||||
)?;
|
||||
);
|
||||
new_state.timeline_start_lsn = start_lsn;
|
||||
new_state.peer_horizon_lsn = request.until_lsn;
|
||||
new_state.backup_lsn = new_backup_lsn;
|
||||
|
||||
FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone()).await?;
|
||||
let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
|
||||
file_storage.persist(&new_state).await?;
|
||||
|
||||
// now we have a ready timeline in a temp directory
|
||||
validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||
GlobalTimelines::load_temp_timeline(request.destination_ttid, &tli_dir_path, true).await?;
|
||||
load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{SinkExt, StreamExt, TryStreamExt};
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
@@ -8,6 +9,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
cmp::min,
|
||||
io::{self, ErrorKind},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task};
|
||||
use tokio_tar::{Archive, Builder, Header};
|
||||
@@ -18,7 +20,7 @@ use tokio_util::{
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use crate::{
|
||||
control_file::CONTROL_FILE_NAME,
|
||||
control_file::{self, CONTROL_FILE_NAME},
|
||||
debug_dump,
|
||||
http::{
|
||||
client::{self, Client},
|
||||
@@ -26,14 +28,13 @@ use crate::{
|
||||
},
|
||||
safekeeper::Term,
|
||||
state::TimelinePersistentState,
|
||||
timeline::WalResidentTimeline,
|
||||
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},
|
||||
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline},
|
||||
wal_backup,
|
||||
wal_storage::open_wal_file,
|
||||
GlobalTimelines,
|
||||
wal_storage::{self, open_wal_file, Storage},
|
||||
GlobalTimelines, SafeKeeperConf,
|
||||
};
|
||||
use utils::{
|
||||
crashsafe::fsync_async_opt,
|
||||
crashsafe::{durable_rename, fsync_async_opt},
|
||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||
logging::SecretString,
|
||||
lsn::Lsn,
|
||||
@@ -427,9 +428,100 @@ async fn pull_timeline(
|
||||
assert!(status.commit_lsn <= status.flush_lsn);
|
||||
|
||||
// Finally, load the timeline.
|
||||
let _tli = GlobalTimelines::load_temp_timeline(ttid, &tli_dir_path, false).await?;
|
||||
let _tli = load_temp_timeline(conf, ttid, &tli_dir_path).await?;
|
||||
|
||||
Ok(Response {
|
||||
safekeeper_host: host,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create temp directory for a new timeline. It needs to be located on the same
|
||||
/// filesystem as the rest of the timelines. It will be automatically deleted when
|
||||
/// Utf8TempDir goes out of scope.
|
||||
pub async fn create_temp_timeline_dir(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
|
||||
// conf.workdir is usually /storage/safekeeper/data
|
||||
// will try to transform it into /storage/safekeeper/tmp
|
||||
let temp_base = conf
|
||||
.workdir
|
||||
.parent()
|
||||
.ok_or(anyhow::anyhow!("workdir has no parent"))?
|
||||
.join("tmp");
|
||||
|
||||
tokio::fs::create_dir_all(&temp_base).await?;
|
||||
|
||||
let tli_dir = camino_tempfile::Builder::new()
|
||||
.suffix("_temptli")
|
||||
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
|
||||
.tempdir_in(temp_base)?;
|
||||
|
||||
let tli_dir_path = tli_dir.path().to_path_buf();
|
||||
|
||||
Ok((tli_dir, tli_dir_path))
|
||||
}
|
||||
|
||||
/// Do basic validation of a temp timeline, before moving it to the global map.
|
||||
pub async fn validate_temp_timeline(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
path: &Utf8PathBuf,
|
||||
) -> Result<(Lsn, Lsn)> {
|
||||
let control_path = path.join("safekeeper.control");
|
||||
|
||||
let control_store = control_file::FileStorage::load_control_file(control_path)?;
|
||||
if control_store.server.wal_seg_size == 0 {
|
||||
bail!("wal_seg_size is not set");
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
let flush_lsn = wal_store.flush_lsn();
|
||||
|
||||
Ok((commit_lsn, flush_lsn))
|
||||
}
|
||||
|
||||
/// Move timeline from a temp directory to the main storage, and load it to the global map.
|
||||
///
|
||||
/// This operation is done under a lock to prevent bugs if several concurrent requests are
|
||||
/// trying to load the same timeline. Note that it doesn't guard against creating the
|
||||
/// timeline with the same ttid, but no one should be doing this anyway.
|
||||
pub async fn load_temp_timeline(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
// Take a lock to prevent concurrent loadings
|
||||
let load_lock = GlobalTimelines::loading_lock().await;
|
||||
let guard = load_lock.lock().await;
|
||||
|
||||
if !matches!(GlobalTimelines::get(ttid), Err(TimelineError::NotFound(_))) {
|
||||
bail!("timeline already exists, cannot overwrite it")
|
||||
}
|
||||
|
||||
// Move timeline dir to the correct location
|
||||
let timeline_path = get_timeline_dir(conf, &ttid);
|
||||
|
||||
info!(
|
||||
"moving timeline {} from {} to {}",
|
||||
ttid, tmp_path, timeline_path
|
||||
);
|
||||
tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
|
||||
// fsync tenant dir creation
|
||||
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
|
||||
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
|
||||
|
||||
let tli = GlobalTimelines::load_timeline(&guard, ttid)
|
||||
.await
|
||||
.context("Failed to load timeline after copy")?;
|
||||
|
||||
info!(
|
||||
"loaded timeline {}, flush_lsn={}",
|
||||
ttid,
|
||||
tli.get_flush_lsn().await
|
||||
);
|
||||
|
||||
Ok(tli)
|
||||
}
|
||||
|
||||
@@ -339,8 +339,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||
};
|
||||
let tli =
|
||||
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
|
||||
.await
|
||||
.context("create timeline")?;
|
||||
.await?;
|
||||
tli.wal_residence_guard().await?
|
||||
}
|
||||
_ => {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use std::{cmp::max, ops::Deref};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use anyhow::Result;
|
||||
use safekeeper_api::models::TimelineTermBumpResponse;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::{
|
||||
@@ -13,11 +13,7 @@ use utils::{
|
||||
|
||||
use crate::{
|
||||
control_file,
|
||||
safekeeper::{
|
||||
AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory,
|
||||
UNKNOWN_SERVER_VERSION,
|
||||
},
|
||||
timeline::TimelineError,
|
||||
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
|
||||
wal_backup_partial::{self},
|
||||
};
|
||||
|
||||
@@ -95,24 +91,8 @@ impl TimelinePersistentState {
|
||||
peers: Vec<NodeId>,
|
||||
commit_lsn: Lsn,
|
||||
local_start_lsn: Lsn,
|
||||
) -> anyhow::Result<TimelinePersistentState> {
|
||||
if server_info.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*ttid));
|
||||
}
|
||||
|
||||
if server_info.pg_version == UNKNOWN_SERVER_VERSION {
|
||||
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
|
||||
}
|
||||
|
||||
if commit_lsn < local_start_lsn {
|
||||
bail!(
|
||||
"commit_lsn {} is smaller than local_start_lsn {}",
|
||||
commit_lsn,
|
||||
local_start_lsn
|
||||
);
|
||||
}
|
||||
|
||||
Ok(TimelinePersistentState {
|
||||
) -> TimelinePersistentState {
|
||||
TimelinePersistentState {
|
||||
tenant_id: ttid.tenant_id,
|
||||
timeline_id: ttid.timeline_id,
|
||||
acceptor_state: AcceptorState {
|
||||
@@ -135,23 +115,24 @@ impl TimelinePersistentState {
|
||||
),
|
||||
partial_backup: wal_backup_partial::State::default(),
|
||||
eviction_state: EvictionState::Present,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn empty() -> Self {
|
||||
use crate::safekeeper::UNKNOWN_SERVER_VERSION;
|
||||
|
||||
TimelinePersistentState::new(
|
||||
&TenantTimelineId::empty(),
|
||||
ServerInfo {
|
||||
pg_version: 17, /* Postgres server version */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
wal_seg_size: 16 * 1024 * 1024,
|
||||
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
wal_seg_size: 0,
|
||||
},
|
||||
vec![],
|
||||
Lsn::INVALID,
|
||||
Lsn::INVALID,
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,11 +27,11 @@ use utils::{
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
|
||||
use crate::control_file;
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, Term, TermLsn,
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
|
||||
INVALID_TERM,
|
||||
};
|
||||
use crate::send_wal::WalSenders;
|
||||
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
|
||||
@@ -40,6 +40,7 @@ use crate::timeline_manager::{AtomicStatus, ManagerCtl};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::{self, remote_timeline_path};
|
||||
use crate::wal_backup_partial::PartialRemoteSegment;
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
|
||||
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
|
||||
@@ -325,6 +326,44 @@ pub struct SharedState {
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
/// Initialize fresh timeline state without persisting anything to disk.
|
||||
fn create_new(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: &TenantTimelineId,
|
||||
state: TimelinePersistentState,
|
||||
) -> Result<Self> {
|
||||
if state.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*ttid));
|
||||
}
|
||||
|
||||
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
|
||||
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
|
||||
}
|
||||
|
||||
if state.commit_lsn < state.local_start_lsn {
|
||||
bail!(
|
||||
"commit_lsn {} is higher than local_start_lsn {}",
|
||||
state.commit_lsn,
|
||||
state.local_start_lsn
|
||||
);
|
||||
}
|
||||
|
||||
// We don't want to write anything to disk, because we may have existing timeline there.
|
||||
// These functions should not change anything on disk.
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
let control_store =
|
||||
control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
|
||||
let wal_store =
|
||||
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
|
||||
let sk = SafeKeeper::new(TimelineState::new(control_store), wal_store, conf.my_id)?;
|
||||
|
||||
Ok(Self {
|
||||
sk: StateSK::Loaded(sk),
|
||||
peers_info: PeersInfo(vec![]),
|
||||
wal_removal_on_hold: false,
|
||||
})
|
||||
}
|
||||
|
||||
/// Restore SharedState from control file. If file doesn't exist, bails out.
|
||||
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
|
||||
let timeline_dir = get_timeline_dir(conf, ttid);
|
||||
@@ -411,8 +450,6 @@ pub enum TimelineError {
|
||||
Cancelled(TenantTimelineId),
|
||||
#[error("Timeline {0} was not found in global map")]
|
||||
NotFound(TenantTimelineId),
|
||||
#[error("Timeline {0} creation is in progress")]
|
||||
CreationInProgress(TenantTimelineId),
|
||||
#[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
|
||||
Invalid(TenantTimelineId),
|
||||
#[error("Timeline {0} is already exists")]
|
||||
@@ -477,7 +514,7 @@ pub struct Timeline {
|
||||
|
||||
impl Timeline {
|
||||
/// Load existing timeline from disk.
|
||||
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
|
||||
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
|
||||
|
||||
let shared_state = SharedState::restore(conf, &ttid)?;
|
||||
@@ -491,7 +528,7 @@ impl Timeline {
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
Ok(Arc::new(Timeline {
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
remote_path,
|
||||
commit_lsn_watch_tx,
|
||||
@@ -510,7 +547,47 @@ impl Timeline {
|
||||
wal_backup_active: AtomicBool::new(false),
|
||||
last_removed_segno: AtomicU64::new(0),
|
||||
mgr_status: AtomicStatus::new(),
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new timeline, which is not yet persisted to disk.
|
||||
pub fn create_empty(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
server_info: ServerInfo,
|
||||
commit_lsn: Lsn,
|
||||
local_start_lsn: Lsn,
|
||||
) -> Result<Timeline> {
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
|
||||
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
|
||||
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
|
||||
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
|
||||
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
let remote_path = remote_timeline_path(&ttid)?;
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
remote_path,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
shared_state_version_tx,
|
||||
shared_state_version_rx,
|
||||
mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
|
||||
walsenders: WalSenders::new(walreceivers.clone()),
|
||||
walreceivers,
|
||||
cancel: CancellationToken::default(),
|
||||
timeline_dir: get_timeline_dir(conf, &ttid),
|
||||
manager_ctl: ManagerCtl::new(),
|
||||
broker_active: AtomicBool::new(false),
|
||||
wal_backup_active: AtomicBool::new(false),
|
||||
last_removed_segno: AtomicU64::new(0),
|
||||
mgr_status: AtomicStatus::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Initialize fresh timeline on disk and start background tasks. If init
|
||||
|
||||
@@ -5,14 +5,11 @@
|
||||
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::state::TimelinePersistentState;
|
||||
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_storage::Storage;
|
||||
use crate::{control_file, wal_storage, SafeKeeperConf};
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
@@ -20,22 +17,12 @@ use std::str::FromStr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::fs;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::{durable_rename, fsync_async_opt};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
// Timeline entry in the global map: either a ready timeline, or mark that it is
|
||||
// being created.
|
||||
#[derive(Clone)]
|
||||
enum GlobalMapTimeline {
|
||||
CreationInProgress,
|
||||
Timeline(Arc<Timeline>),
|
||||
}
|
||||
|
||||
struct GlobalTimelinesState {
|
||||
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
|
||||
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
|
||||
|
||||
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
|
||||
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
@@ -44,9 +31,13 @@ struct GlobalTimelinesState {
|
||||
|
||||
conf: Option<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
|
||||
global_rate_limiter: RateLimiter,
|
||||
}
|
||||
|
||||
// Used to prevent concurrent timeline loading.
|
||||
pub struct TimelineLoadLock;
|
||||
|
||||
impl GlobalTimelinesState {
|
||||
/// Get configuration, which must be set once during init.
|
||||
fn get_conf(&self) -> &SafeKeeperConf {
|
||||
@@ -64,16 +55,22 @@ impl GlobalTimelinesState {
|
||||
)
|
||||
}
|
||||
|
||||
/// Get timeline from the map. Returns error if timeline doesn't exist or
|
||||
/// creation is in progress.
|
||||
fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
||||
match self.timelines.get(ttid).cloned() {
|
||||
Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
|
||||
Some(GlobalMapTimeline::CreationInProgress) => {
|
||||
Err(TimelineError::CreationInProgress(*ttid))
|
||||
}
|
||||
None => Err(TimelineError::NotFound(*ttid)),
|
||||
/// Insert timeline into the map. Returns error if timeline with the same id already exists.
|
||||
fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
|
||||
let ttid = timeline.ttid;
|
||||
if self.timelines.contains_key(&ttid) {
|
||||
bail!(TimelineError::AlreadyExists(ttid));
|
||||
}
|
||||
self.timelines.insert(ttid, timeline);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get timeline from the map. Returns error if timeline doesn't exist.
|
||||
fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
||||
self.timelines
|
||||
.get(ttid)
|
||||
.cloned()
|
||||
.ok_or(TimelineError::NotFound(*ttid))
|
||||
}
|
||||
|
||||
fn delete(&mut self, ttid: TenantTimelineId) {
|
||||
@@ -88,6 +85,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
|
||||
tombstones: HashMap::new(),
|
||||
conf: None,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
})
|
||||
});
|
||||
@@ -143,10 +141,11 @@ impl GlobalTimelines {
|
||||
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
||||
/// errors if any.
|
||||
///
|
||||
/// It is async, but TIMELINES_STATE lock is sync and there is no important
|
||||
/// reason to make it async (it is always held for a short while), so we
|
||||
/// just lock and unlock it for each timeline -- this function is called
|
||||
/// during init when nothing else is running, so this is fine.
|
||||
/// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
|
||||
/// sync and there is no important reason to make it async (it is always
|
||||
/// held for a short while) we just lock and unlock it for each timeline --
|
||||
/// this function is called during init when nothing else is running, so
|
||||
/// this is fine.
|
||||
async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
@@ -164,13 +163,14 @@ impl GlobalTimelines {
|
||||
{
|
||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(tli) => {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
|
||||
.insert(ttid, tli.clone());
|
||||
tli.bootstrap(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
@@ -199,6 +199,51 @@ impl GlobalTimelines {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Take a lock for timeline loading.
|
||||
pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
|
||||
TIMELINES_STATE.lock().unwrap().load_lock.clone()
|
||||
}
|
||||
|
||||
/// Load timeline from disk to the memory.
|
||||
pub async fn load_timeline<'a>(
|
||||
_guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
|
||||
ttid: TenantTimelineId,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) =
|
||||
TIMELINES_STATE.lock().unwrap().get_dependencies();
|
||||
|
||||
match Timeline::load_timeline(&conf, ttid) {
|
||||
Ok(timeline) => {
|
||||
let tli = Arc::new(timeline);
|
||||
let mut shared_state = tli.write_shared_state().await;
|
||||
|
||||
// TODO: prevent concurrent timeline creation/loading
|
||||
{
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
||||
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
||||
if state.tombstones.remove(&ttid).is_some() {
|
||||
warn!("Un-deleted timeline {ttid}");
|
||||
}
|
||||
|
||||
state.timelines.insert(ttid, tli.clone());
|
||||
}
|
||||
|
||||
tli.bootstrap(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
);
|
||||
drop(shared_state);
|
||||
Ok(tli)
|
||||
}
|
||||
// If we can't load a timeline, it's bad. Caller will figure it out.
|
||||
Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the number of timelines in the map.
|
||||
pub fn timelines_count() -> usize {
|
||||
TIMELINES_STATE.lock().unwrap().timelines.len()
|
||||
@@ -221,7 +266,7 @@ impl GlobalTimelines {
|
||||
commit_lsn: Lsn,
|
||||
local_start_lsn: Lsn,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let (conf, _, _) = {
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
if let Ok(timeline) = state.get(&ttid) {
|
||||
// Timeline already exists, return it.
|
||||
@@ -237,146 +282,55 @@ impl GlobalTimelines {
|
||||
|
||||
info!("creating new timeline {}", ttid);
|
||||
|
||||
// Do on disk initialization in tmp dir.
|
||||
let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
|
||||
let timeline = Arc::new(Timeline::create_empty(
|
||||
&conf,
|
||||
ttid,
|
||||
server_info,
|
||||
commit_lsn,
|
||||
local_start_lsn,
|
||||
)?);
|
||||
|
||||
// TODO: currently we create only cfile. It would be reasonable to
|
||||
// immediately initialize first WAL segment as well.
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
|
||||
control_file::FileStorage::create_new(tmp_dir_path.clone(), &conf, state).await?;
|
||||
let timeline = GlobalTimelines::load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
// Take a lock and finish the initialization holding this mutex. No other threads
|
||||
// can interfere with creation after we will insert timeline into the map.
|
||||
{
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
|
||||
/// Move timeline from a temp directory to the main storage, and load it to
|
||||
/// the global map. Creating timeline in this way ensures atomicity: rename
|
||||
/// is atomic, so either move of the whole datadir succeeds or it doesn't,
|
||||
/// but corrupted data dir shouldn't be possible.
|
||||
///
|
||||
/// We'd like to avoid holding map lock while doing IO, so it's a 3 step
|
||||
/// process:
|
||||
/// 1) check the global map that timeline doesn't exist and mark that we're
|
||||
/// creating it;
|
||||
/// 2) move the directory and load the timeline
|
||||
/// 3) take lock again and insert the timeline into the global map.
|
||||
pub async fn load_temp_timeline(
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
check_tombstone: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
// Check for existence and mark that we're creating it.
|
||||
let (conf, broker_active_set, partial_backup_rate_limiter) = {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
match state.timelines.get(&ttid) {
|
||||
Some(GlobalMapTimeline::CreationInProgress) => {
|
||||
bail!(TimelineError::CreationInProgress(ttid));
|
||||
}
|
||||
Some(GlobalMapTimeline::Timeline(_)) => {
|
||||
bail!(TimelineError::AlreadyExists(ttid));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
if check_tombstone {
|
||||
if state.tombstones.contains_key(&ttid) {
|
||||
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
} else {
|
||||
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
||||
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
||||
if state.tombstones.remove(&ttid).is_some() {
|
||||
warn!("un-deleted timeline {ttid}");
|
||||
}
|
||||
}
|
||||
state
|
||||
.timelines
|
||||
.insert(ttid, GlobalMapTimeline::CreationInProgress);
|
||||
state.get_dependencies()
|
||||
};
|
||||
// We can get a race condition here in case of concurrent create calls, but only
|
||||
// in theory. create() will return valid timeline on the next try.
|
||||
TIMELINES_STATE
|
||||
.lock()
|
||||
.unwrap()
|
||||
.try_insert(timeline.clone())?;
|
||||
|
||||
// Do the actual move and reflect the result in the map.
|
||||
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, &conf).await {
|
||||
Ok(timeline) => {
|
||||
let mut timeline_shared_state = timeline.write_shared_state().await;
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(matches!(
|
||||
state.timelines.get(&ttid),
|
||||
Some(GlobalMapTimeline::CreationInProgress)
|
||||
));
|
||||
|
||||
state
|
||||
.timelines
|
||||
.insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
|
||||
drop(state);
|
||||
timeline.bootstrap(
|
||||
&mut timeline_shared_state,
|
||||
// Write the new timeline to the disk and start background workers.
|
||||
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
|
||||
// and the state on disk should remain unchanged.
|
||||
if let Err(e) = timeline
|
||||
.init_new(
|
||||
&mut shared_state,
|
||||
&conf,
|
||||
broker_active_set,
|
||||
partial_backup_rate_limiter,
|
||||
);
|
||||
drop(timeline_shared_state);
|
||||
Ok(timeline)
|
||||
}
|
||||
Err(e) => {
|
||||
// Init failed, remove the marker from the map
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(matches!(
|
||||
state.timelines.get(&ttid),
|
||||
Some(GlobalMapTimeline::CreationInProgress)
|
||||
));
|
||||
state.timelines.remove(&ttid);
|
||||
Err(e)
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Note: the most likely reason for init failure is that the timeline
|
||||
// directory already exists on disk. This happens when timeline is corrupted
|
||||
// and wasn't loaded from disk on startup because of that. We want to preserve
|
||||
// the timeline directory in this case, for further inspection.
|
||||
|
||||
// TODO: this is an unusual error, perhaps we should send it to sentry
|
||||
// TODO: compute will try to create timeline every second, we should add backoff
|
||||
error!("failed to init new timeline {}: {}", ttid, e);
|
||||
|
||||
// Timeline failed to init, it cannot be used. Remove it from the map.
|
||||
TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
|
||||
return Err(e);
|
||||
}
|
||||
// We are done with bootstrap, release the lock, return the timeline.
|
||||
// {} block forces release before .await
|
||||
}
|
||||
}
|
||||
|
||||
/// Main part of load_temp_timeline: do the move and load.
|
||||
async fn install_temp_timeline(
|
||||
ttid: TenantTimelineId,
|
||||
tmp_path: &Utf8PathBuf,
|
||||
conf: &SafeKeeperConf,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let tenant_path = get_tenant_dir(conf, &ttid.tenant_id);
|
||||
let timeline_path = get_timeline_dir(conf, &ttid);
|
||||
|
||||
// We must have already checked that timeline doesn't exist in the map,
|
||||
// but there might be existing datadir: if timeline is corrupted it is
|
||||
// not loaded. We don't want to overwrite such a dir, so check for its
|
||||
// existence.
|
||||
match fs::metadata(&timeline_path).await {
|
||||
Ok(_) => {
|
||||
// Timeline directory exists on disk, we should leave state unchanged
|
||||
// and return error.
|
||||
bail!(TimelineError::Invalid(ttid));
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"moving timeline {} from {} to {}",
|
||||
ttid, tmp_path, timeline_path
|
||||
);
|
||||
|
||||
// Now it is safe to move the timeline directory to the correct
|
||||
// location. First, create tenant directory. Ignore error if it already
|
||||
// exists.
|
||||
if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
|
||||
if e.kind() != std::io::ErrorKind::AlreadyExists {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
// fsync it
|
||||
fsync_async_opt(&tenant_path, !conf.no_sync).await?;
|
||||
// and its creation
|
||||
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
|
||||
|
||||
// Do the move.
|
||||
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
|
||||
|
||||
Timeline::load_timeline(conf, ttid)
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
|
||||
@@ -404,16 +358,8 @@ impl GlobalTimelines {
|
||||
global_lock
|
||||
.timelines
|
||||
.values()
|
||||
.filter_map(|t| match t {
|
||||
GlobalMapTimeline::Timeline(t) => {
|
||||
if t.is_cancelled() {
|
||||
None
|
||||
} else {
|
||||
Some(t.clone())
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.filter(|t| !t.is_cancelled())
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -424,11 +370,8 @@ impl GlobalTimelines {
|
||||
global_lock
|
||||
.timelines
|
||||
.values()
|
||||
.filter_map(|t| match t {
|
||||
GlobalMapTimeline::Timeline(t) => Some(t.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.filter(|t| t.ttid.tenant_id == tenant_id)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -561,45 +504,3 @@ fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create temp directory for a new timeline. It needs to be located on the same
|
||||
/// filesystem as the rest of the timelines. It will be automatically deleted when
|
||||
/// Utf8TempDir goes out of scope.
|
||||
pub async fn create_temp_timeline_dir(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
|
||||
let temp_base = conf.workdir.join("tmp");
|
||||
|
||||
tokio::fs::create_dir_all(&temp_base).await?;
|
||||
|
||||
let tli_dir = camino_tempfile::Builder::new()
|
||||
.suffix("_temptli")
|
||||
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
|
||||
.tempdir_in(temp_base)?;
|
||||
|
||||
let tli_dir_path = tli_dir.path().to_path_buf();
|
||||
|
||||
Ok((tli_dir, tli_dir_path))
|
||||
}
|
||||
|
||||
/// Do basic validation of a temp timeline, before moving it to the global map.
|
||||
pub async fn validate_temp_timeline(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: TenantTimelineId,
|
||||
path: &Utf8PathBuf,
|
||||
) -> Result<(Lsn, Lsn)> {
|
||||
let control_path = path.join("safekeeper.control");
|
||||
|
||||
let control_store = control_file::FileStorage::load_control_file(control_path)?;
|
||||
if control_store.server.wal_seg_size == 0 {
|
||||
bail!("wal_seg_size is not set");
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path.clone(), conf, &control_store)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
let flush_lsn = wal_store.flush_lsn();
|
||||
|
||||
Ok((commit_lsn, flush_lsn))
|
||||
}
|
||||
|
||||
@@ -186,14 +186,8 @@ impl PhysicalStorage {
|
||||
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
|
||||
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
|
||||
);
|
||||
if flush_lsn < state.commit_lsn {
|
||||
bail!("timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn {} from control file", ttid.timeline_id, flush_lsn, state.commit_lsn);
|
||||
}
|
||||
if flush_lsn < state.peer_horizon_lsn {
|
||||
warn!(
|
||||
"timeline {}: flush_lsn {} is less than cfile peer_horizon_lsn {}",
|
||||
ttid.timeline_id, flush_lsn, state.peer_horizon_lsn
|
||||
);
|
||||
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
|
||||
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
|
||||
}
|
||||
|
||||
Ok(PhysicalStorage {
|
||||
|
||||
@@ -59,7 +59,7 @@ impl GlobalMap {
|
||||
|
||||
if state.commit_lsn < state.local_start_lsn {
|
||||
bail!(
|
||||
"commit_lsn {} is smaller than local_start_lsn {}",
|
||||
"commit_lsn {} is higher than local_start_lsn {}",
|
||||
state.commit_lsn,
|
||||
state.local_start_lsn
|
||||
);
|
||||
@@ -96,7 +96,23 @@ impl GlobalMap {
|
||||
let local_start_lsn = Lsn::INVALID;
|
||||
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
if state.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(ttid));
|
||||
}
|
||||
|
||||
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
|
||||
bail!(TimelineError::UninitialinzedPgVersion(ttid));
|
||||
}
|
||||
|
||||
if state.commit_lsn < state.local_start_lsn {
|
||||
bail!(
|
||||
"commit_lsn {} is higher than local_start_lsn {}",
|
||||
state.commit_lsn,
|
||||
state.local_start_lsn
|
||||
);
|
||||
}
|
||||
|
||||
let disk_timeline = self.disk.put_state(&ttid, state);
|
||||
let control_store = DiskStateStorage::new(disk_timeline.clone());
|
||||
|
||||
Reference in New Issue
Block a user