Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo

This commit is contained in:
Christian Schwarz
2024-03-20 14:32:17 +00:00
126 changed files with 6964 additions and 1591 deletions

View File

@@ -1,3 +1,5 @@
#![recursion_limit = "300"]
//! Main entry point for the Page Server executable.
use std::env::{var, VarError};
@@ -118,6 +120,9 @@ fn main() -> anyhow::Result<()> {
&[("node_id", &conf.id.to_string())],
);
// after setting up logging, log the effective IO engine choice
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
utils::crashsafe::create_dir_all(conf.tenants_path())
@@ -312,6 +317,7 @@ fn start_pageserver(
let http_listener = tcp_listener::bind(http_addr)?;
let pg_addr = &conf.listen_pg_addr;
info!("Starting pageserver pg protocol handler on {pg_addr}");
let pageserver_listener = tcp_listener::bind(pg_addr)?;
@@ -544,7 +550,7 @@ fn start_pageserver(
let router_state = Arc::new(
http::routes::State::new(
conf,
tenant_manager,
tenant_manager.clone(),
http_auth.clone(),
remote_storage.clone(),
broker_client.clone(),
@@ -688,6 +694,7 @@ fn start_pageserver(
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
));

View File

@@ -30,18 +30,17 @@ use utils::{
logging::LogFormat,
};
use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::virtual_file;
use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
use crate::{tenant::config::TenantConf, virtual_file};
use crate::{
IGNORED_TENANT_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
};
use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
@@ -287,16 +286,23 @@ pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
// use dedicated enum for builder to better indicate the intention
// and avoid possible confusion with nested options
#[derive(Clone, Default)]
pub enum BuilderValue<T> {
Set(T),
#[default]
NotSet,
}
impl<T> BuilderValue<T> {
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
impl<T: Clone> BuilderValue<T> {
pub fn ok_or(&self, field_name: &'static str, default: BuilderValue<T>) -> anyhow::Result<T> {
match self {
Self::Set(v) => Ok(v),
Self::NotSet => Err(err),
Self::Set(v) => Ok(v.clone()),
Self::NotSet => match default {
BuilderValue::Set(v) => Ok(v.clone()),
BuilderValue::NotSet => {
anyhow::bail!("missing config value {field_name:?}")
}
},
}
}
}
@@ -322,6 +328,7 @@ pub(crate) struct NodeMetadata {
}
// needed to simplify config construction
#[derive(Default)]
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
@@ -388,8 +395,9 @@ struct PageServerConfigBuilder {
validate_vectored_get: BuilderValue<bool>,
}
impl Default for PageServerConfigBuilder {
fn default() -> Self {
impl PageServerConfigBuilder {
#[inline(always)]
fn default_values() -> Self {
use self::BuilderValue::*;
use defaults::*;
Self {
@@ -636,122 +644,95 @@ impl PageServerConfigBuilder {
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
.ok_or(anyhow!("missing concurrent_tenant_warmup"))?;
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
.ok_or(anyhow!(
"missing concurrent_tenant_size_logical_size_queries"
))?;
Ok(PageServerConf {
listen_pg_addr: self
.listen_pg_addr
.ok_or(anyhow!("missing listen_pg_addr"))?,
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow!("missing listen_http_addr"))?,
availability_zone: self
.availability_zone
.ok_or(anyhow!("missing availability_zone"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow!("missing wait_lsn_timeout"))?,
superuser: self.superuser.ok_or(anyhow!("missing superuser"))?,
page_cache_size: self
.page_cache_size
.ok_or(anyhow!("missing page_cache_size"))?,
max_file_descriptors: self
.max_file_descriptors
.ok_or(anyhow!("missing max_file_descriptors"))?,
workdir: self.workdir.ok_or(anyhow!("missing workdir"))?,
pg_distrib_dir: self
.pg_distrib_dir
.ok_or(anyhow!("missing pg_distrib_dir"))?,
http_auth_type: self
.http_auth_type
.ok_or(anyhow!("missing http_auth_type"))?,
pg_auth_type: self.pg_auth_type.ok_or(anyhow!("missing pg_auth_type"))?,
auth_validation_public_key_path: self
.auth_validation_public_key_path
.ok_or(anyhow!("missing auth_validation_public_key_path"))?,
remote_storage_config: self
.remote_storage_config
.ok_or(anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow!("missing id"))?,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
broker_endpoint: self
.broker_endpoint
.ok_or(anyhow!("No broker endpoints provided"))?,
broker_keepalive_interval: self
.broker_keepalive_interval
.ok_or(anyhow!("No broker keepalive interval provided"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
concurrent_tenant_warmup: ConfigurableSemaphore::new(concurrent_tenant_warmup),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
concurrent_tenant_size_logical_size_queries,
),
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
concurrent_tenant_size_logical_size_queries,
),
metric_collection_interval: self
.metric_collection_interval
.ok_or(anyhow!("missing metric_collection_interval"))?,
cached_metric_collection_interval: self
.cached_metric_collection_interval
.ok_or(anyhow!("missing cached_metric_collection_interval"))?,
metric_collection_endpoint: self
.metric_collection_endpoint
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
synthetic_size_calculation_interval: self
.synthetic_size_calculation_interval
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
disk_usage_based_eviction: self
.disk_usage_based_eviction
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
test_remote_failures: self
.test_remote_failures
.ok_or(anyhow!("missing test_remote_failuers"))?,
ondemand_download_behavior_treat_error_as_warn: self
.ondemand_download_behavior_treat_error_as_warn
.ok_or(anyhow!(
"missing ondemand_download_behavior_treat_error_as_warn"
))?,
background_task_maximum_delay: self
.background_task_maximum_delay
.ok_or(anyhow!("missing background_task_maximum_delay"))?,
control_plane_api: self
.control_plane_api
.ok_or(anyhow!("missing control_plane_api"))?,
control_plane_api_token: self
.control_plane_api_token
.ok_or(anyhow!("missing control_plane_api_token"))?,
control_plane_emergency_mode: self
.control_plane_emergency_mode
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
heatmap_upload_concurrency: self
.heatmap_upload_concurrency
.ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
secondary_download_concurrency: self
.secondary_download_concurrency
.ok_or(anyhow!("missing secondary_download_concurrency"))?,
ingest_batch_size: self
.ingest_batch_size
.ok_or(anyhow!("missing ingest_batch_size"))?,
virtual_file_io_engine: self
.virtual_file_io_engine
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
get_vectored_impl: self
.get_vectored_impl
.ok_or(anyhow!("missing get_vectored_impl"))?,
max_vectored_read_bytes: self
.max_vectored_read_bytes
.ok_or(anyhow!("missing max_vectored_read_bytes"))?,
validate_vectored_get: self
.validate_vectored_get
.ok_or(anyhow!("missing validate_vectored_get"))?,
})
let default = Self::default_values();
macro_rules! conf {
(USING DEFAULT { $($field:ident,)* } CUSTOM LOGIC { $($custom_field:ident : $custom_value:expr,)* } ) => {
PageServerConf {
$(
$field: self.$field.ok_or(stringify!($field), default.$field)?,
)*
$(
$custom_field: $custom_value,
)*
}
};
}
Ok(conf!(
USING DEFAULT
{
listen_pg_addr,
listen_http_addr,
availability_zone,
wait_lsn_timeout,
superuser,
page_cache_size,
max_file_descriptors,
workdir,
pg_distrib_dir,
http_auth_type,
pg_auth_type,
auth_validation_public_key_path,
remote_storage_config,
id,
broker_endpoint,
broker_keepalive_interval,
log_format,
metric_collection_interval,
cached_metric_collection_interval,
metric_collection_endpoint,
synthetic_size_calculation_interval,
disk_usage_based_eviction,
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api,
control_plane_api_token,
control_plane_emergency_mode,
heatmap_upload_concurrency,
secondary_download_concurrency,
ingest_batch_size,
get_vectored_impl,
max_vectored_read_bytes,
validate_vectored_get,
}
CUSTOM LOGIC
{
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
concurrent_tenant_warmup: ConfigurableSemaphore::new({
self
.concurrent_tenant_warmup
.ok_or("concurrent_tenant_warmpup",
default.concurrent_tenant_warmup)?
}),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::new(
self
.concurrent_tenant_size_logical_size_queries
.ok_or("concurrent_tenant_size_logical_size_queries",
default.concurrent_tenant_size_logical_size_queries.clone())?
),
eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore::new(
// re-use `concurrent_tenant_size_logical_size_queries`
self
.concurrent_tenant_size_logical_size_queries
.ok_or("eviction_task_immitated_concurrent_logical_size_queries",
default.concurrent_tenant_size_logical_size_queries.clone())?,
),
virtual_file_io_engine: match self.virtual_file_io_engine {
BuilderValue::Set(v) => v,
BuilderValue::NotSet => match crate::virtual_file::io_engine_feature_test().context("auto-detect virtual_file_io_engine")? {
io_engine::FeatureTestResult::PlatformPreferred(v) => v, // make no noise
io_engine::FeatureTestResult::Worse { engine, remark } => {
// TODO: bubble this up to the caller so we can tracing::warn! it.
eprintln!("auto-detected IO engine is not platform-preferred: engine={engine:?} remark={remark:?}");
engine
}
},
},
}
))
}
}
@@ -831,18 +812,7 @@ impl PageServerConf {
.join(timeline_id.to_string())
}
pub fn timeline_uninit_mark_file_path(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
path_with_suffix_extension(
self.timeline_path(&tenant_shard_id, &timeline_id),
TIMELINE_UNINIT_MARK_SUFFIX,
)
}
pub fn timeline_delete_mark_file_path(
pub(crate) fn timeline_delete_mark_file_path(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
@@ -853,7 +823,10 @@ impl PageServerConf {
)
}
pub fn tenant_deleted_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
pub(crate) fn tenant_deleted_mark_file_path(
&self,
tenant_shard_id: &TenantShardId,
) -> Utf8PathBuf {
self.tenant_path(tenant_shard_id)
.join(TENANT_DELETED_MARKER_FILE_NAME)
}

View File

@@ -567,9 +567,9 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/location_config:
/v1/tenant/{tenant_shard_id}/location_config:
parameters:
- name: tenant_id
- name: tenant_shard_id
in: path
required: true
schema:
@@ -965,12 +965,28 @@ paths:
required: true
schema:
type: string
- name: wait_ms
description: If set, we will wait this long for download to complete, and if it isn't complete then return 202
in: query
required: false
schema:
type: integer
post:
description: |
If the location is in secondary mode, download latest heatmap and layers
responses:
"200":
description: Success
content:
application/json:
schema:
$ref: "#/components/schemas/SecondaryProgress"
"202":
description: Download has started but not yet finished
content:
application/json:
schema:
$ref: "#/components/schemas/SecondaryProgress"
"500":
description: Generic operation error
content:
@@ -1367,10 +1383,11 @@ components:
TenantLocationConfigRequest:
type: object
required:
- tenant_id
- mode
properties:
tenant_id:
type: string
description: Not used, scheduled for removal.
mode:
type: string
enum: ["AttachedSingle", "AttachedMulti", "AttachedStale", "Secondary", "Detached"]
@@ -1622,6 +1639,37 @@ components:
Lower is better score for how good this pageserver would be for the next tenant.
The default or maximum value can be returned in situations when a proper score cannot (yet) be calculated.
SecondaryProgress:
type: object
required:
- heatmap_mtime
- layers_downloaded
- layers_total
- bytes_downloaded
- bytes_total
properties:
heatmap_mtime:
type: string
format: date-time
description: Modification time of the most recently downloaded layer heatmap (RFC 3339 format)
layers_downloaded:
type: integer
format: int64
description: How many layers from the latest layer heatmap are present on disk
bytes_downloaded:
type: integer
format: int64
description: How many bytes of layer content from the latest layer heatmap are present on disk
layers_total:
type: integer
format: int64
description: How many layers were in the latest layer heatmap
bytes_total:
type: integer
format: int64
description: How many bytes of layer content were in the latest layer heatmap
Error:
type: object
required:

View File

@@ -535,9 +535,9 @@ async fn timeline_create_handler(
)
}
Err(
tenant::CreateTimelineError::Conflict
| tenant::CreateTimelineError::AlreadyCreating,
) => json_response(StatusCode::CONFLICT, ()),
e @ tenant::CreateTimelineError::Conflict
| e @ tenant::CreateTimelineError::AlreadyCreating,
) => json_response(StatusCode::CONFLICT, HttpErrorBody::from_msg(e.to_string())),
Err(tenant::CreateTimelineError::AncestorLsn(err)) => json_response(
StatusCode::NOT_ACCEPTABLE,
HttpErrorBody::from_msg(format!("{err:#}")),
@@ -885,14 +885,16 @@ async fn tenant_detach_handler(
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_shard_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
.await?;
state
.tenant_manager
.detach_tenant(
conf,
tenant_shard_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
.await?;
json_response(StatusCode::OK, ())
}
@@ -1403,7 +1405,9 @@ async fn update_tenant_config_handler(
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
state
.tenant_manager
.set_new_tenant_config(tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", %tenant_id))
.await?;
@@ -1428,13 +1432,14 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
if let Err(e) =
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()
))
.await
if let Err(e) = state
.tenant_manager
.detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug()
))
.await
{
match e {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
@@ -1648,8 +1653,7 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let wait_task_done =
mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
let wait_task_done = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)?;
let gc_result = wait_task_done
.await
.context("wait for gc task")
@@ -1987,13 +1991,42 @@ async fn secondary_download_handler(
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
state
.secondary_controller
.download_tenant(tenant_shard_id)
.await
.map_err(ApiError::InternalServerError)?;
let wait = parse_query_param(&request, "wait_ms")?.map(Duration::from_millis);
json_response(StatusCode::OK, ())
// We don't need this to issue the download request, but:
// - it enables us to cleanly return 404 if we get a request for an absent shard
// - we will use this to provide status feedback in the response
let Some(secondary_tenant) = state
.tenant_manager
.get_secondary_tenant_shard(tenant_shard_id)
else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Shard {} not found", tenant_shard_id).into(),
));
};
let timeout = wait.unwrap_or(Duration::MAX);
let status = match tokio::time::timeout(
timeout,
state.secondary_controller.download_tenant(tenant_shard_id),
)
.await
{
// Download job ran to completion.
Ok(Ok(())) => StatusCode::OK,
// Edge case: downloads aren't usually fallible: things like a missing heatmap are considered
// okay. We could get an error here in the unlikely edge case that the tenant
// was detached between our check above and executing the download job.
Ok(Err(e)) => return Err(ApiError::InternalServerError(e)),
// A timeout is not an error: we have started the download, we're just not done
// yet. The caller will get a response body indicating status.
Err(_) => StatusCode::ACCEPTED,
};
let progress = secondary_tenant.progress.lock().unwrap().clone();
json_response(status, progress)
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -2053,6 +2086,10 @@ async fn get_utilization(
r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
fail::fail_point!("get-utilization-http-handler", |_| {
Err(ApiError::ResourceUnavailable("failpoint".into()))
});
// this probably could be completely public, but lets make that change later.
check_permission(&r, None)?;
@@ -2108,6 +2145,16 @@ where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
{
if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
"failpoint".into()
)));
fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
anyhow::anyhow!("failpoint")
)));
}
// Spawn a new task to handle the request, to protect the handler from unexpected
// async cancellations. Most pageserver functions are not async cancellation safe.
// We arm a drop-guard, so that if Hyper drops the Future, we signal the task

View File

@@ -2,28 +2,20 @@
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a neon Timeline.
//!
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use anyhow::{bail, ensure, Context, Result};
use async_compression::tokio::bufread::ZstdDecoder;
use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use nix::NixPath;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
use tokio_tar::Builder;
use tokio_tar::HeaderMode;
use tracing::*;
use walkdir::WalkDir;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::remote_timeline_client::INITDB_PATH;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
@@ -633,65 +625,3 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes>
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
}
pub async fn create_tar_zst(pgdata_path: &Utf8Path, tmp_path: &Utf8Path) -> Result<(File, u64)> {
let file = OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(&tmp_path)
.await
.with_context(|| format!("tempfile creation {tmp_path}"))?;
let mut paths = Vec::new();
for entry in WalkDir::new(pgdata_path) {
let entry = entry?;
let metadata = entry.metadata().expect("error getting dir entry metadata");
// Also allow directories so that we also get empty directories
if !(metadata.is_file() || metadata.is_dir()) {
continue;
}
let path = entry.into_path();
paths.push(path);
}
// Do a sort to get a more consistent listing
paths.sort_unstable();
let zstd = ZstdEncoder::with_quality_and_params(
file,
Level::Default,
&[CParameter::enable_long_distance_matching(true)],
);
let mut builder = Builder::new(zstd);
// Use reproducible header mode
builder.mode(HeaderMode::Deterministic);
for path in paths {
let rel_path = path.strip_prefix(pgdata_path)?;
if rel_path.is_empty() {
// The top directory should not be compressed,
// the tar crate doesn't like that
continue;
}
builder.append_path_with_name(&path, rel_path).await?;
}
let mut zstd = builder.into_inner().await?;
zstd.shutdown().await?;
let mut compressed = zstd.into_inner();
let compressed_len = compressed.metadata().await?.len();
const INITDB_TAR_ZST_WARN_LIMIT: u64 = 2 * 1024 * 1024;
if compressed_len > INITDB_TAR_ZST_WARN_LIMIT {
warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}.");
}
compressed.seek(SeekFrom::Start(0)).await?;
Ok((compressed, compressed_len))
}
pub async fn extract_tar_zst(
pgdata_path: &Utf8Path,
tar_zst: impl AsyncBufRead + Unpin,
) -> Result<()> {
let tar = Box::pin(ZstdDecoder::new(tar_zst));
let mut archive = Archive::new(tar);
archive.unpack(pgdata_path).await?;
Ok(())
}

