mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-13 07:30:38 +00:00
Compare commits
7 Commits
problame/p
...
problame/p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1863a04fb0 | ||
|
|
f83a71ca6a | ||
|
|
74a634c9fa | ||
|
|
fc3f8a65b3 | ||
|
|
9f03dd24c2 | ||
|
|
dc96a7604a | ||
|
|
d7c94e67ce |
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -1092,10 +1092,8 @@ jobs:
|
||||
run: |
|
||||
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
|
||||
|
||||
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
|
||||
else
|
||||
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
|
||||
|
||||
27
Cargo.lock
generated
27
Cargo.lock
generated
@@ -158,6 +158,17 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.0"
|
||||
@@ -1031,6 +1042,15 @@ dependencies = [
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "const_format"
|
||||
version = "0.2.30"
|
||||
@@ -1452,6 +1472,12 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "fail"
|
||||
version = "0.5.1"
|
||||
@@ -2674,6 +2700,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
|
||||
@@ -35,6 +35,7 @@ license = "Apache-2.0"
|
||||
## All dependency versions, used in the project
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
async-channel = "1.9.0"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
|
||||
flate2 = "1.0.26"
|
||||
async-stream = "0.3"
|
||||
|
||||
@@ -368,8 +368,8 @@ RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar
|
||||
FROM build-deps AS plpgsql-check-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
|
||||
echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.4.0.tar.gz -O plpgsql_check.tar.gz && \
|
||||
echo "9ba58387a279b35a3bfa39ee611e5684e6cddb2ba046ddb2c5190b3bd2ca254a plpgsql_check.tar.gz" | sha256sum --check && \
|
||||
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
|
||||
@@ -116,7 +116,6 @@ fn main() -> Result<()> {
|
||||
"attachment_service" => handle_attachment_service(sub_args, &env),
|
||||
"safekeeper" => handle_safekeeper(sub_args, &env),
|
||||
"endpoint" => handle_endpoint(sub_args, &env),
|
||||
"mappings" => handle_mappings(sub_args, &mut env),
|
||||
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
|
||||
_ => bail!("unexpected subcommand {sub_name}"),
|
||||
};
|
||||
@@ -817,38 +816,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
|
||||
let (sub_name, sub_args) = match sub_match.subcommand() {
|
||||
Some(ep_subcommand_data) => ep_subcommand_data,
|
||||
None => bail!("no mappings subcommand provided"),
|
||||
};
|
||||
|
||||
match sub_name {
|
||||
"map" => {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
.expect("branch-name argument missing");
|
||||
|
||||
let tenant_id = sub_args
|
||||
.get_one::<String>("tenant-id")
|
||||
.map(|x| TenantId::from_str(x))
|
||||
.expect("tenant-id argument missing")
|
||||
.expect("malformed tenant-id arg");
|
||||
|
||||
let timeline_id = sub_args
|
||||
.get_one::<String>("timeline-id")
|
||||
.map(|x| TimelineId::from_str(x))
|
||||
.expect("timeline-id argument missing")
|
||||
.expect("malformed timeline-id arg");
|
||||
|
||||
env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
other => unimplemented!("mappings subcommand {other}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
|
||||
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
|
||||
@@ -1117,7 +1084,6 @@ fn cli() -> Command {
|
||||
// --id, when using a pageserver command
|
||||
let pageserver_id_arg = Arg::new("pageserver-id")
|
||||
.long("id")
|
||||
.global(true)
|
||||
.help("pageserver id")
|
||||
.required(false);
|
||||
// --pageserver-id when using a non-pageserver command
|
||||
@@ -1288,20 +1254,17 @@ fn cli() -> Command {
|
||||
Command::new("pageserver")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage pageserver")
|
||||
.arg(pageserver_id_arg)
|
||||
.subcommand(Command::new("status"))
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.subcommand(Command::new("stop")
|
||||
.about("Stop local pageserver")
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
.subcommand(Command::new("restart")
|
||||
.about("Restart local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.subcommand(Command::new("start").about("Start local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("stop").about("Stop local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone()))
|
||||
.subcommand(Command::new("restart").about("Restart local pageserver")
|
||||
.arg(pageserver_id_arg.clone())
|
||||
.arg(pageserver_config_args.clone()))
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("attachment_service")
|
||||
@@ -1358,8 +1321,8 @@ fn cli() -> Command {
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
.arg(branch_name_arg)
|
||||
.arg(timeline_id_arg)
|
||||
.arg(lsn_arg)
|
||||
.arg(pg_port_arg)
|
||||
.arg(http_port_arg)
|
||||
@@ -1372,7 +1335,7 @@ fn cli() -> Command {
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
.arg(endpoint_id_arg)
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(tenant_id_arg)
|
||||
.arg(
|
||||
Arg::new("destroy")
|
||||
.help("Also delete data directory (now optional, should be default in future)")
|
||||
@@ -1383,18 +1346,6 @@ fn cli() -> Command {
|
||||
)
|
||||
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("mappings")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage neon_local branch name mappings")
|
||||
.subcommand(
|
||||
Command::new("map")
|
||||
.about("Create new mapping which cannot exist already")
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
)
|
||||
)
|
||||
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
|
||||
.subcommand(
|
||||
Command::new("pg")
|
||||
|
||||
@@ -442,20 +442,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
|
||||
trace!("got message {:?}", msg);
|
||||
|
||||
let result = self.process_message(handler, msg, &mut query_string).await;
|
||||
tokio::select!(
|
||||
biased;
|
||||
_ = shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
tracing::info!("shutdown request received during response flush");
|
||||
return Ok(())
|
||||
},
|
||||
flush_r = self.flush() => {
|
||||
flush_r?;
|
||||
}
|
||||
);
|
||||
|
||||
self.flush().await?;
|
||||
match result? {
|
||||
ProcessMsgResult::Continue => {
|
||||
self.flush().await?;
|
||||
continue;
|
||||
}
|
||||
ProcessMsgResult::Break => break,
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use hyper::{header, Body, Response, StatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::borrow::Cow;
|
||||
use std::error::Error as StdError;
|
||||
use thiserror::Error;
|
||||
use tracing::{error, info};
|
||||
use tracing::error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ApiError {
|
||||
@@ -26,7 +25,7 @@ pub enum ApiError {
|
||||
PreconditionFailed(Box<str>),
|
||||
|
||||
#[error("Resource temporarily unavailable: {0}")]
|
||||
ResourceUnavailable(Cow<'static, str>),
|
||||
ResourceUnavailable(String),
|
||||
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
@@ -116,12 +115,10 @@ pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
|
||||
|
||||
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
|
||||
// Print a stack trace for Internal Server errors
|
||||
|
||||
match api_error {
|
||||
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
|
||||
_ => error!("Error processing HTTP request: {api_error:#}"),
|
||||
if let ApiError::InternalServerError(_) = api_error {
|
||||
error!("Error processing HTTP request: {api_error:?}");
|
||||
} else {
|
||||
error!("Error processing HTTP request: {api_error:#}");
|
||||
}
|
||||
|
||||
api_error.into_response()
|
||||
|
||||
@@ -58,7 +58,7 @@ where
|
||||
// to get that.
|
||||
impl<T: Ord> PartialOrd for Waiter<T> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
other.wake_num.partial_cmp(&self.wake_num)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-channel.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -8,7 +8,6 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use serde::de::IntoDeserializer;
|
||||
use std::env;
|
||||
use std::ops::Deref;
|
||||
use storage_broker::Uri;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::ConnectionId;
|
||||
@@ -154,7 +153,7 @@ pub struct PageServerConf {
|
||||
// that during unit testing, because the current directory is global
|
||||
// to the process but different unit tests work on different
|
||||
// repositories.
|
||||
pub workdir: PageserverConfWorkdir,
|
||||
pub workdir: Utf8PathBuf,
|
||||
|
||||
pub pg_distrib_dir: Utf8PathBuf,
|
||||
|
||||
@@ -212,10 +211,6 @@ pub struct PageServerConf {
|
||||
|
||||
/// JWT token for use with the control plane API.
|
||||
pub control_plane_api_token: Option<SecretString>,
|
||||
|
||||
/// If true, pageserver will make best-effort to operate without a control plane: only
|
||||
/// for use in major incidents.
|
||||
pub control_plane_emergency_mode: bool,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -242,45 +237,6 @@ impl<T> BuilderValue<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
pub struct PageserverConfWorkdir {
|
||||
d: Utf8PathBuf,
|
||||
}
|
||||
|
||||
impl Deref for PageServerConf {
|
||||
type Target = PageserverConfWorkdir;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.workdir
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for PageserverConfWorkdir {
|
||||
type Target = Utf8Path;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.d
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<std::path::Path> for PageserverConfWorkdir {
|
||||
fn as_ref(&self) -> &std::path::Path {
|
||||
self.d.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageserverConfWorkdir {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.d.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PageserverConfWorkdir {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.d.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
// needed to simplify config construction
|
||||
struct PageServerConfigBuilder {
|
||||
listen_pg_addr: BuilderValue<String>,
|
||||
@@ -297,7 +253,7 @@ struct PageServerConfigBuilder {
|
||||
page_cache_size: BuilderValue<usize>,
|
||||
max_file_descriptors: BuilderValue<usize>,
|
||||
|
||||
workdir: BuilderValue<PageserverConfWorkdir>,
|
||||
workdir: BuilderValue<Utf8PathBuf>,
|
||||
|
||||
pg_distrib_dir: BuilderValue<Utf8PathBuf>,
|
||||
|
||||
@@ -332,7 +288,6 @@ struct PageServerConfigBuilder {
|
||||
|
||||
control_plane_api: BuilderValue<Option<Url>>,
|
||||
control_plane_api_token: BuilderValue<Option<SecretString>>,
|
||||
control_plane_emergency_mode: BuilderValue<bool>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -350,9 +305,7 @@ impl Default for PageServerConfigBuilder {
|
||||
superuser: Set(DEFAULT_SUPERUSER.to_string()),
|
||||
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
workdir: Set(PageserverConfWorkdir {
|
||||
d: Utf8PathBuf::new(),
|
||||
}),
|
||||
workdir: Set(Utf8PathBuf::new()),
|
||||
pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
|
||||
env::current_dir().expect("cannot access current directory"),
|
||||
)
|
||||
@@ -402,7 +355,6 @@ impl Default for PageServerConfigBuilder {
|
||||
|
||||
control_plane_api: Set(None),
|
||||
control_plane_api_token: Set(None),
|
||||
control_plane_emergency_mode: Set(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -441,7 +393,7 @@ impl PageServerConfigBuilder {
|
||||
}
|
||||
|
||||
pub fn workdir(&mut self, workdir: Utf8PathBuf) {
|
||||
self.workdir = BuilderValue::Set(PageserverConfWorkdir { d: workdir })
|
||||
self.workdir = BuilderValue::Set(workdir)
|
||||
}
|
||||
|
||||
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
|
||||
@@ -539,10 +491,6 @@ impl PageServerConfigBuilder {
|
||||
self.control_plane_api_token = BuilderValue::Set(token)
|
||||
}
|
||||
|
||||
pub fn control_plane_emergency_mode(&mut self, enabled: bool) {
|
||||
self.control_plane_emergency_mode = BuilderValue::Set(enabled)
|
||||
}
|
||||
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let concurrent_tenant_size_logical_size_queries = self
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
@@ -634,24 +582,21 @@ impl PageServerConfigBuilder {
|
||||
control_plane_api_token: self
|
||||
.control_plane_api_token
|
||||
.ok_or(anyhow!("missing control_plane_api_token"))?,
|
||||
control_plane_emergency_mode: self
|
||||
.control_plane_emergency_mode
|
||||
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl PageserverConfWorkdir {
|
||||
impl PageServerConf {
|
||||
//
|
||||
// Repository paths, relative to workdir.
|
||||
//
|
||||
|
||||
pub fn tenants_path(&self) -> Utf8PathBuf {
|
||||
self.d.join(TENANTS_SEGMENT_NAME)
|
||||
self.workdir.join(TENANTS_SEGMENT_NAME)
|
||||
}
|
||||
|
||||
pub fn deletion_prefix(&self) -> Utf8PathBuf {
|
||||
self.d.join("deletion")
|
||||
self.workdir.join("deletion")
|
||||
}
|
||||
|
||||
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
|
||||
@@ -734,7 +679,7 @@ impl PageserverConfWorkdir {
|
||||
}
|
||||
|
||||
pub fn traces_path(&self) -> Utf8PathBuf {
|
||||
self.d.join("traces")
|
||||
self.workdir.join("traces")
|
||||
}
|
||||
|
||||
pub fn trace_path(
|
||||
@@ -758,11 +703,9 @@ impl PageserverConfWorkdir {
|
||||
|
||||
/// Turns storage remote path of a file into its local path.
|
||||
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
|
||||
remote_path.with_base(&self.d)
|
||||
remote_path.with_base(&self.workdir)
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerConf {
|
||||
//
|
||||
// Postgres distribution paths
|
||||
//
|
||||
@@ -864,10 +807,6 @@ impl PageServerConf {
|
||||
builder.control_plane_api_token(Some(parsed.into()))
|
||||
}
|
||||
},
|
||||
"control_plane_emergency_mode" => {
|
||||
builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
|
||||
|
||||
},
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -1002,9 +941,6 @@ impl PageServerConf {
|
||||
}
|
||||
|
||||
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
|
||||
|
||||
let repo_dir = PageserverConfWorkdir { d: repo_dir };
|
||||
|
||||
let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
|
||||
|
||||
PageServerConf {
|
||||
@@ -1040,7 +976,6 @@ impl PageServerConf {
|
||||
background_task_maximum_delay: Duration::ZERO,
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1264,8 +1199,7 @@ background_task_maximum_delay = '334 s'
|
||||
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
|
||||
)?,
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false
|
||||
control_plane_api_token: None
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1321,8 +1255,7 @@ background_task_maximum_delay = '334 s'
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
background_task_maximum_delay: Duration::from_secs(334),
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false
|
||||
control_plane_api_token: None
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
@@ -1542,12 +1475,10 @@ threshold = "20m"
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(PageserverConfWorkdir, Utf8PathBuf)> {
|
||||
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
|
||||
let tempdir_path = tempdir.path();
|
||||
|
||||
let workdir = PageserverConfWorkdir {
|
||||
d: tempdir_path.join("workdir"),
|
||||
};
|
||||
let workdir = tempdir_path.join("workdir");
|
||||
fs::create_dir_all(&workdir)?;
|
||||
|
||||
let pg_distrib_dir = tempdir_path.join("pg_distrib");
|
||||
|
||||
@@ -86,15 +86,18 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub struct RequestContext {
|
||||
task_kind: TaskKind,
|
||||
download_behavior: DownloadBehavior,
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
page_cache_permit: Option<Arc<crate::page_cache::PinnedSlotsPermit>>,
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: DownloadBehavior::Download,
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
page_cache_permit: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
page_cache_permit: original.page_cache_permit.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -186,6 +191,11 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_permit(mut self, p: Arc<crate::page_cache::PinnedSlotsPermit>) -> Self {
|
||||
self.inner.page_cache_permit = Some(p);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
@@ -286,4 +296,8 @@ impl RequestContext {
|
||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||
self.page_content_kind
|
||||
}
|
||||
|
||||
pub(crate) fn permit(&self) -> Option<&crate::page_cache::PinnedSlotsPermit> {
|
||||
self.page_cache_permit.as_ref().map(|p| &**p)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,8 +133,6 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
node_id: self.node_id,
|
||||
};
|
||||
|
||||
fail::fail_point!("control-plane-client-re-attach");
|
||||
|
||||
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
||||
tracing::info!(
|
||||
"Received re-attach response with {} tenants",
|
||||
@@ -170,8 +168,6 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
.collect(),
|
||||
};
|
||||
|
||||
fail::fail_point!("control-plane-client-validate");
|
||||
|
||||
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
|
||||
|
||||
Ok(response
|
||||
|
||||
@@ -40,6 +40,7 @@ use validator::ValidatorQueueMessage;
|
||||
|
||||
use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
|
||||
|
||||
// TODO: adminstrative "panic button" config property to disable all deletions
|
||||
// TODO: configurable for how long to wait before executing deletions
|
||||
|
||||
/// We aggregate object deletions from many tenants in one place, for several reasons:
|
||||
@@ -185,7 +186,7 @@ where
|
||||
V: Serialize,
|
||||
I: AsRef<[u8]>,
|
||||
{
|
||||
let transformed = input.iter().map(|(k, v)| (hex::encode(k), v));
|
||||
let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
|
||||
|
||||
transformed
|
||||
.collect::<HashMap<String, &V>>()
|
||||
@@ -212,7 +213,7 @@ where
|
||||
|
||||
/// Files ending with this suffix will be ignored and erased
|
||||
/// during recovery as startup.
|
||||
const TEMP_SUFFIX: &str = "tmp";
|
||||
const TEMP_SUFFIX: &str = ".tmp";
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -324,7 +325,10 @@ impl DeletionList {
|
||||
return false;
|
||||
}
|
||||
|
||||
let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
|
||||
let timeline_entry = tenant_entry
|
||||
.timelines
|
||||
.entry(*timeline)
|
||||
.or_insert_with(Vec::new);
|
||||
|
||||
let timeline_remote_path = remote_timeline_path(tenant, timeline);
|
||||
|
||||
|
||||
@@ -230,7 +230,6 @@ impl ListWriter {
|
||||
let list_name_pattern =
|
||||
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
|
||||
|
||||
let temp_extension = format!(".{TEMP_SUFFIX}");
|
||||
let header_path = self.conf.deletion_header_path();
|
||||
let mut seqs: Vec<u64> = Vec::new();
|
||||
while let Some(dentry) = dir.next_entry().await? {
|
||||
@@ -242,7 +241,7 @@ impl ListWriter {
|
||||
continue;
|
||||
}
|
||||
|
||||
if dentry_str.ends_with(&temp_extension) {
|
||||
if dentry_str.ends_with(TEMP_SUFFIX) {
|
||||
info!("Cleaning up temporary file {dentry_str}");
|
||||
let absolute_path =
|
||||
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
|
||||
|
||||
@@ -220,8 +220,6 @@ where
|
||||
warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
|
||||
metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
|
||||
mutated = true;
|
||||
} else {
|
||||
metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
|
||||
}
|
||||
this_list_valid
|
||||
});
|
||||
|
||||
@@ -93,16 +93,9 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
delete:
|
||||
description: |
|
||||
Attempts to delete specified tenant. 500, 503 and 409 errors should be retried until 404 is retrieved.
|
||||
Attempts to delete specified tenant. 500 and 409 errors should be retried until 404 is retrieved.
|
||||
404 means that deletion successfully finished"
|
||||
responses:
|
||||
"400":
|
||||
@@ -141,13 +134,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline:
|
||||
parameters:
|
||||
@@ -192,13 +178,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}:
|
||||
parameters:
|
||||
@@ -247,13 +226,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
delete:
|
||||
description: "Attempts to delete specified timeline. 500 and 409 errors should be retried"
|
||||
responses:
|
||||
@@ -293,19 +265,13 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/PreconditionFailedError"
|
||||
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
|
||||
parameters:
|
||||
@@ -362,13 +328,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -416,13 +375,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/attach:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -513,13 +465,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/detach:
|
||||
parameters:
|
||||
@@ -573,13 +518,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/ignore:
|
||||
parameters:
|
||||
@@ -622,13 +560,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/load:
|
||||
parameters:
|
||||
@@ -673,13 +604,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/synthetic_size:
|
||||
parameters:
|
||||
@@ -717,12 +641,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/size:
|
||||
parameters:
|
||||
@@ -786,13 +704,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/:
|
||||
parameters:
|
||||
@@ -869,13 +780,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/:
|
||||
get:
|
||||
description: Get tenants list
|
||||
@@ -906,13 +810,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
post:
|
||||
description: |
|
||||
Create a tenant. Returns new tenant id on success.
|
||||
@@ -963,13 +860,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
|
||||
/v1/tenant/config:
|
||||
put:
|
||||
@@ -1015,13 +905,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/config/:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -1071,13 +954,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
JWT:
|
||||
@@ -1344,13 +1220,6 @@ components:
|
||||
properties:
|
||||
msg:
|
||||
type: string
|
||||
ServiceUnavailableError:
|
||||
type: object
|
||||
required:
|
||||
- msg
|
||||
properties:
|
||||
msg:
|
||||
type: string
|
||||
NotFoundError:
|
||||
type: object
|
||||
required:
|
||||
|
||||
@@ -6,7 +6,6 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use futures::TryFutureExt;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
@@ -134,7 +133,7 @@ impl From<PageReconstructError> for ApiError {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
|
||||
}
|
||||
PageReconstructError::AncestorStopping(_) => {
|
||||
ApiError::ResourceUnavailable(format!("{pre}").into())
|
||||
ApiError::ResourceUnavailable(format!("{pre}"))
|
||||
}
|
||||
PageReconstructError::WalRedo(pre) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(pre))
|
||||
@@ -147,7 +146,7 @@ impl From<TenantMapInsertError> for ApiError {
|
||||
fn from(tmie: TenantMapInsertError) -> ApiError {
|
||||
match tmie {
|
||||
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
|
||||
ApiError::ResourceUnavailable(format!("{tmie}").into())
|
||||
ApiError::ResourceUnavailable(format!("{tmie}"))
|
||||
}
|
||||
TenantMapInsertError::TenantAlreadyExists(id, state) => {
|
||||
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
|
||||
@@ -396,9 +395,6 @@ async fn timeline_create_handler(
|
||||
format!("{err:#}")
|
||||
))
|
||||
}
|
||||
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
|
||||
}
|
||||
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
|
||||
}
|
||||
}
|
||||
@@ -640,7 +636,7 @@ async fn tenant_list_handler(
|
||||
.instrument(info_span!("tenant_list"))
|
||||
.await
|
||||
.map_err(|_| {
|
||||
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
|
||||
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".to_string())
|
||||
})?
|
||||
.iter()
|
||||
.map(|(id, state)| TenantInfo {
|
||||
@@ -1240,136 +1236,6 @@ async fn deletion_queue_flush(
|
||||
}
|
||||
}
|
||||
|
||||
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
|
||||
async fn getpage_at_lsn_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
struct Key(crate::repository::Key);
|
||||
|
||||
impl std::str::FromStr for Key {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
crate::repository::Key::from_hex(s).map(Key)
|
||||
}
|
||||
}
|
||||
|
||||
let key: Key = parse_query_param(&request, "key")?
|
||||
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
|
||||
let lsn: Lsn = parse_query_param(&request, "lsn")?
|
||||
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
|
||||
let page = timeline.get(key.0, lsn, &ctx).await?;
|
||||
|
||||
Result::<_, ApiError>::Ok(
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, "application/octet-stream")
|
||||
.body(hyper::Body::from(page))
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
.instrument(info_span!("timeline_get", %tenant_id, %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn timeline_collect_keyspace(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
struct Partitioning {
|
||||
keys: crate::keyspace::KeySpace,
|
||||
|
||||
at_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl serde::Serialize for Partitioning {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut map = serializer.serialize_map(Some(2))?;
|
||||
map.serialize_key("keys")?;
|
||||
map.serialize_value(&KeySpace(&self.keys))?;
|
||||
map.serialize_key("at_lsn")?;
|
||||
map.serialize_value(&WithDisplay(&self.at_lsn))?;
|
||||
map.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct WithDisplay<'a, T>(&'a T);
|
||||
|
||||
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.collect_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
|
||||
|
||||
impl<'a> serde::Serialize for KeySpace<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeSeq;
|
||||
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
|
||||
for kr in &self.0.ranges {
|
||||
seq.serialize_element(&KeyRange(kr))?;
|
||||
}
|
||||
seq.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
|
||||
|
||||
impl<'a> serde::Serialize for KeyRange<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeTuple;
|
||||
let mut t = serializer.serialize_tuple(2)?;
|
||||
t.serialize_element(&WithDisplay(&self.0.start))?;
|
||||
t.serialize_element(&WithDisplay(&self.0.end))?;
|
||||
t.end()
|
||||
}
|
||||
}
|
||||
|
||||
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
let keys = timeline
|
||||
.collect_keyspace(at_lsn, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
|
||||
}
|
||||
.instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn active_timeline_of_active_tenant(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -1717,12 +1583,5 @@ pub fn make_router(
|
||||
.post("/v1/tracing/event", |r| {
|
||||
testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/getpage", |r| {
|
||||
testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler)
|
||||
})
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/keyspace",
|
||||
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
||||
@@ -691,9 +690,10 @@ impl StorageIoTime {
|
||||
.expect("failed to define a metric");
|
||||
let metrics = std::array::from_fn(|i| {
|
||||
let op = StorageIoOperation::from_repr(i).unwrap();
|
||||
storage_io_histogram_vec
|
||||
let metric = storage_io_histogram_vec
|
||||
.get_metric_with_label_values(&[op.as_str()])
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
metric
|
||||
});
|
||||
Self { metrics }
|
||||
}
|
||||
@@ -966,7 +966,6 @@ pub(crate) struct DeletionQueueMetrics {
|
||||
pub(crate) keys_submitted: IntCounter,
|
||||
pub(crate) keys_dropped: IntCounter,
|
||||
pub(crate) keys_executed: IntCounter,
|
||||
pub(crate) keys_validated: IntCounter,
|
||||
pub(crate) dropped_lsn_updates: IntCounter,
|
||||
pub(crate) unexpected_errors: IntCounter,
|
||||
pub(crate) remote_errors: IntCounterVec,
|
||||
@@ -988,13 +987,7 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
|
||||
|
||||
keys_executed: register_int_counter!(
|
||||
"pageserver_deletion_queue_executed_total",
|
||||
"Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed to completion"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
|
||||
keys_validated: register_int_counter!(
|
||||
"pageserver_deletion_queue_validated_total",
|
||||
"Number of keys validated for deletion. Sum with pageserver_deletion_queue_dropped_total for the total number of keys that have passed through the validation stage."
|
||||
"Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed."
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
|
||||
|
||||
@@ -78,6 +78,7 @@ use std::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -214,16 +215,21 @@ impl Slot {
|
||||
|
||||
impl SlotInner {
|
||||
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
|
||||
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
fn coalesce_readers_permit<'c>(&self, permit: PermitKind<'c>) -> PermitKindReadGuard<'c> {
|
||||
match permit {
|
||||
PermitKind::CtxProvided(permit) => PermitKindReadGuard::CtxProvided(permit),
|
||||
PermitKind::Acquired(permit) => {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -251,21 +257,36 @@ pub struct PageCache {
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
find_victim_sender:
|
||||
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
find_victim_waiters:
|
||||
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
pub(crate) struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
|
||||
enum PermitKind<'c> {
|
||||
CtxProvided(&'c PinnedSlotsPermit),
|
||||
Acquired(PinnedSlotsPermit),
|
||||
}
|
||||
|
||||
enum PermitKindReadGuard<'c> {
|
||||
CtxProvided(&'c PinnedSlotsPermit),
|
||||
Coalesced(Arc<PinnedSlotsPermit>),
|
||||
}
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i> {
|
||||
_permit: Arc<PinnedSlotsPermit>,
|
||||
pub struct PageReadGuard<'c, 'i> {
|
||||
_permit: PermitKindReadGuard<'c>,
|
||||
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageReadGuard<'_> {
|
||||
impl std::ops::Deref for PageReadGuard<'_, '_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -273,7 +294,7 @@ impl std::ops::Deref for PageReadGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_, '_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.slot_guard.buf
|
||||
}
|
||||
@@ -286,19 +307,19 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||
/// is expected to fill in the page contents and call mark_valid().
|
||||
pub struct PageWriteGuard<'i> {
|
||||
state: PageWriteGuardState<'i>,
|
||||
pub struct PageWriteGuard<'c, 'i> {
|
||||
state: PageWriteGuardState<'c, 'i>,
|
||||
}
|
||||
|
||||
enum PageWriteGuardState<'i> {
|
||||
enum PageWriteGuardState<'c, 'i> {
|
||||
Invalid {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
_permit: PinnedSlotsPermit,
|
||||
_permit: PermitKindReadGuard<'c>,
|
||||
},
|
||||
Downgraded,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_, '_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
@@ -307,7 +328,7 @@ impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
impl std::ops::Deref for PageWriteGuard<'_, '_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -318,25 +339,25 @@ impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_, '_> {
|
||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
PageWriteGuardState::Downgraded => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> PageWriteGuard<'a> {
|
||||
impl<'c, 'a> PageWriteGuard<'c, 'a> {
|
||||
/// Mark that the buffer contents are now valid.
|
||||
#[must_use]
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'c, 'a> {
|
||||
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
|
||||
match prev {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
assert!(inner.key.is_some());
|
||||
PageReadGuard {
|
||||
_permit: Arc::new(_permit),
|
||||
_permit,
|
||||
slot_guard: inner.downgrade(),
|
||||
}
|
||||
}
|
||||
@@ -345,7 +366,7 @@ impl<'a> PageWriteGuard<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageWriteGuard<'_> {
|
||||
impl Drop for PageWriteGuard<'_, '_> {
|
||||
///
|
||||
/// If the buffer was allocated for a page that was not already in the
|
||||
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
||||
@@ -365,9 +386,9 @@ impl Drop for PageWriteGuard<'_> {
|
||||
}
|
||||
|
||||
/// lock_for_read() return value
|
||||
pub enum ReadBufResult<'a> {
|
||||
Found(PageReadGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
pub enum ReadBufResult<'c, 'a> {
|
||||
Found(PageReadGuard<'c, 'a>),
|
||||
NotFound(PageWriteGuard<'c, 'a>),
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
@@ -389,10 +410,9 @@ impl PageCache {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit(ctx).await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_accesses_materialized_page
|
||||
@@ -440,12 +460,13 @@ impl PageCache {
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
&'static self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
@@ -456,7 +477,7 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
|
||||
@@ -531,12 +552,12 @@ impl PageCache {
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
pub async fn read_immutable_buf<'c>(
|
||||
&'static self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult<'c, 'static>> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key, ctx).await
|
||||
@@ -550,7 +571,22 @@ impl PageCache {
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
pub(crate) async fn get_permit(&self) -> Arc<PinnedSlotsPermit> {
|
||||
Arc::new(PinnedSlotsPermit(
|
||||
Arc::clone(&self.pinned_slots)
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("the semaphore is never closed"),
|
||||
))
|
||||
}
|
||||
|
||||
async fn try_get_pinned_slot_permit<'c>(
|
||||
&self,
|
||||
ctx: &'c RequestContext,
|
||||
) -> anyhow::Result<PermitKind<'c>> {
|
||||
if let Some(permit) = ctx.permit() {
|
||||
return Ok(PermitKind::CtxProvided(permit));
|
||||
};
|
||||
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
@@ -560,9 +596,9 @@ impl PageCache {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
Ok(res) => Ok(PermitKind::Acquired(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
))),
|
||||
Err(_timeout) => {
|
||||
timer.stop_and_discard();
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
@@ -582,10 +618,10 @@ impl PageCache {
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(
|
||||
async fn try_lock_for_read<'c>(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
permit: &mut Option<PermitKind<'c>>,
|
||||
) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
@@ -638,11 +674,11 @@ impl PageCache {
|
||||
/// ```
|
||||
///
|
||||
async fn lock_for_read(
|
||||
&self,
|
||||
&'static self,
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
@@ -860,10 +896,12 @@ impl PageCache {
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
async fn find_victim(
|
||||
&self,
|
||||
&'static self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
// Get in line.
|
||||
let receiver = self.find_victim_waiters.recv();
|
||||
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
iters += 1;
|
||||
@@ -875,41 +913,8 @@ impl PageCache {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(_err) => {
|
||||
if iters > iter_limit {
|
||||
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
|
||||
// any particular number of iterations: other threads might race ahead and acquire and
|
||||
// release pins just as we're scanning the array.
|
||||
//
|
||||
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
|
||||
// slots. There are two threads running concurrently, A and B. A has just
|
||||
// acquired the permit from the semaphore.
|
||||
//
|
||||
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2, decrement its usage_count to zero and continue the search
|
||||
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
//
|
||||
// Now we're back in the starting situation that both slots have
|
||||
// usage_count 1, but A has now been through one iteration of the
|
||||
// find_victim() loop. This can repeat indefinitely and on each
|
||||
// iteration, A's iteration count increases by one.
|
||||
//
|
||||
// So, even though the semaphore for the permits is fair, the victim search
|
||||
// itself happens in parallel and is not fair.
|
||||
// Hence even with a permit, a task can theoretically be starved.
|
||||
// To avoid this, we'd need tokio to give priority to tasks that are holding
|
||||
// permits for longer.
|
||||
// Note that just yielding to tokio during iteration without such
|
||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
||||
// for B to bump usage count, further starving A.
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::EvictIterLimit,
|
||||
);
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
|
||||
unreachable!("find_victim_waiters prevents starvation");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -920,7 +925,16 @@ impl PageCache {
|
||||
inner.key = None;
|
||||
}
|
||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||
return Ok((slot_idx, inner));
|
||||
self.find_victim_sender
|
||||
.try_send((slot_idx, inner))
|
||||
.expect("we always get in line first");
|
||||
match futures::poll!(receiver) {
|
||||
Poll::Ready(Ok(res)) => return Ok(res),
|
||||
Poll::Ready(Err(_closed)) => unreachable!("we never close"),
|
||||
Poll::Pending => {
|
||||
unreachable!("we just sent to the channel and got in line earlier")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -957,6 +971,7 @@ impl PageCache {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
@@ -964,6 +979,8 @@ impl PageCache {
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
find_victim_sender,
|
||||
find_victim_waiters,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,6 @@ use std::time::Duration;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::field;
|
||||
use tracing::*;
|
||||
use utils::id::ConnectionId;
|
||||
@@ -65,6 +64,69 @@ use crate::trace::Tracer;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
async_stream::try_stream! {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = "pageserver is shutting down";
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
|
||||
};
|
||||
|
||||
match msg {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => { break },
|
||||
FeMessage::Sync => continue,
|
||||
FeMessage::Terminate => {
|
||||
let msg = "client terminated connection with Terminate message during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should always be ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
break;
|
||||
}
|
||||
m => {
|
||||
let msg = format!("unexpected message {m:?}");
|
||||
// error can't happen here, ErrorResponse serialization should always be ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))?;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
yield copy_data_bytes;
|
||||
}
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should always be ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
pgb.flush().await?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
|
||||
Err(io_error)?;
|
||||
}
|
||||
Err(other) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
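For reference, a small sketch of the pattern copyin_stream relies on: an async_stream::try_stream! of Bytes wrapped in tokio_util::io::StreamReader to obtain an AsyncRead. The data source here is a plain vector rather than COPY protocol messages, and the crate set (tokio, tokio-util, async-stream, bytes, futures) is assumed:

use std::io;
use std::pin::pin;

use bytes::Bytes;
use futures::Stream;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

fn chunk_stream(chunks: Vec<Vec<u8>>) -> impl Stream<Item = io::Result<Bytes>> {
    async_stream::try_stream! {
        for c in chunks {
            // In page_service this is where CopyData payloads are yielded.
            yield Bytes::from(c);
        }
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader = pin!(StreamReader::new(chunk_stream(vec![
        b"hello ".to_vec(),
        b"world".to_vec(),
    ])));
    let mut buf = String::new();
    reader.read_to_string(&mut buf).await?;
    assert_eq!(buf, "hello world");
    Ok(())
}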
/// Read the end of a tar archive.
|
||||
///
|
||||
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
|
||||
@@ -222,13 +284,7 @@ async fn page_service_conn_main(
|
||||
// and create a child per-query context when it invokes process_query.
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(
|
||||
conf,
|
||||
broker_client,
|
||||
auth,
|
||||
connection_ctx,
|
||||
task_mgr::shutdown_token(),
|
||||
);
|
||||
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
|
||||
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
|
||||
|
||||
match pgbackend
|
||||
@@ -262,10 +318,6 @@ struct PageServerHandler {
|
||||
/// For each query received over the connection,
|
||||
/// `process_query` creates a child context from this one.
|
||||
connection_ctx: RequestContext,
|
||||
|
||||
/// A token that should fire when the tenant transitions from
|
||||
/// attached state, or when the pageserver is shutting down.
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl PageServerHandler {
|
||||
@@ -274,7 +326,6 @@ impl PageServerHandler {
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
_conf: conf,
|
||||
@@ -282,91 +333,6 @@ impl PageServerHandler {
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
|
||||
/// this rather than naked flush() in order to shut down promptly. Without this, we would
|
||||
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
|
||||
/// in the flush.
|
||||
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
tokio::select!(
|
||||
flush_r = pgb.flush() => {
|
||||
Ok(flush_r?)
|
||||
},
|
||||
_ = self.cancel.cancelled() => {
|
||||
Err(QueryError::Other(anyhow::anyhow!("Shutting down")))
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
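A rough sketch of the cancellable-flush idea described above: race the slow write against a CancellationToken with tokio::select! so shutdown never waits on a stalled client. slow_flush is a hypothetical stand-in for PostgresBackend::flush, and the anyhow, tokio and tokio-util crates are assumed:

use std::time::Duration;

use tokio_util::sync::CancellationToken;

// Stand-in for PostgresBackend::flush() against a client that never reads.
async fn slow_flush() -> std::io::Result<()> {
    tokio::time::sleep(Duration::from_secs(60)).await;
    Ok(())
}

async fn flush_cancellable(cancel: &CancellationToken) -> anyhow::Result<()> {
    tokio::select! {
        res = slow_flush() => Ok(res?),
        _ = cancel.cancelled() => Err(anyhow::anyhow!("shutting down")),
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let child = cancel.child_token();
    // Simulate a shutdown request arriving shortly after the flush starts.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        cancel.cancel();
    });
    let res = flush_cancellable(&child).await;
    assert!(res.is_err());
    println!("flush aborted by shutdown: {res:?}");
}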
fn copyin_stream<'a, IO>(
|
||||
&'a self,
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
) -> impl Stream<Item = io::Result<Bytes>> + 'a
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
async_stream::try_stream! {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = "pageserver is shutting down";
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
|
||||
};
|
||||
|
||||
match msg {
|
||||
Ok(Some(message)) => {
|
||||
let copy_data_bytes = match message {
|
||||
FeMessage::CopyData(bytes) => bytes,
|
||||
FeMessage::CopyDone => { break },
|
||||
FeMessage::Sync => continue,
|
||||
FeMessage::Terminate => {
|
||||
let msg = "client terminated connection with Terminate message during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should always be ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
break;
|
||||
}
|
||||
m => {
|
||||
let msg = format!("unexpected message {m:?}");
|
||||
// error can't happen here, ErrorResponse serialization should always be ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
|
||||
Err(io::Error::new(io::ErrorKind::Other, msg))?;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
yield copy_data_bytes;
|
||||
}
|
||||
Ok(None) => {
|
||||
let msg = "client closed connection during COPY";
|
||||
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
|
||||
// error can't happen here, ErrorResponse serialization should always be ok
|
||||
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
|
||||
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
|
||||
}
|
||||
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
|
||||
Err(io_error)?;
|
||||
}
|
||||
Err(other) => {
|
||||
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,7 +372,7 @@ impl PageServerHandler {
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
|
||||
|
||||
@@ -499,7 +465,7 @@ impl PageServerHandler {
|
||||
});
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -542,9 +508,9 @@ impl PageServerHandler {
|
||||
// Import basebackup provided via CopyData
|
||||
info!("importing basebackup");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
timeline
|
||||
.import_basebackup_from_tar(
|
||||
&mut copyin_reader,
|
||||
@@ -597,8 +563,8 @@ impl PageServerHandler {
|
||||
// Import wal provided via CopyData
|
||||
info!("importing wal");
|
||||
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
|
||||
pgb.flush().await?;
|
||||
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
|
||||
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
|
||||
info!("wal import complete");
|
||||
|
||||
@@ -806,7 +772,7 @@ impl PageServerHandler {
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
// Send a tarball of the latest layer on the timeline. Compress if not
|
||||
// fullbackup. TODO Compress in that case too (tests need to be updated)
|
||||
@@ -858,7 +824,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::CopyDone)?;
|
||||
self.flush_cancellable(pgb).await?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let basebackup_after = started
|
||||
.elapsed()
|
||||
|
||||
@@ -406,8 +406,6 @@ pub enum CreateTimelineError {
|
||||
AlreadyExists,
|
||||
#[error(transparent)]
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
AncestorNotActive,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
@@ -1589,12 +1587,6 @@ impl Tenant {
|
||||
.get_timeline(ancestor_timeline_id, false)
|
||||
.context("Cannot branch off the timeline that's not present in pageserver")?;
|
||||
|
||||
// instead of waiting around, just deny the request because ancestor is not yet
|
||||
// ready for other purposes either.
|
||||
if !ancestor_timeline.is_active() {
|
||||
return Err(CreateTimelineError::AncestorNotActive);
|
||||
}
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
*lsn = lsn.align();
|
||||
|
||||
@@ -1627,6 +1619,8 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
loaded_timeline.activate(broker_client, None, ctx);
|
||||
|
||||
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
|
||||
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
|
||||
// Ok, the timeline is durable in remote storage.
|
||||
@@ -1638,8 +1632,6 @@ impl Tenant {
|
||||
})?;
|
||||
}
|
||||
|
||||
loaded_timeline.activate(broker_client, None, ctx);
|
||||
|
||||
Ok(loaded_timeline)
|
||||
}
|
||||
|
||||
|
||||
@@ -20,10 +20,10 @@ use std::io::{Error, ErrorKind};
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(
|
||||
pub async fn read_blob<'c>(
|
||||
&self,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
@@ -31,11 +31,11 @@ impl<'a> BlockCursor<'a> {
|
||||
}
|
||||
/// Read blob into the given buffer. Any previous contents in the buffer
|
||||
/// are overwritten.
|
||||
pub async fn read_blob_into_buf(
|
||||
pub async fn read_blob_into_buf<'c>(
|
||||
&self,
|
||||
offset: u64,
|
||||
dstbuf: &mut Vec<u8>,
|
||||
ctx: &RequestContext,
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<(), std::io::Error> {
|
||||
let mut blknum = (offset / PAGE_SZ as u64) as u32;
|
||||
let mut off = (offset % PAGE_SZ as u64) as usize;
|
||||
|
||||
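For clarity, the arithmetic above simply splits a byte offset into a block number and an offset within that block. A tiny worked example, assuming the usual 8 KiB page size:

const PAGE_SZ: usize = 8192;

fn split_offset(offset: u64) -> (u32, usize) {
    // Block index first, then the remainder inside the block.
    let blknum = (offset / PAGE_SZ as u64) as u32;
    let off = (offset % PAGE_SZ as u64) as usize;
    (blknum, off)
}

fn main() {
    assert_eq!(split_offset(0), (0, 0));
    assert_eq!(split_offset(8192), (1, 0));
    assert_eq!(split_offset(20_000), (2, 3_616));
}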
@@ -34,27 +34,27 @@ where
|
||||
}
|
||||
|
||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||
pub enum BlockLease<'a> {
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
pub enum BlockLease<'c, 'a> {
|
||||
PageReadGuard(PageReadGuard<'c, 'static>),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
|
||||
impl<'c, 'a> From<PageReadGuard<'c, 'a>> for BlockLease<'c, 'a> {
|
||||
fn from(value: PageReadGuard<'c, 'a>) -> BlockLease<'c, 'a> {
|
||||
BlockLease::PageReadGuard(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
|
||||
impl<'c, 'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'c, 'a> {
|
||||
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
|
||||
BlockLease::Arc(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Deref for BlockLease<'a> {
|
||||
impl<'c, 'a> Deref for BlockLease<'c, 'a> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -83,11 +83,11 @@ pub(crate) enum BlockReaderRef<'a> {
|
||||
|
||||
impl<'a> BlockReaderRef<'a> {
|
||||
#[inline(always)]
|
||||
async fn read_blk(
|
||||
async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||
use BlockReaderRef::*;
|
||||
match self {
|
||||
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
|
||||
@@ -141,11 +141,11 @@ impl<'a> BlockCursor<'a> {
|
||||
/// access the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
#[inline(always)]
|
||||
pub async fn read_blk(
|
||||
pub async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||
self.reader.read_blk(blknum, ctx).await
|
||||
}
|
||||
}
|
||||
@@ -180,11 +180,11 @@ impl FileBlockReader {
|
||||
/// Returns a "lease" object that can be used to
|
||||
/// access the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
pub async fn read_blk(
|
||||
pub async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, 'static>, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
|
||||
@@ -64,11 +64,11 @@ impl EphemeralFile {
|
||||
self.len
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
pub(crate) async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, io::Error> {
|
||||
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
|
||||
if flushed_blknums.contains(&(blknum as u64)) {
|
||||
let cache = page_cache::get();
|
||||
|
||||
@@ -151,148 +151,6 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
|
||||
|
||||
static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));
|
||||
|
||||
fn emergency_generations(
|
||||
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
|
||||
) -> HashMap<TenantId, Generation> {
|
||||
tenant_confs
|
||||
.iter()
|
||||
.filter_map(|(tid, lc)| {
|
||||
let lc = match lc {
|
||||
Ok(lc) => lc,
|
||||
Err(_) => return None,
|
||||
};
|
||||
let gen = match &lc.mode {
|
||||
LocationMode::Attached(alc) => Some(alc.generation),
|
||||
LocationMode::Secondary(_) => None,
|
||||
};
|
||||
|
||||
gen.map(|g| (*tid, g))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn init_load_generations(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
|
||||
resources: &TenantSharedResources,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<Option<HashMap<TenantId, Generation>>> {
|
||||
let generations = if conf.control_plane_emergency_mode {
|
||||
error!(
|
||||
"Emergency mode! Tenants will be attached unsafely using their last known generation"
|
||||
);
|
||||
emergency_generations(tenant_confs)
|
||||
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
|
||||
info!("Calling control plane API to re-attach tenants");
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
|
||||
match client.re_attach().await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(RetryForeverError::ShuttingDown) => {
|
||||
anyhow::bail!("Shut down while waiting for control plane re-attach response")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("Control plane API not configured, tenant generations are disabled");
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
|
||||
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
|
||||
// the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
|
||||
// are processed, even though we don't block on recovery completing here.
|
||||
//
|
||||
// Must only do this if remote storage is enabled, otherwise deletion queue
|
||||
// is not running and channel push will fail.
|
||||
if resources.remote_storage.is_some() {
|
||||
resources
|
||||
.deletion_queue_client
|
||||
.recover(generations.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Some(generations))
|
||||
}
|
||||
|
||||
/// Initial stage of load: walk the local tenants directory, clean up any temp files,
|
||||
/// and load configurations for the tenants we found.
|
||||
async fn init_load_tenant_configs(
|
||||
conf: &'static PageServerConf,
|
||||
) -> anyhow::Result<HashMap<TenantId, anyhow::Result<LocationConf>>> {
|
||||
let tenants_dir = conf.tenants_path();
|
||||
|
||||
let mut dir_entries = tenants_dir
|
||||
.read_dir_utf8()
|
||||
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
|
||||
|
||||
let mut configs = HashMap::new();
|
||||
|
||||
loop {
|
||||
match dir_entries.next() {
|
||||
None => break,
|
||||
Some(Ok(dentry)) => {
|
||||
let tenant_dir_path = dentry.path().to_path_buf();
|
||||
if crate::is_temporary(&tenant_dir_path) {
|
||||
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
|
||||
// No need to use safe_remove_tenant_dir_all because this is already
|
||||
// a temporary path
|
||||
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
|
||||
error!(
|
||||
"Failed to remove temporary directory '{}': {:?}",
|
||||
tenant_dir_path, e
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// This case happens if we:
|
||||
// * crash during attach before creating the attach marker file
|
||||
// * crash during tenant delete before removing tenant directory
|
||||
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
|
||||
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
|
||||
})?;
|
||||
if is_empty {
|
||||
info!("removing empty tenant directory {tenant_dir_path:?}");
|
||||
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
|
||||
error!(
|
||||
"Failed to remove empty tenant directory '{}': {e:#}",
|
||||
tenant_dir_path
|
||||
)
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
|
||||
if tenant_ignore_mark_file.exists() {
|
||||
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
|
||||
continue;
|
||||
}
|
||||
|
||||
let tenant_id = match tenant_dir_path
|
||||
.file_name()
|
||||
.unwrap_or_default()
|
||||
.parse::<TenantId>()
|
||||
{
|
||||
Ok(id) => id,
|
||||
Err(_) => {
|
||||
warn!(
|
||||
"Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
configs.insert(tenant_id, Tenant::load_tenant_config(conf, &tenant_id));
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
// An error listing the top level directory indicates serious problem
|
||||
// with local filesystem: we will fail to load, and fail to start.
|
||||
anyhow::bail!(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(configs)
|
||||
}
|
||||
|
||||
/// Initialize repositories with locally available timelines.
|
||||
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
|
||||
/// are scheduled for download and added to the tenant once download is completed.
|
||||
@@ -303,96 +161,196 @@ pub async fn init_tenant_mgr(
|
||||
init_order: InitializationOrder,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
// Scan local filesystem for attached tenants
|
||||
let tenants_dir = conf.tenants_path();
|
||||
|
||||
let mut tenants = HashMap::new();
|
||||
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
|
||||
let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) {
|
||||
let result = match client.re_attach().await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(RetryForeverError::ShuttingDown) => {
|
||||
anyhow::bail!("Shut down while waiting for control plane re-attach response")
|
||||
}
|
||||
};
|
||||
|
||||
// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
|
||||
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
|
||||
// the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
|
||||
// are processed, even though we don't block on recovery completing here.
|
||||
//
|
||||
// Must only do this if remote storage is enabled, otherwise deletion queue
|
||||
// is not running and channel push will fail.
|
||||
if resources.remote_storage.is_some() {
|
||||
resources
|
||||
.deletion_queue_client
|
||||
.recover(result.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Some(result)
|
||||
} else {
|
||||
info!("Control plane API not configured, tenant generations are disabled");
|
||||
None
|
||||
};
|
||||
|
||||
let mut dir_entries = tenants_dir
|
||||
.read_dir_utf8()
|
||||
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
|
||||
|
||||
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
||||
|
||||
// Scan local filesystem for attached tenants
|
||||
let tenant_configs = init_load_tenant_configs(conf).await?;
|
||||
loop {
|
||||
match dir_entries.next() {
|
||||
None => break,
|
||||
Some(Ok(dir_entry)) => {
|
||||
let tenant_dir_path = dir_entry.path().to_path_buf();
|
||||
if crate::is_temporary(&tenant_dir_path) {
|
||||
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
|
||||
// No need to use safe_remove_tenant_dir_all because this is already
|
||||
// a temporary path
|
||||
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
|
||||
error!(
|
||||
"Failed to remove temporary directory '{}': {:?}",
|
||||
tenant_dir_path, e
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// This case happens if we:
|
||||
// * crash during attach before creating the attach marker file
|
||||
// * crash during tenant delete before removing tenant directory
|
||||
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
|
||||
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
|
||||
})?;
|
||||
if is_empty {
|
||||
info!("removing empty tenant directory {tenant_dir_path:?}");
|
||||
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
|
||||
error!(
|
||||
"Failed to remove empty tenant directory '{}': {e:#}",
|
||||
tenant_dir_path
|
||||
)
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Determine which tenants are to be attached
|
||||
let tenant_generations =
|
||||
init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
|
||||
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
|
||||
if tenant_ignore_mark_file.exists() {
|
||||
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Construct `Tenant` objects and start them running
|
||||
for (tenant_id, location_conf) in tenant_configs {
|
||||
let tenant_dir_path = conf.tenant_path(&tenant_id);
|
||||
let tenant_id = match tenant_dir_path
|
||||
.file_name()
|
||||
.unwrap_or_default()
|
||||
.parse::<TenantId>()
|
||||
{
|
||||
Ok(id) => id,
|
||||
Err(_) => {
|
||||
warn!(
|
||||
"Invalid tenant path (garbage in our repo directory?): {}",
|
||||
tenant_dir_path
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut location_conf = match location_conf {
|
||||
Ok(l) => l,
|
||||
Err(e) => {
|
||||
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
|
||||
// Try loading the location configuration
|
||||
let mut location_conf = match Tenant::load_tenant_config(conf, &tenant_id)
|
||||
.context("load tenant config")
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
warn!("Marking tenant broken, failed to {e:#}");
|
||||
|
||||
tenants.insert(
|
||||
tenant_id,
|
||||
TenantSlot::Attached(Tenant::create_broken_tenant(
|
||||
tenants.insert(
|
||||
tenant_id,
|
||||
TenantSlot::Attached(Tenant::create_broken_tenant(
|
||||
conf,
|
||||
tenant_id,
|
||||
"error loading tenant location configuration".to_string(),
|
||||
)),
|
||||
);
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let generation = if let Some(generations) = &tenant_generations {
|
||||
// We have a generation map: treat it as the authority for whether
|
||||
// this tenant is really attached.
|
||||
if let Some(gen) = generations.get(&tenant_id) {
|
||||
*gen
|
||||
} else {
|
||||
match &location_conf.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
// We do not require the control plane's permission for secondary mode
|
||||
// tenants, because they do no remote writes and hence require no
|
||||
// generation number
|
||||
info!("Loaded tenant {tenant_id} in secondary mode");
|
||||
tenants.insert(tenant_id, TenantSlot::Secondary);
|
||||
}
|
||||
LocationMode::Attached(_) => {
|
||||
// TODO: augment re-attach API to enable the control plane to
|
||||
// instruct us about secondary attachments. That way, instead of throwing
|
||||
// away local state, we can gracefully fall back to secondary here, if the control
|
||||
// plane tells us so.
|
||||
// (https://github.com/neondatabase/neon/issues/5377)
|
||||
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
|
||||
if let Err(e) =
|
||||
safe_remove_tenant_dir_all(&tenant_dir_path).await
|
||||
{
|
||||
error!(
|
||||
"Failed to remove detached tenant directory '{}': {:?}",
|
||||
tenant_dir_path, e
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// Legacy mode: no generation information, any tenant present
|
||||
// on local disk may activate
|
||||
info!(
|
||||
"Starting tenant {} in legacy mode, no generation",
|
||||
tenant_dir_path
|
||||
);
|
||||
Generation::none()
|
||||
};
|
||||
|
||||
// Presence of a generation number implies attachment: attach the tenant
|
||||
// if it wasn't already, and apply the generation number.
|
||||
location_conf.attach_in_generation(generation);
|
||||
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
|
||||
|
||||
match schedule_local_tenant_processing(
|
||||
conf,
|
||||
tenant_id,
|
||||
format!("{}", e),
|
||||
)),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let generation = if let Some(generations) = &tenant_generations {
|
||||
// We have a generation map: treat it as the authority for whether
|
||||
// this tenant is really attached.
|
||||
if let Some(gen) = generations.get(&tenant_id) {
|
||||
*gen
|
||||
} else {
|
||||
match &location_conf.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
// We do not require the control plane's permission for secondary mode
|
||||
// tenants, because they do no remote writes and hence require no
|
||||
// generation number
|
||||
info!(%tenant_id, "Loaded tenant in secondary mode");
|
||||
tenants.insert(tenant_id, TenantSlot::Secondary);
|
||||
}
|
||||
LocationMode::Attached(_) => {
|
||||
// TODO: augment re-attach API to enable the control plane to
|
||||
// instruct us about secondary attachments. That way, instead of throwing
|
||||
// away local state, we can gracefully fall back to secondary here, if the control
|
||||
// plane tells us so.
|
||||
// (https://github.com/neondatabase/neon/issues/5377)
|
||||
info!(%tenant_id, "Detaching tenant, control plane omitted it in re-attach response");
|
||||
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
|
||||
error!(%tenant_id,
|
||||
"Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
|
||||
);
|
||||
&tenant_dir_path,
|
||||
AttachedTenantConf::try_from(location_conf)?,
|
||||
resources.clone(),
|
||||
Some(init_order.clone()),
|
||||
&TENANTS,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Legacy mode: no generation information, any tenant present
|
||||
// on local disk may activate
|
||||
info!(%tenant_id, "Starting tenant in legacy mode, no generation",);
|
||||
Generation::none()
|
||||
};
|
||||
|
||||
// Presence of a generation number implies attachment: attach the tenant
|
||||
// if it wasn't already, and apply the generation number.
|
||||
location_conf.attach_in_generation(generation);
|
||||
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
|
||||
|
||||
match schedule_local_tenant_processing(
|
||||
conf,
|
||||
tenant_id,
|
||||
&tenant_dir_path,
|
||||
AttachedTenantConf::try_from(location_conf)?,
|
||||
resources.clone(),
|
||||
Some(init_order.clone()),
|
||||
&TENANTS,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
|
||||
}
|
||||
Err(e) => {
|
||||
error!(%tenant_id, "Failed to start tenant: {e:#}");
|
||||
Some(Err(e)) => {
|
||||
// On error, print it, but continue with the other tenants. If we error out
|
||||
// here, the pageserver startup fails altogether, causing outage for *all*
|
||||
// tenants. That seems worse.
|
||||
error!(
|
||||
"Failed to list tenants dir entry in directory {tenants_dir:?}, reason: {e:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -901,27 +901,9 @@ impl RemoteTimelineClient {
|
||||
.await
|
||||
.context("list prefixes")?;
|
||||
|
||||
// We will delete the current index_part object last, since it acts as a deletion
|
||||
// marker via its deleted_at attribute
|
||||
let latest_index = remaining
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
p.object_name()
|
||||
.map(|n| n.starts_with(IndexPart::FILE_NAME))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
|
||||
.max_by_key(|i| i.1)
|
||||
.map(|i| i.0.clone())
|
||||
.unwrap_or(
|
||||
// No generation-suffixed indices, assume we are dealing with
|
||||
// a legacy index.
|
||||
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
|
||||
);
|
||||
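The removed block above picks the newest generation-suffixed index_part object and falls back to a legacy, unsuffixed name when none exists. A simplified sketch of that selection using plain strings; parse_generation and the suffix format are illustrative stand-ins, not the real RemotePath parsing:

fn parse_generation(name: &str) -> Option<(&str, u32)> {
    // e.g. "index_part.json-00000003" -> generation 3 (format simplified here)
    let suffix = name.strip_prefix("index_part.json-")?;
    suffix.parse().ok().map(|gen| (name, gen))
}

fn pick_latest_index<'a>(objects: &[&'a str]) -> &'a str {
    objects
        .iter()
        .copied()
        .filter(|n| n.starts_with("index_part.json"))
        .filter_map(parse_generation)
        .max_by_key(|&(_, gen)| gen)
        .map(|(n, _)| n)
        // No generation-suffixed indices: assume a legacy, unsuffixed index.
        .unwrap_or("index_part.json")
}

fn main() {
    let objects = ["layer_a", "index_part.json-00000001", "index_part.json-00000002"];
    assert_eq!(pick_latest_index(&objects), "index_part.json-00000002");
    let legacy = ["layer_a", "index_part.json"];
    assert_eq!(pick_latest_index(&legacy), "index_part.json");
}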
|
||||
let remaining_layers: Vec<RemotePath> = remaining
|
||||
let remaining: Vec<RemotePath> = remaining
|
||||
.into_iter()
|
||||
.filter(|p| p != &latest_index)
|
||||
.filter(|p| p.object_name() != Some(IndexPart::FILE_NAME))
|
||||
.inspect(|path| {
|
||||
if let Some(name) = path.object_name() {
|
||||
info!(%name, "deleting a file not referenced from index_part.json");
|
||||
@@ -931,11 +913,9 @@ impl RemoteTimelineClient {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let not_referenced_count = remaining_layers.len();
|
||||
if !remaining_layers.is_empty() {
|
||||
self.deletion_queue_client
|
||||
.push_immediate(remaining_layers)
|
||||
.await?;
|
||||
let not_referenced_count = remaining.len();
|
||||
if !remaining.is_empty() {
|
||||
self.deletion_queue_client.push_immediate(remaining).await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-delete", |_| {
|
||||
@@ -944,9 +924,11 @@ impl RemoteTimelineClient {
|
||||
))?
|
||||
});
|
||||
|
||||
let index_file_path = timeline_storage_path.join(Utf8Path::new(IndexPart::FILE_NAME));
|
||||
|
||||
debug!("enqueuing index part deletion");
|
||||
self.deletion_queue_client
|
||||
.push_immediate([latest_index].to_vec())
|
||||
.push_immediate([index_file_path].to_vec())
|
||||
.await?;
|
||||
|
||||
// Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
|
||||
|
||||
@@ -31,7 +31,6 @@ pub(super) async fn upload_index_part<'a>(
|
||||
fail_point!("before-upload-index", |_| {
|
||||
bail!("failpoint before-upload-index")
|
||||
});
|
||||
pausable_failpoint!("before-upload-index-pausable");
|
||||
|
||||
let index_part_bytes =
|
||||
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
|
||||
|
||||
@@ -511,7 +511,8 @@ impl DeltaLayer {
|
||||
///
|
||||
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
|
||||
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
|
||||
let mut summary_buf = vec![0; PAGE_SZ];
|
||||
let mut summary_buf = Vec::new();
|
||||
summary_buf.resize(PAGE_SZ, 0);
|
||||
file.read_exact_at(&mut summary_buf, 0)?;
|
||||
let summary = Summary::des_prefix(&summary_buf)?;
|
||||
|
||||
@@ -548,7 +549,7 @@ impl DeltaLayer {
|
||||
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
|
||||
///
|
||||
/// The value can be obtained via the [`ValueRef::load`] function.
|
||||
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
|
||||
pub(crate) async fn load_keys<'c>(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
|
||||
let inner = self
|
||||
.load(LayerAccessKind::KeyIter, ctx)
|
||||
.await
|
||||
@@ -1037,9 +1038,9 @@ pub struct ValueRef<'a> {
|
||||
reader: BlockCursor<'a>,
|
||||
}
|
||||
|
||||
impl<'a> ValueRef<'a> {
|
||||
impl<'c, 'a> ValueRef<'a> {
|
||||
/// Loads the value from disk
|
||||
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
|
||||
pub async fn load(&self, ctx: &'c RequestContext) -> Result<Value> {
|
||||
// theoretically we *could* record an access time for each, but it does not really matter
|
||||
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
|
||||
let val = Value::des(&buf)?;
|
||||
@@ -1050,11 +1051,11 @@ impl<'a> ValueRef<'a> {
|
||||
pub(crate) struct Adapter<T>(T);
|
||||
|
||||
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
|
||||
pub(crate) async fn read_blk(
|
||||
pub(crate) async fn read_blk<'c>(
|
||||
&self,
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
ctx: &'c RequestContext,
|
||||
) -> Result<BlockLease<'c, '_>, std::io::Error> {
|
||||
self.0.as_ref().file.read_blk(blknum, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,7 +400,8 @@ impl ImageLayer {
|
||||
///
|
||||
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
|
||||
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
|
||||
let mut summary_buf = vec![0; PAGE_SZ];
|
||||
let mut summary_buf = Vec::new();
|
||||
summary_buf.resize(PAGE_SZ, 0);
|
||||
file.read_exact_at(&mut summary_buf, 0)?;
|
||||
let summary = Summary::des_prefix(&summary_buf)?;
|
||||
let metadata = file
|
||||
|
||||
@@ -505,7 +505,7 @@ impl Timeline {
|
||||
timer.stop_and_record();
|
||||
|
||||
let start = Instant::now();
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state, ctx).await;
|
||||
let elapsed = start.elapsed();
|
||||
crate::metrics::RECONSTRUCT_TIME
|
||||
.for_result(&res)
|
||||
@@ -2363,7 +2363,7 @@ impl Timeline {
|
||||
// during branch creation.
|
||||
match ancestor.wait_to_become_active(ctx).await {
|
||||
Ok(()) => {}
|
||||
Err(TimelineState::Stopping) => {
|
||||
Err(state) if state == TimelineState::Stopping => {
|
||||
return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
|
||||
}
|
||||
Err(state) => {
|
||||
@@ -4279,6 +4279,7 @@ impl Timeline {
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
mut data: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// Perform WAL redo if needed
|
||||
data.records.reverse();
|
||||
@@ -4342,6 +4343,7 @@ impl Timeline {
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("Materialized page memoization failed")
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ops::ControlFlow,
|
||||
sync::Arc,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
@@ -25,7 +25,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
|
||||
|
||||
use crate::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
context::{DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
@@ -397,9 +397,14 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let permit = crate::page_cache::get().get_permit().await;
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_cache_permit(permit)
|
||||
.build();
|
||||
|
||||
// imitate repartitioning on first compaction
|
||||
if let Err(e) = self
|
||||
.collect_keyspace(lsn, ctx)
|
||||
.collect_keyspace(lsn, &ctx)
|
||||
.instrument(info_span!("collect_keyspace"))
|
||||
.await
|
||||
{
|
||||
|
||||
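The hunk above threads a page-cache permit into the RequestContext through a builder before calling collect_keyspace. A self-contained sketch of that builder-extension pattern, with every type here invented as a stand-in for the real RequestContextBuilder:

use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

#[derive(Clone, Default)]
struct RequestCtx {
    // The permit rides along inside the context for the duration of the task.
    page_cache_permit: Option<Arc<OwnedSemaphorePermit>>,
}

struct CtxBuilder {
    ctx: RequestCtx,
}

impl CtxBuilder {
    fn extend(parent: &RequestCtx) -> Self {
        Self { ctx: parent.clone() }
    }
    fn page_cache_permit(mut self, permit: OwnedSemaphorePermit) -> Self {
        self.ctx.page_cache_permit = Some(Arc::new(permit));
        self
    }
    fn build(self) -> RequestCtx {
        self.ctx
    }
}

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(8));
    let permit = sem.clone().acquire_owned().await.unwrap();
    let parent = RequestCtx::default();
    let ctx = CtxBuilder::extend(&parent).page_cache_permit(permit).build();
    assert!(ctx.page_cache_permit.is_some());
}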
@@ -544,7 +544,7 @@ impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_, '_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let mut buf = [0; PAGE_SZ];
|
||||
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
|
||||
@@ -825,7 +825,7 @@ impl PostgresRedoManager {
|
||||
while nwrite < writebuf.len() {
|
||||
let n = loop {
|
||||
match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
Err(e) if e == nix::errno::Errno::EINTR => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
@@ -917,7 +917,7 @@ impl PostgresRedoManager {
|
||||
// and forward any logging information that the child writes to its stderr to the page server's log.
|
||||
let n = loop {
|
||||
match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
Err(e) if e == nix::errno::Errno::EINTR => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
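The change above keeps the retry-on-EINTR loop but matches the errno through a guard. The same retry shape expressed with std types only, for illustration:

use std::io::{self, ErrorKind, Read};

fn read_retrying(mut r: impl Read, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        match r.read(buf) {
            // Interrupted system call: try the same operation again.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            // Any other result, success or failure, ends the loop.
            res => break res,
        }
    }
}

fn main() -> io::Result<()> {
    let data: &[u8] = b"hello";
    let mut buf = [0u8; 16];
    let n = read_retrying(data, &mut buf)?;
    assert_eq!(&buf[..n], b"hello");
    Ok(())
}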
|
||||
@@ -7,12 +7,12 @@ OBJS = \
|
||||
extension_server.o \
|
||||
file_cache.o \
|
||||
libpagestore.o \
|
||||
libpqwalproposer.o \
|
||||
neon.o \
|
||||
neon_utils.o \
|
||||
pagestore_smgr.o \
|
||||
relsize_cache.o \
|
||||
walproposer.o \
|
||||
walproposer_pg.o \
|
||||
walproposer_utils.o \
|
||||
control_plane_connector.o
|
||||
|
||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
|
||||
@@ -30,7 +30,7 @@
|
||||
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "neon_utils.h"
|
||||
#include "walproposer_utils.h"
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
|
||||
424
pgxn/neon/libpqwalproposer.c
Normal file
@@ -0,0 +1,424 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
|
||||
struct WalProposerConn
|
||||
{
|
||||
PGconn *pg_conn;
|
||||
bool is_nonblocking; /* whether the connection is non-blocking */
|
||||
char *recvbuf; /* last received data from
|
||||
* walprop_async_read */
|
||||
};
|
||||
|
||||
/* Helper function */
|
||||
static bool
|
||||
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
|
||||
{
|
||||
/* If we're already correctly blocking or nonblocking, all good */
|
||||
if (is_nonblocking == conn->is_nonblocking)
|
||||
return true;
|
||||
|
||||
/* Otherwise, set it appropriately */
|
||||
if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
|
||||
return false;
|
||||
|
||||
conn->is_nonblocking = is_nonblocking;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Exported function definitions */
|
||||
char *
|
||||
walprop_error_message(WalProposerConn *conn)
|
||||
{
|
||||
return PQerrorMessage(conn->pg_conn);
|
||||
}
|
||||
|
||||
WalProposerConnStatusType
|
||||
walprop_status(WalProposerConn *conn)
|
||||
{
|
||||
switch (PQstatus(conn->pg_conn))
|
||||
{
|
||||
case CONNECTION_OK:
|
||||
return WP_CONNECTION_OK;
|
||||
case CONNECTION_BAD:
|
||||
return WP_CONNECTION_BAD;
|
||||
default:
|
||||
return WP_CONNECTION_IN_PROGRESS;
|
||||
}
|
||||
}
|
||||
|
||||
WalProposerConn *
|
||||
walprop_connect_start(char *conninfo, char *password)
|
||||
{
|
||||
WalProposerConn *conn;
|
||||
PGconn *pg_conn;
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
|
||||
/*
|
||||
* Connect using the given connection string. If the
|
||||
* NEON_AUTH_TOKEN environment variable was set, use that as
|
||||
* the password.
|
||||
*
|
||||
* The connection options are parsed in the order they're given, so
|
||||
* when we set the password before the connection string, the
|
||||
* connection string can override the password from the env variable.
|
||||
* Seems useful, although we don't currently use that capability
|
||||
* anywhere.
|
||||
*/
|
||||
n = 0;
|
||||
if (password)
|
||||
{
|
||||
keywords[n] = "password";
|
||||
values[n] = password;
|
||||
n++;
|
||||
}
|
||||
keywords[n] = "dbname";
|
||||
values[n] = conninfo;
|
||||
n++;
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
n++;
|
||||
pg_conn = PQconnectStartParams(keywords, values, 1);
|
||||
|
||||
/*
|
||||
* Allocation of a PQconn can fail, and will return NULL. We want to fully
|
||||
* replicate the behavior of PQconnectStart here.
|
||||
*/
|
||||
if (!pg_conn)
|
||||
return NULL;
|
||||
|
||||
/*
|
||||
* And in theory this allocation can fail as well, but it's incredibly
|
||||
* unlikely if we just successfully allocated a PGconn.
|
||||
*
|
||||
* palloc will exit on failure though, so there's not much we could do if
|
||||
* it *did* fail.
|
||||
*/
|
||||
conn = palloc(sizeof(WalProposerConn));
|
||||
conn->pg_conn = pg_conn;
|
||||
conn->is_nonblocking = false; /* connections always start in blocking
|
||||
* mode */
|
||||
conn->recvbuf = NULL;
|
||||
return conn;
|
||||
}
|
||||
|
||||
WalProposerConnectPollStatusType
|
||||
walprop_connect_poll(WalProposerConn *conn)
|
||||
{
|
||||
WalProposerConnectPollStatusType return_val;
|
||||
|
||||
switch (PQconnectPoll(conn->pg_conn))
|
||||
{
|
||||
case PGRES_POLLING_FAILED:
|
||||
return_val = WP_CONN_POLLING_FAILED;
|
||||
break;
|
||||
case PGRES_POLLING_READING:
|
||||
return_val = WP_CONN_POLLING_READING;
|
||||
break;
|
||||
case PGRES_POLLING_WRITING:
|
||||
return_val = WP_CONN_POLLING_WRITING;
|
||||
break;
|
||||
case PGRES_POLLING_OK:
|
||||
return_val = WP_CONN_POLLING_OK;
|
||||
break;
|
||||
|
||||
/*
|
||||
* There's a comment at its source about this constant being
|
||||
* unused. We'll expect it's never returned.
|
||||
*/
|
||||
case PGRES_POLLING_ACTIVE:
|
||||
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
|
||||
|
||||
/*
|
||||
* This return is never actually reached, but it's here to make
|
||||
* the compiler happy
|
||||
*/
|
||||
return WP_CONN_POLLING_FAILED;
|
||||
|
||||
default:
|
||||
Assert(false);
|
||||
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
|
||||
}
|
||||
|
||||
return return_val;
|
||||
}
|
||||
|
||||
bool
|
||||
walprop_send_query(WalProposerConn *conn, char *query)
|
||||
{
|
||||
/*
|
||||
* We need to be in blocking mode for sending the query to run without
|
||||
* requiring a call to PQflush
|
||||
*/
|
||||
if (!ensure_nonblocking_status(conn, false))
|
||||
return false;
|
||||
|
||||
/* PQsendQuery returns 1 on success, 0 on failure */
|
||||
if (!PQsendQuery(conn->pg_conn, query))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
WalProposerExecStatusType
|
||||
walprop_get_query_result(WalProposerConn *conn)
|
||||
{
|
||||
PGresult *result;
|
||||
WalProposerExecStatusType return_val;
|
||||
|
||||
/* Marker variable if we need to log an unexpected success result */
|
||||
char *unexpected_success = NULL;
|
||||
|
||||
/* Consume any input that we might be missing */
|
||||
if (!PQconsumeInput(conn->pg_conn))
|
||||
return WP_EXEC_FAILED;
|
||||
|
||||
if (PQisBusy(conn->pg_conn))
|
||||
return WP_EXEC_NEEDS_INPUT;
|
||||
|
||||
|
||||
result = PQgetResult(conn->pg_conn);
|
||||
|
||||
/*
|
||||
* PQgetResult returns NULL only if getting the result was successful &
|
||||
* there's no more of the result to get.
|
||||
*/
|
||||
if (!result)
|
||||
{
|
||||
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
|
||||
return WP_EXEC_UNEXPECTED_SUCCESS;
|
||||
}
|
||||
|
||||
/* Helper macro to reduce boilerplate */
|
||||
#define UNEXPECTED_SUCCESS(msg) \
|
||||
return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
|
||||
unexpected_success = msg; \
|
||||
break;
|
||||
|
||||
|
||||
switch (PQresultStatus(result))
|
||||
{
|
||||
/* "true" success case */
|
||||
case PGRES_COPY_BOTH:
|
||||
return_val = WP_EXEC_SUCCESS_COPYBOTH;
|
||||
break;
|
||||
|
||||
/* Unexpected success case */
|
||||
case PGRES_EMPTY_QUERY:
|
||||
UNEXPECTED_SUCCESS("empty query return");
|
||||
case PGRES_COMMAND_OK:
|
||||
UNEXPECTED_SUCCESS("data-less command end");
|
||||
case PGRES_TUPLES_OK:
|
||||
UNEXPECTED_SUCCESS("tuples return");
|
||||
case PGRES_COPY_OUT:
|
||||
UNEXPECTED_SUCCESS("'Copy Out' response");
|
||||
case PGRES_COPY_IN:
|
||||
UNEXPECTED_SUCCESS("'Copy In' response");
|
||||
case PGRES_SINGLE_TUPLE:
|
||||
UNEXPECTED_SUCCESS("single tuple return");
|
||||
case PGRES_PIPELINE_SYNC:
|
||||
UNEXPECTED_SUCCESS("pipeline sync point");
|
||||
|
||||
/* Failure cases */
|
||||
case PGRES_BAD_RESPONSE:
|
||||
case PGRES_NONFATAL_ERROR:
|
||||
case PGRES_FATAL_ERROR:
|
||||
case PGRES_PIPELINE_ABORTED:
|
||||
return_val = WP_EXEC_FAILED;
|
||||
break;
|
||||
|
||||
default:
|
||||
Assert(false);
|
||||
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
|
||||
}
|
||||
|
||||
if (unexpected_success)
|
||||
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
|
||||
|
||||
return return_val;
|
||||
}
|
||||
|
||||
pgsocket
|
||||
walprop_socket(WalProposerConn *conn)
|
||||
{
|
||||
return PQsocket(conn->pg_conn);
|
||||
}
|
||||
|
||||
int
|
||||
walprop_flush(WalProposerConn *conn)
|
||||
{
|
||||
return (PQflush(conn->pg_conn));
|
||||
}
|
||||
|
||||
void
|
||||
walprop_finish(WalProposerConn *conn)
|
||||
{
|
||||
if (conn->recvbuf != NULL)
|
||||
PQfreemem(conn->recvbuf);
|
||||
PQfinish(conn->pg_conn);
|
||||
pfree(conn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive a message from the safekeeper.
|
||||
*
|
||||
* On success, the data is placed in *buf. It is valid until the next call
|
||||
* to this function.
|
||||
*/
|
||||
PGAsyncReadResult
|
||||
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
|
||||
{
|
||||
int result;
|
||||
|
||||
if (conn->recvbuf != NULL)
|
||||
{
|
||||
PQfreemem(conn->recvbuf);
|
||||
conn->recvbuf = NULL;
|
||||
}
|
||||
|
||||
/* Call PQconsumeInput so that we have the data we need */
|
||||
if (!PQconsumeInput(conn->pg_conn))
|
||||
{
|
||||
*amount = 0;
|
||||
*buf = NULL;
|
||||
return PG_ASYNC_READ_FAIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* The docs for PQgetCopyData list the return values as:
*   0 if the copy is still in progress, but no "complete row" is available,
*  -1 if the copy is done,
*  -2 if an error occurred,
*  (> 0) if it was successful; that value is the amount transferred.
|
||||
*
|
||||
* The protocol we use between walproposer and safekeeper means that we
|
||||
* *usually* wouldn't expect to see that the copy is done, but this can
|
||||
* sometimes be triggered by the server returning an ErrorResponse (which
|
||||
* also happens to have the effect that the copy is done).
|
||||
*/
|
||||
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
|
||||
{
|
||||
case 0:
|
||||
*amount = 0;
|
||||
*buf = NULL;
|
||||
return PG_ASYNC_READ_TRY_AGAIN;
|
||||
case -1:
|
||||
{
|
||||
/*
|
||||
* If we get -1, it's probably because of a server error; the
|
||||
* safekeeper won't normally send a CopyDone message.
|
||||
*
|
||||
* We can check PQgetResult to make sure that the server
|
||||
* failed; it'll always result in PGRES_FATAL_ERROR
|
||||
*/
|
||||
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
|
||||
|
||||
if (status != PGRES_FATAL_ERROR)
|
||||
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
|
||||
|
||||
/*
|
||||
* If there was actually an error, it'll be properly reported
|
||||
* by calls to PQerrorMessage -- we don't have to do anything
|
||||
* else
|
||||
*/
|
||||
*amount = 0;
|
||||
*buf = NULL;
|
||||
return PG_ASYNC_READ_FAIL;
|
||||
}
|
||||
case -2:
|
||||
*amount = 0;
|
||||
*buf = NULL;
|
||||
return PG_ASYNC_READ_FAIL;
|
||||
default:
|
||||
/* Positive values indicate the size of the returned result */
|
||||
*amount = result;
|
||||
*buf = conn->recvbuf;
|
||||
return PG_ASYNC_READ_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
PGAsyncWriteResult
|
||||
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
|
||||
{
|
||||
int result;
|
||||
|
||||
/* If we aren't in non-blocking mode, switch to it. */
|
||||
if (!ensure_nonblocking_status(conn, true))
|
||||
return PG_ASYNC_WRITE_FAIL;
|
||||
|
||||
/*
|
||||
* The docs for PQputCopyData list the return values as: 1 if the data was
|
||||
* queued, 0 if it was not queued because of full buffers, or -1 if an
|
||||
* error occurred
|
||||
*/
|
||||
result = PQputCopyData(conn->pg_conn, buf, size);
|
||||
|
||||
/*
|
||||
* We won't get a result of zero because walproposer always empties the
|
||||
* connection's buffers before sending more
|
||||
*/
|
||||
Assert(result != 0);
|
||||
|
||||
switch (result)
|
||||
{
|
||||
case 1:
|
||||
/* good -- continue */
|
||||
break;
|
||||
case -1:
|
||||
return PG_ASYNC_WRITE_FAIL;
|
||||
default:
|
||||
elog(FATAL, "invalid return %d from PQputCopyData", result);
|
||||
}
|
||||
|
||||
/*
|
||||
* After queueing the data, we still need to flush to get it to send. This
|
||||
* might take multiple tries, but we don't want to wait around until it's
|
||||
* done.
|
||||
*
|
||||
* PQflush has the following returns (directly quoting the docs):
*   0 if successful,
*   1 if it was unable to send all the data in the send queue yet,
*  -1 if it failed for some reason
|
||||
*/
|
||||
switch (result = PQflush(conn->pg_conn))
|
||||
{
|
||||
case 0:
|
||||
return PG_ASYNC_WRITE_SUCCESS;
|
||||
case 1:
|
||||
return PG_ASYNC_WRITE_TRY_FLUSH;
|
||||
case -1:
|
||||
return PG_ASYNC_WRITE_FAIL;
|
||||
default:
|
||||
elog(FATAL, "invalid return %d from PQflush", result);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This function is very similar to walprop_async_write. For more
|
||||
* information, refer to the comments there.
|
||||
*/
|
||||
bool
|
||||
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
|
||||
{
|
||||
int result;
|
||||
|
||||
/* If we are in non-blocking mode, switch out of it. */
|
||||
if (!ensure_nonblocking_status(conn, false))
|
||||
return false;
|
||||
|
||||
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
|
||||
return false;
|
||||
|
||||
Assert(result == 1);
|
||||
|
||||
/* Because the connection is blocking, flushing returns 0 or -1 */
|
||||
|
||||
if ((result = PQflush(conn->pg_conn)) == -1)
|
||||
return false;
|
||||
|
||||
Assert(result == 0);
|
||||
return true;
|
||||
}
|
||||
@@ -18,10 +18,6 @@ extern char *neon_auth_token;
|
||||
extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
|
||||
extern char *wal_acceptors_list;
|
||||
extern int wal_acceptor_reconnect_timeout;
|
||||
extern int wal_acceptor_connection_timeout;
|
||||
|
||||
extern void pg_init_libpagestore(void);
|
||||
extern void pg_init_walproposer(void);
|
||||
|
||||
@@ -34,10 +30,4 @@ extern void pg_init_extension_server(void);
|
||||
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
|
||||
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
|
||||
|
||||
extern uint64 BackpressureThrottlingTime(void);
|
||||
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
|
||||
|
||||
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
|
||||
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
|
||||
|
||||
#endif /* NEON_H */
|
||||
|
||||
@@ -1,116 +0,0 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/timeline.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "common/logging.h"
|
||||
#include "common/ip.h"
|
||||
#include "funcapi.h"
|
||||
#include "libpq/libpq.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/walsender_private.h"
|
||||
|
||||
#include "storage/ipc.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/ps_status.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include <netinet/tcp.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "access/xlogutils.h"
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
#if PG_MAJORVERSION_NUM >= 16
|
||||
#include "utils/guc.h"
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Convert a character which represents a hexadecimal digit to an integer.
|
||||
*
|
||||
* Returns -1 if the character is not a hexadecimal digit.
|
||||
*/
|
||||
int
|
||||
HexDecodeChar(char c)
|
||||
{
|
||||
if (c >= '0' && c <= '9')
|
||||
return c - '0';
|
||||
if (c >= 'a' && c <= 'f')
|
||||
return c - 'a' + 10;
|
||||
if (c >= 'A' && c <= 'F')
|
||||
return c - 'A' + 10;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Decode a hex string into a byte string, 2 hex chars per byte.
|
||||
*
|
||||
* Returns false if invalid characters are encountered; otherwise true.
|
||||
*/
|
||||
bool
|
||||
HexDecodeString(uint8 *result, char *input, int nbytes)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < nbytes; ++i)
|
||||
{
|
||||
int n1 = HexDecodeChar(input[i * 2]);
|
||||
int n2 = HexDecodeChar(input[i * 2 + 1]);
|
||||
|
||||
if (n1 < 0 || n2 < 0)
|
||||
return false;
|
||||
result[i] = n1 * 16 + n2;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
|
||||
* --------------------------------
|
||||
*/
|
||||
uint32
|
||||
pq_getmsgint32_le(StringInfo msg)
|
||||
{
|
||||
uint32 n32;
|
||||
|
||||
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
|
||||
|
||||
return n32;
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
|
||||
* --------------------------------
|
||||
*/
|
||||
uint64
|
||||
pq_getmsgint64_le(StringInfo msg)
|
||||
{
|
||||
uint64 n64;
|
||||
|
||||
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
|
||||
|
||||
return n64;
|
||||
}
|
||||
|
||||
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
|
||||
void
|
||||
pq_sendint32_le(StringInfo buf, uint32 i)
|
||||
{
|
||||
enlargeStringInfo(buf, sizeof(uint32));
|
||||
memcpy(buf->data + buf->len, &i, sizeof(uint32));
|
||||
buf->len += sizeof(uint32);
|
||||
}
|
||||
|
||||
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
|
||||
void
|
||||
pq_sendint64_le(StringInfo buf, uint64 i)
|
||||
{
|
||||
enlargeStringInfo(buf, sizeof(uint64));
|
||||
memcpy(buf->data + buf->len, &i, sizeof(uint64));
|
||||
buf->len += sizeof(uint64);
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
#ifndef __NEON_UTILS_H__
|
||||
#define __NEON_UTILS_H__
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
bool HexDecodeString(uint8 *result, char *input, int nbytes);
|
||||
uint32 pq_getmsgint32_le(StringInfo msg);
|
||||
uint64 pq_getmsgint64_le(StringInfo msg);
|
||||
void pq_sendint32_le(StringInfo buf, uint32 i);
|
||||
void pq_sendint64_le(StringInfo buf, uint64 i);
|
||||
|
||||
#endif /* __NEON_UTILS_H__ */
|
||||
@@ -721,7 +721,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
Retry:
|
||||
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
|
||||
|
||||
if (entry != NULL)
|
||||
@@ -858,11 +858,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
|
||||
if (flush_every_n_requests > 0 &&
|
||||
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
{
|
||||
if (!page_server->flush())
|
||||
{
|
||||
/* Prefetch set is reset in case of error, so we should try to register our request once again */
|
||||
goto Retry;
|
||||
}
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
@@ -1,8 +1,8 @@
|
||||
#ifndef __NEON_WALPROPOSER_H__
|
||||
#define __NEON_WALPROPOSER_H__
|
||||
|
||||
#include "postgres.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "postgres.h"
|
||||
#include "port.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/transam.h"
|
||||
@@ -16,15 +16,29 @@
|
||||
#define MAX_SAFEKEEPERS 32
|
||||
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
|
||||
* message */
|
||||
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
|
||||
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
|
||||
* message header */
|
||||
#define XLOG_HDR_END_POS (1 + 8) /* offset of end position in wal sender*
|
||||
* message header */
|
||||
|
||||
/*
|
||||
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occurred,
|
||||
* because all WL_* events are given flags equal to some (1 << i), starting from i = 0
|
||||
*/
|
||||
#define WL_NO_EVENTS 0
|
||||
|
||||
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
|
||||
extern char *wal_acceptors_list;
|
||||
extern int wal_acceptor_reconnect_timeout;
|
||||
extern int wal_acceptor_connection_timeout;
|
||||
extern bool am_wal_proposer;
|
||||
|
||||
struct WalProposerConn; /* Defined in libpqwalproposer */
|
||||
typedef struct WalProposerConn WalProposerConn;
|
||||
|
||||
struct WalMessage;
|
||||
typedef struct WalMessage WalMessage;
|
||||
|
||||
/* Possible return values from ReadPGAsync */
|
||||
typedef enum
|
||||
{
|
||||
@@ -38,7 +52,7 @@ typedef enum
|
||||
PG_ASYNC_READ_TRY_AGAIN,
|
||||
/* Reading failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_READ_FAIL,
|
||||
} PGAsyncReadResult;
|
||||
} PGAsyncReadResult;
|
||||
|
||||
/* Possible return values from WritePGAsync */
|
||||
typedef enum
|
||||
@@ -57,7 +71,7 @@ typedef enum
|
||||
PG_ASYNC_WRITE_TRY_FLUSH,
|
||||
/* Writing failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_WRITE_FAIL,
|
||||
} PGAsyncWriteResult;
|
||||
} PGAsyncWriteResult;
|
||||
|
||||
/*
|
||||
* WAL safekeeper state, which is used to wait for some event.
|
||||
@@ -133,7 +147,7 @@ typedef enum
|
||||
* to read.
|
||||
*/
|
||||
SS_ACTIVE,
|
||||
} SafekeeperState;
|
||||
} SafekeeperState;
|
||||
|
||||
/* Consensus logical timestamp. */
|
||||
typedef uint64 term_t;
|
||||
@@ -157,12 +171,12 @@ typedef struct ProposerGreeting
|
||||
uint8 tenant_id[16];
|
||||
TimeLineID timeline;
|
||||
uint32 walSegSize;
|
||||
} ProposerGreeting;
|
||||
} ProposerGreeting;
|
||||
|
||||
typedef struct AcceptorProposerMessage
|
||||
{
|
||||
uint64 tag;
|
||||
} AcceptorProposerMessage;
|
||||
} AcceptorProposerMessage;
|
||||
|
||||
/*
|
||||
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
|
||||
@@ -172,7 +186,7 @@ typedef struct AcceptorGreeting
|
||||
AcceptorProposerMessage apm;
|
||||
term_t term;
|
||||
NNodeId nodeId;
|
||||
} AcceptorGreeting;
|
||||
} AcceptorGreeting;
|
||||
|
||||
/*
|
||||
* Proposer -> Acceptor vote request.
|
||||
@@ -182,20 +196,20 @@ typedef struct VoteRequest
|
||||
uint64 tag;
|
||||
term_t term;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} VoteRequest;
|
||||
} VoteRequest;
|
||||
|
||||
/* Element of term switching chain. */
|
||||
typedef struct TermSwitchEntry
|
||||
{
|
||||
term_t term;
|
||||
XLogRecPtr lsn;
|
||||
} TermSwitchEntry;
|
||||
} TermSwitchEntry;
|
||||
|
||||
typedef struct TermHistory
|
||||
{
|
||||
uint32 n_entries;
|
||||
TermSwitchEntry *entries;
|
||||
} TermHistory;
|
||||
} TermHistory;
|
||||
|
||||
/* Vote itself, sent from safekeeper to proposer */
|
||||
typedef struct VoteResponse
|
||||
@@ -213,7 +227,7 @@ typedef struct VoteResponse
|
||||
* recovery of some safekeeper */
|
||||
TermHistory termHistory;
|
||||
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
|
||||
} VoteResponse;
|
||||
} VoteResponse;
|
||||
|
||||
/*
|
||||
* Proposer -> Acceptor message announcing proposer is elected and communicating
|
||||
@@ -229,7 +243,7 @@ typedef struct ProposerElected
|
||||
TermHistory *termHistory;
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
} ProposerElected;
|
||||
} ProposerElected;
|
||||
|
||||
/*
|
||||
* Header of request with WAL message sent from proposer to safekeeper.
|
||||
@@ -254,7 +268,7 @@ typedef struct AppendRequestHeader
|
||||
*/
|
||||
XLogRecPtr truncateLsn;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} AppendRequestHeader;
|
||||
} AppendRequestHeader;
|
||||
|
||||
/*
|
||||
* Hot standby feedback received from replica
|
||||
@@ -264,7 +278,7 @@ typedef struct HotStandbyFeedback
|
||||
TimestampTz ts;
|
||||
FullTransactionId xmin;
|
||||
FullTransactionId catalog_xmin;
|
||||
} HotStandbyFeedback;
|
||||
} HotStandbyFeedback;
|
||||
|
||||
typedef struct PageserverFeedback
|
||||
{
|
||||
@@ -275,7 +289,7 @@ typedef struct PageserverFeedback
|
||||
XLogRecPtr disk_consistent_lsn;
|
||||
XLogRecPtr remote_consistent_lsn;
|
||||
TimestampTz replytime;
|
||||
} PageserverFeedback;
|
||||
} PageserverFeedback;
|
||||
|
||||
typedef struct WalproposerShmemState
|
||||
{
|
||||
@@ -283,7 +297,7 @@ typedef struct WalproposerShmemState
|
||||
PageserverFeedback feedback;
|
||||
term_t mineLastElectedTerm;
|
||||
pg_atomic_uint64 backpressureThrottlingTime;
|
||||
} WalproposerShmemState;
|
||||
} WalproposerShmemState;
|
||||
|
||||
/*
|
||||
* Report safekeeper state to proposer
|
||||
@@ -307,22 +321,17 @@ typedef struct AppendResponse
|
||||
/* and custom neon feedback. */
|
||||
/* This part of the message is extensible. */
|
||||
PageserverFeedback rf;
|
||||
} AppendResponse;
|
||||
} AppendResponse;
|
||||
|
||||
/* PageserverFeedback is extensible part of the message that is parsed separately */
|
||||
/* Other fields are fixed part */
|
||||
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
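/*
 * Illustrative sketch, not part of the commits above: one way the define can
 * be used when reading an AppendResponse off the wire: copy the fixed part as
 * a block, then parse the extensible feedback separately. This assumes the
 * wire layout of the fixed part matches the in-memory struct layout;
 * read_append_response_example is an invented name.
 */
static void
read_append_response_example(StringInfo msg, AppendResponse *resp)
{
	/* fixed part: everything before the extensible PageserverFeedback */
	pq_copymsgbytes(msg, (char *) resp, APPENDRESPONSE_FIXEDPART_SIZE);

	/* extensible part: parsed field by field */
	ParsePageserverFeedbackMessage(msg, &resp->rf);
}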
|
||||
|
||||
struct WalProposer;
|
||||
typedef struct WalProposer WalProposer;
|
||||
|
||||
/*
|
||||
* Descriptor of safekeeper
|
||||
*/
|
||||
typedef struct Safekeeper
|
||||
{
|
||||
WalProposer *wp;
|
||||
|
||||
char const *host;
|
||||
char const *port;
|
||||
|
||||
@@ -331,7 +340,7 @@ typedef struct Safekeeper
|
||||
*
|
||||
* May contain private information like password and should not be logged.
|
||||
*/
|
||||
char conninfo[MAXCONNINFO];
|
||||
char conninfo[MAXCONNINFO];
|
||||
|
||||
/*
|
||||
* postgres protocol connection to the WAL acceptor
|
||||
@@ -364,12 +373,27 @@ typedef struct Safekeeper
|
||||
int eventPos; /* position in wait event set. Equal to -1 if*
|
||||
* no event */
|
||||
SafekeeperState state; /* safekeeper state machine state */
|
||||
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
|
||||
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
|
||||
AcceptorGreeting greetResponse; /* acceptor greeting */
|
||||
VoteResponse voteResponse; /* the vote */
|
||||
AppendResponse appendResponse; /* feedback for master */
|
||||
} Safekeeper;
|
||||
|
||||
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
|
||||
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
|
||||
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
|
||||
extern void WalProposerPoll(void);
|
||||
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
|
||||
PageserverFeedback *rf);
|
||||
extern void StartProposerReplication(StartReplicationCmd *cmd);
|
||||
|
||||
extern Size WalproposerShmemSize(void);
|
||||
extern bool WalproposerShmemInit(void);
|
||||
extern void replication_feedback_set(PageserverFeedback *rf);
|
||||
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
|
||||
|
||||
/* libpqwalproposer hooks & helper type */
|
||||
|
||||
/* Re-exported PostgresPollingStatusType */
|
||||
typedef enum
|
||||
{
|
||||
@@ -382,7 +406,7 @@ typedef enum
|
||||
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
|
||||
* We've removed it here to avoid clutter.
|
||||
*/
|
||||
} WalProposerConnectPollStatusType;
|
||||
} WalProposerConnectPollStatusType;
|
||||
|
||||
/* Re-exported and modified ExecStatusType */
|
||||
typedef enum
|
||||
@@ -407,7 +431,7 @@ typedef enum
|
||||
WP_EXEC_NEEDS_INPUT,
|
||||
/* Catch-all failure. Check PQerrorMessage. */
|
||||
WP_EXEC_FAILED,
|
||||
} WalProposerExecStatusType;
|
||||
} WalProposerExecStatusType;
|
||||
|
||||
/* Re-exported ConnStatusType */
|
||||
typedef enum
|
||||
@@ -421,252 +445,67 @@ typedef enum
|
||||
* that extra functionality, so we collect them into a single tag here.
|
||||
*/
|
||||
WP_CONNECTION_IN_PROGRESS,
|
||||
} WalProposerConnStatusType;
|
||||
} WalProposerConnStatusType;
|
||||
|
||||
/* Re-exported PQerrorMessage */
|
||||
extern char *walprop_error_message(WalProposerConn *conn);
|
||||
|
||||
/* Re-exported PQstatus */
|
||||
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
|
||||
|
||||
/* Re-exported PQconnectStart */
|
||||
extern WalProposerConn * walprop_connect_start(char *conninfo, char *password);
|
||||
|
||||
/* Re-exported PQconnectPoll */
|
||||
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
|
||||
|
||||
/* Blocking wrapper around PQsendQuery */
|
||||
extern bool walprop_send_query(WalProposerConn *conn, char *query);
|
||||
|
||||
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
|
||||
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
|
||||
|
||||
/* Re-exported PQsocket */
|
||||
extern pgsocket walprop_socket(WalProposerConn *conn);
|
||||
|
||||
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
|
||||
extern int walprop_flush(WalProposerConn *conn);
|
||||
|
||||
/* Re-exported PQfinish */
|
||||
extern void walprop_finish(WalProposerConn *conn);
|
||||
|
||||
/*
|
||||
* Collection of hooks for walproposer, to call postgres functions,
|
||||
* read WAL and send it over the network.
|
||||
 * Ergonomic wrapper around PQgetCopyData
|
||||
*
|
||||
* Reads a CopyData block from a safekeeper, setting *amount to the number
|
||||
* of bytes returned.
|
||||
*
|
||||
* This function is allowed to assume certain properties specific to the
|
||||
* protocol with the safekeepers, so it should not be used as-is for any
|
||||
* other purpose.
|
||||
*
|
||||
* Note: If possible, using <AsyncRead> is generally preferred, because it
|
||||
* performs a bit of extra checking work that's always required and is normally
|
||||
* somewhat verbose.
|
||||
*/
|
||||
typedef struct walproposer_api
|
||||
{
|
||||
/*
|
||||
* Get WalproposerShmemState. This is used to store information about last
|
||||
* elected term.
|
||||
*/
|
||||
WalproposerShmemState *(*get_shmem_state) (void);
|
||||
|
||||
/*
|
||||
* Start receiving notifications about new WAL. This is an infinite loop
|
||||
* which calls WalProposerBroadcast() and WalProposerPoll() to send the
|
||||
* WAL.
|
||||
*/
|
||||
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos);
|
||||
|
||||
/* Get pointer to the latest available WAL. */
|
||||
XLogRecPtr (*get_flush_rec_ptr) (void);
|
||||
|
||||
/* Get current time. */
|
||||
TimestampTz (*get_current_timestamp) (void);
|
||||
|
||||
/* Get postgres timeline. */
|
||||
TimeLineID (*get_timeline_id) (void);
|
||||
|
||||
/* Current error message, aka PQerrorMessage. */
|
||||
char *(*conn_error_message) (WalProposerConn *conn);
|
||||
|
||||
/* Connection status, aka PQstatus. */
|
||||
WalProposerConnStatusType (*conn_status) (WalProposerConn *conn);
|
||||
|
||||
/* Start the connection, aka PQconnectStart. */
|
||||
WalProposerConn *(*conn_connect_start) (char *conninfo);
|
||||
|
||||
/* Poll an asynchronous connection, aka PQconnectPoll. */
|
||||
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn);
|
||||
|
||||
/* Send a blocking SQL query, aka PQsendQuery. */
|
||||
bool (*conn_send_query) (WalProposerConn *conn, char *query);
|
||||
|
||||
/* Read the query result, aka PQgetResult. */
|
||||
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn);
|
||||
|
||||
/* Flush buffer to the network, aka PQflush. */
|
||||
int (*conn_flush) (WalProposerConn *conn);
|
||||
|
||||
/* Close the connection, aka PQfinish. */
|
||||
void (*conn_finish) (WalProposerConn *conn);
|
||||
|
||||
/* Try to read CopyData message, aka PQgetCopyData. */
|
||||
PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount);
|
||||
|
||||
/* Try to write CopyData message, aka PQputCopyData. */
|
||||
PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size);
|
||||
|
||||
/* Blocking CopyData write, aka PQputCopyData + PQflush. */
|
||||
bool (*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size);
|
||||
|
||||
/* Download WAL from startpos to endpos and make it available locally. */
|
||||
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
|
||||
|
||||
/* Read WAL from disk to buf. */
|
||||
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
|
||||
|
||||
/* Allocate WAL reader. */
|
||||
XLogReaderState *(*wal_reader_allocate) (void);
|
||||
|
||||
/* Deallocate event set. */
|
||||
void (*free_event_set) (void);
|
||||
|
||||
/* Initialize event set. */
|
||||
void (*init_event_set) (int n_safekeepers);
|
||||
|
||||
/* Update events for an existing safekeeper connection. */
|
||||
void (*update_event_set) (Safekeeper *sk, uint32 events);
|
||||
|
||||
/* Add a new safekeeper connection to the event set. */
|
||||
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
|
||||
|
||||
/*
|
||||
* Wait until some event happens: - timeout is reached - socket event for
|
||||
* safekeeper connection - new WAL is available
|
||||
*
|
||||
* Returns 0 if timeout is reached, 1 if some event happened. Updates
|
||||
* events mask to indicate events and sets sk to the safekeeper which has
|
||||
* an event.
|
||||
*/
|
||||
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
|
||||
|
||||
/* Read random bytes. */
|
||||
bool (*strong_random) (void *buf, size_t len);
|
||||
|
||||
/*
|
||||
* Get a basebackup LSN. Used to cross-validate with the latest available
|
||||
* LSN on the safekeepers.
|
||||
*/
|
||||
XLogRecPtr (*get_redo_start_lsn) (void);
|
||||
|
||||
/*
|
||||
* Finish sync safekeepers with the given LSN. This function should not
|
||||
* return and should exit the program.
|
||||
*/
|
||||
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
|
||||
|
||||
/*
|
||||
* Called after every new message from the safekeeper. Used to propagate
|
||||
 * backpressure feedback and to confirm WAL persistence (has been committed
|
||||
* on the quorum of safekeepers).
|
||||
*/
|
||||
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
|
||||
|
||||
/*
|
||||
* Called on peer_horizon_lsn updates. Used to advance replication slot
|
||||
* and to free up disk space by deleting unnecessary WAL.
|
||||
*/
|
||||
void (*confirm_wal_streamed) (XLogRecPtr lsn);
|
||||
} walproposer_api;
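/*
 * Illustrative event-loop sketch, not part of the commits above: how the
 * wait_event_set() contract documented in the struct might be consumed,
 * assuming the api callbacks are already filled in. poll_once_example is an
 * invented name.
 */
static void
poll_once_example(const walproposer_api *api, long timeout_ms)
{
	Safekeeper *sk = NULL;
	uint32		events = WL_NO_EVENTS;

	if (api->wait_event_set(timeout_ms, &sk, &events) == 0)
		return;					/* timeout: nothing happened within timeout_ms */

	if (sk != NULL)
		elog(DEBUG1, "safekeeper %s:%s reported events %u",
			 sk->host, sk->port, events);
	/* otherwise the wakeup was for new WAL rather than a socket event */
}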
|
||||
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
|
||||
|
||||
/*
|
||||
* Configuration of the WAL proposer.
|
||||
* Ergonomic wrapper around PQputCopyData + PQflush
|
||||
*
|
||||
* Starts to write a CopyData block to a safekeeper.
|
||||
*
|
||||
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
|
||||
*/
|
||||
typedef struct WalProposerConfig
|
||||
{
|
||||
/* hex-encoded TenantId cstr */
|
||||
char *neon_tenant;
|
||||
|
||||
/* hex-encoded TimelineId cstr */
|
||||
char *neon_timeline;
|
||||
|
||||
/*
|
||||
* Comma-separated list of safekeepers, in the following format:
|
||||
* host1:port1,host2:port2,host3:port3
|
||||
*
|
||||
* This cstr should be editable.
|
||||
*/
|
||||
char *safekeepers_list;
|
||||
|
||||
/*
|
||||
* WalProposer reconnects to offline safekeepers once in this interval.
|
||||
* Time is in milliseconds.
|
||||
*/
|
||||
int safekeeper_reconnect_timeout;
|
||||
|
||||
/*
|
||||
* WalProposer terminates the connection if it doesn't receive any message
|
||||
* from the safekeeper in this interval. Time is in milliseconds.
|
||||
*/
|
||||
int safekeeper_connection_timeout;
|
||||
|
||||
/*
|
||||
* WAL segment size. Will be passed to safekeepers in greet request. Also
|
||||
* used to detect page headers.
|
||||
*/
|
||||
int wal_segment_size;
|
||||
|
||||
/*
|
||||
* If safekeeper was started in sync mode, walproposer will not subscribe
|
||||
 * for new WAL and will exit once a quorum of safekeepers is synced to
|
||||
* the latest available LSN.
|
||||
*/
|
||||
bool syncSafekeepers;
|
||||
|
||||
/* Will be passed to safekeepers in greet request. */
|
||||
uint64 systemId;
|
||||
} WalProposerConfig;
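/*
 * Illustrative configuration sketch, not part of the commits above: filling a
 * WalProposerConfig and handing it to WalProposerCreate()/WalProposerStart(),
 * which are declared at the end of this header. All literal values are
 * placeholders; api_example stands for a fully populated walproposer_api.
 */
static void
start_walproposer_example(walproposer_api api_example)
{
	static WalProposerConfig cfg;

	cfg.neon_tenant = pstrdup("0123456789abcdef0123456789abcdef");	/* hex TenantId */
	cfg.neon_timeline = pstrdup("fedcba9876543210fedcba9876543210");	/* hex TimelineId */
	cfg.safekeepers_list = pstrdup("sk1:5454,sk2:5454,sk3:5454");	/* must stay editable */
	cfg.safekeeper_reconnect_timeout = 1000;	/* ms */
	cfg.safekeeper_connection_timeout = 10000;	/* ms */
	cfg.wal_segment_size = 16 * 1024 * 1024;
	cfg.syncSafekeepers = false;
	cfg.systemId = 0;

	WalProposerStart(WalProposerCreate(&cfg, api_example));
}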
|
||||
|
||||
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
|
||||
|
||||
/*
|
||||
* WAL proposer state.
|
||||
* Blocking equivalent to walprop_async_write_fn
|
||||
*
|
||||
* Returns 'true' if successful, 'false' on failure.
|
||||
*/
|
||||
typedef struct WalProposer
|
||||
{
|
||||
WalProposerConfig *config;
|
||||
int n_safekeepers;
|
||||
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
|
||||
|
||||
/* (n_safekeepers / 2) + 1 */
|
||||
int quorum;
|
||||
|
||||
Safekeeper safekeeper[MAX_SAFEKEEPERS];
|
||||
|
||||
/* WAL has been generated up to this point */
|
||||
XLogRecPtr availableLsn;
|
||||
|
||||
/* last commitLsn broadcasted to safekeepers */
|
||||
XLogRecPtr lastSentCommitLsn;
|
||||
|
||||
ProposerGreeting greetRequest;
|
||||
|
||||
/* Vote request for safekeeper */
|
||||
VoteRequest voteRequest;
|
||||
|
||||
/*
|
||||
* Minimal LSN which may be needed for recovery of some safekeeper,
|
||||
 * record-aligned (first record which might not yet have been received by someone).
|
||||
*/
|
||||
XLogRecPtr truncateLsn;
|
||||
|
||||
/*
|
||||
* Term of the proposer. We want our term to be highest and unique, so we
|
||||
* collect terms from safekeepers quorum, choose max and +1. After that
|
||||
* our term is fixed and must not change. If we observe that some
|
||||
* safekeeper has higher term, it means that we have another running
|
||||
* compute, so we must stop immediately.
|
||||
*/
|
||||
term_t propTerm;
|
||||
|
||||
/* term history of the proposer */
|
||||
TermHistory propTermHistory;
|
||||
|
||||
/* epoch start lsn of the proposer */
|
||||
XLogRecPtr propEpochStartLsn;
|
||||
|
||||
/* Most advanced acceptor epoch */
|
||||
term_t donorEpoch;
|
||||
|
||||
/* Most advanced acceptor */
|
||||
int donor;
|
||||
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
|
||||
/* number of votes collected from safekeepers */
|
||||
int n_votes;
|
||||
|
||||
/* number of successful connections over the lifetime of walproposer */
|
||||
int n_connected;
|
||||
|
||||
/*
|
||||
* Timestamp of the last reconnection attempt. Related to
|
||||
* config->safekeeper_reconnect_timeout
|
||||
*/
|
||||
TimestampTz last_reconnect_attempt;
|
||||
|
||||
walproposer_api api;
|
||||
} WalProposer;
|
||||
|
||||
extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api api);
|
||||
extern void WalProposerStart(WalProposer *wp);
|
||||
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
|
||||
extern void WalProposerPoll(WalProposer *wp);
|
||||
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
|
||||
PageserverFeedback *rf);
|
||||
extern uint64 BackpressureThrottlingTime(void);
|
||||
|
||||
#endif /* __NEON_WALPROPOSER_H__ */
|
||||
|
||||
File diff suppressed because it is too large
pgxn/neon/walproposer_utils.c (new file, 659 lines added)
@@ -0,0 +1,659 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/timeline.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "common/logging.h"
|
||||
#include "common/ip.h"
|
||||
#include "funcapi.h"
|
||||
#include "libpq/libpq.h"
|
||||
#include "libpq/pqformat.h"
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/slot.h"
|
||||
#include "walproposer_utils.h"
|
||||
#include "replication/walsender_private.h"
|
||||
|
||||
#include "storage/ipc.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/ps_status.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include <netinet/tcp.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "access/xlogutils.h"
|
||||
#include "access/xlogrecovery.h"
|
||||
#endif
|
||||
#if PG_MAJORVERSION_NUM >= 16
|
||||
#include "utils/guc.h"
|
||||
#endif
|
||||
|
||||
/*
|
||||
* These variables are used similarly to openLogFile/SegNo,
|
||||
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
|
||||
 * corresponding to the filename of walpropFile.
|
||||
*/
|
||||
static int walpropFile = -1;
|
||||
static TimeLineID walpropFileTLI = 0;
|
||||
static XLogSegNo walpropSegNo = 0;
|
||||
|
||||
/* START cloned file-local variables and functions from walsender.c */
|
||||
|
||||
/*
|
||||
* How far have we sent WAL already? This is also advertised in
|
||||
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
|
||||
*/
|
||||
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
|
||||
|
||||
static void WalSndLoop(void);
|
||||
static void XLogBroadcastWalProposer(void);
|
||||
/* END cloned file-level variables and functions from walsender.c */
|
||||
|
||||
int
|
||||
CompareLsn(const void *a, const void *b)
|
||||
{
|
||||
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
|
||||
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
|
||||
|
||||
if (lsn1 < lsn2)
|
||||
return -1;
|
||||
else if (lsn1 == lsn2)
|
||||
return 0;
|
||||
else
|
||||
return 1;
|
||||
}
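/*
 * Illustrative usage, not part of the commits above: CompareLsn has the shape
 * qsort() expects, e.g. for ordering the flush LSNs reported by safekeepers.
 * quorum_lsn_example is an invented name.
 */
static XLogRecPtr
quorum_lsn_example(XLogRecPtr *lsns, int n_safekeepers, int quorum)
{
	qsort(lsns, n_safekeepers, sizeof(XLogRecPtr), CompareLsn);

	/*
	 * After an ascending sort, the element at n_safekeepers - quorum is held
	 * by at least `quorum` safekeepers.
	 */
	return lsns[n_safekeepers - quorum];
}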
|
||||
|
||||
/* Returns a human-readable string corresponding to the SafekeeperState
|
||||
*
|
||||
* The string should not be freed.
|
||||
*
|
||||
* The strings are intended to be used as a prefix to "state", e.g.:
|
||||
*
|
||||
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
|
||||
*
|
||||
* If this sort of phrasing doesn't fit the message, instead use something like:
|
||||
*
|
||||
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
|
||||
*/
|
||||
char *
|
||||
FormatSafekeeperState(SafekeeperState state)
|
||||
{
|
||||
char *return_val = NULL;
|
||||
|
||||
switch (state)
|
||||
{
|
||||
case SS_OFFLINE:
|
||||
return_val = "offline";
|
||||
break;
|
||||
case SS_CONNECTING_READ:
|
||||
case SS_CONNECTING_WRITE:
|
||||
return_val = "connecting";
|
||||
break;
|
||||
case SS_WAIT_EXEC_RESULT:
|
||||
return_val = "receiving query result";
|
||||
break;
|
||||
case SS_HANDSHAKE_RECV:
|
||||
return_val = "handshake (receiving)";
|
||||
break;
|
||||
case SS_VOTING:
|
||||
return_val = "voting";
|
||||
break;
|
||||
case SS_WAIT_VERDICT:
|
||||
return_val = "wait-for-verdict";
|
||||
break;
|
||||
case SS_SEND_ELECTED_FLUSH:
|
||||
return_val = "send-announcement-flush";
|
||||
break;
|
||||
case SS_IDLE:
|
||||
return_val = "idle";
|
||||
break;
|
||||
case SS_ACTIVE:
|
||||
return_val = "active";
|
||||
break;
|
||||
}
|
||||
|
||||
Assert(return_val != NULL);
|
||||
|
||||
return return_val;
|
||||
}
|
||||
|
||||
/* Asserts that the provided events are expected for given safekeeper's state */
|
||||
void
|
||||
AssertEventsOkForState(uint32 events, Safekeeper *sk)
|
||||
{
|
||||
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
|
||||
|
||||
/*
|
||||
* The events are in-line with what we're expecting, under two conditions:
|
||||
* (a) if we aren't expecting anything, `events` has no read- or
|
||||
* write-ready component. (b) if we are expecting something, there's
|
||||
* overlap (i.e. `events & expected != 0`)
|
||||
*/
|
||||
bool events_ok_for_state; /* long name so the `Assert` is more
|
||||
* clear later */
|
||||
|
||||
if (expected == WL_NO_EVENTS)
|
||||
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
|
||||
else
|
||||
events_ok_for_state = ((events & expected) != 0);
|
||||
|
||||
if (!events_ok_for_state)
|
||||
{
|
||||
/*
|
||||
* To give a descriptive message in the case of failure, we use elog
|
||||
* and then an assertion that's guaranteed to fail.
|
||||
*/
|
||||
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
|
||||
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
|
||||
Assert(events_ok_for_state);
|
||||
}
|
||||
}
|
||||
|
||||
/* Returns the set of events a safekeeper in this state should be waiting on
|
||||
*
|
||||
 * This will return WL_NO_EVENTS (= 0) for some states. */
|
||||
uint32
|
||||
SafekeeperStateDesiredEvents(SafekeeperState state)
|
||||
{
|
||||
uint32 result = WL_NO_EVENTS;
|
||||
|
||||
/* If the state doesn't have a modifier, we can check the base state */
|
||||
switch (state)
|
||||
{
|
||||
/* Connecting states say what they want in the name */
|
||||
case SS_CONNECTING_READ:
|
||||
result = WL_SOCKET_READABLE;
|
||||
break;
|
||||
case SS_CONNECTING_WRITE:
|
||||
result = WL_SOCKET_WRITEABLE;
|
||||
break;
|
||||
|
||||
/* Reading states need the socket to be read-ready to continue */
|
||||
case SS_WAIT_EXEC_RESULT:
|
||||
case SS_HANDSHAKE_RECV:
|
||||
case SS_WAIT_VERDICT:
|
||||
result = WL_SOCKET_READABLE;
|
||||
break;
|
||||
|
||||
/*
|
||||
* Idle states use read-readiness as a sign that the connection
|
||||
* has been disconnected.
|
||||
*/
|
||||
case SS_VOTING:
|
||||
case SS_IDLE:
|
||||
result = WL_SOCKET_READABLE;
|
||||
break;
|
||||
|
||||
/*
|
||||
* Flush states require write-ready for flushing. Active state
|
||||
* does both reading and writing.
|
||||
*
|
||||
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
|
||||
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
|
||||
*/
|
||||
case SS_SEND_ELECTED_FLUSH:
|
||||
case SS_ACTIVE:
|
||||
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||
break;
|
||||
|
||||
/* The offline state expects no events. */
|
||||
case SS_OFFLINE:
|
||||
result = WL_NO_EVENTS;
|
||||
break;
|
||||
|
||||
default:
|
||||
Assert(false);
|
||||
break;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/* Returns a human-readable string corresponding to the event set
|
||||
*
|
||||
* If the events do not correspond to something set as the `events` field of a `WaitEvent`, the
|
||||
 * returned string may be meaningless.
|
||||
*
|
||||
* The string should not be freed. It should also not be expected to remain the same between
|
||||
* function calls. */
|
||||
char *
|
||||
FormatEvents(uint32 events)
|
||||
{
|
||||
static char return_str[9];
|
||||
|
||||
/* Helper variable to check if there's extra bits */
|
||||
uint32 all_flags = WL_LATCH_SET
|
||||
| WL_SOCKET_READABLE
|
||||
| WL_SOCKET_WRITEABLE
|
||||
| WL_TIMEOUT
|
||||
| WL_POSTMASTER_DEATH
|
||||
| WL_EXIT_ON_PM_DEATH
|
||||
| WL_SOCKET_CONNECTED;
|
||||
|
||||
/*
|
||||
* The formatting here isn't supposed to be *particularly* useful -- it's
|
||||
 * just to give a sense of what events have been triggered without
|
||||
* needing to remember your powers of two.
|
||||
*/
|
||||
|
||||
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
|
||||
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
|
||||
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
|
||||
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
|
||||
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
|
||||
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
|
||||
return_str[6] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
|
||||
|
||||
if (events & (~all_flags))
|
||||
{
|
||||
elog(WARNING, "Event formatting found unexpected component %d",
|
||||
events & (~all_flags));
|
||||
return_str[7] = '*';
|
||||
return_str[8] = '\0';
|
||||
}
|
||||
else
|
||||
return_str[7] = '\0';
|
||||
|
||||
return (char *) &return_str;
|
||||
}
|
||||
|
||||
/*
|
||||
* Convert a character which represents a hexadecimal digit to an integer.
|
||||
*
|
||||
* Returns -1 if the character is not a hexadecimal digit.
|
||||
*/
|
||||
static int
|
||||
HexDecodeChar(char c)
|
||||
{
|
||||
if (c >= '0' && c <= '9')
|
||||
return c - '0';
|
||||
if (c >= 'a' && c <= 'f')
|
||||
return c - 'a' + 10;
|
||||
if (c >= 'A' && c <= 'F')
|
||||
return c - 'A' + 10;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Decode a hex string into a byte string, 2 hex chars per byte.
|
||||
*
|
||||
* Returns false if invalid characters are encountered; otherwise true.
|
||||
*/
|
||||
bool
|
||||
HexDecodeString(uint8 *result, char *input, int nbytes)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < nbytes; ++i)
|
||||
{
|
||||
int n1 = HexDecodeChar(input[i * 2]);
|
||||
int n2 = HexDecodeChar(input[i * 2 + 1]);
|
||||
|
||||
if (n1 < 0 || n2 < 0)
|
||||
return false;
|
||||
result[i] = n1 * 16 + n2;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
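/*
 * Illustrative usage, not part of the commits above: decoding a 32-character
 * hex id (such as a tenant or timeline id) into its 16-byte binary form. The
 * id value and names are placeholders.
 */
static void
decode_id_example(void)
{
	char		hex_id[] = "0123456789abcdef0123456789abcdef";
	uint8		raw_id[16];

	if (!HexDecodeString(raw_id, hex_id, sizeof(raw_id)))
		elog(ERROR, "invalid hex id: %s", hex_id);
}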
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
|
||||
* --------------------------------
|
||||
*/
|
||||
uint32
|
||||
pq_getmsgint32_le(StringInfo msg)
|
||||
{
|
||||
uint32 n32;
|
||||
|
||||
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
|
||||
|
||||
return n32;
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
 * pq_getmsgint64_le - get a binary 8-byte int from a message buffer in native (LE) order
|
||||
* --------------------------------
|
||||
*/
|
||||
uint64
|
||||
pq_getmsgint64_le(StringInfo msg)
|
||||
{
|
||||
uint64 n64;
|
||||
|
||||
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
|
||||
|
||||
return n64;
|
||||
}
|
||||
|
||||
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
|
||||
void
|
||||
pq_sendint32_le(StringInfo buf, uint32 i)
|
||||
{
|
||||
enlargeStringInfo(buf, sizeof(uint32));
|
||||
memcpy(buf->data + buf->len, &i, sizeof(uint32));
|
||||
buf->len += sizeof(uint32);
|
||||
}
|
||||
|
||||
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
|
||||
void
|
||||
pq_sendint64_le(StringInfo buf, uint64 i)
|
||||
{
|
||||
enlargeStringInfo(buf, sizeof(uint64));
|
||||
memcpy(buf->data + buf->len, &i, sizeof(uint64));
|
||||
buf->len += sizeof(uint64);
|
||||
}
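/*
 * Illustrative round trip, not part of the commits above: the *_le helpers
 * above read and write in native (little-endian) byte order, unlike the
 * network-byte-order pq_sendint64()/pq_getmsgint64(). le_roundtrip_example is
 * an invented name.
 */
static void
le_roundtrip_example(void)
{
	StringInfoData buf;
	uint64		in = UINT64CONST(0x1122334455667788);
	uint64		out;

	initStringInfo(&buf);
	pq_sendint64_le(&buf, in);

	buf.cursor = 0;				/* read back from the start of the buffer */
	out = pq_getmsgint64_le(&buf);
	if (out != in)
		elog(ERROR, "little-endian round trip failed");

	pfree(buf.data);
}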
|
||||
|
||||
/*
|
||||
* Write XLOG data to disk.
|
||||
*/
|
||||
void
|
||||
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
|
||||
{
|
||||
int startoff;
|
||||
int byteswritten;
|
||||
|
||||
while (nbytes > 0)
|
||||
{
|
||||
int segbytes;
|
||||
|
||||
/* Close the current segment if it's completed */
|
||||
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
|
||||
XLogWalPropClose(recptr);
|
||||
|
||||
if (walpropFile < 0)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
/* FIXME Is it ok to use hardcoded value here? */
|
||||
TimeLineID tli = 1;
|
||||
#else
|
||||
bool use_existent = true;
|
||||
#endif
|
||||
/* Create/use new log file */
|
||||
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
walpropFile = XLogFileInit(walpropSegNo, tli);
|
||||
walpropFileTLI = tli;
|
||||
#else
|
||||
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
|
||||
walpropFileTLI = ThisTimeLineID;
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Calculate the start offset of the received logs */
|
||||
startoff = XLogSegmentOffset(recptr, wal_segment_size);
|
||||
|
||||
if (startoff + nbytes > wal_segment_size)
|
||||
segbytes = wal_segment_size - startoff;
|
||||
else
|
||||
segbytes = nbytes;
|
||||
|
||||
/* OK to write the logs */
|
||||
errno = 0;
|
||||
|
||||
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
|
||||
if (byteswritten <= 0)
|
||||
{
|
||||
char xlogfname[MAXFNAMELEN];
|
||||
int save_errno;
|
||||
|
||||
/* if write didn't set errno, assume no disk space */
|
||||
if (errno == 0)
|
||||
errno = ENOSPC;
|
||||
|
||||
save_errno = errno;
|
||||
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
|
||||
errno = save_errno;
|
||||
ereport(PANIC,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not write to log segment %s "
|
||||
"at offset %u, length %lu: %m",
|
||||
xlogfname, startoff, (unsigned long) segbytes)));
|
||||
}
|
||||
|
||||
/* Update state for write */
|
||||
recptr += byteswritten;
|
||||
|
||||
nbytes -= byteswritten;
|
||||
buf += byteswritten;
|
||||
}
|
||||
|
||||
/*
|
||||
* Close the current segment if it's fully written up in the last cycle of
|
||||
* the loop.
|
||||
*/
|
||||
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
|
||||
{
|
||||
XLogWalPropClose(recptr);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Close the current segment.
|
||||
*/
|
||||
void
|
||||
XLogWalPropClose(XLogRecPtr recptr)
|
||||
{
|
||||
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
|
||||
|
||||
if (close(walpropFile) != 0)
|
||||
{
|
||||
char xlogfname[MAXFNAMELEN];
|
||||
|
||||
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
|
||||
|
||||
ereport(PANIC,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not close log segment %s: %m",
|
||||
xlogfname)));
|
||||
}
|
||||
|
||||
walpropFile = -1;
|
||||
}
|
||||
|
||||
/* START of cloned functions from walsender.c */
|
||||
|
||||
/*
|
||||
* Subscribe for new WAL and stream it in the loop to safekeepers.
|
||||
*
|
||||
* At the moment, this never returns, but an ereport(ERROR) will take us back
|
||||
* to the main loop.
|
||||
*/
|
||||
void
|
||||
StartProposerReplication(StartReplicationCmd *cmd)
|
||||
{
|
||||
XLogRecPtr FlushPtr;
|
||||
TimeLineID currTLI;
|
||||
|
||||
#if PG_VERSION_NUM < 150000
|
||||
if (ThisTimeLineID == 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
|
||||
#endif
|
||||
|
||||
/*
|
||||
* We assume here that we're logging enough information in the WAL for
|
||||
* log-shipping, since this is checked in PostmasterMain().
|
||||
*
|
||||
* NOTE: wal_level can only change at shutdown, so in most cases it is
|
||||
* difficult for there to be WAL data that we can still see that was
|
||||
* written at wal_level='minimal'.
|
||||
*/
|
||||
|
||||
if (cmd->slotname)
|
||||
{
|
||||
ReplicationSlotAcquire(cmd->slotname, true);
|
||||
if (SlotIsLogical(MyReplicationSlot))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot use a logical replication slot for physical replication")));
|
||||
|
||||
/*
|
||||
* We don't need to verify the slot's restart_lsn here; instead we
|
||||
* rely on the caller requesting the starting point to use. If the
|
||||
* WAL segment doesn't exist, we'll fail later.
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
* Select the timeline. If it was given explicitly by the client, use
|
||||
* that. Otherwise use the timeline of the last replayed record, which is
|
||||
* kept in ThisTimeLineID.
|
||||
*
|
||||
* Neon doesn't currently use PG Timelines, but it may in the future, so
|
||||
* we keep this code around to lighten the load for when we need it.
|
||||
*/
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
FlushPtr = GetFlushRecPtr(&currTLI);
|
||||
#else
|
||||
FlushPtr = GetFlushRecPtr();
|
||||
currTLI = ThisTimeLineID;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* When we first start replication the standby will be behind the
|
||||
* primary. For some applications, for example synchronous
|
||||
* replication, it is important to have a clear state for this initial
|
||||
* catchup mode, so we can trigger actions when we change streaming
|
||||
* state later. We may stay in this state for a long time, which is
|
||||
* exactly why we want to be able to monitor whether or not we are
|
||||
* still here.
|
||||
*/
|
||||
WalSndSetState(WALSNDSTATE_CATCHUP);
|
||||
|
||||
/*
|
||||
* Don't allow a request to stream from a future point in WAL that
|
||||
* hasn't been flushed to disk in this server yet.
|
||||
*/
|
||||
if (FlushPtr < cmd->startpoint)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
|
||||
LSN_FORMAT_ARGS(cmd->startpoint),
|
||||
LSN_FORMAT_ARGS(FlushPtr))));
|
||||
}
|
||||
|
||||
/* Start streaming from the requested point */
|
||||
sentPtr = cmd->startpoint;
|
||||
|
||||
/* Initialize shared memory status, too */
|
||||
SpinLockAcquire(&MyWalSnd->mutex);
|
||||
MyWalSnd->sentPtr = sentPtr;
|
||||
SpinLockRelease(&MyWalSnd->mutex);
|
||||
|
||||
SyncRepInitConfig();
|
||||
|
||||
/* Infinite send loop, never returns */
|
||||
WalSndLoop();
|
||||
|
||||
WalSndSetState(WALSNDSTATE_STARTUP);
|
||||
|
||||
if (cmd->slotname)
|
||||
ReplicationSlotRelease();
|
||||
}
|
||||
|
||||
/*
|
||||
* Main loop that waits for LSN updates and calls the walproposer.
|
||||
* Synchronous replication sets latch in WalSndWakeup at walsender.c
|
||||
*/
|
||||
static void
|
||||
WalSndLoop(void)
|
||||
{
|
||||
/* Clear any already-pending wakeups */
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
XLogBroadcastWalProposer();
|
||||
|
||||
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
|
||||
WalSndSetState(WALSNDSTATE_STREAMING);
|
||||
WalProposerPoll();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Notify walproposer about the new WAL position.
|
||||
*/
|
||||
static void
|
||||
XLogBroadcastWalProposer(void)
|
||||
{
|
||||
XLogRecPtr startptr;
|
||||
XLogRecPtr endptr;
|
||||
|
||||
/* Start from the last sent position */
|
||||
startptr = sentPtr;
|
||||
|
||||
/*
|
||||
* Streaming the current timeline on a primary.
|
||||
*
|
||||
* Attempt to send all data that's already been written out and
|
||||
* fsync'd to disk. We cannot go further than what's been written out
|
||||
* given the current implementation of WALRead(). And in any case
|
||||
* it's unsafe to send WAL that is not securely down to disk on the
|
||||
* primary: if the primary subsequently crashes and restarts, standbys
|
||||
* must not have applied any WAL that got lost on the primary.
|
||||
*/
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
endptr = GetFlushRecPtr(NULL);
|
||||
#else
|
||||
endptr = GetFlushRecPtr();
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Record the current system time as an approximation of the time at which
|
||||
* this WAL location was written for the purposes of lag tracking.
|
||||
*
|
||||
* In theory we could make XLogFlush() record a time in shmem whenever WAL
|
||||
* is flushed and we could get that time as well as the LSN when we call
|
||||
* GetFlushRecPtr() above (and likewise for the cascading standby
|
||||
* equivalent), but rather than putting any new code into the hot WAL path
|
||||
* it seems good enough to capture the time here. We should reach this
|
||||
* after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
|
||||
* may take some time, we read the WAL flush pointer and take the time
|
||||
 * very close together here so that we'll get a later position if it is
|
||||
* still moving.
|
||||
*
|
||||
* Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
|
||||
* this gives us a cheap approximation for the WAL flush time for this
|
||||
* LSN.
|
||||
*
|
||||
* Note that the LSN is not necessarily the LSN for the data contained in
|
||||
* the present message; it's the end of the WAL, which might be further
|
||||
* ahead. All the lag tracking machinery cares about is finding out when
|
||||
* that arbitrary LSN is eventually reported as written, flushed and
|
||||
* applied, so that it can measure the elapsed time.
|
||||
*/
|
||||
LagTrackerWrite(endptr, GetCurrentTimestamp());
|
||||
|
||||
/* Do we have any work to do? */
|
||||
Assert(startptr <= endptr);
|
||||
if (endptr <= startptr)
|
||||
return;
|
||||
|
||||
WalProposerBroadcast(startptr, endptr);
|
||||
sentPtr = endptr;
|
||||
|
||||
/* Update shared memory status */
|
||||
{
|
||||
WalSnd *walsnd = MyWalSnd;
|
||||
|
||||
SpinLockAcquire(&walsnd->mutex);
|
||||
walsnd->sentPtr = sentPtr;
|
||||
SpinLockRelease(&walsnd->mutex);
|
||||
}
|
||||
|
||||
/* Report progress of XLOG streaming in PS display */
|
||||
if (update_process_title)
|
||||
{
|
||||
char activitymsg[50];
|
||||
|
||||
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
|
||||
LSN_FORMAT_ARGS(sentPtr));
|
||||
set_ps_display(activitymsg);
|
||||
}
|
||||
}
|
||||
pgxn/neon/walproposer_utils.h (new file, 19 lines added)
@@ -0,0 +1,19 @@
|
||||
#ifndef __NEON_WALPROPOSER_UTILS_H__
|
||||
#define __NEON_WALPROPOSER_UTILS_H__
|
||||
|
||||
#include "walproposer.h"
|
||||
|
||||
int CompareLsn(const void *a, const void *b);
|
||||
char *FormatSafekeeperState(SafekeeperState state);
|
||||
void AssertEventsOkForState(uint32 events, Safekeeper *sk);
|
||||
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
|
||||
char *FormatEvents(uint32 events);
|
||||
bool HexDecodeString(uint8 *result, char *input, int nbytes);
|
||||
uint32 pq_getmsgint32_le(StringInfo msg);
|
||||
uint64 pq_getmsgint64_le(StringInfo msg);
|
||||
void pq_sendint32_le(StringInfo buf, uint32 i);
|
||||
void pq_sendint64_le(StringInfo buf, uint64 i);
|
||||
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
|
||||
void XLogWalPropClose(XLogRecPtr recptr);
|
||||
|
||||
#endif /* __NEON_WALPROPOSER_UTILS_H__ */
|
||||
@@ -89,10 +89,7 @@ pub mod errors {
|
||||
Self::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
ref text,
|
||||
} => {
|
||||
!text.contains("written data quota exceeded")
|
||||
&& !text.contains("the limit for current plan reached")
|
||||
}
|
||||
} => !text.contains("quota"),
|
||||
// retry server errors
|
||||
Self::Console { status, .. } if status.is_server_error() => true,
|
||||
_ => false,
|
||||
|
||||
@@ -723,9 +723,9 @@ impl Timeline {
|
||||
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
|
||||
return Ok(()); // nothing to do
|
||||
}
|
||||
|
||||
let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);
|
||||
// release the lock before removing
|
||||
shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
|
||||
remover
|
||||
};
|
||||
|
||||
// delete old WAL files
|
||||
|
||||
@@ -1085,32 +1085,15 @@ class AbstractNeonCli(abc.ABC):
|
||||
stderr=subprocess.PIPE,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
indent = " "
|
||||
if not res.returncode:
|
||||
stripped = res.stdout.strip()
|
||||
lines = stripped.splitlines()
|
||||
if len(lines) < 2:
|
||||
log.debug(f"Run {res.args} success: {stripped}")
|
||||
else:
|
||||
log.debug("Run %s success:\n%s" % (res.args, textwrap.indent(stripped, indent)))
|
||||
log.info(f"Run {res.args} success: {res.stdout}")
|
||||
elif check_return_code:
|
||||
# this way command output will be recorded and shown in the CI failure message
|
||||
indent = indent * 2
|
||||
msg = textwrap.dedent(
|
||||
"""\
|
||||
Run %s failed:
|
||||
stdout:
|
||||
%s
|
||||
stderr:
|
||||
%s
|
||||
msg = f"""\
|
||||
Run {res.args} failed:
|
||||
stdout: {res.stdout}
|
||||
stderr: {res.stderr}
|
||||
"""
|
||||
)
|
||||
msg = msg % (
|
||||
res.args,
|
||||
textwrap.indent(res.stdout.strip(), indent),
|
||||
textwrap.indent(res.stderr.strip(), indent),
|
||||
)
|
||||
log.info(msg)
|
||||
raise RuntimeError(msg) from subprocess.CalledProcessError(
|
||||
res.returncode, res.args, res.stdout, res.stderr
|
||||
@@ -1464,29 +1447,6 @@ class NeonCli(AbstractNeonCli):
|
||||
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
def map_branch(
|
||||
self, name: str, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
"""
|
||||
Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
|
||||
Usually needed when creating branches via PageserverHttpClient and not neon_local.
|
||||
|
||||
After creating a name mapping, you can use EndpointFactory.create_start
|
||||
with this registered branch name.
|
||||
"""
|
||||
args = [
|
||||
"mappings",
|
||||
"map",
|
||||
"--branch-name",
|
||||
name,
|
||||
"--tenant-id",
|
||||
str(tenant_id),
|
||||
"--timeline-id",
|
||||
str(timeline_id),
|
||||
]
|
||||
|
||||
return self.raw_cli(args, check_return_code=True)
|
||||
|
||||
def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
|
||||
return self.raw_cli(["start"], check_return_code=check_return_code)
|
||||
|
||||
|
||||
@@ -74,14 +74,11 @@ def wait_until_tenant_state(
|
||||
for _ in range(iterations):
|
||||
try:
|
||||
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
else:
|
||||
log.debug(f"Tenant {tenant_id} data: {tenant}")
|
||||
if tenant["state"]["slug"] == expected_state:
|
||||
return tenant
|
||||
if tenant["state"]["slug"] == "Broken":
|
||||
raise RuntimeError(f"tenant became Broken, not {expected_state}")
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
|
||||
time.sleep(period)
|
||||
|
||||
|
||||
@@ -1,24 +1,14 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, List, Union
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.types import Lsn, TimelineId
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
from requests import RequestException
|
||||
from requests.exceptions import RetryError
|
||||
|
||||
|
||||
# Test branch creation
|
||||
@@ -138,245 +128,3 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
|
||||
endpoint1 = env.endpoints.create_start("b1")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
|
||||
|
||||
|
||||
def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Endpoint should not be possible to create because branch has not been uploaded.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
initial_branch = "initial_branch"
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
|
||||
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
|
||||
|
||||
def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Branch should not be possible to create because ancestor has not been uploaded.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
ps_http.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
with pytest.raises(RequestException):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
t = threading.Thread(target=start_creating_timeline)
|
||||
try:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
with pytest.raises(RetryError, match="too many 503 error responses"):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
branch_id,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found",
|
||||
):
|
||||
ps_http.timeline_detail(env.initial_tenant, branch_id)
|
||||
# important to note that a task might still be in progress to complete
|
||||
# the work, but will never get to that because we have the pause
|
||||
# failpoint
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
|
||||
|
||||
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
If activation happens only after upload, then retried creation requests can end up competing.
|
||||
"""
|
||||
|
||||
    env = neon_env_builder.init_configs()
    env.start()

    env.pageserver.allowed_errors.append(
        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
    )
    env.pageserver.allowed_errors.append(
        ".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory"
    )
    ps_http = env.pageserver.http_client()

    # pause all uploads
    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
    ps_http.tenant_create(env.initial_tenant)

    def start_creating_timeline():
        ps_http.timeline_create(
            env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
        )

    create_root = threading.Thread(target=start_creating_timeline)

    branch_id = TimelineId.generate()

    queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
    barrier = threading.Barrier(3)

    def try_branch():
        barrier.wait()
        barrier.wait()
        try:
            ret = ps_http.timeline_create(
                env.pg_version,
                env.initial_tenant,
                branch_id,
                ancestor_timeline_id=env.initial_timeline,
                timeout=5,
            )
            queue.put(ret)
        except Exception as e:
            queue.put(e)

    threads = [threading.Thread(target=try_branch) for _ in range(2)]

    try:
        create_root.start()

        for t in threads:
            t.start()

        wait_until_paused(env, "before-upload-index-pausable")

        barrier.wait()
        ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
        barrier.wait()

        # now both requests race to branch; only one can win, because they take gc_cs, Tenant::timelines or marker files
        first = queue.get()
        second = queue.get()

        log.info(first)
        log.info(second)

        (succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
        assert isinstance(failed, Exception)
        assert isinstance(succeeded, Dict)

        # FIXME: there are probably multiple valid status codes:
        # - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already exists
        # - whatever the 409 response says, but that is a subclass of PageserverApiException
        assert isinstance(failed, PageserverApiException)
        assert succeeded["state"] == "Active"
    finally:
        # we might still have the failpoint active
        env.pageserver.stop(immediate=True)

        # pytest should nag if we leave threads unjoined
        for t in threads:
            t.join()
        create_root.join()


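# Aside (illustrative sketch only, not part of this compare): the barrier choreography used
# above, in miniature. The first wait lines everyone up while the failpoint still pauses
# uploads; the second wait releases the competing requests only after the failpoint has been
# turned off, so they race as closely as possible. All names below are hypothetical.
def _barrier_choreography_sketch(n_competitors: int = 2):
    barrier = threading.Barrier(n_competitors + 1)

    def competitor():
        barrier.wait()  # rendezvous while the failpoint is still active
        barrier.wait()  # released together, right after the failpoint is disabled
        # ... issue the competing request here ...

    workers = [threading.Thread(target=competitor) for _ in range(n_competitors)]
    for w in workers:
        w.start()
    barrier.wait()  # everyone is lined up
    # ... disable the failpoint here ...
    barrier.wait()  # let the competitors race
    for w in workers:
        w.join()

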
def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder):
    """
    Currently (before RFC#27) we keep branches which were not successfully uploaded before shutdown, and continue uploading them.

    This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test once the RFC is implemented.
    """

    env = neon_env_builder.init_configs()
    env.start()

    env.pageserver.allowed_errors.append(
        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
    )
    ps_http = env.pageserver.http_client()

    # pause all uploads
    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
    ps_http.tenant_create(env.initial_tenant)

    def start_creating_timeline():
        with pytest.raises(RequestException):
            ps_http.timeline_create(
                env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
            )

    t = threading.Thread(target=start_creating_timeline)
    try:
        t.start()

        wait_until_paused(env, "before-upload-index-pausable")
    finally:
        # FIXME: paused uploads bother shutdown
        env.pageserver.stop(immediate=True)
        t.join()

    # now without a failpoint
    env.pageserver.start()

    wait_until_tenant_active(ps_http, env.initial_tenant)

    # currently it lives on and will eventually get uploaded, but this will change
    detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
    assert detail["state"] == "Active"


def wait_until_paused(env: NeonEnv, failpoint: str):
    found = False
    msg = f"at failpoint {failpoint}"
    for _ in range(20):
        time.sleep(1)
        found = env.pageserver.log_contains(msg) is not None
        if found:
            break
    assert found

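# Aside (illustrative sketch only, not part of this compare): the polling above could also be
# expressed with the wait_until(iterations, interval, func) helper that appears further down in
# this diff, assuming it is importable from the test fixtures in this module.
def wait_until_paused_via_wait_until(env: NeonEnv, failpoint: str):
    msg = f"at failpoint {failpoint}"

    def paused():
        assert env.pageserver.log_contains(msg) is not None

    wait_until(20, 1, paused)
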
@@ -3,10 +3,7 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import (
    wait_for_upload_queue_empty,
    wait_until_tenant_active,
)
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from requests.exceptions import ConnectionError

@@ -116,8 +113,6 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
        time.sleep(1)

    env.pageserver.start()
    wait_until_tenant_active(pageserver_http, tenant_id)

    message = f".*duplicated L1 layer layer={l1_found.name}"
    env.pageserver.allowed_errors.append(message)


@@ -10,7 +10,6 @@ of the pageserver are:
"""


import enum
import re
import time
from typing import Optional
@@ -82,7 +81,7 @@ def generate_uploads_and_deletions(
                f"""
                INSERT INTO foo (id, val)
                SELECT g, '{data}'
                FROM generate_series(1, 200) g
                FROM generate_series(1, 20000) g
                ON CONFLICT (id) DO UPDATE
                SET val = EXCLUDED.val
                """,
@@ -117,10 +116,6 @@ def get_deletion_queue_submitted(ps_http) -> int:
    return get_metric_or_0(ps_http, "pageserver_deletion_queue_submitted_total")


def get_deletion_queue_validated(ps_http) -> int:
    return get_metric_or_0(ps_http, "pageserver_deletion_queue_validated_total")


def get_deletion_queue_dropped(ps_http) -> int:
    return get_metric_or_0(ps_http, "pageserver_deletion_queue_dropped_total")

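# Aside (illustrative sketch only, not part of this compare): get_metric_or_0 is not shown in
# this hunk. It is assumed to read a single counter from the pageserver /metrics endpoint and
# default to 0 when the metric has not been emitted yet; the get_metric_value helper below is
# an assumption about the HTTP client fixture, not something this diff introduces.
def get_metric_or_0_sketch(ps_http, metric: str) -> int:
    value = ps_http.get_metric_value(metric)  # hypothetical fixture helper
    return 0 if value is None else int(value)
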
@@ -277,29 +272,13 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
    assert get_deletion_queue_unexpected_errors(ps_http) == 0


class KeepAttachment(str, enum.Enum):
    KEEP = "keep"
    LOSE = "lose"


class ValidateBefore(str, enum.Enum):
    VALIDATE = "validate"
    NO_VALIDATE = "no-validate"


@pytest.mark.parametrize("keep_attachment", [KeepAttachment.KEEP, KeepAttachment.LOSE])
@pytest.mark.parametrize("validate_before", [ValidateBefore.VALIDATE, ValidateBefore.NO_VALIDATE])
@pytest.mark.parametrize("keep_attachment", [True, False])
def test_deletion_queue_recovery(
    neon_env_builder: NeonEnvBuilder,
    pg_bin: PgBin,
    keep_attachment: KeepAttachment,
    validate_before: ValidateBefore,
    neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool
):
    """
    :param keep_attachment: whether to re-attach after restart. Else, we act as if some other
    :param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
    node took the attachment while we were restarting.
    :param validate_before: whether to wait for deletions to be validated before restart. This
    makes them eligible to be executed after restart, if the same node keeps the attachment.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
@@ -309,20 +288,12 @@ def test_deletion_queue_recovery(

    ps_http = env.pageserver.http_client()

    failpoints = [
        # Prevent deletion lists from being executed, to build up some backlog of deletions
        ("deletion-queue-before-execute", "return"),
    ]

    if validate_before == ValidateBefore.NO_VALIDATE:
        failpoints.append(
            # Prevent deletion lists from being validated; we will test that they are
            # dropped properly during recovery. 'pause' is okay here because we kill
            # the pageserver with immediate=true
            ("control-plane-client-validate", "pause")
        )

    ps_http.configure_failpoints(failpoints)
    # Prevent deletion lists from being executed, to build up some backlog of deletions
    ps_http.configure_failpoints(
        [
            ("deletion-queue-before-execute", "return"),
        ]
    )

    generate_uploads_and_deletions(env)

@@ -334,25 +305,10 @@ def test_deletion_queue_recovery(
    assert get_deletion_queue_unexpected_errors(ps_http) == 0
    assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0

    if validate_before == ValidateBefore.VALIDATE:

        def assert_validation_complete():
            assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)

        wait_until(20, 1, assert_validation_complete)

        # The validated keys statistic advances before the header is written, so we
        # also wait to see the header hit the disk: this seems paranoid, but the race
        # can really happen on a heavily overloaded test machine.
        def assert_header_written():
            assert (env.pageserver.workdir / "deletion" / "header-01").exists()

        wait_until(20, 1, assert_header_written)

    log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
    env.pageserver.stop(immediate=True)

    if keep_attachment == KeepAttachment.LOSE:
    if not keep_attachment:
        some_other_pageserver = 101010
        assert env.attachment_service is not None
        env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
@@ -371,17 +327,14 @@ def test_deletion_queue_recovery(
    ps_http.deletion_queue_flush(execute=True)
    wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))

    if keep_attachment == KeepAttachment.KEEP or validate_before == ValidateBefore.VALIDATE:
        # - If we kept the attachment, then our pre-restart deletions should execute,
        #   because on re-attach they were from the immediately preceding generation.
        # - If we validated before restart, then the deletions should execute because the
        #   deletion queue header records a validated deletion list sequence number.
    if keep_attachment:
        # If we kept the attachment, then our pre-restart deletions should have executed
        # successfully
        assert get_deletion_queue_executed(ps_http) == before_restart_depth
    else:
        env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])

        # If we lost the attachment, we should have dropped our pre-restart deletions.
        assert get_deletion_queue_dropped(ps_http) == before_restart_depth
        env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])

    assert get_deletion_queue_unexpected_errors(ps_http) == 0
    assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
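
# Aside (illustrative only, not part of this compare): restating the enum-based branch above as
# a truth table for the parametrized matrix.
#   keep_attachment=KEEP,  validate_before=VALIDATE     -> pre-restart deletions executed
#   keep_attachment=KEEP,  validate_before=NO_VALIDATE  -> pre-restart deletions executed
#   keep_attachment=LOSE,  validate_before=VALIDATE     -> pre-restart deletions executed
#   keep_attachment=LOSE,  validate_before=NO_VALIDATE  -> pre-restart deletions dropped as stale
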
@@ -397,73 +350,3 @@ def test_deletion_queue_recovery(

    assert get_deletion_queue_unexpected_errors(ps_http) == 0
    assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0


def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)

    ps_http = env.pageserver.http_client()

    generate_uploads_and_deletions(env)

    env.pageserver.allowed_errors.extend(
        [
            # When the pageserver can't reach the control plane, it will complain
            ".*calling control plane generation validation API failed.*",
            # Emergency mode is a big deal, we log errors whenever it is used.
            ".*Emergency mode!.*",
        ]
    )

    # Simulate a major incident: the control plane goes offline
    assert env.attachment_service is not None
    env.attachment_service.stop()

    # Remember how many validations had happened before the control plane went offline
    validated = get_deletion_queue_validated(ps_http)

    generate_uploads_and_deletions(env, init=False)

    # The running pageserver should stop progressing deletions
    time.sleep(10)
    assert get_deletion_queue_validated(ps_http) == validated

    # Restart the pageserver: ordinarily we would _avoid_ doing this during such an
    # incident, but it might be unavoidable: if so, we want to be able to start up
    # and serve clients.
    env.pageserver.stop()  # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
    env.pageserver.start(
        overrides=("--pageserver-config-override=control_plane_emergency_mode=true",)
    )

    # The pageserver should provide service to clients
    generate_uploads_and_deletions(env, init=False)

    # The pageserver should neither validate nor execute any deletions, it should have
    # loaded the DeletionLists from before though
    time.sleep(10)
    assert get_deletion_queue_depth(ps_http) > 0
    assert get_deletion_queue_validated(ps_http) == 0
    assert get_deletion_queue_executed(ps_http) == 0

    # When the control plane comes back up, normal service should resume
    env.attachment_service.start()

    ps_http.deletion_queue_flush(execute=True)
    assert get_deletion_queue_depth(ps_http) == 0
    assert get_deletion_queue_validated(ps_http) > 0
    assert get_deletion_queue_executed(ps_http) > 0

    # The pageserver should work fine when subsequently restarted in non-emergency mode
    env.pageserver.stop()  # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
    env.pageserver.start()

    generate_uploads_and_deletions(env, init=False)
    ps_http.deletion_queue_flush(execute=True)
    assert get_deletion_queue_depth(ps_http) == 0
    assert get_deletion_queue_validated(ps_http) > 0
    assert get_deletion_queue_executed(ps_http) > 0

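# Aside (illustrative sketch only, not part of this compare): the post-flush assertions repeated
# above could be folded into a small helper, reusing the getters defined earlier in this file.
def assert_deletion_queue_resumed(ps_http):
    assert get_deletion_queue_depth(ps_http) == 0
    assert get_deletion_queue_validated(ps_http) > 0
    assert get_deletion_queue_executed(ps_http) > 0
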
@@ -17,8 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
    n_restarts = 10
    scale = 10

    env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")

    def run_pgbench(connstr: str):
        log.info(f"Start a pgbench workload on pg {connstr}")
        pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])

@@ -45,11 +45,14 @@ def test_tenant_delete_smoke(
        [
            # The deletion queue will complain when it encounters simulated S3 errors
            ".*deletion executor: DeleteObjects request failed.*",
            # lucky race with stopping from flushing a layer we fail to schedule any uploads
            ".*layer flush task.+: could not flush frozen layer: update_metadata_file",
        ]
    )

    # lucky race with stopping from flushing a layer we fail to schedule any uploads
    env.pageserver.allowed_errors.append(
        ".*layer flush task.+: could not flush frozen layer: update_metadata_file"
    )

    ps_http = env.pageserver.http_client()

    # first try to delete a non-existing tenant
@@ -191,9 +194,11 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
    )

    if simulate_failures:
        env.pageserver.allowed_errors.append(
            # The deletion queue will complain when it encounters simulated S3 errors
            ".*deletion executor: DeleteObjects request failed.*",
        env.pageserver.allowed_errors.extend(
            [
                # The deletion queue will complain when it encounters simulated S3 errors
                ".*deletion executor: DeleteObjects request failed.*",
            ]
        )

    ps_http = env.pageserver.http_client()
@@ -288,10 +293,6 @@ def test_tenant_delete_is_resumed_on_attach(
    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)

    env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
    env.pageserver.allowed_errors.append(
        # lucky race with stopping from flushing a layer we fail to schedule any uploads
        ".*layer flush task.+: could not flush frozen layer: update_metadata_file"
    )

    tenant_id = env.initial_tenant


@@ -752,9 +752,6 @@ def test_ignore_while_attaching(
    env.pageserver.allowed_errors.append(
        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
    )
    # An endpoint is starting up concurrently with our detach, it can
    # experience RPC failure due to shutdown.
    env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")

    data_id = 1
    data_secret = "very secret secret"