mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
Merge branch 'main' into ps-thread-pool-2
This commit is contained in:
@@ -7,7 +7,9 @@ use pageserver::layered_repository::dump_layerfile_from_path;
|
||||
use pageserver::page_cache;
|
||||
use pageserver::virtual_file;
|
||||
use std::path::PathBuf;
|
||||
use utils::GIT_VERSION;
|
||||
use utils::project_git_version;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let arg_matches = App::new("Zenith dump_layerfile utility")
|
||||
|
||||
@@ -26,24 +26,24 @@ use utils::{
|
||||
http::endpoint,
|
||||
logging,
|
||||
postgres_backend::AuthType,
|
||||
project_git_version,
|
||||
shutdown::exit_now,
|
||||
signals::{self, Signal},
|
||||
tcp_listener,
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
GIT_VERSION,
|
||||
};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
fn version() -> String {
|
||||
format!(
|
||||
"{} profiling:{} failpoints:{}",
|
||||
GIT_VERSION,
|
||||
"{GIT_VERSION} profiling:{} failpoints:{}",
|
||||
cfg!(feature = "profiling"),
|
||||
fail::has_failpoints()
|
||||
)
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
metrics::set_common_metrics_prefix("pageserver");
|
||||
let arg_matches = App::new("Zenith page server")
|
||||
.about("Materializes WAL stream to pages and serves them to the postgres")
|
||||
.version(&*version())
|
||||
@@ -103,6 +103,8 @@ fn main() -> anyhow::Result<()> {
|
||||
let features: &[&str] = &[
|
||||
#[cfg(feature = "failpoints")]
|
||||
"failpoints",
|
||||
#[cfg(feature = "profiling")]
|
||||
"profiling",
|
||||
];
|
||||
println!("{{\"features\": {features:?} }}");
|
||||
return Ok(());
|
||||
@@ -188,13 +190,8 @@ fn main() -> anyhow::Result<()> {
|
||||
// as a ref.
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
|
||||
// If failpoints are used, terminate the whole pageserver process if they are hit.
|
||||
// Initialize up failpoints support
|
||||
let scenario = FailScenario::setup();
|
||||
if fail::has_failpoints() {
|
||||
std::panic::set_hook(Box::new(|_| {
|
||||
std::process::exit(1);
|
||||
}));
|
||||
}
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors);
|
||||
@@ -223,7 +220,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
// Initialize logger
|
||||
let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
|
||||
|
||||
info!("version: {}", GIT_VERSION);
|
||||
info!("version: {GIT_VERSION}");
|
||||
|
||||
// TODO: Check that it looks like a valid repository before going further
|
||||
|
||||
@@ -263,7 +260,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
// Otherwise, the coverage data will be damaged.
|
||||
match daemonize.exit_action(|| exit_now(0)).start() {
|
||||
Ok(_) => info!("Success, daemonized"),
|
||||
Err(err) => error!(%err, "could not daemonize"),
|
||||
Err(err) => bail!("{err}. could not daemonize. bailing."),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,9 @@ use clap::{App, Arg};
|
||||
use pageserver::layered_repository::metadata::TimelineMetadata;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use utils::{lsn::Lsn, GIT_VERSION};
|
||||
use utils::{lsn::Lsn, project_git_version};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let arg_matches = App::new("Zenith update metadata utility")
|
||||
|
||||
@@ -13,6 +13,7 @@ use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
use toml_edit;
|
||||
use toml_edit::{Document, Item};
|
||||
use url::Url;
|
||||
use utils::{
|
||||
postgres_backend::AuthType,
|
||||
zid::{ZNodeId, ZTenantId, ZTimelineId},
|
||||
@@ -111,6 +112,13 @@ pub struct PageServerConf {
|
||||
|
||||
pub profiling: ProfilingConfig,
|
||||
pub default_tenant_conf: TenantConf,
|
||||
|
||||
/// A prefix to add in etcd brokers before every key.
|
||||
/// Can be used for isolating different pageserver groups withing the same etcd cluster.
|
||||
pub broker_etcd_prefix: String,
|
||||
|
||||
/// Etcd broker endpoints to connect to.
|
||||
pub broker_endpoints: Vec<Url>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -175,6 +183,8 @@ struct PageServerConfigBuilder {
|
||||
id: BuilderValue<ZNodeId>,
|
||||
|
||||
profiling: BuilderValue<ProfilingConfig>,
|
||||
broker_etcd_prefix: BuilderValue<String>,
|
||||
broker_endpoints: BuilderValue<Vec<Url>>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -200,6 +210,8 @@ impl Default for PageServerConfigBuilder {
|
||||
remote_storage_config: Set(None),
|
||||
id: NotSet,
|
||||
profiling: Set(ProfilingConfig::Disabled),
|
||||
broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()),
|
||||
broker_endpoints: Set(Vec::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -256,6 +268,14 @@ impl PageServerConfigBuilder {
|
||||
self.remote_storage_config = BuilderValue::Set(remote_storage_config)
|
||||
}
|
||||
|
||||
pub fn broker_endpoints(&mut self, broker_endpoints: Vec<Url>) {
|
||||
self.broker_endpoints = BuilderValue::Set(broker_endpoints)
|
||||
}
|
||||
|
||||
pub fn broker_etcd_prefix(&mut self, broker_etcd_prefix: String) {
|
||||
self.broker_etcd_prefix = BuilderValue::Set(broker_etcd_prefix)
|
||||
}
|
||||
|
||||
pub fn id(&mut self, node_id: ZNodeId) {
|
||||
self.id = BuilderValue::Set(node_id)
|
||||
}
|
||||
@@ -264,7 +284,11 @@ impl PageServerConfigBuilder {
|
||||
self.profiling = BuilderValue::Set(profiling)
|
||||
}
|
||||
|
||||
pub fn build(self) -> Result<PageServerConf> {
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let broker_endpoints = self
|
||||
.broker_endpoints
|
||||
.ok_or(anyhow!("No broker endpoints provided"))?;
|
||||
|
||||
Ok(PageServerConf {
|
||||
listen_pg_addr: self
|
||||
.listen_pg_addr
|
||||
@@ -300,6 +324,10 @@ impl PageServerConfigBuilder {
|
||||
profiling: self.profiling.ok_or(anyhow!("missing profiling"))?,
|
||||
// TenantConf is handled separately
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
broker_endpoints,
|
||||
broker_etcd_prefix: self
|
||||
.broker_etcd_prefix
|
||||
.ok_or(anyhow!("missing broker_etcd_prefix"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -341,7 +369,7 @@ impl PageServerConf {
|
||||
/// validating the input and failing on errors.
|
||||
///
|
||||
/// This leaves any options not present in the file in the built-in defaults.
|
||||
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result<Self> {
|
||||
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> anyhow::Result<Self> {
|
||||
let mut builder = PageServerConfigBuilder::default();
|
||||
builder.workdir(workdir.to_owned());
|
||||
|
||||
@@ -373,6 +401,17 @@ impl PageServerConf {
|
||||
}
|
||||
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
|
||||
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
|
||||
"broker_etcd_prefix" => builder.broker_etcd_prefix(parse_toml_string(key, item)?),
|
||||
"broker_endpoints" => builder.broker_endpoints(
|
||||
parse_toml_array(key, item)?
|
||||
.into_iter()
|
||||
.map(|endpoint_str| {
|
||||
endpoint_str.parse::<Url>().with_context(|| {
|
||||
format!("Array item {endpoint_str} for key {key} is not a valid url endpoint")
|
||||
})
|
||||
})
|
||||
.collect::<anyhow::Result<_>>()?,
|
||||
),
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -526,6 +565,8 @@ impl PageServerConf {
|
||||
remote_storage_config: None,
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::dummy_conf(),
|
||||
broker_endpoints: Vec::new(),
|
||||
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -576,14 +617,36 @@ fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
|
||||
Ok(humantime::parse_duration(s)?)
|
||||
}
|
||||
|
||||
fn parse_toml_from_str<T>(name: &str, item: &Item) -> Result<T>
|
||||
fn parse_toml_from_str<T>(name: &str, item: &Item) -> anyhow::Result<T>
|
||||
where
|
||||
T: FromStr<Err = anyhow::Error>,
|
||||
T: FromStr,
|
||||
<T as FromStr>::Err: std::fmt::Display,
|
||||
{
|
||||
let v = item
|
||||
.as_str()
|
||||
.with_context(|| format!("configure option {name} is not a string"))?;
|
||||
T::from_str(v)
|
||||
T::from_str(v).map_err(|e| {
|
||||
anyhow!(
|
||||
"Failed to parse string as {parse_type} for configure option {name}: {e}",
|
||||
parse_type = stringify!(T)
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> {
|
||||
let array = item
|
||||
.as_array()
|
||||
.with_context(|| format!("configure option {name} is not an array"))?;
|
||||
|
||||
array
|
||||
.iter()
|
||||
.map(|value| {
|
||||
value
|
||||
.as_str()
|
||||
.map(str::to_string)
|
||||
.with_context(|| format!("Array item {value:?} for key {name} is not a string"))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -616,12 +679,16 @@ id = 10
|
||||
fn parse_defaults() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
// we have to create dummy pathes to overcome the validation errors
|
||||
let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display());
|
||||
let broker_endpoint = "http://127.0.0.1:7777";
|
||||
// we have to create dummy values to overcome the validation errors
|
||||
let config_string = format!(
|
||||
"pg_distrib_dir='{}'\nid=10\nbroker_endpoints = ['{broker_endpoint}']",
|
||||
pg_distrib_dir.display()
|
||||
);
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
|
||||
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"));
|
||||
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
|
||||
|
||||
assert_eq!(
|
||||
parsed_config,
|
||||
@@ -641,6 +708,10 @@ id = 10
|
||||
remote_storage_config: None,
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
broker_endpoints: vec![broker_endpoint
|
||||
.parse()
|
||||
.expect("Failed to parse a valid broker endpoint URL")],
|
||||
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -652,15 +723,16 @@ id = 10
|
||||
fn parse_basic_config() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
let broker_endpoint = "http://127.0.0.1:7777";
|
||||
|
||||
let config_string = format!(
|
||||
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'",
|
||||
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'\nbroker_endpoints = ['{broker_endpoint}']",
|
||||
pg_distrib_dir.display()
|
||||
);
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
|
||||
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"));
|
||||
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
|
||||
|
||||
assert_eq!(
|
||||
parsed_config,
|
||||
@@ -680,6 +752,10 @@ id = 10
|
||||
remote_storage_config: None,
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
broker_endpoints: vec![broker_endpoint
|
||||
.parse()
|
||||
.expect("Failed to parse a valid broker endpoint URL")],
|
||||
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
@@ -691,6 +767,7 @@ id = 10
|
||||
fn parse_remote_fs_storage_config() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
let broker_endpoint = "http://127.0.0.1:7777";
|
||||
|
||||
let local_storage_path = tempdir.path().join("local_remote_storage");
|
||||
|
||||
@@ -710,6 +787,7 @@ local_path = '{}'"#,
|
||||
let config_string = format!(
|
||||
r#"{ALL_BASE_VALUES_TOML}
|
||||
pg_distrib_dir='{}'
|
||||
broker_endpoints = ['{broker_endpoint}']
|
||||
|
||||
{remote_storage_config_str}"#,
|
||||
pg_distrib_dir.display(),
|
||||
@@ -718,7 +796,9 @@ pg_distrib_dir='{}'
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
|
||||
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"))
|
||||
.unwrap_or_else(|e| {
|
||||
panic!("Failed to parse config '{config_string}', reason: {e:?}")
|
||||
})
|
||||
.remote_storage_config
|
||||
.expect("Should have remote storage config for the local FS");
|
||||
|
||||
@@ -728,7 +808,7 @@ pg_distrib_dir='{}'
|
||||
max_concurrent_syncs: NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS
|
||||
)
|
||||
.unwrap(),
|
||||
.unwrap(),
|
||||
max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),
|
||||
@@ -751,6 +831,7 @@ pg_distrib_dir='{}'
|
||||
let max_concurrent_syncs = NonZeroUsize::new(111).unwrap();
|
||||
let max_sync_errors = NonZeroU32::new(222).unwrap();
|
||||
let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
|
||||
let broker_endpoint = "http://127.0.0.1:7777";
|
||||
|
||||
let identical_toml_declarations = &[
|
||||
format!(
|
||||
@@ -773,6 +854,7 @@ concurrency_limit = {s3_concurrency_limit}"#
|
||||
let config_string = format!(
|
||||
r#"{ALL_BASE_VALUES_TOML}
|
||||
pg_distrib_dir='{}'
|
||||
broker_endpoints = ['{broker_endpoint}']
|
||||
|
||||
{remote_storage_config_str}"#,
|
||||
pg_distrib_dir.display(),
|
||||
@@ -781,7 +863,9 @@ pg_distrib_dir='{}'
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
|
||||
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"))
|
||||
.unwrap_or_else(|e| {
|
||||
panic!("Failed to parse config '{config_string}', reason: {e:?}")
|
||||
})
|
||||
.remote_storage_config
|
||||
.expect("Should have remote storage config for S3");
|
||||
|
||||
|
||||
@@ -123,6 +123,53 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
get:
|
||||
description: Get wal receiver's data attached to the timeline
|
||||
responses:
|
||||
"200":
|
||||
description: WalReceiverEntry
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/WalReceiverEntry"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"404":
|
||||
description: Error when no wal receiver is running or found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/attach:
|
||||
parameters:
|
||||
@@ -520,6 +567,21 @@ components:
|
||||
type: integer
|
||||
current_logical_size_non_incremental:
|
||||
type: integer
|
||||
WalReceiverEntry:
|
||||
type: object
|
||||
required:
|
||||
- thread_id
|
||||
- wal_producer_connstr
|
||||
properties:
|
||||
thread_id:
|
||||
type: integer
|
||||
wal_producer_connstr:
|
||||
type: string
|
||||
last_received_msg_lsn:
|
||||
type: string
|
||||
format: hex
|
||||
last_received_msg_ts:
|
||||
type: integer
|
||||
|
||||
Error:
|
||||
type: object
|
||||
|
||||
@@ -224,6 +224,30 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
json_response(StatusCode::OK, timeline_info)
|
||||
}
|
||||
|
||||
async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
|
||||
let wal_receiver = tokio::task::spawn_blocking(move || {
|
||||
let _enter =
|
||||
info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id).entered();
|
||||
|
||||
crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)?
|
||||
.ok_or_else(|| {
|
||||
ApiError::NotFound(format!(
|
||||
"WAL receiver not found for tenant {} and timeline {}",
|
||||
tenant_id, timeline_id
|
||||
))
|
||||
})?;
|
||||
|
||||
json_response(StatusCode::OK, wal_receiver)
|
||||
}
|
||||
|
||||
async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
@@ -485,6 +509,10 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_detail_handler,
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver",
|
||||
wal_receiver_get_handler,
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/attach",
|
||||
timeline_attach_handler,
|
||||
|
||||
@@ -74,6 +74,7 @@ pub mod metadata;
|
||||
mod par_fsync;
|
||||
mod storage_layer;
|
||||
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use delta_layer::{DeltaLayer, DeltaLayerWriter};
|
||||
use ephemeral_file::is_ephemeral_file;
|
||||
use filename::{DeltaFileName, ImageFileName};
|
||||
@@ -81,6 +82,7 @@ use image_layer::{ImageLayer, ImageLayerWriter};
|
||||
use inmemory_layer::InMemoryLayer;
|
||||
use layer_map::LayerMap;
|
||||
use layer_map::SearchResult;
|
||||
use postgres_ffi::xlog_utils::to_pg_timestamp;
|
||||
use storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
|
||||
// re-export this function so that page_cache.rs can use it.
|
||||
@@ -89,7 +91,7 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme
|
||||
// Metrics collected on operations on the storage repository.
|
||||
lazy_static! {
|
||||
static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
|
||||
"pageserver_storage_time",
|
||||
"pageserver_storage_operations_seconds",
|
||||
"Time spent on storage operations",
|
||||
&["operation", "tenant_id", "timeline_id"]
|
||||
)
|
||||
@@ -99,8 +101,8 @@ lazy_static! {
|
||||
// Metrics collected on operations on the storage repository.
|
||||
lazy_static! {
|
||||
static ref RECONSTRUCT_TIME: HistogramVec = register_histogram_vec!(
|
||||
"pageserver_getpage_reconstruct_time",
|
||||
"Time spent on storage operations",
|
||||
"pageserver_getpage_reconstruct_seconds",
|
||||
"Time spent in reconstruct_value",
|
||||
&["tenant_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
@@ -108,13 +110,13 @@ lazy_static! {
|
||||
|
||||
lazy_static! {
|
||||
static ref MATERIALIZED_PAGE_CACHE_HIT: IntCounterVec = register_int_counter_vec!(
|
||||
"materialize_page_cache_hits",
|
||||
"pageserver_materialized_cache_hits_total",
|
||||
"Number of cache hits from materialized page cache",
|
||||
&["tenant_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
static ref WAIT_LSN_TIME: HistogramVec = register_histogram_vec!(
|
||||
"wait_lsn_time",
|
||||
"pageserver_wait_lsn_seconds",
|
||||
"Time spent waiting for WAL to arrive",
|
||||
&["tenant_id", "timeline_id"]
|
||||
)
|
||||
@@ -134,12 +136,12 @@ lazy_static! {
|
||||
// or in testing they estimate how much we would upload if we did.
|
||||
lazy_static! {
|
||||
static ref NUM_PERSISTENT_FILES_CREATED: IntCounter = register_int_counter!(
|
||||
"pageserver_num_persistent_files_created",
|
||||
"pageserver_created_persistent_files_total",
|
||||
"Number of files created that are meant to be uploaded to cloud storage",
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
static ref PERSISTENT_BYTES_WRITTEN: IntCounter = register_int_counter!(
|
||||
"pageserver_persistent_bytes_written",
|
||||
"pageserver_written_persistent_bytes_total",
|
||||
"Total bytes written that are meant to be uploaded to cloud storage",
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
@@ -1355,7 +1357,9 @@ impl LayeredTimeline {
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
|
||||
let mut path: Vec<(ValueReconstructResult, Lsn, Arc<dyn Layer>)> = Vec::new();
|
||||
// For debugging purposes, collect the path of layers that we traversed
|
||||
// through. It's included in the error message if we fail to find the key.
|
||||
let mut traversal_path: Vec<(ValueReconstructResult, Lsn, Arc<dyn Layer>)> = Vec::new();
|
||||
|
||||
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
|
||||
*cached_lsn
|
||||
@@ -1385,32 +1389,24 @@ impl LayeredTimeline {
|
||||
if prev_lsn <= cont_lsn {
|
||||
// Didn't make any progress in last iteration. Error out to avoid
|
||||
// getting stuck in the loop.
|
||||
|
||||
// For debugging purposes, print the path of layers that we traversed
|
||||
// through.
|
||||
for (r, c, l) in path {
|
||||
error!(
|
||||
"PATH: result {:?}, cont_lsn {}, layer: {}",
|
||||
r,
|
||||
c,
|
||||
l.filename().display()
|
||||
);
|
||||
}
|
||||
bail!("could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
|
||||
key,
|
||||
Lsn(cont_lsn.0 - 1),
|
||||
request_lsn,
|
||||
timeline.ancestor_lsn)
|
||||
return layer_traversal_error(format!(
|
||||
"could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
|
||||
key,
|
||||
Lsn(cont_lsn.0 - 1),
|
||||
request_lsn,
|
||||
timeline.ancestor_lsn
|
||||
), traversal_path);
|
||||
}
|
||||
prev_lsn = cont_lsn;
|
||||
}
|
||||
ValueReconstructResult::Missing => {
|
||||
bail!(
|
||||
"could not find data for key {} at LSN {}, for request at LSN {}",
|
||||
key,
|
||||
cont_lsn,
|
||||
request_lsn
|
||||
)
|
||||
return layer_traversal_error(
|
||||
format!(
|
||||
"could not find data for key {} at LSN {}, for request at LSN {}",
|
||||
key, cont_lsn, request_lsn
|
||||
),
|
||||
traversal_path,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1445,7 +1441,7 @@ impl LayeredTimeline {
|
||||
reconstruct_state,
|
||||
)?;
|
||||
cont_lsn = lsn_floor;
|
||||
path.push((result, cont_lsn, open_layer.clone()));
|
||||
traversal_path.push((result, cont_lsn, open_layer.clone()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -1460,7 +1456,7 @@ impl LayeredTimeline {
|
||||
reconstruct_state,
|
||||
)?;
|
||||
cont_lsn = lsn_floor;
|
||||
path.push((result, cont_lsn, frozen_layer.clone()));
|
||||
traversal_path.push((result, cont_lsn, frozen_layer.clone()));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
@@ -1475,7 +1471,7 @@ impl LayeredTimeline {
|
||||
reconstruct_state,
|
||||
)?;
|
||||
cont_lsn = lsn_floor;
|
||||
path.push((result, cont_lsn, layer));
|
||||
traversal_path.push((result, cont_lsn, layer));
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
result = ValueReconstructResult::Continue;
|
||||
@@ -1512,7 +1508,7 @@ impl LayeredTimeline {
|
||||
.ensure_loaded()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Ancestor timeline is not is not loaded. Timeline id: {} Ancestor id {:?}",
|
||||
"Ancestor timeline is not loaded. Timeline id: {} Ancestor id {:?}",
|
||||
self.timeline_id,
|
||||
self.get_ancestor_timeline_id(),
|
||||
)
|
||||
@@ -1619,22 +1615,30 @@ impl LayeredTimeline {
|
||||
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
|
||||
// Has more than 'checkpoint_distance' of WAL been accumulated?
|
||||
let distance = last_lsn.widening_sub(self.last_freeze_at.load());
|
||||
if distance >= self.get_checkpoint_distance().into() {
|
||||
// Yes. Freeze the current in-memory layer.
|
||||
self.freeze_inmem_layer(true);
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
}
|
||||
if let Ok(guard) = self.layer_flush_lock.try_lock() {
|
||||
drop(guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
thread_mgr::spawn(
|
||||
thread_mgr::ThreadKind::LayerFlushThread,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"layer flush thread",
|
||||
false,
|
||||
move || self_clone.flush_frozen_layers(false),
|
||||
)?;
|
||||
|
||||
// Launch a thread to flush the frozen layer to disk, unless
|
||||
// a thread was already running. (If the thread was running
|
||||
// at the time that we froze the layer, it must've seen the
|
||||
// the layer we just froze before it exited; see comments
|
||||
// in flush_frozen_layers())
|
||||
if let Ok(guard) = self.layer_flush_lock.try_lock() {
|
||||
drop(guard);
|
||||
let self_clone = Arc::clone(self);
|
||||
thread_mgr::spawn(
|
||||
thread_mgr::ThreadKind::LayerFlushThread,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
"layer flush thread",
|
||||
false,
|
||||
move || self_clone.flush_frozen_layers(false),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1942,41 +1946,87 @@ impl LayeredTimeline {
|
||||
Ok(new_path)
|
||||
}
|
||||
|
||||
///
|
||||
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
|
||||
/// as Level 1 files.
|
||||
///
|
||||
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
let level0_deltas = layers.get_level0_deltas()?;
|
||||
|
||||
// We compact or "shuffle" the level-0 delta layers when they've
|
||||
// accumulated over the compaction threshold.
|
||||
if level0_deltas.len() < self.get_compaction_threshold() {
|
||||
return Ok(());
|
||||
}
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
|
||||
// FIXME: this function probably won't work correctly if there's overlap
|
||||
// in the deltas.
|
||||
let lsn_range = level0_deltas
|
||||
.iter()
|
||||
.map(|l| l.get_lsn_range())
|
||||
.reduce(|a, b| min(a.start, b.start)..max(a.end, b.end))
|
||||
.unwrap();
|
||||
// Only compact if enough layers have accumulated.
|
||||
if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let all_values_iter = level0_deltas.iter().map(|l| l.iter()).kmerge_by(|a, b| {
|
||||
if let Ok((a_key, a_lsn, _)) = a {
|
||||
if let Ok((b_key, b_lsn, _)) = b {
|
||||
match a_key.cmp(b_key) {
|
||||
Ordering::Less => true,
|
||||
Ordering::Equal => a_lsn <= b_lsn,
|
||||
Ordering::Greater => false,
|
||||
// Gather the files to compact in this iteration.
|
||||
//
|
||||
// Start with the oldest Level 0 delta file, and collect any other
|
||||
// level 0 files that form a contiguous sequence, such that the end
|
||||
// LSN of previous file matches the start LSN of the next file.
|
||||
//
|
||||
// Note that if the files don't form such a sequence, we might
|
||||
// "compact" just a single file. That's a bit pointless, but it allows
|
||||
// us to get rid of the level 0 file, and compact the other files on
|
||||
// the next iteration. This could probably made smarter, but such
|
||||
// "gaps" in the sequence of level 0 files should only happen in case
|
||||
// of a crash, partial download from cloud storage, or something like
|
||||
// that, so it's not a big deal in practice.
|
||||
level0_deltas.sort_by_key(|l| l.get_lsn_range().start);
|
||||
let mut level0_deltas_iter = level0_deltas.iter();
|
||||
|
||||
let first_level0_delta = level0_deltas_iter.next().unwrap();
|
||||
let mut prev_lsn_end = first_level0_delta.get_lsn_range().end;
|
||||
let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
|
||||
for l in level0_deltas_iter {
|
||||
let lsn_range = l.get_lsn_range();
|
||||
|
||||
if lsn_range.start != prev_lsn_end {
|
||||
break;
|
||||
}
|
||||
deltas_to_compact.push(Arc::clone(l));
|
||||
prev_lsn_end = lsn_range.end;
|
||||
}
|
||||
let lsn_range = Range {
|
||||
start: deltas_to_compact.first().unwrap().get_lsn_range().start,
|
||||
end: deltas_to_compact.last().unwrap().get_lsn_range().end,
|
||||
};
|
||||
|
||||
info!(
|
||||
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
|
||||
lsn_range.start,
|
||||
lsn_range.end,
|
||||
deltas_to_compact.len(),
|
||||
level0_deltas.len()
|
||||
);
|
||||
for l in deltas_to_compact.iter() {
|
||||
info!("compact includes {}", l.filename().display());
|
||||
}
|
||||
// We don't need the original list of layers anymore. Drop it so that
|
||||
// we don't accidentally use it later in the function.
|
||||
drop(level0_deltas);
|
||||
|
||||
// This iterator walks through all key-value pairs from all the layers
|
||||
// we're compacting, in key, LSN order.
|
||||
let all_values_iter = deltas_to_compact
|
||||
.iter()
|
||||
.map(|l| l.iter())
|
||||
.kmerge_by(|a, b| {
|
||||
if let Ok((a_key, a_lsn, _)) = a {
|
||||
if let Ok((b_key, b_lsn, _)) = b {
|
||||
match a_key.cmp(b_key) {
|
||||
Ordering::Less => true,
|
||||
Ordering::Equal => a_lsn <= b_lsn,
|
||||
Ordering::Greater => false,
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
true
|
||||
}
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Merge the contents of all the input delta layers into a new set
|
||||
// of delta layers, based on the current partitioning.
|
||||
@@ -2042,8 +2092,8 @@ impl LayeredTimeline {
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
// delete the old ones
|
||||
let mut layer_paths_do_delete = HashSet::with_capacity(level0_deltas.len());
|
||||
for l in level0_deltas {
|
||||
let mut layer_paths_do_delete = HashSet::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact {
|
||||
l.delete()?;
|
||||
if let Some(path) = l.local_path() {
|
||||
layer_paths_do_delete.insert(path);
|
||||
@@ -2115,14 +2165,57 @@ impl LayeredTimeline {
|
||||
|
||||
let gc_info = self.gc_info.read().unwrap();
|
||||
let retain_lsns = &gc_info.retain_lsns;
|
||||
let cutoff = gc_info.cutoff;
|
||||
let cutoff = min(gc_info.cutoff, disk_consistent_lsn);
|
||||
let pitr = gc_info.pitr;
|
||||
|
||||
// Calculate pitr cutoff point.
|
||||
// If we cannot determine a cutoff LSN, be conservative and don't GC anything.
|
||||
let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn();
|
||||
|
||||
if let Ok(timeline) =
|
||||
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
|
||||
{
|
||||
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
|
||||
// If we don't have enough data to convert to LSN,
|
||||
// play safe and don't remove any layers.
|
||||
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
|
||||
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
|
||||
|
||||
match timeline.find_lsn_for_timestamp(pitr_timestamp)? {
|
||||
LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn,
|
||||
LsnForTimestamp::Future(lsn) => {
|
||||
debug!("future({})", lsn);
|
||||
pitr_cutoff_lsn = cutoff;
|
||||
}
|
||||
LsnForTimestamp::Past(lsn) => {
|
||||
debug!("past({})", lsn);
|
||||
}
|
||||
}
|
||||
debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
|
||||
}
|
||||
} else if cfg!(test) {
|
||||
// We don't have local timeline in mocked cargo tests.
|
||||
// So, just ignore pitr_interval setting in this case.
|
||||
pitr_cutoff_lsn = cutoff;
|
||||
}
|
||||
|
||||
let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn);
|
||||
|
||||
// Nothing to GC. Return early.
|
||||
if *self.get_latest_gc_cutoff_lsn() >= new_gc_cutoff {
|
||||
info!(
|
||||
"Nothing to GC for timeline {}. cutoff_lsn {}",
|
||||
self.timeline_id, new_gc_cutoff
|
||||
);
|
||||
result.elapsed = now.elapsed()?;
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered();
|
||||
|
||||
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
|
||||
// See branch_timeline() for details.
|
||||
*self.latest_gc_cutoff_lsn.write().unwrap() = cutoff;
|
||||
*self.latest_gc_cutoff_lsn.write().unwrap() = new_gc_cutoff;
|
||||
|
||||
info!("GC starting");
|
||||
|
||||
@@ -2162,30 +2255,18 @@ impl LayeredTimeline {
|
||||
result.layers_needed_by_cutoff += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
// 2. It is newer than PiTR interval?
|
||||
// We use modification time of layer file to estimate update time.
|
||||
// This estimation is not quite precise but maintaining LSN->timestamp map seems to be overkill.
|
||||
// It is not expected that users will need high precision here. And this estimation
|
||||
// is conservative: modification time of file is always newer than actual time of version
|
||||
// creation. So it is safe for users.
|
||||
// TODO A possible "bloat" issue still persists here.
|
||||
// If modification time changes because of layer upload/download, we will keep these files
|
||||
// longer than necessary.
|
||||
// https://github.com/neondatabase/neon/issues/1554
|
||||
//
|
||||
if let Ok(metadata) = fs::metadata(&l.filename()) {
|
||||
let last_modified = metadata.modified()?;
|
||||
if now.duration_since(last_modified)? < pitr {
|
||||
debug!(
|
||||
"keeping {} because it's modification time {:?} is newer than PITR {:?}",
|
||||
l.filename().display(),
|
||||
last_modified,
|
||||
pitr
|
||||
);
|
||||
result.layers_needed_by_pitr += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// 2. It is newer than PiTR cutoff point?
|
||||
if l.get_lsn_range().end > pitr_cutoff_lsn {
|
||||
debug!(
|
||||
"keeping {} because it's newer than pitr_cutoff_lsn {}",
|
||||
l.filename().display(),
|
||||
pitr_cutoff_lsn
|
||||
);
|
||||
result.layers_needed_by_pitr += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// 3. Is it needed by a child branch?
|
||||
// NOTE With that wee would keep data that
|
||||
// might be referenced by child branches forever.
|
||||
@@ -2213,12 +2294,20 @@ impl LayeredTimeline {
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
//
|
||||
// FIXME: This logic is wrong. See https://github.com/zenithdb/zenith/issues/707
|
||||
if !layers.newer_image_layer_exists(
|
||||
&l.get_key_range(),
|
||||
l.get_lsn_range().end,
|
||||
disk_consistent_lsn + 1,
|
||||
)? {
|
||||
// For example, imagine that the following layers exist:
|
||||
//
|
||||
// 1000 - image (A)
|
||||
// 1000-2000 - delta (B)
|
||||
// 2000 - image (C)
|
||||
// 2000-3000 - delta (D)
|
||||
// 3000 - image (E)
|
||||
//
|
||||
// If GC horizon is at 2500, we can remove layers A and B, but
|
||||
// we cannot remove C, even though it's older than 2500, because
|
||||
// the delta layer 2000-3000 depends on it.
|
||||
if !layers
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
|
||||
{
|
||||
debug!(
|
||||
"keeping {} because it is the latest layer",
|
||||
l.filename().display()
|
||||
@@ -2334,6 +2423,32 @@ impl LayeredTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function for get_reconstruct_data() to add the path of layers traversed
|
||||
/// to an error, as anyhow context information.
|
||||
fn layer_traversal_error(
|
||||
msg: String,
|
||||
path: Vec<(ValueReconstructResult, Lsn, Arc<dyn Layer>)>,
|
||||
) -> anyhow::Result<()> {
|
||||
// We want the original 'msg' to be the outermost context. The outermost context
|
||||
// is the most high-level information, which also gets propagated to the client.
|
||||
let mut msg_iter = path
|
||||
.iter()
|
||||
.map(|(r, c, l)| {
|
||||
format!(
|
||||
"layer traversal: result {:?}, cont_lsn {}, layer: {}",
|
||||
r,
|
||||
c,
|
||||
l.filename().display()
|
||||
)
|
||||
})
|
||||
.chain(std::iter::once(msg));
|
||||
// Construct initial message from the first traversed layer
|
||||
let err = anyhow!(msg_iter.next().unwrap());
|
||||
|
||||
// Append all subsequent traversals, and the error message 'msg', as contexts.
|
||||
Err(msg_iter.fold(err, |err, msg| err.context(msg)))
|
||||
}
|
||||
|
||||
struct LayeredTimelineWriter<'a> {
|
||||
tl: &'a LayeredTimeline,
|
||||
_write_guard: MutexGuard<'a, ()>,
|
||||
|
||||
@@ -23,6 +23,7 @@ distribution depends on the workload: the updates could be totally random, or
|
||||
there could be a long stream of updates to a single relation when data is bulk
|
||||
loaded, for example, or something in between.
|
||||
|
||||
```
|
||||
Cloud Storage Page Server Safekeeper
|
||||
L1 L0 Memory WAL
|
||||
|
||||
@@ -37,6 +38,7 @@ Cloud Storage Page Server Safekeeper
|
||||
+----+----+ +----+----+ | | |
|
||||
|EEEE| |EEEE|EEEE| +---+-----+
|
||||
+----+ +----+----+
|
||||
```
|
||||
|
||||
In this illustration, WAL is received as a stream from the Safekeeper, from the
|
||||
right. It is immediately captured by the page server and stored quickly in
|
||||
@@ -47,7 +49,7 @@ the same page and relation close to each other.
|
||||
From the page server memory, whenever enough WAL has been accumulated, it is flushed
|
||||
to disk into a new L0 layer file, and the memory is released.
|
||||
|
||||
When enough L0 files have been accumulated, they are merged together rand sliced
|
||||
When enough L0 files have been accumulated, they are merged together and sliced
|
||||
per key-space, producing a new set of files where each file contains a more
|
||||
narrow key range, but larger LSN range.
|
||||
|
||||
@@ -121,7 +123,7 @@ The files are called "layer files". Each layer file covers a range of keys, and
|
||||
a range of LSNs (or a single LSN, in case of image layers). You can think of it
|
||||
as a rectangle in the two-dimensional key-LSN space. The layer files for each
|
||||
timeline are stored in the timeline's subdirectory under
|
||||
.zenith/tenants/<tenantid>/timelines.
|
||||
`.zenith/tenants/<tenantid>/timelines`.
|
||||
|
||||
There are two kind of layer files: images, and delta layers. An image file
|
||||
contains a snapshot of all keys at a particular LSN, whereas a delta file
|
||||
@@ -130,8 +132,11 @@ range of LSN.
|
||||
|
||||
image file:
|
||||
|
||||
```
|
||||
000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
|
||||
start key end key LSN
|
||||
```
|
||||
|
||||
|
||||
The first parts define the key range that the layer covers. See
|
||||
pgdatadir_mapping.rs for how the key space is used. The last part is the LSN.
|
||||
@@ -140,8 +145,10 @@ delta file:
|
||||
|
||||
Delta files are named similarly, but they cover a range of LSNs:
|
||||
|
||||
```
|
||||
000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
|
||||
start key end key start LSN end LSN
|
||||
```
|
||||
|
||||
A delta file contains all the key-values in the key-range that were updated in
|
||||
the LSN range. If a key has not been modified, there is no trace of it in the
|
||||
@@ -151,7 +158,9 @@ delta layer.
|
||||
A delta layer file can cover a part of the overall key space, as in the previous
|
||||
example, or the whole key range like this:
|
||||
|
||||
```
|
||||
000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000578C6B29-0000000057A50051
|
||||
```
|
||||
|
||||
A file that covers the whole key range is called a L0 file (Level 0), while a
|
||||
file that covers only part of the key range is called a L1 file. The "level" of
|
||||
@@ -168,7 +177,9 @@ version, and how branching and GC works is still valid.
|
||||
|
||||
The full path of a delta file looks like this:
|
||||
|
||||
```
|
||||
.zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000
|
||||
```
|
||||
|
||||
For simplicity, the examples below use a simplified notation for the
|
||||
paths. The tenant ID is left out, the timeline ID is replaced with
|
||||
@@ -177,8 +188,10 @@ with a human-readable table name. The LSNs are also shorter. For
|
||||
example, a base image file at LSN 100 and a delta file between 100-200
|
||||
for 'orders' table on 'main' branch is represented like this:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
```
|
||||
|
||||
|
||||
# Creating layer files
|
||||
@@ -188,12 +201,14 @@ branch called 'main' and two tables, 'orders' and 'customers'. The end
|
||||
of WAL is currently at LSN 250. In this starting situation, you would
|
||||
have these files on disk:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
main/customers_100
|
||||
main/customers_100_200
|
||||
main/customers_200
|
||||
```
|
||||
|
||||
In addition to those files, the recent changes between LSN 200 and the
|
||||
end of WAL at 250 are kept in memory. If the page server crashes, the
|
||||
@@ -224,6 +239,7 @@ If the customers table is modified later, a new file is created for it
|
||||
at the next checkpoint. The new file will cover the "gap" from the
|
||||
last layer file, so the LSN ranges are always contiguous:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
@@ -236,6 +252,7 @@ last layer file, so the LSN ranges are always contiguous:
|
||||
main/customers_200
|
||||
main/customers_200_500
|
||||
main/customers_500
|
||||
```
|
||||
|
||||
## Reading page versions
|
||||
|
||||
@@ -259,15 +276,18 @@ involves replaying any WAL records applicable to the page between LSNs
|
||||
|
||||
Imagine that a child branch is created at LSN 250:
|
||||
|
||||
```
|
||||
@250
|
||||
----main--+-------------------------->
|
||||
\
|
||||
+---child-------------->
|
||||
```
|
||||
|
||||
|
||||
Then, the 'orders' table is updated differently on the 'main' and
|
||||
'child' branches. You now have this situation on disk:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
@@ -282,6 +302,7 @@ Then, the 'orders' table is updated differently on the 'main' and
|
||||
child/orders_300
|
||||
child/orders_300_400
|
||||
child/orders_400
|
||||
```
|
||||
|
||||
Because the 'customers' table hasn't been modified on the child
|
||||
branch, there is no file for it there. If you request a page for it on
|
||||
@@ -294,6 +315,7 @@ is linear, and the request's LSN identifies unambiguously which file
|
||||
you need to look at. For example, the history for the 'orders' table
|
||||
on the 'main' branch consists of these files:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
@@ -301,10 +323,12 @@ on the 'main' branch consists of these files:
|
||||
main/orders_300
|
||||
main/orders_300_400
|
||||
main/orders_400
|
||||
```
|
||||
|
||||
And from the 'child' branch's point of view, it consists of these
|
||||
files:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
@@ -313,6 +337,7 @@ files:
|
||||
child/orders_300
|
||||
child/orders_300_400
|
||||
child/orders_400
|
||||
```
|
||||
|
||||
The branch metadata includes the point where the child branch was
|
||||
created, LSN 250. If a page request comes with LSN 275, we read the
|
||||
@@ -345,6 +370,7 @@ Let's look at the single branch scenario again. Imagine that the end
|
||||
of the branch is LSN 525, so that the GC horizon is currently at
|
||||
525-150 = 375
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
@@ -357,11 +383,13 @@ of the branch is LSN 525, so that the GC horizon is currently at
|
||||
main/customers_100
|
||||
main/customers_100_200
|
||||
main/customers_200
|
||||
```
|
||||
|
||||
We can remove the following files because the end LSNs of those files are
|
||||
older than GC horizon 375, and there are more recent layer files for the
|
||||
table:
|
||||
|
||||
```
|
||||
main/orders_100 DELETE
|
||||
main/orders_100_200 DELETE
|
||||
main/orders_200 DELETE
|
||||
@@ -374,8 +402,9 @@ table:
|
||||
main/customers_100 DELETE
|
||||
main/customers_100_200 DELETE
|
||||
main/customers_200 KEEP, NO NEWER VERSION
|
||||
```
|
||||
|
||||
'main/customers_100_200' is old enough, but it cannot be
|
||||
'main/customers_200' is old enough, but it cannot be
|
||||
removed because there is no newer layer file for the table.
|
||||
|
||||
Things get slightly more complicated with multiple branches. All of
|
||||
@@ -384,6 +413,7 @@ retain older shapshot files that are still needed by child branches.
|
||||
For example, if child branch is created at LSN 150, and the 'customers'
|
||||
table is updated on the branch, you would have these files:
|
||||
|
||||
```
|
||||
main/orders_100 KEEP, NEEDED BY child BRANCH
|
||||
main/orders_100_200 KEEP, NEEDED BY child BRANCH
|
||||
main/orders_200 DELETE
|
||||
@@ -398,6 +428,7 @@ table is updated on the branch, you would have these files:
|
||||
main/customers_200 KEEP, NO NEWER VERSION
|
||||
child/customers_150_300 DELETE
|
||||
child/customers_300 KEEP, NO NEWER VERSION
|
||||
```
|
||||
|
||||
In this situation, 'main/orders_100' and 'main/orders_100_200' cannot
|
||||
be removed, even though they are older than the GC horizon, because
|
||||
@@ -407,6 +438,7 @@ and 'main/orders_200_300' can still be removed.
|
||||
If 'orders' is modified later on the 'child' branch, we will create a
|
||||
new base image and delta file for it on the child:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
|
||||
@@ -419,6 +451,7 @@ new base image and delta file for it on the child:
|
||||
child/customers_300
|
||||
child/orders_150_400
|
||||
child/orders_400
|
||||
```
|
||||
|
||||
After this, the 'main/orders_100' and 'main/orders_100_200' file could
|
||||
be removed. It is no longer needed by the child branch, because there
|
||||
@@ -434,6 +467,7 @@ Describe GC and checkpoint interval settings.
|
||||
In principle, each relation can be checkpointed separately, i.e. the
|
||||
LSN ranges of the files don't need to line up. So this would be legal:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
@@ -446,6 +480,7 @@ LSN ranges of the files don't need to line up. So this would be legal:
|
||||
main/customers_250
|
||||
main/customers_250_500
|
||||
main/customers_500
|
||||
```
|
||||
|
||||
However, the code currently always checkpoints all relations together.
|
||||
So that situation doesn't arise in practice.
|
||||
@@ -468,11 +503,13 @@ does that. It could be useful, however, as a transient state when
|
||||
garbage collecting around branch points, or explicit recovery
|
||||
points. For example, if we start with this:
|
||||
|
||||
```
|
||||
main/orders_100
|
||||
main/orders_100_200
|
||||
main/orders_200
|
||||
main/orders_200_300
|
||||
main/orders_300
|
||||
```
|
||||
|
||||
And there is a branch or explicit recovery point at LSN 150, we could
|
||||
replace 'main/orders_100_200' with 'main/orders_150' to keep a
|
||||
|
||||
@@ -37,11 +37,8 @@ use crate::virtual_file::VirtualFile;
|
||||
use crate::walrecord;
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
// avoid binding to Write (conflicts with std::io::Write)
|
||||
// while being able to use std::fmt::Write's methods
|
||||
use std::fmt::Write as _;
|
||||
use std::fs;
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::io::{Seek, SeekFrom};
|
||||
@@ -49,6 +46,7 @@ use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
@@ -254,6 +252,9 @@ impl Layer for DeltaLayer {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
@@ -362,6 +363,28 @@ impl Layer for DeltaLayer {
|
||||
tree_reader.dump()?;
|
||||
|
||||
let mut cursor = file.block_cursor();
|
||||
|
||||
// A subroutine to dump a single blob
|
||||
let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
|
||||
let buf = cursor.read_blob(blob_ref.pos())?;
|
||||
let val = Value::des(&buf)?;
|
||||
let desc = match val {
|
||||
Value::Image(img) => {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
" rec {} bytes will_init: {} {}",
|
||||
buf.len(),
|
||||
rec.will_init(),
|
||||
wal_desc
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(desc)
|
||||
};
|
||||
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
@@ -370,34 +393,10 @@ impl Layer for DeltaLayer {
|
||||
let key = DeltaKey::extract_key_from_buf(delta_key);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
|
||||
|
||||
let mut desc = String::new();
|
||||
match cursor.read_blob(blob_ref.pos()) {
|
||||
Ok(buf) => {
|
||||
let val = Value::des(&buf);
|
||||
match val {
|
||||
Ok(Value::Image(img)) => {
|
||||
write!(&mut desc, " img {} bytes", img.len()).unwrap();
|
||||
}
|
||||
Ok(Value::WalRecord(rec)) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
|
||||
write!(
|
||||
&mut desc,
|
||||
" rec {} bytes will_init: {} {}",
|
||||
buf.len(),
|
||||
rec.will_init(),
|
||||
wal_desc
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
Err(err) => {
|
||||
write!(&mut desc, " DESERIALIZATION ERROR: {}", err).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
write!(&mut desc, " READ ERROR: {}", err).unwrap();
|
||||
}
|
||||
}
|
||||
let desc = match dump_blob(blob_ref) {
|
||||
Ok(desc) => desc,
|
||||
Err(err) => format!("ERROR: {}", err),
|
||||
};
|
||||
println!(" key {} at {}: {}", key, lsn, desc);
|
||||
true
|
||||
},
|
||||
@@ -422,6 +421,28 @@ impl DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
fn temp_path_for(
|
||||
conf: &PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
key_start: Key,
|
||||
lsn_range: &Range<Lsn>,
|
||||
) -> PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
|
||||
conf.timeline_path(&timelineid, &tenantid).join(format!(
|
||||
"{}-XXX__{:016X}-{:016X}.{}.temp",
|
||||
key_start,
|
||||
u64::from(lsn_range.start),
|
||||
u64::from(lsn_range.end),
|
||||
rand_string
|
||||
))
|
||||
}
|
||||
|
||||
///
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
@@ -609,12 +630,8 @@ impl DeltaLayerWriter {
|
||||
//
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let path = conf.timeline_path(&timelineid, &tenantid).join(format!(
|
||||
"{}-XXX__{:016X}-{:016X}.temp",
|
||||
key_start,
|
||||
u64::from(lsn_range.start),
|
||||
u64::from(lsn_range.end)
|
||||
));
|
||||
let path = DeltaLayer::temp_path_for(conf, timelineid, tenantid, key_start, &lsn_range);
|
||||
|
||||
let mut file = VirtualFile::create(&path)?;
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
|
||||
@@ -707,6 +724,8 @@ impl DeltaLayerWriter {
|
||||
}),
|
||||
};
|
||||
|
||||
// fsync the file
|
||||
file.sync_all()?;
|
||||
// Rename the file to its final name
|
||||
//
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
|
||||
@@ -444,6 +444,13 @@ where
|
||||
///
|
||||
/// stack[0] is the current root page, stack.last() is the leaf.
|
||||
///
|
||||
/// We maintain the length of the stack to be always greater than zero.
|
||||
/// Two exceptions are:
|
||||
/// 1. `Self::flush_node`. The method will push the new node if it extracted the last one.
|
||||
/// So because other methods cannot see the intermediate state invariant still holds.
|
||||
/// 2. `Self::finish`. It consumes self and does not return it back,
|
||||
/// which means that this is where the structure is destroyed.
|
||||
/// Thus stack of zero length cannot be observed by other methods.
|
||||
stack: Vec<BuildNode<L>>,
|
||||
|
||||
/// Last key that was appended to the tree. Used to sanity check that append
|
||||
@@ -482,7 +489,10 @@ where
|
||||
|
||||
fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<()> {
|
||||
// Try to append to the current leaf buffer
|
||||
let last = self.stack.last_mut().unwrap();
|
||||
let last = self
|
||||
.stack
|
||||
.last_mut()
|
||||
.expect("should always have at least one item");
|
||||
let level = last.level;
|
||||
if last.push(key, value) {
|
||||
return Ok(());
|
||||
@@ -512,19 +522,25 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flush the bottommost node in the stack to disk. Appends a downlink to its parent,
|
||||
/// and recursively flushes the parent too, if it becomes full. If the root page becomes full,
|
||||
/// creates a new root page, increasing the height of the tree.
|
||||
fn flush_node(&mut self) -> Result<()> {
|
||||
let last = self.stack.pop().unwrap();
|
||||
// Get the current bottommost node in the stack and flush it to disk.
|
||||
let last = self
|
||||
.stack
|
||||
.pop()
|
||||
.expect("should always have at least one item");
|
||||
let buf = last.pack();
|
||||
let downlink_key = last.first_key();
|
||||
let downlink_ptr = self.writer.write_blk(buf)?;
|
||||
|
||||
// Append the downlink to the parent
|
||||
// Append the downlink to the parent. If there is no parent, ie. this was the root page,
|
||||
// create a new root page, increasing the height of the tree.
|
||||
if self.stack.is_empty() {
|
||||
self.stack.push(BuildNode::new(last.level + 1));
|
||||
}
|
||||
self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))?;
|
||||
|
||||
Ok(())
|
||||
self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))
|
||||
}
|
||||
|
||||
///
|
||||
@@ -540,7 +556,10 @@ where
|
||||
self.flush_node()?;
|
||||
}
|
||||
|
||||
let root = self.stack.first().unwrap();
|
||||
let root = self
|
||||
.stack
|
||||
.first()
|
||||
.expect("by the check above we left one item there");
|
||||
let buf = root.pack();
|
||||
let root_blknum = self.writer.write_blk(buf)?;
|
||||
|
||||
|
||||
@@ -34,6 +34,7 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use hex;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs;
|
||||
use std::io::Write;
|
||||
@@ -241,6 +242,22 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
fn temp_path_for(
|
||||
conf: &PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
fname: &ImageFileName,
|
||||
) -> PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
|
||||
conf.timeline_path(&timelineid, &tenantid)
|
||||
.join(format!("{}.{}.temp", fname, rand_string))
|
||||
}
|
||||
|
||||
///
|
||||
/// Open the underlying file and read the metadata into memory, if it's
|
||||
/// not loaded already.
|
||||
@@ -398,7 +415,7 @@ impl ImageLayer {
|
||||
///
|
||||
pub struct ImageLayerWriter {
|
||||
conf: &'static PageServerConf,
|
||||
_path: PathBuf,
|
||||
path: PathBuf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
key_range: Range<Key>,
|
||||
@@ -416,12 +433,10 @@ impl ImageLayerWriter {
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
// Create the file
|
||||
//
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let path = ImageLayer::path_for(
|
||||
&PathOrConf::Conf(conf),
|
||||
// Create the file initially with a temporary filename.
|
||||
// We'll atomically rename it to the final name when we're done.
|
||||
let path = ImageLayer::temp_path_for(
|
||||
conf,
|
||||
timelineid,
|
||||
tenantid,
|
||||
&ImageFileName {
|
||||
@@ -441,7 +456,7 @@ impl ImageLayerWriter {
|
||||
|
||||
let writer = ImageLayerWriter {
|
||||
conf,
|
||||
_path: path,
|
||||
path,
|
||||
timelineid,
|
||||
tenantid,
|
||||
key_range: key_range.clone(),
|
||||
@@ -512,6 +527,25 @@ impl ImageLayerWriter {
|
||||
index_root_blk,
|
||||
}),
|
||||
};
|
||||
|
||||
// fsync the file
|
||||
file.sync_all()?;
|
||||
|
||||
// Rename the file to its final name
|
||||
//
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let final_path = ImageLayer::path_for(
|
||||
&PathOrConf::Conf(self.conf),
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
&ImageFileName {
|
||||
key_range: self.key_range.clone(),
|
||||
lsn: self.lsn,
|
||||
},
|
||||
);
|
||||
std::fs::rename(self.path, &final_path)?;
|
||||
|
||||
trace!("created image layer {}", layer.path().display());
|
||||
|
||||
Ok(layer)
|
||||
|
||||
@@ -201,18 +201,14 @@ impl LayerMap {
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
}
|
||||
|
||||
/// Is there a newer image layer for given key-range?
|
||||
/// Is there a newer image layer for given key- and LSN-range?
|
||||
///
|
||||
/// This is used for garbage collection, to determine if an old layer can
|
||||
/// be deleted.
|
||||
/// We ignore layers newer than disk_consistent_lsn because they will be removed at restart
|
||||
/// We also only look at historic layers
|
||||
//#[allow(dead_code)]
|
||||
pub fn newer_image_layer_exists(
|
||||
pub fn image_layer_exists(
|
||||
&self,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
disk_consistent_lsn: Lsn,
|
||||
lsn_range: &Range<Lsn>,
|
||||
) -> Result<bool> {
|
||||
let mut range_remain = key_range.clone();
|
||||
|
||||
@@ -225,8 +221,7 @@ impl LayerMap {
|
||||
let img_lsn = l.get_lsn_range().start;
|
||||
if !l.is_incremental()
|
||||
&& l.get_key_range().contains(&range_remain.start)
|
||||
&& img_lsn > lsn
|
||||
&& img_lsn < disk_consistent_lsn
|
||||
&& lsn_range.contains(&img_lsn)
|
||||
{
|
||||
made_progress = true;
|
||||
let img_key_end = l.get_key_range().end;
|
||||
|
||||
@@ -45,7 +45,7 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
|
||||
|
||||
lazy_static! {
|
||||
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(
|
||||
"pageserver_live_connections_count",
|
||||
"pageserver_live_connections",
|
||||
"Number of live network connections",
|
||||
&["pageserver_connection_kind"]
|
||||
)
|
||||
|
||||
@@ -19,7 +19,6 @@ use std::net::TcpListener;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
auth::{self, Claims, JwtAuth, Scope},
|
||||
@@ -326,7 +325,7 @@ const TIME_BUCKETS: &[f64] = &[
|
||||
|
||||
lazy_static! {
|
||||
static ref SMGR_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||
"pageserver_smgr_query_time",
|
||||
"pageserver_smgr_query_seconds",
|
||||
"Time spent on smgr query handling",
|
||||
&["smgr_query_type", "tenant_id", "timeline_id"],
|
||||
TIME_BUCKETS.into()
|
||||
@@ -731,7 +730,18 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
for failpoint in failpoints.split(';') {
|
||||
if let Some((name, actions)) = failpoint.split_once('=') {
|
||||
info!("cfg failpoint: {} {}", name, actions);
|
||||
fail::cfg(name, actions).unwrap();
|
||||
|
||||
// We recognize one extra "action" that's not natively recognized
|
||||
// by the failpoints crate: exit, to immediately kill the process
|
||||
if actions == "exit" {
|
||||
fail::cfg_callback(name, || {
|
||||
info!("Exit requested by failpoint");
|
||||
std::process::exit(1);
|
||||
})
|
||||
.unwrap();
|
||||
} else {
|
||||
fail::cfg(name, actions).unwrap();
|
||||
}
|
||||
} else {
|
||||
bail!("Invalid failpoints format");
|
||||
}
|
||||
@@ -796,7 +806,9 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
.unwrap_or_else(|| Ok(repo.get_gc_horizon()))?;
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?;
|
||||
// Use tenant's pitr setting
|
||||
let pitr = repo.get_pitr_interval();
|
||||
let result = repo.gc_iteration(Some(timelineid), gc_horizon, pitr, true)?;
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"layers_total"),
|
||||
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
//!
|
||||
//! * public API via to interact with the external world:
|
||||
//! * [`start_local_timeline_sync`] to launch a background async loop to handle the synchronization
|
||||
//! * [`schedule_timeline_checkpoint_upload`] and [`schedule_timeline_download`] to enqueue a new upload and download tasks,
|
||||
//! * [`schedule_layer_upload`], [`schedule_layer_download`], and[`schedule_layer_delete`] to enqueue a new task
|
||||
//! to be processed by the async loop
|
||||
//!
|
||||
//! Here's a schematic overview of all interactions backup and the rest of the pageserver perform:
|
||||
@@ -44,8 +44,8 @@
|
||||
//! query their downloads later if they are accessed.
|
||||
//!
|
||||
//! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata.
|
||||
//! If the storage sync loop was successfully started before, pageserver schedules the new checkpoint file uploads after every checkpoint.
|
||||
//! The checkpoint uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either).
|
||||
//! If the storage sync loop was successfully started before, pageserver schedules the layer files and the updated metadata file for upload, every time a layer is flushed to disk.
|
||||
//! The uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either).
|
||||
//! See [`crate::layered_repository`] for the upload calls and the adjacent logic.
|
||||
//!
|
||||
//! Synchronization logic is able to communicate back with updated timeline sync states, [`crate::repository::TimelineSyncStatusUpdate`],
|
||||
@@ -54,7 +54,7 @@
|
||||
//! * once after the sync loop startup, to signal pageserver which timelines will be synchronized in the near future
|
||||
//! * after every loop step, in case a timeline needs to be reloaded or evicted from pageserver's memory
|
||||
//!
|
||||
//! When the pageserver terminates, the sync loop finishes a current sync task (if any) and exits.
|
||||
//! When the pageserver terminates, the sync loop finishes current sync task (if any) and exits.
|
||||
//!
|
||||
//! The storage logic considers `image` as a set of local files (layers), fully representing a certain timeline at given moment (identified with `disk_consistent_lsn` from the corresponding `metadata` file).
|
||||
//! Timeline can change its state, by adding more files on disk and advancing its `disk_consistent_lsn`: this happens after pageserver checkpointing and is followed
|
||||
@@ -66,13 +66,13 @@
|
||||
//! when the newer image is downloaded
|
||||
//!
|
||||
//! Pageserver maintains similar to the local file structure remotely: all layer files are uploaded with the same names under the same directory structure.
|
||||
//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexShard`], containing the list of remote files.
|
||||
//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexPart`], containing the list of remote files.
|
||||
//! This file gets read to populate the cache, if the remote timeline data is missing from it and gets updated after every successful download.
|
||||
//! This way, we optimize S3 storage access by not running the `S3 list` command that could be expencive and slow: knowing both [`ZTenantId`] and [`ZTimelineId`],
|
||||
//! we can always reconstruct the path to the timeline, use this to get the same path on the remote storage and retrive its shard contents, if needed, same as any layer files.
|
||||
//!
|
||||
//! By default, pageserver reads the remote storage index data only for timelines located locally, to synchronize those, if needed.
|
||||
//! Bulk index data download happens only initially, on pageserer startup. The rest of the remote storage stays unknown to pageserver and loaded on demand only,
|
||||
//! Bulk index data download happens only initially, on pageserver startup. The rest of the remote storage stays unknown to pageserver and loaded on demand only,
|
||||
//! when a new timeline is scheduled for the download.
|
||||
//!
|
||||
//! NOTES:
|
||||
@@ -89,13 +89,12 @@
|
||||
//! Synchronization is done with the queue being emptied via separate thread asynchronously,
|
||||
//! attempting to fully store pageserver's local data on the remote storage in a custom format, beneficial for storing.
|
||||
//!
|
||||
//! A queue is implemented in the [`sync_queue`] module as a pair of sender and receiver channels, to block on zero tasks instead of checking the queue.
|
||||
//! The pair's shared buffer of a fixed size serves as an implicit queue, holding [`SyncTask`] for local files upload/download operations.
|
||||
//! A queue is implemented in the [`sync_queue`] module as a VecDeque to hold the tasks, and a condition variable for blocking when the queue is empty.
|
||||
//!
|
||||
//! The queue gets emptied by a single thread with the loop, that polls the tasks in batches of deduplicated tasks.
|
||||
//! A task from the batch corresponds to a single timeline, with its files to sync merged together: given that only one task sync loop step is active at a time,
|
||||
//! timeline uploads and downloads can happen concurrently, in no particular order due to incremental nature of the timeline layers.
|
||||
//! Deletion happens only after a successful upload only, otherwise the compation output might make the timeline inconsistent until both tasks are fully processed without errors.
|
||||
//! Deletion happens only after a successful upload only, otherwise the compaction output might make the timeline inconsistent until both tasks are fully processed without errors.
|
||||
//! Upload and download update the remote data (inmemory index and S3 json index part file) only after every layer is successfully synchronized, while the deletion task
|
||||
//! does otherwise: it requires to have the remote data updated first succesfully: blob files will be invisible to pageserver this way.
|
||||
//!
|
||||
@@ -138,8 +137,6 @@
|
||||
//! NOTE: No real contents or checksum check happens right now and is a subject to improve later.
|
||||
//!
|
||||
//! After the whole timeline is downloaded, [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function is used to update pageserver memory stage for the timeline processed.
|
||||
//!
|
||||
//! When pageserver signals shutdown, current sync task gets finished and the loop exists.
|
||||
|
||||
mod delete;
|
||||
mod download;
|
||||
@@ -153,10 +150,7 @@ use std::{
|
||||
num::{NonZeroU32, NonZeroUsize},
|
||||
ops::ControlFlow,
|
||||
path::{Path, PathBuf},
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
sync::{Arc, Condvar, Mutex},
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
@@ -167,7 +161,6 @@ use remote_storage::{GenericRemoteStorage, RemoteStorage};
|
||||
use tokio::{
|
||||
fs,
|
||||
runtime::Runtime,
|
||||
sync::mpsc::{self, error::TryRecvError, UnboundedReceiver, UnboundedSender},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tracing::*;
|
||||
@@ -208,12 +201,12 @@ lazy_static! {
|
||||
)
|
||||
.expect("failed to register pageserver remote storage remaining sync items int gauge");
|
||||
static ref FATAL_TASK_FAILURES: IntCounter = register_int_counter!(
|
||||
"pageserver_remote_storage_fatal_task_failures",
|
||||
"pageserver_remote_storage_fatal_task_failures_total",
|
||||
"Number of critically failed tasks"
|
||||
)
|
||||
.expect("failed to register pageserver remote storage remaining sync items int gauge");
|
||||
static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!(
|
||||
"pageserver_remote_storage_image_sync_time",
|
||||
"pageserver_remote_storage_image_sync_seconds",
|
||||
"Time took to synchronize (download or upload) a whole pageserver image. \
|
||||
Grouped by `operation_kind` (upload|download) and `status` (success|failure)",
|
||||
&["operation_kind", "status"],
|
||||
@@ -428,6 +421,14 @@ fn collect_timeline_files(
|
||||
entry_path.display()
|
||||
)
|
||||
})?;
|
||||
} else if entry_path.extension().and_then(OsStr::to_str) == Some("temp") {
|
||||
info!("removing temp layer file at {}", entry_path.display());
|
||||
std::fs::remove_file(&entry_path).with_context(|| {
|
||||
format!(
|
||||
"failed to remove temp layer file at {}",
|
||||
entry_path.display()
|
||||
)
|
||||
})?;
|
||||
} else {
|
||||
timeline_files.insert(entry_path);
|
||||
}
|
||||
@@ -453,97 +454,77 @@ fn collect_timeline_files(
|
||||
Ok((timeline_id, metadata, timeline_files))
|
||||
}
|
||||
|
||||
/// Wraps mpsc channel bits around into a queue interface.
|
||||
/// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning.
|
||||
/// Global queue of sync tasks.
|
||||
///
|
||||
/// 'queue' is protected by a mutex, and 'condvar' is used to wait for tasks to arrive.
|
||||
struct SyncQueue {
|
||||
len: AtomicUsize,
|
||||
max_timelines_per_batch: NonZeroUsize,
|
||||
sender: UnboundedSender<(ZTenantTimelineId, SyncTask)>,
|
||||
|
||||
queue: Mutex<VecDeque<(ZTenantTimelineId, SyncTask)>>,
|
||||
condvar: Condvar,
|
||||
}
|
||||
|
||||
impl SyncQueue {
|
||||
fn new(
|
||||
max_timelines_per_batch: NonZeroUsize,
|
||||
) -> (Self, UnboundedReceiver<(ZTenantTimelineId, SyncTask)>) {
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
(
|
||||
Self {
|
||||
len: AtomicUsize::new(0),
|
||||
max_timelines_per_batch,
|
||||
sender,
|
||||
},
|
||||
receiver,
|
||||
)
|
||||
fn new(max_timelines_per_batch: NonZeroUsize) -> Self {
|
||||
Self {
|
||||
max_timelines_per_batch,
|
||||
queue: Mutex::new(VecDeque::new()),
|
||||
condvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Queue a new task
|
||||
fn push(&self, sync_id: ZTenantTimelineId, new_task: SyncTask) {
|
||||
match self.sender.send((sync_id, new_task)) {
|
||||
Ok(()) => {
|
||||
self.len.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("failed to push sync task to queue: {e}");
|
||||
}
|
||||
let mut q = self.queue.lock().unwrap();
|
||||
|
||||
q.push_back((sync_id, new_task));
|
||||
if q.len() <= 1 {
|
||||
self.condvar.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetches a task batch, getting every existing entry from the queue, grouping by timelines and merging the tasks for every timeline.
|
||||
/// A timeline has to care to not to delete cetain layers from the remote storage before the corresponding uploads happen.
|
||||
/// Otherwise, due to "immutable" nature of the layers, the order of their deletion/uploading/downloading does not matter.
|
||||
/// A timeline has to care to not to delete certain layers from the remote storage before the corresponding uploads happen.
|
||||
/// Other than that, due to "immutable" nature of the layers, the order of their deletion/uploading/downloading does not matter.
|
||||
/// Hence, we merge the layers together into single task per timeline and run those concurrently (with the deletion happening only after successful uploading).
|
||||
async fn next_task_batch(
|
||||
&self,
|
||||
// The queue is based on two ends of a channel and has to be accessible statically without blocking for submissions from the sync code.
|
||||
// Its receiver needs &mut, so we cannot place it in the same container with the other end and get both static and non-blocking access.
|
||||
// Hence toss this around to use it from the sync loop directly as &mut.
|
||||
sync_queue_receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
|
||||
) -> HashMap<ZTenantTimelineId, SyncTaskBatch> {
|
||||
// request the first task in blocking fashion to do less meaningless work
|
||||
let (first_sync_id, first_task) = if let Some(first_task) = sync_queue_receiver.recv().await
|
||||
{
|
||||
self.len.fetch_sub(1, Ordering::Relaxed);
|
||||
first_task
|
||||
} else {
|
||||
info!("Queue sender part was dropped, aborting");
|
||||
return HashMap::new();
|
||||
};
|
||||
fn next_task_batch(&self) -> (HashMap<ZTenantTimelineId, SyncTaskBatch>, usize) {
|
||||
// Wait for the first task in blocking fashion
|
||||
let mut q = self.queue.lock().unwrap();
|
||||
while q.is_empty() {
|
||||
q = self
|
||||
.condvar
|
||||
.wait_timeout(q, Duration::from_millis(1000))
|
||||
.unwrap()
|
||||
.0;
|
||||
|
||||
if thread_mgr::is_shutdown_requested() {
|
||||
return (HashMap::new(), q.len());
|
||||
}
|
||||
}
|
||||
let (first_sync_id, first_task) = q.pop_front().unwrap();
|
||||
|
||||
let mut timelines_left_to_batch = self.max_timelines_per_batch.get() - 1;
|
||||
let mut tasks_to_process = self.len();
|
||||
let tasks_to_process = q.len();
|
||||
|
||||
let mut batches = HashMap::with_capacity(tasks_to_process);
|
||||
batches.insert(first_sync_id, SyncTaskBatch::new(first_task));
|
||||
|
||||
let mut tasks_to_reenqueue = Vec::with_capacity(tasks_to_process);
|
||||
|
||||
// Pull the queue channel until we get all tasks that were there at the beginning of the batch construction.
|
||||
// Greedily grab as many other tasks that we can.
|
||||
// Yet do not put all timelines in the batch, but only the first ones that fit the timeline limit.
|
||||
// Still merge the rest of the pulled tasks and reenqueue those for later.
|
||||
while tasks_to_process > 0 {
|
||||
match sync_queue_receiver.try_recv() {
|
||||
Ok((sync_id, new_task)) => {
|
||||
self.len.fetch_sub(1, Ordering::Relaxed);
|
||||
tasks_to_process -= 1;
|
||||
|
||||
match batches.entry(sync_id) {
|
||||
hash_map::Entry::Occupied(mut v) => v.get_mut().add(new_task),
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
timelines_left_to_batch = timelines_left_to_batch.saturating_sub(1);
|
||||
if timelines_left_to_batch == 0 {
|
||||
tasks_to_reenqueue.push((sync_id, new_task));
|
||||
} else {
|
||||
v.insert(SyncTaskBatch::new(new_task));
|
||||
}
|
||||
}
|
||||
// Re-enqueue the tasks that don't fit in this batch.
|
||||
while let Some((sync_id, new_task)) = q.pop_front() {
|
||||
match batches.entry(sync_id) {
|
||||
hash_map::Entry::Occupied(mut v) => v.get_mut().add(new_task),
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
timelines_left_to_batch = timelines_left_to_batch.saturating_sub(1);
|
||||
if timelines_left_to_batch == 0 {
|
||||
tasks_to_reenqueue.push((sync_id, new_task));
|
||||
} else {
|
||||
v.insert(SyncTaskBatch::new(new_task));
|
||||
}
|
||||
}
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
debug!("Sender disconnected, batch collection aborted");
|
||||
break;
|
||||
}
|
||||
Err(TryRecvError::Empty) => {
|
||||
debug!("No more data in the sync queue, task batch is not full");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -553,14 +534,15 @@ impl SyncQueue {
|
||||
tasks_to_reenqueue.len()
|
||||
);
|
||||
for (id, task) in tasks_to_reenqueue {
|
||||
self.push(id, task);
|
||||
q.push_back((id, task));
|
||||
}
|
||||
|
||||
batches
|
||||
(batches, q.len())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn len(&self) -> usize {
|
||||
self.len.load(Ordering::Relaxed)
|
||||
self.queue.lock().unwrap().len()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -823,7 +805,7 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
|
||||
debug!("Download task for tenant {tenant_id}, timeline {timeline_id} sent")
|
||||
}
|
||||
|
||||
/// Uses a remote storage given to start the storage sync loop.
|
||||
/// Launch a thread to perform remote storage sync tasks.
|
||||
/// See module docs for loop step description.
|
||||
pub(super) fn spawn_storage_sync_thread<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -836,7 +818,7 @@ where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let (sync_queue, sync_queue_receiver) = SyncQueue::new(max_concurrent_timelines_sync);
|
||||
let sync_queue = SyncQueue::new(max_concurrent_timelines_sync);
|
||||
SYNC_QUEUE
|
||||
.set(sync_queue)
|
||||
.map_err(|_queue| anyhow!("Could not initialize sync queue"))?;
|
||||
@@ -864,7 +846,7 @@ where
|
||||
local_timeline_files,
|
||||
);
|
||||
|
||||
let loop_index = remote_index.clone();
|
||||
let remote_index_clone = remote_index.clone();
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::StorageSync,
|
||||
None,
|
||||
@@ -875,12 +857,7 @@ where
|
||||
storage_sync_loop(
|
||||
runtime,
|
||||
conf,
|
||||
(
|
||||
Arc::new(storage),
|
||||
loop_index,
|
||||
sync_queue,
|
||||
sync_queue_receiver,
|
||||
),
|
||||
(Arc::new(storage), remote_index_clone, sync_queue),
|
||||
max_sync_errors,
|
||||
);
|
||||
Ok(())
|
||||
@@ -896,12 +873,7 @@ where
|
||||
fn storage_sync_loop<P, S>(
|
||||
runtime: Runtime,
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue, mut sync_queue_receiver): (
|
||||
Arc<S>,
|
||||
RemoteIndex,
|
||||
&SyncQueue,
|
||||
UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
|
||||
),
|
||||
(storage, index, sync_queue): (Arc<S>, RemoteIndex, &SyncQueue),
|
||||
max_sync_errors: NonZeroU32,
|
||||
) where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
@@ -909,16 +881,35 @@ fn storage_sync_loop<P, S>(
|
||||
{
|
||||
info!("Starting remote storage sync loop");
|
||||
loop {
|
||||
let loop_index = index.clone();
|
||||
let loop_storage = Arc::clone(&storage);
|
||||
|
||||
let (batched_tasks, remaining_queue_length) = sync_queue.next_task_batch();
|
||||
|
||||
if thread_mgr::is_shutdown_requested() {
|
||||
info!("Shutdown requested, stopping");
|
||||
break;
|
||||
}
|
||||
|
||||
REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
|
||||
if remaining_queue_length > 0 || !batched_tasks.is_empty() {
|
||||
info!("Processing tasks for {} timelines in batch, more tasks left to process: {remaining_queue_length}", batched_tasks.len());
|
||||
} else {
|
||||
debug!("No tasks to process");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Concurrently perform all the tasks in the batch
|
||||
let loop_step = runtime.block_on(async {
|
||||
tokio::select! {
|
||||
step = loop_step(
|
||||
step = process_batches(
|
||||
conf,
|
||||
(loop_storage, loop_index, sync_queue, &mut sync_queue_receiver),
|
||||
max_sync_errors,
|
||||
loop_storage,
|
||||
&index,
|
||||
batched_tasks,
|
||||
sync_queue,
|
||||
)
|
||||
.instrument(info_span!("storage_sync_loop_step")) => step,
|
||||
.instrument(info_span!("storage_sync_loop_step")) => ControlFlow::Continue(step),
|
||||
_ = thread_mgr::shutdown_watcher() => ControlFlow::Break(()),
|
||||
}
|
||||
});
|
||||
@@ -944,31 +935,18 @@ fn storage_sync_loop<P, S>(
|
||||
}
|
||||
}
|
||||
|
||||
async fn loop_step<P, S>(
|
||||
async fn process_batches<P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue, sync_queue_receiver): (
|
||||
Arc<S>,
|
||||
RemoteIndex,
|
||||
&SyncQueue,
|
||||
&mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
|
||||
),
|
||||
max_sync_errors: NonZeroU32,
|
||||
) -> ControlFlow<(), HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>>
|
||||
storage: Arc<S>,
|
||||
index: &RemoteIndex,
|
||||
batched_tasks: HashMap<ZTenantTimelineId, SyncTaskBatch>,
|
||||
sync_queue: &SyncQueue,
|
||||
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let batched_tasks = sync_queue.next_task_batch(sync_queue_receiver).await;
|
||||
|
||||
let remaining_queue_length = sync_queue.len();
|
||||
REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
|
||||
if remaining_queue_length > 0 || !batched_tasks.is_empty() {
|
||||
info!("Processing tasks for {} timelines in batch, more tasks left to process: {remaining_queue_length}", batched_tasks.len());
|
||||
} else {
|
||||
debug!("No tasks to process");
|
||||
return ControlFlow::Continue(HashMap::new());
|
||||
}
|
||||
|
||||
let mut sync_results = batched_tasks
|
||||
.into_iter()
|
||||
.map(|(sync_id, batch)| {
|
||||
@@ -993,6 +971,7 @@ where
|
||||
ZTenantId,
|
||||
HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
|
||||
> = HashMap::new();
|
||||
|
||||
while let Some((sync_id, state_update)) = sync_results.next().await {
|
||||
debug!("Finished storage sync task for sync id {sync_id}");
|
||||
if let Some(state_update) = state_update {
|
||||
@@ -1003,7 +982,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
ControlFlow::Continue(new_timeline_states)
|
||||
new_timeline_states
|
||||
}
|
||||
|
||||
async fn process_sync_task_batch<P, S>(
|
||||
@@ -1376,7 +1355,6 @@ where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
info!("Updating remote index for the timeline");
|
||||
let updated_remote_timeline = {
|
||||
let mut index_accessor = index.write().await;
|
||||
|
||||
@@ -1443,7 +1421,7 @@ where
|
||||
IndexPart::from_remote_timeline(&timeline_path, updated_remote_timeline)
|
||||
.context("Failed to create an index part from the updated remote timeline")?;
|
||||
|
||||
info!("Uploading remote data for the timeline");
|
||||
info!("Uploading remote index for the timeline");
|
||||
upload_index_part(conf, storage, sync_id, new_index_part)
|
||||
.await
|
||||
.context("Failed to upload new index part")
|
||||
@@ -1685,7 +1663,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn separate_task_ids_batch() {
|
||||
let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
assert_eq!(sync_queue.len(), 0);
|
||||
|
||||
let sync_id_2 = ZTenantTimelineId {
|
||||
@@ -1720,7 +1698,7 @@ mod tests {
|
||||
|
||||
let submitted_tasks_count = sync_queue.len();
|
||||
assert_eq!(submitted_tasks_count, 3);
|
||||
let mut batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await;
|
||||
let (mut batch, _) = sync_queue.next_task_batch();
|
||||
assert_eq!(
|
||||
batch.len(),
|
||||
submitted_tasks_count,
|
||||
@@ -1746,7 +1724,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn same_task_id_separate_tasks_batch() {
|
||||
let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
assert_eq!(sync_queue.len(), 0);
|
||||
|
||||
let download = LayersDownload {
|
||||
@@ -1769,7 +1747,7 @@ mod tests {
|
||||
|
||||
let submitted_tasks_count = sync_queue.len();
|
||||
assert_eq!(submitted_tasks_count, 3);
|
||||
let mut batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await;
|
||||
let (mut batch, _) = sync_queue.next_task_batch();
|
||||
assert_eq!(
|
||||
batch.len(),
|
||||
1,
|
||||
@@ -1801,7 +1779,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn same_task_id_same_tasks_batch() {
|
||||
let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(1).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(1).unwrap());
|
||||
let download_1 = LayersDownload {
|
||||
layers_to_skip: HashSet::from([PathBuf::from("sk1")]),
|
||||
};
|
||||
@@ -1823,11 +1801,11 @@ mod tests {
|
||||
|
||||
sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_1.clone()));
|
||||
sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_2.clone()));
|
||||
sync_queue.push(sync_id_2, SyncTask::download(download_3.clone()));
|
||||
sync_queue.push(sync_id_2, SyncTask::download(download_3));
|
||||
sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_4.clone()));
|
||||
assert_eq!(sync_queue.len(), 4);
|
||||
|
||||
let mut smallest_batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await;
|
||||
let (mut smallest_batch, _) = sync_queue.next_task_batch();
|
||||
assert_eq!(
|
||||
smallest_batch.len(),
|
||||
1,
|
||||
|
||||
@@ -119,7 +119,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn delete_timeline_negative() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline_negative")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
@@ -152,7 +152,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn delete_timeline() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "c", "d"];
|
||||
|
||||
@@ -286,7 +286,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn download_timeline() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("download_timeline")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"];
|
||||
@@ -385,7 +385,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn download_timeline_negatives() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("download_timeline_negatives")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
|
||||
|
||||
|
||||
@@ -240,7 +240,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn regular_layer_upload() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("regular_layer_upload")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a", "b"];
|
||||
@@ -327,7 +327,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn layer_upload_after_local_fs_update() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("layer_upload_after_local_fs_update")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a1", "b1"];
|
||||
|
||||
@@ -80,6 +80,9 @@ pub enum TenantState {
|
||||
// The local disk might have some newer files that don't exist in cloud storage yet.
|
||||
// The tenant cannot be accessed anymore for any reason, but graceful shutdown.
|
||||
Stopping,
|
||||
|
||||
// Something went wrong loading the tenant state
|
||||
Broken,
|
||||
}
|
||||
|
||||
impl fmt::Display for TenantState {
|
||||
@@ -88,6 +91,7 @@ impl fmt::Display for TenantState {
|
||||
TenantState::Active => f.write_str("Active"),
|
||||
TenantState::Idle => f.write_str("Idle"),
|
||||
TenantState::Stopping => f.write_str("Stopping"),
|
||||
TenantState::Broken => f.write_str("Broken"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -101,7 +105,22 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
|
||||
local_timeline_init_statuses,
|
||||
} = storage_sync::start_local_timeline_sync(conf)
|
||||
.context("Failed to set up local files sync with external storage")?;
|
||||
init_local_repositories(conf, local_timeline_init_statuses, &remote_index)?;
|
||||
|
||||
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
|
||||
if let Err(err) =
|
||||
init_local_repository(conf, tenant_id, local_timeline_init_statuses, &remote_index)
|
||||
{
|
||||
// Report the error, but continue with the startup for other tenants. An error
|
||||
// loading a tenant is serious, but it's better to complete the startup and
|
||||
// serve other tenants, than fail completely.
|
||||
error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
|
||||
let mut m = tenants_state::write_tenants();
|
||||
if let Some(tenant) = m.get_mut(&tenant_id) {
|
||||
tenant.state = TenantState::Broken;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(remote_index)
|
||||
}
|
||||
|
||||
@@ -145,8 +164,13 @@ pub fn shutdown_all_tenants() {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let mut tenantids = Vec::new();
|
||||
for (tenantid, tenant) in m.iter_mut() {
|
||||
tenant.state = TenantState::Stopping;
|
||||
tenantids.push(*tenantid)
|
||||
match tenant.state {
|
||||
TenantState::Active | TenantState::Idle | TenantState::Stopping => {
|
||||
tenant.state = TenantState::Stopping;
|
||||
tenantids.push(*tenantid)
|
||||
}
|
||||
TenantState::Broken => {}
|
||||
}
|
||||
}
|
||||
drop(m);
|
||||
|
||||
@@ -259,6 +283,10 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
|
||||
TenantState::Stopping => {
|
||||
// don't re-activate it if it's being stopped
|
||||
}
|
||||
|
||||
TenantState::Broken => {
|
||||
// cannot activate
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -359,38 +387,37 @@ pub fn list_tenants() -> Vec<TenantInfo> {
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn init_local_repositories(
|
||||
fn init_local_repository(
|
||||
conf: &'static PageServerConf,
|
||||
local_timeline_init_statuses: HashMap<ZTenantId, HashMap<ZTimelineId, LocalTimelineInitStatus>>,
|
||||
tenant_id: ZTenantId,
|
||||
local_timeline_init_statuses: HashMap<ZTimelineId, LocalTimelineInitStatus>,
|
||||
remote_index: &RemoteIndex,
|
||||
) -> anyhow::Result<(), anyhow::Error> {
|
||||
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
|
||||
// initialize local tenant
|
||||
let repo = load_local_repo(conf, tenant_id, remote_index)
|
||||
.with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?;
|
||||
// initialize local tenant
|
||||
let repo = load_local_repo(conf, tenant_id, remote_index)
|
||||
.with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?;
|
||||
|
||||
let mut status_updates = HashMap::with_capacity(local_timeline_init_statuses.len());
|
||||
for (timeline_id, init_status) in local_timeline_init_statuses {
|
||||
match init_status {
|
||||
LocalTimelineInitStatus::LocallyComplete => {
|
||||
debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository");
|
||||
status_updates.insert(timeline_id, TimelineSyncStatusUpdate::Downloaded);
|
||||
}
|
||||
LocalTimelineInitStatus::NeedsSync => {
|
||||
debug!(
|
||||
"timeline {tenant_id} for tenant {timeline_id} needs sync, \
|
||||
so skipped for adding into repository until sync is finished"
|
||||
);
|
||||
}
|
||||
let mut status_updates = HashMap::with_capacity(local_timeline_init_statuses.len());
|
||||
for (timeline_id, init_status) in local_timeline_init_statuses {
|
||||
match init_status {
|
||||
LocalTimelineInitStatus::LocallyComplete => {
|
||||
debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository");
|
||||
status_updates.insert(timeline_id, TimelineSyncStatusUpdate::Downloaded);
|
||||
}
|
||||
LocalTimelineInitStatus::NeedsSync => {
|
||||
debug!(
|
||||
"timeline {tenant_id} for tenant {timeline_id} needs sync, \
|
||||
so skipped for adding into repository until sync is finished"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Lets fail here loudly to be on the safe side.
|
||||
// XXX: It may be a better api to actually distinguish between repository startup
|
||||
// and processing of newly downloaded timelines.
|
||||
apply_timeline_remote_sync_status_updates(&repo, status_updates)
|
||||
.with_context(|| format!("Failed to bootstrap timelines for tenant {tenant_id}"))?
|
||||
}
|
||||
|
||||
// Lets fail here loudly to be on the safe side.
|
||||
// XXX: It may be a better api to actually distinguish between repository startup
|
||||
// and processing of newly downloaded timelines.
|
||||
apply_timeline_remote_sync_status_updates(&repo, status_updates)
|
||||
.with_context(|| format!("Failed to bootstrap timelines for tenant {tenant_id}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -139,7 +139,7 @@ pub fn spawn<F>(
|
||||
name: &str,
|
||||
shutdown_process_on_error: bool,
|
||||
f: F,
|
||||
) -> std::io::Result<()>
|
||||
) -> std::io::Result<u64>
|
||||
where
|
||||
F: FnOnce() -> anyhow::Result<()> + Send + 'static,
|
||||
{
|
||||
@@ -193,7 +193,7 @@ where
|
||||
drop(jh_guard);
|
||||
|
||||
// The thread is now running. Nothing more to do here
|
||||
Ok(())
|
||||
Ok(thread_id)
|
||||
}
|
||||
|
||||
/// This wrapper function runs in a newly-spawned thread. It initializes the
|
||||
|
||||
@@ -45,6 +45,8 @@ pub struct LocalTimelineInfo {
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub latest_gc_cutoff_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
|
||||
pub current_logical_size_non_incremental: Option<usize>,
|
||||
@@ -68,6 +70,7 @@ impl LocalTimelineInfo {
|
||||
disk_consistent_lsn: datadir_tline.tline.get_disk_consistent_lsn(),
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(datadir_tline.tline.get_prev_record_lsn()),
|
||||
latest_gc_cutoff_lsn: *datadir_tline.tline.get_latest_gc_cutoff_lsn(),
|
||||
timeline_state: LocalTimelineState::Loaded,
|
||||
current_logical_size: Some(datadir_tline.get_current_logical_size()),
|
||||
current_logical_size_non_incremental: if include_non_incremental_logical_size {
|
||||
@@ -91,6 +94,7 @@ impl LocalTimelineInfo {
|
||||
disk_consistent_lsn: metadata.disk_consistent_lsn(),
|
||||
last_record_lsn: metadata.disk_consistent_lsn(),
|
||||
prev_record_lsn: metadata.prev_record_lsn(),
|
||||
latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
|
||||
timeline_state: LocalTimelineState::Unloaded,
|
||||
current_logical_size: None,
|
||||
current_logical_size_non_incremental: None,
|
||||
|
||||
@@ -34,7 +34,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
|
||||
|
||||
lazy_static! {
|
||||
static ref STORAGE_IO_TIME: HistogramVec = register_histogram_vec!(
|
||||
"pageserver_io_time",
|
||||
"pageserver_io_operations_seconds",
|
||||
"Time spent in IO operations",
|
||||
&["operation", "tenant_id", "timeline_id"],
|
||||
STORAGE_IO_TIME_BUCKETS.into()
|
||||
@@ -43,8 +43,8 @@ lazy_static! {
|
||||
}
|
||||
lazy_static! {
|
||||
static ref STORAGE_IO_SIZE: IntGaugeVec = register_int_gauge_vec!(
|
||||
"pageserver_io_size",
|
||||
"Amount of bytes",
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"Total amount of bytes read/written in IO operations",
|
||||
&["operation", "tenant_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
use anyhow::Context;
|
||||
use postgres_ffi::nonrelfile_utils::clogpage_precedes;
|
||||
use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||
use postgres_ffi::{page_is_new, page_set_lsn};
|
||||
|
||||
use anyhow::Result;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
@@ -304,8 +305,14 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
image.resize(image.len() + blk.hole_length as usize, 0u8);
|
||||
image.unsplit(tail);
|
||||
}
|
||||
image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
|
||||
image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
|
||||
//
|
||||
// Match the logic of XLogReadBufferForRedoExtended:
|
||||
// The page may be uninitialized. If so, we can't set the LSN because
|
||||
// that would corrupt the page.
|
||||
//
|
||||
if !page_is_new(&image) {
|
||||
page_set_lsn(&mut image, lsn)
|
||||
}
|
||||
assert_eq!(image.len(), pg_constants::BLCKSZ as usize);
|
||||
self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?;
|
||||
} else {
|
||||
|
||||
@@ -18,6 +18,8 @@ use lazy_static::lazy_static;
|
||||
use postgres_ffi::waldecoder::*;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use postgres_types::PgLsn;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::cell::Cell;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
@@ -35,11 +37,19 @@ use utils::{
|
||||
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
|
||||
};
|
||||
|
||||
//
|
||||
// We keep one WAL Receiver active per timeline.
|
||||
//
|
||||
struct WalReceiverEntry {
|
||||
///
|
||||
/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
|
||||
/// We keep one WAL receiver active per timeline.
|
||||
///
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct WalReceiverEntry {
|
||||
thread_id: u64,
|
||||
wal_producer_connstr: String,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
last_received_msg_lsn: Option<Lsn>,
|
||||
/// the timestamp (in microseconds) of the last received message
|
||||
last_received_msg_ts: Option<u128>,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
@@ -74,7 +84,7 @@ pub fn launch_wal_receiver(
|
||||
receiver.wal_producer_connstr = wal_producer_connstr.into();
|
||||
}
|
||||
None => {
|
||||
thread_mgr::spawn(
|
||||
let thread_id = thread_mgr::spawn(
|
||||
ThreadKind::WalReceiver,
|
||||
Some(tenantid),
|
||||
Some(timelineid),
|
||||
@@ -88,7 +98,10 @@ pub fn launch_wal_receiver(
|
||||
)?;
|
||||
|
||||
let receiver = WalReceiverEntry {
|
||||
thread_id,
|
||||
wal_producer_connstr: wal_producer_connstr.into(),
|
||||
last_received_msg_lsn: None,
|
||||
last_received_msg_ts: None,
|
||||
};
|
||||
receivers.insert((tenantid, timelineid), receiver);
|
||||
|
||||
@@ -99,15 +112,13 @@ pub fn launch_wal_receiver(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Look up current WAL producer connection string in the hash table
|
||||
fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String {
|
||||
/// Look up a WAL receiver's data in the global `WAL_RECEIVERS`
|
||||
pub fn get_wal_receiver_entry(
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
) -> Option<WalReceiverEntry> {
|
||||
let receivers = WAL_RECEIVERS.lock().unwrap();
|
||||
|
||||
receivers
|
||||
.get(&(tenantid, timelineid))
|
||||
.unwrap()
|
||||
.wal_producer_connstr
|
||||
.clone()
|
||||
receivers.get(&(tenant_id, timeline_id)).cloned()
|
||||
}
|
||||
|
||||
//
|
||||
@@ -118,7 +129,18 @@ fn thread_main(conf: &'static PageServerConf, tenant_id: ZTenantId, timeline_id:
|
||||
info!("WAL receiver thread started");
|
||||
|
||||
// Look up the current WAL producer address
|
||||
let wal_producer_connstr = get_wal_producer_connstr(tenant_id, timeline_id);
|
||||
let wal_producer_connstr = {
|
||||
match get_wal_receiver_entry(tenant_id, timeline_id) {
|
||||
Some(e) => e.wal_producer_connstr,
|
||||
None => {
|
||||
info!(
|
||||
"Unable to create the WAL receiver thread: no WAL receiver entry found for tenant {} and timeline {}",
|
||||
tenant_id, timeline_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
|
||||
// and start streaming WAL from it.
|
||||
@@ -318,6 +340,28 @@ fn walreceiver_main(
|
||||
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
|
||||
let ts = SystemTime::now();
|
||||
|
||||
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`
|
||||
{
|
||||
let mut receivers = WAL_RECEIVERS.lock().unwrap();
|
||||
let entry = match receivers.get_mut(&(tenant_id, timeline_id)) {
|
||||
Some(e) => e,
|
||||
None => {
|
||||
anyhow::bail!(
|
||||
"no WAL receiver entry found for tenant {} and timeline {}",
|
||||
tenant_id,
|
||||
timeline_id
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
entry.last_received_msg_lsn = Some(last_lsn);
|
||||
entry.last_received_msg_ts = Some(
|
||||
ts.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("Received message time should be before UNIX EPOCH!")
|
||||
.as_micros(),
|
||||
);
|
||||
}
|
||||
|
||||
// Send zenith feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
let zenith_status_update = ZenithFeedback {
|
||||
|
||||
@@ -106,16 +106,16 @@ impl crate::walredo::WalRedoManager for DummyRedoManager {
|
||||
// each tenant.
|
||||
lazy_static! {
|
||||
static ref WAL_REDO_TIME: Histogram =
|
||||
register_histogram!("pageserver_wal_redo_time", "Time spent on WAL redo")
|
||||
register_histogram!("pageserver_wal_redo_seconds", "Time spent on WAL redo")
|
||||
.expect("failed to define a metric");
|
||||
static ref WAL_REDO_WAIT_TIME: Histogram = register_histogram!(
|
||||
"pageserver_wal_redo_wait_time",
|
||||
"pageserver_wal_redo_wait_seconds",
|
||||
"Time spent waiting for access to the WAL redo process"
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
static ref WAL_REDO_RECORD_COUNTER: IntCounter = register_int_counter!(
|
||||
"pageserver_wal_records_replayed",
|
||||
"Number of WAL records replayed"
|
||||
"pageserver_replayed_wal_records_total",
|
||||
"Number of WAL records replayed in WAL redo process"
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user