View File

@@ -31,6 +31,7 @@ pub mod walredo;
use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::mgr::TenantManager;
use tracing::info;
/// Current storage format version
@@ -53,7 +54,11 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
pub async fn shutdown_pageserver(
tenant_manager: &TenantManager,
deletion_queue: Option<DeletionQueue>,
exit_code: i32,
) {
use std::time::Duration;
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
@@ -67,7 +72,7 @@ pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_cod
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC tasks.
timed(
tenant::mgr::shutdown_all_tenants(),
tenant_manager.shutdown(),
"shutdown all tenants",
Duration::from_secs(5),
)
@@ -114,27 +119,27 @@ pub const METADATA_FILE_NAME: &str = "metadata";
/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";
pub(crate) const TENANT_CONFIG_NAME: &str = "config";
/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub const TEMP_FILE_SUFFIX: &str = "___temp";
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";
/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";
pub(crate) const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";
pub const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";
pub(crate) const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";
/// A marker file to prevent pageserver from loading a certain tenant on restart.
/// Different from [`TIMELINE_UNINIT_MARK_SUFFIX`] due to semantics of the corresponding
@@ -161,11 +166,11 @@ fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
// from the directory name. Instead create type "UninitMark(TimelineId)" and only parse it once
// from the name.
pub fn is_uninit_mark(path: &Utf8Path) -> bool {
pub(crate) fn is_uninit_mark(path: &Utf8Path) -> bool {
ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
}
pub fn is_delete_mark(path: &Utf8Path) -> bool {
pub(crate) fn is_delete_mark(path: &Utf8Path) -> bool {
ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}

View File

@@ -167,7 +167,7 @@ impl GetVectoredLatency {
pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_get_vectored_seconds",
"Time spent in get_vectored",
"Time spent in get_vectored, excluding time spent in timeline_get_throttle.",
&["task_kind"],
CRITICAL_OP_BUCKETS.into(),
)
@@ -2465,7 +2465,8 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}
pub mod tokio_epoll_uring {
use metrics::UIntGauge;
use metrics::{register_int_counter, UIntGauge};
use once_cell::sync::Lazy;
pub struct Collector {
descs: Vec<metrics::core::Desc>,
@@ -2473,15 +2474,13 @@ pub mod tokio_epoll_uring {
systems_destroyed: UIntGauge,
}
const NMETRICS: usize = 2;
impl metrics::core::Collector for Collector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(NMETRICS);
let mut mfs = Vec::with_capacity(Self::NMETRICS);
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
@@ -2495,6 +2494,8 @@ pub mod tokio_epoll_uring {
}
impl Collector {
const NMETRICS: usize = 2;
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let mut descs = Vec::new();
@@ -2528,6 +2529,22 @@ pub mod tokio_epoll_uring {
}
}
}
pub(crate) static THREAD_LOCAL_LAUNCH_SUCCESSES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_success_count",
"Number of times where thread_local_system creation spanned multiple executor threads",
)
.unwrap()
});
pub(crate) static THREAD_LOCAL_LAUNCH_FAILURES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_failures_count",
"Number of times thread_local_system creation failed and was retried after back-off.",
)
.unwrap()
});
}
pub(crate) mod tenant_throttling {
@@ -2656,6 +2673,8 @@ pub fn preinitialize_metrics() {
&WALRECEIVER_BROKER_UPDATES,
&WALRECEIVER_CANDIDATES_ADDED,
&WALRECEIVER_CANDIDATES_REMOVED,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_FAILURES,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_SUCCESSES,
]
.into_iter()
.for_each(|c| {

View File

@@ -50,8 +50,6 @@ use once_cell::sync::Lazy;
use utils::id::TimelineId;
use crate::shutdown_pageserver;
//
// There are four runtimes:
//
@@ -453,7 +451,7 @@ async fn task_finish(
}
if shutdown_process {
shutdown_pageserver(None, 1).await;
std::process::exit(1);
}
}

View File

@@ -43,6 +43,8 @@ use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
use utils::timeout::timeout_cancellable;
use utils::timeout::TimeoutCancellableError;
use utils::zstd::create_zst_tarball;
use utils::zstd::extract_zst_tarball;
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
@@ -55,8 +57,8 @@ use self::mgr::GetTenantError;
use self::mgr::TenantsMap;
use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
@@ -565,9 +567,8 @@ impl Tenant {
// avoiding holding it across awaits
let mut timelines_accessor = self.timelines.lock().unwrap();
match timelines_accessor.entry(timeline_id) {
// We should never try and load the same timeline twice during startup
Entry::Occupied(_) => {
// The uninit mark file acts as a lock that prevents another task from
// initializing the timeline at the same time.
unreachable!(
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
);
@@ -1064,8 +1065,7 @@ impl Tenant {
let entry_path = entry.path();
let purge = if crate::is_temporary(entry_path)
// TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
// covered by the check that the timeline must exist in remote storage.
// TODO: remove uninit mark code (https://github.com/neondatabase/neon/issues/5718)
|| is_uninit_mark(entry_path)
|| crate::is_delete_mark(entry_path)
{
@@ -1298,11 +1298,6 @@ impl Tenant {
/// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
/// and the timeline will fail to load at a restart.
///
/// That's why we add an uninit mark file, and wrap it together witht the Timeline
/// in-memory object into UninitializedTimeline.
/// Once the caller is done setting up the timeline, they should call
/// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
///
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
@@ -1318,7 +1313,9 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant"
);
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
// Protect against concurrent attempts to use this TimelineId
let create_guard = self.create_timeline_create_guard(new_timeline_id)?;
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to
// make it valid, before calling finish_creation()
@@ -1333,7 +1330,7 @@ impl Tenant {
self.prepare_new_timeline(
new_timeline_id,
&new_metadata,
timeline_uninit_mark,
create_guard,
initdb_lsn,
None,
)
@@ -1421,9 +1418,8 @@ impl Tenant {
.map_err(|_| CreateTimelineError::ShuttingDown)?;
// Get exclusive access to the timeline ID: this ensures that it does not already exist,
// and that no other creation attempts will be allowed in while we are working. The
// uninit_mark is a guard.
let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
// and that no other creation attempts will be allowed in while we are working.
let create_guard = match self.create_timeline_create_guard(new_timeline_id) {
Ok(m) => m,
Err(TimelineExclusionError::AlreadyCreating) => {
// Creation is in progress, we cannot create it again, and we cannot
@@ -1466,6 +1462,8 @@ impl Tenant {
}
};
pausable_failpoint!("timeline-creation-after-uninit");
let loaded_timeline = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = self
@@ -1513,7 +1511,7 @@ impl Tenant {
&ancestor_timeline,
new_timeline_id,
ancestor_start_lsn,
uninit_mark,
create_guard,
ctx,
)
.await?
@@ -1523,7 +1521,7 @@ impl Tenant {
new_timeline_id,
pg_version,
load_existing_initdb,
uninit_mark,
create_guard,
ctx,
)
.await?
@@ -2870,9 +2868,9 @@ impl Tenant {
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
let create_guard = self.create_timeline_create_guard(dst_id).unwrap();
let tl = self
.branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
.branch_timeline_impl(src_timeline, dst_id, start_lsn, create_guard, ctx)
.await?;
tl.set_state(TimelineState::Active);
Ok(tl)
@@ -2886,10 +2884,10 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_uninit_mark: TimelineUninitMark<'_>,
timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
.await
}
@@ -2898,7 +2896,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_uninit_mark: TimelineUninitMark<'_>,
timeline_create_guard: TimelineCreateGuard<'_>,
_ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
@@ -2982,7 +2980,7 @@ impl Tenant {
.prepare_new_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
timeline_create_guard,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
)
@@ -3014,12 +3012,12 @@ impl Tenant {
load_existing_initdb: Option<TimelineId>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
let create_guard = self.create_timeline_create_guard(timeline_id).unwrap();
self.bootstrap_timeline(
timeline_id,
pg_version,
load_existing_initdb,
uninit_mark,
create_guard,
ctx,
)
.await
@@ -3046,8 +3044,13 @@ impl Tenant {
}
}
let (pgdata_zstd, tar_zst_size) =
import_datadir::create_tar_zst(pgdata_path, &temp_path).await?;
let (pgdata_zstd, tar_zst_size) = create_zst_tarball(pgdata_path, &temp_path).await?;
const INITDB_TAR_ZST_WARN_LIMIT: u64 = 2 * 1024 * 1024;
if tar_zst_size > INITDB_TAR_ZST_WARN_LIMIT {
warn!(
"compressed {temp_path} size of {tar_zst_size} is above limit {INITDB_TAR_ZST_WARN_LIMIT}."
);
}
pausable_failpoint!("before-initdb-upload");
@@ -3083,7 +3086,7 @@ impl Tenant {
timeline_id: TimelineId,
pg_version: u32,
load_existing_initdb: Option<TimelineId>,
timeline_uninit_mark: TimelineUninitMark<'_>,
timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
@@ -3095,13 +3098,14 @@ impl Tenant {
TEMP_FILE_SUFFIX,
);
// an uninit mark was placed before, nothing else can access this timeline files
// current initdb was not run yet, so remove whatever was left from the previous runs
// Remove whatever was left from the previous runs: safe because TimelineCreateGuard guarantees
// we won't race with other creations or existent timelines with the same path.
if pgdata_path.exists() {
fs::remove_dir_all(&pgdata_path).with_context(|| {
format!("Failed to remove already existing initdb directory: {pgdata_path}")
})?;
}
// this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
scopeguard::defer! {
if let Err(e) = fs::remove_dir_all(&pgdata_path) {
@@ -3146,7 +3150,7 @@ impl Tenant {
let buf_read =
BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
import_datadir::extract_tar_zst(&pgdata_path, buf_read)
extract_zst_tarball(&pgdata_path, buf_read)
.await
.context("extract initdb tar")?;
} else {
@@ -3178,7 +3182,7 @@ impl Tenant {
.prepare_new_timeline(
timeline_id,
&new_metadata,
timeline_uninit_mark,
timeline_create_guard,
pgdata_lsn,
None,
)
@@ -3250,13 +3254,12 @@ impl Tenant {
///
/// An empty layer map is initialized, and new data and WAL can be imported starting
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file.
/// `finish_creation` to insert the Timeline into the timelines map.
async fn prepare_new_timeline<'a>(
&'a self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
uninit_mark: TimelineUninitMark<'a>,
create_guard: TimelineCreateGuard<'a>,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<UninitializedTimeline> {
@@ -3279,9 +3282,12 @@ impl Tenant {
timeline_struct.init_empty_layer_map(start_lsn);
if let Err(e) = self.create_timeline_files(&uninit_mark.timeline_path).await {
if let Err(e) = self
.create_timeline_files(&create_guard.timeline_path)
.await
{
error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
cleanup_timeline_directory(create_guard);
return Err(e);
}
@@ -3292,41 +3298,31 @@ impl Tenant {
Ok(UninitializedTimeline::new(
self,
new_timeline_id,
Some((timeline_struct, uninit_mark)),
Some((timeline_struct, create_guard)),
))
}
async fn create_timeline_files(&self, timeline_path: &Utf8Path) -> anyhow::Result<()> {
crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
fail::fail_point!("after-timeline-dir-creation", |_| {
anyhow::bail!("failpoint after-timeline-dir-creation");
});
Ok(())
}
/// Attempts to create an uninit mark file for the timeline initialization.
/// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
///
/// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
fn create_timeline_uninit_mark(
/// Get a guard that provides exclusive access to the timeline directory, preventing
/// concurrent attempts to create the same timeline.
fn create_timeline_create_guard(
&self,
timeline_id: TimelineId,
) -> Result<TimelineUninitMark, TimelineExclusionError> {
) -> Result<TimelineCreateGuard, TimelineExclusionError> {
let tenant_shard_id = self.tenant_shard_id;
let uninit_mark_path = self
.conf
.timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
let uninit_mark = TimelineUninitMark::new(
self,
timeline_id,
uninit_mark_path.clone(),
timeline_path.clone(),
)?;
let create_guard = TimelineCreateGuard::new(self, timeline_id, timeline_path.clone())?;
// At this stage, we have got exclusive access to in-memory state for this timeline ID
// for creation.
@@ -3342,23 +3338,7 @@ impl Tenant {
)));
}
// Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
// that during process runtime, colliding creations will be caught in-memory without getting
// as far as failing to write a file.
fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&uninit_mark_path)
.context("Failed to create uninit mark file")
.and_then(|_| {
crashsafe::fsync_file_and_parent(&uninit_mark_path)
.context("Failed to fsync uninit mark file")
})
.with_context(|| {
format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
})?;
Ok(uninit_mark)
Ok(create_guard)
}
/// Gathers inputs from all of the timelines to produce a sizing model input.
@@ -5099,15 +5079,15 @@ mod tests {
}
#[tokio::test]
async fn test_uninit_mark_crash() -> anyhow::Result<()> {
let name = "test_uninit_mark_crash";
async fn test_create_guard_crash() -> anyhow::Result<()> {
let name = "test_create_guard_crash";
let harness = TenantHarness::create(name)?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
// Keeps uninit mark in place
// Leave the timeline ID in [`Tenant::timelines_creating`] to exclude attempting to create it again
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown()
@@ -5135,11 +5115,6 @@ mod tests {
.timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
.exists());
assert!(!harness
.conf
.timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
.exists());
Ok(())
}

View File

@@ -296,6 +296,7 @@ impl DeleteTenantFlow {
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
span::debug_assert_current_span_has_tenant_id();
@@ -303,7 +304,9 @@ impl DeleteTenantFlow {
let mut guard = Self::prepare(&tenant).await?;
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
if let Err(e) =
Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant, cancel).await
{
tenant.set_broken(format!("{e:#}")).await;
return Err(e);
}
@@ -322,6 +325,7 @@ impl DeleteTenantFlow {
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,
cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
guard.mark_in_progress()?;
@@ -335,15 +339,9 @@ impl DeleteTenantFlow {
// Though sounds scary, different mark name?
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
if let Some(remote_storage) = &remote_storage {
create_remote_delete_mark(
conf,
remote_storage,
&tenant.tenant_shard_id,
// Can't use tenant.cancel, it's already shut down. TODO: wire in an appropriate token
&CancellationToken::new(),
)
.await
.context("remote_mark")?
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel)
.await
.context("remote_mark")?
}
fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
@@ -546,8 +544,7 @@ impl DeleteTenantFlow {
conf,
remote_storage.as_ref(),
&tenant.tenant_shard_id,
// Can't use tenant.cancel, it's already shut down. TODO: wire in an appropriate token
&CancellationToken::new(),
&task_mgr::shutdown_token(),
)
.await?;

View File

@@ -102,7 +102,7 @@ pub(crate) enum TenantsMap {
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_acquire_slot`].
Open(BTreeMap<TenantShardId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}
@@ -261,6 +261,12 @@ pub struct TenantManager {
// See https://github.com/neondatabase/neon/issues/5796
tenants: &'static std::sync::RwLock<TenantsMap>,
resources: TenantSharedResources,
// Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
// This is for edge cases like tenant deletion. In normal cases (within a Tenant lifetime),
// tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
// when the tenant detaches.
cancel: CancellationToken,
}
fn emergency_generations(
@@ -620,13 +626,14 @@ pub async fn init_tenant_mgr(
conf,
tenants: &TENANTS,
resources,
cancel: CancellationToken::new(),
})
}
/// Wrapper for Tenant::spawn that checks invariants before running, and inserts
/// a broken tenant in the map if Tenant::spawn fails.
#[allow(clippy::too_many_arguments)]
pub(crate) fn tenant_spawn(
fn tenant_spawn(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
tenant_path: &Utf8Path,
@@ -680,21 +687,6 @@ pub(crate) fn tenant_spawn(
Ok(tenant)
}
///
/// Shut down all tenants. This runs as part of pageserver shutdown.
///
/// NB: We leave the tenants in the map, so that they remain accessible through
/// the management API until we shut it down. If we removed the shut-down tenants
/// from the tenants map, the management API would return 404 for these tenants,
/// because TenantsMap::get() now returns `None`.
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
pub(crate) async fn shutdown_all_tenants() {
shutdown_all_tenants0(&TENANTS).await
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let mut join_set = JoinSet::new();
@@ -833,40 +825,6 @@ pub(crate) enum SetNewTenantConfigError {
Other(anyhow::Error),
}
pub(crate) async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
) -> Result<(), SetNewTenantConfigError> {
// Legacy API: does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_shard_id, true)?;
if !tenant.tenant_shard_id().shard_count.is_unsharded() {
// Note that we use ShardParameters::default below.
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
)));
}
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(
new_tenant_conf.clone(),
tenant.generation,
&ShardParameters::default(),
);
Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum UpsertLocationError {
#[error("Bad config request: {0}")]
@@ -1428,6 +1386,7 @@ impl TenantManager {
self.resources.remote_storage.clone(),
&TENANTS,
tenant,
&self.cancel,
)
.await;
@@ -1443,6 +1402,35 @@ impl TenantManager {
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let r = self
.do_shard_split(tenant_shard_id, new_shard_count, new_stripe_size, ctx)
.await;
if r.is_err() {
// Shard splitting might have left the original shard in a partially shut down state (it
// stops the shard's remote timeline client). Reset it to ensure we leave things in
// a working state.
if self.get(tenant_shard_id).is_some() {
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
// Log this error because our return value will still be the original error, not this one. This is
// a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
// (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
// setting it broken probably won't help either.
tracing::error!("Failed to reset {tenant_shard_id}: {e}");
}
}
}
r
}
pub(crate) async fn do_shard_split(
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
@@ -1477,6 +1465,10 @@ impl TenantManager {
.join(",")
);
fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
let parent_shard_identity = tenant.shard_identity;
let parent_tenant_conf = tenant.get_tenant_conf();
let parent_generation = tenant.generation;
@@ -1490,6 +1482,10 @@ impl TenantManager {
return Err(e);
}
fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
"failpoint"
)));
self.resources.deletion_queue_client.flush_advisory();
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
@@ -1511,11 +1507,16 @@ impl TenantManager {
anyhow::bail!("Detached parent shard in the middle of split!")
}
};
fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Optimization: hardlink layers from the parent into the children, so that they don't have to
// re-download & duplicate the data referenced in their initial IndexPart
self.shard_split_hardlink(parent, child_shards.clone())
.await?;
fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
// child shards to reach this point.
@@ -1555,6 +1556,10 @@ impl TenantManager {
.await?;
}
fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
"failpoint"
)));
// Phase 4: wait for child chards WAL ingest to catch up to target LSN
for child_shard_id in &child_shards {
let child_shard_id = *child_shard_id;
@@ -1587,6 +1592,10 @@ impl TenantManager {
timeline.timeline_id,
target_lsn
);
fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
"failpoint"
)));
if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
// Failure here might mean shutdown, in any case this part is an optimization
// and we shouldn't hold up the split operation.
@@ -1618,19 +1627,11 @@ impl TenantManager {
let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
None,
None,
"tenant_files_delete",
false,
async move {
fs::remove_dir_all(tmp_path.as_path())
.await
.with_context(|| format!("tenant directory {:?} deletion", tmp_path))
},
);
self.spawn_background_purge(tmp_path);
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
"failpoint"
)));
parent_slot_guard.drop_old_value()?;
@@ -1763,6 +1764,151 @@ impl TenantManager {
Ok(())
}
///
/// Shut down all tenants. This runs as part of pageserver shutdown.
///
/// NB: We leave the tenants in the map, so that they remain accessible through
/// the management API until we shut it down. If we removed the shut-down tenants
/// from the tenants map, the management API would return 404 for these tenants,
/// because TenantsMap::get() now returns `None`.
/// That could be easily misinterpreted by control plane, the consumer of the
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
shutdown_all_tenants0(self.tenants).await
}
/// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
/// the background, and thereby avoid blocking any API requests on this deletion completing.
fn spawn_background_purge(&self, tmp_path: Utf8PathBuf) {
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
task_tenant_id,
None,
"tenant_files_delete",
false,
async move {
fs::remove_dir_all(tmp_path.as_path())
.await
.with_context(|| format!("tenant directory {:?} deletion", tmp_path))
},
);
}
pub(crate) async fn detach_tenant(
&self,
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = self
.detach_tenant0(
conf,
&TENANTS,
tenant_shard_id,
detach_ignored,
deletion_queue_client,
)
.await?;
self.spawn_background_purge(tmp_path);
Ok(())
}
async fn detach_tenant0(
&self,
conf: &'static PageServerConf,
tenants: &std::sync::RwLock<TenantsMap>,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| {
format!("local tenant directory {local_tenant_directory:?} rename")
})
};
let removal_result = remove_tenant_from_memory(
tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored
&& matches!(
removal_result,
Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
)
{
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
if tenant_ignore_mark.exists() {
info!("Detaching an ignored tenant");
let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
.await
.with_context(|| {
format!("Ignored tenant {tenant_shard_id} local directory rename")
})?;
return Ok(tmp_path);
}
}
removal_result
}
pub(crate) async fn set_new_tenant_config(
&self,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
) -> Result<(), SetNewTenantConfigError> {
// Legacy API: does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_shard_id, true)?;
if !tenant.tenant_shard_id().shard_count.is_unsharded() {
// Note that we use ShardParameters::default below.
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
)));
}
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(
new_tenant_conf.clone(),
tenant.generation,
&ShardParameters::default(),
);
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &location_conf)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
@@ -1964,87 +2110,6 @@ pub(crate) enum TenantStateError {
Other(#[from] anyhow::Error),
}
pub(crate) async fn detach_tenant(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_shard_id,
detach_ignored,
deletion_queue_client,
)
.await?;
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::MgmtRequest,
task_tenant_id,
None,
"tenant_files_delete",
false,
async move {
fs::remove_dir_all(tmp_path.as_path())
.await
.with_context(|| format!("tenant directory {:?} deletion", tmp_path))
},
);
Ok(())
}
async fn detach_tenant0(
conf: &'static PageServerConf,
tenants: &std::sync::RwLock<TenantsMap>,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
};
let removal_result = remove_tenant_from_memory(
tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored
&& matches!(
removal_result,
Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
)
{
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
if tenant_ignore_mark.exists() {
info!("Detaching an ignored tenant");
let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
.await
.with_context(|| {
format!("Ignored tenant {tenant_shard_id} local directory rename")
})?;
return Ok(tmp_path);
}
}
removal_result
}
pub(crate) async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
@@ -2665,7 +2730,7 @@ use {
utils::http::error::ApiError,
};
pub(crate) async fn immediate_gc(
pub(crate) fn immediate_gc(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
@@ -2687,6 +2752,8 @@ pub(crate) async fn immediate_gc(
// Run in task_mgr to avoid race with tenant_detach operation
let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
let span = info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
// TODO: spawning is redundant now, need to hold the gate
task_mgr::spawn(
&tokio::runtime::Handle::current(),
@@ -2701,16 +2768,15 @@ pub(crate) async fn immediate_gc(
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
// why not futures unordered? it seems it needs very much the same task structure
// but would only run on single task.
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
@@ -2726,7 +2792,7 @@ pub(crate) async fn immediate_gc(
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care just exit fast about the shutdown error
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
@@ -2737,6 +2803,7 @@ pub(crate) async fn immediate_gc(
}
Ok(())
}
.instrument(span)
);
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task

View File

@@ -23,7 +23,7 @@ use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::Generation;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::TimelineId;
@@ -73,55 +73,13 @@ pub async fn download_layer_file<'a>(
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
let (mut destination_file, bytes_amount) = download_retry(
|| async {
let destination_file = tokio::fs::File::create(&temp_file_path)
.await
.with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
.map_err(DownloadError::Other)?;
let download = storage.download(&remote_path, cancel).await?;
let mut destination_file =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await;
match bytes_amount {
Ok(bytes_amount) => {
let destination_file = destination_file.into_inner();
Ok((destination_file, bytes_amount))
}
Err(e) => {
if let Err(e) = tokio::fs::remove_file(&temp_file_path).await {
on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}"));
}
Err(e.into())
}
}
},
let bytes_amount = download_retry(
|| async { download_object(storage, &remote_path, &temp_file_path, cancel).await },
&format!("download {remote_path:?}"),
cancel,
)
.await?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
// we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
// But for additional safety lets check/wait for any pending operations.
destination_file
.flush()
.await
.with_context(|| format!("flush source file at {temp_file_path}"))
.map_err(DownloadError::Other)?;
let expected = layer_metadata.file_size();
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
@@ -129,14 +87,6 @@ pub async fn download_layer_file<'a>(
)));
}
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.with_context(|| format!("failed to fsync source file at {temp_file_path}"))
.map_err(DownloadError::Other)?;
drop(destination_file);
fail::fail_point!("remote-storage-download-pre-rename", |_| {
Err(DownloadError::Other(anyhow!(
"remote-storage-download-pre-rename failpoint triggered"
@@ -169,6 +119,128 @@ pub async fn download_layer_file<'a>(
Ok(bytes_amount)
}
/// Download the object `src_path` in the remote `storage` to local path `dst_path`.
///
/// If Ok() is returned, the download succeeded and the inode & data have been made durable.
/// (Note that the directory entry for the inode is not made durable.)
/// The file size in bytes is returned.
///
/// If Err() is returned, there was some error. The file at `dst_path` has been unlinked.
/// The unlinking has _not_ been made durable.
async fn download_object<'a>(
storage: &'a GenericRemoteStorage,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
cancel: &CancellationToken,
) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
crate::virtual_file::io_engine::IoEngine::StdFs => {
async {
let destination_file = tokio::fs::File::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let download = storage.download(src_path, cancel).await?;
let mut buf_writer =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
buf_writer.flush().await?;
let mut destination_file = buf_writer.into_inner();
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
// we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
// But for additional safety lets check/wait for any pending operations.
destination_file
.flush()
.await
.with_context(|| format!("flush source file at {dst_path}"))
.map_err(DownloadError::Other)?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
async {
let destination_file = VirtualFile::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
let mut download = storage.download(src_path, cancel).await?;
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
let size_tracking = size_tracking_writer::Writer::new(destination_file);
let mut buffered = owned_buffers_io::write::BufferedWriter::<
{ super::BUFFER_SIZE },
_,
>::new(size_tracking);
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(e),
};
buffered
.write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk))
.await?;
}
let size_tracking = buffered.flush_and_into_inner().await?;
Ok(size_tracking.into_inner())
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
};
// in case the download failed, clean up
match res {
Ok(bytes_amount) => Ok(bytes_amount),
Err(e) => {
if let Err(e) = tokio::fs::remove_file(dst_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
}
}
Err(e)
}
}
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool {

View File

@@ -95,7 +95,11 @@ pub(crate) struct SecondaryTenant {
shard_identity: ShardIdentity,
tenant_conf: std::sync::Mutex<TenantConfOpt>,
// Internal state used by the Downloader.
detail: std::sync::Mutex<SecondaryDetail>,
// Public state indicating overall progress of downloads relative to the last heatmap seen
pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,
}
impl SecondaryTenant {
@@ -118,6 +122,8 @@ impl SecondaryTenant {
tenant_conf: std::sync::Mutex::new(tenant_conf),
detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
progress: std::sync::Mutex::default(),
})
}
@@ -247,9 +253,12 @@ impl SecondaryTenant {
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
/// and heatmap uploads. This is not a hot data path: it's used for:
/// - Live migrations, where we want to ensure a migration destination has the freshest possible
/// content before trying to cut over.
/// - Tests, where we want to immediately upload/download for a particular tenant.
///
/// In normal operations, outside of migrations, uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,

View File

@@ -41,14 +41,16 @@ use crate::tenant::{
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use remote_storage::{DownloadError, GenericRemoteStorage};
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
use utils::{
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
id::TimelineId,
};
use super::{
@@ -128,6 +130,7 @@ pub(super) struct SecondaryDetail {
pub(super) config: SecondaryLocationConfig,
last_download: Option<Instant>,
last_etag: Option<Etag>,
next_download: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
@@ -138,11 +141,26 @@ fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
datetime.format("%d/%m/%Y %T")
}
/// Information returned from download function when it detects the heatmap has changed
struct HeatMapModified {
etag: Etag,
last_modified: SystemTime,
bytes: Vec<u8>,
}
enum HeatMapDownload {
// The heatmap's etag has changed: return the new etag, mtime and the body bytes
Modified(HeatMapModified),
// The heatmap's etag is unchanged
Unmodified,
}
impl SecondaryDetail {
pub(super) fn new(config: SecondaryLocationConfig) -> Self {
Self {
config,
last_download: None,
last_etag: None,
next_download: None,
timelines: HashMap::new(),
}
@@ -477,11 +495,31 @@ impl<'a> TenantDownloader<'a> {
};
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
// We will use the etag from last successful download to make the download conditional on changes
let last_etag = self
.secondary_state
.detail
.lock()
.unwrap()
.last_etag
.clone();
// Download the tenant's heatmap
let heatmap_bytes = tokio::select!(
bytes = self.download_heatmap() => {bytes?},
let HeatMapModified {
last_modified: heatmap_mtime,
etag: heatmap_etag,
bytes: heatmap_bytes,
} = match tokio::select!(
bytes = self.download_heatmap(last_etag.as_ref()) => {bytes?},
_ = self.secondary_state.cancel.cancelled() => return Ok(())
);
) {
HeatMapDownload::Unmodified => {
tracing::info!("Heatmap unchanged since last successful download");
return Ok(());
}
HeatMapDownload::Modified(m) => m,
};
let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
@@ -498,6 +536,14 @@ impl<'a> TenantDownloader<'a> {
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
// Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general
// principle that deletions should be done before writes wherever possible, and so that we can use this
// phase to initialize our SecondaryProgress.
{
*self.secondary_state.progress.lock().unwrap() =
self.prepare_timelines(&heatmap, heatmap_mtime).await?;
}
// Download the layers in the heatmap
for timeline in heatmap.timelines {
if self.secondary_state.cancel.is_cancelled() {
@@ -515,30 +561,159 @@ impl<'a> TenantDownloader<'a> {
.await?;
}
// Only update last_etag after a full successful download: this way will not skip
// the next download, even if the heatmap's actual etag is unchanged.
self.secondary_state.detail.lock().unwrap().last_etag = Some(heatmap_etag);
Ok(())
}
async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
/// Do any fast local cleanup that comes before the much slower process of downloading
/// layers from remote storage. In the process, initialize the SecondaryProgress object
/// that will later be updated incrementally as we download layers.
async fn prepare_timelines(
&self,
heatmap: &HeatMapTenant,
heatmap_mtime: SystemTime,
) -> Result<SecondaryProgress, UpdateError> {
let heatmap_stats = heatmap.get_stats();
// We will construct a progress object, and then populate its initial "downloaded" numbers
// while iterating through local layer state in [`Self::prepare_timelines`]
let mut progress = SecondaryProgress {
layers_total: heatmap_stats.layers,
bytes_total: heatmap_stats.bytes,
heatmap_mtime: Some(heatmap_mtime),
layers_downloaded: 0,
bytes_downloaded: 0,
};
// Accumulate list of things to delete while holding the detail lock, for execution after dropping the lock
let mut delete_layers = Vec::new();
let mut delete_timelines = Vec::new();
{
let mut detail = self.secondary_state.detail.lock().unwrap();
for (timeline_id, timeline_state) in &mut detail.timelines {
let Some(heatmap_timeline_index) = heatmap
.timelines
.iter()
.position(|t| t.timeline_id == *timeline_id)
else {
// This timeline is no longer referenced in the heatmap: delete it locally
delete_timelines.push(*timeline_id);
continue;
};
let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap();
let layers_in_heatmap = heatmap_timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
let mut layer_count = layers_on_disk.len();
let mut layer_byte_count: u64 = timeline_state
.on_disk_layers
.values()
.map(|l| l.metadata.file_size())
.sum();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
layer_count -= 1;
layer_byte_count -= timeline_state
.on_disk_layers
.get(layer)
.unwrap()
.metadata
.file_size();
delete_layers.push((*timeline_id, (*layer).clone()));
}
progress.bytes_downloaded += layer_byte_count;
progress.layers_downloaded += layer_count;
}
}
// Execute accumulated deletions
for (timeline_id, layer_name) in delete_layers {
let timeline_path = self
.conf
.timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
let local_path = timeline_path.join(layer_name.to_string());
tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)
.maybe_fatal_err("Removing secondary layer")?;
// Update in-memory housekeeping to reflect the absence of the deleted layer
let mut detail = self.secondary_state.detail.lock().unwrap();
let Some(timeline_state) = detail.timelines.get_mut(&timeline_id) else {
continue;
};
timeline_state.on_disk_layers.remove(&layer_name);
}
for timeline_id in delete_timelines {
let timeline_path = self
.conf
.timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
tracing::info!(timeline_id=%timeline_id,
"Timeline no longer in heatmap, removing from secondary location"
);
tokio::fs::remove_dir_all(&timeline_path)
.await
.or_else(fs_ext::ignore_not_found)
.maybe_fatal_err("Removing secondary timeline")?;
}
Ok(progress)
}
/// Returns downloaded bytes if the etag differs from `prev_etag`, or None if the object
/// still matches `prev_etag`.
async fn download_heatmap(
&self,
prev_etag: Option<&Etag>,
) -> Result<HeatMapDownload, UpdateError> {
debug_assert_current_span_has_tenant_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
// TODO: make download conditional on ETag having changed since last download
// TODO: pull up etag check into the request, to do a conditional GET rather than
// issuing a GET and then maybe ignoring the response body
// (https://github.com/neondatabase/neon/issues/6199)
tracing::debug!("Downloading heatmap for secondary tenant",);
let heatmap_path = remote_heatmap_path(tenant_shard_id);
let cancel = &self.secondary_state.cancel;
let heatmap_bytes = backoff::retry(
backoff::retry(
|| async {
let download = self
.remote_storage
.download(&heatmap_path, cancel)
.await
.map_err(UpdateError::from)?;
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
Ok(heatmap_bytes)
if Some(&download.etag) == prev_etag {
Ok(HeatMapDownload::Unmodified)
} else {
let mut heatmap_bytes = Vec::new();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut heatmap_bytes).await?;
SECONDARY_MODE.download_heatmap.inc();
Ok(HeatMapDownload::Modified(HeatMapModified {
etag: download.etag,
last_modified: download.last_modified,
bytes: heatmap_bytes,
}))
}
},
|e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
FAILED_DOWNLOAD_WARN_THRESHOLD,
@@ -548,11 +723,7 @@ impl<'a> TenantDownloader<'a> {
)
.await
.ok_or_else(|| UpdateError::Cancelled)
.and_then(|x| x)?;
SECONDARY_MODE.download_heatmap.inc();
Ok(heatmap_bytes)
.and_then(|x| x)
}
async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
@@ -593,27 +764,6 @@ impl<'a> TenantDownloader<'a> {
}
};
let layers_in_heatmap = timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
let local_path = timeline_path.join(layer.to_string());
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)
.maybe_fatal_err("Removing secondary layer")?;
}
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
@@ -662,6 +812,12 @@ impl<'a> TenantDownloader<'a> {
}
}
// Failpoint for simulating slow remote storage
failpoint_support::sleep_millis_async!(
"secondary-layer-download-sleep",
&self.secondary_state.cancel
);
// Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
let downloaded_bytes = match download_layer_file(
self.conf,
@@ -701,6 +857,11 @@ impl<'a> TenantDownloader<'a> {
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
} else {
tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.bytes_downloaded += downloaded_bytes;
progress.layers_downloaded += 1;
}
SECONDARY_MODE.download_layer.inc();

View File

@@ -62,3 +62,25 @@ impl HeatMapTimeline {
}
}
}
pub(crate) struct HeatMapStats {
pub(crate) bytes: u64,
pub(crate) layers: usize,
}
impl HeatMapTenant {
pub(crate) fn get_stats(&self) -> HeatMapStats {
let mut stats = HeatMapStats {
bytes: 0,
layers: 0,
};
for timeline in &self.timelines {
for layer in &timeline.layers {
stats.layers += 1;
stats.bytes += layer.metadata.file_size;
}
}
stats
}
}

View File

@@ -183,7 +183,13 @@ pub(super) async fn gather_inputs(
// new gc run, which we have no control over. however differently from `Timeline::gc`
// we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
// actually removing files.
let mut next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff);
//
// We only consider [`GcInfo::pitr_cutoff`], and not [`GcInfo::horizon_cutoff`], because from
// a user's perspective they have only requested retention up to the time bound (pitr_cutoff), rather
// than a space bound (horizon cutoff). This means that if someone drops a database and waits for their
// PITR interval, they will see synthetic size decrease, even if we are still storing data inside
// horizon_cutoff.
let mut next_gc_cutoff = gc_info.pitr_cutoff;
// If the caller provided a shorter retention period, use that instead of the GC cutoff.
let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {

View File

@@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::borrow::Cow;
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
@@ -427,7 +428,7 @@ impl LayerAccessStatFullDetails {
} = self;
pageserver_api::models::LayerAccessStatFullDetails {
when_millis_since_epoch: system_time_to_millis_since_epoch(when),
task_kind: task_kind.into(), // into static str, powered by strum_macros
task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros
access_kind: *access_kind,
}
}
@@ -525,7 +526,7 @@ impl LayerAccessStats {
.collect(),
task_kind_access_flag: task_kind_flag
.iter()
.map(|task_kind| task_kind.into()) // into static str, powered by strum_macros
.map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros
.collect(),
first: first_access.as_ref().map(|a| a.as_api_model()),
accesses_history: last_accesses.map(|m| m.as_api_model()),

View File

@@ -710,10 +710,6 @@ impl LayerInner {
// disable any scheduled but not yet running eviction deletions for this
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
// count cancellations, which currently remain largely unexpected
let init_cancelled =
scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
// no need to make the evict_and_wait wait for the actual download to complete
drop(self.status.send(Status::Downloaded));
@@ -722,7 +718,9 @@ impl LayerInner {
.upgrade()
.ok_or_else(|| DownloadError::TimelineShutdown)?;
// FIXME: grab a gate
// count cancellations, which currently remain largely unexpected
let init_cancelled =
scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let can_ever_evict = timeline.remote_client.as_ref().is_some();
@@ -731,9 +729,17 @@ impl LayerInner {
let needs_download = self
.needs_download()
.await
.map_err(DownloadError::PreStatFailed)?;
.map_err(DownloadError::PreStatFailed);
let permit = if let Some(reason) = needs_download {
let needs_download = match needs_download {
Ok(reason) => reason,
Err(e) => {
scopeguard::ScopeGuard::into_inner(init_cancelled);
return Err(e);
}
};
let (permit, downloaded) = if let Some(reason) = needs_download {
if let NeedsDownload::NotFile(ft) = reason {
return Err(DownloadError::NotFile(ft));
}
@@ -744,36 +750,59 @@ impl LayerInner {
self.wanted_evicted.store(false, Ordering::Release);
if !can_ever_evict {
scopeguard::ScopeGuard::into_inner(init_cancelled);
return Err(DownloadError::NoRemoteStorage);
}
if let Some(ctx) = ctx {
self.check_expected_download(ctx)?;
let res = self.check_expected_download(ctx);
if let Err(e) = res {
scopeguard::ScopeGuard::into_inner(init_cancelled);
return Err(e);
}
}
if !allow_download {
// this does look weird, but for LayerInner the "downloading" means also changing
// internal once related state ...
scopeguard::ScopeGuard::into_inner(init_cancelled);
return Err(DownloadError::DownloadRequired);
}
tracing::info!(%reason, "downloading on-demand");
self.spawn_download_and_wait(timeline, permit).await?
let permit = self.spawn_download_and_wait(timeline, permit).await;
let permit = match permit {
Ok(permit) => permit,
Err(e) => {
scopeguard::ScopeGuard::into_inner(init_cancelled);
return Err(e);
}
};
(permit, true)
} else {
// the file is present locally, probably by a previous but cancelled call to
// get_or_maybe_download. alternatively we might be running without remote storage.
LAYER_IMPL_METRICS.inc_init_needed_no_download();
permit
(permit, false)
};
let since_last_eviction =
self.last_evicted_at.lock().unwrap().map(|ts| ts.elapsed());
if let Some(since_last_eviction) = since_last_eviction {
// FIXME: this will not always be recorded correctly until #6028 (the no
// download needed branch above)
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
scopeguard::ScopeGuard::into_inner(init_cancelled);
if downloaded {
let since_last_eviction = self
.last_evicted_at
.lock()
.unwrap()
.take()
.map(|ts| ts.elapsed());
if let Some(since_last_eviction) = since_last_eviction {
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
}
}
let res = Arc::new(DownloadedLayer {
@@ -795,8 +824,6 @@ impl LayerInner {
);
}
scopeguard::ScopeGuard::into_inner(init_cancelled);
Ok((ResidentOrWantedEvicted::Resident(res), permit))
}
.instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
@@ -1457,7 +1484,7 @@ impl ResidentLayer {
}
/// Loads all keys stored in the layer. Returns key, lsn and value size.
#[tracing::instrument(skip_all, fields(layer=%self))]
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
pub(crate) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,
@@ -1477,9 +1504,9 @@ impl ResidentLayer {
// while it's being held.
delta_layer::DeltaLayerInner::load_keys(d, ctx)
.await
.context("Layer index is corrupted")
.with_context(|| format!("Layer index is corrupted for {self}"))
}
Image(_) => anyhow::bail!("cannot load_keys on a image layer"),
Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
}
}

View File

@@ -130,10 +130,10 @@ where
self.inner.load().config.steady_rps()
}
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) {
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
let inner = self.inner.load_full(); // clones the `Inner` Arc
if !inner.task_kinds.contains(ctx.task_kind()) {
return;
return None;
};
let start = std::time::Instant::now();
let mut did_throttle = false;
@@ -170,6 +170,9 @@ where
});
}
}
Some(wait_time)
} else {
None
}
}
}

View File

@@ -634,6 +634,8 @@ impl Timeline {
/// If a remote layer file is needed, it is downloaded as part of this
/// call.
///
/// This method enforces [`Self::timeline_get_throttle`] internally.
///
/// NOTE: It is considered an error to 'get' a key that doesn't exist. The
/// abstraction above this needs to store suitable metadata to track what
/// data exists with what keys, in separate metadata entries. If a
@@ -644,18 +646,27 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
#[inline(always)]
pub(crate) async fn get(
&self,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
self.timeline_get_throttle.throttle(ctx, 1).await;
self.get_impl(key, lsn, ctx).await
}
/// Not subject to [`Self::timeline_get_throttle`].
async fn get_impl(
&self,
key: Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if !lsn.is_valid() {
return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
}
self.timeline_get_throttle.throttle(ctx, 1).await;
// This check is debug-only because of the cost of hashing, and because it's a double-check: we
// already checked the key against the shard_identity when looking up the Timeline from
// page_service.
@@ -752,10 +763,6 @@ impl Timeline {
return Err(GetVectoredError::Oversized(key_count));
}
self.timeline_get_throttle
.throttle(ctx, key_count as usize)
.await;
for range in &keyspace.ranges {
let mut key = range.start;
while key != range.end {
@@ -772,11 +779,18 @@ impl Timeline {
self.conf.get_vectored_impl
);
let _timer = crate::metrics::GET_VECTORED_LATENCY
let start = crate::metrics::GET_VECTORED_LATENCY
.for_task_kind(ctx.task_kind())
.map(|t| t.start_timer());
.map(|metric| (metric, Instant::now()));
match self.conf.get_vectored_impl {
// start counting after throttle so that throttle time
// is always less than observation time
let throttled = self
.timeline_get_throttle
.throttle(ctx, key_count as usize)
.await;
let res = match self.conf.get_vectored_impl {
GetVectoredImpl::Sequential => {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
@@ -790,9 +804,33 @@ impl Timeline {
vectored_res
}
};
if let Some((metric, start)) = start {
let elapsed = start.elapsed();
let ex_throttled = if let Some(throttled) = throttled {
elapsed.checked_sub(throttled)
} else {
Some(elapsed)
};
if let Some(ex_throttled) = ex_throttled {
metric.observe(ex_throttled.as_secs_f64());
} else {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!("error deducting time spent throttled; this message is logged at a global rate limit");
});
}
}
res
}
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn get_vectored_sequential_impl(
&self,
keyspace: KeySpace,
@@ -803,7 +841,7 @@ impl Timeline {
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
let block = self.get(key, lsn, ctx).await;
let block = self.get_impl(key, lsn, ctx).await;
use PageReconstructError::*;
match block {
@@ -853,6 +891,7 @@ impl Timeline {
Ok(results)
}
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
@@ -2967,7 +3006,6 @@ impl Timeline {
}
trace!("waking up");
let timer = self.metrics.flush_time_histo.start_timer();
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
if self.cancel.is_cancelled() {
@@ -2978,6 +3016,8 @@ impl Timeline {
return;
}
let timer = self.metrics.flush_time_histo.start_timer();
let layer_to_flush = {
let guard = self.layers.read().await;
guard.layer_map().frozen_layers.front().cloned()
@@ -2999,13 +3039,12 @@ impl Timeline {
break err;
}
}
timer.stop_and_record();
};
// Notify any listeners that we're done
let _ = self
.layer_flush_done_tx
.send_replace((flush_counter, result));
timer.stop_and_record();
}
}
@@ -3073,6 +3112,7 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<(), FlushLayerError> {
debug_assert_current_span_has_tenant_and_timeline_id();
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -3744,8 +3784,11 @@ impl Timeline {
// The timestamp is in the future. That sounds impossible,
// but what it really means is that there hasn't been
// any commits since the cutoff timestamp.
//
// In this case we should use the LSN of the most recent commit,
// which is implicitly the last LSN in the log.
debug!("future({})", lsn);
cutoff_horizon
self.get_last_record_lsn()
}
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);

View File

@@ -2,8 +2,8 @@ use std::{collections::hash_map::Entry, fs, sync::Arc};
use anyhow::Context;
use camino::Utf8PathBuf;
use tracing::{error, info, info_span, warn};
use utils::{crashsafe, fs_ext, id::TimelineId, lsn::Lsn};
use tracing::{error, info, info_span};
use utils::{fs_ext, id::TimelineId, lsn::Lsn};
use crate::{context::RequestContext, import_datadir, tenant::Tenant};
@@ -11,22 +11,22 @@ use super::Timeline;
/// A timeline with some of its files on disk, being initialized.
/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
/// its local files are removed. In the worst case of a crash, an uninit mark file is left behind, which causes the directory
/// to be removed on next restart.
/// its local files are removed. If we crash while this class exists, then the timeline's local
/// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
///
/// The caller is responsible for proper timeline data filling before the final init.
#[must_use]
pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
}
impl<'t> UninitializedTimeline<'t> {
pub(crate) fn new(
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard<'t>)>,
) -> Self {
Self {
owning_tenant,
@@ -35,8 +35,7 @@ impl<'t> UninitializedTimeline<'t> {
}
}
/// Finish timeline creation: insert it into the Tenant's timelines map and remove the
/// uninit mark file.
/// Finish timeline creation: insert it into the Tenant's timelines map
///
/// This function launches the flush loop if not already done.
///
@@ -72,16 +71,9 @@ impl<'t> UninitializedTimeline<'t> {
Entry::Vacant(v) => {
// after taking here should be no fallible operations, because the drop guard will not
// cleanup after and would block for example the tenant deletion
let (new_timeline, uninit_mark) =
let (new_timeline, _create_guard) =
self.raw_timeline.take().expect("already checked");
// this is the mutual exclusion between different retries to create the timeline;
// this should be an assertion.
uninit_mark.remove_uninit_mark().with_context(|| {
format!(
"Failed to remove uninit mark file for timeline {tenant_shard_id}/{timeline_id}"
)
})?;
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
@@ -120,8 +112,7 @@ impl<'t> UninitializedTimeline<'t> {
.await
.context("Failed to flush after basebackup import")?;
// All the data has been imported. Insert the Timeline into the tenant's timelines
// map and remove the uninit mark file.
// All the data has been imported. Insert the Timeline into the tenant's timelines map
let tl = self.finish_creation()?;
tl.activate(broker_client, None, ctx);
Ok(tl)
@@ -143,37 +134,35 @@ impl<'t> UninitializedTimeline<'t> {
impl Drop for UninitializedTimeline<'_> {
fn drop(&mut self) {
if let Some((_, uninit_mark)) = self.raw_timeline.take() {
if let Some((_, create_guard)) = self.raw_timeline.take() {
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_shard_id.tenant_id, shard_id = %self.owning_tenant.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id).entered();
error!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(uninit_mark);
cleanup_timeline_directory(create_guard);
}
}
}
pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
let timeline_path = &uninit_mark.timeline_path;
pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
let timeline_path = &create_guard.timeline_path;
match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
Ok(()) => {
info!("Timeline dir {timeline_path:?} removed successfully, removing the uninit mark")
info!("Timeline dir {timeline_path:?} removed successfully")
}
Err(e) => {
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
}
}
drop(uninit_mark); // mark handles its deletion on drop, gets retained if timeline dir exists
// Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
// timeline creation attempts under this TimelineId to proceed
drop(create_guard);
}
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
/// or gets removed eventually.
///
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
/// A guard for timeline creations in process: as long as this object exists, the timeline ID
/// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
#[must_use]
pub(crate) struct TimelineUninitMark<'t> {
pub(crate) struct TimelineCreateGuard<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
uninit_mark_deleted: bool,
uninit_mark_path: Utf8PathBuf,
pub(crate) timeline_path: Utf8PathBuf,
}
@@ -190,11 +179,10 @@ pub(crate) enum TimelineExclusionError {
Other(#[from] anyhow::Error),
}
impl<'t> TimelineUninitMark<'t> {
impl<'t> TimelineCreateGuard<'t> {
pub(crate) fn new(
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
uninit_mark_path: Utf8PathBuf,
timeline_path: Utf8PathBuf,
) -> Result<Self, TimelineExclusionError> {
// Lock order: this is the only place we take both locks. During drop() we only
@@ -214,56 +202,14 @@ impl<'t> TimelineUninitMark<'t> {
Ok(Self {
owning_tenant,
timeline_id,
uninit_mark_deleted: false,
uninit_mark_path,
timeline_path,
})
}
}
fn remove_uninit_mark(mut self) -> anyhow::Result<()> {
if !self.uninit_mark_deleted {
self.delete_mark_file_if_present()?;
}
Ok(())
}
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
let uninit_mark_file = &self.uninit_mark_path;
let uninit_mark_parent = uninit_mark_file
.parent()
.with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
fs_ext::ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
})?;
crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
self.uninit_mark_deleted = true;
Ok(())
}
}
impl Drop for TimelineUninitMark<'_> {
impl Drop for TimelineCreateGuard<'_> {
fn drop(&mut self) {
if !self.uninit_mark_deleted {
if self.timeline_path.exists() {
error!(
"Uninit mark {} is not removed, timeline {} stays uninitialized",
self.uninit_mark_path, self.timeline_path
)
} else {
// unblock later timeline creation attempts
warn!(
"Removing intermediate uninit mark file {}",
self.uninit_mark_path
);
if let Err(e) = self.delete_mark_file_if_present() {
error!("Failed to remove the uninit mark file: {e}")
}
}
}
self.owning_tenant
.timelines_creating
.lock()

View File

@@ -448,6 +448,7 @@ pub(super) async fn handle_walreceiver_connection(
disk_consistent_lsn,
remote_consistent_lsn,
replytime: ts,
shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
};
debug!("neon_status_update {status_update:?}");

View File

@@ -28,12 +28,31 @@ use tokio::time::Instant;
pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
pub use io_engine::feature_test as io_engine_feature_test;
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
//!
//! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary
//! reason we need this abstraction.
//!
//! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.
pub(crate) mod write;
pub(crate) mod util {
pub(crate) mod size_tracking_writer;
}
}
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,

View File

@@ -6,6 +6,11 @@
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`super::OpenOptions`].
//!
//!
#[cfg(target_os = "linux")]
pub(super) mod tokio_epoll_uring_ext;
use tokio_epoll_uring::{IoBuf, Slice};
use tracing::Instrument;
@@ -145,7 +150,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
@@ -160,7 +165,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fsync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
@@ -178,7 +183,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fdatasync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
@@ -197,7 +202,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.statx(file_guard).await;
(
resources,
@@ -220,7 +225,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.write(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
@@ -253,3 +258,82 @@ impl IoEngine {
}
}
}
pub enum FeatureTestResult {
PlatformPreferred(IoEngineKind),
Worse {
engine: IoEngineKind,
remark: String,
},
}
impl FeatureTestResult {
#[cfg(target_os = "linux")]
const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::TokioEpollUring;
#[cfg(not(target_os = "linux"))]
const PLATFORM_PREFERRED: IoEngineKind = IoEngineKind::StdFs;
}
impl From<FeatureTestResult> for IoEngineKind {
fn from(val: FeatureTestResult) -> Self {
match val {
FeatureTestResult::PlatformPreferred(e) => e,
FeatureTestResult::Worse { engine, .. } => engine,
}
}
}
/// Somewhat costly under the hood, do only once.
/// Panics if we can't set up the feature test.
pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
std::thread::spawn(|| {
#[cfg(not(target_os = "linux"))]
{
Ok(FeatureTestResult::PlatformPreferred(
FeatureTestResult::PLATFORM_PREFERRED,
))
}
#[cfg(target_os = "linux")]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
Ok(match rt.block_on(tokio_epoll_uring::System::launch()) {
Ok(_) => FeatureTestResult::PlatformPreferred({
assert!(matches!(
IoEngineKind::TokioEpollUring,
FeatureTestResult::PLATFORM_PREFERRED
));
FeatureTestResult::PLATFORM_PREFERRED
}),
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => {
let remark = match e.raw_os_error() {
Some(nix::libc::EPERM) => {
// fall back
"creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled "
.to_string()
}
Some(nix::libc::EFAULT) => {
// fail feature test
anyhow::bail!(
"creating tokio-epoll-uring fails with EFAULT, might have corrupted memory"
);
}
Some(_) | None => {
// fall back
format!("creating tokio-epoll-uring fails with error: {e:#}")
}
};
FeatureTestResult::Worse {
engine: IoEngineKind::StdFs,
remark,
}
}
})
}
})
.join()
.unwrap()
}

View File

@@ -0,0 +1,194 @@
//! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
//! handling in case the instance can't launched.
//!
//! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
//! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
//! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
use tokio_epoll_uring::{System, SystemHandle};
use crate::virtual_file::on_fatal_io_error;
use crate::metrics::tokio_epoll_uring as metrics;
#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);
struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle>,
launch_attempts: AtomicU32,
/// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
thread_local_state_id: u64,
}
impl ThreadLocalState {
pub fn new() -> Self {
Self(Arc::new(ThreadLocalStateInner {
cell: tokio::sync::OnceCell::default(),
launch_attempts: AtomicU32::new(0),
thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
}))
}
pub fn make_id_string(&self) -> String {
format!("{}", self.0.thread_local_state_id)
}
}
static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
thread_local! {
static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
}
/// Panics if we cannot [`System::launch`].
pub async fn thread_local_system() -> Handle {
let fake_cancel = CancellationToken::new();
loop {
let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
let inner = &thread_local_state.0;
let get_or_init_res = inner
.cell
.get_or_try_init(|| async {
let attempt_no = inner
.launch_attempts
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
async {
// Rate-limit retries per thread-local.
// NB: doesn't yield to executor at attempt_no=0.
utils::backoff::exponential_backoff(
attempt_no,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&fake_cancel,
)
.await;
let res = System::launch()
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {
Ok(system) => {
info!("successfully launched system");
metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
Ok(system)
}
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
warn!("not enough locked memory to tokio-epoll-uring, will retry");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
Err(())
}
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
// This is equivalent to a fatal IO error.
Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
},
}
}
.instrument(span)
.await
})
.await;
if get_or_init_res.is_ok() {
return Handle(thread_local_state);
}
}
}
fn emit_launch_failure_process_stats() {
// tokio-epoll-uring stats
// vmlck + rlimit
// number of threads
// rss / system memory usage generally
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
info!(systems_created, systems_destroyed, "tokio-epoll-uring");
match procfs::process::Process::myself() {
Ok(myself) => {
match myself.limits() {
Ok(limits) => {
info!(?limits.max_locked_memory, "/proc/self/limits");
}
Err(error) => {
info!(%error, "no limit stats due to error");
}
}
match myself.status() {
Ok(status) => {
let procfs::process::Status {
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
..
} = status;
info!(
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
"/proc/self/status"
);
}
Err(error) => {
info!(%error, "no status status due to error");
}
}
}
Err(error) => {
info!(%error, "no process stats due to error");
}
};
}
#[derive(Clone)]
pub struct Handle(ThreadLocalState);
impl std::ops::Deref for Handle {
type Target = SystemHandle;
fn deref(&self) -> &Self::Target {
self.0
.0
.cell
.get()
.expect("must be already initialized when using this")
}
}

View File

@@ -98,7 +98,7 @@ impl OpenOptions {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {

View File

@@ -0,0 +1,34 @@
use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile};
use tokio_epoll_uring::{BoundedBuf, IoBuf};
pub struct Writer {
dst: VirtualFile,
bytes_amount: u64,
}
impl Writer {
pub fn new(dst: VirtualFile) -> Self {
Self {
dst,
bytes_amount: 0,
}
}
/// Returns the wrapped `VirtualFile` object as well as the number
/// of bytes that were written to it through this object.
pub fn into_inner(self) -> (u64, VirtualFile) {
(self.bytes_amount, self.dst)
}
}
impl OwnedAsyncWriter for Writer {
#[inline(always)]
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let (buf, res) = self.dst.write_all(buf).await;
let nwritten = res?;
self.bytes_amount += u64::try_from(nwritten).unwrap();
Ok((nwritten, buf))
}
}

View File

@@ -0,0 +1,206 @@
use bytes::BytesMut;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
/// A trait for doing owned-buffer write IO.
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
pub trait OwnedAsyncWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)>;
}
/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers
/// into `BUFFER_SIZE`-sized writes.
///
/// # Passthrough Of Large Writers
///
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
/// buffer to be flushed, even if it is not full yet. Then, the large
/// buffered write is passed through to the unerlying [`OwnedAsyncWriter`].
///
/// This pass-through is generally beneficial for throughput, but if
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
/// unlimited large writes may cause latency or fairness issues.
///
/// In such cases, a different implementation that always buffers in memory
/// may be preferable.
pub struct BufferedWriter<const BUFFER_SIZE: usize, W> {
writer: W,
// invariant: always remains Some(buf)
// with buf.capacity() == BUFFER_SIZE except
// - while IO is ongoing => goes back to Some() once the IO completed successfully
// - after an IO error => stays `None` forever
// In these exceptional cases, it's `None`.
buf: Option<BytesMut>,
}
impl<const BUFFER_SIZE: usize, W> BufferedWriter<BUFFER_SIZE, W>
where
W: OwnedAsyncWriter,
{
pub fn new(writer: W) -> Self {
Self {
writer,
buf: Some(BytesMut::with_capacity(BUFFER_SIZE)),
}
}
pub async fn flush_and_into_inner(mut self) -> std::io::Result<W> {
self.flush().await?;
let Self { buf, writer } = self;
assert!(buf.is_some());
Ok(writer)
}
pub async fn write_buffered<B: IoBuf>(&mut self, chunk: Slice<B>) -> std::io::Result<()>
where
B: IoBuf + Send,
{
// avoid memcpy for the middle of the chunk
if chunk.len() >= BUFFER_SIZE {
self.flush().await?;
// do a big write, bypassing `buf`
assert_eq!(
self.buf
.as_ref()
.expect("must not use after an error")
.len(),
0
);
let chunk_len = chunk.len();
let (nwritten, chunk) = self.writer.write_all(chunk).await?;
assert_eq!(nwritten, chunk_len);
drop(chunk);
return Ok(());
}
// in-memory copy the < BUFFER_SIZED tail of the chunk
assert!(chunk.len() < BUFFER_SIZE);
let mut chunk = &chunk[..];
while !chunk.is_empty() {
let buf = self.buf.as_mut().expect("must not use after an error");
let need = BUFFER_SIZE - buf.len();
let have = chunk.len();
let n = std::cmp::min(need, have);
buf.extend_from_slice(&chunk[..n]);
chunk = &chunk[n..];
if buf.len() >= BUFFER_SIZE {
assert_eq!(buf.len(), BUFFER_SIZE);
self.flush().await?;
}
}
assert!(chunk.is_empty(), "by now we should have drained the chunk");
Ok(())
}
async fn flush(&mut self) -> std::io::Result<()> {
let buf = self.buf.take().expect("must not use after an error");
if buf.is_empty() {
self.buf = Some(buf);
return std::io::Result::Ok(());
}
let buf_len = buf.len();
let (nwritten, mut buf) = self.writer.write_all(buf).await?;
assert_eq!(nwritten, buf_len);
buf.clear();
self.buf = Some(buf);
Ok(())
}
}
impl OwnedAsyncWriter for Vec<u8> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return Ok((0, Slice::into_inner(buf.slice_full())));
}
let buf = buf.slice(0..nbytes);
self.extend_from_slice(&buf[..]);
Ok((buf.len(), Slice::into_inner(buf)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Default)]
struct RecorderWriter {
writes: Vec<Vec<u8>>,
}
impl OwnedAsyncWriter for RecorderWriter {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> std::io::Result<(usize, B::Buf)> {
let nbytes = buf.bytes_init();
if nbytes == 0 {
self.writes.push(vec![]);
return Ok((0, Slice::into_inner(buf.slice_full())));
}
let buf = buf.slice(0..nbytes);
self.writes.push(Vec::from(&buf[..]));
Ok((buf.len(), Slice::into_inner(buf)))
}
}
macro_rules! write {
($writer:ident, $data:literal) => {{
$writer
.write_buffered(::bytes::Bytes::from_static($data).slice_full())
.await?;
}};
}
#[tokio::test]
async fn test_buffered_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
write!(writer, b"a");
write!(writer, b"b");
write!(writer, b"c");
write!(writer, b"d");
write!(writer, b"e");
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")]
);
Ok(())
}
#[tokio::test]
async fn test_passthrough_writes_only() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
write!(writer, b"abc");
write!(writer, b"de");
write!(writer, b"");
write!(writer, b"fghijk");
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")]
);
Ok(())
}
#[tokio::test]
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
let recorder = RecorderWriter::default();
let mut writer = BufferedWriter::<2, _>::new(recorder);
write!(writer, b"a");
write!(writer, b"bc");
write!(writer, b"d");
write!(writer, b"e");
let recorder = writer.flush_and_into_inner().await?;
assert_eq!(
recorder.writes,
vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")]
);
Ok(())
}
}

View File

@@ -109,6 +109,8 @@ impl WalIngest {
self.checkpoint_modified = true;
}
failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
match decoded.xl_rmid {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages