mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
29 Commits
problame/e
...
skyzh/rm-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f6b02a4b43 | ||
|
|
1863ae799d | ||
|
|
20fe57d93b | ||
|
|
0fad5e21ce | ||
|
|
a2056666ae | ||
|
|
a3909e03f8 | ||
|
|
fc190a2a19 | ||
|
|
faee3152f3 | ||
|
|
3693d1f431 | ||
|
|
fdf7a67ed2 | ||
|
|
1299df87d2 | ||
|
|
754ceaefac | ||
|
|
143fa0da42 | ||
|
|
4936ab6842 | ||
|
|
939593d0d3 | ||
|
|
2011cc05cd | ||
|
|
b0286e3c46 | ||
|
|
e4f05ce0a2 | ||
|
|
8d106708d7 | ||
|
|
f450369b20 | ||
|
|
aad918fb56 | ||
|
|
86dd8c96d3 | ||
|
|
6a65c4a4fe | ||
|
|
e9072ee178 | ||
|
|
7e17979d7a | ||
|
|
227271ccad | ||
|
|
fbf0367e27 | ||
|
|
a21b55fe0b | ||
|
|
add51e1372 |
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -110,6 +110,12 @@ dependencies = [
|
||||
"backtrace",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
|
||||
|
||||
[[package]]
|
||||
name = "archery"
|
||||
version = "0.5.0"
|
||||
@@ -2542,6 +2548,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
|
||||
@@ -32,6 +32,7 @@ license = "Apache-2.0"
|
||||
## All dependency versions, used in the project
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
atty = "0.2.14"
|
||||
|
||||
@@ -67,7 +67,7 @@ RUN apt update && \
|
||||
RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
|
||||
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
|
||||
mkdir sfcgal-src && cd sfcgal-src && tar xvzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
|
||||
cmake . && make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
cmake -DCMAKE_BUILD_TYPE=Release . && make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make clean && cp -R /sfcgal/* /
|
||||
|
||||
@@ -95,7 +95,7 @@ RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouti
|
||||
mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && \
|
||||
cd build && \
|
||||
cmake .. && \
|
||||
cmake -DCMAKE_BUILD_TYPE=Release .. && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control
|
||||
@@ -355,7 +355,7 @@ RUN apt-get update && \
|
||||
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.10.1.tar.gz -O timescaledb.tar.gz && \
|
||||
echo "6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 timescaledb.tar.gz" | sha256sum --check && \
|
||||
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
|
||||
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON && \
|
||||
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON -DCMAKE_BUILD_TYPE=Release && \
|
||||
cd build && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
@@ -410,7 +410,7 @@ RUN apt-get update && \
|
||||
mkdir kq_imcx-src && cd kq_imcx-src && tar xvzf ../kq_imcx.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && \
|
||||
cd build && \
|
||||
cmake .. && \
|
||||
cmake -DCMAKE_BUILD_TYPE=Release .. && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/kq_imcx.control
|
||||
@@ -432,6 +432,54 @@ RUN wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.5.2.tar.gz -O
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_cron.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rdkit-pg-build"
|
||||
# compile rdkit extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS rdkit-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y \
|
||||
cmake \
|
||||
libboost-iostreams1.74-dev \
|
||||
libboost-regex1.74-dev \
|
||||
libboost-serialization1.74-dev \
|
||||
libboost-system1.74-dev \
|
||||
libeigen3-dev \
|
||||
libfreetype6-dev
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:/usr/local/pgsql/:$PATH"
|
||||
RUN wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_1.tar.gz -O rdkit.tar.gz && \
|
||||
echo "db346afbd0ba52c843926a2a62f8a38c7b774ffab37eaf382d789a824f21996c rdkit.tar.gz" | sha256sum --check && \
|
||||
mkdir rdkit-src && cd rdkit-src && tar xvzf ../rdkit.tar.gz --strip-components=1 -C . && \
|
||||
cmake \
|
||||
-D RDK_BUILD_CAIRO_SUPPORT=OFF \
|
||||
-D RDK_BUILD_INCHI_SUPPORT=ON \
|
||||
-D RDK_BUILD_AVALON_SUPPORT=ON \
|
||||
-D RDK_BUILD_PYTHON_WRAPPERS=OFF \
|
||||
-D RDK_BUILD_DESCRIPTORS3D=OFF \
|
||||
-D RDK_BUILD_FREESASA_SUPPORT=OFF \
|
||||
-D RDK_BUILD_COORDGEN_SUPPORT=ON \
|
||||
-D RDK_BUILD_MOLINTERCHANGE_SUPPORT=OFF \
|
||||
-D RDK_BUILD_YAEHMOP_SUPPORT=OFF \
|
||||
-D RDK_BUILD_STRUCTCHECKER_SUPPORT=OFF \
|
||||
-D RDK_USE_URF=OFF \
|
||||
-D RDK_BUILD_PGSQL=ON \
|
||||
-D RDK_PGSQL_STATIC=ON \
|
||||
-D PostgreSQL_CONFIG=pg_config \
|
||||
-D PostgreSQL_INCLUDE_DIR=`pg_config --includedir` \
|
||||
-D PostgreSQL_TYPE_INCLUDE_DIR=`pg_config --includedir-server` \
|
||||
-D PostgreSQL_LIBRARY_DIR=`pg_config --libdir` \
|
||||
-D RDK_INSTALL_INTREE=OFF \
|
||||
-D CMAKE_BUILD_TYPE=Release \
|
||||
. && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
@@ -564,6 +612,7 @@ COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -637,14 +686,19 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
|
||||
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
|
||||
# libxml2, libxslt1.1 for xml2
|
||||
# libzstd1 for zstd
|
||||
# libboost*, libfreetype6, and zlib1g for rdkit
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends -y \
|
||||
gdb \
|
||||
locales \
|
||||
libicu67 \
|
||||
liblz4-1 \
|
||||
libreadline8 \
|
||||
libboost-iostreams1.74.0 \
|
||||
libboost-regex1.74.0 \
|
||||
libboost-serialization1.74.0 \
|
||||
libboost-system1.74.0 \
|
||||
libossp-uuid16 \
|
||||
libfreetype6 \
|
||||
libgeos-c1v5 \
|
||||
libgdal28 \
|
||||
libproj19 \
|
||||
@@ -654,7 +708,9 @@ RUN apt update && \
|
||||
libxslt1.1 \
|
||||
libzstd1 \
|
||||
libcurl4-openssl-dev \
|
||||
procps && \
|
||||
locales \
|
||||
procps \
|
||||
zlib1g && \
|
||||
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
|
||||
@@ -370,11 +370,6 @@ impl ComputeNode {
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
|
||||
info!(
|
||||
"finished configuration of compute for project {}",
|
||||
spec.cluster.cluster_id.as_deref().unwrap_or("None")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -427,22 +422,22 @@ impl ComputeNode {
|
||||
#[instrument(skip(self))]
|
||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
||||
spec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
|
||||
spec.spec.operation_uuid.as_deref().unwrap_or("None"),
|
||||
spec.tenant_id,
|
||||
spec.timeline_id,
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
|
||||
pspec.tenant_id,
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
self.prepare_pgdata(&compute_state)?;
|
||||
|
||||
let start_time = Utc::now();
|
||||
|
||||
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
|
||||
let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
|
||||
if spec.spec.mode == ComputeMode::Primary {
|
||||
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
|
||||
self.apply_config(&compute_state)?;
|
||||
}
|
||||
|
||||
@@ -462,6 +457,11 @@ impl ComputeNode {
|
||||
}
|
||||
self.set_status(ComputeStatus::Running);
|
||||
|
||||
info!(
|
||||
"finished configuration of compute for project {}",
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
|
||||
);
|
||||
|
||||
Ok(pg)
|
||||
}
|
||||
|
||||
|
||||
@@ -450,6 +450,7 @@ impl Endpoint {
|
||||
|
||||
// Create spec file
|
||||
let spec = ComputeSpec {
|
||||
skip_pg_catalog_updates: false,
|
||||
format_version: 1.0,
|
||||
operation_uuid: None,
|
||||
cluster: Cluster {
|
||||
|
||||
@@ -27,6 +27,12 @@ pub struct ComputeSpec {
|
||||
pub cluster: Cluster,
|
||||
pub delta_operations: Option<Vec<DeltaOp>>,
|
||||
|
||||
/// An optinal hint that can be passed to speed up startup time if we know
|
||||
/// that no pg catalog mutations (like role creation, database creation,
|
||||
/// extension creation) need to be done on the actual database to start.
|
||||
#[serde(default)] // Default false
|
||||
pub skip_pg_catalog_updates: bool,
|
||||
|
||||
// Information needed to connect to the storage layer.
|
||||
//
|
||||
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
|
||||
|
||||
@@ -111,6 +111,8 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
) -> Result<Download, DownloadError>;
|
||||
|
||||
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
|
||||
|
||||
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
pub struct Download {
|
||||
@@ -223,6 +225,14 @@ impl GenericRemoteStorage {
|
||||
Self::Unreliable(s) => s.delete(path).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.delete_objects(paths).await,
|
||||
Self::AwsS3(s) => s.delete_objects(paths).await,
|
||||
Self::Unreliable(s) => s.delete_objects(paths).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
|
||||
@@ -320,6 +320,13 @@ impl RemoteStorage for LocalFs {
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))?)
|
||||
}
|
||||
|
||||
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
|
||||
for path in paths {
|
||||
self.delete(path).await?
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_metadata_path(original_path: &Path) -> PathBuf {
|
||||
|
||||
@@ -17,6 +17,7 @@ use aws_sdk_s3::{
|
||||
error::SdkError,
|
||||
operation::get_object::GetObjectError,
|
||||
primitives::ByteStream,
|
||||
types::{Delete, ObjectIdentifier},
|
||||
Client,
|
||||
};
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
@@ -81,12 +82,24 @@ pub(super) mod metrics {
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub fn inc_delete_objects(count: u64) {
|
||||
S3_REQUESTS_COUNT
|
||||
.with_label_values(&["delete_object"])
|
||||
.inc_by(count);
|
||||
}
|
||||
|
||||
pub fn inc_delete_object_fail() {
|
||||
S3_REQUESTS_FAIL_COUNT
|
||||
.with_label_values(&["delete_object"])
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub fn inc_delete_objects_fail(count: u64) {
|
||||
S3_REQUESTS_FAIL_COUNT
|
||||
.with_label_values(&["delete_object"])
|
||||
.inc_by(count);
|
||||
}
|
||||
|
||||
pub fn inc_list_objects() {
|
||||
S3_REQUESTS_COUNT.with_label_values(&["list_objects"]).inc();
|
||||
}
|
||||
@@ -396,6 +409,34 @@ impl RemoteStorage for S3Bucket {
|
||||
})
|
||||
.await
|
||||
}
|
||||
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 delete")?;
|
||||
|
||||
let mut delete_objects = Vec::with_capacity(paths.len());
|
||||
for path in paths {
|
||||
let obj_id = ObjectIdentifier::builder()
|
||||
.set_key(Some(self.relative_path_to_s3_object(path)))
|
||||
.build();
|
||||
delete_objects.push(obj_id);
|
||||
}
|
||||
|
||||
metrics::inc_delete_objects(paths.len() as u64);
|
||||
self.client
|
||||
.delete_objects()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.delete(Delete::builder().set_objects(Some(delete_objects)).build())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_delete_objects_fail(paths.len() as u64);
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
|
||||
let _guard = self
|
||||
|
||||
@@ -119,4 +119,11 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.attempt(RemoteOp::Delete(path.clone()))?;
|
||||
self.inner.delete(path).await
|
||||
}
|
||||
|
||||
async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> {
|
||||
for path in paths {
|
||||
self.delete(path).await?
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,6 +107,37 @@ async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledS3)]
|
||||
#[tokio::test]
|
||||
async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
|
||||
let ctx = match ctx {
|
||||
MaybeEnabledS3::Enabled(ctx) => ctx,
|
||||
MaybeEnabledS3::Disabled => return Ok(()),
|
||||
};
|
||||
|
||||
let path1 = RemotePath::new(&PathBuf::from(format!("{}/path1", ctx.base_prefix,)))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,)))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let data1 = "remote blob data1".as_bytes();
|
||||
let data1_len = data1.len();
|
||||
let data2 = "remote blob data2".as_bytes();
|
||||
let data2_len = data2.len();
|
||||
ctx.client
|
||||
.upload(std::io::Cursor::new(data1), data1_len, &path1, None)
|
||||
.await?;
|
||||
|
||||
ctx.client
|
||||
.upload(std::io::Cursor::new(data2), data2_len, &path2, None)
|
||||
.await?;
|
||||
|
||||
ctx.client.delete_objects(&[path1, path2]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_logging_ready() {
|
||||
LOGGING_DONE.get_or_init(|| {
|
||||
utils::logging::init(
|
||||
|
||||
@@ -1,19 +1,18 @@
|
||||
use crate::auth::{Claims, JwtAuth};
|
||||
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
|
||||
use anyhow::{anyhow, Context};
|
||||
use anyhow::Context;
|
||||
use hyper::header::{HeaderName, AUTHORIZATION};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Method;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
|
||||
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService};
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tokio::task::JoinError;
|
||||
use tracing::{self, debug, info, info_span, warn, Instrument};
|
||||
|
||||
use std::future::Future;
|
||||
use std::net::TcpListener;
|
||||
use std::str::FromStr;
|
||||
|
||||
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
@@ -348,40 +347,6 @@ pub fn check_permission_with(
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Start listening for HTTP requests on given socket.
|
||||
///
|
||||
/// 'shutdown_future' can be used to stop. If the Future becomes
|
||||
/// ready, we stop listening for new requests, and the function returns.
|
||||
///
|
||||
pub fn serve_thread_main<S>(
|
||||
router_builder: RouterBuilder<hyper::Body, ApiError>,
|
||||
listener: TcpListener,
|
||||
shutdown_future: S,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
S: Future<Output = ()> + Send + Sync,
|
||||
{
|
||||
info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
|
||||
|
||||
// Create a Service from the router above to handle incoming requests.
|
||||
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();
|
||||
|
||||
// Enter a single-threaded tokio runtime bound to the current thread
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
let _guard = runtime.enter();
|
||||
|
||||
let server = Server::from_tcp(listener)?
|
||||
.serve(service)
|
||||
.with_graceful_shutdown(shutdown_future);
|
||||
|
||||
runtime.block_on(server)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
byteorder.workspace = true
|
||||
|
||||
@@ -1,22 +1,23 @@
|
||||
use pageserver::keyspace::{KeyPartitioning, KeySpace};
|
||||
use pageserver::repository::Key;
|
||||
use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
|
||||
use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
|
||||
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use std::cmp::{max, min};
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
|
||||
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
|
||||
let mut layer_map = LayerMap::<LayerDescriptor>::default();
|
||||
fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
|
||||
let mut layer_map = LayerMap::default();
|
||||
|
||||
let mut min_lsn = Lsn(u64::MAX);
|
||||
let mut max_lsn = Lsn(0);
|
||||
@@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
|
||||
min_lsn = min(min_lsn, lsn_range.start);
|
||||
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
|
||||
updates.insert_historic(layer.layer_desc().clone());
|
||||
}
|
||||
|
||||
println!("min: {min_lsn}, max: {max_lsn}");
|
||||
@@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
|
||||
}
|
||||
|
||||
/// Construct a layer map query pattern for benchmarks
|
||||
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
|
||||
fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
|
||||
// For each image layer we query one of the pages contained, at LSN right
|
||||
// before the image layer was created. This gives us a somewhat uniform
|
||||
// coverage of both the lsn and key space because image layers have
|
||||
@@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn
|
||||
|
||||
// Construct a partitioning for testing get_difficulty map when we
|
||||
// don't have an exact result of `collect_keyspace` to work with.
|
||||
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
|
||||
fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
|
||||
let mut parts = Vec::new();
|
||||
|
||||
// We add a partition boundary at the start of each image layer,
|
||||
@@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
for i in 0..100_000 {
|
||||
let i32 = (i as u32) % 100;
|
||||
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
|
||||
let layer = LayerDescriptor {
|
||||
key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
|
||||
lsn: Lsn(i)..Lsn(i + 1),
|
||||
is_incremental: false,
|
||||
short_id: format!("Layer {}", i),
|
||||
};
|
||||
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
|
||||
let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
|
||||
TenantId::generate(),
|
||||
TimelineId::generate(),
|
||||
zero.add(10 * i32)..zero.add(10 * i32 + 1),
|
||||
Lsn(i),
|
||||
false,
|
||||
0,
|
||||
));
|
||||
updates.insert_historic(layer.layer_desc().clone());
|
||||
}
|
||||
updates.flush();
|
||||
println!("Finished layer map init in {:?}", now.elapsed());
|
||||
|
||||
@@ -516,7 +516,7 @@ async fn collect_eviction_candidates(
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction();
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
info.resident_layers
|
||||
|
||||
@@ -215,7 +215,7 @@ async fn build_timeline_info(
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut info = build_timeline_info_common(timeline, ctx)?;
|
||||
let mut info = build_timeline_info_common(timeline, ctx).await?;
|
||||
if include_non_incremental_logical_size {
|
||||
// XXX we should be using spawn_ondemand_logical_size_calculation here.
|
||||
// Otherwise, if someone deletes the timeline / detaches the tenant while
|
||||
@@ -233,7 +233,7 @@ async fn build_timeline_info(
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn build_timeline_info_common(
|
||||
async fn build_timeline_info_common(
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
@@ -264,7 +264,7 @@ fn build_timeline_info_common(
|
||||
None
|
||||
}
|
||||
};
|
||||
let current_physical_size = Some(timeline.layer_size_sum());
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
|
||||
|
||||
@@ -330,6 +330,7 @@ async fn timeline_create_handler(
|
||||
Ok(Some(new_timeline)) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
@@ -591,7 +592,7 @@ async fn tenant_status(
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
current_physical_size += timeline.layer_size_sum();
|
||||
current_physical_size += timeline.layer_size_sum().await;
|
||||
}
|
||||
|
||||
let state = tenant.current_state();
|
||||
@@ -701,7 +702,7 @@ async fn layer_map_info_handler(
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let layer_map_info = timeline.layer_map_info(reset);
|
||||
let layer_map_info = timeline.layer_map_info(reset).await;
|
||||
|
||||
json_response(StatusCode::OK, layer_map_info)
|
||||
}
|
||||
|
||||
@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
{
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
}
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
// We expect the Postgres server to be shut down cleanly.
|
||||
let pg_control = pg_control.context("pg_control file not found")?;
|
||||
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// We found the pg_control file.
|
||||
pg_control = Some(res);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
tokio_tar::EntryType::Directory => {
|
||||
debug!("directory {:?}", file_path);
|
||||
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// sanity check: ensure that pg_control is loaded
|
||||
let _pg_control = pg_control.context("pg_control file not found")?;
|
||||
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -594,7 +594,7 @@ async fn import_file(
|
||||
// zenith.signal is not necessarily the last file, that we handle
|
||||
// but it is ok to call `finish_write()`, because final `modification.commit()`
|
||||
// will update lsn once more to the final one.
|
||||
let writer = modification.tline.writer();
|
||||
let writer = modification.tline.writer().await;
|
||||
writer.finish_write(prev_lsn);
|
||||
|
||||
debug!("imported zenith signal {}", prev_lsn);
|
||||
|
||||
@@ -799,12 +799,8 @@ impl PageCache {
|
||||
// a different victim. But if the problem persists, the page cache
|
||||
// could fill up with dirty pages that we cannot evict, and we will
|
||||
// loop retrying the writebacks indefinitely.
|
||||
if cfg!(test) {
|
||||
anyhow::bail!("writeback of buffer {:?} failed: {}", old_key, err);
|
||||
} else {
|
||||
error!("writeback of buffer {:?} failed: {}", old_key, err);
|
||||
continue;
|
||||
}
|
||||
error!("writeback of buffer {:?} failed: {}", old_key, err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -489,7 +489,9 @@ impl PageServerHandler {
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
|
||||
@@ -699,6 +699,20 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn init_empty_test_timeline(&mut self) -> anyhow::Result<()> {
|
||||
self.init_empty()?;
|
||||
self.put_control_file(bytes::Bytes::from_static(
|
||||
b"control_file contents do not matter",
|
||||
))
|
||||
.context("put_control_file")?;
|
||||
self.put_checkpoint(bytes::Bytes::from_static(
|
||||
b"checkpoint_file contents do not matter",
|
||||
))
|
||||
.context("put_checkpoint_file")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Put a new page version that can be constructed from a WAL record
|
||||
///
|
||||
/// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
|
||||
@@ -1108,7 +1122,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub fn flush(&mut self) -> anyhow::Result<()> {
|
||||
pub async fn flush(&mut self) -> anyhow::Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -1116,19 +1130,20 @@ impl<'a> DatadirModification<'a> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let writer = self.tline.writer();
|
||||
let writer = self.tline.writer().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut result: anyhow::Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, self.lsn, value);
|
||||
false
|
||||
let mut retained_pending_updates = HashMap::new();
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
if is_rel_block_key(key) || is_slru_block_key(key) {
|
||||
// This bails out on first error without modifying pending_updates.
|
||||
// That's Ok, cf this function's doc comment.
|
||||
writer.put(key, self.lsn, &value).await?;
|
||||
} else {
|
||||
true
|
||||
retained_pending_updates.insert(key, value);
|
||||
}
|
||||
});
|
||||
result?;
|
||||
}
|
||||
self.pending_updates.extend(retained_pending_updates);
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
|
||||
@@ -1143,17 +1158,17 @@ impl<'a> DatadirModification<'a> {
|
||||
/// underlying timeline.
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer();
|
||||
pub async fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer().await;
|
||||
let lsn = self.lsn;
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
writer.put(key, lsn, &value)?;
|
||||
writer.put(key, lsn, &value).await?;
|
||||
}
|
||||
for key_range in self.pending_deletions.drain(..) {
|
||||
writer.delete(key_range, lsn)?;
|
||||
writer.delete(key_range, lsn).await?;
|
||||
}
|
||||
|
||||
writer.finish_write(lsn);
|
||||
@@ -1593,17 +1608,6 @@ fn is_slru_block_key(key: Key) -> bool {
|
||||
&& key.field6 != 0xffffffff // and not SlruSegSize
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -39,7 +39,7 @@ pub struct EphemeralFile {
|
||||
file_id: u64,
|
||||
_tenant_id: TenantId,
|
||||
_timeline_id: TimelineId,
|
||||
file: Option<Arc<VirtualFile>>,
|
||||
file: Arc<VirtualFile>,
|
||||
|
||||
pub size: u64,
|
||||
}
|
||||
@@ -52,10 +52,7 @@ impl EphemeralFile {
|
||||
) -> Result<EphemeralFile, io::Error> {
|
||||
let mut l = EPHEMERAL_FILES.write().unwrap();
|
||||
let file_id = l.next_file_id;
|
||||
l.next_file_id = l
|
||||
.next_file_id
|
||||
.checked_add(1)
|
||||
.expect("next_file_id is u64, expecting it to not overflow");
|
||||
l.next_file_id += 1;
|
||||
|
||||
let filename = conf
|
||||
.timeline_path(&timeline_id, &tenant_id)
|
||||
@@ -63,30 +60,16 @@ impl EphemeralFile {
|
||||
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
// The next_file_id doesn't overlfow, so technically, `create_new` is not needed.
|
||||
// But it's cheap, so why not.
|
||||
.create_new(true),
|
||||
OpenOptions::new().read(true).write(true).create(true),
|
||||
)?;
|
||||
let file_rc = Arc::new(file);
|
||||
l.files.insert(file_id, file_rc.clone());
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
debug!(
|
||||
"created ephemeral file {}\n{}",
|
||||
filename.display(),
|
||||
std::backtrace::Backtrace::force_capture()
|
||||
);
|
||||
#[cfg(not(debug_assertions))]
|
||||
debug!("created ephemeral file {}", filename.display());
|
||||
|
||||
Ok(EphemeralFile {
|
||||
file_id,
|
||||
_tenant_id: tenant_id,
|
||||
_timeline_id: timeline_id,
|
||||
file: Some(file_rc),
|
||||
file: file_rc,
|
||||
size: 0,
|
||||
})
|
||||
}
|
||||
@@ -96,8 +79,6 @@ impl EphemeralFile {
|
||||
while off < PAGE_SZ {
|
||||
let n = self
|
||||
.file
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;
|
||||
|
||||
if n == 0 {
|
||||
@@ -280,43 +261,17 @@ impl Drop for EphemeralFile {
|
||||
cache.drop_buffers_for_ephemeral(self.file_id);
|
||||
|
||||
// remove entry from the hash map
|
||||
let virtual_file = EPHEMERAL_FILES
|
||||
.write()
|
||||
.unwrap()
|
||||
.files
|
||||
.remove(&self.file_id)
|
||||
.unwrap();
|
||||
|
||||
// remove file from self
|
||||
let self_file = self.file.take().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
Arc::as_ptr(&virtual_file) as *const (),
|
||||
Arc::as_ptr(&self_file) as *const ()
|
||||
);
|
||||
drop(self_file);
|
||||
|
||||
// XXX once we upgrade to Rust 1.70, use Arc::into_inner.
|
||||
// It does the following checks atomically.
|
||||
assert_eq!(Arc::weak_count(&virtual_file), 0);
|
||||
let virtual_file = Arc::try_unwrap(virtual_file).expect(
|
||||
"we are being dropped and EPHEMERAL_FILES is the only other place where we put the Arc",
|
||||
);
|
||||
EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
|
||||
|
||||
// unlink the file
|
||||
// TODO: we should be able to unwrap here, but, timeline delete and tenant detach do
|
||||
// std::fs::remove_dir_all without dropping all InMemoryLayer => EphemeralFile
|
||||
// of the tenant => need to fix that first.
|
||||
match virtual_file.remove() {
|
||||
Ok(()) => (),
|
||||
Err((virtual_file, e)) => {
|
||||
warn!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
virtual_file.path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
let res = std::fs::remove_file(&self.file.path);
|
||||
if let Err(e) = res {
|
||||
warn!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.file.path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
127
pageserver/src/tenant/layer_cache.rs
Normal file
127
pageserver/src/tenant/layer_cache.rs
Normal file
@@ -0,0 +1,127 @@
|
||||
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
|
||||
use super::Timeline;
|
||||
use crate::tenant::layer_map::{self, LayerMap};
|
||||
use anyhow::Result;
|
||||
use std::sync::{Mutex, Weak};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub struct LayerCache {
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
|
||||
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
|
||||
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
|
||||
pub layers_removal_lock: Arc<tokio::sync::Mutex<()>>,
|
||||
|
||||
/// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use.
|
||||
/// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock.
|
||||
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
|
||||
|
||||
/// Will be useful when we move evict / download to layer cache.
|
||||
#[allow(unused)]
|
||||
timeline: Weak<Timeline>,
|
||||
|
||||
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeleteGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
|
||||
|
||||
impl LayerCache {
|
||||
pub fn new(timeline: Weak<Timeline>) -> Self {
|
||||
Self {
|
||||
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
|
||||
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
|
||||
mapping: Mutex::new(HashMap::new()),
|
||||
timeline,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
|
||||
let guard = self.mapping.lock().unwrap();
|
||||
guard.get(&desc.key()).expect("not found").clone()
|
||||
}
|
||||
|
||||
/// Ensures only one of compaction / gc can happen at a time.
|
||||
pub async fn delete_guard(&self) -> DeleteGuard {
|
||||
DeleteGuard(Arc::new(
|
||||
self.layers_removal_lock.clone().lock_owned().await,
|
||||
))
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Called within read path.
|
||||
pub fn replace_and_verify(
|
||||
&self,
|
||||
expected: Arc<dyn PersistentLayer>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
) -> Result<()> {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
|
||||
use super::layer_map::LayerKey;
|
||||
let key = LayerKey::from(&*expected);
|
||||
let other = LayerKey::from(&*new);
|
||||
|
||||
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
|
||||
let new_l0 = LayerMap::is_l0(new.layer_desc());
|
||||
|
||||
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
|
||||
"replacing downloaded layer into layermap failed because layer was not found"
|
||||
));
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
|
||||
);
|
||||
|
||||
anyhow::ensure!(
|
||||
expected_l0 == new_l0,
|
||||
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
|
||||
);
|
||||
|
||||
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
|
||||
anyhow::ensure!(
|
||||
layer_map::compare_arced_layers(&expected, layer),
|
||||
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
|
||||
expected = Arc::as_ptr(&expected),
|
||||
new = Arc::as_ptr(layer),
|
||||
);
|
||||
*layer = new;
|
||||
Ok(())
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"replacing downloaded layer into layermap failed because layer was not found"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called within write path. When compaction and image layer creation we will create new layers.
|
||||
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Called within write path. When GC and compaction we will remove layers and delete them on disk.
|
||||
/// Will move logic to delete files here later.
|
||||
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
}
|
||||
@@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::storage_layer::InMemoryLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::Replacement;
|
||||
pub use historic_layer_coverage::{LayerKey, Replacement};
|
||||
|
||||
use super::storage_layer::range_eq;
|
||||
use super::storage_layer::PersistentLayerDesc;
|
||||
use super::storage_layer::PersistentLayerKey;
|
||||
|
||||
///
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
///
|
||||
pub struct LayerMap<L: ?Sized> {
|
||||
#[derive(Default, Clone)]
|
||||
pub struct LayerMap {
|
||||
//
|
||||
// 'open_layer' holds the current InMemoryLayer that is accepting new
|
||||
// records. If it is None, 'next_open_layer_at' will be set instead, indicating
|
||||
@@ -95,24 +93,6 @@ pub struct LayerMap<L: ?Sized> {
|
||||
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
|
||||
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
|
||||
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
|
||||
|
||||
/// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
|
||||
/// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
|
||||
/// RemoteLayer will be removed.
|
||||
mapping: HashMap<PersistentLayerKey, Arc<L>>,
|
||||
}
|
||||
|
||||
impl<L: ?Sized> Default for LayerMap<L> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
open_layer: None,
|
||||
next_open_layer_at: None,
|
||||
frozen_layers: VecDeque::default(),
|
||||
l0_delta_layers: Vec::default(),
|
||||
historic: BufferedHistoricLayerCoverage::default(),
|
||||
mapping: HashMap::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The primary update API for the layer map.
|
||||
@@ -120,24 +100,21 @@ impl<L: ?Sized> Default for LayerMap<L> {
|
||||
/// Batching historic layer insertions and removals is good for
|
||||
/// performance and this struct helps us do that correctly.
|
||||
#[must_use]
|
||||
pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
|
||||
pub struct BatchedUpdates<'a> {
|
||||
// While we hold this exclusive reference to the layer map the type checker
|
||||
// will prevent us from accidentally reading any unflushed updates.
|
||||
layer_map: &'a mut LayerMap<L>,
|
||||
layer_map: &'a mut LayerMap,
|
||||
}
|
||||
|
||||
/// Provide ability to batch more updates while hiding the read
|
||||
/// API so we don't accidentally read without flushing.
|
||||
impl<L> BatchedUpdates<'_, L>
|
||||
where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
impl BatchedUpdates<'_> {
|
||||
///
|
||||
/// Insert an on-disk layer.
|
||||
///
|
||||
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
|
||||
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
|
||||
self.layer_map.insert_historic_noflush(layer_desc, layer)
|
||||
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
self.layer_map.insert_historic_noflush(layer_desc)
|
||||
}
|
||||
|
||||
///
|
||||
@@ -145,31 +122,8 @@ where
|
||||
///
|
||||
/// This should be called when the corresponding file on disk has been deleted.
|
||||
///
|
||||
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
|
||||
self.layer_map.remove_historic_noflush(layer_desc, layer)
|
||||
}
|
||||
|
||||
/// Replaces existing layer iff it is the `expected`.
|
||||
///
|
||||
/// If the expected layer has been removed it will not be inserted by this function.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
|
||||
/// be done.
|
||||
///
|
||||
/// TODO replacement can be done without buffering and rebuilding layer map updates.
|
||||
/// One way to do that is to add a layer of indirection for returned values, so
|
||||
/// that we can replace values only by updating a hashmap.
|
||||
pub fn replace_historic(
|
||||
&mut self,
|
||||
expected_desc: PersistentLayerDesc,
|
||||
expected: &Arc<L>,
|
||||
new_desc: PersistentLayerDesc,
|
||||
new: Arc<L>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
|
||||
|
||||
self.layer_map
|
||||
.replace_historic_noflush(expected_desc, expected, new_desc, new)
|
||||
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
self.layer_map.remove_historic_noflush(layer_desc)
|
||||
}
|
||||
|
||||
// We will flush on drop anyway, but this method makes it
|
||||
@@ -185,25 +139,19 @@ where
|
||||
// than panic later or read without flushing.
|
||||
//
|
||||
// TODO maybe warn if flush hasn't explicitly been called
|
||||
impl<L> Drop for BatchedUpdates<'_, L>
|
||||
where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
impl Drop for BatchedUpdates<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.layer_map.flush_updates();
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value of LayerMap::search
|
||||
pub struct SearchResult<L: ?Sized> {
|
||||
pub layer: Arc<L>,
|
||||
pub struct SearchResult {
|
||||
pub layer: Arc<PersistentLayerDesc>,
|
||||
pub lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
impl<L> LayerMap<L>
|
||||
where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
impl LayerMap {
|
||||
///
|
||||
/// Find the latest layer (by lsn.end) that covers the given
|
||||
/// 'key', with lsn.start < 'end_lsn'.
|
||||
@@ -235,7 +183,7 @@ where
|
||||
/// NOTE: This only searches the 'historic' layers, *not* the
|
||||
/// 'open' and 'frozen' layers!
|
||||
///
|
||||
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
|
||||
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
|
||||
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
|
||||
let latest_delta = version.delta_coverage.query(key.to_i128());
|
||||
let latest_image = version.image_coverage.query(key.to_i128());
|
||||
@@ -244,7 +192,6 @@ where
|
||||
(None, None) => None,
|
||||
(None, Some(image)) => {
|
||||
let lsn_floor = image.get_lsn_range().start;
|
||||
let image = self.get_layer_from_mapping(&image.key()).clone();
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor,
|
||||
@@ -252,7 +199,6 @@ where
|
||||
}
|
||||
(Some(delta), None) => {
|
||||
let lsn_floor = delta.get_lsn_range().start;
|
||||
let delta = self.get_layer_from_mapping(&delta.key()).clone();
|
||||
Some(SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
@@ -263,7 +209,6 @@ where
|
||||
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
|
||||
let image_exact_match = img_lsn + 1 == end_lsn;
|
||||
if image_is_newer || image_exact_match {
|
||||
let image = self.get_layer_from_mapping(&image.key()).clone();
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor: img_lsn,
|
||||
@@ -271,7 +216,6 @@ where
|
||||
} else {
|
||||
let lsn_floor =
|
||||
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
|
||||
let delta = self.get_layer_from_mapping(&delta.key()).clone();
|
||||
Some(SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
@@ -282,7 +226,7 @@ where
|
||||
}
|
||||
|
||||
/// Start a batch of updates, applied on drop
|
||||
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
|
||||
pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
|
||||
BatchedUpdates { layer_map: self }
|
||||
}
|
||||
|
||||
@@ -292,48 +236,32 @@ where
|
||||
/// Helper function for BatchedUpdates::insert_historic
|
||||
///
|
||||
/// TODO(chi): remove L generic so that we do not need to pass layer object.
|
||||
pub(self) fn insert_historic_noflush(
|
||||
&mut self,
|
||||
layer_desc: PersistentLayerDesc,
|
||||
layer: Arc<L>,
|
||||
) {
|
||||
self.mapping.insert(layer_desc.key(), layer.clone());
|
||||
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
// TODO: See #3869, resulting #4088, attempted fix and repro #4094
|
||||
|
||||
if Self::is_l0(&layer) {
|
||||
if Self::is_l0(&layer_desc) {
|
||||
self.l0_delta_layers.push(layer_desc.clone().into());
|
||||
}
|
||||
|
||||
self.historic.insert(
|
||||
historic_layer_coverage::LayerKey::from(&*layer),
|
||||
historic_layer_coverage::LayerKey::from(&layer_desc),
|
||||
layer_desc.into(),
|
||||
);
|
||||
}
|
||||
|
||||
fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
|
||||
let layer = self
|
||||
.mapping
|
||||
.get(key)
|
||||
.with_context(|| format!("{key:?}"))
|
||||
.expect("inconsistent layer mapping");
|
||||
layer
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove an on-disk layer from the map.
|
||||
///
|
||||
/// Helper function for BatchedUpdates::remove_historic
|
||||
///
|
||||
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
|
||||
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
self.historic
|
||||
.remove(historic_layer_coverage::LayerKey::from(&*layer));
|
||||
if Self::is_l0(&layer) {
|
||||
.remove(historic_layer_coverage::LayerKey::from(&layer_desc));
|
||||
let layer_key = layer_desc.key();
|
||||
if Self::is_l0(&layer_desc) {
|
||||
let len_before = self.l0_delta_layers.len();
|
||||
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
|
||||
l0_delta_layers.retain(|other| {
|
||||
!Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
|
||||
});
|
||||
l0_delta_layers.retain(|other| other.key() != layer_key);
|
||||
self.l0_delta_layers = l0_delta_layers;
|
||||
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
|
||||
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
|
||||
@@ -344,69 +272,6 @@ where
|
||||
"failed to locate removed historic layer from l0_delta_layers"
|
||||
);
|
||||
}
|
||||
self.mapping.remove(&layer_desc.key());
|
||||
}
|
||||
|
||||
pub(self) fn replace_historic_noflush(
|
||||
&mut self,
|
||||
expected_desc: PersistentLayerDesc,
|
||||
expected: &Arc<L>,
|
||||
new_desc: PersistentLayerDesc,
|
||||
new: Arc<L>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
let key = historic_layer_coverage::LayerKey::from(&**expected);
|
||||
let other = historic_layer_coverage::LayerKey::from(&*new);
|
||||
|
||||
let expected_l0 = Self::is_l0(expected);
|
||||
let new_l0 = Self::is_l0(&new);
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
"expected and new must have equal LayerKeys: {key:?} != {other:?}"
|
||||
);
|
||||
|
||||
anyhow::ensure!(
|
||||
expected_l0 == new_l0,
|
||||
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
|
||||
);
|
||||
|
||||
let l0_index = if expected_l0 {
|
||||
// find the index in case replace worked, we need to replace that as well
|
||||
let pos = self.l0_delta_layers.iter().position(|slot| {
|
||||
Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
|
||||
});
|
||||
|
||||
if pos.is_none() {
|
||||
return Ok(Replacement::NotFound);
|
||||
}
|
||||
pos
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let new_desc = Arc::new(new_desc);
|
||||
let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
|
||||
**existing == expected_desc
|
||||
});
|
||||
|
||||
if let Replacement::Replaced { .. } = &replaced {
|
||||
self.mapping.remove(&expected_desc.key());
|
||||
self.mapping.insert(new_desc.key(), new);
|
||||
if let Some(index) = l0_index {
|
||||
self.l0_delta_layers[index] = new_desc;
|
||||
}
|
||||
}
|
||||
|
||||
let replaced = match replaced {
|
||||
Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
|
||||
Replacement::NotFound => Replacement::NotFound,
|
||||
Replacement::RemovalBuffered => Replacement::RemovalBuffered,
|
||||
Replacement::Unexpected(x) => {
|
||||
Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
|
||||
}
|
||||
};
|
||||
|
||||
Ok(replaced)
|
||||
}
|
||||
|
||||
/// Helper function for BatchedUpdates::drop.
|
||||
@@ -454,10 +319,8 @@ where
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
|
||||
self.historic
|
||||
.iter()
|
||||
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
|
||||
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
|
||||
self.historic.iter()
|
||||
}
|
||||
|
||||
///
|
||||
@@ -472,7 +335,7 @@ where
|
||||
&self,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
|
||||
) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
|
||||
let version = match self.historic.get().unwrap().get_version(lsn.0) {
|
||||
Some(v) => v,
|
||||
None => return Ok(vec![]),
|
||||
@@ -482,36 +345,26 @@ where
|
||||
let end = key_range.end.to_i128();
|
||||
|
||||
// Initialize loop variables
|
||||
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
|
||||
let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
|
||||
let mut current_key = start;
|
||||
let mut current_val = version.image_coverage.query(start);
|
||||
|
||||
// Loop through the change events and push intervals
|
||||
for (change_key, change_val) in version.image_coverage.range(start..end) {
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
|
||||
coverage.push((
|
||||
kr,
|
||||
current_val
|
||||
.take()
|
||||
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
|
||||
));
|
||||
coverage.push((kr, current_val.take()));
|
||||
current_key = change_key;
|
||||
current_val = change_val.clone();
|
||||
}
|
||||
|
||||
// Add the final interval
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(end);
|
||||
coverage.push((
|
||||
kr,
|
||||
current_val
|
||||
.take()
|
||||
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
|
||||
));
|
||||
coverage.push((kr, current_val.take()));
|
||||
|
||||
Ok(coverage)
|
||||
}
|
||||
|
||||
pub fn is_l0(layer: &L) -> bool {
|
||||
pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
|
||||
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
|
||||
}
|
||||
|
||||
@@ -537,7 +390,7 @@ where
|
||||
/// TODO The optimal number should probably be slightly higher than 1, but to
|
||||
/// implement that we need to plumb a lot more context into this function
|
||||
/// than just the current partition_range.
|
||||
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
|
||||
pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
|
||||
// Case 1
|
||||
if !Self::is_l0(layer) {
|
||||
return true;
|
||||
@@ -595,9 +448,7 @@ where
|
||||
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
|
||||
let lr = lsn.start..val.get_lsn_range().start;
|
||||
if !kr.is_empty() {
|
||||
let base_count =
|
||||
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
|
||||
as usize;
|
||||
let base_count = Self::is_reimage_worthy(&val, key) as usize;
|
||||
let new_limit = limit.map(|l| l - base_count);
|
||||
let max_stacked_deltas_underneath =
|
||||
self.count_deltas(&kr, &lr, new_limit)?;
|
||||
@@ -620,9 +471,7 @@ where
|
||||
let lr = lsn.start..val.get_lsn_range().start;
|
||||
|
||||
if !kr.is_empty() {
|
||||
let base_count =
|
||||
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
|
||||
as usize;
|
||||
let base_count = Self::is_reimage_worthy(&val, key) as usize;
|
||||
let new_limit = limit.map(|l| l - base_count);
|
||||
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
|
||||
max_stacked_deltas = std::cmp::max(
|
||||
@@ -772,12 +621,8 @@ where
|
||||
}
|
||||
|
||||
/// Return all L0 delta layers
|
||||
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
|
||||
Ok(self
|
||||
.l0_delta_layers
|
||||
.iter()
|
||||
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
|
||||
.collect())
|
||||
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
|
||||
Ok(self.l0_delta_layers.to_vec())
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer map
|
||||
@@ -802,97 +647,76 @@ where
|
||||
println!("End dump LayerMap");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
|
||||
///
|
||||
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
|
||||
#[inline(always)]
|
||||
pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
|
||||
// "dyn Trait" objects are "fat pointers" in that they have two components:
|
||||
// - pointer to the object
|
||||
// - pointer to the vtable
|
||||
//
|
||||
// rust does not provide a guarantee that these vtables are unique, but however
|
||||
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
|
||||
// pointer and the vtable need to be equal.
|
||||
//
|
||||
// See: https://github.com/rust-lang/rust/issues/103763
|
||||
//
|
||||
// A future version of rust will most likely use this form below, where we cast each
|
||||
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
|
||||
// not affect the comparison.
|
||||
//
|
||||
// See: https://github.com/rust-lang/rust/pull/106450
|
||||
let left = Arc::as_ptr(left) as *const ();
|
||||
let right = Arc::as_ptr(right) as *const ();
|
||||
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
|
||||
///
|
||||
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
|
||||
///
|
||||
/// If comparing persistent layers, ALWAYS compare the layer descriptor key.
|
||||
#[inline(always)]
|
||||
pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
|
||||
// "dyn Trait" objects are "fat pointers" in that they have two components:
|
||||
// - pointer to the object
|
||||
// - pointer to the vtable
|
||||
//
|
||||
// rust does not provide a guarantee that these vtables are unique, but however
|
||||
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
|
||||
// pointer and the vtable need to be equal.
|
||||
//
|
||||
// See: https://github.com/rust-lang/rust/issues/103763
|
||||
//
|
||||
// A future version of rust will most likely use this form below, where we cast each
|
||||
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
|
||||
// not affect the comparison.
|
||||
//
|
||||
// See: https://github.com/rust-lang/rust/pull/106450
|
||||
let left = Arc::as_ptr(left) as *const ();
|
||||
let right = Arc::as_ptr(right) as *const ();
|
||||
|
||||
left == right
|
||||
}
|
||||
left == right
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{LayerMap, Replacement};
|
||||
use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
|
||||
use super::LayerMap;
|
||||
use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
mod l0_delta_layers_updated {
|
||||
|
||||
use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn for_full_range_delta() {
|
||||
// l0_delta_layers are used by compaction, and should observe all buffered updates
|
||||
l0_delta_layers_updated_scenario(
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
|
||||
true
|
||||
)
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
|
||||
true
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn for_non_full_range_delta() {
|
||||
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
|
||||
l0_delta_layers_updated_scenario(
|
||||
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
|
||||
// because not full range
|
||||
false
|
||||
)
|
||||
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
|
||||
// because not full range
|
||||
false
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn for_image() {
|
||||
l0_delta_layers_updated_scenario(
|
||||
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
|
||||
// code only checks if it is a full range layer, doesn't care about images, which must
|
||||
// mean we should in practice never have full range images
|
||||
false
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replacing_missing_l0_is_notfound() {
|
||||
// original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
|
||||
// however only happen for precondition failures.
|
||||
|
||||
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
|
||||
let layer = LayerFileName::from_str(layer).unwrap();
|
||||
let layer = LayerDescriptor::from(layer);
|
||||
|
||||
// same skeletan construction; see scenario below
|
||||
let not_found = Arc::new(layer.clone());
|
||||
let new_version = Arc::new(layer);
|
||||
|
||||
let mut map = LayerMap::default();
|
||||
|
||||
let res = map.batch_update().replace_historic(
|
||||
not_found.get_persistent_layer_desc(),
|
||||
¬_found,
|
||||
new_version.get_persistent_layer_desc(),
|
||||
new_version,
|
||||
);
|
||||
|
||||
assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
|
||||
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
|
||||
// code only checks if it is a full range layer, doesn't care about images, which must
|
||||
// mean we should in practice never have full range images
|
||||
false
|
||||
)
|
||||
}
|
||||
|
||||
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
|
||||
@@ -906,46 +730,31 @@ mod tests {
|
||||
|
||||
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
|
||||
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
|
||||
assert!(!LayerMap::compare_arced_layers(&remote, &downloaded));
|
||||
assert_eq!(remote.layer_desc(), downloaded.layer_desc());
|
||||
|
||||
let expected_in_counts = (1, usize::from(expected_l0));
|
||||
|
||||
map.batch_update()
|
||||
.insert_historic(remote.get_persistent_layer_desc(), remote.clone());
|
||||
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
|
||||
|
||||
let replaced = map
|
||||
.batch_update()
|
||||
.replace_historic(
|
||||
remote.get_persistent_layer_desc(),
|
||||
&remote,
|
||||
downloaded.get_persistent_layer_desc(),
|
||||
downloaded.clone(),
|
||||
)
|
||||
.expect("name derived attributes are the same");
|
||||
assert!(
|
||||
matches!(replaced, Replacement::Replaced { .. }),
|
||||
"{replaced:?}"
|
||||
.insert_historic(remote.layer_desc().clone());
|
||||
assert_eq!(
|
||||
count_layer_in(&map, remote.layer_desc()),
|
||||
expected_in_counts
|
||||
);
|
||||
assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
|
||||
|
||||
map.batch_update()
|
||||
.remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone());
|
||||
assert_eq!(count_layer_in(&map, &downloaded), (0, 0));
|
||||
.remove_historic(downloaded.layer_desc().clone());
|
||||
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
|
||||
}
|
||||
|
||||
fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) {
|
||||
fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
|
||||
let historic = map
|
||||
.iter_historic_layers()
|
||||
.filter(|x| LayerMap::compare_arced_layers(x, layer))
|
||||
.filter(|x| x.key() == layer.key())
|
||||
.count();
|
||||
let l0s = map
|
||||
.get_level0_deltas()
|
||||
.expect("why does this return a result");
|
||||
let l0 = l0s
|
||||
.iter()
|
||||
.filter(|x| LayerMap::compare_arced_layers(x, layer))
|
||||
.count();
|
||||
let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
|
||||
|
||||
(historic, l0)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ use std::ops::Range;
|
||||
|
||||
use tracing::info;
|
||||
|
||||
use crate::tenant::storage_layer::PersistentLayerDesc;
|
||||
|
||||
use super::layer_coverage::LayerCoverageTuple;
|
||||
|
||||
/// Layers in this module are identified and indexed by this data.
|
||||
@@ -53,11 +55,24 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&PersistentLayerDesc> for LayerKey {
|
||||
fn from(layer: &PersistentLayerDesc) -> Self {
|
||||
let kr = layer.get_key_range();
|
||||
let lr = layer.get_lsn_range();
|
||||
LayerKey {
|
||||
key: kr.start.to_i128()..kr.end.to_i128(),
|
||||
lsn: lr.start.0..lr.end.0,
|
||||
is_image: !layer.is_incremental(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Efficiently queryable layer coverage for each LSN.
|
||||
///
|
||||
/// Allows answering layer map queries very efficiently,
|
||||
/// but doesn't allow retroactive insertion, which is
|
||||
/// sometimes necessary. See BufferedHistoricLayerCoverage.
|
||||
#[derive(Clone)]
|
||||
pub struct HistoricLayerCoverage<Value> {
|
||||
/// The latest state
|
||||
head: LayerCoverageTuple<Value>,
|
||||
@@ -411,6 +426,7 @@ fn test_persistent_overlapping() {
|
||||
///
|
||||
/// See this for more on persistent and retroactive techniques:
|
||||
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
|
||||
#[derive(Clone)]
|
||||
pub struct BufferedHistoricLayerCoverage<Value> {
|
||||
/// A persistent layer map that we rebuild when we need to retroactively update
|
||||
historic_coverage: HistoricLayerCoverage<Value>,
|
||||
@@ -467,6 +483,11 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
///
|
||||
/// Returns a `Replacement` value describing the outcome; only the case of
|
||||
/// `Replacement::Replaced` modifies the map and requires a rebuild.
|
||||
///
|
||||
/// This function is unlikely to be used in the future because LayerMap now only records the
|
||||
/// layer descriptors. Therefore, anything added to the layer map will only be removed or
|
||||
/// added, and never replaced.
|
||||
#[allow(dead_code)]
|
||||
pub fn replace<F>(
|
||||
&mut self,
|
||||
layer_key: &LayerKey,
|
||||
|
||||
@@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync;
|
||||
///
|
||||
/// NOTE The struct is parameterized over Value for easier
|
||||
/// testing, but in practice it's some sort of layer.
|
||||
#[derive(Clone)]
|
||||
pub struct LayerCoverage<Value> {
|
||||
/// For every change in coverage (as we sweep the key space)
|
||||
/// we store (lsn.end, value).
|
||||
@@ -139,6 +140,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
}
|
||||
|
||||
/// Image and delta coverage at a specific LSN.
|
||||
#[derive(Clone)]
|
||||
pub struct LayerCoverageTuple<Value> {
|
||||
pub image_coverage: LayerCoverage<Value>,
|
||||
pub delta_coverage: LayerCoverage<Value>,
|
||||
|
||||
146
pageserver/src/tenant/layer_map_mgr.rs
Normal file
146
pageserver/src/tenant/layer_map_mgr.rs
Normal file
@@ -0,0 +1,146 @@
|
||||
//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state.
|
||||
//!
|
||||
//! A common usage pattern is as follows:
|
||||
//!
|
||||
//! ```ignore
|
||||
//! async fn compaction(&self) {
|
||||
//! // Get the current state.
|
||||
//! let state = self.layer_map_mgr.read();
|
||||
//! // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may
|
||||
//! // take a long time.
|
||||
//! let compaction_result = self.do_compaction(&state).await?;
|
||||
//! // Update the state.
|
||||
//! self.layer_map_mgr.update(|mut state| async move {
|
||||
//! // do updates to the state, return it.
|
||||
//! Ok(state)
|
||||
//! }).await?;
|
||||
//! }
|
||||
//! ```
|
||||
use anyhow::Result;
|
||||
use arc_swap::ArcSwap;
|
||||
use futures::Future;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::layer_map::LayerMap;
|
||||
|
||||
/// Manages the storage state. Provide utility functions to modify the layer map and get an immutable reference to the
|
||||
/// layer map.
|
||||
pub struct LayerMapMgr {
|
||||
layer_map: ArcSwap<LayerMap>,
|
||||
state_lock: tokio::sync::Mutex<()>,
|
||||
}
|
||||
|
||||
impl LayerMapMgr {
|
||||
/// Get the current state of the layer map.
|
||||
pub fn read(&self) -> Arc<LayerMap> {
|
||||
// TODO: it is possible to use `load` to reduce the overhead of cloning the Arc, but read path usually involves
|
||||
// disk reads and layer mapping fetching, and therefore it's not a big deal to use a more optimized version
|
||||
// here.
|
||||
self.layer_map.load_full()
|
||||
}
|
||||
|
||||
/// Clone the layer map for modification.
|
||||
fn clone_for_write(&self, _state_lock_witness: &tokio::sync::MutexGuard<'_, ()>) -> LayerMap {
|
||||
(**self.layer_map.load()).clone()
|
||||
}
|
||||
|
||||
pub fn new(layer_map: LayerMap) -> Self {
|
||||
Self {
|
||||
layer_map: ArcSwap::new(Arc::new(layer_map)),
|
||||
state_lock: tokio::sync::Mutex::new(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the layer map.
|
||||
pub async fn update<O, F>(&self, operation: O) -> Result<()>
|
||||
where
|
||||
O: FnOnce(LayerMap) -> F,
|
||||
F: Future<Output = Result<LayerMap>>,
|
||||
{
|
||||
let state_lock = self.state_lock.lock().await;
|
||||
let state = self.clone_for_write(&state_lock);
|
||||
let new_state = operation(state).await?;
|
||||
self.layer_map.store(Arc::new(new_state));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_layer_map_manage() -> Result<()> {
|
||||
let mgr = LayerMapMgr::new(Default::default());
|
||||
mgr.update(|mut map| async move {
|
||||
let mut updates = map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_img(
|
||||
TenantId::generate(),
|
||||
TimelineId::generate(),
|
||||
Key::from_i128(0)..Key::from_i128(1),
|
||||
Lsn(0),
|
||||
false,
|
||||
0,
|
||||
));
|
||||
updates.flush();
|
||||
Ok(map)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let ref_1 = mgr.read();
|
||||
|
||||
mgr.update(|mut map| async move {
|
||||
let mut updates = map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_img(
|
||||
TenantId::generate(),
|
||||
TimelineId::generate(),
|
||||
Key::from_i128(1)..Key::from_i128(2),
|
||||
Lsn(0),
|
||||
false,
|
||||
0,
|
||||
));
|
||||
updates.flush();
|
||||
Ok(map)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let ref_2 = mgr.read();
|
||||
|
||||
// Modification should not be visible to the old reference.
|
||||
assert_eq!(
|
||||
ref_1
|
||||
.search(Key::from_i128(0), Lsn(1))
|
||||
.unwrap()
|
||||
.layer
|
||||
.key_range,
|
||||
Key::from_i128(0)..Key::from_i128(1)
|
||||
);
|
||||
assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none());
|
||||
|
||||
// Modification should be visible to the new reference.
|
||||
assert_eq!(
|
||||
ref_2
|
||||
.search(Key::from_i128(0), Lsn(1))
|
||||
.unwrap()
|
||||
.layer
|
||||
.key_range,
|
||||
Key::from_i128(0)..Key::from_i128(1)
|
||||
);
|
||||
assert_eq!(
|
||||
ref_2
|
||||
.search(Key::from_i128(1), Lsn(1))
|
||||
.unwrap()
|
||||
.layer
|
||||
.key_range,
|
||||
Key::from_i128(1)..Key::from_i128(2)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1392,7 +1392,12 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = runtime.block_on(tenant.create_test_timeline(
|
||||
TIMELINE_ID,
|
||||
Lsn(8),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
))?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -41,8 +41,6 @@ pub use inmemory_layer::InMemoryLayer;
|
||||
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
|
||||
pub use remote_layer::RemoteLayer;
|
||||
|
||||
use super::layer_map::BatchedUpdates;
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
T: PartialOrd<T>,
|
||||
@@ -176,19 +174,9 @@ impl LayerAccessStats {
|
||||
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
|
||||
///
|
||||
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
|
||||
pub(crate) fn for_loading_layer<L>(
|
||||
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
|
||||
status: LayerResidenceStatus,
|
||||
) -> Self
|
||||
where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
|
||||
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
|
||||
new.record_residence_event(
|
||||
layer_map_lock_held_witness,
|
||||
status,
|
||||
LayerResidenceEventReason::LayerLoad,
|
||||
);
|
||||
new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
|
||||
new
|
||||
}
|
||||
|
||||
@@ -197,24 +185,16 @@ impl LayerAccessStats {
|
||||
/// The `new_status` is not recorded in `self`.
|
||||
///
|
||||
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
|
||||
pub(crate) fn clone_for_residence_change<L>(
|
||||
pub(crate) fn clone_for_residence_change(
|
||||
&self,
|
||||
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
|
||||
new_status: LayerResidenceStatus,
|
||||
) -> LayerAccessStats
|
||||
where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
) -> LayerAccessStats {
|
||||
let clone = {
|
||||
let inner = self.0.lock().unwrap();
|
||||
inner.clone()
|
||||
};
|
||||
let new = LayerAccessStats(Mutex::new(clone));
|
||||
new.record_residence_event(
|
||||
layer_map_lock_held_witness,
|
||||
new_status,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
|
||||
new
|
||||
}
|
||||
|
||||
@@ -232,14 +212,11 @@ impl LayerAccessStats {
|
||||
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
|
||||
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
|
||||
///
|
||||
pub(crate) fn record_residence_event<L>(
|
||||
pub(crate) fn record_residence_event(
|
||||
&self,
|
||||
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
|
||||
status: LayerResidenceStatus,
|
||||
reason: LayerResidenceEventReason,
|
||||
) where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
) {
|
||||
let mut locked = self.0.lock().unwrap();
|
||||
locked.iter_mut().for_each(|inner| {
|
||||
inner
|
||||
@@ -389,10 +366,10 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
}
|
||||
|
||||
/// Returned by [`Layer::iter`]
|
||||
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i>;
|
||||
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i + Send>;
|
||||
|
||||
/// Returned by [`Layer::key_iter`]
|
||||
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i>;
|
||||
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send>;
|
||||
|
||||
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
|
||||
/// range of LSNs.
|
||||
@@ -473,94 +450,125 @@ pub fn downcast_remote_layer(
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||
///
|
||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||
/// LayerDescriptor.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LayerDescriptor {
|
||||
pub key: Range<Key>,
|
||||
pub lsn: Range<Lsn>,
|
||||
pub is_incremental: bool,
|
||||
pub short_id: String,
|
||||
}
|
||||
pub mod tests {
|
||||
use super::*;
|
||||
|
||||
impl LayerDescriptor {
|
||||
/// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta,
|
||||
/// and the tenant / timeline id does not matter.
|
||||
pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc {
|
||||
PersistentLayerDesc::new_delta(
|
||||
TenantId::from_array([0; 16]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
self.key.clone(),
|
||||
self.lsn.clone(),
|
||||
233,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for LayerDescriptor {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.key.clone()
|
||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||
///
|
||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||
/// LayerDescriptor.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LayerDescriptor {
|
||||
base: PersistentLayerDesc,
|
||||
}
|
||||
|
||||
fn get_lsn_range(&self) -> Range<Lsn> {
|
||||
self.lsn.clone()
|
||||
}
|
||||
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.is_incremental
|
||||
}
|
||||
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_data: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
todo!("This method shouldn't be part of the Layer trait")
|
||||
}
|
||||
|
||||
fn short_id(&self) -> String {
|
||||
self.short_id.clone()
|
||||
}
|
||||
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DeltaFileName> for LayerDescriptor {
|
||||
fn from(value: DeltaFileName) -> Self {
|
||||
let short_id = value.to_string();
|
||||
LayerDescriptor {
|
||||
key: value.key_range,
|
||||
lsn: value.lsn_range,
|
||||
is_incremental: true,
|
||||
short_id,
|
||||
impl From<PersistentLayerDesc> for LayerDescriptor {
|
||||
fn from(base: PersistentLayerDesc) -> Self {
|
||||
Self { base }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ImageFileName> for LayerDescriptor {
|
||||
fn from(value: ImageFileName) -> Self {
|
||||
let short_id = value.to_string();
|
||||
let lsn = value.lsn_as_range();
|
||||
LayerDescriptor {
|
||||
key: value.key_range,
|
||||
lsn,
|
||||
is_incremental: false,
|
||||
short_id,
|
||||
impl Layer for LayerDescriptor {
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
_key: Key,
|
||||
_lsn_range: Range<Lsn>,
|
||||
_reconstruct_data: &mut ValueReconstructState,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<ValueReconstructResult> {
|
||||
todo!("This method shouldn't be part of the Layer trait")
|
||||
}
|
||||
|
||||
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.layer_desc().key_range.clone()
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn get_lsn_range(&self) -> Range<Lsn> {
|
||||
self.layer_desc().lsn_range.clone()
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn is_incremental(&self) -> bool {
|
||||
self.layer_desc().is_incremental
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
fn short_id(&self) -> String {
|
||||
self.layer_desc().short_id()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LayerFileName> for LayerDescriptor {
|
||||
fn from(value: LayerFileName) -> Self {
|
||||
match value {
|
||||
LayerFileName::Delta(d) => Self::from(d),
|
||||
LayerFileName::Image(i) => Self::from(i),
|
||||
impl PersistentLayer for LayerDescriptor {
|
||||
fn layer_desc(&self) -> &PersistentLayerDesc {
|
||||
&self.base
|
||||
}
|
||||
|
||||
fn local_path(&self) -> Option<PathBuf> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn delete_resident_layer_file(&self) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DeltaFileName> for LayerDescriptor {
|
||||
fn from(value: DeltaFileName) -> Self {
|
||||
LayerDescriptor {
|
||||
base: PersistentLayerDesc::new_delta(
|
||||
TenantId::from_array([0; 16]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn_range,
|
||||
233,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ImageFileName> for LayerDescriptor {
|
||||
fn from(value: ImageFileName) -> Self {
|
||||
LayerDescriptor {
|
||||
base: PersistentLayerDesc::new_img(
|
||||
TenantId::from_array([0; 16]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn,
|
||||
false,
|
||||
233,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LayerFileName> for LayerDescriptor {
|
||||
fn from(value: LayerFileName) -> Self {
|
||||
match value {
|
||||
LayerFileName::Delta(d) => Self::from(d),
|
||||
LayerFileName::Image(i) => Self::from(i),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ use crate::virtual_file::VirtualFile;
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -46,7 +47,6 @@ use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -184,7 +184,7 @@ pub struct DeltaLayer {
|
||||
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
inner: RwLock<DeltaLayerInner>,
|
||||
inner: OnceCell<DeltaLayerInner>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayer {
|
||||
@@ -201,21 +201,17 @@ impl std::fmt::Debug for DeltaLayer {
|
||||
}
|
||||
|
||||
pub struct DeltaLayerInner {
|
||||
/// If false, the fields below have not been loaded into memory yet.
|
||||
loaded: bool,
|
||||
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
|
||||
/// Reader object for reading blocks from the file. (None if not loaded yet)
|
||||
file: Option<FileBlockReader<VirtualFile>>,
|
||||
/// Reader object for reading blocks from the file.
|
||||
file: FileBlockReader<VirtualFile>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DeltaLayerInner")
|
||||
.field("loaded", &self.loaded)
|
||||
.field("index_start_blk", &self.index_start_blk)
|
||||
.field("index_root_blk", &self.index_root_blk)
|
||||
.finish()
|
||||
@@ -246,7 +242,7 @@ impl Layer for DeltaLayer {
|
||||
inner.index_start_blk, inner.index_root_blk
|
||||
);
|
||||
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -315,7 +311,7 @@ impl Layer for DeltaLayer {
|
||||
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -500,51 +496,22 @@ impl DeltaLayer {
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
///
|
||||
fn load(
|
||||
&self,
|
||||
access_kind: LayerAccessKind,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<RwLockReadGuard<DeltaLayerInner>> {
|
||||
fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> {
|
||||
self.access_stats
|
||||
.record_access(access_kind, ctx.task_kind());
|
||||
loop {
|
||||
// Quick exit if already loaded
|
||||
let inner = self.inner.read().unwrap();
|
||||
if inner.loaded {
|
||||
return Ok(inner);
|
||||
}
|
||||
|
||||
// Need to open the file and load the metadata. Upgrade our lock to
|
||||
// a write lock. (Or rather, release and re-lock in write mode.)
|
||||
drop(inner);
|
||||
let inner = self.inner.write().unwrap();
|
||||
if !inner.loaded {
|
||||
self.load_inner(inner).with_context(|| {
|
||||
format!("Failed to load delta layer {}", self.path().display())
|
||||
})?;
|
||||
} else {
|
||||
// Another thread loaded it while we were not holding the lock.
|
||||
}
|
||||
|
||||
// We now have the file open and loaded. There's no function to do
|
||||
// that in the std library RwLock, so we have to release and re-lock
|
||||
// in read mode. (To be precise, the lock guard was moved in the
|
||||
// above call to `load_inner`, so it's already been released). And
|
||||
// while we do that, another thread could unload again, so we have
|
||||
// to re-check and retry if that happens.
|
||||
}
|
||||
// Quick exit if already loaded
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner())
|
||||
.with_context(|| format!("Failed to load delta layer {}", self.path().display()))
|
||||
}
|
||||
|
||||
fn load_inner(&self, mut inner: RwLockWriteGuard<DeltaLayerInner>) -> Result<()> {
|
||||
fn load_inner(&self) -> Result<DeltaLayerInner> {
|
||||
let path = self.path();
|
||||
|
||||
// Open the file if it's not open already.
|
||||
if inner.file.is_none() {
|
||||
let file = VirtualFile::open(&path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
inner.file = Some(FileBlockReader::new(file));
|
||||
}
|
||||
let file = inner.file.as_mut().unwrap();
|
||||
let file = VirtualFile::open(&path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
@@ -571,13 +538,13 @@ impl DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
inner.index_start_blk = actual_summary.index_start_blk;
|
||||
inner.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
debug!("loaded from {}", &path.display());
|
||||
|
||||
inner.loaded = true;
|
||||
Ok(())
|
||||
Ok(DeltaLayerInner {
|
||||
file,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a DeltaLayer struct representing an existing file on disk.
|
||||
@@ -599,12 +566,7 @@ impl DeltaLayer {
|
||||
file_size,
|
||||
),
|
||||
access_stats,
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
index_start_blk: 0,
|
||||
index_root_blk: 0,
|
||||
}),
|
||||
inner: once_cell::sync::OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -631,12 +593,7 @@ impl DeltaLayer {
|
||||
metadata.len(),
|
||||
),
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
index_start_blk: 0,
|
||||
index_root_blk: 0,
|
||||
}),
|
||||
inner: once_cell::sync::OnceCell::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -800,12 +757,7 @@ impl DeltaLayerWriterInner {
|
||||
metadata.len(),
|
||||
),
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
inner: RwLock::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
file: None,
|
||||
index_start_blk,
|
||||
index_root_blk,
|
||||
}),
|
||||
inner: once_cell::sync::OnceCell::new(),
|
||||
};
|
||||
|
||||
// fsync the file
|
||||
@@ -917,7 +869,7 @@ impl Drop for DeltaLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
match inner.blob_writer.into_inner().into_inner() {
|
||||
Ok(vfile) => vfile.remove().unwrap(),
|
||||
Ok(vfile) => vfile.remove(),
|
||||
Err(err) => warn!(
|
||||
"error while flushing buffer of image layer temporary file: {}",
|
||||
err
|
||||
@@ -940,13 +892,13 @@ struct DeltaValueIter<'a> {
|
||||
reader: BlockCursor<Adapter<'a>>,
|
||||
}
|
||||
|
||||
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
|
||||
struct Adapter<'a>(&'a DeltaLayerInner);
|
||||
|
||||
impl<'a> BlockReader for Adapter<'a> {
|
||||
type BlockLease = PageReadGuard<'static>;
|
||||
|
||||
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
|
||||
self.0.file.as_ref().unwrap().read_blk(blknum)
|
||||
self.0.file.read_blk(blknum)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -959,8 +911,8 @@ impl<'a> Iterator for DeltaValueIter<'a> {
|
||||
}
|
||||
|
||||
impl<'a> DeltaValueIter<'a> {
|
||||
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -1033,8 +985,8 @@ impl Iterator for DeltaKeyIter {
|
||||
}
|
||||
|
||||
impl<'a> DeltaKeyIter {
|
||||
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
@@ -1074,3 +1026,21 @@ impl<'a> DeltaKeyIter {
|
||||
Ok(iter)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::DeltaKeyIter;
|
||||
use super::DeltaLayer;
|
||||
use super::DeltaValueIter;
|
||||
|
||||
// We will soon need the iters to be send in the compaction code.
|
||||
// Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883
|
||||
// Cf https://github.com/neondatabase/neon/issues/4471
|
||||
#[test]
|
||||
fn is_send() {
|
||||
fn assert_send<T: Send>() {}
|
||||
assert_send::<DeltaLayer>();
|
||||
assert_send::<DeltaValueIter>();
|
||||
assert_send::<DeltaKeyIter>();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -709,7 +709,7 @@ impl ImageLayerWriter {
|
||||
impl Drop for ImageLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
inner.blob_writer.into_inner().remove().unwrap();
|
||||
inner.blob_writer.into_inner().remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -304,7 +304,7 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
// TODO: Currently, we just leak the storage for any deleted keys
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::layer_map::BatchedUpdates;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use anyhow::{bail, Result};
|
||||
@@ -218,15 +217,11 @@ impl RemoteLayer {
|
||||
}
|
||||
|
||||
/// Create a Layer struct representing this layer, after it has been downloaded.
|
||||
pub fn create_downloaded_layer<L>(
|
||||
pub fn create_downloaded_layer(
|
||||
&self,
|
||||
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
|
||||
conf: &'static PageServerConf,
|
||||
file_size: u64,
|
||||
) -> Arc<dyn PersistentLayer>
|
||||
where
|
||||
L: ?Sized + Layer,
|
||||
{
|
||||
) -> Arc<dyn PersistentLayer> {
|
||||
if self.desc.is_delta {
|
||||
let fname = self.desc.delta_file_name();
|
||||
Arc::new(DeltaLayer::new(
|
||||
@@ -235,10 +230,8 @@ impl RemoteLayer {
|
||||
self.desc.tenant_id,
|
||||
&fname,
|
||||
file_size,
|
||||
self.access_stats.clone_for_residence_change(
|
||||
layer_map_lock_held_witness,
|
||||
LayerResidenceStatus::Resident,
|
||||
),
|
||||
self.access_stats
|
||||
.clone_for_residence_change(LayerResidenceStatus::Resident),
|
||||
))
|
||||
} else {
|
||||
let fname = self.desc.image_file_name();
|
||||
@@ -248,10 +241,8 @@ impl RemoteLayer {
|
||||
self.desc.tenant_id,
|
||||
&fname,
|
||||
file_size,
|
||||
self.access_stats.clone_for_residence_change(
|
||||
layer_map_lock_held_witness,
|
||||
LayerResidenceStatus::Resident,
|
||||
),
|
||||
self.access_stats
|
||||
.clone_for_residence_change(LayerResidenceStatus::Resident),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -197,9 +197,10 @@ impl Timeline {
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layer_mgr.read();
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
let hist_layer = self.lcache.get_from_desc(&hist_layer);
|
||||
if hist_layer.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1325,6 +1325,7 @@ mod tests {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
|
||||
ConnectionManagerState {
|
||||
|
||||
@@ -304,12 +304,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
}
|
||||
|
||||
timeline.check_checkpoint_distance().with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
timeline
|
||||
.check_checkpoint_distance()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(last_lsn) = status_update {
|
||||
let timeline_remote_consistent_lsn =
|
||||
|
||||
@@ -324,8 +324,16 @@ impl VirtualFile {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Idempotently close the file descriptor we might have or have not open for this VirtualFile.
|
||||
pub fn close(&mut self) {
|
||||
pub fn remove(self) {
|
||||
let path = self.path.clone();
|
||||
drop(self);
|
||||
std::fs::remove_file(path).expect("failed to remove the virtual file");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
let handle = self.handle.get_mut().unwrap();
|
||||
|
||||
// We could check with a read-lock first, to avoid waiting on an
|
||||
@@ -343,26 +351,6 @@ impl VirtualFile {
|
||||
.observe_closure_duration(|| slot_guard.file.take());
|
||||
}
|
||||
}
|
||||
|
||||
/// Caller can retry if we return an `Err`.
|
||||
#[allow(clippy::result_large_err)]
|
||||
pub fn remove(mut self) -> Result<(), (Self, std::io::Error)> {
|
||||
// close our fd before unlink system call, so that the unlink actually performs the removal
|
||||
self.close();
|
||||
// Try to remove file on disk.
|
||||
// If it fails, we idempotently closed the fd, but the caller can choose to retry.
|
||||
match std::fs::remove_file(&self.path) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => Err((self, e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
self.close();
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for VirtualFile {
|
||||
|
||||
@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1171,7 +1171,6 @@ impl<'a> WalIngest<'a> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::pgdatadir_mapping::create_test_timeline;
|
||||
use crate::tenant::harness::*;
|
||||
use crate::tenant::Timeline;
|
||||
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
|
||||
@@ -1200,7 +1199,7 @@ mod tests {
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
|
||||
|
||||
Ok(walingest)
|
||||
@@ -1208,9 +1207,10 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_relsize() -> Result<()> {
|
||||
let harness = TenantHarness::create("test_relsize")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1218,22 +1218,22 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x50));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(0x50));
|
||||
|
||||
@@ -1319,7 +1319,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_current_logical_size(&tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
@@ -1361,7 +1361,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
|
||||
@@ -1374,7 +1374,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
|
||||
@@ -1399,7 +1399,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
|
||||
@@ -1428,16 +1428,17 @@ mod tests {
|
||||
// and then created it again within the same layer.
|
||||
#[tokio::test]
|
||||
async fn test_drop_extend() -> Result<()> {
|
||||
let harness = TenantHarness::create("test_drop_extend")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1456,7 +1457,7 @@ mod tests {
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(
|
||||
@@ -1474,7 +1475,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1498,9 +1499,10 @@ mod tests {
|
||||
// and then extended it again within the same layer.
|
||||
#[tokio::test]
|
||||
async fn test_truncate_extend() -> Result<()> {
|
||||
let harness = TenantHarness::create("test_truncate_extend")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
@@ -1512,7 +1514,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
@@ -1557,7 +1559,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
@@ -1606,7 +1608,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
@@ -1639,9 +1641,10 @@ mod tests {
|
||||
/// split into multiple 1 GB segments in Postgres.
|
||||
#[tokio::test]
|
||||
async fn test_large_rel() -> Result<()> {
|
||||
let harness = TenantHarness::create("test_large_rel")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut lsn = 0x10;
|
||||
@@ -1652,7 +1655,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
}
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
@@ -1668,7 +1671,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE
|
||||
@@ -1681,7 +1684,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE - 1
|
||||
@@ -1697,7 +1700,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
size as BlockNumber
|
||||
|
||||
56
poetry.lock
generated
56
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohttp"
|
||||
@@ -855,35 +855,31 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "cryptography"
|
||||
version = "39.0.1"
|
||||
version = "41.0.0"
|
||||
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "cryptography-39.0.1-cp36-abi3-macosx_10_12_universal2.whl", hash = "sha256:6687ef6d0a6497e2b58e7c5b852b53f62142cfa7cd1555795758934da363a965"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-macosx_10_12_x86_64.whl", hash = "sha256:706843b48f9a3f9b9911979761c91541e3d90db1ca905fd63fee540a217698bc"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:5d2d8b87a490bfcd407ed9d49093793d0f75198a35e6eb1a923ce1ee86c62b41"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:83e17b26de248c33f3acffb922748151d71827d6021d98c70e6c1a25ddd78505"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e124352fd3db36a9d4a21c1aa27fd5d051e621845cb87fb851c08f4f75ce8be6"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-manylinux_2_24_x86_64.whl", hash = "sha256:5aa67414fcdfa22cf052e640cb5ddc461924a045cacf325cd164e65312d99502"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:35f7c7d015d474f4011e859e93e789c87d21f6f4880ebdc29896a60403328f1f"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f24077a3b5298a5a06a8e0536e3ea9ec60e4c7ac486755e5fb6e6ea9b3500106"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:f0c64d1bd842ca2633e74a1a28033d139368ad959872533b1bab8c80e8240a0c"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:0f8da300b5c8af9f98111ffd512910bc792b4c77392a9523624680f7956a99d4"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-win32.whl", hash = "sha256:fe913f20024eb2cb2f323e42a64bdf2911bb9738a15dba7d3cce48151034e3a8"},
|
||||
{file = "cryptography-39.0.1-cp36-abi3-win_amd64.whl", hash = "sha256:ced4e447ae29ca194449a3f1ce132ded8fcab06971ef5f618605aacaa612beac"},
|
||||
{file = "cryptography-39.0.1-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:807ce09d4434881ca3a7594733669bd834f5b2c6d5c7e36f8c00f691887042ad"},
|
||||
{file = "cryptography-39.0.1-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:c5caeb8188c24888c90b5108a441c106f7faa4c4c075a2bcae438c6e8ca73cef"},
|
||||
{file = "cryptography-39.0.1-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4789d1e3e257965e960232345002262ede4d094d1a19f4d3b52e48d4d8f3b885"},
|
||||
{file = "cryptography-39.0.1-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:96f1157a7c08b5b189b16b47bc9db2332269d6680a196341bf30046330d15388"},
|
||||
{file = "cryptography-39.0.1-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:e422abdec8b5fa8462aa016786680720d78bdce7a30c652b7fadf83a4ba35336"},
|
||||
{file = "cryptography-39.0.1-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:b0afd054cd42f3d213bf82c629efb1ee5f22eba35bf0eec88ea9ea7304f511a2"},
|
||||
{file = "cryptography-39.0.1-pp39-pypy39_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:6f8ba7f0328b79f08bdacc3e4e66fb4d7aab0c3584e0bd41328dce5262e26b2e"},
|
||||
{file = "cryptography-39.0.1-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:ef8b72fa70b348724ff1218267e7f7375b8de4e8194d1636ee60510aae104cd0"},
|
||||
{file = "cryptography-39.0.1-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:aec5a6c9864be7df2240c382740fcf3b96928c46604eaa7f3091f58b878c0bb6"},
|
||||
{file = "cryptography-39.0.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:fdd188c8a6ef8769f148f88f859884507b954cc64db6b52f66ef199bb9ad660a"},
|
||||
{file = "cryptography-39.0.1.tar.gz", hash = "sha256:d1f6198ee6d9148405e49887803907fe8962a23e6c6f83ea7d98f1c0de375695"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:3c5ef25d060c80d6d9f7f9892e1d41bb1c79b78ce74805b8cb4aa373cb7d5ec8"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:8362565b3835ceacf4dc8f3b56471a2289cf51ac80946f9087e66dc283a810e0"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3680248309d340fda9611498a5319b0193a8dbdb73586a1acf8109d06f25b92d"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:84a165379cb9d411d58ed739e4af3396e544eac190805a54ba2e0322feb55c46"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:4ab14d567f7bbe7f1cdff1c53d5324ed4d3fc8bd17c481b395db224fb405c237"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:9f65e842cb02550fac96536edb1d17f24c0a338fd84eaf582be25926e993dde4"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:b7f2f5c525a642cecad24ee8670443ba27ac1fab81bba4cc24c7b6b41f2d0c75"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:7d92f0248d38faa411d17f4107fc0bce0c42cae0b0ba5415505df72d751bf62d"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-win32.whl", hash = "sha256:34d405ea69a8b34566ba3dfb0521379b210ea5d560fafedf9f800a9a94a41928"},
|
||||
{file = "cryptography-41.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:344c6de9f8bda3c425b3a41b319522ba3208551b70c2ae00099c205f0d9fd3be"},
|
||||
{file = "cryptography-41.0.0-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:88ff107f211ea696455ea8d911389f6d2b276aabf3231bf72c8853d22db755c5"},
|
||||
{file = "cryptography-41.0.0-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:b846d59a8d5a9ba87e2c3d757ca019fa576793e8758174d3868aecb88d6fc8eb"},
|
||||
{file = "cryptography-41.0.0-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:f5d0bf9b252f30a31664b6f64432b4730bb7038339bd18b1fafe129cfc2be9be"},
|
||||
{file = "cryptography-41.0.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:5c1f7293c31ebc72163a9a0df246f890d65f66b4a40d9ec80081969ba8c78cc9"},
|
||||
{file = "cryptography-41.0.0-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:bf8fc66012ca857d62f6a347007e166ed59c0bc150cefa49f28376ebe7d992a2"},
|
||||
{file = "cryptography-41.0.0-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a4fc68d1c5b951cfb72dfd54702afdbbf0fb7acdc9b7dc4301bbf2225a27714d"},
|
||||
{file = "cryptography-41.0.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:14754bcdae909d66ff24b7b5f166d69340ccc6cb15731670435efd5719294895"},
|
||||
{file = "cryptography-41.0.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:0ddaee209d1cf1f180f1efa338a68c4621154de0afaef92b89486f5f96047c55"},
|
||||
{file = "cryptography-41.0.0.tar.gz", hash = "sha256:6b71f64beeea341c9b4f963b48ee3b62d62d57ba93eb120e1196b31dc1025e78"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@@ -892,12 +888,12 @@ cffi = ">=1.12"
|
||||
[package.extras]
|
||||
docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=1.1.1)"]
|
||||
docstest = ["pyenchant (>=1.6.11)", "sphinxcontrib-spelling (>=4.0.1)", "twine (>=1.12.0)"]
|
||||
pep8test = ["black", "check-manifest", "mypy", "ruff", "types-pytz", "types-requests"]
|
||||
sdist = ["setuptools-rust (>=0.11.4)"]
|
||||
nox = ["nox"]
|
||||
pep8test = ["black", "check-sdist", "mypy", "ruff"]
|
||||
sdist = ["build"]
|
||||
ssh = ["bcrypt (>=3.1.5)"]
|
||||
test = ["hypothesis (>=1.11.4,!=3.79.2)", "iso8601", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-shard (>=0.1.2)", "pytest-subtests", "pytest-xdist", "pytz"]
|
||||
test = ["pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
|
||||
test-randomorder = ["pytest-randomly"]
|
||||
tox = ["tox"]
|
||||
|
||||
[[package]]
|
||||
name = "docker"
|
||||
|
||||
@@ -3,15 +3,19 @@
|
||||
//
|
||||
use anyhow::{bail, Context, Result};
|
||||
use clap::Parser;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::task::JoinError;
|
||||
use toml_edit::Document;
|
||||
use utils::signals::ShutdownSignals;
|
||||
|
||||
use std::fs::{self, File};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use storage_broker::Uri;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -20,22 +24,21 @@ use tracing::*;
|
||||
use utils::pid_file;
|
||||
|
||||
use metrics::set_build_info_metric;
|
||||
use safekeeper::broker;
|
||||
use safekeeper::control_file;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
DEFAULT_PG_LISTEN_ADDR,
|
||||
};
|
||||
use safekeeper::http;
|
||||
use safekeeper::remove_wal;
|
||||
use safekeeper::wal_backup;
|
||||
use safekeeper::wal_service;
|
||||
use safekeeper::GlobalTimelines;
|
||||
use safekeeper::SafeKeeperConf;
|
||||
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
|
||||
use safekeeper::{control_file, BROKER_RUNTIME};
|
||||
use safekeeper::{http, WAL_REMOVER_RUNTIME};
|
||||
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
|
||||
use safekeeper::{wal_backup, HTTP_RUNTIME};
|
||||
use storage_broker::DEFAULT_ENDPOINT;
|
||||
use utils::auth::JwtAuth;
|
||||
use utils::{
|
||||
http::endpoint,
|
||||
id::NodeId,
|
||||
logging::{self, LogFormat},
|
||||
project_git_version,
|
||||
@@ -104,10 +107,6 @@ struct Args {
|
||||
/// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
|
||||
#[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
|
||||
max_offloader_lag: u64,
|
||||
/// Number of threads for wal backup runtime, by default number of cores
|
||||
/// available to the system.
|
||||
#[arg(long)]
|
||||
wal_backup_threads: Option<usize>,
|
||||
/// Number of max parallel WAL segments to be offloaded to remote storage.
|
||||
#[arg(long, default_value = "5")]
|
||||
wal_backup_parallel_jobs: usize,
|
||||
@@ -121,9 +120,14 @@ struct Args {
|
||||
/// Format for logging, either 'plain' or 'json'.
|
||||
#[arg(long, default_value = "plain")]
|
||||
log_format: String,
|
||||
/// Run everything in single threaded current thread runtime, might be
|
||||
/// useful for debugging.
|
||||
#[arg(long)]
|
||||
current_thread_runtime: bool,
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
if let Some(addr) = args.dump_control_file {
|
||||
@@ -183,10 +187,10 @@ fn main() -> anyhow::Result<()> {
|
||||
heartbeat_timeout: args.heartbeat_timeout,
|
||||
remote_storage: args.remote_storage,
|
||||
max_offloader_lag_bytes: args.max_offloader_lag,
|
||||
backup_runtime_threads: args.wal_backup_threads,
|
||||
wal_backup_enabled: !args.disable_wal_backup,
|
||||
backup_parallel_jobs: args.wal_backup_parallel_jobs,
|
||||
auth,
|
||||
current_thread_runtime: args.current_thread_runtime,
|
||||
};
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
@@ -194,10 +198,14 @@ fn main() -> anyhow::Result<()> {
|
||||
Some(GIT_VERSION.into()),
|
||||
&[("node_id", &conf.my_id.to_string())],
|
||||
);
|
||||
start_safekeeper(conf)
|
||||
start_safekeeper(conf).await
|
||||
}
|
||||
|
||||
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
/// Result of joining any of main tasks: upper error means task failed to
|
||||
/// complete, e.g. panicked, inner is error produced by task itself.
|
||||
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
||||
|
||||
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
// Prevent running multiple safekeepers on the same directory
|
||||
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
|
||||
let lock_file =
|
||||
@@ -208,14 +216,18 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
// we need to release the lock file only when the current process is gone
|
||||
std::mem::forget(lock_file);
|
||||
|
||||
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
|
||||
info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
|
||||
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
info!("starting safekeeper on {}", conf.listen_pg_addr);
|
||||
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
|
||||
info!(
|
||||
"starting safekeeper HTTP service on {}",
|
||||
conf.listen_http_addr
|
||||
);
|
||||
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
@@ -224,71 +236,88 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
|
||||
metrics::register_internal(Box::new(timeline_collector))?;
|
||||
|
||||
let mut threads = vec![];
|
||||
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
||||
|
||||
// Load all timelines from disk to memory.
|
||||
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
|
||||
|
||||
let conf_ = conf.clone();
|
||||
threads.push(
|
||||
thread::Builder::new()
|
||||
.name("http_endpoint_thread".into())
|
||||
.spawn(|| {
|
||||
let router = http::make_router(conf_);
|
||||
endpoint::serve_thread_main(
|
||||
router,
|
||||
http_listener,
|
||||
std::future::pending(), // never shut down
|
||||
)
|
||||
.unwrap();
|
||||
})?,
|
||||
);
|
||||
|
||||
let conf_cloned = conf.clone();
|
||||
let safekeeper_thread = thread::Builder::new()
|
||||
.name("WAL service thread".into())
|
||||
.spawn(|| wal_service::thread_main(conf_cloned, pg_listener))
|
||||
.unwrap();
|
||||
|
||||
threads.push(safekeeper_thread);
|
||||
// Keep handles to main tasks to die if any of them disappears.
|
||||
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
|
||||
FuturesUnordered::new();
|
||||
|
||||
let conf_ = conf.clone();
|
||||
threads.push(
|
||||
thread::Builder::new()
|
||||
.name("broker thread".into())
|
||||
.spawn(|| {
|
||||
broker::thread_main(conf_);
|
||||
})?,
|
||||
);
|
||||
// Run everything in current thread rt, if asked.
|
||||
if conf.current_thread_runtime {
|
||||
info!("running in current thread runtime");
|
||||
}
|
||||
let current_thread_rt = conf
|
||||
.current_thread_runtime
|
||||
.then(|| Handle::try_current().expect("no runtime in main"));
|
||||
let wal_service_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
|
||||
.spawn(wal_service::task_main(conf_, pg_listener))
|
||||
// wrap with task name for error reporting
|
||||
.map(|res| ("WAL service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_service_handle));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
threads.push(
|
||||
thread::Builder::new()
|
||||
.name("WAL removal thread".into())
|
||||
.spawn(|| {
|
||||
remove_wal::thread_main(conf_);
|
||||
})?,
|
||||
);
|
||||
let http_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| HTTP_RUNTIME.handle())
|
||||
.spawn(http::task_main(conf_, http_listener))
|
||||
.map(|res| ("HTTP service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(http_handle));
|
||||
|
||||
threads.push(
|
||||
thread::Builder::new()
|
||||
.name("WAL backup launcher thread".into())
|
||||
.spawn(move || {
|
||||
wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx);
|
||||
})?,
|
||||
);
|
||||
let conf_ = conf.clone();
|
||||
let broker_task_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| BROKER_RUNTIME.handle())
|
||||
.spawn(broker::task_main(conf_).instrument(info_span!("broker")))
|
||||
.map(|res| ("broker main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(broker_task_handle));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let wal_remover_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle())
|
||||
.spawn(remove_wal::task_main(conf_))
|
||||
.map(|res| ("WAL remover".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_remover_handle));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let wal_backup_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
|
||||
.spawn(wal_backup::wal_backup_launcher_task_main(
|
||||
conf_,
|
||||
wal_backup_launcher_rx,
|
||||
))
|
||||
.map(|res| ("WAL backup launcher".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_backup_handle));
|
||||
|
||||
set_build_info_metric(GIT_VERSION);
|
||||
// TODO: put more thoughts into handling of failed threads
|
||||
// We should catch & die if they are in trouble.
|
||||
|
||||
// On any shutdown signal, log receival and exit. Additionally, handling
|
||||
// SIGQUIT prevents coredump.
|
||||
ShutdownSignals::handle(|signal| {
|
||||
info!("received {}, terminating", signal.name());
|
||||
std::process::exit(0);
|
||||
})
|
||||
// TODO: update tokio-stream, convert to real async Stream with
|
||||
// SignalStream, map it to obtain missing signal name, combine streams into
|
||||
// single stream we can easily sit on.
|
||||
let mut sigquit_stream = signal(SignalKind::quit())?;
|
||||
let mut sigint_stream = signal(SignalKind::interrupt())?;
|
||||
let mut sigterm_stream = signal(SignalKind::terminate())?;
|
||||
|
||||
tokio::select! {
|
||||
Some((task_name, res)) = tasks_handles.next()=> {
|
||||
error!("{} task failed: {:?}, exiting", task_name, res);
|
||||
std::process::exit(1);
|
||||
}
|
||||
// On any shutdown signal, log receival and exit. Additionally, handling
|
||||
// SIGQUIT prevents coredump.
|
||||
_ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
|
||||
_ = sigint_stream.recv() => info!("received SIGINT, terminating"),
|
||||
_ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
|
||||
|
||||
};
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
/// Determine safekeeper id.
|
||||
|
||||
@@ -8,7 +8,7 @@ use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
|
||||
use storage_broker::parse_proto_ttid;
|
||||
use storage_broker::proto::broker_service_client::BrokerServiceClient;
|
||||
|
||||
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
|
||||
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
|
||||
use storage_broker::Request;
|
||||
@@ -16,7 +16,7 @@ use storage_broker::Request;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::{runtime, time::sleep};
|
||||
use tokio::time::sleep;
|
||||
use tracing::*;
|
||||
|
||||
use crate::metrics::BROKER_ITERATION_TIMELINES;
|
||||
@@ -29,23 +29,10 @@ use crate::SafeKeeperConf;
|
||||
const RETRY_INTERVAL_MSEC: u64 = 1000;
|
||||
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
pub fn thread_main(conf: SafeKeeperConf) {
|
||||
let runtime = runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let _enter = info_span!("broker").entered();
|
||||
info!("started, broker endpoint {:?}", conf.broker_endpoint);
|
||||
|
||||
runtime.block_on(async {
|
||||
main_loop(conf).await;
|
||||
});
|
||||
}
|
||||
|
||||
/// Push once in a while data about all active timelines to the broker.
|
||||
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
let mut client = BrokerServiceClient::connect(conf.broker_endpoint.clone()).await?;
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
|
||||
let outbound = async_stream::stream! {
|
||||
@@ -55,20 +42,27 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
// sensitive and there is no risk of deadlock as we don't await while
|
||||
// lock is held.
|
||||
let now = Instant::now();
|
||||
let mut active_tlis = GlobalTimelines::get_all();
|
||||
active_tlis.retain(|tli| tli.is_active());
|
||||
for tli in &active_tlis {
|
||||
let sk_info = tli.get_safekeeper_info(&conf);
|
||||
let all_tlis = GlobalTimelines::get_all();
|
||||
let mut n_pushed_tlis = 0;
|
||||
for tli in &all_tlis {
|
||||
// filtering alternative futures::stream::iter(all_tlis)
|
||||
// .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::<Vec<_>>().await;
|
||||
// doesn't look better, and I'm not sure how to do that without collect.
|
||||
if !tli.is_active().await {
|
||||
continue;
|
||||
}
|
||||
let sk_info = tli.get_safekeeper_info(&conf).await;
|
||||
yield sk_info;
|
||||
BROKER_PUSHED_UPDATES.inc();
|
||||
n_pushed_tlis += 1;
|
||||
}
|
||||
let elapsed = now.elapsed();
|
||||
|
||||
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
|
||||
BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64);
|
||||
BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
|
||||
|
||||
if elapsed > push_interval / 2 {
|
||||
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
|
||||
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
|
||||
}
|
||||
|
||||
sleep(push_interval).await;
|
||||
@@ -125,10 +119,13 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
bail!("end of stream");
|
||||
}
|
||||
|
||||
async fn main_loop(conf: SafeKeeperConf) {
|
||||
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
info!("started, broker endpoint {:?}", conf.broker_endpoint);
|
||||
|
||||
let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC));
|
||||
let mut push_handle: Option<JoinHandle<Result<(), Error>>> = None;
|
||||
let mut pull_handle: Option<JoinHandle<Result<(), Error>>> = None;
|
||||
|
||||
// Selecting on JoinHandles requires some squats; is there a better way to
|
||||
// reap tasks individually?
|
||||
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Write};
|
||||
use std::io::Read;
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
@@ -26,9 +27,10 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
|
||||
|
||||
/// Storage should keep actual state inside of it. It should implement Deref
|
||||
/// trait to access state fields and have persist method for updating that state.
|
||||
#[async_trait::async_trait]
|
||||
pub trait Storage: Deref<Target = SafeKeeperState> {
|
||||
/// Persist safekeeper state on disk and update internal state.
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
|
||||
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
|
||||
|
||||
/// Timestamp of last persist.
|
||||
fn last_persist_at(&self) -> Instant;
|
||||
@@ -82,7 +84,7 @@ impl FileStorage {
|
||||
/// Check the magic/version in the on-disk data and deserialize it, if possible.
|
||||
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
|
||||
// Read the version independent part
|
||||
let magic = buf.read_u32::<LittleEndian>()?;
|
||||
let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
|
||||
if magic != SK_MAGIC {
|
||||
bail!(
|
||||
"bad control file magic: {:X}, expected {:X}",
|
||||
@@ -90,7 +92,7 @@ impl FileStorage {
|
||||
SK_MAGIC
|
||||
);
|
||||
}
|
||||
let version = buf.read_u32::<LittleEndian>()?;
|
||||
let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
|
||||
if version == SK_FORMAT_VERSION {
|
||||
let res = SafeKeeperState::des(buf)?;
|
||||
return Ok(res);
|
||||
@@ -110,7 +112,7 @@ impl FileStorage {
|
||||
|
||||
/// Read in the control file.
|
||||
pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
|
||||
let mut control_file = OpenOptions::new()
|
||||
let mut control_file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&control_file_path)
|
||||
@@ -159,30 +161,31 @@ impl Deref for FileStorage {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Storage for FileStorage {
|
||||
/// persists state durably to underlying storage
|
||||
/// for description see https://lwn.net/Articles/457667/
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
|
||||
|
||||
// write data to safekeeper.control.partial
|
||||
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
|
||||
let mut control_partial = File::create(&control_partial_path).with_context(|| {
|
||||
let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
|
||||
format!(
|
||||
"failed to create partial control file at: {}",
|
||||
&control_partial_path.display()
|
||||
)
|
||||
})?;
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
buf.write_u32::<LittleEndian>(SK_MAGIC)?;
|
||||
buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?;
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
|
||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
|
||||
s.ser_into(&mut buf)?;
|
||||
|
||||
// calculate checksum before resize
|
||||
let checksum = crc32c::crc32c(&buf);
|
||||
buf.extend_from_slice(&checksum.to_le_bytes());
|
||||
|
||||
control_partial.write_all(&buf).with_context(|| {
|
||||
control_partial.write_all(&buf).await.with_context(|| {
|
||||
format!(
|
||||
"failed to write safekeeper state into control file at: {}",
|
||||
control_partial_path.display()
|
||||
@@ -191,7 +194,7 @@ impl Storage for FileStorage {
|
||||
|
||||
// fsync the file
|
||||
if !self.conf.no_sync {
|
||||
control_partial.sync_all().with_context(|| {
|
||||
control_partial.sync_all().await.with_context(|| {
|
||||
format!(
|
||||
"failed to sync partial control file at {}",
|
||||
control_partial_path.display()
|
||||
@@ -202,21 +205,22 @@ impl Storage for FileStorage {
|
||||
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
|
||||
|
||||
// rename should be atomic
|
||||
fs::rename(&control_partial_path, &control_path)?;
|
||||
fs::rename(&control_partial_path, &control_path).await?;
|
||||
// this sync is not required by any standard but postgres does this (see durable_rename)
|
||||
if !self.conf.no_sync {
|
||||
File::open(&control_path)
|
||||
.and_then(|f| f.sync_all())
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to sync control file at: {}",
|
||||
&control_path.display()
|
||||
)
|
||||
})?;
|
||||
let new_f = File::open(&control_path).await?;
|
||||
new_f.sync_all().await.with_context(|| {
|
||||
format!(
|
||||
"failed to sync control file at: {}",
|
||||
&control_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
// fsync the directory (linux specific)
|
||||
File::open(&self.timeline_dir)
|
||||
.and_then(|f| f.sync_all())
|
||||
let tli_dir = File::open(&self.timeline_dir).await?;
|
||||
tli_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.context("failed to sync control file directory")?;
|
||||
}
|
||||
|
||||
@@ -236,7 +240,6 @@ mod test {
|
||||
use super::*;
|
||||
use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
|
||||
use anyhow::Result;
|
||||
use std::fs;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
fn stub_conf() -> SafeKeeperConf {
|
||||
@@ -247,59 +250,75 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
fn load_from_control_file(
|
||||
async fn load_from_control_file(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: &TenantTimelineId,
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir");
|
||||
fs::create_dir_all(conf.timeline_dir(ttid))
|
||||
.await
|
||||
.expect("failed to create timeline dir");
|
||||
Ok((
|
||||
FileStorage::restore_new(ttid, conf)?,
|
||||
FileStorage::load_control_file_conf(conf, ttid)?,
|
||||
))
|
||||
}
|
||||
|
||||
fn create(
|
||||
async fn create(
|
||||
conf: &SafeKeeperConf,
|
||||
ttid: &TenantTimelineId,
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir");
|
||||
fs::create_dir_all(conf.timeline_dir(ttid))
|
||||
.await
|
||||
.expect("failed to create timeline dir");
|
||||
let state = SafeKeeperState::empty();
|
||||
let storage = FileStorage::create_new(ttid, conf, state.clone())?;
|
||||
Ok((storage, state))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_write_safekeeper_state() {
|
||||
#[tokio::test]
|
||||
async fn test_read_write_safekeeper_state() {
|
||||
let conf = stub_conf();
|
||||
let ttid = TenantTimelineId::generate();
|
||||
{
|
||||
let (mut storage, mut state) = create(&conf, &ttid).expect("failed to create state");
|
||||
let (mut storage, mut state) =
|
||||
create(&conf, &ttid).await.expect("failed to create state");
|
||||
// change something
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage.persist(&state).expect("failed to persist state");
|
||||
storage
|
||||
.persist(&state)
|
||||
.await
|
||||
.expect("failed to persist state");
|
||||
}
|
||||
|
||||
let (_, state) = load_from_control_file(&conf, &ttid).expect("failed to read state");
|
||||
let (_, state) = load_from_control_file(&conf, &ttid)
|
||||
.await
|
||||
.expect("failed to read state");
|
||||
assert_eq!(state.commit_lsn, Lsn(42));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_safekeeper_state_checksum_mismatch() {
|
||||
#[tokio::test]
|
||||
async fn test_safekeeper_state_checksum_mismatch() {
|
||||
let conf = stub_conf();
|
||||
let ttid = TenantTimelineId::generate();
|
||||
{
|
||||
let (mut storage, mut state) = create(&conf, &ttid).expect("failed to read state");
|
||||
let (mut storage, mut state) =
|
||||
create(&conf, &ttid).await.expect("failed to read state");
|
||||
|
||||
// change something
|
||||
state.commit_lsn = Lsn(42);
|
||||
storage.persist(&state).expect("failed to persist state");
|
||||
storage
|
||||
.persist(&state)
|
||||
.await
|
||||
.expect("failed to persist state");
|
||||
}
|
||||
let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
|
||||
let mut data = fs::read(&control_path).unwrap();
|
||||
let mut data = fs::read(&control_path).await.unwrap();
|
||||
data[0] += 1; // change the first byte of the file to fail checksum validation
|
||||
fs::write(&control_path, &data).expect("failed to write control file");
|
||||
fs::write(&control_path, &data)
|
||||
.await
|
||||
.expect("failed to write control file");
|
||||
|
||||
match load_from_control_file(&conf, &ttid) {
|
||||
match load_from_control_file(&conf, &ttid).await {
|
||||
Err(err) => assert!(err
|
||||
.to_string()
|
||||
.contains("safekeeper control file checksum mismatch")),
|
||||
|
||||
@@ -121,7 +121,7 @@ pub struct FileInfo {
|
||||
}
|
||||
|
||||
/// Build debug dump response, using the provided [`Args`] filters.
|
||||
pub fn build(args: Args) -> Result<Response> {
|
||||
pub async fn build(args: Args) -> Result<Response> {
|
||||
let start_time = Utc::now();
|
||||
let timelines_count = GlobalTimelines::timelines_count();
|
||||
|
||||
@@ -155,7 +155,7 @@ pub fn build(args: Args) -> Result<Response> {
|
||||
}
|
||||
|
||||
let control_file = if args.dump_control_file {
|
||||
let mut state = tli.get_state().1;
|
||||
let mut state = tli.get_state().await.1;
|
||||
if !args.dump_term_history {
|
||||
state.acceptor_state.term_history = TermHistory(vec![]);
|
||||
}
|
||||
@@ -165,7 +165,7 @@ pub fn build(args: Args) -> Result<Response> {
|
||||
};
|
||||
|
||||
let memory = if args.dump_memory {
|
||||
Some(tli.memory_dump())
|
||||
Some(tli.memory_dump().await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
@@ -256,14 +256,14 @@ impl SafekeeperPostgresHandler {
|
||||
|
||||
let lsn = if self.is_walproposer_recovery() {
|
||||
// walproposer should get all local WAL until flush_lsn
|
||||
tli.get_flush_lsn()
|
||||
tli.get_flush_lsn().await
|
||||
} else {
|
||||
// other clients shouldn't get any uncommitted WAL
|
||||
tli.get_state().0.commit_lsn
|
||||
tli.get_state().await.0.commit_lsn
|
||||
}
|
||||
.to_string();
|
||||
|
||||
let sysid = tli.get_state().1.server.system_id.to_string();
|
||||
let sysid = tli.get_state().await.1.server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli = PG_TLI.to_string();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
|
||||
@@ -2,3 +2,18 @@ pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use safekeeper_api::models;
|
||||
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
pub async fn task_main(
|
||||
conf: SafeKeeperConf,
|
||||
http_listener: std::net::TcpListener,
|
||||
) -> anyhow::Result<()> {
|
||||
let router = make_router(conf)
|
||||
.build()
|
||||
.map_err(|err| anyhow::anyhow!(err))?;
|
||||
let service = utils::http::RouterService::new(router).unwrap();
|
||||
let server = hyper::Server::from_tcp(http_listener)?;
|
||||
server.serve(service).await?;
|
||||
Ok(()) // unreachable
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::task::JoinError;
|
||||
use utils::http::endpoint::request_span;
|
||||
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::safekeeper::Term;
|
||||
@@ -116,8 +116,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
let (inmem, state) = tli.get_state();
|
||||
let flush_lsn = tli.get_flush_lsn();
|
||||
let (inmem, state) = tli.get_state().await;
|
||||
let flush_lsn = tli.get_flush_lsn().await;
|
||||
|
||||
let epoch = state.acceptor_state.get_epoch(flush_lsn);
|
||||
let term_history = state
|
||||
@@ -232,13 +232,11 @@ async fn timeline_delete_force_handler(
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
let resp = tokio::task::spawn_blocking(move || {
|
||||
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
|
||||
// error handling here when we're able to.
|
||||
GlobalTimelines::delete_force(&ttid).map_err(ApiError::InternalServerError)
|
||||
})
|
||||
.await
|
||||
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
|
||||
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
|
||||
// error handling here when we're able to.
|
||||
let resp = GlobalTimelines::delete_force(&ttid)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
@@ -250,14 +248,11 @@ async fn tenant_delete_force_handler(
|
||||
let tenant_id = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
let delete_info = tokio::task::spawn_blocking(move || {
|
||||
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
|
||||
// Using an `InternalServerError` should be fixed when the types support it
|
||||
GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
|
||||
.map_err(ApiError::InternalServerError)
|
||||
})
|
||||
.await
|
||||
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
|
||||
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
|
||||
// Using an `InternalServerError` should be fixed when the types support it
|
||||
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
delete_info
|
||||
@@ -353,11 +348,9 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
|
||||
timeline_id,
|
||||
};
|
||||
|
||||
let resp = tokio::task::spawn_blocking(move || {
|
||||
debug_dump::build(args).map_err(ApiError::InternalServerError)
|
||||
})
|
||||
.await
|
||||
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
|
||||
let resp = debug_dump::build(args)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
// TODO: use streaming response
|
||||
json_response(StatusCode::OK, resp)
|
||||
@@ -386,29 +379,32 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
router
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
.get("/v1/status", status_handler)
|
||||
.get("/v1/status", |r| request_span(r, status_handler))
|
||||
// Will be used in the future instead of implicit timeline creation
|
||||
.post("/v1/tenant/timeline", timeline_create_handler)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_status_handler,
|
||||
)
|
||||
.delete(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_delete_force_handler,
|
||||
)
|
||||
.delete("/v1/tenant/:tenant_id", tenant_delete_force_handler)
|
||||
.post("/v1/pull_timeline", timeline_pull_handler)
|
||||
.post("/v1/tenant/timeline", |r| {
|
||||
request_span(r, timeline_create_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
request_span(r, timeline_status_handler)
|
||||
})
|
||||
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
request_span(r, timeline_delete_force_handler)
|
||||
})
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
request_span(r, tenant_delete_force_handler)
|
||||
})
|
||||
.post("/v1/pull_timeline", |r| {
|
||||
request_span(r, timeline_pull_handler)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
|
||||
timeline_files_handler,
|
||||
|r| request_span(r, timeline_files_handler),
|
||||
)
|
||||
// for tests
|
||||
.post(
|
||||
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
|
||||
record_safekeeper_info,
|
||||
)
|
||||
.get("/v1/debug_dump", dump_debug_handler)
|
||||
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
|
||||
request_span(r, record_safekeeper_info)
|
||||
})
|
||||
.get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -73,12 +73,12 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
// if send_proposer_elected is true, we need to update local history
|
||||
if append_request.send_proposer_elected {
|
||||
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?;
|
||||
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
|
||||
}
|
||||
|
||||
let inserted_wal = append_logical_message(&tli, append_request)?;
|
||||
let inserted_wal = append_logical_message(&tli, append_request).await?;
|
||||
let response = AppendResult {
|
||||
state: tli.get_state().1,
|
||||
state: tli.get_state().await.1,
|
||||
inserted_wal,
|
||||
};
|
||||
let response_data = serde_json::to_vec(&response)
|
||||
@@ -114,9 +114,9 @@ async fn prepare_safekeeper(
|
||||
.await
|
||||
}
|
||||
|
||||
fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
|
||||
async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
|
||||
// add new term to existing history
|
||||
let history = tli.get_state().1.acceptor_state.term_history;
|
||||
let history = tli.get_state().await.1.acceptor_state.term_history;
|
||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||
let mut history_entries = history.0;
|
||||
history_entries.push(TermSwitchEntry { term, lsn });
|
||||
@@ -129,7 +129,7 @@ fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::R
|
||||
timeline_start_lsn: lsn,
|
||||
});
|
||||
|
||||
tli.process_msg(&proposer_elected_request)?;
|
||||
tli.process_msg(&proposer_elected_request).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -142,12 +142,12 @@ pub struct InsertedWAL {
|
||||
|
||||
/// Extend local WAL with new LogicalMessage record. To do that,
|
||||
/// create AppendRequest with new WAL and pass it to safekeeper.
|
||||
pub fn append_logical_message(
|
||||
pub async fn append_logical_message(
|
||||
tli: &Arc<Timeline>,
|
||||
msg: &AppendLogicalMessage,
|
||||
) -> anyhow::Result<InsertedWAL> {
|
||||
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
|
||||
let sk_state = tli.get_state().1;
|
||||
let sk_state = tli.get_state().await.1;
|
||||
|
||||
let begin_lsn = msg.begin_lsn;
|
||||
let end_lsn = begin_lsn + wal_data.len() as u64;
|
||||
@@ -171,7 +171,7 @@ pub fn append_logical_message(
|
||||
wal_data: Bytes::from(wal_data),
|
||||
});
|
||||
|
||||
let response = tli.process_msg(&append_request)?;
|
||||
let response = tli.process_msg(&append_request).await?;
|
||||
|
||||
let append_response = match response {
|
||||
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use once_cell::sync::Lazy;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
@@ -36,7 +38,6 @@ pub mod defaults {
|
||||
DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
|
||||
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
|
||||
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
|
||||
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
|
||||
}
|
||||
@@ -60,10 +61,10 @@ pub struct SafeKeeperConf {
|
||||
pub heartbeat_timeout: Duration,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub max_offloader_lag_bytes: u64,
|
||||
pub backup_runtime_threads: Option<usize>,
|
||||
pub backup_parallel_jobs: usize,
|
||||
pub wal_backup_enabled: bool,
|
||||
pub auth: Option<Arc<JwtAuth>>,
|
||||
pub current_thread_runtime: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -92,12 +93,64 @@ impl SafeKeeperConf {
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint"),
|
||||
broker_keepalive_interval: Duration::from_secs(5),
|
||||
backup_runtime_threads: None,
|
||||
wal_backup_enabled: true,
|
||||
backup_parallel_jobs: 1,
|
||||
auth: None,
|
||||
heartbeat_timeout: Duration::new(5, 0),
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
current_thread_runtime: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Tokio runtimes.
|
||||
pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("WAL service worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create WAL service runtime")
|
||||
});
|
||||
|
||||
pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("HTTP worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create WAL service runtime")
|
||||
});
|
||||
|
||||
pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("broker worker")
|
||||
.worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create broker runtime")
|
||||
});
|
||||
|
||||
pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("WAL remover")
|
||||
.worker_threads(1)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create broker runtime")
|
||||
});
|
||||
|
||||
pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("WAL backup worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create WAL backup runtime")
|
||||
});
|
||||
|
||||
pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("metric shifter")
|
||||
.worker_threads(1)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create broker runtime")
|
||||
});
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{
|
||||
|
||||
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
|
||||
use anyhow::Result;
|
||||
use futures::Future;
|
||||
use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
|
||||
proto::MetricFamily,
|
||||
@@ -292,14 +293,17 @@ impl WalStorageMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
/// Accepts a closure that returns a result, and returns the duration of the closure.
|
||||
pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result<f64> {
|
||||
/// Accepts async function that returns empty anyhow result, and returns the duration of its execution.
|
||||
pub async fn time_io_closure<E: Into<anyhow::Error>>(
|
||||
closure: impl Future<Output = Result<(), E>>,
|
||||
) -> Result<f64> {
|
||||
let start = std::time::Instant::now();
|
||||
closure()?;
|
||||
closure.await.map_err(|e| e.into())?;
|
||||
Ok(start.elapsed().as_secs_f64())
|
||||
}
|
||||
|
||||
/// Metrics for a single timeline.
|
||||
#[derive(Clone)]
|
||||
pub struct FullTimelineInfo {
|
||||
pub ttid: TenantTimelineId,
|
||||
pub ps_feedback: PageserverFeedback,
|
||||
@@ -575,13 +579,19 @@ impl Collector for TimelineCollector {
|
||||
let timelines = GlobalTimelines::get_all();
|
||||
let timelines_count = timelines.len();
|
||||
|
||||
for arc_tli in timelines {
|
||||
let tli = arc_tli.info_for_metrics();
|
||||
if tli.is_none() {
|
||||
continue;
|
||||
}
|
||||
let tli = tli.unwrap();
|
||||
// Prometheus Collector is sync, and data is stored under async lock. To
|
||||
// bridge the gap with a crutch, collect data in spawned thread with
|
||||
// local tokio runtime.
|
||||
let infos = std::thread::spawn(|| {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
rt.block_on(collect_timeline_metrics())
|
||||
})
|
||||
.join()
|
||||
.expect("collect_timeline_metrics thread panicked");
|
||||
|
||||
for tli in &infos {
|
||||
let tenant_id = tli.ttid.tenant_id.to_string();
|
||||
let timeline_id = tli.ttid.timeline_id.to_string();
|
||||
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
|
||||
@@ -682,3 +692,15 @@ impl Collector for TimelineCollector {
|
||||
mfs
|
||||
}
|
||||
}
|
||||
|
||||
async fn collect_timeline_metrics() -> Vec<FullTimelineInfo> {
|
||||
let mut res = vec![];
|
||||
let timelines = GlobalTimelines::get_all();
|
||||
|
||||
for tli in timelines {
|
||||
if let Some(info) = tli.info_for_metrics().await {
|
||||
res.push(info);
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
@@ -231,7 +231,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
|
||||
info!(
|
||||
"Loaded timeline {}, flush_lsn={}",
|
||||
ttid,
|
||||
tli.get_flush_lsn()
|
||||
tli.get_flush_lsn().await
|
||||
);
|
||||
|
||||
Ok(Response {
|
||||
|
||||
@@ -18,15 +18,14 @@ use postgres_backend::QueryError;
|
||||
use pq_proto::BeMessage;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::sync::mpsc::channel;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::task;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tracing::*;
|
||||
@@ -97,7 +96,7 @@ impl SafekeeperPostgresHandler {
|
||||
Err(res.expect_err("no error with WalAcceptor not spawn"))
|
||||
}
|
||||
Some(handle) => {
|
||||
let wal_acceptor_res = handle.join();
|
||||
let wal_acceptor_res = handle.await;
|
||||
|
||||
// If there was any network error, return it.
|
||||
res?;
|
||||
@@ -107,7 +106,7 @@ impl SafekeeperPostgresHandler {
|
||||
Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
|
||||
Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
|
||||
Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
|
||||
"WalAcceptor thread panicked",
|
||||
"WalAcceptor task panicked",
|
||||
))),
|
||||
}
|
||||
}
|
||||
@@ -154,10 +153,12 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||
}
|
||||
};
|
||||
|
||||
*self.acceptor_handle = Some(
|
||||
WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, self.conn_id)
|
||||
.context("spawn WalAcceptor thread")?,
|
||||
);
|
||||
*self.acceptor_handle = Some(WalAcceptor::spawn(
|
||||
tli.clone(),
|
||||
msg_rx,
|
||||
reply_tx,
|
||||
self.conn_id,
|
||||
));
|
||||
|
||||
// Forward all messages to WalAcceptor
|
||||
read_network_loop(self.pgb_reader, msg_tx, next_msg).await
|
||||
@@ -226,28 +227,19 @@ impl WalAcceptor {
|
||||
msg_rx: Receiver<ProposerAcceptorMessage>,
|
||||
reply_tx: Sender<AcceptorProposerMessage>,
|
||||
conn_id: ConnectionId,
|
||||
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
|
||||
let thread_name = format!("WAL acceptor {}", tli.ttid);
|
||||
thread::Builder::new()
|
||||
.name(thread_name)
|
||||
.spawn(move || -> anyhow::Result<()> {
|
||||
let mut wa = WalAcceptor {
|
||||
tli,
|
||||
msg_rx,
|
||||
reply_tx,
|
||||
};
|
||||
) -> JoinHandle<anyhow::Result<()>> {
|
||||
task::spawn(async move {
|
||||
let mut wa = WalAcceptor {
|
||||
tli,
|
||||
msg_rx,
|
||||
reply_tx,
|
||||
};
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
let span_ttid = wa.tli.ttid; // satisfy borrow checker
|
||||
runtime.block_on(
|
||||
wa.run()
|
||||
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)),
|
||||
)
|
||||
})
|
||||
.map_err(anyhow::Error::from)
|
||||
let span_ttid = wa.tli.ttid; // satisfy borrow checker
|
||||
wa.run()
|
||||
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
|
||||
.await
|
||||
})
|
||||
}
|
||||
|
||||
/// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
|
||||
@@ -281,7 +273,7 @@ impl WalAcceptor {
|
||||
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
|
||||
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
|
||||
if let Some(reply) = self.tli.process_msg(&noflush_msg)? {
|
||||
if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
@@ -300,10 +292,12 @@ impl WalAcceptor {
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
|
||||
self.tli
|
||||
.process_msg(&ProposerAcceptorMessage::FlushWAL)
|
||||
.await?
|
||||
} else {
|
||||
// process message other than AppendRequest
|
||||
self.tli.process_msg(&next_msg)?
|
||||
self.tli.process_msg(&next_msg).await?
|
||||
};
|
||||
|
||||
if let Some(reply) = reply_msg {
|
||||
@@ -326,8 +320,8 @@ impl Drop for ComputeConnectionGuard {
|
||||
let tli = self.timeline.clone();
|
||||
// tokio forbids to call blocking_send inside the runtime, and see
|
||||
// comments in on_compute_disconnect why we call blocking_send.
|
||||
spawn_blocking(move || {
|
||||
if let Err(e) = tli.on_compute_disconnect() {
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = tli.on_compute_disconnect().await {
|
||||
error!("failed to unregister compute connection: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,29 +1,36 @@
|
||||
//! Thread removing old WAL.
|
||||
|
||||
use std::{thread, time::Duration};
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::time::sleep;
|
||||
use tracing::*;
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
pub fn thread_main(conf: SafeKeeperConf) {
|
||||
pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
let wal_removal_interval = Duration::from_millis(5000);
|
||||
loop {
|
||||
let tlis = GlobalTimelines::get_all();
|
||||
for tli in &tlis {
|
||||
if !tli.is_active() {
|
||||
if !tli.is_active().await {
|
||||
continue;
|
||||
}
|
||||
let ttid = tli.ttid;
|
||||
let _enter =
|
||||
info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered();
|
||||
if let Err(e) = tli.maybe_pesist_control_file() {
|
||||
if let Err(e) = tli
|
||||
.maybe_persist_control_file()
|
||||
.instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id))
|
||||
.await
|
||||
{
|
||||
warn!("failed to persist control file: {e}");
|
||||
}
|
||||
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
|
||||
warn!("failed to remove WAL: {}", e);
|
||||
if let Err(e) = tli
|
||||
.remove_old_wal(conf.wal_backup_enabled)
|
||||
.instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id))
|
||||
.await
|
||||
{
|
||||
error!("failed to remove WAL: {}", e);
|
||||
}
|
||||
}
|
||||
thread::sleep(wal_removal_interval)
|
||||
sleep(wal_removal_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -568,25 +568,27 @@ where
|
||||
|
||||
/// Process message from proposer and possibly form reply. Concurrent
|
||||
/// callers must exclude each other.
|
||||
pub fn process_msg(
|
||||
pub async fn process_msg(
|
||||
&mut self,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
match msg {
|
||||
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
|
||||
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
|
||||
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
|
||||
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true),
|
||||
ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
|
||||
self.handle_append_request(msg, false)
|
||||
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
|
||||
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
|
||||
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
|
||||
ProposerAcceptorMessage::AppendRequest(msg) => {
|
||||
self.handle_append_request(msg, true).await
|
||||
}
|
||||
ProposerAcceptorMessage::FlushWAL => self.handle_flush(),
|
||||
ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
|
||||
self.handle_append_request(msg, false).await
|
||||
}
|
||||
ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle initial message from proposer: check its sanity and send my
|
||||
/// current term.
|
||||
fn handle_greeting(
|
||||
async fn handle_greeting(
|
||||
&mut self,
|
||||
msg: &ProposerGreeting,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
@@ -649,7 +651,7 @@ where
|
||||
if msg.pg_version != UNKNOWN_SERVER_VERSION {
|
||||
state.server.pg_version = msg.pg_version;
|
||||
}
|
||||
self.state.persist(&state)?;
|
||||
self.state.persist(&state).await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -664,7 +666,7 @@ where
|
||||
}
|
||||
|
||||
/// Give vote for the given term, if we haven't done that previously.
|
||||
fn handle_vote_request(
|
||||
async fn handle_vote_request(
|
||||
&mut self,
|
||||
msg: &VoteRequest,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
@@ -678,7 +680,7 @@ where
|
||||
// handle_elected instead. Currently not a big deal, as proposer is the
|
||||
// only source of WAL; with peer2peer recovery it would be more
|
||||
// important.
|
||||
self.wal_store.flush_wal()?;
|
||||
self.wal_store.flush_wal().await?;
|
||||
// initialize with refusal
|
||||
let mut resp = VoteResponse {
|
||||
term: self.state.acceptor_state.term,
|
||||
@@ -692,7 +694,7 @@ where
|
||||
let mut state = self.state.clone();
|
||||
state.acceptor_state.term = msg.term;
|
||||
// persist vote before sending it out
|
||||
self.state.persist(&state)?;
|
||||
self.state.persist(&state).await?;
|
||||
|
||||
resp.term = self.state.acceptor_state.term;
|
||||
resp.vote_given = true as u64;
|
||||
@@ -715,12 +717,15 @@ where
|
||||
ar
|
||||
}
|
||||
|
||||
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
|
||||
async fn handle_elected(
|
||||
&mut self,
|
||||
msg: &ProposerElected,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
info!("received ProposerElected {:?}", msg);
|
||||
if self.state.acceptor_state.term < msg.term {
|
||||
let mut state = self.state.clone();
|
||||
state.acceptor_state.term = msg.term;
|
||||
self.state.persist(&state)?;
|
||||
self.state.persist(&state).await?;
|
||||
}
|
||||
|
||||
// If our term is higher, ignore the message (next feedback will inform the compute)
|
||||
@@ -750,7 +755,7 @@ where
|
||||
// intersection of our history and history from msg
|
||||
|
||||
// truncate wal, update the LSNs
|
||||
self.wal_store.truncate_wal(msg.start_streaming_at)?;
|
||||
self.wal_store.truncate_wal(msg.start_streaming_at).await?;
|
||||
|
||||
// and now adopt term history from proposer
|
||||
{
|
||||
@@ -784,7 +789,7 @@ where
|
||||
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
|
||||
|
||||
state.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.persist_control_file(state)?;
|
||||
self.persist_control_file(state).await?;
|
||||
}
|
||||
|
||||
info!("start receiving WAL since {:?}", msg.start_streaming_at);
|
||||
@@ -796,7 +801,7 @@ where
|
||||
///
|
||||
/// Note: it is assumed that 'WAL we have is from the right term' check has
|
||||
/// already been done outside.
|
||||
fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
|
||||
async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> {
|
||||
// Both peers and walproposer communicate this value, we might already
|
||||
// have a fresher (higher) version.
|
||||
candidate = max(candidate, self.inmem.commit_lsn);
|
||||
@@ -818,29 +823,32 @@ where
|
||||
// that we receive new epoch_start_lsn, and we still need to sync
|
||||
// control file in this case.
|
||||
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
|
||||
self.persist_control_file(self.state.clone())?;
|
||||
self.persist_control_file(self.state.clone()).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Persist control file to disk, called only after timeline creation (bootstrap).
|
||||
pub fn persist(&mut self) -> Result<()> {
|
||||
self.persist_control_file(self.state.clone())
|
||||
pub async fn persist(&mut self) -> Result<()> {
|
||||
self.persist_control_file(self.state.clone()).await
|
||||
}
|
||||
|
||||
/// Persist in-memory state to the disk, taking other data from state.
|
||||
fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
|
||||
async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
|
||||
state.commit_lsn = self.inmem.commit_lsn;
|
||||
state.backup_lsn = self.inmem.backup_lsn;
|
||||
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
|
||||
state.proposer_uuid = self.inmem.proposer_uuid;
|
||||
self.state.persist(&state)
|
||||
self.state.persist(&state).await
|
||||
}
|
||||
|
||||
/// Persist control file if there is something to save and enough time
|
||||
/// passed after the last save.
|
||||
pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
|
||||
pub async fn maybe_persist_control_file(
|
||||
&mut self,
|
||||
inmem_remote_consistent_lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
|
||||
if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
|
||||
return Ok(());
|
||||
@@ -852,7 +860,7 @@ where
|
||||
if need_persist {
|
||||
let mut state = self.state.clone();
|
||||
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
|
||||
self.persist_control_file(state)?;
|
||||
self.persist_control_file(state).await?;
|
||||
trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
|
||||
}
|
||||
Ok(())
|
||||
@@ -860,7 +868,7 @@ where
|
||||
|
||||
/// Handle request to append WAL.
|
||||
#[allow(clippy::comparison_chain)]
|
||||
fn handle_append_request(
|
||||
async fn handle_append_request(
|
||||
&mut self,
|
||||
msg: &AppendRequest,
|
||||
require_flush: bool,
|
||||
@@ -883,17 +891,19 @@ where
|
||||
|
||||
// do the job
|
||||
if !msg.wal_data.is_empty() {
|
||||
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
|
||||
self.wal_store
|
||||
.write_wal(msg.h.begin_lsn, &msg.wal_data)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// flush wal to the disk, if required
|
||||
if require_flush {
|
||||
self.wal_store.flush_wal()?;
|
||||
self.wal_store.flush_wal().await?;
|
||||
}
|
||||
|
||||
// Update commit_lsn.
|
||||
if msg.h.commit_lsn != Lsn(0) {
|
||||
self.update_commit_lsn(msg.h.commit_lsn)?;
|
||||
self.update_commit_lsn(msg.h.commit_lsn).await?;
|
||||
}
|
||||
// Value calculated by walproposer can always lag:
|
||||
// - safekeepers can forget inmem value and send to proposer lower
|
||||
@@ -909,7 +919,7 @@ where
|
||||
if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
|
||||
< self.inmem.peer_horizon_lsn
|
||||
{
|
||||
self.persist_control_file(self.state.clone())?;
|
||||
self.persist_control_file(self.state.clone()).await?;
|
||||
}
|
||||
|
||||
trace!(
|
||||
@@ -931,15 +941,15 @@ where
|
||||
}
|
||||
|
||||
/// Flush WAL to disk. Return AppendResponse with latest LSNs.
|
||||
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
|
||||
self.wal_store.flush_wal()?;
|
||||
async fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
|
||||
self.wal_store.flush_wal().await?;
|
||||
Ok(Some(AcceptorProposerMessage::AppendResponse(
|
||||
self.append_response(),
|
||||
)))
|
||||
}
|
||||
|
||||
/// Update timeline state with peer safekeeper data.
|
||||
pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
|
||||
pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
|
||||
let mut sync_control_file = false;
|
||||
|
||||
if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
|
||||
@@ -947,7 +957,7 @@ where
|
||||
// commit_lsn if our history matches (is part of) history of advanced
|
||||
// commit_lsn provider.
|
||||
if sk_info.last_log_term == self.get_epoch() {
|
||||
self.update_commit_lsn(Lsn(sk_info.commit_lsn))?;
|
||||
self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -973,7 +983,7 @@ where
|
||||
// Note: we could make remote_consistent_lsn update in cf common by
|
||||
// storing Arc to walsenders in Safekeeper.
|
||||
state.remote_consistent_lsn = new_remote_consistent_lsn;
|
||||
self.persist_control_file(state)?;
|
||||
self.persist_control_file(state).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -997,6 +1007,7 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::BoxFuture;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
|
||||
use super::*;
|
||||
@@ -1008,8 +1019,9 @@ mod tests {
|
||||
persisted_state: SafeKeeperState,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl control_file::Storage for InMemoryState {
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
self.persisted_state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
@@ -1039,27 +1051,28 @@ mod tests {
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl wal_storage::Storage for DummyWalStore {
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
self.lsn = startpos + buf.len() as u64;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
|
||||
async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
|
||||
self.lsn = end_pos;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_wal(&mut self) -> Result<()> {
|
||||
async fn flush_wal(&mut self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
|
||||
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
|
||||
fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
|
||||
Box::pin(async { Ok(()) })
|
||||
}
|
||||
|
||||
fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
|
||||
@@ -1067,8 +1080,8 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_voting() {
|
||||
#[tokio::test]
|
||||
async fn test_voting() {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: test_sk_state(),
|
||||
};
|
||||
@@ -1077,7 +1090,7 @@ mod tests {
|
||||
|
||||
// check voting for 1 is ok
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
|
||||
let mut vote_resp = sk.process_msg(&vote_request);
|
||||
let mut vote_resp = sk.process_msg(&vote_request).await;
|
||||
match vote_resp.unwrap() {
|
||||
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
|
||||
r => panic!("unexpected response: {:?}", r),
|
||||
@@ -1092,15 +1105,15 @@ mod tests {
|
||||
sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
|
||||
|
||||
// and ensure voting second time for 1 is not ok
|
||||
vote_resp = sk.process_msg(&vote_request);
|
||||
vote_resp = sk.process_msg(&vote_request).await;
|
||||
match vote_resp.unwrap() {
|
||||
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
|
||||
r => panic!("unexpected response: {:?}", r),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_epoch_switch() {
|
||||
#[tokio::test]
|
||||
async fn test_epoch_switch() {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: test_sk_state(),
|
||||
};
|
||||
@@ -1132,10 +1145,13 @@ mod tests {
|
||||
timeline_start_lsn: Lsn(0),
|
||||
};
|
||||
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// check that AppendRequest before epochStartLsn doesn't switch epoch
|
||||
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
|
||||
let resp = sk
|
||||
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await;
|
||||
assert!(resp.is_ok());
|
||||
assert_eq!(sk.get_epoch(), 0);
|
||||
|
||||
@@ -1146,9 +1162,11 @@ mod tests {
|
||||
h: ar_hdr,
|
||||
wal_data: Bytes::from_static(b"b"),
|
||||
};
|
||||
let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request));
|
||||
let resp = sk
|
||||
.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
|
||||
.await;
|
||||
assert!(resp.is_ok());
|
||||
sk.wal_store.truncate_wal(Lsn(3)).unwrap(); // imitate the complete record at 3 %)
|
||||
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
|
||||
assert_eq!(sk.get_epoch(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -396,7 +396,7 @@ impl SafekeeperPostgresHandler {
|
||||
// on this safekeeper itself. That's ok as (old) proposer will never be
|
||||
// able to commit such WAL.
|
||||
let stop_pos: Option<Lsn> = if self.is_walproposer_recovery() {
|
||||
let wal_end = tli.get_flush_lsn();
|
||||
let wal_end = tli.get_flush_lsn().await;
|
||||
Some(wal_end)
|
||||
} else {
|
||||
None
|
||||
@@ -418,7 +418,7 @@ impl SafekeeperPostgresHandler {
|
||||
// switch to copy
|
||||
pgb.write_message(&BeMessage::CopyBothResponse).await?;
|
||||
|
||||
let (_, persisted_state) = tli.get_state();
|
||||
let (_, persisted_state) = tli.get_state().await;
|
||||
let wal_reader = WalReader::new(
|
||||
self.conf.workdir.clone(),
|
||||
self.conf.timeline_dir(&tli.ttid),
|
||||
@@ -562,7 +562,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
.walsenders
|
||||
.get_ws_remote_consistent_lsn(self.ws_guard.id)
|
||||
{
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn) {
|
||||
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Terminate if there is nothing more to send.
|
||||
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
|
||||
@@ -2,12 +2,13 @@
|
||||
//! to glue together SafeKeeper and all other background services.
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use tokio::fs;
|
||||
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use tokio::{
|
||||
sync::{mpsc::Sender, watch},
|
||||
time::Instant,
|
||||
@@ -286,8 +287,9 @@ pub struct Timeline {
|
||||
commit_lsn_watch_tx: watch::Sender<Lsn>,
|
||||
commit_lsn_watch_rx: watch::Receiver<Lsn>,
|
||||
|
||||
/// Safekeeper and other state, that should remain consistent and synchronized
|
||||
/// with the disk.
|
||||
/// Safekeeper and other state, that should remain consistent and
|
||||
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
|
||||
/// while holding it, ensuring that consensus checks are in order.
|
||||
mutex: Mutex<SharedState>,
|
||||
walsenders: Arc<WalSenders>,
|
||||
|
||||
@@ -361,8 +363,8 @@ impl Timeline {
|
||||
///
|
||||
/// Bootstrap is transactional, so if it fails, created files will be deleted,
|
||||
/// and state on disk should remain unchanged.
|
||||
pub fn bootstrap(&self, shared_state: &mut MutexGuard<SharedState>) -> Result<()> {
|
||||
match std::fs::metadata(&self.timeline_dir) {
|
||||
pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
|
||||
match fs::metadata(&self.timeline_dir).await {
|
||||
Ok(_) => {
|
||||
// Timeline directory exists on disk, we should leave state unchanged
|
||||
// and return error.
|
||||
@@ -375,53 +377,51 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Create timeline directory.
|
||||
std::fs::create_dir_all(&self.timeline_dir)?;
|
||||
fs::create_dir_all(&self.timeline_dir).await?;
|
||||
|
||||
// Write timeline to disk and TODO: start background tasks.
|
||||
match || -> Result<()> {
|
||||
shared_state.sk.persist()?;
|
||||
// TODO: add more initialization steps here
|
||||
self.update_status(shared_state);
|
||||
Ok(())
|
||||
}() {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// Bootstrap failed, cancel timeline and remove timeline directory.
|
||||
self.cancel(shared_state);
|
||||
if let Err(e) = shared_state.sk.persist().await {
|
||||
// Bootstrap failed, cancel timeline and remove timeline directory.
|
||||
self.cancel(shared_state);
|
||||
|
||||
if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) {
|
||||
warn!(
|
||||
"failed to remove timeline {} directory after bootstrap failure: {}",
|
||||
self.ttid, fs_err
|
||||
);
|
||||
}
|
||||
|
||||
Err(e)
|
||||
if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
|
||||
warn!(
|
||||
"failed to remove timeline {} directory after bootstrap failure: {}",
|
||||
self.ttid, fs_err
|
||||
);
|
||||
}
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
// TODO: add more initialization steps here
|
||||
self.update_status(shared_state);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory. Background
|
||||
/// timeline activities will stop eventually.
|
||||
pub fn delete_from_disk(
|
||||
pub async fn delete_from_disk(
|
||||
&self,
|
||||
shared_state: &mut MutexGuard<SharedState>,
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
) -> Result<(bool, bool)> {
|
||||
let was_active = shared_state.active;
|
||||
self.cancel(shared_state);
|
||||
let dir_existed = delete_dir(&self.timeline_dir)?;
|
||||
let dir_existed = delete_dir(&self.timeline_dir).await?;
|
||||
Ok((dir_existed, was_active))
|
||||
}
|
||||
|
||||
/// Cancel timeline to prevent further usage. Background tasks will stop
|
||||
/// eventually after receiving cancellation signal.
|
||||
fn cancel(&self, shared_state: &mut MutexGuard<SharedState>) {
|
||||
///
|
||||
/// Note that we can't notify backup launcher here while holding
|
||||
/// shared_state lock, as this is a potential deadlock: caller is
|
||||
/// responsible for that. Generally we should probably make WAL backup tasks
|
||||
/// to shut down on their own, checking once in a while whether it is the
|
||||
/// time.
|
||||
fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
|
||||
info!("timeline {} is cancelled", self.ttid);
|
||||
let _ = self.cancellation_tx.send(true);
|
||||
let res = self.wal_backup_launcher_tx.blocking_send(self.ttid);
|
||||
if let Err(e) = res {
|
||||
error!("Failed to send stop signal to wal_backup_launcher: {}", e);
|
||||
}
|
||||
// Close associated FDs. Nobody will be able to touch timeline data once
|
||||
// it is cancelled, so WAL storage won't be opened again.
|
||||
shared_state.sk.wal_store.close();
|
||||
@@ -433,8 +433,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub fn write_shared_state(&self) -> MutexGuard<SharedState> {
|
||||
self.mutex.lock()
|
||||
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
|
||||
self.mutex.lock().await
|
||||
}
|
||||
|
||||
fn update_status(&self, shared_state: &mut SharedState) -> bool {
|
||||
@@ -450,7 +450,7 @@ impl Timeline {
|
||||
|
||||
let is_wal_backup_action_pending: bool;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state();
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.num_computes += 1;
|
||||
is_wal_backup_action_pending = self.update_status(&mut shared_state);
|
||||
}
|
||||
@@ -464,22 +464,17 @@ impl Timeline {
|
||||
|
||||
/// De-register compute connection, shutting down timeline activity if
|
||||
/// pageserver doesn't need catchup.
|
||||
pub fn on_compute_disconnect(&self) -> Result<()> {
|
||||
pub async fn on_compute_disconnect(&self) -> Result<()> {
|
||||
let is_wal_backup_action_pending: bool;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state();
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.num_computes -= 1;
|
||||
is_wal_backup_action_pending = self.update_status(&mut shared_state);
|
||||
}
|
||||
// Wake up wal backup launcher, if it is time to stop the offloading.
|
||||
if is_wal_backup_action_pending {
|
||||
// Can fail only if channel to a static thread got closed, which is not normal at all.
|
||||
//
|
||||
// Note: this is blocking_send because on_compute_disconnect is called in Drop, there is
|
||||
// no async Drop and we use current thread runtimes. With current thread rt spawning
|
||||
// task in drop impl is racy, as thread along with runtime might finish before the task.
|
||||
// This should be switched send.await when/if we go to full async.
|
||||
self.wal_backup_launcher_tx.blocking_send(self.ttid)?;
|
||||
self.wal_backup_launcher_tx.send(self.ttid).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -489,11 +484,11 @@ impl Timeline {
|
||||
/// computes. While there might be nothing to stream already, we learn about
|
||||
/// remote_consistent_lsn update through replication feedback, and we want
|
||||
/// to stop pushing to the broker if pageserver is fully caughtup.
|
||||
pub fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
|
||||
pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return true;
|
||||
}
|
||||
let shared_state = self.write_shared_state();
|
||||
let shared_state = self.write_shared_state().await;
|
||||
if shared_state.num_computes == 0 {
|
||||
return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
|
||||
reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
|
||||
@@ -503,12 +498,12 @@ impl Timeline {
|
||||
|
||||
/// Returns whether s3 offloading is required and sets current status as
|
||||
/// matching it.
|
||||
pub fn wal_backup_attend(&self) -> bool {
|
||||
pub async fn wal_backup_attend(&self) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.write_shared_state().wal_backup_attend()
|
||||
self.write_shared_state().await.wal_backup_attend()
|
||||
}
|
||||
|
||||
/// Returns commit_lsn watch channel.
|
||||
@@ -517,7 +512,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Pass arrived message to the safekeeper.
|
||||
pub fn process_msg(
|
||||
pub async fn process_msg(
|
||||
&self,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
@@ -528,8 +523,8 @@ impl Timeline {
|
||||
let mut rmsg: Option<AcceptorProposerMessage>;
|
||||
let commit_lsn: Lsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state();
|
||||
rmsg = shared_state.sk.process_msg(msg)?;
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
rmsg = shared_state.sk.process_msg(msg).await?;
|
||||
|
||||
// if this is AppendResponse, fill in proper pageserver and hot
|
||||
// standby feedback.
|
||||
@@ -546,37 +541,37 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Returns wal_seg_size.
|
||||
pub fn get_wal_seg_size(&self) -> usize {
|
||||
self.write_shared_state().get_wal_seg_size()
|
||||
pub async fn get_wal_seg_size(&self) -> usize {
|
||||
self.write_shared_state().await.get_wal_seg_size()
|
||||
}
|
||||
|
||||
/// Returns true only if the timeline is loaded and active.
|
||||
pub fn is_active(&self) -> bool {
|
||||
pub async fn is_active(&self) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.write_shared_state().active
|
||||
self.write_shared_state().await.active
|
||||
}
|
||||
|
||||
/// Returns state of the timeline.
|
||||
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
|
||||
let state = self.write_shared_state();
|
||||
pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
|
||||
let state = self.write_shared_state().await;
|
||||
(state.sk.inmem.clone(), state.sk.state.clone())
|
||||
}
|
||||
|
||||
/// Returns latest backup_lsn.
|
||||
pub fn get_wal_backup_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().sk.inmem.backup_lsn
|
||||
pub async fn get_wal_backup_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().await.sk.inmem.backup_lsn
|
||||
}
|
||||
|
||||
/// Sets backup_lsn to the given value.
|
||||
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
|
||||
pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
|
||||
let mut state = self.write_shared_state();
|
||||
let mut state = self.write_shared_state().await;
|
||||
state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
|
||||
// we should check whether to shut down offloader, but this will be done
|
||||
// soon by peer communication anyway.
|
||||
@@ -584,8 +579,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get safekeeper info for broadcasting to broker and other peers.
|
||||
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
|
||||
let shared_state = self.write_shared_state();
|
||||
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
shared_state.get_safekeeper_info(
|
||||
&self.ttid,
|
||||
conf,
|
||||
@@ -604,8 +599,8 @@ impl Timeline {
|
||||
let is_wal_backup_action_pending: bool;
|
||||
let commit_lsn: Lsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state();
|
||||
shared_state.sk.record_safekeeper_info(&sk_info)?;
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.sk.record_safekeeper_info(&sk_info).await?;
|
||||
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
|
||||
shared_state.peers_info.upsert(&peer_info);
|
||||
is_wal_backup_action_pending = self.update_status(&mut shared_state);
|
||||
@@ -622,8 +617,8 @@ impl Timeline {
|
||||
/// Get our latest view of alive peers status on the timeline.
|
||||
/// We pass our own info through the broker as well, so when we don't have connection
|
||||
/// to the broker returned vec is empty.
|
||||
pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
|
||||
let shared_state = self.write_shared_state();
|
||||
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let now = Instant::now();
|
||||
shared_state
|
||||
.peers_info
|
||||
@@ -640,34 +635,34 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Returns flush_lsn.
|
||||
pub fn get_flush_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().sk.wal_store.flush_lsn()
|
||||
pub async fn get_flush_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().await.sk.wal_store.flush_lsn()
|
||||
}
|
||||
|
||||
/// Delete WAL segments from disk that are no longer needed. This is determined
|
||||
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
|
||||
pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
|
||||
pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
|
||||
let horizon_segno: XLogSegNo;
|
||||
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
|
||||
{
|
||||
let shared_state = self.write_shared_state();
|
||||
let remover = {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
|
||||
remover = shared_state.sk.wal_store.remove_up_to();
|
||||
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
|
||||
return Ok(());
|
||||
return Ok(()); // nothing to do
|
||||
}
|
||||
let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);
|
||||
// release the lock before removing
|
||||
}
|
||||
remover
|
||||
};
|
||||
|
||||
// delete old WAL files
|
||||
remover(horizon_segno - 1)?;
|
||||
remover.await?;
|
||||
|
||||
// update last_removed_segno
|
||||
let mut shared_state = self.write_shared_state();
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
shared_state.last_removed_segno = horizon_segno;
|
||||
Ok(())
|
||||
}
|
||||
@@ -676,22 +671,24 @@ impl Timeline {
|
||||
/// passed after the last save. This helps to keep remote_consistent_lsn up
|
||||
/// to date so that storage nodes restart doesn't cause many pageserver ->
|
||||
/// safekeeper reconnections.
|
||||
pub fn maybe_pesist_control_file(&self) -> Result<()> {
|
||||
pub async fn maybe_persist_control_file(&self) -> Result<()> {
|
||||
let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
|
||||
self.write_shared_state()
|
||||
.await
|
||||
.sk
|
||||
.maybe_persist_control_file(remote_consistent_lsn)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns full timeline info, required for the metrics. If the timeline is
|
||||
/// not active, returns None instead.
|
||||
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
|
||||
/// Gather timeline data for metrics. If the timeline is not active, returns
|
||||
/// None, we do not collect these.
|
||||
pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
|
||||
if self.is_cancelled() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let ps_feedback = self.walsenders.get_ps_feedback();
|
||||
let state = self.write_shared_state();
|
||||
let state = self.write_shared_state().await;
|
||||
if state.active {
|
||||
Some(FullTimelineInfo {
|
||||
ttid: self.ttid,
|
||||
@@ -713,8 +710,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Returns in-memory timeline state to build a full debug dump.
|
||||
pub fn memory_dump(&self) -> debug_dump::Memory {
|
||||
let state = self.write_shared_state();
|
||||
pub async fn memory_dump(&self) -> debug_dump::Memory {
|
||||
let state = self.write_shared_state().await;
|
||||
|
||||
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
|
||||
state.sk.wal_store.internal_state();
|
||||
@@ -738,8 +735,8 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Deletes directory and it's contents. Returns false if directory does not exist.
|
||||
fn delete_dir(path: &PathBuf) -> Result<bool> {
|
||||
match std::fs::remove_dir_all(path) {
|
||||
async fn delete_dir(path: &PathBuf) -> Result<bool> {
|
||||
match fs::remove_dir_all(path).await {
|
||||
Ok(_) => Ok(true),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
|
||||
Err(e) => Err(e.into()),
|
||||
|
||||
@@ -113,9 +113,17 @@ impl GlobalTimelines {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any.
|
||||
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
||||
/// errors if any.
|
||||
///
|
||||
/// Note: This function (and all reading/loading below) is sync because
|
||||
/// timelines are loaded while holding GlobalTimelinesState lock. Which is
|
||||
/// fine as this is called only from single threaded main runtime on boot,
|
||||
/// but clippy complains anyway, and suppressing that isn't trivial as async
|
||||
/// is the keyword, ha. That only other user is pull_timeline.rs for which
|
||||
/// being blocked is not that bad, and we can do spawn_blocking.
|
||||
fn load_tenant_timelines(
|
||||
state: &mut MutexGuard<GlobalTimelinesState>,
|
||||
state: &mut MutexGuard<'_, GlobalTimelinesState>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<()> {
|
||||
let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
|
||||
@@ -220,7 +228,7 @@ impl GlobalTimelines {
|
||||
// Take a lock and finish the initialization holding this mutex. No other threads
|
||||
// can interfere with creation after we will insert timeline into the map.
|
||||
{
|
||||
let mut shared_state = timeline.write_shared_state();
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
|
||||
// We can get a race condition here in case of concurrent create calls, but only
|
||||
// in theory. create() will return valid timeline on the next try.
|
||||
@@ -232,7 +240,7 @@ impl GlobalTimelines {
|
||||
// Write the new timeline to the disk and start background workers.
|
||||
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
|
||||
// and the state on disk should remain unchanged.
|
||||
if let Err(e) = timeline.bootstrap(&mut shared_state) {
|
||||
if let Err(e) = timeline.bootstrap(&mut shared_state).await {
|
||||
// Note: the most likely reason for bootstrap failure is that the timeline
|
||||
// directory already exists on disk. This happens when timeline is corrupted
|
||||
// and wasn't loaded from disk on startup because of that. We want to preserve
|
||||
@@ -294,15 +302,16 @@ impl GlobalTimelines {
|
||||
}
|
||||
|
||||
/// Cancels timeline, then deletes the corresponding data directory.
|
||||
pub fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
|
||||
pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
|
||||
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
|
||||
match tli_res {
|
||||
Ok(timeline) => {
|
||||
// Take a lock and finish the deletion holding this mutex.
|
||||
let mut shared_state = timeline.write_shared_state();
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
|
||||
info!("deleting timeline {}", ttid);
|
||||
let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?;
|
||||
let (dir_existed, was_active) =
|
||||
timeline.delete_from_disk(&mut shared_state).await?;
|
||||
|
||||
// Remove timeline from the map.
|
||||
// FIXME: re-enable it once we fix the issue with recreation of deleted timelines
|
||||
@@ -335,7 +344,7 @@ impl GlobalTimelines {
|
||||
/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
|
||||
/// created simultaneously. In that case the function will return error and the caller should
|
||||
/// retry tenant deletion again later.
|
||||
pub fn delete_force_all_for_tenant(
|
||||
pub async fn delete_force_all_for_tenant(
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
|
||||
info!("deleting all timelines for tenant {}", tenant_id);
|
||||
@@ -345,7 +354,7 @@ impl GlobalTimelines {
|
||||
|
||||
let mut deleted = HashMap::new();
|
||||
for tli in &to_delete {
|
||||
match Self::delete_force(&tli.ttid) {
|
||||
match Self::delete_force(&tli.ttid).await {
|
||||
Ok(result) => {
|
||||
deleted.insert(tli.ttid, result);
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@ use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::{XLogSegNo, PG_TLI};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use tokio::fs::File;
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
@@ -36,30 +35,16 @@ use once_cell::sync::OnceCell;
|
||||
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
|
||||
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
|
||||
|
||||
pub fn wal_backup_launcher_thread_main(
|
||||
conf: SafeKeeperConf,
|
||||
wal_backup_launcher_rx: Receiver<TenantTimelineId>,
|
||||
) {
|
||||
let mut builder = Builder::new_multi_thread();
|
||||
if let Some(num_threads) = conf.backup_runtime_threads {
|
||||
builder.worker_threads(num_threads);
|
||||
}
|
||||
let rt = builder
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create wal backup runtime");
|
||||
|
||||
rt.block_on(async {
|
||||
wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await;
|
||||
});
|
||||
}
|
||||
|
||||
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
|
||||
/// aware of current status and return the timeline.
|
||||
fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
|
||||
GlobalTimelines::get(ttid)
|
||||
.ok()
|
||||
.filter(|tli| tli.wal_backup_attend())
|
||||
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
|
||||
match GlobalTimelines::get(ttid).ok() {
|
||||
Some(tli) => {
|
||||
tli.wal_backup_attend().await;
|
||||
Some(tli)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
struct WalBackupTaskHandle {
|
||||
@@ -143,8 +128,8 @@ async fn update_task(
|
||||
ttid: TenantTimelineId,
|
||||
entry: &mut WalBackupTimelineEntry,
|
||||
) {
|
||||
let alive_peers = entry.timeline.get_peers(conf);
|
||||
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn();
|
||||
let alive_peers = entry.timeline.get_peers(conf).await;
|
||||
let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
|
||||
let (offloader, election_dbg_str) =
|
||||
determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
|
||||
let elected_me = Some(conf.my_id) == offloader;
|
||||
@@ -183,10 +168,10 @@ const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
|
||||
/// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
|
||||
/// tasks. Having this in separate task simplifies locking, allows to reap
|
||||
/// panics and separate elections from offloading itself.
|
||||
async fn wal_backup_launcher_main_loop(
|
||||
pub async fn wal_backup_launcher_task_main(
|
||||
conf: SafeKeeperConf,
|
||||
mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
|
||||
) {
|
||||
) -> anyhow::Result<()> {
|
||||
info!(
|
||||
"WAL backup launcher started, remote config {:?}",
|
||||
conf.remote_storage
|
||||
@@ -214,7 +199,7 @@ async fn wal_backup_launcher_main_loop(
|
||||
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
|
||||
continue; /* just drain the channel and do nothing */
|
||||
}
|
||||
let timeline = is_wal_backup_required(ttid);
|
||||
let timeline = is_wal_backup_required(ttid).await;
|
||||
// do we need to do anything at all?
|
||||
if timeline.is_some() != tasks.contains_key(&ttid) {
|
||||
if let Some(timeline) = timeline {
|
||||
@@ -269,7 +254,7 @@ async fn backup_task_main(
|
||||
let tli = res.unwrap();
|
||||
|
||||
let mut wb = WalBackupTask {
|
||||
wal_seg_size: tli.get_wal_seg_size(),
|
||||
wal_seg_size: tli.get_wal_seg_size().await,
|
||||
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
|
||||
timeline: tli,
|
||||
timeline_dir,
|
||||
@@ -326,7 +311,7 @@ impl WalBackupTask {
|
||||
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
|
||||
}
|
||||
// Perhaps peers advanced the position, check shmem value.
|
||||
backup_lsn = self.timeline.get_wal_backup_lsn();
|
||||
backup_lsn = self.timeline.get_wal_backup_lsn().await;
|
||||
if backup_lsn.segment_number(self.wal_seg_size)
|
||||
>= commit_lsn.segment_number(self.wal_seg_size)
|
||||
{
|
||||
@@ -402,6 +387,7 @@ pub async fn backup_lsn_range(
|
||||
let new_backup_lsn = segment.end_lsn;
|
||||
timeline
|
||||
.set_wal_backup_lsn(new_backup_lsn)
|
||||
.await
|
||||
.context("setting wal_backup_lsn")?;
|
||||
*backup_lsn = new_backup_lsn;
|
||||
} else {
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//!
|
||||
use anyhow::{Context, Result};
|
||||
use postgres_backend::QueryError;
|
||||
use std::{future, thread, time::Duration};
|
||||
use std::{future, time::Duration};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_io_timeout::TimeoutReader;
|
||||
use tracing::*;
|
||||
@@ -16,104 +16,82 @@ use crate::SafeKeeperConf;
|
||||
use postgres_backend::{AuthType, PostgresBackend};
|
||||
|
||||
/// Accept incoming TCP connections and spawn them into a background thread.
|
||||
pub fn thread_main(conf: SafeKeeperConf, pg_listener: std::net::TcpListener) {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.context("create runtime")
|
||||
// todo catch error in main thread
|
||||
.expect("failed to create runtime");
|
||||
pub async fn task_main(
|
||||
conf: SafeKeeperConf,
|
||||
pg_listener: std::net::TcpListener,
|
||||
) -> anyhow::Result<()> {
|
||||
// Tokio's from_std won't do this for us, per its comment.
|
||||
pg_listener.set_nonblocking(true)?;
|
||||
|
||||
runtime
|
||||
.block_on(async move {
|
||||
// Tokio's from_std won't do this for us, per its comment.
|
||||
pg_listener.set_nonblocking(true)?;
|
||||
let listener = tokio::net::TcpListener::from_std(pg_listener)?;
|
||||
let mut connection_count: ConnectionCount = 0;
|
||||
let listener = tokio::net::TcpListener::from_std(pg_listener)?;
|
||||
let mut connection_count: ConnectionCount = 0;
|
||||
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((socket, peer_addr)) => {
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let conf = conf.clone();
|
||||
let conn_id = issue_connection_id(&mut connection_count);
|
||||
loop {
|
||||
let (socket, peer_addr) = listener.accept().await.context("accept")?;
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
let conf = conf.clone();
|
||||
let conn_id = issue_connection_id(&mut connection_count);
|
||||
|
||||
let _ = thread::Builder::new()
|
||||
.name("WAL service thread".into())
|
||||
.spawn(move || {
|
||||
if let Err(err) = handle_socket(socket, conf, conn_id) {
|
||||
error!("connection handler exited: {}", err);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
Err(e) => error!("Failed to accept connection: {}", e),
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = handle_socket(socket, conf, conn_id)
|
||||
.instrument(info_span!("", cid = %conn_id))
|
||||
.await
|
||||
{
|
||||
error!("connection handler exited: {}", err);
|
||||
}
|
||||
#[allow(unreachable_code)] // hint compiler the closure return type
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})
|
||||
.expect("listener failed")
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// This is run by `thread_main` above, inside a background thread.
|
||||
/// This is run by `task_main` above, inside a background thread.
|
||||
///
|
||||
fn handle_socket(
|
||||
async fn handle_socket(
|
||||
socket: TcpStream,
|
||||
conf: SafeKeeperConf,
|
||||
conn_id: ConnectionId,
|
||||
) -> Result<(), QueryError> {
|
||||
let _enter = info_span!("", cid = %conn_id).entered();
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
socket.set_nodelay(true)?;
|
||||
let peer_addr = socket.peer_addr()?;
|
||||
|
||||
// TimeoutReader wants async runtime during creation.
|
||||
runtime.block_on(async move {
|
||||
// Set timeout on reading from the socket. It prevents hanged up connection
|
||||
// if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
|
||||
// default, and tokio doesn't provide ability to set it out of the box.
|
||||
let mut socket = TimeoutReader::new(socket);
|
||||
let wal_service_timeout = Duration::from_secs(60 * 10);
|
||||
socket.set_timeout(Some(wal_service_timeout));
|
||||
// pin! is here because TimeoutReader (due to storing sleep future inside)
|
||||
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream
|
||||
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader
|
||||
// shouldn't be moved.
|
||||
tokio::pin!(socket);
|
||||
// Set timeout on reading from the socket. It prevents hanged up connection
|
||||
// if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
|
||||
// default, and tokio doesn't provide ability to set it out of the box.
|
||||
let mut socket = TimeoutReader::new(socket);
|
||||
let wal_service_timeout = Duration::from_secs(60 * 10);
|
||||
socket.set_timeout(Some(wal_service_timeout));
|
||||
// pin! is here because TimeoutReader (due to storing sleep future inside)
|
||||
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream
|
||||
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader
|
||||
// shouldn't be moved.
|
||||
tokio::pin!(socket);
|
||||
|
||||
let traffic_metrics = TrafficMetrics::new();
|
||||
if let Some(current_az) = conf.availability_zone.as_deref() {
|
||||
traffic_metrics.set_sk_az(current_az);
|
||||
}
|
||||
let traffic_metrics = TrafficMetrics::new();
|
||||
if let Some(current_az) = conf.availability_zone.as_deref() {
|
||||
traffic_metrics.set_sk_az(current_az);
|
||||
}
|
||||
|
||||
let socket = MeasuredStream::new(
|
||||
socket,
|
||||
|cnt| {
|
||||
traffic_metrics.observe_read(cnt);
|
||||
},
|
||||
|cnt| {
|
||||
traffic_metrics.observe_write(cnt);
|
||||
},
|
||||
);
|
||||
let socket = MeasuredStream::new(
|
||||
socket,
|
||||
|cnt| {
|
||||
traffic_metrics.observe_read(cnt);
|
||||
},
|
||||
|cnt| {
|
||||
traffic_metrics.observe_write(cnt);
|
||||
},
|
||||
);
|
||||
|
||||
let auth_type = match conf.auth {
|
||||
None => AuthType::Trust,
|
||||
Some(_) => AuthType::NeonJWT,
|
||||
};
|
||||
let mut conn_handler =
|
||||
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
pgbackend
|
||||
.run(&mut conn_handler, future::pending::<()>)
|
||||
.await
|
||||
})
|
||||
let auth_type = match conf.auth {
|
||||
None => AuthType::Trust,
|
||||
Some(_) => AuthType::NeonJWT,
|
||||
};
|
||||
let mut conn_handler =
|
||||
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
// libpq protocol between safekeeper and walproposer / pageserver
|
||||
// We don't use shutdown.
|
||||
pgbackend
|
||||
.run(&mut conn_handler, future::pending::<()>)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Unique WAL service connection ids are logged in spans for observability.
|
||||
|
||||
@@ -8,54 +8,47 @@
|
||||
//! Note that last file has `.partial` suffix, that's different from postgres.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use remote_storage::RemotePath;
|
||||
|
||||
use std::io::{self, Seek, SeekFrom};
|
||||
use std::pin::Pin;
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::future::BoxFuture;
|
||||
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
|
||||
use postgres_ffi::{XLogSegNo, PG_TLI};
|
||||
use remote_storage::RemotePath;
|
||||
use std::cmp::{max, min};
|
||||
|
||||
use bytes::Bytes;
|
||||
use std::fs::{self, remove_file, File, OpenOptions};
|
||||
use std::io::Write;
|
||||
use std::io::{self, SeekFrom};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use std::pin::Pin;
|
||||
use tokio::fs::{self, remove_file, File, OpenOptions};
|
||||
use tokio::io::{AsyncRead, AsyncWriteExt};
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||
use tracing::*;
|
||||
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
|
||||
use crate::safekeeper::SafeKeeperState;
|
||||
|
||||
use crate::wal_backup::read_object;
|
||||
use crate::SafeKeeperConf;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::XLOG_BLCKSZ;
|
||||
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
|
||||
use pq_proto::SystemId;
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Storage {
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
|
||||
/// Write piece of WAL from buf to disk, but not necessarily sync it.
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
|
||||
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
|
||||
|
||||
/// Truncate WAL at specified LSN, which must be the end of WAL record.
|
||||
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
|
||||
async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
|
||||
|
||||
/// Durably store WAL on disk, up to the last written WAL record.
|
||||
fn flush_wal(&mut self) -> Result<()>;
|
||||
async fn flush_wal(&mut self) -> Result<()>;
|
||||
|
||||
/// Remove all segments <= given segno. Returns closure as we want to do
|
||||
/// that without timeline lock.
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>>;
|
||||
/// Remove all segments <= given segno. Returns function doing that as we
|
||||
/// want to perform it without timeline lock.
|
||||
fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
|
||||
|
||||
/// Release resources associated with the storage -- technically, close FDs.
|
||||
/// Currently we don't remove timelines until restart (#3146), so need to
|
||||
@@ -105,6 +98,22 @@ pub struct PhysicalStorage {
|
||||
/// - points to write_lsn, so no seek is needed for writing
|
||||
/// - doesn't point to the end of the segment
|
||||
file: Option<File>,
|
||||
|
||||
/// When false, we have just initialized storage using the LSN from find_end_of_wal().
|
||||
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
|
||||
/// there can be a case with unexpected .partial file.
|
||||
///
|
||||
/// Imagine the following:
|
||||
/// - 000000010000000000000001
|
||||
/// - it was fully written, but the last record is split between 2 segments
|
||||
/// - after restart, find_end_of_wal() returned 0/1FFFFF0, which is in the end of this segment
|
||||
/// - write_lsn, write_record_lsn and flush_record_lsn were initialized to 0/1FFFFF0
|
||||
/// - 000000010000000000000002.partial
|
||||
/// - it has only 1 byte written, which is not enough to make a full WAL record
|
||||
///
|
||||
/// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
|
||||
/// This flag will be set to true after the first truncate_wal() call.
|
||||
is_truncated_after_restart: bool,
|
||||
}
|
||||
|
||||
impl PhysicalStorage {
|
||||
@@ -164,6 +173,7 @@ impl PhysicalStorage {
|
||||
flush_record_lsn: flush_lsn,
|
||||
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
|
||||
file: None,
|
||||
is_truncated_after_restart: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -178,33 +188,37 @@ impl PhysicalStorage {
|
||||
}
|
||||
|
||||
/// Call fdatasync if config requires so.
|
||||
fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
|
||||
async fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
|
||||
if !self.conf.no_sync {
|
||||
self.metrics
|
||||
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_data()?))?);
|
||||
.observe_flush_seconds(time_io_closure(file.sync_data()).await?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call fsync if config requires so.
|
||||
fn fsync_file(&mut self, file: &mut File) -> Result<()> {
|
||||
async fn fsync_file(&mut self, file: &mut File) -> Result<()> {
|
||||
if !self.conf.no_sync {
|
||||
self.metrics
|
||||
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_all()?))?);
|
||||
.observe_flush_seconds(time_io_closure(file.sync_all()).await?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Open or create WAL segment file. Caller must call seek to the wanted position.
|
||||
/// Returns `file` and `is_partial`.
|
||||
fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
|
||||
async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
|
||||
// Try to open already completed segment
|
||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
|
||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
|
||||
Ok((file, false))
|
||||
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
|
||||
} else if let Ok(file) = OpenOptions::new()
|
||||
.write(true)
|
||||
.open(&wal_file_partial_path)
|
||||
.await
|
||||
{
|
||||
// Try to open existing partial file
|
||||
Ok((file, true))
|
||||
} else {
|
||||
@@ -213,35 +227,36 @@ impl PhysicalStorage {
|
||||
.create(true)
|
||||
.write(true)
|
||||
.open(&wal_file_partial_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
|
||||
|
||||
write_zeroes(&mut file, self.wal_seg_size)?;
|
||||
self.fsync_file(&mut file)?;
|
||||
write_zeroes(&mut file, self.wal_seg_size).await?;
|
||||
self.fsync_file(&mut file).await?;
|
||||
Ok((file, true))
|
||||
}
|
||||
}
|
||||
|
||||
/// Write WAL bytes, which are known to be located in a single WAL segment.
|
||||
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
|
||||
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
|
||||
let mut file = if let Some(file) = self.file.take() {
|
||||
file
|
||||
} else {
|
||||
let (mut file, is_partial) = self.open_or_create(segno)?;
|
||||
let (mut file, is_partial) = self.open_or_create(segno).await?;
|
||||
assert!(is_partial, "unexpected write into non-partial segment file");
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
|
||||
file
|
||||
};
|
||||
|
||||
file.write_all(buf)?;
|
||||
file.write_all(buf).await?;
|
||||
|
||||
if xlogoff + buf.len() == self.wal_seg_size {
|
||||
// If we reached the end of a WAL segment, flush and close it.
|
||||
self.fdatasync_file(&mut file)?;
|
||||
self.fdatasync_file(&mut file).await?;
|
||||
|
||||
// Rename partial file to completed file
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
fs::rename(wal_file_partial_path, wal_file_path)?;
|
||||
fs::rename(wal_file_partial_path, wal_file_path).await?;
|
||||
} else {
|
||||
// otherwise, file can be reused later
|
||||
self.file = Some(file);
|
||||
@@ -255,11 +270,11 @@ impl PhysicalStorage {
|
||||
/// be flushed separately later.
|
||||
///
|
||||
/// Updates `write_lsn`.
|
||||
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
|
||||
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
|
||||
if self.write_lsn != pos {
|
||||
// need to flush the file before discarding it
|
||||
if let Some(mut file) = self.file.take() {
|
||||
self.fdatasync_file(&mut file)?;
|
||||
self.fdatasync_file(&mut file).await?;
|
||||
}
|
||||
|
||||
self.write_lsn = pos;
|
||||
@@ -277,7 +292,8 @@ impl PhysicalStorage {
|
||||
buf.len()
|
||||
};
|
||||
|
||||
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?;
|
||||
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
|
||||
.await?;
|
||||
self.write_lsn += bytes_write as u64;
|
||||
buf = &buf[bytes_write..];
|
||||
}
|
||||
@@ -286,6 +302,7 @@ impl PhysicalStorage {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Storage for PhysicalStorage {
|
||||
/// flush_lsn returns LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
@@ -293,7 +310,7 @@ impl Storage for PhysicalStorage {
|
||||
}
|
||||
|
||||
/// Write WAL to disk.
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
// Disallow any non-sequential writes, which can result in gaps or overwrites.
|
||||
// If we need to move the pointer, use truncate_wal() instead.
|
||||
if self.write_lsn > startpos {
|
||||
@@ -311,7 +328,7 @@ impl Storage for PhysicalStorage {
|
||||
);
|
||||
}
|
||||
|
||||
let write_seconds = time_io_closure(|| self.write_exact(startpos, buf))?;
|
||||
let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
|
||||
// WAL is written, updating write metrics
|
||||
self.metrics.observe_write_seconds(write_seconds);
|
||||
self.metrics.observe_write_bytes(buf.len());
|
||||
@@ -340,14 +357,14 @@ impl Storage for PhysicalStorage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_wal(&mut self) -> Result<()> {
|
||||
async fn flush_wal(&mut self) -> Result<()> {
|
||||
if self.flush_record_lsn == self.write_record_lsn {
|
||||
// no need to do extra flush
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(mut unflushed_file) = self.file.take() {
|
||||
self.fdatasync_file(&mut unflushed_file)?;
|
||||
self.fdatasync_file(&mut unflushed_file).await?;
|
||||
self.file = Some(unflushed_file);
|
||||
} else {
|
||||
// We have unflushed data (write_lsn != flush_lsn), but no file.
|
||||
@@ -369,7 +386,7 @@ impl Storage for PhysicalStorage {
|
||||
|
||||
/// Truncate written WAL by removing all WAL segments after the given LSN.
|
||||
/// end_pos must point to the end of the WAL record.
|
||||
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
|
||||
async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
|
||||
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
|
||||
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
|
||||
bail!(
|
||||
@@ -381,47 +398,51 @@ impl Storage for PhysicalStorage {
|
||||
|
||||
// Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
|
||||
// disk (this happens on each connect).
|
||||
if end_pos == self.write_lsn {
|
||||
if self.is_truncated_after_restart
|
||||
&& end_pos == self.write_lsn
|
||||
&& end_pos == self.flush_record_lsn
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Close previously opened file, if any
|
||||
if let Some(mut unflushed_file) = self.file.take() {
|
||||
self.fdatasync_file(&mut unflushed_file)?;
|
||||
self.fdatasync_file(&mut unflushed_file).await?;
|
||||
}
|
||||
|
||||
let xlogoff = end_pos.segment_offset(self.wal_seg_size);
|
||||
let segno = end_pos.segment_number(self.wal_seg_size);
|
||||
|
||||
// Remove all segments after the given LSN.
|
||||
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;
|
||||
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
|
||||
|
||||
let (mut file, is_partial) = self.open_or_create(segno)?;
|
||||
let (mut file, is_partial) = self.open_or_create(segno).await?;
|
||||
|
||||
// Fill end with zeroes
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?;
|
||||
self.fdatasync_file(&mut file)?;
|
||||
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
|
||||
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
|
||||
self.fdatasync_file(&mut file).await?;
|
||||
|
||||
if !is_partial {
|
||||
// Make segment partial once again
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
fs::rename(wal_file_path, wal_file_partial_path)?;
|
||||
fs::rename(wal_file_path, wal_file_partial_path).await?;
|
||||
}
|
||||
|
||||
// Update LSNs
|
||||
self.write_lsn = end_pos;
|
||||
self.write_record_lsn = end_pos;
|
||||
self.flush_record_lsn = end_pos;
|
||||
self.is_truncated_after_restart = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
|
||||
fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
|
||||
let timeline_dir = self.timeline_dir.clone();
|
||||
let wal_seg_size = self.wal_seg_size;
|
||||
Box::new(move |segno_up_to: XLogSegNo| {
|
||||
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
|
||||
Box::pin(async move {
|
||||
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
|
||||
})
|
||||
}
|
||||
|
||||
@@ -436,7 +457,7 @@ impl Storage for PhysicalStorage {
|
||||
}
|
||||
|
||||
/// Remove all WAL segments in timeline_dir that match the given predicate.
|
||||
fn remove_segments_from_disk(
|
||||
async fn remove_segments_from_disk(
|
||||
timeline_dir: &Path,
|
||||
wal_seg_size: usize,
|
||||
remove_predicate: impl Fn(XLogSegNo) -> bool,
|
||||
@@ -445,8 +466,8 @@ fn remove_segments_from_disk(
|
||||
let mut min_removed = u64::MAX;
|
||||
let mut max_removed = u64::MIN;
|
||||
|
||||
for entry in fs::read_dir(timeline_dir)? {
|
||||
let entry = entry?;
|
||||
let mut entries = fs::read_dir(timeline_dir).await?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let entry_path = entry.path();
|
||||
let fname = entry_path.file_name().unwrap();
|
||||
|
||||
@@ -457,7 +478,7 @@ fn remove_segments_from_disk(
|
||||
}
|
||||
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
|
||||
if remove_predicate(segno) {
|
||||
remove_file(entry_path)?;
|
||||
remove_file(entry_path).await?;
|
||||
n_removed += 1;
|
||||
min_removed = min(min_removed, segno);
|
||||
max_removed = max(max_removed, segno);
|
||||
@@ -689,12 +710,12 @@ impl WalReader {
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
|
||||
/// Helper for filling file with zeroes.
|
||||
fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
|
||||
async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
|
||||
while count >= XLOG_BLCKSZ {
|
||||
file.write_all(ZERO_BLOCK)?;
|
||||
file.write_all(ZERO_BLOCK).await?;
|
||||
count -= XLOG_BLCKSZ;
|
||||
}
|
||||
file.write_all(&ZERO_BLOCK[0..count])?;
|
||||
file.write_all(&ZERO_BLOCK[0..count]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
|
||||
|
||||
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
|
||||
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
|
||||
// BrokerServiceClient charged with tonic provided Channel transport; helps to
|
||||
// avoid depending on tonic directly in user crates.
|
||||
@@ -58,7 +59,8 @@ where
|
||||
}
|
||||
tonic_endpoint = tonic_endpoint
|
||||
.http2_keep_alive_interval(keepalive_interval)
|
||||
.keep_alive_while_idle(true);
|
||||
.keep_alive_while_idle(true)
|
||||
.connect_timeout(DEFAULT_CONNECT_TIMEOUT);
|
||||
// keep_alive_timeout is 20s by default on both client and server side
|
||||
let channel = tonic_endpoint.connect_lazy();
|
||||
Ok(BrokerClientChannel::new(channel))
|
||||
|
||||
@@ -163,7 +163,6 @@ def test_forward_params_to_client(static_proxy: NeonProxy):
|
||||
assert conn.get_parameter_status(name) == value
|
||||
|
||||
|
||||
@pytest.mark.timeout(5)
|
||||
def test_close_on_connections_exit(static_proxy: NeonProxy):
|
||||
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
|
||||
# until after connections close.
|
||||
|
||||
Reference in New Issue
Block a user